# Apache Beam

Фреймворк для потоковой обработки данных, который может работать поверх различных движков (`runners`): `Google Cloud Dataflow`, `Apache Spark`,  `Direct Runner`, ... 

`Pipeline` - конструируемый пользователь граф, в котором определяется последовательность действий над данными

`PCollection` - набор неизменяемых данных или поток неизменяемых данных, который преобразуется в рамках `Pipeline`

`PTransform` - операция преобразования данных, шаг в `Pipeline`. Применяется к набору `PCollection` и в результате получается набор `PCollection`.

На базовом уровне программа на `Beam` выглядит так:

- создать объект `Pipeline` и задать необходимые параметры 
- создать начальный набор данных `PCollection`, используя внешние хранилища или данные хранящиеся в памяти
- применять `PTransforms` к `PCollection`. `PTransforms` могут изменять, фильтровать, группировать, анализировать или иным образом обрабатывать элементы в `PCollection`. Преобразование создает новую `PCollection` без изменения исходной `PCollection`. Процессы преобразований не обязательно линейны, можно строить граф обработки произвольной сложности.
- запустить `Pipeline` используя какой-нибудь `Runner`. Для демонстрации в здесь будет использоваться `Direct Runner`

Создадим простой `Pipeline`. Весь процесс можно описать в виде простого `DSL`, который использует оператор `|`:

```bash
[Output PCollection] = [Input PCollection] | [Transform]
```

- `PCollection` в памяти из 10 чисел
- отфильтруем четные числа
- выведем их на экран

In [1]:
import apache_beam as beam
from typing import List, Tuple, Dict, Iterable, Iterator

with beam.Pipeline() as p:
    # p - Pipeline
    p | beam.Create(range(1, 11)) \
      | beam.Filter(lambda num: num % 2 == 0) \
      | beam.Map(print) 

2
4
6
8
10


Трансформации можно аннотировать с помощью оператора `<<`

In [2]:
with beam.Pipeline() as p:
    p | "Create" >> beam.Create(range(1, 11)) \
      | "Filter" >> beam.Filter(lambda num: num % 2 == 0) \
      | "Print" >> beam.Map(print) 


2
4
6
8
10


### ParDo

Базовое, определенное пользователем преобразование, можно создать с помощью наследования от класса `DoFn`. При этом на код, который реализует преобразования, накладываются определенные ограничения:

- они должны быть сериализуемы
- идемпотентны
- потокобезопасны

`ParDo` — это базовый `PTransform` для параллельной обработки данных.  `ParDo` рассматривает каждый элемент во входной `PCollection`, выполняет некоторую функцию обработки (пользовательский код, `DoFn`) над этим элементом и выдает ноль, один или несколько элементов в выходную `PCollection`.

ParDo может быть применен для множества распространенных операций обработки данных, в том числе:

- для фильтрации набора данных (аналог функции `filter` в стандартной библиотеки `Python` или других фреймворках для потоков обработки)
- для преобразование каждого элемента в наборе данных (аналог `map`)
- для выполнения вычислений и генерации новых данных (`flatMap`)

In [3]:
class BreakIntoWordsDoFn(beam.DoFn):
    def process(self, element: str) -> Iterator[str]:
        return element.split()
        #yield from element.split()

    
class LenDoFn(beam.DoFn):
    def process(self, word: str) -> Iterator[int]:
        return [len(word)]
    
class PrintFn(beam.DoFn):
    def process(self, entity) -> None:
        print(entity)
        return None
        
    
with beam.Pipeline() as p:
    words = p | "Text lines"  >> beam.Create(["home", "sweet home"]) \
              | beam.ParDo(BreakIntoWordsDoFn()) 
    
    out = words | "Print words" >> beam.ParDo(PrintFn())

    out_len = words | beam.ParDo(LenDoFn()) \
                    | "Print lengths" >> beam.ParDo(PrintFn())                

home
4
sweet
5
home
4



   Иногда, в случае простых преобразований, проще использовать простые lambda-функции и преобразования:
   - `Filter`
   - `Map`
   - `FlatMap`


