In [1]:
# Shell escape: enable the Dataflow service via the gcloud CLI before any job is submitted.
!gcloud services enable dataflow

In [2]:
import re
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners import DataflowRunner
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

In [3]:
import google.auth

In [4]:
class ReadWordsFromText(beam.PTransform):
    """Composite transform that reads text files and emits their words.

    Each input line is stripped of surrounding whitespace and split into
    tokens made of word characters and apostrophes; every token becomes one
    element of the output PCollection.
    """

    def __init__(self, file_pattern):
        """Store the file pattern to read.

        Args:
            file_pattern: Path or glob handed unchanged to
                ``beam.io.ReadFromText``.
        """
        # Run the base-class initializer so that whatever state PTransform
        # sets up in __init__ exists on the instance, instead of relying on
        # class-level defaults.
        super().__init__()
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        """Build the read-and-tokenize sub-graph.

        Args:
            pcoll: The value this transform is applied to. Only its
                ``pipeline`` attribute is used; the read is attached to the
                pipeline itself.

        Returns:
            A PCollection with one element per word found in the files.
        """
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                | beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))
    
# Build the pipeline on the InteractiveRunner so it can be explored from the notebook.
p = beam.Pipeline(InteractiveRunner())

# Source: one element per word in the input text file.
words = p | 'read' >> ReadWordsFromText('gs://sandeep-apache/data.txt')

# Case-sensitive number of occurrences of each distinct word.
counts = words | 'count' >> beam.combiners.Count.PerElement()

# Case-insensitive variant: lowercase every word first, then count.
lower_counts = (
    words
    | "lower" >> beam.Map(str.lower)
    | "lower_count" >> beam.combiners.Count.PerElement()
)

In [5]:
# Display the contents of the `counts` PCollection in the notebook.
ib.show(counts)

In [6]:
# Setting up the Apache Beam pipeline options.
# Note that the use_runner_v2 experiment is needed to run the pipeline when
# using DataflowRunner with Beam version 2.50.0 - https://github.com/apache/beam/issues/28399
# `flags` is always an explicit list (never None), so the options are built
# only from what is listed here and not from the process's sys.argv.
options = pipeline_options.PipelineOptions(
    flags=['--experiments=use_runner_v2'] if beam.version.__version__ == '2.50.0' else [])

# Sets the project to the default project in your current Google Cloud environment.
# google.auth.default() returns a (credentials, project_id) pair. The credentials
# are bound to a named throwaway instead of `_`, because assigning `_` in IPython
# stops it from tracking the last cell output.
_credentials, default_project = google.auth.default()
if default_project is None:
    # Fail here with an actionable message rather than later at job submission.
    raise RuntimeError(
        'No default Google Cloud project could be determined. Run '
        '`gcloud config set project <PROJECT_ID>` or set the '
        'GOOGLE_CLOUD_PROJECT environment variable, then re-run this cell.')
options.view_as(GoogleCloudOptions).project = default_project

# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'



In [7]:
# IMPORTANT! Point this at a Cloud Storage location of your own.
dataflow_gcs_location = 'gs://sandeep-apache/dataflow'

# Both paths below are set through one GoogleCloudOptions view of `options`.
gcs_options = options.view_as(GoogleCloudOptions)

# Staging location: where the Dataflow pipeline and SDK binary are staged.
gcs_options.staging_location = f'{dataflow_gcs_location}/staging'

# Temp location: holds temporary files and intermediate results produced
# before the final output is written to the sink.
gcs_options.temp_location = f'{dataflow_gcs_location}/temp'

In [8]:
# Directory under `dataflow_gcs_location` that receives the job's output files.
output_gcs_location = f'{dataflow_gcs_location}/output'

# Sink for `counts` (case-sensitive word counts).
counts | 'Write counts to Cloud Storage' >> beam.io.WriteToText(
    f'{output_gcs_location}/wordcount-output.txt')

# Sink for `lower_counts` (case-insensitive word counts).
lower_counts | 'Write lower counts to Cloud Storage' >> beam.io.WriteToText(
    f'{output_gcs_location}/wordcount-lower-output.txt')

<PCollection[[8]: Write lower counts to Cloud Storage/Write/WriteImpl/FinalizeWrite.None] at 0x7f4bf466aec0>

In [9]:
# IMPORTANT! Ensure that the graph is correct before sending it out to Dataflow.
# Because this is a notebook environment, unintended additions to the graph may have occurred when rerunning cells 
# (for example, re-executing a cell that applies transforms to `p`).
ib.show_graph(p)


/usr/bin/dot


In [10]:
# Hand pipeline `p` to the DataflowRunner with the configured `options`;
# the returned result object is used by the following cells.
pipeline_result = DataflowRunner().run_pipeline(p, options=options)


In [11]:
# `display` and `HTML` come from the public IPython.display module; importing
# them from IPython.core.display is deprecated and triggers a warning.
from IPython.display import display, HTML

# Build the Cloud Console URL for the job from its location, id and project.
# NOTE(review): `_job` is a private attribute of the result object and may
# change between Beam releases.
url = ('https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s' % 
      (pipeline_result._job.location, pipeline_result._job.id, pipeline_result._job.projectId))
# Render a clickable link to the job page as the cell output.
display(HTML('Click <a href="%s" target="_new">here</a> for the details of your Dataflow job!' % url))

  from IPython.core.display import display, HTML


In [12]:
# Block until the Dataflow job finishes; the returned value is shown as the cell output.
pipeline_result.wait_until_finish()

'DONE'