From 4b12db7a435ee074c291ae455dd754e9119cb013 Mon Sep 17 00:00:00 2001 From: twalthr Date: Mon, 31 Jul 2017 20:14:31 +0200 Subject: [PATCH 1/2] [FLINK-7301] [docs] Rework state documentation --- docs/check_links.sh | 5 +- docs/concepts/programming-model.md | 4 +- docs/concepts/runtime.md | 10 +- docs/dev/api_concepts.md | 4 +- docs/dev/batch/connectors.md | 2 +- docs/dev/batch/python.md | 2 +- docs/dev/cluster_execution.md | 2 +- docs/dev/connectors/index.md | 2 +- docs/dev/connectors/kafka.md | 2 +- docs/dev/datastream_api.md | 4 +- docs/dev/execution_configuration.md | 2 +- docs/dev/libs/storm_compatibility.md | 2 +- docs/dev/linking_with_flink.md | 2 +- docs/dev/local_execution.md | 2 +- docs/dev/migration.md | 8 +- docs/dev/packaging.md | 2 +- docs/dev/parallel.md | 4 +- docs/dev/stream/process_function.md | 2 +- docs/dev/stream/{ => state}/checkpointing.md | 14 +- docs/dev/stream/state/custom_serialization.md | 188 +++++++++++++++++ docs/dev/stream/state/index.md | 56 +++++ .../dev/stream/{ => state}/queryable_state.md | 11 +- docs/dev/stream/{ => state}/state.md | 192 +----------------- docs/dev/stream/state/state_backends.md | 46 +++++ docs/dev/table/index.md | 2 +- docs/internals/components.md | 4 +- docs/internals/stream_checkpointing.md | 8 +- docs/internals/task_lifecycle.md | 4 +- docs/monitoring/debugging_classloading.md | 2 +- docs/monitoring/historyserver.md | 2 +- docs/{setup => ops}/cli.md | 12 +- docs/{setup => ops}/config.md | 8 +- docs/{setup => ops/deployment}/aws.md | 8 +- .../deployment}/cluster_setup.md | 2 +- docs/{setup => ops/deployment}/docker.md | 0 docs/{setup => ops/deployment}/gce_setup.md | 2 +- .../deployment.md => ops/deployment/index.md} | 4 +- docs/{setup => ops/deployment}/kubernetes.md | 0 docs/{setup => ops/deployment}/mapr_setup.md | 2 +- docs/{setup => ops/deployment}/mesos.md | 2 +- docs/{setup => ops/deployment}/yarn_setup.md | 6 +- docs/{setup => ops}/index.md | 2 +- .../jobmanager_high_availability.md | 6 +- docs/ops/production_ready.md | 6 +- docs/ops/security-kerberos.md | 8 +- docs/{setup => ops}/security-ssl.md | 4 +- docs/{setup => ops/state}/checkpoints.md | 6 +- docs/ops/{README.md => state/index.md} | 11 +- .../state}/large_state_tuning.md | 14 +- docs/{setup => ops/state}/savepoints.md | 6 +- docs/ops/{ => state}/state_backends.md | 8 +- docs/ops/upgrading.md | 14 +- docs/quickstart/setup_quickstart.md | 2 +- docs/redirects/cli.md | 2 +- docs/redirects/fault_tolerance.md | 2 +- docs/redirects/savepoints.md | 2 +- docs/redirects/state.md | 2 +- docs/redirects/state_backends.md | 2 +- docs/{setup => start}/building.md | 0 docs/{setup => start}/flink_on_windows.md | 0 60 files changed, 429 insertions(+), 304 deletions(-) rename docs/dev/stream/{ => state}/checkpointing.md (93%) create mode 100644 docs/dev/stream/state/custom_serialization.md create mode 100644 docs/dev/stream/state/index.md rename docs/dev/stream/{ => state}/queryable_state.md (98%) rename docs/dev/stream/{ => state}/state.md (66%) create mode 100644 docs/dev/stream/state/state_backends.md rename docs/{setup => ops}/cli.md (97%) rename docs/{setup => ops}/config.md (99%) rename docs/{setup => ops/deployment}/aws.md (96%) rename docs/{setup => ops/deployment}/cluster_setup.md (98%) rename docs/{setup => ops/deployment}/docker.md (100%) rename docs/{setup => ops/deployment}/gce_setup.md (95%) rename docs/{setup/deployment.md => ops/deployment/index.md} (96%) rename docs/{setup => ops/deployment}/kubernetes.md (100%) rename docs/{setup => ops/deployment}/mapr_setup.md (99%) 
rename docs/{setup => ops/deployment}/mesos.md (99%) rename docs/{setup => ops/deployment}/yarn_setup.md (98%) rename docs/{setup => ops}/index.md (98%) rename docs/{setup => ops}/jobmanager_high_availability.md (99%) rename docs/{setup => ops}/security-ssl.md (99%) rename docs/{setup => ops/state}/checkpoints.md (95%) rename docs/ops/{README.md => state/index.md} (83%) rename docs/{monitoring => ops/state}/large_state_tuning.md (94%) rename docs/{setup => ops/state}/savepoints.md (97%) rename docs/ops/{ => state}/state_backends.md (96%) rename docs/{setup => start}/building.md (100%) rename docs/{setup => start}/flink_on_windows.md (100%) diff --git a/docs/check_links.sh b/docs/check_links.sh index c4307a56094b8..5d9f7628c472c 100755 --- a/docs/check_links.sh +++ b/docs/check_links.sh @@ -31,6 +31,9 @@ fi # Fail the build if any broken links are found broken_links_str=$(grep -e 'Found [[:digit:]]\+ broken links' spider.log) if [ -n "$broken_links_str" ]; then - echo -e "\e[1;31m$broken_links_str\e[0m" + grep -B 1 "Remote file does not exist -- broken link!!!" spider.log + echo "---------------------------------------------------------------------------" + echo -e "$broken_links_str" + echo "Search for page containing broken link using 'grep -R BROKEN_PATH DOCS_DIR'" exit 1 fi diff --git a/docs/concepts/programming-model.md b/docs/concepts/programming-model.md index 7b0cfb5effd61..fd5ebeeca63da 100644 --- a/docs/concepts/programming-model.md +++ b/docs/concepts/programming-model.md @@ -171,7 +171,7 @@ This alignment also allows Flink to redistribute the state and adjust the stream State and Partitioning -For more information, see the documentation on [working with state](../dev/stream/state.html). +For more information, see the documentation on [state](../dev/stream/state/index.html). {% top %} @@ -188,7 +188,7 @@ of events that need to be replayed). The description of the [fault tolerance internals]({{ site.baseurl }}/internals/stream_checkpointing.html) provides more information about how Flink manages checkpoints and related topics. -Details about enabling and configuring checkpointing are in the [checkpointing API docs](../dev/stream/checkpointing.html). +Details about enabling and configuring checkpointing are in the [checkpointing API docs](../dev/stream/state/checkpointing.html). {% top %} diff --git a/docs/concepts/runtime.md b/docs/concepts/runtime.md index c598b12f3c682..cb6d58f0c1e97 100644 --- a/docs/concepts/runtime.md +++ b/docs/concepts/runtime.md @@ -54,8 +54,8 @@ The Flink runtime consists of two types of processes: There must always be at least one TaskManager. -The JobManagers and TaskManagers can be started in various ways: directly on the machines as a [standalone cluster](../setup/cluster_setup.html), in -containers, or managed by resource frameworks like [YARN](../setup/yarn_setup.html) or [Mesos](../setup/mesos.html). +The JobManagers and TaskManagers can be started in various ways: directly on the machines as a [standalone cluster](../ops/deployment/cluster_setup.html), in +containers, or managed by resource frameworks like [YARN](../ops/deployment/yarn_setup.html) or [Mesos](../ops/deployment/mesos.html). TaskManagers connect to JobManagers, announcing themselves as available, and are assigned work. The **client** is not part of the runtime and program execution, but is used to prepare and send a dataflow to the JobManager. @@ -107,7 +107,7 @@ With hyper-threading, each slot then takes 2 or more hardware thread contexts. 
## State Backends -The exact data structures in which the key/values indexes are stored depends on the chosen [state backend](../ops/state_backends.html). One state backend +The exact data structures in which the key/values indexes are stored depends on the chosen [state backend](../ops/state/state_backends.html). One state backend stores data in an in-memory hash map, another state backend uses [RocksDB](http://rocksdb.org) as the key/value store. In addition to defining the data structure that holds the state, the state backends also implement the logic to take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint. @@ -120,8 +120,8 @@ take a point-in-time snapshot of the key/value state and store that snapshot as Programs written in the Data Stream API can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without losing any state. -[Savepoints](..//setup/savepoints.html) are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed. +[Savepoints](../ops/state/savepoints.html) are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed. -Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed. Savepoints can be created from the [command line](../setup/cli.html#savepoints) or when cancelling a job via the [REST API](../monitoring/rest_api.html#cancel-job-with-savepoint). +Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed. Savepoints can be created from the [command line](../ops/cli.html#savepoints) or when cancelling a job via the [REST API](../monitoring/rest_api.html#cancel-job-with-savepoint). {% top %} diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md index b3618aa0ae75a..c447f274ba867 100644 --- a/docs/dev/api_concepts.md +++ b/docs/dev/api_concepts.md @@ -100,7 +100,7 @@ will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the -[command line]({{ site.baseurl }}/setup/cli.html), the Flink cluster manager +[command line]({{ site.baseurl }}/ops/cli.html), the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return an execution environment for executing your program on a cluster. 
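A minimal sketch of this pattern (the class and job names are placeholders chosen only for illustration, not taken from the documentation):

{% highlight java %}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyJob {
    public static void main(String[] args) throws Exception {
        // Returns a local environment when run inside an IDE, and a pre-configured
        // cluster environment when the packaged JAR is submitted via the command line client.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            })
            .print();

        env.execute("MyJob");
    }
}
{% endhighlight %}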
@@ -169,7 +169,7 @@ will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the -[command line]({{ site.baseurl }}/apis/cli.html), the Flink cluster manager +[command line]({{ site.baseurl }}/ops/cli.html), the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return an execution environment for executing your program on a cluster. diff --git a/docs/dev/batch/connectors.md b/docs/dev/batch/connectors.md index b7a071820c6d6..93bbf72c2706f 100644 --- a/docs/dev/batch/connectors.md +++ b/docs/dev/batch/connectors.md @@ -58,7 +58,7 @@ In order to use a Hadoop file system with Flink, make sure that #### Amazon S3 -See [Deployment & Operations - Deployment - AWS - S3: Simple Storage Service]({{ site.baseurl }}/setup/aws.html) for available S3 file system implementations, their configuration and required libraries. +See [Deployment & Operations - Deployment - AWS - S3: Simple Storage Service]({{ site.baseurl }}/ops/deployment/aws.html) for available S3 file system implementations, their configuration and required libraries. #### Alluxio diff --git a/docs/dev/batch/python.md b/docs/dev/batch/python.md index c4c26715846e0..0383f54b9407a 100644 --- a/docs/dev/batch/python.md +++ b/docs/dev/batch/python.md @@ -615,7 +615,7 @@ env.execute() A system-wide default parallelism for all execution environments can be defined by setting the `parallelism.default` property in `./conf/flink-conf.yaml`. See the -[Configuration]({{ site.baseurl }}/setup/config.html) documentation for details. +[Configuration]({{ site.baseurl }}/ops/config.html) documentation for details. {% top %} diff --git a/docs/dev/cluster_execution.md b/docs/dev/cluster_execution.md index d6148462b7be7..03af63744ad46 100644 --- a/docs/dev/cluster_execution.md +++ b/docs/dev/cluster_execution.md @@ -33,7 +33,7 @@ are two ways to send a program to a cluster for execution: The command line interface lets you submit packaged programs (JARs) to a cluster (or single machine setup). -Please refer to the [Command Line Interface]({{ site.baseurl }}/setup/cli.html) documentation for +Please refer to the [Command Line Interface]({{ site.baseurl }}/ops/cli.html) documentation for details. ## Remote Environment diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md index 177e6011caac2..00c08534240f3 100644 --- a/docs/dev/connectors/index.md +++ b/docs/dev/connectors/index.md @@ -80,5 +80,5 @@ When a Flink application pushes a lot of data to an external data store, this can become an I/O bottleneck. If the data involved has many fewer reads than writes, a better approach can be for an external application to pull from Flink the data it needs. -The [Queryable State]({{ site.baseurl }}/dev/stream/queryable_state.html) interface +The [Queryable State]({{ site.baseurl }}/dev/stream/state/queryable_state.html) interface enables this by allowing the state being managed by Flink to be queried on demand. 
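As a rough sketch of how state becomes queryable in the first place, a stateful function can flag its keyed state when creating the state descriptor; the state and query names below are invented for the example:

{% highlight java %}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class LatestReading extends RichFlatMapFunction<Long, Long> {

    private transient ValueState<Long> latest;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("latest-reading", Long.class);
        // Make this keyed state queryable from outside the job under the given name.
        descriptor.setQueryable("latest-reading-query");
        latest = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Long value, Collector<Long> out) throws Exception {
        latest.update(value);
        out.collect(value);
    }
}
{% endhighlight %}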
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index d4e8978b9ea00..042ad11bc7705 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -573,5 +573,5 @@ Once Kerberos-based Flink security is enabled, you can authenticate to Kafka wit When using standalone Flink deployment, you can also use `SASL_SSL`; please see how to configure the Kafka client for SSL [here](https://kafka.apache.org/documentation/#security_configclients). - Set `sasl.kerberos.service.name` to `kafka` (default `kafka`): The value for this should match the `sasl.kerberos.service.name` used for Kafka broker configurations. A mismatch in service name between client and server configuration will cause the authentication to fail. -For more information on Flink configuration for Kerberos security, please see [here]({{ site.baseurl}}/setup/config.html). +For more information on Flink configuration for Kerberos security, please see [here]({{ site.baseurl}}/ops/config.html). You can also find [here]({{ site.baseurl}}/ops/security-kerberos.html) further details on how Flink internally setups Kerberos-based security. diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md index 7191d82f23ef8..8b3899b88b3d8 100644 --- a/docs/dev/datastream_api.md +++ b/docs/dev/datastream_api.md @@ -1158,7 +1158,7 @@ previous transformation. For example, you can use `someStream.map(...).startNewC you cannot use `someStream.startNewChain()`. A resource group is a slot in Flink, see -[slots]({{site.baseurl}}/setup/config.html#configuring-taskmanager-processing-slots). You can +[slots]({{site.baseurl}}/ops/config.html#configuring-taskmanager-processing-slots). You can manually isolate operators in separate slots if desired.
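For illustration, chaining and slot placement could be combined roughly as follows (`MyFilter` and `MyMapper` are placeholder functions):

{% highlight java %}
someStream
    // put this operator into its own slot sharing group; downstream operators inherit it unless they set their own
    .filter(new MyFilter()).slotSharingGroup("filters")
    // start a new operator chain at the mapper instead of chaining it to the preceding filter
    .map(new MyMapper()).startNewChain()
    .print();
{% endhighlight %}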
@@ -1604,7 +1604,7 @@ for an explanation of most parameters. These parameters pertain specifically to ### Fault Tolerance -[State & Checkpointing]({{ site.baseurl }}/dev/stream/checkpointing.html) describes how to enable and configure Flink's checkpointing mechanism. +[State & Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) describes how to enable and configure Flink's checkpointing mechanism. ### Controlling Latency diff --git a/docs/dev/execution_configuration.md b/docs/dev/execution_configuration.md index 94e788c7e2d10..23164500cf006 100644 --- a/docs/dev/execution_configuration.md +++ b/docs/dev/execution_configuration.md @@ -23,7 +23,7 @@ under the License. --> The `StreamExecutionEnvironment` contains the `ExecutionConfig` which allows to set job specific configuration values for the runtime. -To change the defaults that affect all jobs, see [Configuration]({{ site.baseurl }}/setup/config.html). +To change the defaults that affect all jobs, see [Configuration]({{ site.baseurl }}/ops/config.html).
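As a small illustration, a few job-specific settings might be adjusted through the `ExecutionConfig` like this (the concrete values are arbitrary):

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();

config.setAutoWatermarkInterval(500L); // emit watermarks every 500 ms
config.enableObjectReuse();            // reuse objects to reduce per-record allocations
{% endhighlight %}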
diff --git a/docs/dev/libs/storm_compatibility.md b/docs/dev/libs/storm_compatibility.md index 89d77061f7d2c..6b24dc00bfa0a 100644 --- a/docs/dev/libs/storm_compatibility.md +++ b/docs/dev/libs/storm_compatibility.md @@ -134,7 +134,7 @@ DataStream rawInput = env.addSource( If a Spout emits a finite number of tuples, `SpoutWrapper` can be configures to terminate automatically by setting `numberOfInvocations` parameter in its constructor. This allows the Flink program to shut down automatically after all data is processed. -Per default the program will run until it is [canceled]({{site.baseurl}}/setup/cli.html) manually. +Per default the program will run until it is [canceled]({{site.baseurl}}/ops/cli.html) manually. ## Embed Bolts diff --git a/docs/dev/linking_with_flink.md b/docs/dev/linking_with_flink.md index 73ca6779354b7..3f55b9ee5d33e 100644 --- a/docs/dev/linking_with_flink.md +++ b/docs/dev/linking_with_flink.md @@ -126,7 +126,7 @@ to run your program on Flink with Scala 2.11, you need to add a `_2.11` suffix t values of the Flink modules in your dependencies section. If you are looking for building Flink with Scala 2.11, please check -[build guide]({{ site.baseurl }}/setup/building.html#scala-versions). +[build guide]({{ site.baseurl }}/start/building.html#scala-versions). #### Hadoop Dependency Versions diff --git a/docs/dev/local_execution.md b/docs/dev/local_execution.md index 45a39e30e64ff..cf89956159695 100644 --- a/docs/dev/local_execution.md +++ b/docs/dev/local_execution.md @@ -57,7 +57,7 @@ The `LocalEnvironment` is a handle to local execution for Flink programs. Use it The local environment is instantiated via the method `ExecutionEnvironment.createLocalEnvironment()`. By default, it will use as many local threads for execution as your machine has CPU cores (hardware contexts). You can alternatively specify the desired parallelism. The local environment can be configured to log to the console using `enableLogging()`/`disableLogging()`. -In most cases, calling `ExecutionEnvironment.getExecutionEnvironment()` is the even better way to go. That method returns a `LocalEnvironment` when the program is started locally (outside the command line interface), and it returns a pre-configured environment for cluster execution, when the program is invoked by the [command line interface]({{ site.baseurl }}/setup/cli.html). +In most cases, calling `ExecutionEnvironment.getExecutionEnvironment()` is the even better way to go. That method returns a `LocalEnvironment` when the program is started locally (outside the command line interface), and it returns a pre-configured environment for cluster execution, when the program is invoked by the [command line interface]({{ site.baseurl }}/ops/cli.html). ~~~java public static void main(String[] args) throws Exception { diff --git a/docs/dev/migration.md b/docs/dev/migration.md index cf8c8f8b14b5b..3369a2c01b87b 100644 --- a/docs/dev/migration.md +++ b/docs/dev/migration.md @@ -37,7 +37,7 @@ This would be relevant mostly for users implementing custom `TypeSerializer`s fo Since Flink 1.3, two additional methods have been added that are related to serializer compatibility across savepoint restores. 
Please see -[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state.html#handling-serializer-upgrades-and-compatibility) +[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility) for further details on how to implement these methods. ### `ProcessFunction` is always a `RichFunction` @@ -75,7 +75,7 @@ For other custom projects, make sure to add logger dependencies. For example, in ## Migrating from Flink 1.1 to Flink 1.2 -As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state.html), Flink has two types of state: +As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state/state.html), Flink has two types of state: **keyed** and **non-keyed** state (also called **operator** state). Both types are available to both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1 function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the @@ -89,7 +89,7 @@ The migration process will serve two goals: Flink 1.1 predecessor. After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2 -simply by taking a [savepoint]({{ site.baseurl }}/setup/savepoints.html) with your Flink 1.1 job and giving it to +simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) with your Flink 1.1 job and giving it to your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its Flink 1.1 predecessor left off. @@ -203,7 +203,7 @@ contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism while `(test2, 2)` will go to task 1. More details on the principles behind rescaling of both keyed state and non-keyed state can be found in -the [State documentation]({{ site.baseurl }}/dev/stream/state.html). +the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html). ##### ListCheckpointed diff --git a/docs/dev/packaging.md b/docs/dev/packaging.md index ee351aef71835..e83d9ac35e12a 100644 --- a/docs/dev/packaging.md +++ b/docs/dev/packaging.md @@ -27,7 +27,7 @@ under the License. As described earlier, Flink programs can be executed on clusters by using a `remote environment`. Alternatively, programs can be packaged into JAR Files (Java Archives) for execution. Packaging the program is a prerequisite to executing them through the -[command line interface]({{ site.baseurl }}/setup/cli.html). +[command line interface]({{ site.baseurl }}/ops/cli.html). ### Packaging Programs diff --git a/docs/dev/parallel.md b/docs/dev/parallel.md index 549481fce2446..ae6f863de335f 100644 --- a/docs/dev/parallel.md +++ b/docs/dev/parallel.md @@ -27,7 +27,7 @@ program consists of multiple tasks (transformations/operators, data sources, and several parallel instances for execution and each parallel instance processes a subset of the task's input data. The number of parallel instances of a task is called its *parallelism*. -If you want to use [savepoints]({{ site.baseurl }}/setup/savepoints.html) you should also consider +If you want to use [savepoints]({{ site.baseurl }}/ops/state/savepoints.html) you should also consider setting a maximum parallelism (or `max parallelism`). 
When restoring from a savepoint you can change the parallelism of specific operators or the whole program and this setting specifies an upper bound on the parallelism. This is required because Flink internally partitions state @@ -181,7 +181,7 @@ try { A system-wide default parallelism for all execution environments can be defined by setting the `parallelism.default` property in `./conf/flink-conf.yaml`. See the -[Configuration]({{ site.baseurl }}/setup/config.html) documentation for details. +[Configuration]({{ site.baseurl }}/ops/config.html) documentation for details. ## Setting the Maximum Parallelism diff --git a/docs/dev/stream/process_function.md b/docs/dev/stream/process_function.md index fb5f39db61f4f..696a8b8ab2870 100644 --- a/docs/dev/stream/process_function.md +++ b/docs/dev/stream/process_function.md @@ -38,7 +38,7 @@ all (acyclic) streaming applications: The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s). -For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state.html), accessible via the +For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state/state.html), accessible via the `RuntimeContext`, similar to the way other stateful functions can access keyed state. The timers allow applications to react to changes in processing time and in [event time](../event_time.html). diff --git a/docs/dev/stream/checkpointing.md b/docs/dev/stream/state/checkpointing.md similarity index 93% rename from docs/dev/stream/checkpointing.md rename to docs/dev/stream/state/checkpointing.md index 4fe06c1d292e9..34c16b0fe1ad2 100644 --- a/docs/dev/stream/checkpointing.md +++ b/docs/dev/stream/state/checkpointing.md @@ -1,7 +1,7 @@ --- title: "Checkpointing" -nav-parent_id: streaming -nav-pos: 50 +nav-parent_id: streaming_state +nav-pos: 2 --- + +If your application uses Flink's managed state, it might be necessary to implement a custom serialization logic for special use cases. + +This page is targeted as a guideline for users who require the use of custom serialization for their state, covering how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this page is irrelevant and can be skipped. + +### Using custom serializers + +As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};
+
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+    new ListStateDescriptor<>(
+        "state-name",
+        new CustomTypeSerializer());
+
+checkpointedState = getRuntimeContext().getListState(descriptor);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
+
+val descriptor = new ListStateDescriptor[(String, Integer)](
+    "state-name",
+    new CustomTypeSerializer)
+
+checkpointedState = getRuntimeContext.getListState(descriptor)
+{% endhighlight %}
+</div>
+</div>
+ +Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following +subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using +anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname, +varying across compilers and depends on the order that they are instantiated within the enclosing class, which can +easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the +classpath). + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, +and is replaced as the new serializer for the state. + +A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, +and the written binary format of the state also remains identical. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` interface: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify +compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible, +as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible. + +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the +same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible +with their previous configuration. + +The following subsections illustrate guidelines to implement these two methods when using custom serializers. + +#### Implementing the `snapshotConfiguration` method + +The serializer's configuration snapshot should capture enough information such that on restore, the information +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. +This could typically contain information about the serializer's parameters or binary format of the serialized data; +generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized +bytes, and that it writes in the same binary format. + +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`. 
+ +{% highlight java %} +public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { + public abstract int getVersion(); + public void read(DataInputView in) {...} + public void write(DataOutputView out) {...} +} +{% endhighlight %} + +The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base +implementations contain logic to read and write the version of the configuration snapshot, so it should be extended and +not completely overridden. + +The version of the configuration snapshot is determined through the `getVersion` method. Versioning for the serializer +configuration snapshot is the means to maintain compatible configurations, as information included in the configuration +may change over time. By default, configuration snapshots are only compatible with the current version (as returned by +`getVersion`). To indicate that the configuration is compatible with other versions, override the `getCompatibleVersions` +method to return more version values. When reading from the checkpoint, you can use the `getReadVersion` method to +determine the version of the written configuration and adapt the read logic to the specific version. + +Attention The version of the serializer's configuration snapshot is **not** +related to upgrading the serializer. The exact same serializer can have different implementations of its +configuration snapshot, for example when more information is added to the configuration to allow more comprehensive +compatibility checks in the future. + +One limitation of implementing a `TypeSerializerConfigSnapshot` is that an empty constructor must be present. The empty +constructor is required when reading the configuration snapshot from checkpoints. + +#### Implementing the `ensureCompatibility` method + +The `ensureCompatibility` method should contain logic that performs checks against the information about the previous +serializer carried over via the provided `TypeSerializerConfigSnapshot`, basically doing one of the following: + + * Check whether the serializer is compatible, while possibly reconfiguring itself (if required) so that it may be + compatible. Afterwards, acknowledge with Flink that the serializer is compatible. + + * Acknowledge that the serializer is incompatible and that state migration is required before Flink can proceed with + using the new serializer. + +The above cases can be translated to code by returning one of the following from the `ensureCompatibility` method: + + * **`CompatibilityResult.compatible()`**: This acknowledges that the new serializer is compatible, or has been reconfigured to + be compatible, and Flink can proceed with the job with the serializer as is. + + * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the serializer is incompatible, or cannot be + reconfigured to be compatible, and requires a state migration before the new serializer can be used. State migration + is performed by using the previous serializer to read the restored state bytes to objects, and then serialized again + using the new serializer. + + * **`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`**: This acknowledgement has equivalent semantics + to `CompatibilityResult.requiresMigration()`, but in the case that the previous serializer cannot be found or loaded + to read the restored state bytes for the migration, a provided `TypeDeserializer` can be used as a fallback resort. 
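To make these hooks more concrete, a rough sketch for the `CustomTypeSerializer` shown earlier could look as follows. The `compressionEnabled` flag and all class names are invented for this example; only `TypeSerializerConfigSnapshot`, `CompatibilityResult`, and the `DataInputView`/`DataOutputView` types come from Flink:

{% highlight java %}
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Captures the (hypothetical) parameter that determines the serializer's binary format.
public class CustomSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {

    private boolean compressionEnabled;

    // An empty constructor is required for reading the snapshot back from a checkpoint.
    public CustomSerializerConfigSnapshot() {}

    public CustomSerializerConfigSnapshot(boolean compressionEnabled) {
        this.compressionEnabled = compressionEnabled;
    }

    public boolean isCompressionEnabled() {
        return compressionEnabled;
    }

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public void write(DataOutputView out) throws IOException {
        super.write(out);                      // writes the snapshot version
        out.writeBoolean(compressionEnabled);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        super.read(in);                        // reads the snapshot version
        compressionEnabled = in.readBoolean();
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof CustomSerializerConfigSnapshot
            && ((CustomSerializerConfigSnapshot) obj).compressionEnabled == compressionEnabled;
    }

    @Override
    public int hashCode() {
        return compressionEnabled ? 1 : 0;
    }
}
{% endhighlight %}

The serializer itself would then return this snapshot from `snapshotConfiguration` and check it in `ensureCompatibility`, for example:

{% highlight java %}
// Inside the (hypothetical) CustomTypeSerializer:
@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
    return new CustomSerializerConfigSnapshot(compressionEnabled);
}

@Override
public CompatibilityResult<Tuple2<String, Integer>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    if (configSnapshot instanceof CustomSerializerConfigSnapshot
            && ((CustomSerializerConfigSnapshot) configSnapshot).isCompressionEnabled() == compressionEnabled) {
        // Same binary format as before; the restored bytes can be read as is.
        return CompatibilityResult.compatible();
    } else {
        // The format changed; request state migration (see the note below on its current availability).
        return CompatibilityResult.requiresMigration();
    }
}
{% endhighlight %}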
+ +Attention Currently, as of Flink 1.3, if the result of the compatibility check +acknowledges that state migration needs to be performed, the job simply fails to restore from the checkpoint as state +migration is currently not available. The ability to migrate state will be introduced in future releases. + +### Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in user code + +Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as part of checkpoints along with the state +values, the availability of the classes within the classpath may affect restore behaviour. + +`TypeSerializer`s are directly written into checkpoints using Java Object Serialization. In the case that the new +serializer acknowledges that it is incompatible and requires state migration, it will be required to be present to be +able to read the restored state bytes. Therefore, if the original serializer class no longer exists or has been modified +(resulting in a different `serialVersionUID`) as a result of a serializer upgrade for the state, the restore would +not be able to proceed. The alternative to this requirement is to provide a fallback `TypeDeserializer` when +acknowledging that state migration is required, using `CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`. + +The class of `TypeSerializerConfigSnapshot`s in the restored checkpoint must exist in the classpath, as they are +fundamental components to compatibility checks on upgraded serializers and would not be able to be restored if the class +is not present. Since configuration snapshots are written to checkpoints using custom serialization, the implementation +of the class is free to be changed, as long as compatibility of the configuration change is handled using the versioning +mechanisms in `TypeSerializerConfigSnapshot`. diff --git a/docs/dev/stream/state/index.md b/docs/dev/stream/state/index.md new file mode 100644 index 0000000000000..0d676754f9ecf --- /dev/null +++ b/docs/dev/stream/state/index.md @@ -0,0 +1,56 @@ +--- +title: "State & Fault Tolerance" +nav-id: streaming_state +nav-title: "State & Fault Tolerance" +nav-parent_id: streaming +nav-pos: 3 +nav-show_overview: true +--- + + +Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for +any type of more elaborate operation. + +For example: + + - When an application searches for certain event patterns, the state will store the sequence of events encountered so far. + - When aggregating events per minute/hour/day, the state holds the pending aggregates. + - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters. + - When historic data needs to be managed, the state allows efficient access to events occured in the past. + +Flink needs to be aware of the state in order to make state fault tolerant using [checkpoints](checkpointing.html) and allow [savepoints]({{ site.baseurl }}/ops/state/savepoints.html) of streaming applications. + +Knowledge about the state also allows for rescaling Flink applications, meaning that Flink takes care of redistributing state across parallel instances. + +The [queryable state](queryable_state.html) feature of Flink allows you to access state from outside of Flink during runtime. + +When working with state, it might also be useful to read about [Flink's state backends]({{ site.baseurl }}/ops/state/state_backends.html). 
Flink provides different state backends that specify how and where state is stored. State can be located on Java's heap or off-heap. Depending on your state backend, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large state. State backends can be configured without changing your application logic. + +{% top %} + +Where to go next? +----------------- + +* [Working with State](state.html): Shows how to use state in a Flink application and explains the different kinds of state. +* [Checkpointing](checkpointing.html): Describes how to enable and configure checkpointing for fault tolerance. +* [Queryable State](queryable_state.html): Explains how to access state from outside of Flink during runtime. +* [Custom Serialization for Managed State](custom_serialization.html): Discusses custom serialization logic for state and its upgrades. + +{% top %} \ No newline at end of file diff --git a/docs/dev/stream/queryable_state.md b/docs/dev/stream/state/queryable_state.md similarity index 98% rename from docs/dev/stream/queryable_state.md rename to docs/dev/stream/state/queryable_state.md index 234be51cf352b..bd0d7fbef4851 100644 --- a/docs/dev/stream/queryable_state.md +++ b/docs/dev/stream/state/queryable_state.md @@ -1,7 +1,8 @@ --- title: "Queryable State" -nav-parent_id: streaming -nav-pos: 61 +nav-parent_id: streaming_state +nav-pos: 3 +is_beta: true --- -* ToC -{:toc} - -Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for -any type of more elaborate operation. For example: - - - When an application searches for certain event patterns, the state will store the sequence of events encountered so far. - - When aggregating events per minute, the state holds the pending aggregates. - - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters. - -In order to make state fault tolerant, Flink needs to be aware of the state and [checkpoint](checkpointing.html) it. -In many cases, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk -if necessary) to allow applications to hold very large state. - This document explains how to use Flink's state abstractions when developing an application. +* ToC +{:toc} ## Keyed State and Operator State @@ -64,7 +52,7 @@ for one or more Key Groups. With *Operator State* (or *non-keyed state*), each operator state is bound to one parallel operator instance. -The [Kafka Connector](../connectors/kafka.html) is a good motivating example for the use of Operator State +The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good motivating example for the use of Operator State in Flink. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State. @@ -87,6 +75,10 @@ Using managed state (rather than raw state) is recommended, since with managed state Flink is able to automatically redistribute state when the parallelism is changed, and also do better memory management. +Attention If your managed state needs custom serialization logic, please see +the [corresponding guide](custom_serialization.html) in order to ensure future compatibility. Flink's default serializers +don't need special treatment. 
+ ## Using Managed Keyed State The managed keyed state interface provides access to different types of state that are all scoped to @@ -601,168 +593,4 @@ class CounterSource
-Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. - -## Custom Serialization for Managed State - -This section is targeted as a guideline for users who require the use of custom serialization for their state, covering how -to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using -Flink's own serializers, this section is irrelevant and can be skipped. - -### Using custom serializers - -As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required -to specify the state's name, as well as information about the type of the state. The type information is used by Flink's -[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. - -It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, -simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: - -
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};
-
-ListStateDescriptor<Tuple2<String, Integer>> descriptor =
-    new ListStateDescriptor<>(
-        "state-name",
-        new CustomTypeSerializer());
-
-checkpointedState = getRuntimeContext().getListState(descriptor);
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
-
-val descriptor = new ListStateDescriptor[(String, Integer)](
-    "state-name",
-    new CustomTypeSerializer)
-)
-
-checkpointedState = getRuntimeContext.getListState(descriptor);
-{% endhighlight %}
-</div>
-</div>
- -Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following -subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using -anonymous classes as your state serializers. Anonymous classes do not have a guarantee on the generated classname, -varying across compilers and depends on the order that they are instantiated within the enclosing class, which can -easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the -classpath). - -### Handling serializer upgrades and compatibility - -Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any -specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer -that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility, -and is replaced as the new serializer for the state. - -A compatible serializer would mean that the serializer is capable of reading previous serialized bytes of the state, -and the written binary format of the state also remains identical. The means to check the new serializer's compatibility -is provided through the following two methods of the `TypeSerializer` interface: - -{% highlight java %} -public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); -public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); -{% endhighlight %} - -Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a -point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the -checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot -will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify -compatibility of the new serializer. This method serves as a check for whether or not the new serializer is compatible, -as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible. - -Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the -same serializer is used for the state in the restored job, the serializer's will reconfigure themselves to be compatible -with their previous configuration. - -The following subsections illustrate guidelines to implement these two methods when using custom serializers. - -#### Implementing the `snapshotConfiguration` method - -The serializer's configuration snapshot should capture enough information such that on restore, the information -carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. -This could typically contain information about the serializer's parameters or binary format of the serialized data; -generally, anything that allows the new serializer to decide whether or not it can be used to read previous serialized -bytes, and that it writes in the same binary format. - -How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. The below -is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`. 
- -{% highlight java %} -public abstract TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { - public abstract int getVersion(); - public void read(DataInputView in) {...} - public void write(DataOutputView out) {...} -} -{% endhighlight %} - -The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base -implementations contain logic to read and write the version of the configuration snapshot, so it should be extended and -not completely overridden. - -The version of the configuration snapshot is determined through the `getVersion` method. Versioning for the serializer -configuration snapshot is the means to maintain compatible configurations, as information included in the configuration -may change over time. By default, configuration snapshots are only compatible with the current version (as returned by -`getVersion`). To indicate that the configuration is compatible with other versions, override the `getCompatibleVersions` -method to return more version values. When reading from the checkpoint, you can use the `getReadVersion` method to -determine the version of the written configuration and adapt the read logic to the specific version. - -Attention The version of the serializer's configuration snapshot is **not** -related to upgrading the serializer. The exact same serializer can have different implementations of its -configuration snapshot, for example when more information is added to the configuration to allow more comprehensive -compatibility checks in the future. - -One limitation of implementing a `TypeSerializerConfigSnapshot` is that an empty constructor must be present. The empty -constructor is required when reading the configuration snapshot from checkpoints. - -#### Implementing the `ensureCompatibility` method - -The `ensureCompatibility` method should contain logic that performs checks against the information about the previous -serializer carried over via the provided `TypeSerializerConfigSnapshot`, basically doing one of the following: - - * Check whether the serializer is compatible, while possibly reconfiguring itself (if required) so that it may be - compatible. Afterwards, acknowledge with Flink that the serializer is compatible. - - * Acknowledge that the serializer is incompatible and that state migration is required before Flink can proceed with - using the new serializer. - -The above cases can be translated to code by returning one of the following from the `ensureCompatibility` method: - - * **`CompatibilityResult.compatible()`**: This acknowledges that the new serializer is compatible, or has been reconfigured to - be compatible, and Flink can proceed with the job with the serializer as is. - - * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the serializer is incompatible, or cannot be - reconfigured to be compatible, and requires a state migration before the new serializer can be used. State migration - is performed by using the previous serializer to read the restored state bytes to objects, and then serialized again - using the new serializer. - - * **`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`**: This acknowledgement has equivalent semantics - to `CompatibilityResult.requiresMigration()`, but in the case that the previous serializer cannot be found or loaded - to read the restored state bytes for the migration, a provided `TypeDeserializer` can be used as a fallback resort. 
- -Attention Currently, as of Flink 1.3, if the result of the compatibility check -acknowledges that state migration needs to be performed, the job simply fails to restore from the checkpoint as state -migration is currently not available. The ability to migrate state will be introduced in future releases. - -### Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in user code - -Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as part of checkpoints along with the state -values, the availability of the classes within the classpath may affect restore behaviour. - -`TypeSerializer`s are directly written into checkpoints using Java Object Serialization. In the case that the new -serializer acknowledges that it is incompatible and requires state migration, it will be required to be present to be -able to read the restored state bytes. Therefore, if the original serializer class no longer exists or has been modified -(resulting in a different `serialVersionUID`) as a result of a serializer upgrade for the state, the restore would -not be able to proceed. The alternative to this requirement is to provide a fallback `TypeDeserializer` when -acknowledging that state migration is required, using `CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`. - -The class of `TypeSerializerConfigSnapshot`s in the restored checkpoint must exist in the classpath, as they are -fundamental components to compatibility checks on upgraded serializers and would not be able to be restored if the class -is not present. Since configuration snapshots are written to checkpoints using custom serialization, the implementation -of the class is free to be changed, as long as compatibility of the configuration change is handled using the versioning -mechanisms in `TypeSerializerConfigSnapshot`. +Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface. \ No newline at end of file diff --git a/docs/dev/stream/state/state_backends.md b/docs/dev/stream/state/state_backends.md new file mode 100644 index 0000000000000..1357f2e5a2305 --- /dev/null +++ b/docs/dev/stream/state/state_backends.md @@ -0,0 +1,46 @@ +--- +title: "State Backends" +nav-parent_id: streaming_state +nav-pos: 5 +--- + + +Flink provides different state backends that specify how and where state is stored. + +State can be located on Java’s heap or off-heap. Depending on your state backend, Flink can also manage the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large state. By default, the configuration file *flink-conf.yaml* determines the state backend for all Flink jobs. + +However, the default state backend can be overridden on a per-job basis, as shown below. + +For more information about the available state backends, their advantages, limitations, and configuration parameters see the corresponding section in [Deployment & Operations]({{ site.baseurl }}/ops/state/state_backends.html). + +
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(...);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStateBackend(...)
+{% endhighlight %}
+</div>
+</div>
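As a concrete illustration of such a per-job override (the checkpoint URI is a placeholder), the environment could be pointed at the filesystem state backend:

{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Keep working state on the TaskManager heap, but write checkpoint data to the given file system URI.
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
{% endhighlight %}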
diff --git a/docs/dev/table/index.md b/docs/dev/table/index.md index df2ccbaaf9465..5845c95099cbb 100644 --- a/docs/dev/table/index.md +++ b/docs/dev/table/index.md @@ -74,7 +74,7 @@ Where to go next? * [Concepts & Common API]({{ site.baseurl }}/dev/table/common.html): Shared concepts and APIs of the Table API and SQL. * [Streaming Table API & SQL]({{ site.baseurl }}/dev/table/streaming.html): Streaming-specific documentation for the Table API or SQL such as configuration of time attributes and handling of updating results. -* [Table API]({{ site.baseurl }}/dev/table/tableapi.html): Supported operations and API for the Table API. +* [Table API]({{ site.baseurl }}/dev/table/tableApi.html): Supported operations and API for the Table API. * [SQL]({{ site.baseurl }}/dev/table/sql.html): Supported operations and syntax for SQL * [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html): Reading tables from and emitting tables to external storage systems. * [User-Defined Functions]({{ site.baseurl }}/dev/table/udfs.html): Definition and usage of user-defined functions. diff --git a/docs/internals/components.md b/docs/internals/components.md index b750ffd90397e..cf3b6597135ec 100644 --- a/docs/internals/components.md +++ b/docs/internals/components.md @@ -54,6 +54,6 @@ You can click on the components in the figure to learn more. - - + + diff --git a/docs/internals/stream_checkpointing.md b/docs/internals/stream_checkpointing.md index d701c5e13c38c..330b0aae3f4ad 100644 --- a/docs/internals/stream_checkpointing.md +++ b/docs/internals/stream_checkpointing.md @@ -45,7 +45,7 @@ The system then restarts the operators and resets them to the latest successful point of the state snapshot. Any records that are processed as part of the restarted parallel dataflow are guaranteed to not have been part of the previously checkpointed state. -*Note:* By default, checkpointing is disabled. See [Checkpointing]({{ site.baseurl }}/dev/stream/checkpointing.html) for details on how to enable and configure checkpointing. +*Note:* By default, checkpointing is disabled. See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for details on how to enable and configure checkpointing. *Note:* For this mechanism to realize its full guarantees, the data stream source (such as message queue or broker) needs to be able to rewind the stream to a defined recent point. [Apache Kafka](http://kafka.apache.org) has this ability and Flink's connector to @@ -106,10 +106,10 @@ the barrier *n* from the other inputs as well. Otherwise, it would mix records t When operators contain any form of *state*, this state must be part of the snapshots as well. Operator state comes in different forms: - - *User-defined state*: This is state that is created and modified directly by the transformation functions (like `map()` or `filter()`). See [State in Streaming Applications]({{ site.baseurl }}/dev/stream/state.html) for details. + - *User-defined state*: This is state that is created and modified directly by the transformation functions (like `map()` or `filter()`). See [State in Streaming Applications]({{ site.baseurl }}/dev/stream/state/index.html) for details. - *System state*: This state refers to data buffers that are part of the operator's computation. A typical example for this state are the *window buffers*, inside which the system collects (and aggregates) records for windows until the window is evaluated and evicted. 
-Operators snapshot their state at the point in time when they have received all snapshot barriers from their input streams, and before emitting the barriers to their output streams. At that point, all updates to the state from records before the barriers will have been made, and no updates that depend on records from after the barriers have been applied. Because the state of a snapshot may be large, it is stored in a configurable *[state backend]({{ site.baseurl }}/ops/state_backends.html)*. By default, this is the JobManager's memory, but for production use a distributed reliable storage should be configured (such as HDFS). After the state has been stored, the operator acknowledges the checkpoint, emits the snapshot barrier into the output streams, and proceeds. +Operators snapshot their state at the point in time when they have received all snapshot barriers from their input streams, and before emitting the barriers to their output streams. At that point, all updates to the state from records before the barriers will have been made, and no updates that depend on records from after the barriers have been applied. Because the state of a snapshot may be large, it is stored in a configurable *[state backend]({{ site.baseurl }}/ops/state/state_backends.html)*. By default, this is the JobManager's memory, but for production use a distributed reliable storage should be configured (such as HDFS). After the state has been stored, the operator acknowledges the checkpoint, emits the snapshot barrier into the output streams, and proceeds. The resulting snapshot now contains: @@ -145,7 +145,7 @@ It is possible to let an operator continue processing while it stores its state After receiving the checkpoint barriers on its inputs, the operator starts the asynchronous snapshot copying of its state. It immediately emits the barrier to its outputs and continues with the regular stream processing. Once the background copy process has completed, it acknowledges the checkpoint to the checkpoint coordinator (the JobManager). The checkpoint is now only complete after all sinks have received the barriers and all stateful operators have acknowledged their completed backup (which may be after the barriers reach the sinks). -See [State Backends]({{ site.baseurl }}/ops/state_backends.html) for details on the state snapshots. +See [State Backends]({{ site.baseurl }}/ops/state/state_backends.html) for details on the state snapshots. ## Recovery diff --git a/docs/internals/task_lifecycle.md b/docs/internals/task_lifecycle.md index cc557a1462851..fed2cb9792f17 100644 --- a/docs/internals/task_lifecycle.md +++ b/docs/internals/task_lifecycle.md @@ -89,7 +89,7 @@ and skips any intermediate phases between the phase the operator was in when the **Checkpoints:** The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above whenever a checkpoint barrier is received. Checkpoints are performed during the processing phase, *i.e.* after the operator is opened and before it is closed. The responsibility of this method is to store the current state of the operator -to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it will be retrieved when +to the specified [state backend]({{ site.baseurl }}/ops/state/state_backends.html) from where it will be retrieved when the job resumes execution after a failure. 
 Below we include a brief description of Flink's checkpointing mechanism, and for a more detailed discussion on the principles
 around checkpointing in Flink please read the corresponding documentation: [Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html).
@@ -125,7 +125,7 @@ first step for the task is to retrieve its initial, task-wide state. This is don
 particularly important in two cases:
 
 1. when the task is recovering from a failure and restarts from the last successful checkpoint
-2. when resuming from a [savepoint]({{ site.baseurl }}/setup/savepoints.html).
+2. when resuming from a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html).
 
 If it is the first time the task is executed, the initial task state is empty.
diff --git a/docs/monitoring/debugging_classloading.md b/docs/monitoring/debugging_classloading.md
index 8c91e0f91b13d..d69870afad83b 100644
--- a/docs/monitoring/debugging_classloading.md
+++ b/docs/monitoring/debugging_classloading.md
@@ -57,7 +57,7 @@ YARN classloading differs between single job deploymens and sessions:
 
 **Mesos**
 
-Mesos setups following [this documentation](../setup/mesos.html) currently behave very much like the a
+Mesos setups following [this documentation](../ops/deployment/mesos.html) currently behave very much like a
 YARN session: The TaskManager and JobManager processes are started with the Flink framework classes in classpath,
 job classes are loaded dynamically when the jobs are submitted.
diff --git a/docs/monitoring/historyserver.md b/docs/monitoring/historyserver.md
index 773f3be95801f..e109512a5804f 100644
--- a/docs/monitoring/historyserver.md
+++ b/docs/monitoring/historyserver.md
@@ -71,7 +71,7 @@ historyserver.archive.fs.refresh-interval: 10000
 
 The contained archives are downloaded and cached in the local filesystem. The local directory for this is configured via `historyserver.web.tmpdir`.
 
-Check out the configuration page for a [complete list of configuration options]({{ site.baseurl }}/setup/config.html#history-server).
+Check out the configuration page for a [complete list of configuration options]({{ site.baseurl }}/ops/config.html#history-server).
 
 ## Available Requests
 
diff --git a/docs/setup/cli.md b/docs/ops/cli.md
similarity index 97%
rename from docs/setup/cli.md
rename to docs/ops/cli.md
index ef5adb3e6a3cf..7b36177223f39 100644
--- a/docs/setup/cli.md
+++ b/docs/ops/cli.md
@@ -1,8 +1,8 @@
 ---
 title: "Command-Line Interface"
 nav-title: CLI
-nav-parent_id: setup
-nav-pos: 2
+nav-parent_id: ops
+nav-pos: 6
 ---
-
-This folder contains the documentation in the category
-**Deployment & Operations**.
diff --git a/docs/monitoring/large_state_tuning.md b/docs/ops/state/large_state_tuning.md
similarity index 94%
rename from docs/monitoring/large_state_tuning.md
rename to docs/ops/state/large_state_tuning.md
index d8b52ee23e2dc..aa0b0d8f0a24d 100644
--- a/docs/monitoring/large_state_tuning.md
+++ b/docs/ops/state/large_state_tuning.md
@@ -1,6 +1,6 @@
 ---
 title: "Debugging and Tuning Checkpoints and Large State"
-nav-parent_id: monitoring
+nav-parent_id: ops_state
 nav-pos: 12
 ---
-If your application uses Flink's managed state, it might be necessary to implement a custom serialization logic for special use cases.
+If your application uses Flink's managed state, it might be necessary to implement custom serialization logic for special use cases.
 This page is targeted as a guideline for users who require the use of custom serialization for their state, covering how
 to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
@@ -69,7 +69,7 @@ checkpointedState = getRuntimeContext.getListState(descriptor);
 Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following subsections),
 the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using anonymous classes as your
 state serializers. Anonymous classes do not have a guarantee on the generated classname,
-varying across compilers and depends on the order that they are instantiated within the enclosing class, which can
+which varies across compilers and depends on the order that they are instantiated within the enclosing class, which can
 easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the classpath).
diff --git a/docs/dev/stream/state/index.md b/docs/dev/stream/state/index.md
index 0d676754f9ecf..d1c0edbfc844f 100644
--- a/docs/dev/stream/state/index.md
+++ b/docs/dev/stream/state/index.md
@@ -1,4 +1,4 @@
----
+ ---
 title: "State & Fault Tolerance"
 nav-id: streaming_state
 nav-title: "State & Fault Tolerance"
@@ -35,7 +35,7 @@ For example:
 - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters.
 - When historic data needs to be managed, the state allows efficient access to events occurred in the past.
 
-Flink needs to be aware of the state in order to make state fault tolerant using [checkpoints](checkpointing.html) and allow [savepoints]({{ site.baseurl }}/ops/state/savepoints.html) of streaming applications.
+Flink needs to be aware of the state in order to make state fault tolerant using [checkpoints](checkpointing.html) and to allow [savepoints]({{ site.baseurl }}/ops/state/savepoints.html) of streaming applications.
 
 Knowledge about the state also allows for rescaling Flink applications, meaning that Flink takes care of redistributing state across parallel instances.
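The state, checkpointing, and state-backend pages reorganized above all describe the same workflow: register state through a descriptor so Flink is aware of it, enable checkpointing, and point the snapshots at a state backend. As a minimal sketch only (the class names, the 10-second checkpoint interval, and the `hdfs:///flink/checkpoints` path are illustrative placeholders, not part of these docs), a keyed-state job wired up that way could look roughly like this in Java:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ManagedStateSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is disabled by default; enable it (here: every 10 seconds, a placeholder value).
        env.enableCheckpointing(10000);

        // Snapshots go to the configured state backend; the HDFS path is a placeholder.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        env.fromElements(Tuple2.of("sensor-1", 1L), Tuple2.of("sensor-1", 1L))
           .keyBy(0)
           .flatMap(new CountingFunction())
           .print();

        env.execute("managed-state-sketch");
    }

    /** Keeps one Long of managed keyed state per key. */
    public static class CountingFunction
            extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // The named descriptor is what makes the state known to Flink, so it can be
            // checkpointed, restored after a failure, and redistributed on rescaling.
            ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("event-count", Long.class);
            count = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
            Long current = count.value();
            long updated = (current == null ? 0L : current) + value.f1;
            count.update(updated);
            out.collect(Tuple2.of(value.f0, updated));
        }
    }
}
```

If the default serialization of the state type is not acceptable, the same `ValueStateDescriptor` constructor also accepts a `TypeSerializer`, which is where the custom serialization guide and its compatibility rules apply.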