In [4]:
with beam.Pipeline() as p:
    words = p | "Text lines"  >> beam.Create(["home ", "sweet home"]) \
              | beam.FlatMap(lambda text: text.split()) 
    
    out = words | "Print words" >> beam.Map(print)

    out_len = words | beam.Map(lambda x: len(x)) \
                    | "Print lengths" >> beam.Map(print)    

home
4
sweet
5
home
4


### GroupByKey
`GroupByKey` — это преобразование для обработки коллекций вида пар ключ/значение. Это параллельная операция свертки. Входные данные для `GroupByKey` — это коллекция пар ключ/значение, представляющая словарь, который может содержать несколько пар с одинаковым ключом, но разными значениями.  `GroupByKey` можно использовать для обработки значений, связанных с каждым уникальным ключом.

In [5]:
data = [("привет", 5), ("мир", 1), ("привет", 4)]

with beam.Pipeline() as p:
    input = p | beam.Create(data) \
              | beam.GroupByKey() \
              | beam.Map(print)

('привет', [5, 4])
('мир', [1])


### CoGroupByKey

`CoGroupByKey` позволяет группировать по ключам сразу несколько `PCollection`

In [6]:
word_documents_list = [
    ('привет', [1, 2, 3]),
    ('привет', [4, 5, 6]),
    ('мир', [1, 3]),
]

word_freq_list = [
    ('привет', 8),
    ('мир', 10),
]


with beam.Pipeline() as p:
    word_documents = p | "Create wd" >> beam.Create(word_documents_list)
    word_freq = p | "Create freq" >> beam.Create(word_freq_list)

    results = {'docs': word_documents, 'freqs': word_freq} \
                | beam.CoGroupByKey() \
                | beam.Map(print)

('привет', {'docs': [[1, 2, 3], [4, 5, 6]], 'freqs': [8]})
('мир', {'docs': [[1, 3]], 'freqs': [10]})


### Combine

Позволяет свертывать значения, аналог сверток.

In [7]:

with beam.Pipeline() as p:
    p | beam.Create(range(1, 5)) \
      | beam.CombineGlobally(sum) \
      | beam.Map(print)      

10


Можно реализовать собственный класс для неассоциативных операций:

In [8]:

class MeanFn(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, state: Tuple[float, int], input: float) -> Tuple[float, int]:
        (sum, count) = state
        return sum + input, count + 1

    def merge_accumulators(self, accumulators: List[Tuple[float, int]]) -> Tuple[float, int]:
        sums, counts = zip(*accumulators)
        return sum(sums), sum(counts)

    def extract_output(self, state: Tuple[float, int]) -> float:
        (sum, count) = state
        return sum / count if count else float('NaN')
    
with beam.Pipeline() as p:
    p | beam.Create(range(1, 6)) \
      | beam.CombineGlobally(MeanFn()) \
      | beam.Map(print)     
        

3.0


Можно делать свертку по ключу

In [9]:
word_freq_list = [
    ('привет', 8),
    ('мир', 10),
    ('привет', 10),
]

with beam.Pipeline() as p:
    p | beam.Create(word_freq_list) \
      | beam.CombinePerKey(MeanFn()) \
      | beam.Map(print)  

('привет', 9.0)
('мир', 10.0)


### Flatten

Позволяет объединить несколько `PCollection` в одно

In [10]:
with beam.Pipeline() as p:
    p_col1 = p | "PCol 1" >> beam.Create(range(1, 5))
    p_col2 = p | "PCol 2" >> beam.Create(range(10, 15))

    (p_col1, p_col2) \
      | beam.Flatten() \
      | beam.Map(print)  

1
2
3
4
10
11
12
13
14


### Partition
Позволяет разделить один `PCollection` на несколько

In [11]:
with beam.Pipeline() as p:
    parts = p | beam.Create(range(0, 15)) \
              | beam.Partition(lambda x, t: x % t, 3) 
       
    parts[0] | beam.Map(print)   

0
3
6
9
12


### Состояния

Можно поддерживать состояния для подсчета статистики

In [12]:
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec

