From 3d5cee2dcc6b764556f430404b5b7936ee1cd009 Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 7 Jun 2017 12:03:57 +0200 Subject: [PATCH 01/10] [hotfix] Improve readability in SPV2#convertToOperatorStateSavepointV2 --- .../runtime/checkpoint/savepoint/SavepointV2.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java index 6a3b57fc1feb2..1b2963d0166c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java @@ -188,8 +188,8 @@ public static Savepoint convertToOperatorStateSavepointV2( for (int chainIndex = 0; chainIndex < taskState.getChainLength(); chainIndex++) { // task consists of multiple operators so we have to break the state apart - for (int o = 0; o < operatorIDs.size(); o++) { - OperatorID operatorID = operatorIDs.get(o); + for (int operatorIndex = 0; operatorIndex < operatorIDs.size(); operatorIndex++) { + OperatorID operatorID = operatorIDs.get(operatorIndex); OperatorState operatorState = operatorStates.get(operatorID); if (operatorState == null) { @@ -204,15 +204,15 @@ public static Savepoint convertToOperatorStateSavepointV2( KeyedStateHandle rawKeyedState = null; // only the head operator retains the keyed state - if (o == operatorIDs.size() - 1) { + if (operatorIndex == operatorIDs.size() - 1) { managedKeyedState = subtaskState.getManagedKeyedState(); rawKeyedState = subtaskState.getRawKeyedState(); } OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState( - nonPartitionedState != null ? nonPartitionedState.get(o) : null, - partitioneableState != null ? partitioneableState.get(o) : null, - rawOperatorState != null ? rawOperatorState.get(o) : null, + nonPartitionedState != null ? 
nonPartitionedState.get(operatorIndex) : null, + partitioneableState != null ? partitioneableState.get(operatorIndex) : null, + rawOperatorState != null ? rawOperatorState.get(operatorIndex) : null, managedKeyedState, rawKeyedState); From 2bbfe0292c13d875b531a6c168ea78bfc7f21f0b Mon Sep 17 00:00:00 2001 From: zentol Date: Wed, 7 Jun 2017 12:03:21 +0200 Subject: [PATCH 02/10] [FLINK-6742] Improve savepoint migration failure error message --- .../checkpoint/savepoint/SavepointV2.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java index 1b2963d0166c8..5e46f93bb220c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java @@ -168,10 +168,27 @@ public static Savepoint convertToOperatorStateSavepointV2( expandedToLegacyIds = true; } + if (jobVertex == null) { + throw new IllegalStateException( + "Could not find task for state with ID " + taskState.getJobVertexID() + ". " + + "When migrating a savepoint from a version < 1.3 please make sure that the topology was not " + + "changed through removal of a stateful operator or modification of a chain containing a stateful " + + "operator."); + } + List operatorIDs = jobVertex.getOperatorIDs(); for (int subtaskIndex = 0; subtaskIndex < jobVertex.getParallelism(); subtaskIndex++) { - SubtaskState subtaskState = taskState.getState(subtaskIndex); + SubtaskState subtaskState; + try { + subtaskState = taskState.getState(subtaskIndex); + } catch (Exception e) { + throw new IllegalStateException( + "Could not find subtask with index " + subtaskIndex + " for task " + jobVertex.getJobVertexId() + ". 
" + + "When migrating a savepoint from a version < 1.3 please make sure that no changes were made " + + "to the parallelism of stateful operators.", + e); + } if (subtaskState == null) { continue; From 31fd58258b1346f3c9adcf397ffdec316333a4bd Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 6 Jun 2017 14:24:14 +0200 Subject: [PATCH 03/10] [FLINK-6798][docs] update old network buffer notices This closes #4080. --- docs/monitoring/large_state_tuning.md | 20 +++++-------------- .../configuration/TaskManagerOptions.java | 4 ++-- .../io/network/NetworkEnvironment.java | 4 ++-- 3 files changed, 9 insertions(+), 19 deletions(-) diff --git a/docs/monitoring/large_state_tuning.md b/docs/monitoring/large_state_tuning.md index 9e1ecc7cb5883..a356970121cc5 100644 --- a/docs/monitoring/large_state_tuning.md +++ b/docs/monitoring/large_state_tuning.md @@ -94,21 +94,11 @@ When a savepoint is manually triggered, it may be in process concurrently with a ## Tuning Network Buffers -The number of network buffers is a parameter that can currently have an effect on checkpointing at large scale. -The Flink community is working on eliminating that parameter in the next versions of Flink. - -The number of network buffers defines how much data a TaskManager can hold in-flight before back-pressure kicks in. -A very high number of network buffers means that a lot of data may be in the stream network channels when a checkpoint -is started. Because the checkpoint barriers travel with that data (see [description of how checkpointing works](../internals/stream_checkpointing.html)), -a lot of in-flight data means that the barriers have to wait for that data to be transported/processed before arriving -at the target operator. - -Having a lot of data in-flight also does not speed up the data processing as a whole. It only means that data is picked up faster -from the data source (log, files, message queue) and buffered longer in Flink. 
Having fewer network buffers means that -data is picked up from the source more immediately before it is actually being processed, which is generally desirable. -The number of network buffers should hence not be set arbitrarily large, but to a low multiple (such as 2x) of the -minimum number of required buffers. - +Before Flink 1.3, an increased number of network buffers also caused increased checkpointing times since +keeping more in-flight data meant that checkpoint barriers got delayed. Since Flink 1.3, the +number of network buffers used per outgoing/incoming channel is limited and thus network buffers +may be configured without affecting checkpoint times +(see [network buffer configuration](../setup/config.html#configuring-the-network-buffers)). ## Make state checkpointing Asynchronous where possible diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index 8480045bbbcea..bde564a0a8825 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -137,7 +137,7 @@ public class TaskManagerOptions { .defaultValue(1024L << 20); // 1 GB /** - * Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel). + * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). * * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization */ @@ -146,7 +146,7 @@ public class TaskManagerOptions { .defaultValue(2); /** - * Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). + * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). 
*/ public static final ConfigOption NETWORK_EXTRA_BUFFERS_PER_GATE = key("taskmanager.network.memory.floating-buffers-per-gate") diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index cc4cb77b0be98..4269af6401551 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -73,9 +73,9 @@ public class NetworkEnvironment { private final int partitionRequestMaxBackoff; - /** Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel). */ + /** Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). */ private final int networkBuffersPerChannel; - /** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */ + /** Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). */ private final int extraNetworkBuffersPerGate; private boolean isShutdown; From c815ada2b68fd0692b1ccd77c4bcff2a3e9175ea Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 31 May 2017 16:22:22 +0200 Subject: [PATCH 04/10] [FLINK-6784][docs] update externalized checkpoints documentation This closes #4033. --- docs/dev/stream/checkpointing.md | 21 +++++++++++++ docs/setup/checkpoints.md | 54 +++++++++++++++++++++++++++++--- 2 files changed, 70 insertions(+), 5 deletions(-) diff --git a/docs/dev/stream/checkpointing.md b/docs/dev/stream/checkpointing.md index 3a0a1ae44a0b5..4fe06c1d292e9 100644 --- a/docs/dev/stream/checkpointing.md +++ b/docs/dev/stream/checkpointing.md @@ -72,6 +72,8 @@ Other parameters for checkpointing include: This option cannot be used when a minimum time between checkpoints is defined. 
+ - *externalized checkpoints*: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their meta data out to persistent storage and are *not* automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails. There are more details in the [deployment notes on externalized checkpoints](../../setup/checkpoints.html#externalized-checkpoints). +
{% highlight java %} @@ -93,6 +95,9 @@ env.getCheckpointConfig().setCheckpointTimeout(60000); // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + +// enable externalized checkpoints which are retained after job cancellation +env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); {% endhighlight %}
@@ -119,6 +124,22 @@ env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+### Related Config Options + +Some more parameters and/or defaults may be set via `conf/flink-conf.yaml` (see [configuration](../../setup/config.html) for a full guide): + +- `state.backend`: The backend that will be used to store operator state checkpoints if checkpointing is enabled. Supported backends: + - `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging. + - `filesystem`: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, ... + +- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink-supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups. + +- `state.backend.rocksdb.checkpointdir`: The local directory for storing RocksDB files, or a list of directories separated by the system's directory delimiter (for example ‘:’ (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`) + +- `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints](../../setup/checkpoints.html#externalized-checkpoints). + +- `state.checkpoints.num-retained`: The number of completed checkpoint instances to retain. Having more than one allows recovery fallback to an earlier checkpoint if the latest checkpoint is corrupt. (Default: 1) + {% top %} diff --git a/docs/setup/checkpoints.md b/docs/setup/checkpoints.md index 65d14b7a8a0aa..0070cb13dfce2 100644 --- a/docs/setup/checkpoints.md +++ b/docs/setup/checkpoints.md @@ -28,12 +28,22 @@ under the License. ## Overview -TBD +Checkpoints make state in Flink fault tolerant by allowing state and the +corresponding stream positions to be recovered, thereby giving the application +the same semantics as a failure-free execution. 
+See [Checkpointing](../dev/stream/checkpointing.html) for how to enable and +configure checkpoints for your program. ## Externalized Checkpoints -You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their meta data out to persistent storage and are *not* automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails. +Checkpoints are by default not persisted externally and are only used to +resume a job from failures. They are deleted when a program is cancelled. +You can, however, configure periodic checkpoints to be persisted externally +similarly to [savepoints](savepoints.html). These *externalized checkpoints* +write their meta data out to persistent storage and are *not* automatically +cleaned up when the job fails. This way, you will have a checkpoint around +to resume from if your job fails. ```java CheckpointConfig config = env.getCheckpointConfig(); @@ -46,12 +56,46 @@ The `ExternalizedCheckpointCleanup` mode configures what happens with externaliz - **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the externalized checkpoint when the job is cancelled. The checkpoint state will only be available if the job fails. -The **target directory** for the checkpoint is determined from the default checkpoint directory configuration. This is configured via the configuration key `state.checkpoints.dir`, which should point to the desired target directory: +### Directory Structure + +Similarly to [savepoints](savepoints.html), an externalized checkpoint consists +of a meta data file and, depending on the state back-end, some additional data +files. The **target directory** for the externalized checkpoint's meta data is +determined from the configuration key `state.checkpoints.dir` which, currently, +can only be set via the configuration files. 
``` state.checkpoints.dir: hdfs:///checkpoints/ ``` -This directory will then contain the checkpoint meta data required to restore the checkpoint. The actual checkpoint files will still be available in their configured directory. You currently can only set this via the configuration files. +This directory will then contain the checkpoint meta data required to restore +the checkpoint. For the `MemoryStateBackend`, this meta data file will be +self-contained and no further files are needed. + +`FsStateBackend` and `RocksDBStateBackend` write separate data files +and only write the paths to these files into the meta data file. These data +files are stored at the path given to the state back-end during construction. + +```java +env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/")); +``` + +### Difference to Savepoints + +Externalized checkpoints have a few differences from [savepoints](savepoints.html). They +- use a state-backend-specific (low-level) data format, +- may be incremental, +- do not support Flink-specific features like rescaling. + +### Resuming from an externalized checkpoint -Follow the [savepoint guide]({{ site.baseurl }}/setup/cli.html#savepoints) when you want to resume from a specific checkpoint. +A job may be resumed from an externalized checkpoint just as from a savepoint +by using the checkpoint's meta data file instead (see the +[savepoint restore guide](cli.html#restore-a-savepoint)). Note that if the +meta data file is not self-contained, the JobManager needs to have access to +the data files it refers to (see [Directory Structure](#directory-structure) +above). 
+ +```sh +$ bin/flink run -s :checkpointMetaDataPath [:runArgs] +``` From b954eda8763dd030b0325e0e001aa930f64ef72f Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 31 May 2017 10:50:39 +0200 Subject: [PATCH 05/10] [hotfix][docs] update Checkpoint docs with correct code example --- .../org/apache/flink/streaming/api/checkpoint/Checkpointed.java | 2 +- .../streaming/api/checkpoint/CheckpointedAsynchronously.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java index 6de7329c2cc7f..74c5189164fe8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java @@ -41,7 +41,7 @@ * } * * public void restoreState(List state) throws Exception { - * this.value = state.count.isEmpty() ? 0 : state.get(0); + * this.value = state.isEmpty() ? 0 : state.get(0); * } * * public T map(T value) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java index b96c2421782a4..96edb3a7a5cc5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java @@ -42,7 +42,7 @@ * } * * public void restoreState(List state) throws Exception { - * this.value = state.count.isEmpty() ? 0 : state.get(0); + * this.value = state.isEmpty() ? 
0 : state.get(0); * } * * public T map(T value) { From bfba2b1d453266f28523c47682d3cfe5ae67ce74 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Wed, 31 May 2017 13:51:12 +0200 Subject: [PATCH 06/10] [FLINK-6782][docs] update snapshot documentation to reflect flink 1.3 This closes #4024. --- docs/setup/savepoints.md | 43 +++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/docs/setup/savepoints.md b/docs/setup/savepoints.md index eada9b4309c19..28494236ce9a4 100644 --- a/docs/setup/savepoints.md +++ b/docs/setup/savepoints.md @@ -27,12 +27,15 @@ under the License. ## Overview -Savepoints are externally stored checkpoints that you can use to stop-and-resume or update your Flink programs. They use Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html) to create a snapshot of the state of your streaming program and write the checkpoint meta data out to an external file system. - -This page covers all steps involved in triggering, restoring, and disposing savepoints. In order to allow upgrades between programs and Flink versions, it is important to check out the section about [assigning IDs to your operators](#assigning-operator-ids). +Savepoints are externally stored self-contained checkpoints that you can use to stop-and-resume or update your Flink programs. They use Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html) to create a (non-incremental) snapshot of the state of your streaming program and write the checkpoint data and meta data out to an external file system. +This page covers all steps involved in triggering, restoring, and disposing savepoints. For more details on how Flink handles state and failures in general, check out the [State in Streaming Programs]({{ site.baseurl }}/dev/stream/state.html) page. +
+Attention: In order to allow upgrades between programs and Flink versions, it is important to check out the following section about assigning IDs to your operators. +
+ ## Assigning Operator IDs It is **highly recommended** that you adjust your programs as described in this section in order to be able to upgrade your programs in the future. The main required change is to manually specify operator IDs via the **`uid(String)`** method. These IDs are used to scope the state of each operator. @@ -73,17 +76,29 @@ With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the we ### Triggering Savepoints -When triggering a savepoint, a single savepoint file will be created that contains the checkpoint *meta data*. The actual checkpoint state will be kept around in the configured checkpoint directory. For example with a `FsStateBackend` or `RocksDBStateBackend`: +When triggering a savepoint, a new savepoint directory beneath the target directory is created. In there, the data as well as the meta data will be stored. For example with a `FsStateBackend` or `RocksDBStateBackend`: ```sh +# Savepoint target directory +/savepoints/ + +# Savepoint directory +/savepoints/savepoint-:shortjobid-:savepointid/ + # Savepoint file contains the checkpoint meta data -/savepoints/savepoint-123123 +/savepoints/savepoint-:shortjobid-:savepointid/_metadata -# Checkpoint directory contains the actual state -/checkpoints/:jobid/chk-:id/... +# Savepoint state +/savepoints/savepoint-:shortjobid-:savepointid/... ``` -The savepoint file is usually much smaller than the actual checkpointed state. Note that if you use the `MemoryStateBackend`, the savepoint file will be self-contained and contain all the state. +
+ Note: +Although it looks as if the savepoints may be moved, it is currently not possible due to absolute paths in the _metadata file. +Please follow FLINK-5778 for progress on lifting this restriction. +
+ +Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state will be stored in the `_metadata` file. Since it is self-contained, you may move the file and restore from any location. #### Trigger a Savepoint @@ -111,7 +126,7 @@ If you don't specify a target directory, you need to have [configured a default $ bin/flink run -s :savepointPath [:runArgs] ``` -This submits a job and specifies the savepoint path. The execution will resume from the respective savepoint state. The savepoint file holds the meta data of a checkpoint and points to the actual checkpoint files. This is why the savepoint file is usually much smaller than the actual checkpoint state. +This submits a job and specifies a savepoint to resume from. You may give a path to either the savepoint's directory or the `_metadata` file. #### Allowing Non-Restored State @@ -129,11 +144,11 @@ $ bin/flink savepoint -d :savepointPath This disposes the savepoint stored in `:savepointPath`. -Note that since savepoints always go to a file system it is possible to also manually delete the savepoint via a regular file system operation. Keep in mind though that the savepoint only stores meta data that points to the actual checkpoint data. Therefore, if you manually want to delete a savepoint, you would have to include the checkpoint files as well. Since there is currently no straight forward way to figure out how a savepoint maps to a checkpoint, it is recommended to use the savepoint tool for this as described above. +Note that it is possible to also manually delete a savepoint via regular file system operations without affecting other savepoints or checkpoints (recall that each savepoint is self-contained). Up to Flink 1.2, this was a more tedious task which was performed with the savepoint command above. ### Configuration -You can configure a default savepoint target directory via the `state.savepoints.dir` key. 
When triggering savepoints, this directory will be used to store the savepoint meta data. You can overwrite the default by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)). +You can configure a default savepoint target directory via the `state.savepoints.dir` key. When triggering savepoints, this directory will be used to store the savepoint. You can overwrite the default by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)). ```sh # Default savepoint target directory @@ -150,10 +165,6 @@ As a rule of thumb, yes. Strictly speaking, it is sufficient to only assign IDs In practice, it is recommended to assign it to all operators, because some of Flink's built-in operators like the Window operator are also stateful and it is not obvious which built-in operators are actually stateful and which are not. If you are absolutely certain that an operator is stateless, you can skip the `uid` method. -### Why is the savepoint file so small? - -The savepoint file only contains the meta data of the checkpoint and has pointers to the checkpoint state, which is usually much larger. In case of using the `MemoryStateBackend`, the checkpoint will include all state, but is constrained by the backend to small state. - ### What happens if I add a new operator that requires state to my job? When you add a new operator to your job it will be initialized without any state. Savepoints contain the state of each stateful operator. Stateless operators are simply not part of the savepoint. The new operator behaves similar to a stateless operator. @@ -184,4 +195,4 @@ If you did not assign IDs, the auto generated IDs of the stateful operators will If the savepoint was triggered with Flink >= 1.2.0 and using no deprecated state API like `Checkpointed`, you can simply restore the program from a savepoint and specify a new parallelism. 
-If you are resuming from a savepoint triggered with Flink < 1.2.0 or using now deprecated APIs you first have to migrate your job and savepoint to Flink 1.2.0 before being able to change the parallelism. See the [upgrading jobs and Flink versions guide]({{ site.baseurl }}/ops/upgrading.html). +If you are resuming from a savepoint triggered with Flink < 1.2.0 or using now deprecated APIs you first have to migrate your job and savepoint to Flink >= 1.2.0 before being able to change the parallelism. See the [upgrading jobs and Flink versions guide]({{ site.baseurl }}/ops/upgrading.html). From ff2bb718cd9e87dfe7ab5601e9ff9f7293eb3cfe Mon Sep 17 00:00:00 2001 From: zentol Date: Sat, 13 May 2017 18:09:30 +0200 Subject: [PATCH 07/10] [FLINK-6541] Improve tmp dir setup in TM/WebMonitor --- .../flink/runtime/webmonitor/WebRuntimeMonitor.java | 2 +- .../flink/runtime/taskexecutor/TaskManagerServices.java | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 5c66545d80db0..5edcbe37f1574 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -175,7 +175,7 @@ public WebRuntimeMonitor( // create storage for uploads this.uploadDir = getUploadDir(config); // the upload directory should either 1. exist and writable or 2. 
can be created and writable - if (!(uploadDir.exists() && uploadDir.canWrite()) && !(uploadDir.mkdir() && uploadDir.canWrite())) { + if (!(uploadDir.exists() && uploadDir.canWrite()) && !(uploadDir.mkdirs() && uploadDir.canWrite())) { throw new IOException( String.format("Jar upload directory %s cannot be created or is not writable.", uploadDir.getAbsolutePath())); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 86a2fdfb3d9a8..1c30ff6fbb659 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -603,11 +603,11 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration } /** - * Validates that all the directories denoted by the strings do actually exist, are proper + * Validates that all the directories denoted by the strings do actually exist or can be created, are proper * directories (not files), and are writable. * * @param tmpDirs The array of directory paths to check. - * @throws IOException Thrown if any of the directories does not exist or is not writable + * @throws IOException Thrown if any of the directories does not exist and cannot be created or is not writable * or is a file, rather than a directory. 
*/ private static void checkTempDirs(String[] tmpDirs) throws IOException { @@ -615,7 +615,9 @@ private static void checkTempDirs(String[] tmpDirs) throws IOException { if (dir != null && !dir.equals("")) { File file = new File(dir); if (!file.exists()) { - throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist."); + if (!file.mkdirs()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist and could not be created."); + } } if (!file.isDirectory()) { throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory."); From b1f762127234e323b947aa4a363935f87be1994f Mon Sep 17 00:00:00 2001 From: zhangminglei Date: Tue, 20 Jun 2017 19:43:44 +0800 Subject: [PATCH 08/10] [FLINK-6682] [checkpoints] Improve error message in case parallelism exceeds maxParallelism This closes #4125. --- .../runtime/checkpoint/StateAssignmentOperation.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index 1042d5aedeb20..5712ea1d43827 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -464,6 +464,14 @@ public static List createKeyGroupPartitions(int numberKeyGroups, private static void checkParallelismPreconditions(OperatorState operatorState, ExecutionJobVertex executionJobVertex) { //----------------------------------------max parallelism preconditions------------------------------------- + if (operatorState.getMaxParallelism() < executionJobVertex.getParallelism()) { + throw new IllegalStateException("The state for task " + executionJobVertex.getJobVertexId() + + " can not be restored. 
The maximum parallelism (" + operatorState.getMaxParallelism() + + ") of the restored state is lower than the configured parallelism (" + executionJobVertex.getParallelism() + + "). Please reduce the parallelism of the task to be lower or equal to the maximum parallelism." + ); + } + // check that the number of key groups have not changed or if we need to override it to satisfy the restored state if (operatorState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) { From b20ae53a9957683f3b7fe44c0a799422021fbb69 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 30 May 2017 14:52:23 +0200 Subject: [PATCH 09/10] [FLINK-6774][build] set missing build-helper-maven-plugin version This closes #4017. --- flink-connectors/flink-connector-kafka-base/pom.xml | 1 + pom.xml | 1 + 2 files changed, 2 insertions(+) diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml index 263eb9aaed7e1..5f785453eaf4a 100644 --- a/flink-connectors/flink-connector-kafka-base/pom.xml +++ b/flink-connectors/flink-connector-kafka-base/pom.xml @@ -217,6 +217,7 @@ under the License. org.codehaus.mojo build-helper-maven-plugin + 1.7 generate-test-sources diff --git a/pom.xml b/pom.xml index 9d78d4a7a431f..d5e2ef56977b5 100644 --- a/pom.xml +++ b/pom.xml @@ -650,6 +650,7 @@ under the License. 
org.codehaus.mojo build-helper-maven-plugin + 1.7 generate-sources From 0fd3683c3489d98c5c82d21b9a7a5ee93c0d6b2e Mon Sep 17 00:00:00 2001 From: zentol Date: Mon, 8 May 2017 11:56:57 +0200 Subject: [PATCH 10/10] [FLINK-5892] Enable 1.2 keyed state test --- .../operator/restore/keyed/KeyedJob.java | 14 ++++++-------- .../complexKeyed-flink1.2/_metadata | Bin 137490 -> 134953 bytes 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java index 523e937c14ef1..95d0efc610288 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java @@ -98,10 +98,9 @@ public static SingleOutputStreamOperator createFirstStatefulMap(Executi .map(new StatefulStringStoringMap(mode, "first")) .setParallelism(4); - // TODO: re-enable this when generating the actual 1.2 savepoint - //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { - map.uid("first"); - //} + if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { + map.uid("first"); + } return map; } @@ -111,10 +110,9 @@ public static SingleOutputStreamOperator createSecondStatefulMap(Execut .map(new StatefulStringStoringMap(mode, "second")) .setParallelism(4); - // TODO: re-enable this when generating the actual 1.2 savepoint - //if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { - map.uid("second"); - //} + if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) { + map.uid("second"); + } return map; } diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed-flink1.2/_metadata index 
9e03876cfce81f15b09d91532b38322e647c55a2..0a1ed10e4823dada53249966850b13e9180ab2f5 100644 GIT binary patch delta 5680 zcmds5TZ|Sp6rO*7U|0TG6a@tr@poB4(c;`^il8V80*Z>FfQUOY?VzHl>lJUmFBmn( zkllw3FJ59oLImYvH1Y&7QD00X~@Df@MIEre&i(syX!k@o++REaZ(gs#)L02 zDDS6GJ_uM=WrAliEP0X-_D*SgYn%t2IQ4<8W4c;bs7J?XEJIv{C=vS*TZk_szK8fB z;%A88BL0NSw8&tu~30 z$&{Q;$vKW1zpXTdnx;~6E+t)*OrzvHN~TkC{t(gM6IfH_K+@v}toLi`wD%YskG?Pg zm-0khij(g%_9=%9PTOLz*==yfy9Q@1GC2EdgLCdR=>KIfe#KymH_gMmJq8zCV{qY@ z1{dFLaOs~0m!l7@&;xXwmAg#->Z=W|{lZ{xWw89i;QE&gZqQTaI2+#vpUL>lq>95N zU}+SlEDB@6GLaN46<(V7k<1I3I8NV!_YX8{{q>Xg4lB*69afb6tm3i=b0&%^WKkk> zmZxz9QxWC9h^wedBQ46l5oPJgQt(PJUL+-ma?V+y&rny1 za@=THIk-SIXNmSMOJ-1u_E&4UkXmL^GK&&Da$DzOYMDJkkT0R8ODSQL%%S8mN<2#R zTClwa)Dr4}s=>l{Qg^iz7#zK0M5>H!B&G!Xbl8$ol2O7bnM;Y@y=x$ddNZ0wP4g+y z(S?15D~BvfS(|vNOiV{AtgU0EtVb!V8=YgKtWg#T8G*1S6~rM?1(K2p3F^wzAWS?F z$Pm87sUfUwhZWX1EaQ|zxC(;k!S(`<&m#zai$s=5A=0c8LJMnPgf*64>_vK|6ug`)tMAsio) z?x=*5L@8sIdmd+5mIXl;3g1sbaXMA#Xu{pr0@b~B+^ZER-gtni1hAc{9^*Pg;^t4!_<9bW(AF2co8!35!k_RbyNWYp9BQ-)5@j-hU+tst` zzx`i-2Kpd(Xi^@g)TM7~1{7%<9CR`*PX&ihsVX*Fh^dAPR&&NTY^>M-(`G+CpUJ$^k dx=I28N9$Pql=CDjxQvq^_N33j0Zx8-{s(<#?P~x4 literal 137490 zcmeHQZ;V{kb-z37Ke2b$Yr8ez)@?V5F-_^4oj-5h%!_d`jsY8{HjSyF7zf^ab7yy8 zc4nD*V`7^U^PvKNgNP4Ep~97-7^ov56+WOQ68V%?MvQeXn*Gf^^WNL{?z{J%Klj{o_CGjh2_a(YLnW(0jL#+hz25lr!TJ!NGR6UWl*gY648&*0n2R)yWl5nSxi_-StwX` zrkJ&S$8)W;7vy}q;G{F@g1Yd?G)1!26tU$tMbXYCy^MNx-}4fdoi1c8&rSN4bOY)7 zUb2|;ilfvNsa8{rFSjXt+qHc^o3)Y&)irE4SFmz{U9b|4?aNfUkZ`3pN==d0O))m} z)tCOp`q4+VZ;b!dt_%P0pNe|9xg(O)saoowQEt{-KrGoEIAf$=rQ^NhdH zI7$CpmC4^>V~euIA|21)eo` z8fU(T)Lhvy?=>gVyBmw;0TW^=Sytx4`6FUu zu_Vid_T*a+MfONu{mWTB<qD zy~^BdMA|tthM(_kxT#LadE|)X5av!JiRfER&SHgCK zwVRh1-kxP#rn~tLl`3;(8CEODUwZQ8AIBfPmG15l@wixZtR{9CFRObKHF-ShK}~+3 zUiDaB#EOYfZ0>jo2mOa+5Yk(?eo2?>e6mGB^FmClIp&oclE%L3?BXJKcBk)p&7Q4A zgu~TR;efXw>q?3k*w2UDbi)!p)RQ*&F;ka$%v)+T?|8mc@8})Vr%%4~?)ht9iQdR2 zS(Xd3v-5<)xw5e~l%ceuyFE>)<6?DCRl+8Bw=?`6_VvAodRUX*f*L_;GAP9#EmaCt 
z^=FR1)#JC9tG-vhC!%j3Gt!ytVSGo@LDl%-*c53!X!kJ4jLEm?+WDpE>@hz*t@=$9m2 zc=`&Z3H2JdkQ|}KXF^?TBE2mtP3%6lsi|~RSXebX!APBssL{>SP12Vy^xQmaOg+?1#>;JPUyd4!F1M!V z#NRTW_}fP679;fUX9z{(>U{(#_ik~H@vFrH{PW&&N^)B zDP4EAYjoexxa)q6yFbym_gRhm5*qJ(Q{%yTjfZ$l9X_M$_uZuN=-V2-P^09YHGfXm z%lkA|KUBK3Ur0HAChJ&1CL36GF_pHwL@r~=oSO^WAeqS~T_NhH)G1?PiX{s&GuGSV`0jN>0 z#bg!>!0gbTh2z6rj$ zGTkdMHZe9aHZeBIz1rv`((NqPYleVK3z-%&ZRZ|^mE#eHv5B#Xv5B#XH56Vs?G;|e zV5fzh7Is>Rtu|eoLVsho+Q1eBTM%qPum!;uge|J%cNO^=!Jn4QqP=cLU8)55Cio`! zCio`!Cio`!W-D3~_$G@Dt#DJ#;1meEvb;E4u+zd$3p*|BwB5n`V5jYkj|ks`u5giU z0pxNVcn`vr^@qbmxUwqX7BVelTBMaQEX8#16kg$JB_Pv6riDxknHDlF zWLn6ykZB>)LZ*dGtFl-9YO_=!CJADaASMZ7k{~7tVv?+w6r->O!4?Et5NtuP1;G{s zTM%qPum!;u1Y6Lr!xp4A`bLGlU?ZShrZfCeFu|UV6Su1h)sHe#7I0N2PBXs1_!{GR z#`hUN);P&YZYRIYb(IX1%H%;t$#|0S8OF1WKV%vzAQDoMo%3l}q>uE1k=w-Bc=-bF%98r#@4sC`Ly4 zXJZ#wyFn6DBrz=ylyIZHv?JxfA!uHRi8aT(azoPCr);4YxwAXf-mS6C&8<}J0FcRXLJ zhwVq4K7I0?ch6t@N=%I9#U@#n3$oJjb10m=y6CLlN4t!`pejkD-FG*}_Ui6s>ZMVM z?176bI@TCXsN-UFP;FEw?KE|_Gb3O*qWk=S@6%n{8;W{#zFL`UcN=vPDr~l0nIj`6 zra60eP_2ZjTQ_Hg5qVAHC$8RM4e7%meLSKp;54hml+62Cqb9OPXzo>^Uc8`73GZV$ z-BW1aKKq+L3byU5e=^hjM-kF{q4Gry(R8ncY9f5eNM#M8=h*GWW3M+-cNnQR7^#0~ zq<+On{UamwMkDo)ja17>jT@=6Mry)HZS7l&er>jT>!o*5(@~8Wnpmek^krR7W83tj zD$zmF;G$ZF{;1MTbu_Vunl(~cdFwfrRmC3aSB+QLX{6p{r0z0O|HMeWxu4pjG4lap z{CWLtEkDfqw{(32#d%d_Gi$6{Uexuido^x*PvgvE8n<7k@rGA5-guYB_(vL3Pib_x z=euv{`mXym?*2sM-e)!L`(YJNo3pL7(8t2bxEbr4; z{ZMJliEP$$Q&L(5nei+;Y3D3I$YiWkQfB;2E}bZ53qsUSsZ++p6w41`X+hTN%#Fdi z46JJy(mpl$kU8JjispQ)mdx~8vtp#C4Pts%#_C2uXNksdG+EQl()5I}$tDUV>8uYM zi)9&^=tF_Ac{LfeFgD?d0#6ipq71atcWeF;pH$3IsZCL`2JlUISHruy(|m@AsD0K% zZD2!0)L~d!p>G-klXvkY$h44YA=5&pg-i?Y>ZYOsz6rkBOsaP|Nwia#KtxnTL|v-m z0M<}w*+dnrq0Dxh&9H`|Rt|^k!$-b1Sl?{%vhEJo$--zgGb$LHkyePYSvt|p)G;=- z8&0V*7uVnF@!QK)-z(o!s{A^3CBZL9Td=@4!8dtCVr+Wxu*L10yKZ{&x+1Gwh^%rw zj_UB)gFh{N_TaPkC}i4t9DFn0jy1B}U0zs2!5RwIP=>JUTv_9=4+UA}daQ7;(^8xZ z_|w9l7XGvx_TuJCY*>da2>!IREiWF~0O5N8--AVo_07*GvKD*~s+Eq_2>2f8Ksw70 zVurod@I8RLBqvxv9{hj>_#Sk15>Tm+szlr^u 
zqq2XK(jZ5Lg_{T{m+5ZxnNc5ACfL();&!edWu#o+s!W__e1Y*b#`BEtGk&aba*C02 z_fB$3*U5uSmy9PFpJ6=9_(R5@G5#ClFEvhWVdR{vQwO-Nl8#cDI>Gof({2VDs)t@EiP1Bv$vAh_R2%UaO(uJq5EWE1v2fOLNs6rE_7M}@kp+Bm0Q}<|M5A|zC>Ru!D zRwMN`Bb7Csp8vMbc7^$sKTP9rsMr2eUq%DPO?ow4TBL%qv*g+DV=ziy-+GE(n0 zQvcjYJseS^cS(Qrr~B33V?6O*BlRzg)ccIoBSz}|Mk=ecJq`Gc=vaNX>Z|_Jee)0W zU#>@e<^#m|^ZMIbR!i5vrRy6w>e=Q=UElJe#;x3qx4oz9GmmN9ex1e}Ue$ObzvlQy zx}JJUqqAM3`-a9{{N%eo(e=I0YTTF5c;}lM56){mv`XXQGaB!^N#oJCHF}{&xl!Z% zIgMo=Ce;s>`YclgHhLHaV`LDWx z;?Ihs3|SS#(l?gA+qxB&zU8HYAV=CLmcG59wSfXD&5_caSrbWX*am@Z5ZDHRZ4eZ# zxg!W5_-1cBTJTNqP4LYDeE=?Fa2bQkSh*et-vr+z+c84^m-@r^K;LYq>*|eRfe?&p z(9p;sH-@OAU=0OpD6NSUYbXf8h_K>Abo0Tsdu+SMw)-Kt9h0y>WEVV)O^i*9O^i*1 z#OSvn3}dGVshL!uZz9DUQp~aY6xKHftZgARYp+Rv2|M$4x=(C7`}#GXK1VLFkvVTg zZ!1}~*B3kj`X=;E zUfzda5d4A$xQoF~ix7+m!8j@*7!mh?r?H(#m4mnk2rJ$SDUCH0tf9ab1X~bnL9hkE z7Sz3DhSaQ`?Eza5Y(cOEY11%Lvmz!5$&4cqC>(ZL*lA&>g`F004+aP?*dvBGdx*1# zID3e*7qJM)v<5y=6R-ur7POk{Yu+zd$3p*|BvkAA+I7SA>J$~ghYW483+#o0WZZ^i+(U{=hehJg ze~~zv7h+<~F|XW^H1>(hU*yj2^x5KTjY?Qrknwuxg))A)S}Gjy7Gzy%6tbO^Q(b`y z3)w@Ci8HLertYu7kD0p6W8PA$dB^jmdPmrw>C-3QdH4Lauf)VyUTl(OxgaYYKZnA} ztBcMq+mTV_%eDLNW(fhubc^!>!HSMGMic6|SRGVVPohquK4(V2!;kLs1HMmpX>TY> zhLf*W=0K)dQJ`Buc zQF`pDB?oRysRp8=m~t#HV#P!#Hs@(oeYWgX=4KE256K`@`?h1nBwct~F{0~@(_HUL zyuNIEY5Kt?AwqgDR6GsQbgzYKBKSsXV5Am|RMskbu3I!7J7=WM8>uBD^&unmVI#F{ zq%Ih#6(hB3q>`163NfU1OKL4BYHd=S=*LKbTFaHSJ;%2e%+&AgtFZ3Zb1boGD&7CF zp@}`zV@B$uM(Q_>)Z<3#2_uzt$)0<7%y{fy8mW&*)ac1ptFQEBbTMAvHdz-zu{Gl zH{PW&{*lJiQyQJ^8r?TE?z&&&?oTxCeOBYXgvLAH)Oe5wz@b&Te)x>W`)<;B^lgnE zcLTXm*XPe^Ebr4;{ZJ{^g>1r0B%HuX7aiZSWzx2Cb~0xbGnup}bA@!yv4yCgQl}tT z2HRQ@EJN{{AuEZ*!VuKB6?OuP;@KltrjH|#4aOz{*|Z#RTQf@24tO_(HEG*7_IUM{ zL>?2X_KVeCtrQ;LFV>W0WiFgQA~qIFvRr7NQ4a;iX1P?+mfxU?4HDYPc?3*?k=z?8 z0(7$^)4lJKm~(3?#>T?V_J~B4AupC- zY+`I;Y!1PBgXBkyP2*r z=-{&leG~d-OV_yEN-2Ey4EDX9?i1V2zJAT8&yg!n#D`b(W-$1=w|Y5z-QiCQe_Apy zb*U0q-^BW66u#;*5i;R>@cGAJ!Pvyu#MnIk(vvU$IR5CZBynT^W>@Mz$h44YA=5&p z9kA=NQ_x2;03-vb3Ha<8Yw?*V)d 
z;CnDI0x5hC;Cley1Na`m_h3MX=1WhI@TY}8?TLPu6!;#@hRrQIv)v;I`A`r$8nL4( z%9KvE)*L4B0UQ|nHx0gC*uQxN+52D%f-MNPAlQOn3qr0U#66hr4ETA;thb}bH^2WM zPs@kDIC0Io@2AG!`bGUKN*$&2pQqG^9O9!?pehJ?R8*bH7!k+Bx(5of=rzjW0}+w_ zsPY|^`!nr3w&UiKmh@9e%eDi@a)V;V%BXtA4YF<_O-YC;-RW5Og}D8!zI?+?iMJkl zp^V?5vK6-zhuwDQ-k*HeUjJ?F=wSDqYYfxct!dJotD+N5(HFXrP^>AIRJhwj^jq}% za$6!1xQfY2{)5-96v{if=8uKq-kIX@qhL6+;{*0