# Try Apache Beam - Python

In this notebook, we set up your development environment and work through a simple example using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).

To navigate through different sections, use the table of contents. From the **View** drop-down list, select **Table of contents**.

To run a code cell, click the **Run cell** button at the top left of the cell, or select the cell and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).

# Setup

First, you need to set up your environment, which includes installing `apache-beam` and downloading a text file from Cloud Storage to your local file system. You use this file to test your pipeline.

#### Setting Dependencies

In [1]:
import apache_beam as beam
import re
from apache_beam.options.pipeline_options import PipelineOptions
import warnings
warnings.filterwarnings("ignore")


In [2]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
# run('pip install --quiet apache-beam')

# Copy the input file into the local file system.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')

>> mkdir -p data

>> gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/
Copying gs://dataflow-samples/shakespeare/kinglear.txt...
/ [1 files][153.6 KiB/153.6 KiB]                                                
Operation completed over 1 objects/153.6 KiB.                                    



# Word count with comments

Below is the word count pipeline, with comments explaining every line in more detail.

In [None]:
inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  # Store the word counts in a PCollection.
  # Each element is a tuple of (word, count) of types (str, int).
  word_counts = (
      # The input PCollection is an empty pipeline.
      pipeline

      # Read lines from a text file.
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      # Element type: str - text line

      # Use a regular expression to iterate over all words in the line.
      # FlatMap will yield an element for every element in an iterable.
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      # Element type: str - word
      
      ##### MODIFICATION 1 #####
      
      # Modified by Rohan: Convert all words to lowercase so that the same word in different cases is counted together.
      # For example, 'The' and 'the' are both counted as 'the'. Similarly, 'KING', 'King' and 'king' are all
      # counted as 'king'.
      | 'Lowercase words' >> beam.Map(lambda word: word.lower())

      # Create key-value pairs where the value is 1, this way we can group by
      # the same word while adding those 1s and get the counts for every word.
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      # Element type: (str, int) - key: word, value: 1

      # Group by key while combining the value using the sum() function.
      | 'Group and sum' >> beam.CombinePerKey(sum)
      # Element type: (str, int) - key: word, value: counts
  )

  # We can process a PCollection through other pipelines too.
  (
      # The input PCollection is the word_counts created from the previous step.
      word_counts

      # Format the results into a string so we can write them to a file.
      | 'Format results' >> beam.Map(lambda word_count: str(word_count))
      # Element type: str - text line

      # Finally, write the results to a file.
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# Sample up to the first 200 results from the first output shard; remember there are no ordering guarantees.
run('head -n 200 {}-00000-of-*'.format(outputs_prefix))
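
As a cross-check on the transforms above, the same counting logic can be sketched in plain Python without Beam. This is a minimal sketch and not part of the original notebook: the helper name `count_words` and the sample lines are hypothetical, and everything runs in memory in a single process. It only illustrates what the `FlatMap`, `Map`, and `CombinePerKey(sum)` steps compute, not how a runner executes them.

```python
import re
from collections import Counter


def count_words(lines):
    """Count lowercase words in an iterable of text lines.

    Hypothetical helper for illustration; it mirrors the pipeline steps above.
    """
    counts = Counter()
    for line in lines:
        # Same regular expression as the 'Find words' step.
        for word in re.findall(r"[a-zA-Z']+", line):
            # Same normalization as the 'Lowercase words' step.
            counts[word.lower()] += 1
    return dict(counts)


# Made-up sample input, not taken from kinglear.txt.
sample = ["The KING and the king's fool", "King Lear talk'd"]
print(count_words(sample))
```

Note that the regular expression keeps apostrophes, so `king's` and `talk'd` are counted as their own words rather than being split. Assuming the input file was downloaded as in the setup cell, passing `open('data/kinglear.txt')` as `lines` should give counts comparable to the pipeline output.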

#### Using SparkRunner (executing on a local Spark engine)
#### This method is closer to real workloads because the pipeline runs on Spark, which can execute work in parallel.

In [None]:
inputs_pattern = 'data/*'
outputs_prefix = 'outputs_spark/part'

# Spark runner options
beam_options = PipelineOptions([
    '--runner=SparkRunner',           # Tells Beam to use the Spark runner.
    '--spark_master_url=local[*]',    # Run Spark locally (multi-threaded) on this machine, not on a Spark cluster.
    '--job_name=wordcount-sparklocal',
])

with beam.Pipeline(options=beam_options) as pipeline:
    word_counts = (
        pipeline
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
        
        ##### MODIFICATION #####
      
        # Convert all words to lowercase so that the same word in different cases is counted together.
        # For example, 'The' and 'the' are both counted as 'the'. Similarly, 'KING', 'King' and 'king' are all
        # counted as 'king'.
        | 'Lowercase words' >> beam.Map(lambda word: word.lower())
        
        | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
        | 'Group and sum' >> beam.CombinePerKey(sum)
    )

    (
        word_counts
        | 'Format results' >> beam.Map(lambda wc: f'{wc}')
        | 'Write results' >> beam.io.WriteToText(outputs_prefix)
    )

# Peek at output
run('cat outputs_spark/part-* > outputs_spark/all_results.txt')
# run('cat outputs_spark/part-* | head -n 50')
run('head -n 200 {}-00000-of-*'.format(outputs_prefix))



#### The data is stored in the outputs_spark folder, where the complete word counts are available in all_results.txt.
#### The results match the DirectRunner results exactly.
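
One way to verify that the two runners agree is to load both sets of output shards and compare them as dictionaries. This is a minimal sketch, assuming both pipelines above have already written their shards to `outputs/` and `outputs_spark/`, and that each line is the `str()` of a `(word, count)` tuple as produced by the 'Format results' step. The helper name `load_counts` is hypothetical.

```python
import ast
import glob


def load_counts(pattern):
    """Read every output shard matching `pattern` into a {word: count} dict."""
    counts = {}
    for path in sorted(glob.glob(pattern)):
        with open(path, encoding="utf-8") as shard:
            for line in shard:
                line = line.strip()
                if not line:
                    continue
                # Each line is the str() of a (word, count) tuple.
                word, count = ast.literal_eval(line)
                counts[word] = count
    return counts


direct = load_counts("outputs/part-*")
spark = load_counts("outputs_spark/part-*")
# Compare as dictionaries because shard and line order are not guaranteed.
print("words:", len(direct), len(spark), "match:", direct == spark)
```

If either word total is 0, the corresponding pipeline has not produced output yet, so a reported match is not meaningful in that case.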



>> cat outputs_spark/part-* > outputs_spark/all_results.txt

>> head -n 200 outputs_spark/part-00000-of-*
('few', 1)
('lecherous', 1)
('adieu', 1)
('such', 44)
('persuade', 2)
('bred', 2)
('hourly', 3)
("fellow's", 1)
('guardians', 1)
('heart', 49)
('cur', 2)
('terrible', 2)
('worth', 9)
('mire', 1)
('physician', 1)
('spirit', 6)
('friendly', 2)
('wish', 2)
('owest', 2)
('longer', 2)
('kinder', 1)
('poorest', 2)
('kin', 1)
('fleshment', 1)
("'parel", 1)
('thanks', 2)
('sing', 2)
('matter', 15)
('its', 1)
('party', 4)
('banishment', 3)
('liege', 2)
('lightning', 1)
('speaks', 3)
('lacks', 1)
('soliciting', 1)
('songs', 1)
('attempting', 1)
('moonshine', 1)
('shows', 3)
("talk'd", 1)
('deadly', 2)
('masts', 1)
('steeples', 1)
('sixth', 1)
('suits', 1)
('miles', 1)
('custom', 1)
('borest', 1)
('bondage', 1)
('rotundity', 1)
('turns', 3)
('throughly', 1)
('morning', 1)
('sennet', 1)
('light', 5)
('broils', 1)
('beaks', 1)
("tom's", 7)
('contriving', 1)
("unburthen'd", 1)
('che', 1)
('strat