class StateDoFn(beam.DoFn):
  STATE_SPEC = ReadModifyWriteStateSpec('num_elements', beam.coders.VarIntCoder())

  def process(self, element: Tuple[str, int], state=beam.DoFn.StateParam(STATE_SPEC)):
    current_value = state.read() or 0
    state.write(current_value + element[1])
    return [state.read()]

with beam.Pipeline() as p:
    parts = p | beam.Create([("привет", 1), ("мир", 2), ("привет", 7)]) \
              | beam.ParDo(StateDoFn()) \
              | beam.Map(print)   

1
2
8


In [13]:
def filter_func(el: int, avg_param: int) -> bool:
    return el < avg_param

with beam.Pipeline() as p:
    inp = p | beam.Create(range(15)) 
            
    avg = inp | beam.CombineGlobally(MeanFn()) 

    inp | beam.Filter(filter_func, avg_param=beam.pvalue.AsSingleton(avg)) \
        | beam.Map(print)     

0
1
2
3
4
5
6


### Поддержка пакетных операций

Для оптимизации кода можно применять обработку данных с помощью `numpy`

In [14]:
import numpy as np


class Square(beam.DoFn):
    def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
        yield batch ** 2

    def infer_output_type(self, input_element_type):
        return input_element_type

with beam.Pipeline() as p:
    p | beam.Create(range(20)).with_output_types(np.int64) \
      | beam.ParDo(Square()) \
      | beam.Map(print)
    
# with beam.Pipeline() as p:
#     p | beam.Create(range(20)) \
#       | beam.Map(lambda x: x ** 2) \
#       | beam.Map(print)   

0
1
4
9
16
25
36
49
64
81
100
121
144
169
196
225
256
289
324
361


In [15]:
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe.convert import to_pcollection, to_dataframe


with beam.Pipeline() as p:
    currencies = p | read_csv("data/currencies.csv")

    agg = currencies.groupby("cur").val.mean()
    
    to_pcollection(agg) | beam.Map(print)

60.152263636363635
70.50923636363636
56.536663636363635


Естественно можно использовать SQL

In [None]:
from apache_beam.transforms.sql import SqlTransform

with beam.Pipeline() as p:
    currencies = p | read_csv("data/currencies.csv") \
                   | SqlTransform("""SELECT cur, AVG(val) FROM PCOLLECTION GROUP BY cur""") \
                   | beam.Map(print)



Можно создавать DataFrame на лету

In [17]:

with beam.Pipeline() as p:
    parts = p | beam.Create([("привет", 1), ("мир", 2), ("привет", 7)]) \
              | beam.Map(lambda pair: beam.Row(word=pair[0], count=pair[1])) 
    df = to_dataframe(parts) 
    agg = df.groupby("word").sum()
    to_pcollection(agg) | beam.Map(print)
      

BeamSchema_ba49671c_0c24_432e_8347_1d475a7bf9b5(count=2)
BeamSchema_ba49671c_0c24_432e_8347_1d475a7bf9b5(count=8)


Потоковые данные

In [18]:
import re 
import time

from apache_beam.transforms import window

class PrintFn(beam.DoFn):
    def process(
        self,
        element: Tuple[str, int],
        timestamp=beam.DoFn.TimestampParam,
        window=beam.DoFn.WindowParam
    ):  
        print(element)
        yield element

class ChangeTimestamp(beam.DoFn):
    def process(
        self,
        element: str,
    ): 
        timestamp = int(time.time())
        yield beam.window.TimestampedValue(element, timestamp)



with beam.Pipeline() as p:
    messages =(p | beam.Create([b"hello world", b"hello hello york"]) #beam.io.ReadFromPubSub(subscription="some_channel").with_output_types(bytes) \
                 | "Decode" >> beam.Map(lambda x: x.decode('utf-8')) 
                 | "Add time" >> beam.ParDo(ChangeTimestamp())
                 | "Split"  >> beam.FlatMap(lambda line: re.findall(r"\w+", line)) 
                 | beam.Map(lambda x: (x, 1)) 
                 | beam.WindowInto(window.FixedWindows(15, 0)) 
                 | beam.GroupByKey() 
                 | beam.Map(lambda kv: (kv[0], sum(kv[1]))) 
                 | beam.ParDo(PrintFn())) 

