In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

In [None]:
import re

class WordExtractingDoFn(beam.DoFn):
  """Beam DoFn that splits a line of text into its individual words."""
  def process(self, element):
    """Tokenizes one input element and returns the words found in it.

    A word is a maximal run of word characters and apostrophes, so a
    contraction such as "don't" stays a single token.  A line containing no
    such characters produces an empty list.

    Args:
      element: a single line of text.

    Returns:
      A list of every word found in the line, in order of appearance.
    """
    word_pattern = r'[\w\']+'
    words = re.findall(word_pattern, element, flags=re.UNICODE)
    return words

In [None]:
# Define arguments
# Both paths are relative to the notebook's working directory.
input_path = "my_text.txt"  # file (or file pattern) handed to ReadFromText
# NOTE(review): WriteToText takes a file path *prefix*, so the written files
# are presumably named like "outputs/file.txt-00000-of-00001" — confirm.
output_path = "outputs/file.txt"

In [None]:
# Build the pipeline on the InteractiveRunner so that intermediate
# PCollections can be inspected from the notebook via ib.show / ib.collect.
p = beam.Pipeline(runner=InteractiveRunner())

In [None]:
# Load the input file (or every file matching the pattern) into a
# PCollection of text lines, then preview it in the notebook.
lines = p | 'Read' >> ReadFromText(file_pattern=input_path)
ib.show(lines)

In [None]:
# Word count: tokenize each line, pair every word with a count of 1, then
# add up the 1s per distinct word.
counts = (
    lines
    | 'Split' >> beam.ParDo(WordExtractingDoFn()).with_output_types(str)
    | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum)
)
ib.show(counts)

In [None]:
# Step-by-step view: run only the tokenizing ParDo and look at the raw words.
output1 = lines | beam.ParDo(WordExtractingDoFn()).with_output_types(str)
ib.show(output1)

In [None]:
# Step-by-step view: pair each extracted word with an initial count of 1.
# The label must differ from the 'PairWithOne' step already applied to this
# pipeline when building `counts`: Beam requires transform labels to be unique
# within a pipeline and raises a RuntimeError on a duplicate.
output2 = output1 | 'PairWithOneStepByStep' >> beam.Map(lambda word: (word, 1))
ib.show(output2)

In [None]:
# Helper that renders one (word, count) pair as a line of output text.
def format_result(word, count):
    """Renders a word and its count as a '<word>: <count>' string.

    Args:
        word: the word being reported.
        count: the number of occurrences; formatted with %d.

    Returns:
        The formatted string, e.g. 'beam: 3'.
    """
    template = '%s: %d'
    return template % (word, count)

# Turn each (word, count) pair into a display string; MapTuple unpacks the
# pair into format_result's two positional arguments.
output = counts | 'Format' >> beam.MapTuple(format_result)

# Preview the formatted lines in the notebook.
ib.show(output)

In [None]:
# Write the formatted lines to disk.  The result is bound to a descriptive
# name rather than `_`, which would shadow IPython's built-in last-output
# variable and mark a value as throwaway even though it is used right below.
write_result = output | "Write" >> WriteToText(output_path)
ib.collect(write_result)