From 62706c40a54abab63db3cd7d4ef82f1cb92b6ee8 Mon Sep 17 00:00:00 2001
From: "Tzu-Li (Gordon) Tai"
Date: Wed, 28 Jun 2017 21:29:36 +0800
Subject: [PATCH 1/2] [FLINK-6674] [docs] Bootstrap API Migration docs update for 1.2 -> 1.3

---
 docs/dev/migration.md | 56 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index 11eb42c3bb4d7..7041c3076a017 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -25,6 +25,62 @@ under the License.
* This will be replaced by the TOC
{:toc}

+## Migrating from Flink 1.2 to Flink 1.3
+
+A few APIs have changed since Flink 1.2. Most of the changes are documented in their
+respective sections of the documentation. The following is a consolidated list of API changes and links to
+migration details for upgrading to Flink 1.3.
+
+### `TypeSerializer` interface changes
+
+This is relevant mostly for users implementing custom `TypeSerializer`s for their state.
+
+Since Flink 1.3, two additional methods have been added that are related to serializer compatibility
+across savepoint restores. Please see
+[Handling serializer upgrades and compatibility](../stream/state.html#handling-serializer-upgrades-and-compatibility)
+for further details on how to implement these methods.
+
+### `ProcessFunction` is always a `RichFunction`
+
+In Flink 1.2, `ProcessFunction` and its rich variant `RichProcessFunction` were introduced.
+Since Flink 1.3, `RichProcessFunction` has been removed and `ProcessFunction` is now always a `RichFunction` with access to
+the lifecycle methods and runtime context.
+
+### Flink CEP library API changes

+The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API.
+Please visit the [CEP Migration docs](../libs/cep.html#migrating-from-an-older-flink-version) for details.
+
+### Table API changes
+
+*TBA*
+
+### Queryable State client construction changes
+
+*TBA*
+
+### Logger dependencies removed from Flink core artifacts
+
+In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts are
+now free of specific logger dependencies.
+
+The example and quickstart archetypes already have loggers specified and should not be affected.
+For other custom projects, make sure to add logger dependencies. For example, in Maven's `pom.xml`, you can add:
+
+~~~xml
+<dependency>
+  <groupId>org.slf4j</groupId>
+  <artifactId>slf4j-log4j12</artifactId>
+  <version>1.7.7</version>
+</dependency>
+
+<dependency>
+  <groupId>log4j</groupId>
+  <artifactId>log4j</artifactId>
+  <version>1.2.17</version>
+</dependency>
+~~~
+
## Migrating from Flink 1.1 to Flink 1.2

As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state.html), Flink has two types of state:

From bf7dec3ed05613ab3caf9c143e2aa8d4e405e1ac Mon Sep 17 00:00:00 2001
From: "Tzu-Li (Gordon) Tai"
Date: Wed, 28 Jun 2017 22:44:01 +0800
Subject: [PATCH 2/2] [FLINK-6680] [docs] Update "Upgrading Applications and Flink Versions" for 1.3

---
 docs/ops/upgrading.md | 117 ++++++++++++++++++++++++++----------------
 1 file changed, 72 insertions(+), 45 deletions(-)

diff --git a/docs/ops/upgrading.md b/docs/ops/upgrading.md
index 7259a6bcdf2f7..6cfef818d0dfe 100644
--- a/docs/ops/upgrading.md
+++ b/docs/ops/upgrading.md
@@ -73,7 +73,7 @@ val mappedEvents: DataStream[(Int, Long)] = events

**Note:** Since the operator IDs stored in a savepoint and IDs of operators in the application to start must be equal,
it is highly recommended to assign unique IDs to all operators of an application that might be upgraded in the future.
This advice applies to all operators, i.e., operators with and without explicitly declared operator state, because some
operators have internal state that is not visible to the user. Upgrading an application without assigned operator IDs
is significantly more difficult and may only be possible via a low-level workaround using the `setUidHash()` method.

-**Important:** As of 1.3.0 this also applies to operators that are part of a chain.
+**Important:** As of 1.3.x this also applies to operators that are part of a chain.
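+
+For example, here is a minimal sketch of assigning an explicit ID to every operator of a job, including
+operators that end up chained together (the source, functions, and ID strings are hypothetical):
+
+```scala
+// assumes the usual flink-streaming-scala imports; `env` is the StreamExecutionEnvironment
+val events: DataStream[(Int, Long)] = env.addSource(new MyEventSource()).uid("source-1")
+
+val mapped: DataStream[(Int, Long)] = events
+  .map(new MyStatefulMapFunction()).uid("mapper-1") // head of a potential chain
+  .filter(_._2 > 0).uid("filter-1")                 // a chained operator gets its own ID, too
+```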

By default all state stored in a savepoint must be matched to the operators of a starting application. However, users
can explicitly agree to skip (and thereby discard) state that cannot be matched to an operator when starting an
application from a savepoint. Stateful operators for which no state is found in the savepoint are initialized with their
default state.

@@ -107,34 +107,33 @@ When upgrading an application by changing its topology, a few things need to be

* **Adding a stateful operator:** The state of the operator will be initialized with the default state unless it takes over the state of another operator.
* **Removing a stateful operator:** The state of the removed operator is lost unless another operator takes it over. When starting the upgraded application, you have to explicitly agree to discard the state.
* **Changing of input and output types of operators:** When adding a new operator before or behind an operator with internal state, you have to ensure that the input or output type of the stateful operator is not modified to preserve the data type of the internal operator state (see above for details).
-* **Changing operator chaining:** Operators can be chained together for improved performance. When restoring from a savepoint taken since 1.3.0 it is possible to modify chains while preversing state consistency. It is possible a break the chain such that a stateful operator is moved out of the chain. It is also possible to append or inject a new or existing stateful operator into a chain, or to modify the operator order within a chain. However, when upgrading a savepoint to 1.3.0 it is paramount that the topology did not change in regards to chaining. All operators that are part of a chain should be assigned an ID as described in the [Matching Operator State](#Matching Operator State) section above.
+* **Changing operator chaining:** Operators can be chained together for improved performance. When restoring from a savepoint taken since 1.3.x, it is possible to modify chains while preserving state consistency. It is possible to break a chain such that a stateful operator is moved out of the chain. It is also possible to append or inject a new or existing stateful operator into a chain, or to modify the operator order within a chain. However, when upgrading a savepoint to 1.3.x it is paramount that the topology did not change with regard to chaining. All operators that are part of a chain should be assigned an ID as described in the [Matching Operator State](#matching-operator-state) section above.

## Upgrading the Flink Framework Version

-This section describes the general way of upgrading Flink framework version from version 1.1.x to 1.2.x and migrating your
-jobs between the two versions.
+This section describes the general way of upgrading Flink across versions and migrating your jobs between versions.

In a nutshell, this procedure consists of 2 fundamental steps:

-1. Take a savepoint in Flink 1.1.x for the jobs you want to migrate.
-2. Resume your jobs under Flink 1.2.x from the previously taken savepoints.
+1. Take a savepoint in the old Flink version for the jobs you want to migrate.
+2. Resume your jobs under the new Flink version from the previously taken savepoints.
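+
+Expressed as CLI commands, this is a minimal sketch of the two steps (the placeholder arguments are the same
+ones used in the savepoint documentation; both steps are detailed further below):
+
+```sh
+# Step 1, on the old Flink version: trigger a savepoint for the running job
+$ bin/flink savepoint :jobId [:targetDirectory]
+
+# Step 2, on the new Flink version: resume the job from that savepoint
+$ bin/flink run -s :savepointPath [:runArgs]
+```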

Besides those two fundamental steps, some additional steps can be required that depend on the way you want to change the
-Flink version. In this guide we differentiate two approaches to upgrade from Flink 1.1.x to 1.2.x: **in-place** upgrade and
+Flink version. In this guide we differentiate two approaches to upgrading across Flink versions: **in-place** upgrade and
**shadow copy** upgrade.

For **in-place** update, after taking savepoints, you need to:

 1. Stop/cancel all running jobs.
- 2. Shutdown the cluster that runs Flink 1.1.x.
- 3. Upgrade Flink to 1.2.x. on the cluster.
+ 2. Shut down the cluster that runs the old Flink version.
+ 3. Upgrade Flink to the newer version on the cluster.
 4. Restart the cluster under the new version.

For **shadow copy**, you need to:

- 1. Before resuming from the savepoint, setup a new installation of Flink 1.2.x besides your old Flink 1.1.x installation.
- 2. Resume from the savepoints with the new Flink 1.2.x installation.
- 3. If everything runs ok, stop and shutdown the old Flink 1.1.x cluster.
+ 1. Before resuming from the savepoint, set up a new installation of the new Flink version beside your old Flink installation.
+ 2. Resume from the savepoints with the new Flink installation.
+ 3. If everything runs OK, stop and shut down the old Flink cluster.

In the following, we will first present the preconditions for successful job migration and then go into more detail
about the steps that we outlined before.

@@ -142,33 +141,35 @@ about the steps that we outlined before.

### Preconditions

Before starting the migration, please check that the jobs you are trying to migrate are following the
-best practises for [savepoints]({{ site.baseurl }}/setup/savepoints.html). In particular, we advise you to check that
-explicit `uid`s were set for operators in your job.
+best practices for [savepoints]({{ site.baseurl }}/setup/savepoints.html). Also, check out the
+[API Migration Guides]({{ site.baseurl }}/dev/migration.html) to see if there are any API changes relevant to migrating
+savepoints to newer versions.
+
+In particular, we advise you to check that explicit `uid`s were set for operators in your job.

This is a *soft* precondition, and restore *should* still work in case you forgot about assigning `uid`s.
-If you run into a case where this is not working, you can *manually* add the generated legacy vertex ids from Flink 1.1
-to your job using the `setUidHash(String hash)` call. For each operator (in operator chains: only the head operator) you
-must assign the 32 character hex string representing the hash that you can see in the web ui or logs for the operator.
+If you run into a case where this is not working, you can *manually* add the legacy vertex ids generated by previous
+Flink versions to your job using the `setUidHash(String hash)` call. For each operator (in operator chains: only the
+head operator) you must assign the 32-character hex string representing the hash that you can see in the web UI or logs
+for the operator.
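+
+For example, a minimal sketch (the function name is hypothetical, and the hash below is a made-up placeholder
+for the value you would copy from the web UI or logs of the old job):
+
+```scala
+val mapped = events
+  .map(new MyStatefulMapFunction())
+  .setUidHash("2e3f9fd2d0cfbbe8a0a4ad0d7f49a148") // hypothetical 32-character legacy operator hash
+```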
-Besides operator uids, there are currently three *hard* preconditions for job migration that will make migration fail:
+Besides operator uids, there are currently two *hard* preconditions for job migration that will make migration fail:

-1. as mentioned in earlier release notes, we do not support migration for state in RocksDB that was checkpointed using
+1. We do not support migration for state in RocksDB that was checkpointed using
`semi-asynchronous` mode. In case your old job was using this mode, you can still change your job to use
`fully-asynchronous` mode before taking the savepoint that is used as the basis for the migration.

-2. The CEP operator is currently not supported for migration. If your job uses this operator you can (curently) not
-migrate it. We are planning to provide migration support for the CEP operator in a later bugfix release.
-
-3. Another **important** precondition is that all the savepoint data must be accessible from the new installation and
-reside under the same absolute path. Please notice that the savepoint data is typically not self contained in just the created
-savepoint file. Additional files can be referenced from inside the savepoint file (e.g. the output from state backend
-snapshots)! There is currently no simple way to identify and move all data that belongs to a savepoint.
+2. Another **important** precondition is that for savepoints taken before Flink 1.3.x, all the savepoint data must be
+accessible from the new installation and reside under the same absolute path. Before Flink 1.3.x, the savepoint data was
+typically not self-contained in just the created savepoint file; additional files could be referenced from inside the
+savepoint file (e.g. the output from state backend snapshots). Since Flink 1.3.x, this is no longer a limitation;
+savepoints can be relocated using typical filesystem operations.

-### STEP 1: Taking a savepoint in Flink 1.1.x.
+### STEP 1: Take a savepoint in the old Flink version.

-First major step in job migration is taking a savepoint of your job running in Flink 1.1.x. You can do this with the
-command:
+The first major step in job migration is taking a savepoint of your job running in the older Flink version.
+You can do this with the command:

```sh
$ bin/flink savepoint :jobId [:targetDirectory]
```

For more details, please read the [savepoint documentation]({{ site.baseurl }}/setup/savepoints.html).

-### STEP 2: Updating your cluster to Flink 1.2.x.
+### STEP 2: Update your cluster to the new Flink version.

In this step, we update the framework version of the cluster. What this basically means is replacing the content of
the Flink installation with the new version. This step can depend on how you are running Flink in your cluster (e.g.
standalone, on Mesos, ...).

If you are unfamiliar with installing Flink in your cluster, please read the [deployment and cluster setup documentation]({{ site.baseurl }}/setup/index.html).

-### STEP 3: Resuming the job under Flink 1.2.x from Flink 1.1.x savepoint.
+### STEP 3: Resume the job under the new Flink version from the savepoint.

As the last step of job migration, you resume from the savepoint taken above on the updated cluster.
You can do this with the command:

```sh
$ bin/flink run -s :savepointPath [:runArgs]
```
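+
+If you intentionally removed stateful operators while upgrading the application, you have to explicitly agree to
+discard the state that can no longer be matched (see above). A sketch, assuming the `-n` (`--allowNonRestoredState`)
+flag described in the savepoint documentation:
+
+```sh
+$ bin/flink run -s :savepointPath -n [:runArgs]
+```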

Again, for more details, please take a look at the [savepoint documentation]({{ site.baseurl }}/setup/savepoints.html).

## Compatibility Table

Savepoints are compatible across Flink versions as indicated by the table below:
-
-| Created with \ Resumed with | 1.1.x | 1.2.x |
-| ---------------------------:|:-----:|:-----:|
-| 1.1.x                       |   X   |   X   |
-| 1.2.x                       |       |   X   |
-
-
-
-## Limitations and Special Considerations for Upgrades from Flink 1.1.x to Flink 1.2.x
-
- - The maximum parallelism of a job that was migrated from Flink 1.1.x to 1.2.x is currently fixed as the parallelism of
-   the job. This means that the parallelism can not be increased after migration. This limitation might be removed in a
-   future bugfix release.
-
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">Created with \ Resumed with</th>
+      <th class="text-center">1.1.x</th>
+      <th class="text-center">1.2.x</th>
+      <th class="text-center">1.3.x</th>
+      <th class="text-left">Limitations</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td class="text-left"><strong>1.1.x</strong></td>
+      <td class="text-center">O</td>
+      <td class="text-center">O</td>
+      <td class="text-center">O</td>
+      <td class="text-left">The maximum parallelism of a job that was migrated from Flink 1.1.x to 1.2.x is
+        currently fixed as the parallelism of the job. This means that the parallelism can not be increased after
+        migration. This limitation might be removed in a future bugfix release.</td>
+    </tr>
+    <tr>
+      <td class="text-left"><strong>1.2.x</strong></td>
+      <td class="text-center"></td>
+      <td class="text-center">O</td>
+      <td class="text-center">O</td>
+      <td class="text-left">When migrating from Flink 1.2.x to Flink 1.3.x, changing parallelism at the same
+        time is not supported. Users have to first take a savepoint after migrating to Flink 1.3.x, and then change
+        parallelism.</td>
+    </tr>
+    <tr>
+      <td class="text-left"><strong>1.3.x</strong></td>
+      <td class="text-center"></td>
+      <td class="text-center"></td>
+      <td class="text-center">O</td>
+      <td class="text-left"></td>
+    </tr>
+  </tbody>
+</table>