('hello', 3)
('world', 1)
('york', 1)


Пример с подсчетом tf-idf

In [19]:
import math


docs = [
    "человек лев орел черепаха человек", 
    "лев вол орел",
    "лев черепаха лев кошка",
    "жучка кошка мышка",
    "лев орел грифон"
]


with beam.Pipeline() as p:
    id_to_content = p | beam.Create(enumerate(docs))

    total_documents = (
        id_to_content
        | beam.Keys()
        | beam.Distinct()
        | beam.combiners.Count.Globally()
    )

    id_to_words = (
        id_to_content
        | "SplitWords" >> beam.FlatMap(lambda id_and_doc: [(id_and_doc[0], word) for word in re.findall(r"\w+", id_and_doc[1])])
    )

    word_to_doc_count = (
        id_to_words
        | "GetUniqueWordsPerDoc" >> beam.Distinct()
        | "GetWords" >> beam.Values()
        | "CountDocsPerWord" >> beam.combiners.Count.PerElement()
    )

    id_to_word_total = (
        id_to_words
        | "GetIds" >> beam.Keys()
        | "CountWordsInDoc" >> beam.combiners.Count.PerElement())


    id_and_word_to_count = (
        id_to_words
        | "CountWord-DocPairs" >> beam.combiners.Count.PerElement())

    id_to_word_and_count = (
        id_and_word_to_count
        | "ShiftKeys" >> beam.Map(lambda id_word_count: (id_word_count[0][0], (id_word_count[0][1], id_word_count[1])))
    )

    id_to_word_and_count_and_total = ({
         "word totals": id_to_word_total, 
         "word counts": id_to_word_and_count
         } | "CoGroupByUri" >> beam.CoGroupByKey())

    
    def compute_term_frequency(id_count_and_total: Tuple[int, Dict]) -> Iterator[Tuple[str, Tuple[int, float]]]:
        (id, count_and_total) = id_count_and_total
        word_and_count = count_and_total["word counts"]

        [word_total] = count_and_total["word totals"]
        for word, count in word_and_count:
            yield word, (id, float(count) / word_total)

    word_to_id_and_tf = (
        id_to_word_and_count_and_total
        | "ComputeTermFrequencies" >> beam.FlatMap(compute_term_frequency))

    word_to_df = (
        word_to_doc_count
        | "ComputeDocFrequencies" >> beam.Map(lambda word_and_count, total: (word_and_count[0], float(word_and_count[1]) / total), 
                                              beam.pvalue.AsSingleton(total_documents))
    )

    word_to_id_and_tf_and_df = ({
        "tf": word_to_id_and_tf, "df": word_to_df
    } | "CoGroupWordsByTf-df" >> beam.CoGroupByKey())

    def compute_tf_idf(word_tf_and_df: Tuple[str, Dict]) -> Iterator[Tuple[str, Tuple[int, float]]]:
      (word, tf_and_df) = word_tf_and_df
      [docf] = tf_and_df["df"]
      for id, tf in tf_and_df["tf"]:
        yield word, (id, tf * math.log(1 / docf))

    word_to_id_and_tfidf = (
        word_to_id_and_tf_and_df
        | "ComputeTf-idf" >> beam.FlatMap(compute_tf_idf)
    )

    word_to_id_and_tfidf | beam.Map(print)

('человек', (0, 0.6437751649736402))
('лев', (0, 0.04462871026284196))
('лев', (1, 0.07438118377140325))
('лев', (2, 0.11157177565710488))
('лев', (4, 0.07438118377140325))
('орел', (0, 0.10216512475319815))
('орел', (1, 0.1702752079219969))
('орел', (4, 0.1702752079219969))
('черепаха', (0, 0.18325814637483104))
('черепаха', (2, 0.22907268296853878))
('вол', (1, 0.5364793041447))
('кошка', (2, 0.22907268296853878))
('кошка', (3, 0.3054302439580517))
('жучка', (3, 0.5364793041447))
('мышка', (3, 0.5364793041447))
('грифон', (4, 0.5364793041447))
