From 9ae9977702147e8e791166a9a5ab2b724a3aa14c Mon Sep 17 00:00:00 2001 From: mingmxu Date: Fri, 17 Mar 2017 21:08:18 -0700 Subject: [PATCH 1/4] add code snippet to page design-your-pipeline --- .../pipelines/design-your-pipeline.md | 88 +++++++++++++++++-- 1 file changed, 82 insertions(+), 6 deletions(-) diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md index 937b35c9c69..503f196d685 100644 --- a/src/documentation/pipelines/design-your-pipeline.md +++ b/src/documentation/pipelines/design-your-pipeline.md @@ -31,7 +31,7 @@ The simplest pipelines represent a linear flow of operations, as shown in Figure Figure 1: A linear pipeline. -However, your pipeline can be significantly more complex. A pipeline represents a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have multiple input sources, multiple output sinks, and its operations (transforms) can output multiple `PCollection`s. The following examples show some of the different shapes your pipeline can take. +However, your pipeline can be significantly more complex. A pipeline represents a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have multiple input sources, multiple output sinks, and its operations (transforms) can read multiple `PCollection`s, then output multiple `PCollection`s. The following examples show some of the different shapes your pipeline can take. ## Branching PCollections @@ -47,7 +47,28 @@ The pipeline illustrated in Figure 2 below reads its input, first names (Strings A pipeline with multiple transforms. Note that the PCollection of table rows is processed by two transforms. -Figure 2: A pipeline with multiple transforms. Note that the PCollection of the database table rows is processed by two transforms. +Figure 2: A pipeline with multiple transforms. Note that the PCollection of the database table rows is processed by two transforms. It's shown as following code: +```java +PCollection dbRowCollection = ...; + +PCollection aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn(){ + @ProcessElement + public void processElement(ProcessContext c) { + if(c.element().startsWith("A")){ + c.output(c.element()); + } + } +})); + +PCollection bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn(){ + @ProcessElement + public void processElement(ProcessContext c) { + if(c.element().startsWith("B")){ + c.output(c.element()); + } + } +})); +``` ### A single transform that uses side outputs @@ -75,7 +96,37 @@ The pipeline in Figure 3 performs the same operation in a different way - with o
if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }
-where each element in the input `PCollection` is processed once. +where each element in the input `PCollection` is processed once. The code is shown as below: +```java +//define main stream and side output +final TupleTag mainStreamTag = new TupleTag(){}; +final TupleTag sideoutTag = new TupleTag(){}; + +PCollectionTuple mixedCollection = + dbRowCollection.apply( + ParDo + // Specify the tag for the main output, wordsBelowCutoffTag. + .withOutputTags(mainStreamTag, + // Specify the tags for the two side outputs as a TupleTagList. + TupleTagList.of(sideoutTag)) + .of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + if(c.element().startsWith("A")){//output to main stream + c.output(c.element()); + }else if(c.element().startsWith("B")){//emit as Side outputs + c.sideOutput(sideoutTag, c.element()); + } + } + } + )); + +// get subset of main stream +mixedCollection.get(mainStreamTag).apply(...); + +// get subset of Side output +mixedCollection.get(sideoutTag).apply(...); +``` You can use either mechanism to produce multiple output `PCollection`s. However, using side outputs makes more sense if the transform's computation per element is time-consuming. @@ -86,13 +137,22 @@ Often, after you've branched your `PCollection` into multiple `PCollection`s via * **Flatten** - You can use the `Flatten` transform in the Beam SDKs to merge multiple `PCollection`s of the **same type**. * **Join** - You can use the `CoGroupByKey` transform in the Beam SDK to perform a relational join between two `PCollection`s. The `PCollection`s must be keyed (i.e. they must be collections of key/value pairs) and they must use the same key type. -The example depicted in Figure 4 below is a continuation of the example illustrated in Figure 2 in the section above. After branching into two `PCollection`s, one with names that begin with 'A' and one with names that begin with 'B', the pipeline merges the two together into a single `PCollection` that now contains all names that begin with either 'A' or 'B'. Here, it makes sense to use `Flatten` because the `PCollection`s being merged both contain the same type. +The example depicted in Figure 4 below is a continuation of the example illustrated in Figure 2 in [the section above](#multiple-transforms-process-the-same-pcollection). After branching into two `PCollection`s, one with names that begin with 'A' and one with names that begin with 'B', the pipeline merges the two together into a single `PCollection` that now contains all names that begin with either 'A' or 'B'. Here, it makes sense to use `Flatten` because the `PCollection`s being merged both contain the same type.
Part of a pipeline that merges multiple PCollections.
-Figure 4: Part of a pipeline that merges multiple PCollections. +Figure 4: Part of a pipeline that merges multiple PCollections. The code is shown as below, +```java +//merge the two PCollections with Flatten +PCollectionList collectionList = PCollectionList.of(aCollection).and(bCollection); +PCollection mergedCollectionWithFlatten = collectionList + .apply(Flatten.pCollections()); + +// continue with the new merged PCollection +mergedCollectionWithFlatten.apply(...); +``` ## Multiple sources @@ -102,7 +162,23 @@ Your pipeline can read its input from one or more sources. If your pipeline read A pipeline with multiple input sources. -Figure 5: A pipeline with multiple input sources. +Figure 5: A pipeline with multiple input sources, the example is shown as below: +```java +PCollection> userAddress = pipeline.apply(JdbcIO.>read()...); + +PCollection> userOrder = pipeline.apply(TextIO.>read()...); + +final TupleTag addressTag = new TupleTag(); +final TupleTag orderTag = new TupleTag(); + +// Merge collection values into a CoGbkResult collection. +PCollection> joinedCollection = + KeyedPCollectionTuple.of(addressTag, userAddress) + .and(orderTag, userOrder) + .apply(CoGroupByKey.create()); + +coGbkResultCollection.apply(...); +``` ## What's next From e95f83e069b1433ffed910721fe61dcee891dcff Mon Sep 17 00:00:00 2001 From: mingmxu Date: Tue, 21 Mar 2017 10:35:51 -0700 Subject: [PATCH 2/4] consistent words --- src/documentation/pipelines/design-your-pipeline.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md index 503f196d685..96cfce130ad 100644 --- a/src/documentation/pipelines/design-your-pipeline.md +++ b/src/documentation/pipelines/design-your-pipeline.md @@ -31,7 +31,7 @@ The simplest pipelines represent a linear flow of operations, as shown in Figure Figure 1: A linear pipeline. -However, your pipeline can be significantly more complex. A pipeline represents a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have multiple input sources, multiple output sinks, and its operations (transforms) can read multiple `PCollection`s, then output multiple `PCollection`s. The following examples show some of the different shapes your pipeline can take. +However, your pipeline can be significantly more complex. A pipeline represents a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have multiple input sources, multiple output sinks, its operations (transforms) can read multiple `PCollection`s, and output multiple `PCollection`s. The following examples show some of the different shapes your pipeline can take. ## Branching PCollections @@ -47,7 +47,7 @@ The pipeline illustrated in Figure 2 below reads its input, first names (Strings A pipeline with multiple transforms. Note that the PCollection of table rows is processed by two transforms. -Figure 2: A pipeline with multiple transforms. Note that the PCollection of the database table rows is processed by two transforms. It's shown as following code: +Figure 2: A pipeline with multiple transforms. Note that the PCollection of the database table rows is processed by two transforms. See the example code below: ```java PCollection dbRowCollection = ...; @@ -96,7 +96,7 @@ The pipeline in Figure 3 performs the same operation in a different way - with o
if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }
-where each element in the input `PCollection` is processed once. The code is shown as below: +where each element in the input `PCollection` is processed once. See the example code below: ```java //define main stream and side output final TupleTag mainStreamTag = new TupleTag(){}; @@ -143,7 +143,7 @@ The example depicted in Figure 4 below is a continuation of the example illustra Part of a pipeline that merges multiple PCollections. -Figure 4: Part of a pipeline that merges multiple PCollections. The code is shown as below, +Figure 4: Part of a pipeline that merges multiple PCollections. See the example code below: ```java //merge the two PCollections with Flatten PCollectionList collectionList = PCollectionList.of(aCollection).and(bCollection); @@ -162,7 +162,7 @@ Your pipeline can read its input from one or more sources. If your pipeline read A pipeline with multiple input sources. -Figure 5: A pipeline with multiple input sources, the example is shown as below: +Figure 5: A pipeline with multiple input sources. See the example code below: ```java PCollection> userAddress = pipeline.apply(JdbcIO.>read()...); From 116d9bf8854bf7cde252b3a9ecb1e5e52f8aabcb Mon Sep 17 00:00:00 2001 From: mingmxu Date: Tue, 21 Mar 2017 11:26:28 -0700 Subject: [PATCH 3/4] restate pipeline multiple-in multiple-out --- src/documentation/pipelines/design-your-pipeline.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md index 96cfce130ad..19368b00fd9 100644 --- a/src/documentation/pipelines/design-your-pipeline.md +++ b/src/documentation/pipelines/design-your-pipeline.md @@ -31,7 +31,7 @@ The simplest pipelines represent a linear flow of operations, as shown in Figure Figure 1: A linear pipeline. -However, your pipeline can be significantly more complex. A pipeline represents a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have multiple input sources, multiple output sinks, its operations (transforms) can read multiple `PCollection`s, and output multiple `PCollection`s. The following examples show some of the different shapes your pipeline can take. +However, your pipeline can be significantly more complex. A pipeline represents a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have multiple input sources, multiple output sinks, and its operations (`Transform`s) can both read and output multiple `PCollection`s. The following examples show some of the different shapes your pipeline can take. ## Branching PCollections From f31778f698c0f1a14694a7503583d229144ae982 Mon Sep 17 00:00:00 2001 From: mingmxu Date: Tue, 21 Mar 2017 13:43:12 -0700 Subject: [PATCH 4/4] correct spelling --- src/documentation/pipelines/design-your-pipeline.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md index 19368b00fd9..11b38cae110 100644 --- a/src/documentation/pipelines/design-your-pipeline.md +++ b/src/documentation/pipelines/design-your-pipeline.md @@ -31,7 +31,7 @@ The simplest pipelines represent a linear flow of operations, as shown in Figure Figure 1: A linear pipeline. -However, your pipeline can be significantly more complex. A pipeline represents a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have multiple input sources, multiple output sinks, and its operations (`Transform`s) can both read and output multiple `PCollection`s. The following examples show some of the different shapes your pipeline can take. +However, your pipeline can be significantly more complex. A pipeline represents a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have multiple input sources, multiple output sinks, and its operations (`PTransform`s) can both read and output multiple `PCollection`s. The following examples show some of the different shapes your pipeline can take. ## Branching PCollections