From 35de514544fb1bdde13253283a6f35ee06b1af60 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Thu, 24 Dec 2015 01:19:02 +0100 Subject: [PATCH] [FLINK-3132] [docs] Initial docs restructure --- docs/README.md | 111 ++- docs/_config.yml | 11 +- docs/_includes/navbar.html | 74 +- docs/_layouts/base.html | 4 - docs/_layouts/plain.html | 86 +- docs/_plugins/highlightCode.rb | 3 +- docs/_plugins/info.rb | 20 + docs/_plugins/top.rb | 14 + docs/_plugins/warn.rb | 20 + .../{ => batch}/dataset_transformations.md | 26 +- docs/apis/{ => batch}/examples.md | 57 +- docs/apis/batch/fault_tolerance.md | 100 +++ docs/apis/{ => batch}/fig/LICENSE.txt | 0 .../fig/iterations_delta_iterate_operator.png | Bin ...rations_delta_iterate_operator_example.png | Bin .../fig/iterations_iterate_operator.png | Bin .../iterations_iterate_operator_example.png | Bin .../{ => batch}/fig/iterations_supersteps.png | Bin docs/apis/{ => batch}/fig/plan_visualizer.png | Bin docs/apis/{ => batch}/hadoop_compatibility.md | 29 +- .../{programming_guide.md => batch/index.md} | 236 ++--- docs/apis/{ => batch}/iterations.md | 10 +- docs/apis/{ => batch}/python.md | 78 +- docs/apis/best_practices.md | 7 +- docs/apis/cli.md | 3 + docs/apis/cluster_execution.md | 7 +- .../{example_connectors.md => connectors.md} | 10 +- docs/apis/filesystems.md | 236 +++++ docs/apis/index.md | 2 +- docs/apis/java8.md | 38 +- docs/apis/local_execution.md | 9 +- docs/apis/scala_shell.md | 8 +- docs/apis/streaming/connectors/docker.md | 116 +++ .../streaming/connectors/elasticsearch.md | 165 ++++ docs/apis/streaming/connectors/hdfs.md | 115 +++ docs/apis/streaming/connectors/index.md | 26 + docs/apis/streaming/connectors/kafka.md | 160 ++++ docs/apis/streaming/connectors/rabbitmq.md | 102 +++ docs/apis/streaming/connectors/twitter.md | 89 ++ docs/apis/{ => streaming}/fault_tolerance.md | 83 +- docs/apis/streaming/fig/LICENSE.txt | 17 + .../fig/savepoints-overview.png | Bin .../fig/savepoints-program_ids.png | Bin .../index.md} | 816 
+----------------- docs/apis/{ => streaming}/savepoints.md | 2 + docs/apis/{ => streaming}/state_backends.md | 6 +- .../{ => streaming}/storm_compatibility.md | 4 +- docs/apis/web_client.md | 3 + docs/apis/zip_elements_guide.md | 106 --- docs/internals/add_operator.md | 4 + docs/internals/general_arch.md | 4 + docs/internals/ide_setup.md | 5 +- docs/internals/job_scheduling.md | 3 + docs/internals/logging.md | 4 + docs/internals/monitoring_rest_api.md | 3 + docs/internals/stream_checkpointing.md | 4 + docs/internals/types_serialization.md | 3 + docs/libs/gelly_guide.md | 105 +-- docs/libs/index.md | 10 +- docs/libs/ml/als.md | 8 +- docs/libs/ml/contribution_guide.md | 8 +- docs/libs/ml/distance_metrics.md | 8 +- docs/libs/ml/index.md | 12 +- docs/libs/ml/min_max_scaler.md | 6 +- docs/libs/ml/multiple_linear_regression.md | 8 +- docs/libs/ml/optimization.md | 7 +- docs/libs/ml/pipelines.md | 7 +- docs/libs/ml/polynomial_features.md | 7 +- docs/libs/ml/quickstart.md | 7 +- docs/libs/ml/standard_scaler.md | 7 +- docs/libs/ml/svm.md | 7 +- docs/libs/table.md | 12 +- docs/page/css/flink.css | 120 ++- docs/quickstart/java_api_quickstart.md | 4 + docs/quickstart/run_example_quickstart.md | 4 + docs/quickstart/scala_api_quickstart.md | 4 + docs/quickstart/setup_quickstart.md | 4 + docs/setup/building.md | 110 +-- docs/setup/cluster_setup.md | 301 +------ docs/setup/config.md | 464 +++------- docs/setup/flink_on_tez.md | 290 ------- docs/setup/gce_setup.md | 27 +- docs/setup/jobmanager_high_availability.md | 3 + docs/setup/local_setup.md | 25 +- docs/setup/yarn_setup.md | 46 +- 85 files changed, 2271 insertions(+), 2389 deletions(-) create mode 100644 docs/_plugins/info.rb create mode 100644 docs/_plugins/top.rb create mode 100644 docs/_plugins/warn.rb rename docs/apis/{ => batch}/dataset_transformations.md (99%) rename docs/apis/{ => batch}/examples.md (96%) create mode 100644 docs/apis/batch/fault_tolerance.md rename docs/apis/{ => batch}/fig/LICENSE.txt (100%) rename 
docs/apis/{ => batch}/fig/iterations_delta_iterate_operator.png (100%) rename docs/apis/{ => batch}/fig/iterations_delta_iterate_operator_example.png (100%) rename docs/apis/{ => batch}/fig/iterations_iterate_operator.png (100%) rename docs/apis/{ => batch}/fig/iterations_iterate_operator_example.png (100%) rename docs/apis/{ => batch}/fig/iterations_supersteps.png (100%) rename docs/apis/{ => batch}/fig/plan_visualizer.png (100%) rename docs/apis/{ => batch}/hadoop_compatibility.md (95%) rename docs/apis/{programming_guide.md => batch/index.md} (97%) rename docs/apis/{ => batch}/iterations.md (95%) rename docs/apis/{ => batch}/python.md (95%) rename docs/apis/{example_connectors.md => connectors.md} (98%) create mode 100644 docs/apis/filesystems.md create mode 100644 docs/apis/streaming/connectors/docker.md create mode 100644 docs/apis/streaming/connectors/elasticsearch.md create mode 100644 docs/apis/streaming/connectors/hdfs.md create mode 100644 docs/apis/streaming/connectors/index.md create mode 100644 docs/apis/streaming/connectors/kafka.md create mode 100644 docs/apis/streaming/connectors/rabbitmq.md create mode 100644 docs/apis/streaming/connectors/twitter.md rename docs/apis/{ => streaming}/fault_tolerance.md (74%) create mode 100644 docs/apis/streaming/fig/LICENSE.txt rename docs/apis/{ => streaming}/fig/savepoints-overview.png (100%) rename docs/apis/{ => streaming}/fig/savepoints-program_ids.png (100%) rename docs/apis/{streaming_guide.md => streaming/index.md} (79%) rename docs/apis/{ => streaming}/savepoints.md (99%) rename docs/apis/{ => streaming}/state_backends.md (98%) rename docs/apis/{ => streaming}/storm_compatibility.md (99%) delete mode 100644 docs/apis/zip_elements_guide.md delete mode 100644 docs/setup/flink_on_tez.md diff --git a/docs/README.md b/docs/README.md index 05dcecb742541..d37dc77ae9fdd 100644 --- a/docs/README.md +++ b/docs/README.md @@ -6,9 +6,9 @@ http://flink.apache.org/ is also generated from the files found here. 
# Requirements -We use Markdown to write and Jekyll to translate the documentation to static HTML. Kramdown is +We use Markdown to write and Jekyll to translate the documentation to static HTML. Kramdown is needed for Markdown processing and the Python based Pygments is used for syntax highlighting. To run -Javascript code from Ruby, you need to install a javascript runtime (e.g. `therubyracer`). You can +Javascript code from Ruby, you need to install a javascript runtime (e.g. `therubyracer`). You can install all needed software via the following commands: gem install jekyll -v 2.5.3 @@ -16,13 +16,13 @@ install all needed software via the following commands: gem install pygments.rb -v 0.6.3 gem install therubyracer -v 0.12.2 sudo easy_install Pygments - -Note that in Ubuntu based systems, it may be necessary to install the `ruby-dev` and + +Note that in Ubuntu based systems, it may be necessary to install the `ruby-dev` and `python-setuptools` packages via apt. # Using Dockerized Jekyll -We dockerized the jekyll environment above. If you have [docker](https://docs.docker.com/), +We dockerized the jekyll environment above. If you have [docker](https://docs.docker.com/), you can run following command to start the container. ``` @@ -33,7 +33,6 @@ cd flink/docs/docker It takes a few moment to build the image for the first time, but will be a second from the second time. The run.sh command brings you in a bash session where you can run following doc commands. - # Build The `docs/build_docs.sh` script calls Jekyll and generates the documentation in `docs/target`. You @@ -44,12 +43,13 @@ If you call the script with the preview flag `build_docs.sh -p`, Jekyll will sta # Contribute -The documentation pages are written in -[Markdown](http://daringfireball.net/projects/markdown/syntax). It is possible to use the -[GitHub flavored syntax](http://github.github.com/github-flavored-markdown) and intermix plain html. 
+## Markdown + +The documentation pages are written in [Markdown](http://daringfireball.net/projects/markdown/syntax). It is possible to use [GitHub flavored syntax](http://github.github.com/github-flavored-markdown) and intermix plain html. -In addition to Markdown, every page contains a Jekyll front matter, which specifies the title of the -page and the layout to use. The title is used as the top-level heading for the page. +## Front matter + +In addition to Markdown, every page contains a Jekyll front matter, which specifies the title of the page and the layout to use. The title is used as the top-level heading for the page. The default layout is `plain` (found in `_layouts`). --- title: "Title of the Page" @@ -59,20 +59,93 @@ Furthermore, you can access variables found in `docs/_config.yml` as follows: {{ site.NAME }} -This will be replaced with the value of the variable called `NAME` when generating -the docs. +This will be replaced with the value of the variable called `NAME` when generating the docs. + +## Structure -All documents are structed with headings. From these heading, a page outline is -automatically generated for each page. +### Page + +#### Headings + +All documents are structured with headings. From these headings, you can automatically generate a page table of contents (see below). ``` -# Level-1 Heading <- Used for the title of the page +# Level-1 Heading <- Used for the title of the page (don't use this) ## Level-2 Heading <- Start with this one ### Level-3 heading #### Level-4 heading ##### Level-5 heading ``` -Please stick to the "logical order" when using the headlines, e.g. start with level-2 headings and -use level-3 headings for subsections, etc. Don't use a different ordering, because you don't like -how a headline looks. +Please stick to the "logical order" when using the headlines, e.g. start with level-2 headings and use level-3 headings for subsections, etc. 
Don't use a different ordering, because you don't like how a headline looks. + +#### Table of Contents + + * This will be replaced by the TOC + {:toc} + + +Add this markup (both lines) to the document in order to generate a table of contents for the page. Headings up to level 3 are included. + +You can exclude a heading from the table of contents: + + # Excluded heading + {:.no_toc} + +#### Back to Top + + {% top %} + +This will be replaced by a default back to top link. It is recommended to use these links at least at the end of each level-2 section. + +#### Labels + + {% info %} + {% warn %} + +These will be replaced by an info or warning label. You can change the text of the label by providing an argument: + + {% info Recommendation %} + +### Documentation + +#### Top Navigation + +You can modify the top-level navigation in two places. You can either edit the `_includes/navbar.html` file or add tags to your page front matter (recommended). + + # Top-level navigation + top-nav-group: apis + top-nav-pos: 2 + top-nav-title: Batch Guide (DataSet API) + +This adds the page to the group `apis` (via `top-nav-group`) at position `2` (via `top-nav-pos`). Furthermore, it specifies a custom title for the navigation via `top-nav-title`. If this field is missing, the regular page title (via `title`) will be used. If no position is specified, the element will be added to the end of the group. If no group is specified, the page will not show up. + +Currently, there are groups `quickstart`, `setup`, `deployment`, `apis`, `libs`, and `internals`. + +#### Sub Navigation + +A sub navigation is shown if the field `sub-nav-group` is specified. A sub navigation groups all pages with the same `sub-nav-group`. Check out the streaming or batch guide as an example. + + # Sub-level navigation + sub-nav-group: batch + sub-nav-id: dataset_api + sub-nav-pos: 1 + sub-nav-title: DataSet API + +The fields work similarly to their `top-nav-*` counterparts.
+ +In addition, you can specify a hierarchy via `sub-nav-id` and `sub-nav-parent`: + + # Sub-level navigation + sub-nav-group: batch + sub-nav-parent: dataset_api + sub-nav-pos: 1 + sub-nav-title: Transformations + +This will show the `Transformations` page under the `DataSet API` page. The `sub-nav-parent` field has to have a matching `sub-nav-id`. + +#### Breadcrumbs + +Pages with sub navigations can use breadcrumbs like `Batch Guide > Libraries > Machine Learning > Optimization`. + +The breadcrumbs for the last page are generated from the front matter. For a sub navigation root to appear (like `Batch Guide` in the example above), you have to specify the `sub-nav-group-title`. This field designates a group page as the root. diff --git a/docs/_config.yml b/docs/_config.yml index 98fb5059d5a18..6b93bfc8bd028 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -5,9 +5,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,7 +22,6 @@ # {{ site.CONFIG_KEY }} #------------------------------------------------------------------------------ - # This are the version referenced in the docs. Please only use these variables # to reference a specific Flink version, because this is the only place where # we change the version for the complete docs when forking of a release branch @@ -56,12 +55,16 @@ defaults: path: "" values: layout: plain + top-nav-pos: 99999 # Move to end + sub-nav-pos: 99999 # Move to end markdown: KramdownPygments highlighter: pygments kramdown: - toc_levels: 1..3 + input: GFM # GitHub syntax + hard_wrap: false # Don't translate new lines to <br>s + toc_levels: 1..3 # Include h1-h3 for ToC host: 0.0.0.0 diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html index 91ba62af1a868..ea78d456d7bff 100644 --- a/docs/_includes/navbar.html +++ b/docs/_includes/navbar.html @@ -39,16 +39,16 @@ -The orders and lineitem files can be generated using the [TPC-H benchmark](http://www.tpc.org/tpch/) suite's data generator tool (DBGEN). +The orders and lineitem files can be generated using the [TPC-H benchmark](http://www.tpc.org/tpch/) suite's data generator tool (DBGEN). Take the following steps to generate arbitrary large input files for the provided Flink programs: 1. Download and unpack DBGEN diff --git a/docs/apis/batch/fault_tolerance.md b/docs/apis/batch/fault_tolerance.md new file mode 100644 index 0000000000000..51a6b4134164a --- /dev/null +++ b/docs/apis/batch/fault_tolerance.md @@ -0,0 +1,100 @@ +--- +title: "Fault Tolerance" + +# Sub-level navigation +sub-nav-group: batch +sub-nav-pos: 2 +--- + + +Flink's fault tolerance mechanism recovers programs in the presence of failures and +continues to execute them. Such failures include machine hardware failures, network failures, +transient program failures, etc. + +* This will be replaced by the TOC +{:toc} + +Batch Processing Fault Tolerance (DataSet API) +---------------------------------------------- + +Fault tolerance for programs in the *DataSet API* works by retrying failed executions. +The number of times that Flink retries the execution before the job is declared as failed is configurable +via the *execution retries* parameter. A value of *0* effectively means that fault tolerance is deactivated. + +To activate the fault tolerance, set the *execution retries* to a value larger than zero. A common choice is a value +of three. + +This example shows how to configure the execution retries for a Flink DataSet program. + +
+
+{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); +env.setNumberOfExecutionRetries(3); +{% endhighlight %} +
+
+{% highlight scala %} +val env = ExecutionEnvironment.getExecutionEnvironment() +env.setNumberOfExecutionRetries(3) +{% endhighlight %} +
+
+ + +You can also define default values for the number of execution retries and the retry delay in the `flink-conf.yaml`: + +~~~ +execution-retries.default: 3 +~~~ + + +Retry Delays +------------ + +Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start +immediately, but only after a certain delay. + +Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should reach a timeout before re-execution is attempted. + +You can set the retry delay for each program as follows (the sample shows the DataStream API - the DataSet API works similarly): + +
+
+{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay +{% endhighlight %} +
+
+{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay +{% endhighlight %} +
+
+ +You can also define the default value for the retry delay in the `flink-conf.yaml`: + +~~~ +execution-retries.delay: 10 s +~~~ + +{% top %} diff --git a/docs/apis/fig/LICENSE.txt b/docs/apis/batch/fig/LICENSE.txt similarity index 100% rename from docs/apis/fig/LICENSE.txt rename to docs/apis/batch/fig/LICENSE.txt diff --git a/docs/apis/fig/iterations_delta_iterate_operator.png b/docs/apis/batch/fig/iterations_delta_iterate_operator.png similarity index 100% rename from docs/apis/fig/iterations_delta_iterate_operator.png rename to docs/apis/batch/fig/iterations_delta_iterate_operator.png diff --git a/docs/apis/fig/iterations_delta_iterate_operator_example.png b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png similarity index 100% rename from docs/apis/fig/iterations_delta_iterate_operator_example.png rename to docs/apis/batch/fig/iterations_delta_iterate_operator_example.png diff --git a/docs/apis/fig/iterations_iterate_operator.png b/docs/apis/batch/fig/iterations_iterate_operator.png similarity index 100% rename from docs/apis/fig/iterations_iterate_operator.png rename to docs/apis/batch/fig/iterations_iterate_operator.png diff --git a/docs/apis/fig/iterations_iterate_operator_example.png b/docs/apis/batch/fig/iterations_iterate_operator_example.png similarity index 100% rename from docs/apis/fig/iterations_iterate_operator_example.png rename to docs/apis/batch/fig/iterations_iterate_operator_example.png diff --git a/docs/apis/fig/iterations_supersteps.png b/docs/apis/batch/fig/iterations_supersteps.png similarity index 100% rename from docs/apis/fig/iterations_supersteps.png rename to docs/apis/batch/fig/iterations_supersteps.png diff --git a/docs/apis/fig/plan_visualizer.png b/docs/apis/batch/fig/plan_visualizer.png similarity index 100% rename from docs/apis/fig/plan_visualizer.png rename to docs/apis/batch/fig/plan_visualizer.png diff --git a/docs/apis/hadoop_compatibility.md b/docs/apis/batch/hadoop_compatibility.md similarity index 95% 
rename from docs/apis/hadoop_compatibility.md rename to docs/apis/batch/hadoop_compatibility.md index d88dc0b74b223..68a6b055f7181 100644 --- a/docs/apis/hadoop_compatibility.md +++ b/docs/apis/batch/hadoop_compatibility.md @@ -1,6 +1,9 @@ --- title: "Hadoop Compatibility" is_beta: true +# Sub-level navigation +sub-nav-group: batch +sub-nav-pos: 7 --- - - DataSet programs in Flink are regular programs that implement transformations on data sets (e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain sources (e.g., by reading files, or from local collections). Results are returned via sinks, which may for @@ -103,7 +113,7 @@ object WordCount { -[Back to top](#top) +{% top %} Linking with Flink @@ -197,7 +207,7 @@ to run your program on Flink with Scala 2.11, you need to add a `_2.11` suffix t values of the Flink modules in your dependencies section. If you are looking for building Flink with Scala 2.11, please check -[build guide](../setup/building.html#build-flink-for-a-specific-scala-version). +[build guide]({{ site.baseurl }}/setup/building.html#scala-versions). #### Hadoop Dependency Versions @@ -211,9 +221,9 @@ In order to link against the latest SNAPSHOT versions of the code, please follow The *flink-clients* dependency is only necessary to invoke the Flink program locally (for example to run it standalone for testing and debugging). If you intend to only export the program as a JAR -file and [run it on a cluster](cluster_execution.html), you can skip that dependency. +file and [run it on a cluster]({{ site.baseurl }}/apis/cluster_execution.html), you can skip that dependency. 
-[Back to top](#top) +{% top %} Program Skeleton ---------------- @@ -253,8 +263,8 @@ Typically, you only need to use `getExecutionEnvironment()`, since this will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If -you created a JAR file from your program, and invoke it through the [command line](cli.html) -or the [web interface](web_client.html), +you created a JAR file from your program, and invoke it through the [command line]({{ site.baseurl }}/apis/cli.html) +or the [web interface]({{ site.baseurl }}/apis/web_client.html), the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return an execution environment for executing your program on a cluster. @@ -294,7 +304,7 @@ This will create a new DataSet by converting every String in the original set to an Integer. For more information and a list of all the transformations, please refer to [Transformations](#transformations). -Once you have a DataSet containing your final results, you can either write the result +Once you have a DataSet containing your final results, you can either write the result to a file system (HDFS or local) or print it. {% highlight java %} @@ -321,7 +331,7 @@ programs with a `main()` method. Each program consists of the same basic parts: 5. Trigger the program execution We will now give an overview of each of those steps, please refer to the respective sections for -more details. Note that all core classes of the Scala API are found in the package +more details. Note that all core classes of the Scala API are found in the package {% gh_link /flink-scala/src/main/scala/org/apache/flink/api/scala "org.apache.flink.api.scala" %}. 
@@ -405,16 +415,16 @@ def collect() -The first two methods (`writeAsText()` and `writeAsCsv()`) do as the name suggests, the third one -can be used to specify a custom data output format. Please refer to [Data Sinks](#data-sinks) for +The first two methods (`writeAsText()` and `writeAsCsv()`) do as the name suggests, the third one +can be used to specify a custom data output format. Please refer to [Data Sinks](#data-sinks) for more information on writing to files and also about custom data output formats. -The `print()` method is useful for developing/debugging. It will output the contents of the DataSet +The `print()` method is useful for developing/debugging. It will output the contents of the DataSet to standard output (on the JVM starting the Flink execution). **NOTE** The behavior of the `print()` -method changed with Flink 0.9.x. Before it was printing to the log file of the workers, now its +method changed with Flink 0.9.x. Before it was printing to the log file of the workers, now its sending the DataSet results to the client and printing the results there. -`collect()` retrieve the DataSet from the cluster to the local JVM. The `collect()` method +`collect()` retrieve the DataSet from the cluster to the local JVM. The `collect()` method will return a `List` containing the elements. Both `print()` and `collect()` will trigger the execution of the program. You don't need to further call `execute()`. @@ -424,14 +434,14 @@ Both `print()` and `collect()` will trigger the execution of the program. You do the data sizes you can retrieve with `collect()` are limited due to our RPC system. It is not advised to collect DataSets larger than 10MBs. -There is also a `printOnTaskManager()` method which will print the DataSet contents on the TaskManager +There is also a `printOnTaskManager()` method which will print the DataSet contents on the TaskManager (so you have to get them from the log file). The `printOnTaskManager()` method will not trigger a program execution. 
Once you specified the complete program you need to **trigger the program execution**. You can call `execute()` directly on the `ExecutionEnviroment` or you implicitly trigger the execution with `collect()` or `print()`. -Depending on the type of the `ExecutionEnvironment` the execution will be triggered on your local +Depending on the type of the `ExecutionEnvironment` the execution will be triggered on your local machine or submit your program for execution on a cluster. Note that you can not call both `print()` (or `collect()`) and `execute()` at the end of program. @@ -441,7 +451,7 @@ accumulator results. `print()` and `collect()` are not returning the result, but accessed from the `getLastJobExecutionResult()` method. -[Back to top](#top) +{% top %} DataSet abstraction @@ -450,13 +460,13 @@ DataSet abstraction A `DataSet` is an abstract representation of a finite immutable collection of data of the same type that may contain duplicates. Note that Flink is not always physically creating (materializing) each DataSet at runtime. This -depends on the used runtime, the configuration and optimizer decisions. DataSets may be "streamed through" +depends on the used runtime, the configuration and optimizer decisions. DataSets may be "streamed through" operations during execution, as under the hood Flink uses a streaming data processing engine. Some DataSets are materialized automatically to avoid distributed deadlocks (at points where the data flow graph branches out and joins again later) or if the execution mode has explicitly been set to blocking execution. -[Back to top](#top) +{% top %} Lazy Evaluation @@ -464,7 +474,7 @@ Lazy Evaluation All Flink DataSet programs are executed lazily: When the program's main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the -program's plan. The operations are actually executed when the execution is explicitly triggered by +program's plan. 
The operations are actually executed when the execution is explicitly triggered by an `execute()` call on the ExecutionEnvironment object. Also, `collect()` and `print()` will trigger the job execution. Whether the program is executed locally or on a cluster depends on the environment of the program. @@ -472,7 +482,7 @@ on the environment of the program. The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit. -[Back to top](#top) +{% top %} Transformations @@ -552,7 +562,7 @@ data.mapPartition(new MapPartitionFunction() {

Evaluates a boolean function for each element and retains those for which the function returns true.
- + IMPORTANT: The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption can lead to incorrect results.

@@ -617,14 +627,14 @@ DataSet> output = input.sum(0).andMin(2); Distinct -

Returns the distinct elements of a data set. It removes the duplicate entries +

Returns the distinct elements of a data set. It removes the duplicate entries from the input DataSet, with respect to all fields of the elements, or a subset of fields.

{% highlight java %} - data.distinct(); + data.distinct(); {% endhighlight %} - + Join @@ -639,11 +649,11 @@ result = input1.join(input2) {% endhighlight %} You can specify the way that the runtime executes the join via Join Hints. The hints describe whether the join happens through partitioning or broadcasting, and whether it uses - a sort-based or a hash-based algorithm. Please refer to the + a sort-based or a hash-based algorithm. Please refer to the Transformations Guide for a list of possible hints and an example.
If no hint is specified, the system will try to make an estimate of the input sizes and - pick a the best strategy according to those estimates. + pick a the best strategy according to those estimates. {% highlight java %} // This executes a join by broadcasting the first data set // using a hash table for the broadcasted data @@ -664,7 +674,7 @@ input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or ful .equalTo(1) // key of the second input (tuple field 1) .with(new JoinFunction() { public String join(String v1, String v2) { - // NOTE: + // NOTE: // - v2 might be null for leftOuterJoin // - v1 might be null for rightOuterJoin // - v1 OR v2 might be null for fullOuterJoin @@ -767,8 +777,8 @@ DataSet result = in.partitionCustom(Partitioner partitioner, key) Sort Partition -

Locally sorts all partitions of a data set on a specified field in a specified order. - Fields can be specified as tuple positions or field expressions. +

Locally sorts all partitions of a data set on a specified field in a specified order. + Fields can be specified as tuple positions or field expressions. Sorting on multiple fields is done by chaining sortPartition() calls.

{% highlight java %} DataSet> in = // [...] @@ -920,16 +930,16 @@ val output: DataSet[(Int, String, Doublr)] = input.sum(0).min(2) {% endhighlight %} - + Distinct -

Returns the distinct elements of a data set. It removes the duplicate entries +

Returns the distinct elements of a data set. It removes the duplicate entries from the input DataSet, with respect to all fields of the elements, or a subset of fields.

{% highlight scala %} - data.distinct() + data.distinct() {% endhighlight %} - + @@ -946,7 +956,7 @@ val result = input1.join(input2).where(0).equalTo(1) {% endhighlight %} You can specify the way that the runtime executes the join via Join Hints. The hints describe whether the join happens through partitioning or broadcasting, and whether it uses - a sort-based or a hash-based algorithm. Please refer to the + a sort-based or a hash-based algorithm. Please refer to the Transformations Guide for a list of possible hints and an example.
If no hint is specified, the system will try to make an estimate of the input sizes and @@ -1057,8 +1067,8 @@ val result = in Sort Partition -

Locally sorts all partitions of a data set on a specified field in a specified order. - Fields can be specified as tuple positions or field expressions. +

Locally sorts all partitions of a data set on a specified field in a specified order. + Fields can be specified as tuple positions or field expressions. Sorting on multiple fields is done by chaining sortPartition() calls.

{% highlight scala %} val in: DataSet[(Int, String)] = // [...] @@ -1094,7 +1104,7 @@ possible for [Data Sources](#data-sources) and [Data Sinks](#data-sinks). `withParameters(Configuration)` passes Configuration objects, which can be accessed from the `open()` method inside the user function. -[Back to Top](#top) +{% top %} Specifying Keys @@ -1208,7 +1218,7 @@ In the example below, we have a `WC` POJO with two fields "word" and "count". To {% highlight java %} // some ordinary POJO (Plain old Java Object) public class WC { - public String word; + public String word; public int count; } DataSet words = // [...] @@ -1317,8 +1327,8 @@ These are valid field expressions for the example code above: ### Define keys using Key Selector Functions {:.no_toc} -An additional way to define keys are "key selector" functions. A key selector function -takes a single dataset element as input and returns the key for the element. The key can be of any type and be derived from arbitrary computations. +An additional way to define keys are "key selector" functions. A key selector function +takes a single dataset element as input and returns the key for the element. The key can be of any type and be derived from arbitrary computations. The following example shows a key selector function that simply returns the field of an object: @@ -1349,7 +1359,7 @@ val wordCounts = words -[Back to top](#top) +{% top %} Passing Functions to Flink @@ -1382,7 +1392,7 @@ data.map(new MapFunction () { #### Java 8 Lambdas -Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 8 Guide](java8.html). +Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 8 Guide]({{ site.baseurl }}/apis/java8.html). {% highlight java %} DataSet data = // [...] @@ -1495,7 +1505,7 @@ the [transformations documentation](dataset_transformations.html) for a complete example. 
-[Back to top](#top) +{% top %} Data Types @@ -1520,7 +1530,7 @@ There are six different categories of data types:
-Tuples are composite types that contain a fixed number of fields with various types. +Tuples are composite types that contain a fixed number of fields with various types. The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a tuple can be accessed directly using the field's name as `tuple.f4`, or using the generic getter method @@ -1634,16 +1644,16 @@ wordCounts groupBy { _.word } reduce(new MyReduceFunction()) #### Primitive Types -Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`. +Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`. #### General Class Types -Flink supports most Java and Scala classes (API and custom). +Flink supports most Java and Scala classes (API and custom). Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native resources. Classes that follow the Java Beans conventions work well in general. -All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types. -Flink treats these data types as black boxes and is not able to access their their content (i.e., for efficient sorting). General types are de/serialized using the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo). +All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types. +Flink treats these data types as black boxes and is not able to access their content (i.e., for efficient sorting). General types are de/serialized using the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo). When grouping, sorting, or joining a data set of generic types, keys must be specified with key selector functions. 
See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details. @@ -1721,7 +1731,7 @@ There is a switch at the `ExectionConfig` which allows users to enable the objec -[Back to top](#top) +{% top %} Data Sources @@ -1750,16 +1760,16 @@ File-based: - `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer`. - + - `readFileOfPrimitives(path, delimiter, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer` using the given delimiter. - -- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads file from the specified + +- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads file from the specified path with the specified FileInputFormat, Key class and Value class and returns them as Tuple2. - + - `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads file from the specified path with type SequenceFileInputFormat, Key class and Value class and returns them as Tuple2. 
- + Collection-based: @@ -1807,12 +1817,12 @@ DataSet> csvInput = env.readCsvFile("hdfs:///the/CSV/file // read a CSV file with three fields into a POJO (Person.class) with corresponding fields DataSet> csvInput = env.readCsvFile("hdfs:///the/CSV/file") .pojoType(Person.class, "name", "age", "zipcode"); - -// read a file from the specified path of type TextInputFormat + +// read a file from the specified path of type TextInputFormat DataSet> tuples = env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file"); - + // read a file from the specified path of type SequenceFileInputFormat DataSet> tuples = env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file"); @@ -1824,7 +1834,7 @@ DataSet value = env.fromElements("Foo", "bar", "foobar", "fubar"); DataSet numbers = env.generateSequence(1, 10000000); // Read data from a relational database using the JDBC input format -DataSet dbData = +DataSet dbData = env.createInput( // create and configure input format JDBCInputFormat.buildJDBCInputFormat() @@ -1905,10 +1915,10 @@ File-based: - `readFileOfPrimitives(path, delimiter)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer` using the given delimiter. - -- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads file from the specified + +- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads file from the specified path with the specified FileInputFormat, Key class and Value class and returns them as Tuple2. - + - `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads file from the specified path with type SequenceFileInputFormat, Key class and Value class and returns them as Tuple2. 
@@ -1971,10 +1981,10 @@ val values = env.fromElements("Foo", "bar", "foobar", "fubar") // generate a number sequence val numbers = env.generateSequence(1, 10000000); -// read a file from the specified path of type TextInputFormat +// read a file from the specified path of type TextInputFormat val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], "hdfs://nnHost:nnPort/path/to/file") - + // read a file from the specified path of type SequenceFileInputFormat val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text], "hdfs://nnHost:nnPort/path/to/file") @@ -2054,7 +2064,7 @@ The following table lists the currently supported compression methods. -[Back to top](#top) +{% top %} Execution Configuration @@ -2088,13 +2098,13 @@ With the closure cleaner disabled, it might happen that an anonymous user functi - `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully been stopped on the TaskManagers, and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution re-tries is one or more. -- `getExecutionMode()` / `setExecutionMode()`. The default execution mode is PIPELINED. Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or on a pipelined manner. +- `getExecutionMode()` / `setExecutionMode()`. The default execution mode is PIPELINED. Sets the execution mode to execute the program. 
The execution mode defines whether data exchanges are performed in a batch or on a pipelined manner. - `enableForceKryo()` / **`disableForceKryo`**. Kryo is not forced by default. Forces the GenericTypeInformation to use the Kryo serializer for POJOS even though we could analyze them as a POJO. In some cases this might be preferable. For example, when Flink's internal serializers fail to handle a POJO properly. - `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by default. Forces the Flink AvroTypeInformation to use the Avro serializer instead of Kryo for serializing Avro POJOs. -- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the [object reuse mode](programming_guide.html#object-reuse-behavior) will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior. +- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the [object reuse mode](#object-reuse-behavior) will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior. - **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status updates are printed to `System.out` by default. This setting allows to disable this behavior. @@ -2108,7 +2118,7 @@ With the closure cleaner disabled, it might happen that an anonymous user functi - `registerKryoType(Class type)` If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags (integer IDs) are written. If a type is not registered with Kryo, its entire class-name will be serialized with every instance, leading to much higher I/O costs. -- `registerPojoType(Class type)` Registers the given type with the serialization stack. 
If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written. If a type is not registered with Kryo, its entire class-name will be serialized with every instance, leading to much higher I/O costs. +- `registerPojoType(Class type)` Registers the given type with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written. If a type is not registered with Kryo, its entire class-name will be serialized with every instance, leading to much higher I/O costs. Note that types registered with `registerKryoType()` are not available to Flink's Kryo serializer instance. @@ -2119,7 +2129,7 @@ Note that types registered with `registerKryoType()` are not available to Flink' The `RuntimeContext` which is accessible in `Rich*` functions through the `getRuntimeContext()` method also allows to access the `ExecutionConfig` in all user defined functions. -[Back to top](#top) +{% top %} Data Sinks ---------- @@ -2156,7 +2166,7 @@ same time run additional transformations on them. Standard data sink methods: {% highlight java %} -// text data +// text data DataSet textData = // [...] // write DataSet to a file on the local file system @@ -2258,7 +2268,7 @@ same time run additional transformations on them. Standard data sink methods: {% highlight scala %} -// text data +// text data val textData: DataSet[String] = // [...] // write DataSet to a file on the local file system @@ -2317,7 +2327,7 @@ Globally sorted output is not supported yet.
-[Back to top](#top) +{% top %} Debugging --------- @@ -2422,7 +2432,7 @@ val myLongs = env.fromCollection(longIt) `Serializable`. Furthermore, collection data sources can not be executed in parallel ( parallelism = 1). -[Back to top](#top) +{% top %} Iteration Operators ------------------- @@ -2565,7 +2575,7 @@ val env = ExecutionEnvironment.getExecutionEnvironment() val initial = env.fromElements(0) val count = initial.iterate(10000) { iterationInput: DataSet[Int] => - val result = iterationInput.map { i => + val result = iterationInput.map { i => val x = Math.random() val y = Math.random() i + (if (x * x + y * y < 1) 1 else 0) @@ -2632,24 +2642,24 @@ env.execute() -[Back to top](#top) +{% top %} Semantic Annotations ----------- -Semantic annotations can be used to give Flink hints about the behavior of a function. +Semantic annotations can be used to give Flink hints about the behavior of a function. They tell the system which fields of a function's input the function reads and evaluates and -which fields it unmodified forwards from its input to its output. +which fields it unmodified forwards from its input to its output. Semantic annotations are a powerful means to speed up execution, because they allow the system to reason about reusing sort orders or partitions across multiple operations. Using semantic annotations may eventually save the program from unnecessary data shuffling or unnecessary sorts and significantly improve the performance of a program. -**Note:** The use of semantic annotations is optional. However, it is absolutely crucial to -be conservative when providing semantic annotations! -Incorrect semantic annotations will cause Flink to make incorrect assumptions about your program and -might eventually lead to incorrect results. +**Note:** The use of semantic annotations is optional. However, it is absolutely crucial to +be conservative when providing semantic annotations! 
+Incorrect semantic annotations will cause Flink to make incorrect assumptions about your program and +might eventually lead to incorrect results. If the behavior of an operator is not clearly predictable, no annotation should be provided. Please read the documentation carefully. @@ -2657,15 +2667,15 @@ The following semantic annotations are currently supported. #### Forwarded Fields Annotation -Forwarded fields information declares input fields which are unmodified forwarded by a function to the same position or to another position in the output. -This information is used by the optimizer to infer whether a data property such as sorting or +Forwarded fields information declares input fields which are unmodified forwarded by a function to the same position or to another position in the output. +This information is used by the optimizer to infer whether a data property such as sorting or partitioning is preserved by a function. For functions that operate on groups of input elements such as `GroupReduce`, `GroupCombine`, `CoGroup`, and `MapPartition`, all fields that are defined as forwarded fields must always be jointly forwarded from the same input element. The forwarded fields of each element that is emitted by a group-wise function may originate from a different element of the function's input group. Field forward information is specified using [field expressions](#define-keys-using-field-expressions). -Fields that are forwarded to the same position in the output can be specified by their position. +Fields that are forwarded to the same position in the output can be specified by their position. The specified position must be valid for the input and output data type and have the same type. -For example the String `"f2"` declares that the third field of a Java input tuple is always equal to the third field in the output tuple. +For example the String `"f2"` declares that the third field of a Java input tuple is always equal to the third field in the output tuple. 
Fields which are unmodified forwarded to another position in the output are declared by specifying the source field in the input and the target field in the output as field expressions. @@ -2674,7 +2684,7 @@ unchanged copied to the third field of the Java output tuple. The wildcard expre Multiple forwarded fields can be declared in a single String by separating them with semicolons as `"f0; f2->f1; f3->f2"` or in separate Strings `"f0", "f2->f1", "f3->f2"`. When specifying forwarded fields it is not required that all forwarded fields are declared, but all declarations must be correct. -Forwarded field information can be declared by attaching Java annotations on function class definitions or +Forwarded field information can be declared by attaching Java annotations on function class definitions or by passing them as operator arguments after invoking a function on a DataSet as shown below. ##### Function Class Annotations @@ -2686,10 +2696,10 @@ by passing them as operator arguments after invoking a function on a DataSet as ##### Operator Arguments * `data.map(myMapFnc).withForwardedFields()` for single input function such as Map and Reduce. -* `data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsFirst()` for the first input of a function with two inputs such as Join and CoGroup. +* `data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsFirst()` for the first input of a function with two inputs such as Join and CoGroup. * `data1.join(data2).where().equalTo().with(myJoinFnc).withForwardFieldsSecond()` for the second input of a function with two inputs such as Join and CoGroup. -Please note that it is not possible to overwrite field forward information which was specified as a class annotation by operator arguments. +Please note that it is not possible to overwrite field forward information which was specified as a class annotation by operator arguments. 
##### Example @@ -2699,7 +2709,7 @@ The following example shows how to declare forwarded field information using a f
{% highlight java %} @ForwardedFields("f0->f2") -public class MyMap implements +public class MyMap implements MapFunction, Tuple3> { @Override public Tuple3 map(Tuple2 val) { @@ -2723,17 +2733,17 @@ class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{ #### Non-Forwarded Fields -Non-forwarded fields information declares all fields which are not preserved on the same position in a function's output. -The values of all other fields are considered to be preserved at the same position in the output. -Hence, non-forwarded fields information is inverse to forwarded fields information. +Non-forwarded fields information declares all fields which are not preserved on the same position in a function's output. +The values of all other fields are considered to be preserved at the same position in the output. +Hence, non-forwarded fields information is inverse to forwarded fields information. Non-forwarded field information for group-wise operators such as `GroupReduce`, `GroupCombine`, `CoGroup`, and `MapPartition` must fulfill the same requirements as for forwarded field information. -**IMPORTANT**: The specification of non-forwarded fields information is optional. However if used, +**IMPORTANT**: The specification of non-forwarded fields information is optional. However if used, **ALL!** non-forwarded fields must be specified, because all other fields are considered to be forwarded in place. It is safe to declare a forwarded field as non-forwarded. -Non-forwarded fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings. -For example both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple -are not preserved in place and all other fields are preserved in place. +Non-forwarded fields are specified as a list of [field expressions](#define-keys-using-field-expressions). 
The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings. +For example both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple +are not preserved in place and all other fields are preserved in place. Non-forwarded field information can only be specified for functions which have identical input and output types. Non-forwarded field information is specified as function class annotations using the following annotations: @@ -2750,7 +2760,7 @@ The following example shows how to declare non-forwarded field information:
{% highlight java %} @NonForwardedFields("f1") // second field is not forwarded -public class MyMap implements +public class MyMap implements MapFunction, Tuple2> { @Override public Tuple2 map(Tuple2 val) { @@ -2779,10 +2789,10 @@ all fields that are used by the function to compute its result. For example, fields which are evaluated in conditional statements or used for computations must be marked as read when specifying read fields information. Fields which are only unmodified forwarded to the output without evaluating their values or fields which are not accessed at all are not considered to be read. -**IMPORTANT**: The specification of read fields information is optional. However if used, +**IMPORTANT**: The specification of read fields information is optional. However if used, **ALL!** read fields must be specified. It is safe to declare a non-read field as read. -Read fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings. +Read fields are specified as a list of [field expressions](#define-keys-using-field-expressions). The list can be either given as a single String with field expressions separated by semicolons or as multiple Strings. For example both `"f1; f3"` and `"f1", "f3"` declare that the second and fourth field of a Java tuple are read and evaluated by the function. Read field information is specified as function class annotations using the following annotations: @@ -2798,9 +2808,9 @@ The following example shows how to declare read field information:
{% highlight java %} -@ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function. -public class MyMap implements - MapFunction, +@ReadFields("f0; f3") // f0 and f3 are read and evaluated by the function. +public class MyMap implements + MapFunction, Tuple2> { @Override public Tuple2 map(Tuple4 val) { @@ -2830,7 +2840,7 @@ class MyMap extends MapFunction[(Int, Int, Int, Int), (Int, Int)]{
-[Back to top](#top) +{% top %} Broadcast Variables @@ -2903,14 +2913,14 @@ accessing broadcasted data sets. For a complete example program, have a look at too large. For simpler things like scalar values you can simply make parameters part of the closure of a function, or use the `withParameters(...)` method to pass in a configuration. -[Back to top](#top) +{% top %} Passing Parameters to Functions ------------------- Parameters can be passed to functions using either the constructor or the `withParameters(Configuration)` method. The parameters are serialized as part of the function object and shipped to all parallel task instances. -Check also the [best practices guide on how to pass command line arguments to functions](best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application). +Check also the [best practices guide on how to pass command line arguments to functions]({{ site.baseurl }}/apis/best_practices.html#parsing-command-line-arguments-and-passing-them-around-in-your-flink-application). #### Via Constructor @@ -3049,7 +3059,7 @@ public static final class Tokenizer extends RichFlatMapFunction
- See the Programming Guide for details and code examples.
+ See the Programming Guide for details and code examples.
### Example: Incrementing Numbers @@ -176,7 +180,7 @@ setFinalState(solution);
- See the programming guide for details and code examples.
+ See the programming guide for details and code examples.
### Example: Propagate Minimum in Graph diff --git a/docs/apis/python.md b/docs/apis/batch/python.md similarity index 95% rename from docs/apis/python.md rename to docs/apis/batch/python.md index d57e11765de21..74da97e933774 100644 --- a/docs/apis/python.md +++ b/docs/apis/batch/python.md @@ -1,6 +1,12 @@ --- title: "Python Programming Guide" is_beta: true + +# Sub-level navigation +sub-nav-group: batch +sub-nav-id: python_api +sub-nav-pos: 4 +sub-nav-title: Python API --- - - Analysis programs in Flink are regular programs that implement transformations on data sets (e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain sources (e.g., by reading files, or from collections). Results are returned via sinks, which may for @@ -59,17 +63,17 @@ if __name__ == "__main__": env = get_environment() data = env.from_elements("Who's there?", "I think I hear them. Stand, ho! Who's there?") - + data \ .flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \ .group_by(1) \ .reduce_group(Adder(), (INT, STRING), combinable=True) \ .output() - + env.execute(local=True) {% endhighlight %} -[Back to top](#top) +{% top %} Program Skeleton ---------------- @@ -84,7 +88,7 @@ programs with a `if __name__ == "__main__":` block. Each program consists of the 5. Execute your program. We will now give an overview of each of those steps but please refer to the respective sections for -more details. +more details. The `Environment` is the basis for all Flink programs. You can @@ -116,7 +120,7 @@ a map transformation looks like this: data.map(lambda x: x*2, INT) {% endhighlight %} -This will create a new DataSet by doubling every value in the original DataSet. +This will create a new DataSet by doubling every value in the original DataSet. For more information and a list of all the transformations, please refer to [Transformations](#transformations). 
@@ -133,15 +137,15 @@ The last method is only useful for developing/debugging on a local machine, it will output the contents of the DataSet to standard output. (Note that in a cluster, the result goes to the standard out stream of the cluster nodes and ends up in the *.out* files of the workers). -The first two do as the name suggests. +The first two do as the name suggests. Please refer to [Data Sinks](#data-sinks) for more information on writing to files. Once you specified the complete program you need to call `execute` on -the `Environment`. This will either execute on your local machine or submit your program +the `Environment`. This will either execute on your local machine or submit your program for execution on a cluster, depending on how Flink was started. You can force a local execution by using `execute(local=True)`. -[Back to top](#top) +{% top %} Project setup --------------- @@ -150,7 +154,7 @@ Apart from setting up Flink, no additional work is required. The python package The Python API was tested on Linux systems that have Python 2.7 or 3.4 installed. -[Back to top](#top) +{% top %} Lazy Evaluation --------------- @@ -164,7 +168,7 @@ on the environment of the program. The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit. -[Back to top](#top) +{% top %} Transformations @@ -265,10 +269,10 @@ data.reduce_group(Adder(), (INT, STRING)) Join Joins two data sets by creating all pairs of elements that are equal on their keys. - Optionally uses a JoinFunction to turn the pair of elements into a single element. + Optionally uses a JoinFunction to turn the pair of elements into a single element. See keys on how to define join keys. {% highlight python %} -# In this case tuple fields are used as keys. +# In this case tuple fields are used as keys. # "0" is the join field on the first tuple # "1" is the join field on the second tuple. 
result = input1.join(input2).where(0).equal_to(1) @@ -311,7 +315,7 @@ data.union(data2) -[Back to Top](#top) +{% top %} Specifying Keys @@ -344,7 +348,7 @@ reduced = data \ .reduce_group() {% endhighlight %} -The data set is grouped on the first field of the tuples. +The data set is grouped on the first field of the tuples. The group-reduce function will thus receive groups of tuples with the same value in the first field. @@ -361,7 +365,7 @@ with the same value for both fields. A note on nested Tuples: If you have a DataSet with a nested tuple specifying `group_by()` will cause the system to use the full tuple as a key. -[Back to top](#top) +{% top %} Passing Functions to Flink @@ -381,15 +385,15 @@ class Filter(FilterFunction): data.filter(Filter()) {% endhighlight %} -Rich functions allow the use of imported functions, provide access to broadcast-variables, +Rich functions allow the use of imported functions, provide access to broadcast-variables, can be parameterized using __init__(), and are the go-to-option for complex functions. They are also the only way to define an optional `combine` function for a reduce operation. Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return an iterable, if the operation can return multiple values. (All functions receiving a collector argument) -Flink requires type information at the time when it prepares the program for execution -(when the main method of the program is called). This is done by passing an exemplary +Flink requires type information at the time when it prepares the program for execution +(when the main method of the program is called). This is done by passing an exemplary object that has the desired type. This holds also for tuples. {% highlight python %} @@ -400,7 +404,7 @@ Would denote a tuple containing an int and a string. Note that for Operations th There are a few Constants defined in flink.plan.Constants that allow this in a more readable fashion. 
-[Back to top](#top) +{% top %} Data Types ---------- @@ -409,7 +413,7 @@ Flink's Python API currently only supports primitive python types (int, float, b #### Tuples/Lists -You can use the tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, that contain +You can use the tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, that contain a fix number of fields of various types (up to 25). Every field of a tuple can be a primitive type - including further tuples, resulting in nested tuples. {% highlight python %} @@ -428,7 +432,7 @@ wordCounts \ .reduce(MyReduceFunction()) {% endhighlight %} -[Back to top](#top) +{% top %} Data Sources ------------ @@ -464,7 +468,7 @@ csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE)) values = env.from_elements("Foo", "bar", "foobar", "fubar") {% endhighlight %} -[Back to top](#top) +{% top %} Data Sinks ---------- @@ -502,7 +506,7 @@ values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_d values.write_text("file:///path/to/the/result/file") {% endhighlight %} -[Back to top](#top) +{% top %} Broadcast Variables ------------------- @@ -522,11 +526,11 @@ class MapperBcv(MapFunction): return value * factor # 1. The DataSet to be broadcasted -toBroadcast = env.from_elements(1, 2, 3) +toBroadcast = env.from_elements(1, 2, 3) data = env.from_elements("a", "b") # 2. Broadcast the DataSet -data.map(MapperBcv(), INT).with_broadcast_set("bcv", toBroadcast) +data.map(MapperBcv(), INT).with_broadcast_set("bcv", toBroadcast) {% endhighlight %} Make sure that the names (`bcv` in the previous example) match when registering and @@ -535,7 +539,7 @@ accessing broadcasted data sets. **Note**: As the content of broadcast variables is kept in-memory on each node, it should not become too large. For simpler things like scalar values you can simply parameterize the rich function. 
-[Back to top](#top) +{% top %} Parallel Execution ------------------ @@ -576,31 +580,31 @@ env.execute() A system-wide default parallelism for all execution environments can be defined by setting the `parallelism.default` property in `./conf/flink-conf.yaml`. See the -[Configuration](config.html) documentation for details. +[Configuration]({{ site.baseurl }}/setup/config.html) documentation for details. -[Back to top](#top) +{% top %} Executing Plans --------------- -To run the plan with Flink, go to your Flink distribution, and run the pyflink.sh script from the /bin folder. -use pyflink2.sh for python 2.7, and pyflink3.sh for python 3.4. The script containing the plan has to be passed -as the first argument, followed by a number of additional python packages, and finally, separated by - additional -arguments that will be fed to the script. +To run the plan with Flink, go to your Flink distribution, and run the pyflink.sh script from the /bin folder. +Use pyflink2.sh for python 2.7, and pyflink3.sh for python 3.4. The script containing the plan has to be passed +as the first argument, followed by a number of additional python packages, and finally, separated by - additional +arguments that will be fed to the script. {% highlight python %} ./bin/pyflink<2/3>.sh