
[BEAM-667] Verify and update Java wordcount snippets#311

Merged
asfgit merged 3 commits into apache:asf-site from melap:wordcountupdate
Sep 21, 2017

Conversation


@melap melap commented Aug 31, 2017

Verify and update the Java wordcount snippets so they match what's in the actual example code, plus some minor cleanup.


asfgit commented Aug 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Website_Stage/668/

Jenkins built the site at commit id f4383c2 with Jekyll and staged it here. Happy reviewing.

Note that any previous site has been deleted. This staged site will be automatically deleted after its TTL expires. Push any commit to the pull request branch or re-trigger the build to get it staged again.


@tgroh tgroh left a comment


You don't need to accept any of the wording changes if you have sufficiently strong feelings.

### Creating the Pipeline

The first step in creating a Beam pipeline is to create a `PipelineOptions` object. This object lets us set various options for our pipeline, such as the pipeline runner that will execute our pipeline and any runner-specific configuration required by the chosen runner. In this example we set these options programmatically, but more often command-line arguments are used to set `PipelineOptions`.
The first step in creating a Beam pipeline is to create a `PipelineOptions` object. This object lets us set various options for our pipeline, such as the pipeline runner that will execute our pipeline and any runner-specific configuration required by the chosen runner. In this example we set these options programmatically, but more often, command-line arguments are used to set `PipelineOptions`.
Member

This is no longer technically true. It's a required step, but can be performed at any point before execution if the options are provided to the run method (at least in java)

Author

I changed this to be more specific to the example: "In this example, the code first creates a PipelineOptions object." This is a good point from a bigger-picture perspective, and it may need to be addressed on some other pages as well.
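For context, the creation step under discussion looks roughly like this in the Beam Java SDK (a sketch only; the SDK classes are real, but the surrounding `main` scaffolding and class name are illustrative):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WordCountSetup {
  public static void main(String[] args) {
    // Build PipelineOptions from command-line arguments (the common case);
    // PipelineOptionsFactory.create() builds default options programmatically.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Here the options are handed to the pipeline at construction time, though
    // as noted in the review they can also be supplied later, before execution.
    Pipeline p = Pipeline.create(options);
  }
}
```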

#### Direct Runner

If you execute your pipeline using `DirectRunner`, it will print the log messages directly to your local console.
If you execute your pipeline using `DirectRunner`, it prints the log messages directly to your local console.
Member

This is usually the case, but requires Slf4j to be set up appropriately

Author

added sentence for Java about adding Slf4j to class path

#### Cloud Dataflow Runner

If you execute your pipeline using `DataflowRunner`, you can use Stackdriver Logging. Stackdriver Logging aggregates the logs from all of your Dataflow job's workers to a single location in the Google Cloud Platform Console. You can use Stackdriver Logging to search and access the logs from all of the workers that Dataflow has spun up to complete your Dataflow job. Logging statements in your pipeline's `DoFn` instances will appear in Stackdriver Logging as your pipeline runs.
If you execute your pipeline using `DataflowRunner`, you can use Stackdriver Logging. Stackdriver Logging aggregates the logs from all of your Cloud Dataflow job's workers to a single location in the Google Cloud Platform Console. You can use Stackdriver Logging to search and access the logs from all of the workers that Cloud Dataflow has spun up to complete your job. Logging statements in your pipeline's `DoFn` instances will appear in Stackdriver Logging as your pipeline runs.
Member

...using the DataflowRunner...

If you execute your pipeline using `DataflowRunner`, you can use Stackdriver Logging. Stackdriver Logging aggregates the logs from all of your Cloud Dataflow job's workers to a single location in the Google Cloud Platform Console. You can use Stackdriver Logging to search and access the logs from all of the workers that Cloud Dataflow has spun up to complete your job. Logging statements in your pipeline's `DoFn` instances will appear in Stackdriver Logging as your pipeline runs.

If you execute your pipeline using `DataflowRunner`, you can control the worker log levels. Dataflow workers that execute user code are configured to log to Stackdriver Logging by default at "INFO" log level and higher. You can override log levels for specific logging namespaces by specifying: `--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}`. For example, by specifying `--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}` when executing this pipeline using the Dataflow service, Stackdriver Logging would contain only "DEBUG" or higher level logs for the package in addition to the default "INFO" or higher level logs.
If you execute your pipeline using `DataflowRunner`, you can control the worker log levels. Cloud Dataflow workers that execute user code are configured to log to Stackdriver Logging by default at "INFO" log level and higher. You can override log levels for specific logging namespaces by specifying: `--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}`. For example, by specifying `--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}` when executing a pipeline using the Cloud Dataflow service, Stackdriver Logging will contain only "DEBUG" or higher level logs for the package in addition to the default "INFO" or higher level logs.
Member

The "If you execute your pipeline using DataflowRunner, ..." seems overly specified (esp. given the section headings)

maybe "When executing using the DataflowRunner, ..."?

Author

Changed to "When executing your pipeline with the DataflowRunner" and removed repetitive use of this for the second paragraph

Beam allows you to create a single pipeline that can handle both bounded and unbounded types of input. If the input is unbounded, then all PCollections of the pipeline will be unbounded as well. The same goes for bounded input. If your input has a fixed number of elements, it's considered a 'bounded' data set. If your input is continuously updating, then it's considered 'unbounded'.

Recall that the input for this example is a set of Shakespeare's texts, finite data. Therefore, this example reads bounded data from a text file:
Recall that the input for this example is a set of Shakespeare's texts, which is finite data. Therefore, this example reads bounded data from a text file:
Member

"which is a finite set of data"?

Author

Done
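The bounded read referred to here looks like the following sketch (it assumes a `Pipeline` named `p`; the glob is the Shakespeare sample-data path used by the Beam examples):

```java
// Read a bounded PCollection<String> of text lines from a finite set of files.
PCollection<String> lines =
    p.apply("ReadLines", TextIO.read().from("gs://apache-beam-samples/shakespeare/*"));
```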

### Windowing

Beam uses a concept called **Windowing** to subdivide a `PCollection` according to the timestamps of its individual elements. PTransforms that aggregate multiple elements, process each `PCollection` as a succession of multiple, finite windows, even though the entire collection itself may be of infinite size (unbounded).
Beam uses a concept called **Windowing** to subdivide a `PCollection` according to the timestamps of its individual elements. PTransforms that aggregate multiple elements process each `PCollection` as a succession of multiple, finite windows, even though the entire collection itself may be of infinite size (unbounded).
Member

"according to the timestamps" is true for all of the built-in window fns, but is not a requirement.

Author

changed to "... to subdivide a PCollection into a
bounded set of elements."
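As a sketch of the reworded concept, fixed-window assignment in the Java SDK looks roughly like this (it assumes an existing `PCollection<String> words`; the one-minute window size is illustrative):

```java
// Assign each element to a fixed one-minute window; downstream aggregations
// such as Count.perElement() then run per window instead of over the whole
// (possibly unbounded) collection.
PCollection<String> windowedWords =
    words.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
```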

When our input is unbounded, the same is true of our output `PCollection`. We need to make sure that we choose an appropriate, unbounded sink. Some output sinks support only bounded output, while others support both bounded and unbounded outputs. By using a `FilenamePolicy`, we can use `TextIO` to write files that are partitioned by windows. We use a composite `PTransform` that uses such a policy internally to write a single sharded file per window.

In this example, we stream the results to a BigQuery table. The results are then formatted for a BigQuery table, and then written to BigQuery using BigQueryIO.Write.
In this example, we stream the results to a BigQuery table. The results are then formatted for a Google BigQuery table, and then written to BigQuery using `BigQueryIO.Write`.
Member

The causal relationships here seem broken.

Author

Updated
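The streaming write under discussion can be sketched as follows (the `FormatAsTableRowFn` DoFn, `tableSpec`, and `schema` are hypothetical placeholders, not names from the example):

```java
// Convert each windowed word count into a BigQuery TableRow, then write the
// rows with BigQueryIO. In a streaming pipeline this becomes a streaming insert.
wordCounts
    .apply("FormatAsTableRow", ParDo.of(new FormatAsTableRowFn()))
    .apply(BigQueryIO.writeTableRows().to(tableSpec).withSchema(schema));
```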


melap commented Sep 11, 2017

Updated from feedback. All other changes are just formatting to fix the gigantic lines through the file.


asfgit commented Sep 11, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Website_Stage/691/

Jenkins built the site at commit id a999f74 with Jekyll and staged it here. Happy reviewing.

Note that any previous site has been deleted. This staged site will be automatically deleted after its TTL expires. Push any commit to the pull request branch or re-trigger the build to get it staged again.

The Minimal WordCount pipeline contains several transforms to read data into the pipeline, manipulate or otherwise transform the data, and write out the results. Each transform represents an operation in the pipeline.
The Minimal WordCount pipeline contains several transforms to read data into the
pipeline, manipulate or otherwise transform the data, and write out the results.
Each transform represents an operation in the pipeline.
Member

I don't like the use of the word 'operation' here - it's some transform over the data, but it may be implemented in terms of multiple other transforms. Some basic transforms represent an individual operation (I'm understanding operation to be synonymous with 'primitive', which it may not be as you use it)

Author

done

Each transform represents an operation in the pipeline.

Each transform takes some kind of input (data or otherwise), and produces some output data. The input and output data is represented by the SDK class `PCollection`. `PCollection` is a special class, provided by the Beam SDK, that you can use to represent a data set of virtually any size, including unbounded data sets.
Each transform takes some kind of input (data or otherwise), and produces some
Member

what is 'otherwise'

Author

removed

Each transform takes some kind of input (data or otherwise), and produces some output data. The input and output data is represented by the SDK class `PCollection`. `PCollection` is a special class, provided by the Beam SDK, that you can use to represent a data set of virtually any size, including unbounded data sets.
Each transform takes some kind of input (data or otherwise), and produces some
output data. The input and output data is represented by the SDK class
`PCollection`. `PCollection` is a special class, provided by the Beam SDK, that
Member

What about transforms producing composite types?

Author

added


Run the pipeline by calling the `run` method, which sends your pipeline to be executed by the pipeline runner that you specified when you created your pipeline.
Run the pipeline by calling the `run` method, which sends your pipeline to be
executed by the pipeline runner that you specified when you created your
Member

when you created your pipeline -> in your PipelineOptions

Author

done


Note that the `run` method is asynchronous. For a blocking execution instead, run your pipeline appending the `waitUntilFinish` method.
Note that the `run` method is asynchronous. For a blocking execution, run your
pipeline appending the <span class="language-java">`waitUntilFinish`</span>
Member

call the waitUntilFinish[+py] method on the result object returned by the call to run

(specific wording as appropriate)

Author

done
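The suggested wording corresponds to code along these lines (a sketch; `p` is the pipeline):

```java
// run() is asynchronous: it hands the pipeline to the configured runner and
// returns a PipelineResult immediately.
PipelineResult result = p.run();
// For blocking execution, wait on the result object returned by run().
result.waitUntilFinish();
```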

If you have a processing operation that consists of multiple transforms or `ParDo` steps, you can create it as a subclass of `PTransform`. Creating a `PTransform` subclass allows you to create complex reusable transforms, can make your pipeline's structure more clear and modular, and makes unit testing easier.
If you have a processing operation that consists of multiple transforms or
`ParDo` steps, you can create it as a subclass of `PTransform`. Creating a
`PTransform` subclass allows you to create complex reusable transforms, can make
Member

create complex reusable transforms -> encapsulate complex transforms?

Author

done
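For reference, the WordCount example's own `CountWords` composite illustrates this pattern (`ExtractWordsFn` is the example's tokenizing `DoFn`, assumed defined elsewhere):

```java
// A composite PTransform: encapsulates tokenization plus counting as a single
// reusable, testable step.
public static class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    // Split lines into individual words.
    PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
    // Count how many times each word appears.
    return words.apply(Count.perElement());
  }
}
```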


```py
This feature is not yet available in the Beam SDK for Python.
# This feature is not yet available in the Beam SDK for Python.
```
Member

I do not believe this to be true, but can be fixed in a separate change

@robertwb

Author

filed BEAM-2976

Beam allows you to create a single pipeline that can handle both bounded and unbounded types of input. If the input is unbounded, then all PCollections of the pipeline will be unbounded as well. The same goes for bounded input. If your input has a fixed number of elements, it's considered a 'bounded' data set. If your input is continuously updating, then it's considered 'unbounded'.
Beam allows you to create a single pipeline that can handle both bounded and
unbounded types of input. If the input is unbounded, then all PCollections of
the pipeline will be unbounded as well. The same goes for bounded input. If your
Member

all downstream PCollections will be unbounded, but separate branches may be independently bounded; running a pipeline which contains an unbounded PCollection requires streaming

(above is not site-suitable wording)

Author

changed wording


Each element in a `PCollection` has an associated **timestamp**. The timestamp
for each element is initially assigned by the source that creates the
`PCollection` and can be adjusted by a `DoFn`. In this example the input is
Member

(with some caveats - specifically, it timestamps can by default only be moved forwards in time, and some sources let the user attach a timestamp at the site of the read)

Author

changed wording


Beam uses a concept called **Windowing** to subdivide a `PCollection` according to the timestamps of its individual elements. PTransforms that aggregate multiple elements, process each `PCollection` as a succession of multiple, finite windows, even though the entire collection itself may be of infinite size (unbounded).
Beam uses a concept called **Windowing** to subdivide a `PCollection` into a
bounded set of elements. PTransforms that aggregate multiple elements process
Member

a bounded set -> bounded sets

Author

done


asfgit commented Sep 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Website_Stage/711/

Jenkins built the site at commit id accfcec with Jekyll and staged it here. Happy reviewing.

Note that any previous site has been deleted. This staged site will be automatically deleted after its TTL expires. Push any commit to the pull request branch or re-trigger the build to get it staged again.

Beam allows you to create a single pipeline that can handle both bounded and unbounded types of input. If the input is unbounded, then all PCollections of the pipeline will be unbounded as well. The same goes for bounded input. If your input has a fixed number of elements, it's considered a 'bounded' data set. If your input is continuously updating, then it's considered 'unbounded'.
Beam allows you to create a single pipeline that can handle both bounded and
unbounded types of input. If your input has a fixed number of elements, it's
considered a 'bounded' data set. If your input is continuously updating, then
Member

I'd file a Jira to update the wording here - the way I think of unbounded is that it doesn't make sense to ask "do I have all of the data", just "at what point do I have all of the data up to", but that might need to be workshopped a little.


tgroh commented Sep 21, 2017

@asfgit merge

asfgit pushed a commit that referenced this pull request Sep 21, 2017

asfgit commented Sep 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Website_Stage/712/

Jenkins built the site at commit id 30d5e8f with Jekyll and staged it here. Happy reviewing.

Note that any previous site has been deleted. This staged site will be automatically deleted after its TTL expires. Push any commit to the pull request branch or re-trigger the build to get it staged again.

@asfgit asfgit merged commit 30d5e8f into apache:asf-site Sep 21, 2017
@melap melap deleted the wordcountupdate branch September 21, 2017 21:19
robertwb pushed a commit to robertwb/incubator-beam that referenced this pull request Jun 5, 2018
robertwb pushed a commit to robertwb/incubator-beam that referenced this pull request Jun 5, 2018
melap pushed a commit to apache/beam that referenced this pull request Jun 20, 2018