From 9c140448e193d0dff067b03e33a161a6a446a399 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 12 May 2017 16:06:09 -0700 Subject: [PATCH 1/2] Rewrites the section on Coders to not talk about them as a parsing mechanism --- src/documentation/programming-guide.md | 38 ++++++-------------------- 1 file changed, 8 insertions(+), 30 deletions(-) diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md index 11ec86d23e2..f70e25540be 100644 --- a/src/documentation/programming-guide.md +++ b/src/documentation/programming-guide.md @@ -1175,11 +1175,9 @@ See the [Beam-provided I/O Transforms]({{site.baseurl }}/documentation/io/built ## Data encoding and type safety -When you create or output pipeline data, you'll need to specify how the elements in your `PCollection`s are encoded and decoded to and from byte strings. Byte strings are used for intermediate storage as well reading from sources and writing to sinks. The Beam SDKs use objects called coders to describe how the elements of a given `PCollection` should be encoded and decoded. +When Beam runners execute your pipeline, they often need to materialize the intermediate data in your `PCollection`s, which requires converting elements to and from byte strings. The Beam SDKs use objects called `Coder`s to describe how the elements of a given `PCollection` may be encoded and decoded. -### Using coders - -You typically need to specify a coder when reading data into your pipeline from an external source (or creating pipeline data from local data), and also when you output pipeline data to an external sink. +> Note that coders are unrelated to parsing or formatting data when interacting with external data sources or sinks. Such parsing or formatting should typically be done explicitly, using transforms such as `ParDo` or `MapElements`. {:.language-java} In the Beam SDK for Java, the type `Coder` provides the methods required for encoding and decoding data. 
The SDK for Java provides a number of Coder subclasses that work with a variety of standard Java types, such as Integer, Long, Double, StringUtf8 and more. You can find all of the available Coder subclasses in the [Coder package](https://github.com/apache/beam/tree/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders). @@ -1187,38 +1185,18 @@ In the Beam SDK for Java, the type `Coder` provides the methods required for enc {:.language-py} In the Beam SDK for Python, the type `Coder` provides the methods required for encoding and decoding data. The SDK for Python provides a number of Coder subclasses that work with a variety of standard Python types, such as primitive types, Tuple, Iterable, StringUtf8 and more. You can find all of the available Coder subclasses in the [apache_beam.coders](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/coders) package. -When you read data into a pipeline, the coder indicates how to interpret the input data into a language-specific type, such as integer or string. Likewise, the coder indicates how the language-specific types in your pipeline should be written into byte strings for an output data sink, or to materialize intermediate data in your pipeline. - -The Beam SDKs set a coder for every `PCollection` in a pipeline, including those generated as output from a transform. Most of the time, the Beam SDKs can automatically infer the correct coder for an output `PCollection`. - > Note that coders do not necessarily have a 1:1 relationship with types. For example, the Integer type can have multiple valid coders, and input and output data can use different Integer coders. A transform might have Integer-typed input data that uses BigEndianIntegerCoder, and Integer-typed output data that uses VarIntCoder. -You can explicitly set a `Coder` when inputting or outputting a `PCollection`. 
You set the `Coder` by calling the method `.withCoder` setting the `coder` argument when you apply your pipeline's read or write transform. - -Typically, you set the `Coder` when the coder for a `PCollection` cannot be automatically inferred, or when you want to use a different coder than your pipeline's default. The following example code reads a set of numbers from a text file, and sets a `Coder` of type `TextualIntegerCoder` `VarIntCoder` for the resulting `PCollection`: - -```java -PCollection numbers = - p.begin() - .apply(TextIO.Read.named("ReadNumbers") - .from("gs://my_bucket/path/to/numbers-*.txt") - .withCoder(TextualIntegerCoder.of())); -``` - -```py -p = beam.Pipeline() -numbers = ReadFromText("gs://my_bucket/path/to/numbers-*.txt", coder=VarIntCoder()) -``` +### Specifying coders +The Beam SDKs require a coder for every `PCollection` in your pipeline. In most cases, the Beam SDK is able to automatically infer a `Coder` for a `PCollection` based on its element type or the transform that produces it; however, in some cases the pipeline author will need to specify a `Coder` explicitly, or develop a `Coder` for their custom type. {:.language-java} -You can set the coder for an existing `PCollection` by using the method `PCollection.setCoder`. Note that you cannot call `setCoder` on a `PCollection` that has been finalized (e.g. by calling `.apply` on it). +You can explicitly set the coder for an existing `PCollection` by using the method `PCollection.setCoder`. Note that you cannot call `setCoder` on a `PCollection` that has been finalized (e.g. by calling `.apply` on it). {:.language-java} -You can get the coder for an existing `PCollection` by using the method `getCoder`. This method will fail with `anIllegalStateException` if a coder has not been set and cannot be inferred for the given `PCollection`. - -### Coder inference and default coders +You can get the coder for an existing `PCollection` by using the method `getCoder`. 
This method will fail with an `IllegalStateException` if a coder has not been set and cannot be inferred for the given `PCollection`. -The Beam SDKs require a coder for every `PCollection` in your pipeline. Most of the time, however, you do not need to explicitly specify a coder, such as for an intermediate `PCollection` produced by a transform in the middle of your pipeline. In such cases, the Beam SDKs can infer an appropriate coder from the inputs and outputs of the transform used to produce the PCollection. +Beam SDKs use a variety of mechanisms when attempting to automatically infer the `Coder` for a `PCollection`. {:.language-java} Each pipeline object has a `CoderRegistry`. The `CoderRegistry` represents a mapping of Java types to the default coders that the pipeline should use for `PCollection`s of each type. @@ -1227,7 +1205,7 @@ Each pipeline object has a `CoderRegistry`. The `CoderRegistry` represents a map The Beam SDK for Python has a `CoderRegistry` that represents a mapping of Python types to the default coder that should be used for `PCollection`s of each type. {:.language-java} -By default, the Beam SDK for Java automatically infers the `Coder` for the elements of an output `PCollection` using the type parameter from the transform's function object, such as `DoFn`. In the case of `ParDo`, for example, a `DoFnfunction` object accepts an input element of type `Integer` and produces an output element of type `String`. In such a case, the SDK for Java will automatically infer the default `Coder` for the output `PCollection` (in the default pipeline `CoderRegistry`, this is `StringUtf8Coder`). +By default, the Beam SDK for Java automatically infers the `Coder` for the elements of a `PCollection` produced by a `PTransform` using the type parameter from the transform's function object, such as `DoFn`. In the case of `ParDo`, for example, a `DoFn` function object accepts an input element of type `Integer` and produces an output element of type `String`. 
In such a case, the SDK for Java will automatically infer the default `Coder` for the output `PCollection` (in the default pipeline `CoderRegistry`, this is `StringUtf8Coder`). {:.language-py} By default, the Beam SDK for Python automatically infers the `Coder` for the elements of an output `PCollection` using the typehints from the transform's function object, such as `DoFn`. In the case of `ParDo`, for example a `DoFn` with the typehints `@beam.typehints.with_input_types(int)` and `@beam.typehints.with_output_types(str)` accepts an input element of type int and produces an output element of type str. In such a case, the Beam SDK for Python will automatically infer the default `Coder` for the output `PCollection` (in the default pipeline `CoderRegistry`, this is `BytesCoder`). From 2a5516664a6273d4ca96082414fe3a7f05813151 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 12 May 2017 16:07:19 -0700 Subject: [PATCH 2/2] [BEAM-1353] Style Guide fixups Fixes usages of PTransforms affected by changes as part of https://issues.apache.org/jira/browse/BEAM-1353 --- .../pipelines/create-your-pipeline.md | 4 +- .../pipelines/design-your-pipeline.md | 20 ++++--- src/documentation/programming-guide.md | 54 ++++++++++--------- src/documentation/sdks/java-extensions.md | 2 +- src/get-started/mobile-gaming-example.md | 20 +++---- src/get-started/wordcount-example.md | 6 +-- 6 files changed, 53 insertions(+), 53 deletions(-) diff --git a/src/documentation/pipelines/create-your-pipeline.md b/src/documentation/pipelines/create-your-pipeline.md index b7654671639..cbf7d314442 100644 --- a/src/documentation/pipelines/create-your-pipeline.md +++ b/src/documentation/pipelines/create-your-pipeline.md @@ -42,7 +42,7 @@ The following example code shows how to `apply` a `TextIO.Read` root transform t ```java PCollection lines = p.apply( - "ReadLines", TextIO.Read.from("gs://some/inputData.txt")); + "ReadLines", TextIO.read().from("gs://some/inputData.txt")); ``` ## Applying 
Transforms to Process Pipeline Data @@ -68,7 +68,7 @@ The following example code shows how to `apply` a `TextIO.Write` transform to wr ```java PCollection filteredWords = ...; -filteredWords.apply("WriteMyFile", TextIO.Write.to("gs://some/outputData.txt")); +filteredWords.apply("WriteMyFile", TextIO.write().to("gs://some/outputData.txt")); ``` ## Running Your Pipeline diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md index c40803c5ede..ce6a734c6f2 100644 --- a/src/documentation/pipelines/design-your-pipeline.md +++ b/src/documentation/pipelines/design-your-pipeline.md @@ -103,13 +103,7 @@ final TupleTag startsWithATag = new TupleTag(){}; final TupleTag startsWithBTag = new TupleTag(){}; PCollectionTuple mixedCollection = - dbRowCollection.apply( - ParDo - // Specify main output. In this example, it is the output - // with tag startsWithATag. - .withOutputTags(startsWithATag, - // Specify the output with tag startsWithBTag, as a TupleTagList. - TupleTagList.of(startsWithBTag)) + dbRowCollection.apply(ParDo .of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { @@ -121,8 +115,12 @@ PCollectionTuple mixedCollection = c.output(startsWithBTag, c.element()); } } - } - )); + }) + // Specify main output. In this example, it is the output + // with tag startsWithATag. + .withOutputTags(startsWithATag, + // Specify the output with tag startsWithBTag, as a TupleTagList. + TupleTagList.of(startsWithBTag))); // Get subset of the output with tag startsWithATag. mixedCollection.get(startsWithATag).apply(...); @@ -159,7 +157,7 @@ mergedCollectionWithFlatten.apply(...); ## Multiple sources -Your pipeline can read its input from one or more sources. If your pipeline reads from multiple sources and the data from those sources is related, it can be useful to join the inputs together. 
In the example illustrated in Figure 5 below, the pipeline reads names and addresses from a database table, and names and order numbers from a text file. The pipeline then uses `CoGroupByKey` to join this information, where the key is the name; the resulting `PCollection` contains all the combinations of names, addresses, and orders. +Your pipeline can read its input from one or more sources. If your pipeline reads from multiple sources and the data from those sources is related, it can be useful to join the inputs together. In the example illustrated in Figure 5 below, the pipeline reads names and addresses from a database table, and names and order numbers from a Kafka topic. The pipeline then uses `CoGroupByKey` to join this information, where the key is the name; the resulting `PCollection` contains all the combinations of names, addresses, and orders.
> userAddress = pipeline.apply(JdbcIO.>read()...); -PCollection> userOrder = pipeline.apply(TextIO.>read()...); +PCollection> userOrder = pipeline.apply(KafkaIO.read()...); final TupleTag addressTag = new TupleTag(); final TupleTag orderTag = new TupleTag(); diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md index f70e25540be..d7e37a03c7e 100644 --- a/src/documentation/programming-guide.md +++ b/src/documentation/programming-guide.md @@ -191,7 +191,7 @@ public static void main(String[] args) { // Create the PCollection 'lines' by applying a 'Read' transform. PCollection lines = p.apply( - "ReadMyFile", TextIO.Read.from("protocol://path/to/some/inputData.txt")); + "ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt")); } ``` @@ -479,8 +479,8 @@ PCollection words = ...; // Apply a MapElements with an anonymous lambda function to the PCollection words. // Save the result as the PCollection wordLengths. PCollection wordLengths = words.apply( - MapElements.via((String word) -> word.length()) - .withOutputType(new TypeDescriptor() {}); + MapElements.into(TypeDescriptors.integers()) + .via((String word) -> word.length())); ``` ```py @@ -862,16 +862,18 @@ Side inputs are useful if your `ParDo` needs to inject additional data when proc // Apply a ParDo that takes maxWordLengthCutOffView as a side input. PCollection wordsBelowCutOff = - words.apply(ParDo.withSideInputs(maxWordLengthCutOffView) - .of(new DoFn() { - public void processElement(ProcessContext c) { - String word = c.element(); - // In our DoFn, access the side input. - int lengthCutOff = c.sideInput(maxWordLengthCutOffView); - if (word.length() <= lengthCutOff) { - c.output(word); - } - }})); + words.apply(ParDo + .of(new DoFn() { + public void processElement(ProcessContext c) { + String word = c.element(); + // In our DoFn, access the side input. 
+ int lengthCutOff = c.sideInput(maxWordLengthCutOffView); + if (word.length() <= lengthCutOff) { + c.output(word); + } + } + }).withSideInputs(maxWordLengthCutOffView) + ); ``` ```py @@ -943,17 +945,16 @@ While `ParDo` always produces a main output `PCollection` (as the return value f // to our ParDo. Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple. PCollectionTuple results = - words.apply( - ParDo + words.apply(ParDo + .of(new DoFn() { + // DoFn continues here. + ... + }) // Specify the tag for the main output. .withOutputTags(wordsBelowCutOffTag, // Specify the tags for the two additional outputs as a TupleTagList. TupleTagList.of(wordLengthsAboveCutOffTag) - .and(markedWordsTag)) - .of(new DoFn() { - // DoFn continues here. - ... - } + .and(markedWordsTag))); ``` ```py @@ -1114,7 +1115,7 @@ Read transforms read data from an external source and return a `PCollection` rep #### Using a read transform: ```java -PCollection lines = p.apply(TextIO.Read.from("gs://some/inputData.txt")); +PCollection lines = p.apply(TextIO.read().from("gs://some/inputData.txt")); ``` ```py @@ -1128,7 +1129,7 @@ Write transforms write the data in a `PCollection` to an external data source. 
Y #### Using a Write transform: ```java -output.apply(TextIO.Write.to("gs://some/outputData")); +output.apply(TextIO.write().to("gs://some/outputData")); ``` ```py @@ -1143,7 +1144,7 @@ Many read transforms support reading from multiple input files matching a glob o ```java p.apply(“ReadFromText”, - TextIO.Read.from("protocol://my_bucket/path/to/input-*.csv"); + TextIO.read().from("protocol://my_bucket/path/to/input-*.csv")); ``` ```py @@ -1161,7 +1162,7 @@ The following write transform example writes multiple output files to a location ```java records.apply("WriteToText", - TextIO.Write.to("protocol://my_bucket/path/to/numbers") + TextIO.write().to("protocol://my_bucket/path/to/numbers") .withSuffix(".csv")); ``` @@ -1563,7 +1564,7 @@ You can allow late data by invoking the `.withAllowedLateness` operation when yo .withAllowedLateness(Duration.standardDays(2))); ``` -When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness propagates forward to any subsequent `PCollection` derived from the first `PCollection` you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explictly by applying `Window.withAllowedLateness()` again. +When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness propagates forward to any subsequent `PCollection` derived from the first `PCollection` you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explicitly by applying `Window.configure().withAllowedLateness()`. ### Adding timestamps to a PCollection's elements @@ -1737,7 +1738,7 @@ You set the allowed lateness by using `.withAllowedLateness()` when you set your # The Beam SDK for Python does not support triggers. ``` -This allowed lateness propagates to all `PCollection`s derived as a result of applying transforms to the original `PCollection`. 
If you want to change the allowed lateness later in your pipeline, you can apply `Window.withAllowedLateness()` again, explicitly. +This allowed lateness propagates to all `PCollection`s derived as a result of applying transforms to the original `PCollection`. If you want to change the allowed lateness later in your pipeline, you can apply `Window.configure().withAllowedLateness()` again, explicitly. ### Composite Triggers @@ -1770,6 +1771,7 @@ You can express this pattern using `AfterWatermark.pastEndOfWindow`. For example ```java .apply(Window + .configure() .triggering(AfterWatermark .pastEndOfWindow() .withLateFirings(AfterProcessingTime diff --git a/src/documentation/sdks/java-extensions.md b/src/documentation/sdks/java-extensions.md index a4694af476a..17a79e75095 100644 --- a/src/documentation/sdks/java-extensions.md +++ b/src/documentation/sdks/java-extensions.md @@ -55,5 +55,5 @@ PCollection>>> grouped = // For every primary key, sort the iterable of pairs by secondary key. PCollection>>> groupedAndSorted = grouped.apply( - SortValues.create(new BufferedExternalSorter.Options())); + SortValues.create(BufferedExternalSorter.options())); ``` diff --git a/src/get-started/mobile-gaming-example.md b/src/get-started/mobile-gaming-example.md index 5c972748b43..9e59274186b 100644 --- a/src/get-started/mobile-gaming-example.md +++ b/src/get-started/mobile-gaming-example.md @@ -107,9 +107,9 @@ public static class ExtractAndSumScore return gameInfo .apply(MapElements - .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())) - .withOutputType( - TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))) + .into( + TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())) + .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))) .apply(Sum.integersPerKey()); } } @@ -148,7 +148,7 @@ public static void main(String[] args) throws Exception { Pipeline pipeline = Pipeline.create(options); // Read 
events from a text file and parse them. - pipeline.apply(TextIO.Read.from(options.getInput())) + pipeline.apply(TextIO.read().from(options.getInput())) .apply("ParseGameEvent", ParDo.of(new ParseEventFn())) // Extract and sum username/score pairs from the event data. .apply("ExtractUserScore", new ExtractAndSumScore("user")) @@ -314,7 +314,7 @@ public static void main(String[] args) throws Exception { final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin())); // Read 'gaming' events from a text file. - pipeline.apply(TextIO.Read.from(options.getInput())) + pipeline.apply(TextIO.read().from(options.getInput())) // Parse the incoming data. .apply("ParseGameEvent", ParDo.of(new ParseEventFn())) @@ -601,8 +601,6 @@ public static class CalculateSpammyUsers // Filter the user sums using the global mean. PCollection> filtered = sumScores .apply("ProcessAndFilter", ParDo - // use the derived mean total score as a side input - .withSideInputs(globalMeanScore) .of(new DoFn, KV>() { private final Aggregator numSpammerUsers = createAggregator("SpammerUsers", new Sum.SumLongFn()); @@ -617,7 +615,9 @@ public static class CalculateSpammyUsers c.output(c.element()); } } - })); + }) + // use the derived mean total score as a side input + .withSideInputs(globalMeanScore)); return filtered; } } @@ -635,7 +635,6 @@ rawEvents FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration())))) // Filter out the detected spammer users, using the side input derived above. .apply("FilterOutSpammers", ParDo - .withSideInputs(spammersView) .of(new DoFn() { @ProcessElement public void processElement(ProcessContext c) { @@ -644,7 +643,8 @@ rawEvents c.output(c.element()); } } - })) + }) + .withSideInputs(spammersView)) // Extract and sum teamname/score pairs from the event data. 
.apply("ExtractTeamScore", new ExtractAndSumScore("team")) ``` diff --git a/src/get-started/wordcount-example.md b/src/get-started/wordcount-example.md index 19a82d7d853..9882c2a7731 100644 --- a/src/get-started/wordcount-example.md +++ b/src/get-started/wordcount-example.md @@ -96,7 +96,7 @@ The Minimal WordCount pipeline contains five transforms: 1. A text file `Read` transform is applied to the Pipeline object itself, and produces a `PCollection` as output. Each element in the output PCollection represents one line of text from the input file. This example uses input data stored in a publicly accessible Google Cloud Storage bucket ("gs://"). ```java - p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) + p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*")) ``` ```py @@ -157,7 +157,7 @@ The Minimal WordCount pipeline contains five transforms: 5. A text file write transform. This transform takes the final `PCollection` of formatted Strings as input and writes each element to an output text file. Each element in the input `PCollection` represents one line of text in the resulting output file. ```java - .apply(TextIO.Write.to("wordcounts")); + .apply(TextIO.write().to("wordcounts")); ``` ```py @@ -398,7 +398,7 @@ public static void main(String[] args) throws IOException { Pipeline pipeline = Pipeline.create(options); PCollection input = pipeline - .apply(TextIO.Read.from(options.getInputFile())) + .apply(TextIO.read().from(options.getInputFile())) ```