<a href="https://colab.research.google.com/github/RomanKucheruk/gcp_tools_cource/blob/main/Try_Apache_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Try Apache Beam - Python

In this notebook, you set up your development environment and work through a simple example using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).

To navigate through different sections, use the table of contents: from the **View** drop-down list, select **Table of contents**.

To run a code cell, click the **Run cell** button at the top left of the cell, or select the cell and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).

# Setup

First, set up your environment: install `apache-beam` and download a text file from Cloud Storage to your local file system. You will use this file to test your pipeline.

In [None]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
run('pip install --quiet apache-beam')

# Copy the input file into the local file system.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')

>> pip install --quiet apache-beam

In [None]:
!cat data/kinglear.txt

Streaming output truncated to the last 5000 lines.
	[Exeunt]




	KING LEAR


ACT I



SCENE II	The Earl of Gloucester's castle.


	[Enter EDMUND, with a letter]

EDMUND	Thou, nature, art my goddess; to thy law
	My services are bound. Wherefore should I
	Stand in the plague of custom, and permit
	The curiosity of nations to deprive me,
	For that I am some twelve or fourteen moon-shines
	Lag of a brother? Why bastard? wherefore base?
	When my dimensions are as well compact,
	My mind as generous, and my shape as true,
	As honest madam's issue? Why brand they us
	With base? with baseness? bastardy? base, base?
	Who, in the lusty stealth of nature, take
	More composition and fierce quality
	Than doth, within a dull, stale, tired bed,
	Go to the creating a whole tribe of fops,
	Got 'tween asleep and wake? Well, then,
	Legitimate Edgar, I must have your land:
	Our father's love is to the bastard Edmund
	As to the legitimate: fine word,--legitimate!
	Well, my legitimate, if this

# Minimal word count

The following example is the "Hello, World!" of data processing, a basic implementation of word count. We're creating a simple data processing pipeline that reads a text file and counts the number of occurrences of every word.

In many real-world scenarios the data does not fit in memory. Notice that the pipeline writes its results to the file system instead of collecting them in memory, which is what allows large jobs to run in distributed environments.
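For comparison, the same computation in plain Python (no Beam) fits in a few lines. This sketch keeps every count in a single in-memory `Counter`, which is exactly the approach that stops working once the data no longer fits on one machine. The helper name `count_words` is hypothetical; the regular expression is the one used in the pipeline, so counting is case-sensitive.

```python
import re
from collections import Counter


def count_words(lines):
  """Count word occurrences across an iterable of text lines, entirely in memory."""
  counts = Counter()
  for line in lines:
    # Same role as the 'Find words' FlatMap step in the pipeline.
    # Counter.update then plays the role of pairing with 1 and summing per key.
    counts.update(re.findall(r"[a-zA-Z']+", line))
  return counts


counts = count_words(["The king is dead.", "Long live the king!"])
print(counts["king"])  # → 2
```

Because the regular expression does not lowercase anything, `The` and `the` are counted as different words, just like `KING` and `king` in the pipeline output.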

In [None]:
import apache_beam as beam
import re

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      | 'Group and sum' >> beam.CombinePerKey(sum)
      | 'Format results' >> beam.Map(lambda word_count: str(word_count))
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# Print the first 20 lines of the first output shard; there are no ordering guarantees.
run('head -n 20 {}-00000-of-*'.format(outputs_prefix))



>> head -n 20 outputs/part-00000-of-*
('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)
('EDGAR', 126)
('son', 29)
('to', 438)
('Gloucester', 26)
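The `head` command above only samples the first shard, `part-00000-of-*`, and Beam makes no ordering guarantees. To see the most frequent words, you can read every shard back and sort the results in Python. This is a sketch with a hypothetical `top_words` helper; it assumes each output line is the `str()` of a `(word, count)` tuple, as written by the 'Format results' step, and that the final counts are small enough to fit in memory.

```python
import ast
import glob


def top_words(output_prefix, n=10):
  """Return the n most frequent (word, count) pairs from the WriteToText shards.

  Assumes every non-empty line is the str() of a (word, count) tuple,
  e.g. "('KING', 243)".
  """
  word_counts = []
  # Output is split into <prefix>-SSSSS-of-NNNNN files; read all of them,
  # not only shard 00000.
  for path in sorted(glob.glob(output_prefix + '-*-of-*')):
    with open(path) as f:
      for line in f:
        line = line.strip()
        if line:
          # literal_eval parses the tuple literal without executing arbitrary code.
          word_counts.append(ast.literal_eval(line))
  # Sort by count (descending), breaking ties alphabetically for a stable order.
  return sorted(word_counts, key=lambda wc: (-wc[1], wc[0]))[:n]


print(top_words('outputs/part', n=5))
```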



# Word count with comments

The code below is the same pipeline as above, split into two steps, with comments explaining every line in more detail.

In [None]:
import apache_beam as beam
import re

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  # Store the word counts in a PCollection.
  # Each element is a tuple of (word, count) of types (str, int).
  word_counts = (
      # The pipeline object is the root: applying a read transform to it creates the first PCollection.
      pipeline

      # Read lines from a text file.
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      # Element type: str - text line

      # Use a regular expression to iterate over all words in the line.
      # FlatMap emits one output element for every item in the iterable that the function returns.
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      # Element type: str - word

      # Create key-value pairs where the value is 1, so that grouping by word
      # and adding up those 1s gives the count for every word.
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      # Element type: (str, int) - key: word, value: 1

      # Group by key while combining the value using the sum() function.
      | 'Group and sum' >> beam.CombinePerKey(sum)
      # Element type: (str, int) - key: word, value: counts
  )

  # A PCollection can be passed on to further transforms in the same pipeline.
  (
      # The input PCollection is the word_counts created from the previous step.
      word_counts

      # Format the results into a string so we can write them to a file.
      | 'Format results' >> beam.Map(lambda word_count: str(word_count))
      # Element type: str - text line

      # Finally, write the results to a file.
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# Print the first 20 lines of the first output shard; there are no ordering guarantees.
run('head -n 20 {}-00000-of-*'.format(outputs_prefix))



>> head -n 20 outputs/part-00000-of-*
('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)
('EDGAR', 126)
('son', 29)
('to', 438)
('Gloucester', 26)



In [None]:
!curl https://raw.githubusercontent.com/Rajeshwari-Rudra/apache_beam-python/main/netflix_titles.csv --output netflix.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  243k  100  243k    0     0   332k      0 --:--:-- --:--:-- --:--:--  332k


In [None]:
!ls

data  netflix.csv  outputs  sample_data


In [None]:
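import apache_beam as beam

# Copy every data row of netflix.csv (the header line is skipped) into
# sharded text files whose names start with "results.txt".
pipeline = beam.Pipeline()
netflix = (
  pipeline
    | beam.io.ReadFromText("netflix.csv", skip_header_lines=1)
    | beam.io.WriteToText("results.txt")
)
pipeline.run()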

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7914dff5f1f0>

In [None]:
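import apache_beam as beam

pipeline = beam.Pipeline()
netflix = (
  pipeline
    | beam.io.ReadFromText("netflix.csv", skip_header_lines=1)
    # Naive CSV parsing: split each line on commas into a list of fields.
    | beam.Map(lambda line: line.split(","))
    # Keep only rows whose second field (the title type) is "Movie".
    | beam.Filter(lambda fields: fields[1] == "Movie")
    | beam.io.WriteToText("results.txt")
)
pipeline.run()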

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7914edd87dc0>
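Splitting on `","` breaks as soon as a field contains a quoted comma, such as a list of genres. This notebook does not show whether `netflix.csv` has such fields, so treat the following as a precaution rather than a required fix: a hypothetical `parse_csv_line` helper that delegates to Python's standard `csv` module, which understands double-quoted fields.

```python
import csv


def parse_csv_line(line):
  """Split one CSV line into fields, keeping double-quoted values together."""
  # csv.reader accepts any iterable of lines; take the single row it yields.
  return next(csv.reader([line]))


# Hypothetical sample line, not taken from netflix.csv.
print(parse_csv_line('s1,Movie,"Crime, Drama",2020'))
# → ['s1', 'Movie', 'Crime, Drama', '2020']
```

In the pipeline, it would replace the `line.split(",")` step as `beam.Map(parse_csv_line)`. Because `ReadFromText` still splits the file on newlines, a quoted field that spans several lines would remain broken; this sketch does not handle that case.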

In [None]:
!ls

data  netflix.csv  outputs  results.txt-00000-of-00001	sample_data
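The `ls` output shows `results.txt-00000-of-00001` rather than `results.txt`: `WriteToText` treats its argument as a file-name prefix and appends a shard suffix. The hypothetical helper below reproduces the default `-SSSSS-of-NNNNN` naming, in which the shard index and the shard count are zero-padded to five digits. `WriteToText` also accepts `num_shards`, `shard_name_template` and `file_name_suffix` arguments (for example `shard_name_template=''` to write a single file named exactly after the prefix); check the Beam API reference for their precise behaviour.

```python
def shard_file_name(prefix, shard_index, num_shards, suffix=''):
  """Rebuild a shard file name under the default '-SSSSS-of-NNNNN' template."""
  # S and N placeholders become the zero-padded shard index and shard count.
  return '{}-{:05d}-of-{:05d}{}'.format(prefix, shard_index, num_shards, suffix)


print(shard_file_name('results.txt', 0, 1))  # → results.txt-00000-of-00001
```

This is also why the word-count cells read `outputs/part-00000-of-*` instead of a single `outputs/part` file.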


In [None]:
import apache_beam as beam

# Column indices of the fields used below, after splitting a line on commas.
SHOW_ID = 0
TYPE = 1
RELEASE_YEAR = 2
RATING = 3

def is_2020_movie(film):
  """Keep only rows that are movies released in 2020."""
  return film[RELEASE_YEAR] == "2020" and film[TYPE] == "Movie"

pipeline = beam.Pipeline()
netflix = (
  pipeline
    | beam.io.ReadFromText("netflix.csv", skip_header_lines=1)
    | beam.Map(lambda line: line.split(","))
    | beam.Filter(is_2020_movie)
    | beam.io.WriteToText("results.txt")
)
pipeline.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7914edb5cd90>
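The filter function in the cell above hard-codes both the release year and the title type. A hypothetical `is_title_of` predicate can take them as parameters instead; `beam.Filter` forwards extra positional arguments to the function, so the pipeline step would read `beam.Filter(is_title_of, "Movie", 2020)`. The sketch below only checks the predicate in plain Python on made-up rows and assumes the same column indices as the notebook.

```python
# Column indices, assumed to match the cell above.
SHOW_ID = 0
TYPE = 1
RELEASE_YEAR = 2
RATING = 3


def is_title_of(film, kind, year):
  """Return True if the parsed row has the given type and release year."""
  # Fields read from the text file are strings, so compare against str(year).
  return film[TYPE] == kind and film[RELEASE_YEAR] == str(year)


# Hypothetical rows, not real data from netflix.csv.
rows = [
    ['s1', 'Movie', '2020', 'PG-13'],
    ['s2', 'TV Show', '2020', 'TV-MA'],
    ['s3', 'Movie', '2019', 'R'],
]
print([row[SHOW_ID] for row in rows if is_title_of(row, 'Movie', 2020)])  # → ['s1']
```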

In [None]:
!ls

data  netflix.csv  outputs  results.txt-00000-of-00001	sample_data
