<a href="https://colab.research.google.com/github/RM-RAMASAMY/Apache_beam/blob/main/ApacheBeamAssignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Run and print a shell command.
# NOTE: `!{cmd}` is IPython/Jupyter shell-escape syntax, so this cell only
# works inside a notebook kernel (e.g. Colab), not as a plain .py script.
def run(cmd: str) -> None:
  """Echo ``cmd`` prefixed with '>> ', run it in a shell, then print a blank line.

  Args:
    cmd: Shell command line. It is interpolated verbatim into the IPython
      ``!`` escape, so it should only ever be trusted input.
  """
  print('>> {}'.format(cmd))
  # IPython substitutes the local variable `cmd` into {cmd} and runs the result in a shell.
  !{cmd}
  # Blank line separates this command's output from whatever is printed next.
  print('')

# Environment setup, executed strictly in order:
#   1. install the apache-beam package,
#   2. create a local `data/` directory,
#   3. download King Lear from the public Dataflow samples bucket into it.
for setup_command in (
    'pip install --quiet apache-beam',
    'mkdir -p data',
    'gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/',
):
  run(setup_command)

>> pip install --quiet apache-beam
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 89.7/89.7 kB 5.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 152.0/152.0 kB 7.1 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 43.5/43.5 kB 2.1 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
  Preparing metadata (setup.py) ... done
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 15.7/15.7 MB 51.7 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 3.1/3.1 MB 52.5 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 40.8/40.8 MB 16.5 MB/s eta 0:00:00
   ━━━━━━ … (output truncated)

In [2]:
import apache_beam as beam
import re

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

# Word count over every file matching `inputs_pattern`, running locally in the
# DirectRunner. The pipeline executes when the `with` block exits. Each stage
# gets its own PCollection variable instead of one long chained expression.
with beam.Pipeline() as pipeline:
  lines = pipeline | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
  words = lines | 'Find words' >> beam.FlatMap(
      lambda text: re.findall(r"[a-zA-Z']+", text))
  keyed_ones = words | 'Pair words with 1' >> beam.Map(lambda token: (token, 1))
  totals = keyed_ones | 'Group and sum' >> beam.CombinePerKey(sum)
  # str() renders each (word, count) tuple as one output line.
  formatted = totals | 'Format results' >> beam.Map(str)
  formatted | 'Write results' >> beam.io.WriteToText(outputs_prefix)

# Peek at up to 20 lines of the first output shard. Element order is not
# guaranteed, so this is an arbitrary sample.
run(f'head -n 20 {outputs_prefix}-00000-of-*')



>> head -n 20 outputs/part-00000-of-*
('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)
('EDGAR', 126)
('son', 29)
('to', 438)
('Gloucester', 26)



In [3]:
import apache_beam as beam
import re

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'


def find_words(line):
  """Return every run of ASCII letters and apostrophes in one text line."""
  return re.findall(r"[a-zA-Z']+", line)


def pair_with_one(word):
  """Key a word with the count 1 so CombinePerKey can total its occurrences."""
  return (word, 1)


# Same word count, running locally in the DirectRunner, with the per-element
# logic pulled out into the named functions above.
with beam.Pipeline() as pipeline:
  # Stage 1: lines (str) -> words (str) -> (word, 1) -> (word, total).
  # Each element of `word_counts` is a (str, int) tuple.
  word_counts = (
      pipeline
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      | 'Find words' >> beam.FlatMap(find_words)
      | 'Pair words with 1' >> beam.Map(pair_with_one)
      | 'Group and sum' >> beam.CombinePerKey(sum)
  )

  # Stage 2: a PCollection can feed further transforms. Here each
  # (word, count) tuple is rendered with str() and written one per line.
  (
      word_counts
      | 'Format results' >> beam.Map(str)
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# Peek at up to 20 lines of the first output shard. Ordering is not
# guaranteed, so treat this as an arbitrary sample.
run(f'head -n 20 {outputs_prefix}-00000-of-*')



>> head -n 20 outputs/part-00000-of-*
('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)
('EDGAR', 126)
('son', 29)
('to', 438)
('Gloucester', 26)

