## Word Count with Beam

In [None]:
import re
import logging
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

In [None]:
# Build default pipeline options, then select Beam's DirectRunner through
# the StandardOptions view so the pipeline below executes locally.
options = PipelineOptions()
_standard_options = options.view_as(StandardOptions)
_standard_options.runner = 'DirectRunner'

In [None]:
# Run the whole thing in one go: read the input text, split every line into
# words, count how many times each distinct word occurs, and print one
# "word: count" line per word.


def split_words(line):
    """Return the words in *line*, in order of appearance.

    A word is a maximal run of letters and apostrophes, so contractions such
    as "don't" stay intact. Any Unicode letter counts (``[^\\W\\d_]`` is
    "word character that is not a digit or underscore"), so accented words
    like "café" are kept whole instead of being cut at the first non-ASCII
    letter. Digits and underscores still separate words. Matching is
    case-sensitive: "The" and "the" are distinct words.
    """
    return re.findall(r"(?:[^\W\d_]|')+", line)


def format_result(word_count):
    """Format a ``(word, count)`` pair as the string ``'word: count'``."""
    word, count = word_count
    return f'{word}: {count}'


with beam.Pipeline(options=options) as p:
    lines = p | ReadFromText('war_and_peace.txt')

    # Count the occurrences of each word.
    counts = (
        lines
        | 'Split' >> beam.FlatMap(split_words).with_output_types(str)
        | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a PCollection of strings, then print each one.
    output = counts | 'Format' >> beam.Map(format_result)

    output | "print" >> beam.Map(print)