### Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. Using an Apache Beam SDK, you build a program that defines the pipeline and then run it on any supported execution engine (runner).

# In[ ]:

import argparse # to create a program through command-line environment

from typing import List, Tuple

import apache_beam as beam
from apache_beam import PCollection
from apache_beam.options.pipeline_options import PipelineOptions 


def main():
    """Parse command-line arguments and launch the word-count pipeline.

    Flags defined here are consumed by this script; every unrecognised flag
    (for example ``--runner=DirectRunner``) is passed through unchanged to
    ``run_pipeline`` as the Beam pipeline options.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-location", required=True,
                        help="File path or glob pattern of the text input to read.")
    parser.add_argument("--output-location", required=True,
                        help="Output path handed to Beam's WriteToText.")
    # type=int is needed: without it a value supplied on the command line
    # arrives as a str, while Top.Of expects an integer count.
    parser.add_argument("--num-of-words", type=int, default=1000, required=False,
                        help="How many of the most frequent words to output.")

    my_args, beam_args = parser.parse_known_args()
    run_pipeline(my_args, beam_args)


def processed_word(word: str) -> str:
    """Normalise a token for counting: lowercase it and drop every ',' and '.'."""
    # A translation table whose third argument lists the characters to delete;
    # removes both punctuation marks in a single pass over the lowercased word.
    strip_table = str.maketrans("", "", ",.")
    return word.lower().translate(strip_table)


def prettify(tl: List[Tuple[str, int]]) -> str:
    """Render (word, count) pairs as CSV-style text, one pair per line.

    Each line has the form ``word,count`` and is terminated by a newline, so a
    non-empty result always ends with a trailing newline; an empty input gives
    an empty string.
    """
    # str.join builds the result in one pass instead of repeated `+=` copies.
    return "".join(f"{t[0]},{t[1]}\n" for t in tl)


def run_pipeline(custom_args: argparse.Namespace, beam_args: List[str]):
    """Build and run the Beam word-count pipeline.

    Reads text from ``custom_args.input_location``, splits every line on
    whitespace, normalises each word with ``processed_word``, counts how often
    each word occurs, keeps the ``custom_args.num_of_words`` most frequent
    words, formats them with ``prettify`` and writes the result with
    ``WriteToText`` to ``custom_args.output_location``.

    Args:
        custom_args: parsed script arguments providing ``input_location``,
            ``output_location`` and ``num_of_words``.
        beam_args: remaining command-line flags, forwarded to
            ``PipelineOptions`` (e.g. ``--runner=DirectRunner``).
    """
    # Flags not consumed by this script configure the pipeline (runner, etc.).
    opts = PipelineOptions(beam_args)

    input_location = custom_args.input_location
    output_location = custom_args.output_location
    # Coerce defensively: if the argument parser did not declare type=int, a
    # command-line value arrives as a str, but Top.Of needs an integer count.
    num_words = int(custom_args.num_of_words)

    # The pipeline is executed when the `with` block exits.
    with beam.pipeline.Pipeline(options=opts) as p:
        # Reading the data
        lines: PCollection[str] = p | "Reading input data" >> beam.io.ReadFromText(file_pattern=input_location)
        # output
        # "En un lugar de la Mancha", "cuyo nombre...", "another sentence", ...
        words: PCollection[str] = lines | "Split words" >> beam.FlatMap(lambda line: line.split())

        # "En", "un", "lugar", ..., "en"
        transformed: PCollection[str] = words | "transform words" >> beam.Map(processed_word)
        # "en", "un", "lugar", ..., "en"

        counted: PCollection[Tuple[str, int]] = transformed | "Count" >> beam.combiners.Count.PerElement()

        # Top.Of yields a single element: a list of the num_words pairs with
        # the largest counts (compared by the count, t[1]).
        ranked: PCollection[List[Tuple[str, int]]] = counted | "Rank" >> beam.combiners.Top.Of(num_words,
                                                                                               key=lambda t: t[1])

        prettied: PCollection[str] = ranked | "awesome print" >> beam.Map(prettify)
        # NOTE: WriteToText treats output_location as a file-name prefix and,
        # by default, appends a shard suffix to the written file name.
        prettied | "Write output" >> beam.io.WriteToText(output_location)


if __name__ == '__main__':
    # `python` cannot execute an .ipynb file directly. Export the notebook to a
    # script first, e.g. `jupyter nbconvert --to script Beam_Pipeline_wc.ipynb`,
    # then run the resulting .py file from the shell:
    # python Beam_Pipeline_wc.py --runner=DirectRunner --input-location=./data/el_quijote.txt --output-location=out/word_count.csv
    main()