# Apache Beam pipelines

A Beam "program" consists of:

* driver: the pipeline (from the input data sources to the output results)
* runner: the executor of the driver (local, GCP Dataflow, Flink, Spark, ...)

It works with both batch and streaming data, and both share the same abstractions:

* **Pipeline:** encapsulates the entire data processing task, from start to finish.
* **PCollection:** the inputs and outputs of every stage of the pipeline. These are the distributed datasets the pipeline operates on, either *bounded* (a fixed/batch data source, e.g. a file) or *unbounded* (a continuous/streaming data source, e.g. a Kafka topic or, more generally, Pub/Sub topics or subscriptions). A PCollection is created either by reading an external data source or from in-memory data, and every processing step of the pipeline produces new PCollections as output.
* **PTransform:** the data processing operations applied to PCollections, i.e. the computational steps of the pipeline. Each one takes one or more PCollections as input and produces zero or more PCollections as output.
* **IO Transform:** reads and writes data on various storage systems (cloud, local file system, ...).

Below is an example with Beam of a variation on the classic WordCount: for each word in the text, it finds the word that most often follows it.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import re
import time
```

### Pipeline Options

Set the pipeline options, including the input/output storage and which runner (local or distributed) to use.

They are typically passed on the command line, parsed with argparse, and given defaults where needed.

None are set here, because running locally does not need any: when no `--runner` is given, Beam uses the local DirectRunner.
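A sketch of the argparse pattern described above, assuming the hypothetical flags `--input` and `--output` (defaulting to the notebook's file names). `parse_known_args` keeps the flags it does not recognize, such as `--runner`, in a separate list that can then be handed to Beam as `PipelineOptions(pipeline_args)`.

```python
import argparse


def parse_args(argv=None):
    """Splits command-line arguments into job-specific ones and Beam options.

    --input and --output are hypothetical flag names for this sketch.
    argv=None means argparse reads sys.argv[1:].
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", default="alice.txt", help="text file to read")
    parser.add_argument("--output", default="alice_processed.txt", help="output file prefix")
    # Unrecognized flags (e.g. --runner) are returned in the second list
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args
```

With `["--input", "other.txt", "--runner=DirectRunner"]`, the job-specific arguments get `input="other.txt"` and the default output, while `["--runner=DirectRunner"]` is left for Beam.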

```python
pipeline_options = PipelineOptions()
```

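### Heavy computations

Complex or computationally expensive logic goes into a `DoFn` subclass applied with **ParDo**: the runner splits the input PCollection into bundles and processes them in parallel on different workers, which reduces the overall execution time.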

```python
class ComputeCleanLineFn(beam.DoFn):
    def process(self, element):
        """
        Splits a text line into a list of tuples (word, next_word)
        element: a text line, typed as string e.g. "Hi, I am Gabriele!"
        return: a list of string tuples e.g. [("hi", "i"), ("i", "am"), ("am", "gabriele")]
        """
        regex = r'[a-zA-Z]+'  # r'\w+' to include numbers (and underscores) as well
        line_words = re.findall(regex, element.lower())  # keep only alphabetic runs, dropping punctuation
        words_to_tuples = [(line_words[i], line_words[i+1]) for i in range(len(line_words)-1)]
        # ParDo emits each tuple of the returned list as a separate element
        return words_to_tuples


class ExtractMostLikelyNextWordFn(beam.DoFn):
    def process(self, element):
        """
        Pairs a word with its most likely successor.
        element: a tuple, encoded as (word, [(next_word_i, count_i), ..., (next_word_n, count_n)])
        return: a one-item list with the tuple (word, most_likely_next_word)
        """
        word, next_counts = element
        # The grouped values may be a generic iterable rather than a list, and input
        # elements should not be modified: pick the best pair with min() instead of an
        # in-place sort. Highest count wins; ties are broken alphabetically.
        most_likely_successor, _ = min(next_counts, key=lambda wc_tuple: (-wc_tuple[1], wc_tuple[0]))
        return [(word, most_likely_successor)]
```
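The logic of the two DoFns can be sanity-checked outside Beam with plain Python. A minimal sketch: `bigrams` and `most_likely_successor` are hypothetical helper names that reproduce the two `process` methods; `zip` replaces the index-based list comprehension, and `min` with the same key picks the successor without sorting, so it also accepts a generic iterable.

```python
import re


def bigrams(line):
    """Same output as ComputeCleanLineFn.process, written with zip."""
    words = re.findall(r"[a-zA-Z]+", line.lower())
    # zip stops at the shorter sequence, so the last word gets no pair
    return list(zip(words, words[1:]))


def most_likely_successor(word, next_counts):
    """Same rule as ExtractMostLikelyNextWordFn: the highest count wins,
    ties are broken alphabetically. next_counts can be any iterable
    of (next_word, count) pairs."""
    best_next, _ = min(next_counts, key=lambda wc: (-wc[1], wc[0]))
    return (word, best_next)
```

For instance, with the counts `[("queen", 3), ("cat", 5), ("king", 5)]`, "cat" and "king" tie on count and "cat" is chosen because it comes first alphabetically.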

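### Light computations

Simple logic can be defined as plain functions (or lambda expressions) and passed to transforms such as **Map, FlatMap, Filter, etc.** These transforms are themselves built on ParDo, so they are also executed in parallel; a `DoFn` class is mainly worth it when the logic needs more structure, for example lifecycle methods such as `setup`.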

```python
def format_result(word_next_word):
    """
    Converts to string a tuple (word, most_likely_successor)
    word_next_word: tuple (word, most_likely_successor)
    return: string "word: most_likely_successor"
    """
    (word, most_likely_successor) = word_next_word
    return "{}: {}".format(word, most_likely_successor)

def count_ones(word_ones):
    """
    Counts the ones associated with a (word, next) pair
    word_ones: tuple (word, [1,1,..,1])
    return: tuple (word, count)
    """
    (word, ones) = word_ones
    return (word, sum(ones))
```


```python
INPUT_FILE = 'alice.txt'
OUTPUT_FILE = 'alice_processed.txt'

start_time = time.time()
# Leaving the with block runs the pipeline and waits for it to finish, so no explicit
# p.run() / wait_until_finish() is needed (calling p.run() inside it would run the job twice)
with beam.Pipeline(options=pipeline_options) as p:
    output = (p
              | 'ReadInputFile'     >> beam.io.ReadFromText(INPUT_FILE)                # -> one element per line
              | 'CleanLines'        >> beam.ParDo(ComputeCleanLineFn())                # -> one (word, next) per pair
              | 'WordNextWithOne'   >> beam.Map(lambda x: (x, 1))                      # -> ((word, next), 1)
              | 'GroupByWordNext'   >> beam.GroupByKey()                               # -> ((word, next), [1,1,..1])
              | 'WordNextWithCount' >> beam.Map(count_ones)                            # -> ((word, next), count)
              | 'Reshape'           >> beam.Map(lambda x: (x[0][0], (x[0][1], x[1])))  # -> (word, (next, count))
              | 'GroupByWord'       >> beam.GroupByKey()                               # -> (word, [(next_i, count_i), .., (next_n, count_n)])
              | 'ChooseSuccessor'   >> beam.ParDo(ExtractMostLikelyNextWordFn())       # -> (word, next_k)
              | 'FormatResult'      >> beam.Map(format_result)                         # -> string "word: next_k"
              | 'WriteResult'       >> beam.io.WriteToText(OUTPUT_FILE)                # -> files OUTPUT_FILE-SSSSS-of-NNNNN
             )

print("elapsed time: {}s".format(time.time() - start_time))
```

```
elapsed time: 3.11439204216s
```
