# 1. From MapReduce to Beam

* MapReduce is an architecture for the distributed processing of large quantities of data (originally for ranking online searches), proposed by Google in 2004 [1].
* Google has not encouraged its use since 2014, but the design lives on in many more modern architectures for distributed computing [2].
* It has 4 components: **Mapper**, **Combiner**, **Partitioner**, and **Reducer** [1].
<img src="public/images/MapReduce_Architecture.png" width="800" height="400">

There are 3 functions that are the basic components of **Apache Beam**:

1. [Map](https://beam.apache.org/documentation/transforms/python/elementwise/map/)
2. [FlatMap](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/)
3. [ParDo](https://beam.apache.org/documentation/transforms/python/elementwise/pardo/)

People often confuse them. I hope that after this notebook you can use them effectively.
***P.S. You can click on the keywords***

## 1.1 Map

This works like Python's `map` function (here `Pool.map`, for parallel processing). It guarantees **exactly 1 output per input**.

In [34]:
from multiprocessing import Pool
from test_funcs import square

numbers = [1, 2, 3, 4, 5]
with Pool(2) as pool:
    result = pool.map(square, numbers)
print(result)

[1, 4, 9, 16, 25]


In [35]:
import apache_beam as beam

def square(x):
    return x * x

with beam.Pipeline() as p:

  (p | beam.Create([1, 2, 3, 4, 5])
     | "Square" >> beam.Map(square)
     | beam.LogElements())

1
4
9
16
25


## 1.2 FlatMap

This is the same as Map, except that the result is flattened. It allows **0 or more outputs per input**.

In [36]:
from multiprocessing import Pool
from test_funcs import split_string

strings = ["Hello World", "One potato, two potatoes, three potatoes, four. Five potatoes", ""]
with Pool(2) as pool:
    result = pool.map(split_string, strings)
    # Flatten The List
    result = [
        x
        for xs in result
        for x in xs
    ]
print(result)

['Hello', 'World', 'One', 'potato,', 'two', 'potatoes,', 'three', 'potatoes,', 'four.', 'Five', 'potatoes']


In [37]:
import apache_beam as beam

def split_string(x):
    return x.split()

with beam.Pipeline() as p:

  (p | beam.Create(["Hello World", "One potato, two potatoes, three potatoes, four. Five potatoes", ""])
     | "split_string" >> beam.FlatMap(split_string)
     | beam.LogElements())

Hello
World
One
potato,
two
potatoes,
three
potatoes,
four.
Five
potatoes


### Comparison: Map

In [13]:
import apache_beam as beam

def split_string(x):
    return x.split()

with beam.Pipeline() as p:

  (p | beam.Create(["Hello World", "One potato, two potatoes, three potatoes, four. Five potatoes", ""])
     | "split_string" >> beam.Map(split_string)
     | beam.LogElements())

['Hello', 'World']
['One', 'potato,', 'two', 'potatoes,', 'three', 'potatoes,', 'four.', 'Five', 'potatoes']
[]
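
The same contrast can be reproduced without Beam. The sketch below is plain Python rather than Beam code, and `map_like` and `flat_map_like` are hypothetical helper names introduced only for this illustration.

```python
from itertools import chain

def split_string(x):
    return x.split()

def map_like(fn, items):
    """One output per input: each result is kept as a single element."""
    return [fn(item) for item in items]

def flat_map_like(fn, items):
    """Zero or more outputs per input: each result is iterated and flattened."""
    return list(chain.from_iterable(fn(item) for item in items))

strings = ["Hello World", "a b c", ""]

mapped = map_like(split_string, strings)
flattened = flat_map_like(split_string, strings)

print(mapped)     # three elements, one list per input string
print(flattened)  # five elements; the empty string contributes none
```

The empty string still produces an element (an empty list) under the Map-style helper, while it disappears entirely under the FlatMap-style one.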


## 1.3 ParDo

Calculating a ***mean*** is a good example of the shortcomings of *FlatMap*.

Since Mean(Mean(1, 2), Mean(2, 3, 4)) $\neq$ Mean(1, 2, 2, 3, 4), partial results cannot simply be averaged again: each one has to carry both its ***numerator*** (the sum) and its ***denominator*** (the count).
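
The arithmetic can be checked with a few lines of plain Python. This is only a worked example of the inequality above; the variable names are chosen here for illustration.

```python
def mean(values):
    return sum(values) / len(values)

chunk_a = [1, 2]
chunk_b = [2, 3, 4]

# Averaging the per-chunk means loses the chunk sizes.
mean_of_means = mean([mean(chunk_a), mean(chunk_b)])

# Carrying (sum, count) pairs keeps enough information to merge exactly.
partials = [(sum(chunk_a), len(chunk_a)), (sum(chunk_b), len(chunk_b))]
total = sum(s for s, _ in partials)
count = sum(c for _, c in partials)
merged_mean = total / count

print(mean_of_means)            # 2.25
print(merged_mean)              # 2.4
print(mean(chunk_a + chunk_b))  # 2.4
```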

Suppose we want to compute, for each word, the average number of *occurrences per document*:

### Version 1: Normal MapReduce Implementation
```
Method Map(String Document)
    words = Document.split()
    for each distinct word in words:
        freq <- number of times word occurs in words
        Emit(String word, Integer freq)  #('potato', 2), ('potato', 1)|('potato', 1)

Method Combine(String word, Integers [freq1, freq2, ...])
    for freq in [freq1, ...]:
        sum <- sum + freq
        cnt <- cnt + 1
    Emit(String word, [Integer sum, Integer cnt])  #('potato', (3, 2))|('potato', (1, 1))

# all the same *word* goes to the same Reduce function
Method Partition(String word, [Integer sum, Integer cnt])
    Emit(String word)

Method Reduce(Key String word, [(sum1, cnt1), (sum2, cnt2), ...]) #('potato', [(3, 2), (1, 1)])
    for sum, cnt in [(sum1, cnt1)...]:
        sum_total <- sum_total + sum
        cnt_total <- cnt_total + cnt
    Emit(String word, sum_total / cnt_total)  
```

### Version 2: Put Combine in Map
```
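Method Map(String Document)
    words = Document.split()
    for each distinct word in words:
        freq <- number of times word occurs in words
        sum[word] <- sum[word] + freq   # state kept across the documents seen by this mapper
        cnt[word] <- cnt[word] + 1
    # once this mapper has seen all of its documents:
    for each word in sum:
        Emit(String word, [Integer sum[word], Integer cnt[word]])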

# all the same *word* goes to the same Reduce function
Method Partition(String word, [Integer sum, Integer cnt])
    Emit(String word)

Method Reduce(Key String word, [(sum1, cnt1), (sum2, cnt2), ...])
    for sum, cnt in [(sum1, cnt1)...]:
        sum_total <- sum_total + sum
        cnt_total <- cnt_total + cnt
    Emit(String word, sum_total / cnt_total)
```

### Version 3: Map & Reduce -> ParDo, with Partition Built In
```
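Method ParDo(Key None, String Document)
    words = Document.split()
    for each distinct word in words:
        freq <- number of times word occurs in words
        sum[word] <- sum[word] + freq   # state kept across the documents seen by this worker
        cnt[word] <- cnt[word] + 1
    # once this worker has seen all of its documents:
    for each word in sum:
        Emit(String word, [Integer sum[word], Integer cnt[word]])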

Method ParDo(Key String word, [(sum1, cnt1), (sum2, cnt2), ...])
    for sum, cnt in [(sum1, cnt1)...]:
        sum_total <- sum_total + sum
        cnt_total <- cnt_total + cnt
    Emit(String word, sum_total / cnt_total)
```
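
To make the pseudocode concrete, here is a minimal single-process sketch of the four stages in plain Python. It is a simulation under stated assumptions, not a distributed implementation: the `splits` layout (two hypothetical workers) and all function names are invented for this illustration.

```python
from collections import Counter, defaultdict

# Each inner list is one input split (a hypothetical worker); each item is a document.
splits = [
    [["potato"], ["potato", "tomato"]],
    [["potato", "tomato"], ["potato", "tomato", "tomato"], ["potato", "potato", "potato"]],
]

def map_document(words):
    """Map: emit (word, occurrences in this document)."""
    return list(Counter(words).items())

def combine(pairs):
    """Combine: local pre-aggregation into word -> [sum, cnt] for one split."""
    partial = defaultdict(lambda: [0, 0])
    for word, freq in pairs:
        partial[word][0] += freq
        partial[word][1] += 1
    return partial

def partition(partials):
    """Partition: route all partial results for the same word together."""
    grouped = defaultdict(list)
    for partial in partials:
        for word, (total, cnt) in partial.items():
            grouped[word].append((total, cnt))
    return grouped

def reduce_word(pairs):
    """Reduce: merge (sum, cnt) pairs and divide once at the end."""
    sum_total = sum(total for total, _ in pairs)
    cnt_total = sum(cnt for _, cnt in pairs)
    return sum_total / cnt_total

partials = [
    combine(pair for doc in split for pair in map_document(doc))
    for split in splits
]
averages = {word: reduce_word(pairs) for word, pairs in partition(partials).items()}
print(averages)
```

The division happens only once, in `reduce_word`, which is what keeps the result exact regardless of how the documents are split.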

### MapReduce Style in Apache Beam

In [15]:
import apache_beam as beam

class defineKey(beam.DoFn):
    def process(self, element):
        key = element["key"]
        yield (key, element)

def get_occurrence_dict(lst):
    occurrence_dict = {}
    
    for item in lst:
        if item in occurrence_dict:
            occurrence_dict[item] += 1
        else:
            occurrence_dict[item] = 1
    
    return occurrence_dict

class Mapper(beam.DoFn):
    def process(self, element):
        key, data = element
        for row in data:
            value = row['value']
            occurrence_dict = get_occurrence_dict(value)

            for word, occur in occurrence_dict.items():
                yield (key, {"word": word, "occur": occur})

class Combiner(beam.DoFn):
    def process(self, element):
        
        frequency_dict = dict()
        document_dict = dict()
        
        key, data = element
        for row in data:
            frequency_dict[row['word']] = frequency_dict.get(row['word'], 0) + row['occur']
            document_dict[row['word']] = document_dict.get(row['word'], 0) + 1

        # Use `word` rather than `key` so the partition key unpacked above is not shadowed
        for word, freq in frequency_dict.items():
            yield (word, {'freq': freq, 'doc': document_dict[word]})

class Reducer(beam.DoFn):
    def process(self, element):
        key, data = element
        freq = 0
        doc = 0
        for d in data:
            freq += d['freq']
            doc += d['doc']
        yield (key, freq/doc)

### Mapper 

In [16]:
with beam.Pipeline() as p:
  (p | beam.Create([
          {"value": ['potato'], "key": 0},
          {"value": ['potato', 'tomato'], "key": 0}, 
          {"value": ['potato', 'tomato'], "key": 1},
          {"value": ['potato', 'tomato', 'tomato'], "key": 1},
          {"value": ['potato', 'potato', 'potato'], "key": 1}
      ])
     | "Define_Key" >> beam.ParDo(defineKey())
     | "Partitioner1" >> beam.GroupByKey()
     | "Mean_Mapper" >> beam.ParDo(Mapper())
     | beam.LogElements())

(0, {'word': 'potato', 'occur': 1})
(0, {'word': 'potato', 'occur': 1})
(0, {'word': 'tomato', 'occur': 1})
(1, {'word': 'potato', 'occur': 1})
(1, {'word': 'tomato', 'occur': 1})
(1, {'word': 'potato', 'occur': 1})
(1, {'word': 'tomato', 'occur': 2})
(1, {'word': 'potato', 'occur': 3})


### Combiner
The combine function exists to perform a local reduce operation first. If you are counting the word `the`, summing the `occur` and `doc` values for **every article** on the internet is still a big task.\
The `Combiner` computes smaller-scale partial totals first to make the `Reducer`'s job easier [1].
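
Beam expresses this pre-aggregation pattern through `beam.CombineFn`, whose four methods are `create_accumulator`, `add_input`, `merge_accumulators`, and `extract_output`. The sketch below mirrors that shape in plain Python so that it runs without Beam. `MeanAccumulator` and `local_combine` are hypothetical names, and to use such a class in a pipeline it would presumably need to subclass `beam.CombineFn` and be applied with `beam.CombinePerKey`.

```python
class MeanAccumulator:
    """Plain-Python mirror of the four-method CombineFn shape (illustrative only)."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return (sum(totals), sum(counts))

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")


fn = MeanAccumulator()

def local_combine(values):
    """Fold one worker's values into a single accumulator."""
    acc = fn.create_accumulator()
    for value in values:
        acc = fn.add_input(acc, value)
    return acc

# 'potato' occurrences per document, split across two hypothetical workers
worker_a = local_combine([1, 1])     # (2, 2)
worker_b = local_combine([1, 1, 3])  # (5, 3)

merged = fn.merge_accumulators([worker_a, worker_b])
print(merged, fn.extract_output(merged))  # (7, 5) 1.4
```

Each worker ships one small `(sum, count)` pair instead of every individual value, which is the saving the combiner is meant to provide.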

In [17]:
with beam.Pipeline() as p:
  (p | beam.Create([
          {"value": ['potato'], "key": 0},
          {"value": ['potato', 'tomato'], "key": 0}, 
          {"value": ['potato', 'tomato'], "key": 1},
          {"value": ['potato', 'tomato', 'tomato'], "key": 1},
          {"value": ['potato', 'potato', 'potato'], "key": 1}
      ])
     | "Define_Key" >> beam.ParDo(defineKey())
     | "Partitioner1" >> beam.GroupByKey()
     | "Mean_Mapper" >> beam.ParDo(Mapper())
     | "Partitioner2" >> beam.GroupByKey()
     | "Mean_Combiner" >> beam.ParDo(Combiner())
     | beam.LogElements())

('potato', {'freq': 2, 'doc': 2})
('tomato', {'freq': 1, 'doc': 1})
('potato', {'freq': 5, 'doc': 3})
('tomato', {'freq': 3, 'doc': 2})


### Reducer

In [19]:
with beam.Pipeline() as p:
  (p | beam.Create([
          {"value": ['potato'], "key": 0},
          {"value": ['potato', 'tomato'], "key": 0}, 
          {"value": ['potato', 'tomato'], "key": 1},
          {"value": ['potato', 'tomato', 'tomato'], "key": 1},
          {"value": ['potato', 'potato', 'potato'], "key": 1}
      ])
     | "Define_Key" >> beam.ParDo(defineKey())
     | "Partitioner1" >> beam.GroupByKey()
     | "Mean_Mapper" >> beam.ParDo(Mapper())
     | "Partitioner2" >> beam.GroupByKey()
     | "Mean_Combiner" >> beam.ParDo(Combiner())
     | "Partitioner3" >> beam.GroupByKey()
     | "Mean_Reducer" >> beam.ParDo(Reducer())
     | beam.LogElements())

('potato', 1.4)
('tomato', 1.3333333333333333)


### Or just use default functions

Mean is a common aggregate function that Beam already implements, and its source code does the same thing:
https://beam.apache.org/documentation/transforms/python/aggregation/mean/
<img src="public/images/Mean_of_Apache_Beam.png" width="800" height="400">
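
As a hedged illustration of the built-in transform, the sketch below feeds the same per-document counts that the `Mapper` step logged earlier into `beam.combiners.Mean.PerKey()`. The names `WORD_OCCURRENCES` and `run_mean_per_key` are introduced here for illustration, and Beam is imported inside the function only so that the input data can be inspected without it.

```python
# (word, occurrences in one document) pairs, matching the Mapper output above
WORD_OCCURRENCES = [
    ("potato", 1), ("potato", 1), ("tomato", 1),
    ("potato", 1), ("tomato", 1), ("potato", 1),
    ("tomato", 2), ("potato", 3),
]

def run_mean_per_key(pairs):
    """Run a local Beam pipeline that logs the mean value per key."""
    import apache_beam as beam  # lazy import: only needed when the pipeline runs

    with beam.Pipeline() as p:
        (p | beam.Create(pairs)
           | "Mean_Per_Word" >> beam.combiners.Mean.PerKey()
           | beam.LogElements())
```

Calling `run_mean_per_key(WORD_OCCURRENCES)` should log the same two averages as the hand-written pipeline, `1.4` for `potato` and about `1.33` for `tomato`, although the order of the output is not guaranteed.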

## 1.4 Conclusion:

1. Map == `multiprocessing.Pool.map()`
2. FlatMap == flatten(`multiprocessing.Pool.map()`)
3. ParDo == **whole MapReduce**:
    * Allows in-memory state (vs FlatMap)
    * Allows partitioning by key (vs FlatMap)
    * Hence enables much more complex algorithms, e.g. `PageRank`, `Bellman–Ford`

# 2. Our Streaming Pipeline

## 2.1 App Usage

***TODO:*** Convert `time` and `last_time_used` to timestamps using `pd.to_datetime(?, unit="ms")`, and rename `total_fg_time` to `foreground_time_(ms)`

In [22]:
import apache_beam as beam

class tranformations(beam.DoFn):
    def process(self, element):
        import pandas as pd 
        ### YOUR CODE HERE
        pass

with beam.Pipeline() as p:

  (p | beam.Create([
      {
        'time': 1524550039846,
        'package_name': 'com.google.android.youtube',
        'last_time_used': 1524438939878,
        'total_fg_time': 857910,
        'user_id': 'user_0'},
     {
        'time': 1524550039846,
        'package_name': 'com.samsung.android.app.galaxyfinder',
        'last_time_used': 1524438952278,
        'total_fg_time': 64158,
        'user_id': 'user_0'},
     {
        'time': 1524550039846,
        'package_name': 'com.samsung.android.incallui',
        'last_time_used': 1524480748673,
        'total_fg_time': 321040,
        'user_id': 'user_0'}])
     | "Transformations" >> beam.ParDo(tranformations())
     | beam.LogElements())



### Output:
{'time': Timestamp('2018-04-24 06:07:19.846000'), 'package_name': 'com.google.android.youtube', 'last_time_used': Timestamp('2018-04-22 23:15:39.878000'), 'foreground_time_(ms)': 857910, 'user_id': 'user_0'} \
{'time': Timestamp('2018-04-24 06:07:19.846000'), 'package_name': 'com.samsung.android.app.galaxyfinder', 'last_time_used': Timestamp('2018-04-22 23:15:52.278000'), 'foreground_time_(ms)': 64158, 'user_id': 'user_0'} \
{'time': Timestamp('2018-04-24 06:07:19.846000'), 'package_name': 'com.samsung.android.incallui', 'last_time_used': Timestamp('2018-04-23 10:52:28.673000'), 'foreground_time_(ms)': 321040, 'user_id': 'user_0'}
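
One possible way to approach this exercise is sketched below. It is not the only solution: the helper names (`rename_fields`, `convert_times`, `transform_app_usage`) are hypothetical, and pandas is imported lazily, mirroring the import inside `process` above. Inside the `DoFn`, the result of `transform_app_usage(element)` would presumably be yielded from `process`.

```python
RENAMES = {"total_fg_time": "foreground_time_(ms)"}
TIME_FIELDS = ("time", "last_time_used")

def rename_fields(element):
    """Return a copy of the row with renamed keys, preserving key order."""
    return {RENAMES.get(key, key): value for key, value in element.items()}

def convert_times(element):
    """Return a copy of the row with millisecond epochs turned into Timestamps."""
    import pandas as pd  # lazy import: only needed when timestamps are converted

    row = dict(element)
    for field in TIME_FIELDS:
        row[field] = pd.to_datetime(row[field], unit="ms")
    return row

def transform_app_usage(element):
    """Rename first, then convert, so the input dictionary is never mutated."""
    return convert_times(rename_fields(element))

sample = {
    "time": 1524550039846,
    "package_name": "com.google.android.youtube",
    "last_time_used": 1524438939878,
    "total_fg_time": 857910,
    "user_id": "user_0",
}
print(rename_fields(sample))
```

Building a new dictionary instead of editing `element` in place keeps the key order of the expected output and avoids mutating pipeline inputs.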

## 2.2 Locations

***TODO:*** Drop `acc`, `alt`, `bearing`, and `postime`, and convert `time` to a timestamp using `datetime.utcfromtimestamp`
(**NOTE:** `utcfromtimestamp` expects *seconds*, but our data is in *milliseconds*)

In [20]:
import apache_beam as beam
from datetime import datetime

def transform(element):
    ### YOUR CODE HERE
    pass

with beam.Pipeline() as p:

  (p | beam.Create([{
        'time': 1524551239484,
        'lat': 43.7068094,
        'long': 10.4036635,
        'speed': 0.0,
        'acc': 400.0,
        'alt': 0.0,
        'bearing': 0.0,
        'postime': 1524551040134,
        'label': 'free_time',
        'place_type': 'route',
        'user_id': 'user_0'},
    {
        'time': 1524551539483,
        'lat': 43.7068094,
        'long': 10.4036635,
        'speed': 0.0,
        'acc': 400.0,
        'alt': 0.0,
        'bearing': 0.0,
        'postime': 1524551040134,
        'label': 'free_time',
        'place_type': 'route',
        'user_id': 'user_0'},
    {
        'time': 1524551839681,
        'lat': 43.716982,
        'long': 10.4026103,
        'speed': 0.0,
        'acc': 899.9989999999997,
        'alt': 0.0,
        'bearing': 0.0,
        'postime': 1524551654162,
        'label': 'free_time',
        'place_type': 'health',
        'user_id': 'user_0'}])
     | "Transformations" >> beam.Map(transform)
     | beam.LogElements())

None
None
None


### Output:
{'time': datetime.datetime(2018, 4, 24, 6, 27, 19, 484000), 'lat': 43.7068094, 'long': 10.4036635, 'speed': 0.0, 'label': 'free_time', 'place_type': 'route', 'user_id': 'user_0'} \
{'time': datetime.datetime(2018, 4, 24, 6, 32, 19, 483000), 'lat': 43.7068094, 'long': 10.4036635, 'speed': 0.0, 'label': 'free_time', 'place_type': 'route', 'user_id': 'user_0'} \
{'time': datetime.datetime(2018, 4, 24, 6, 37, 19, 681000), 'lat': 43.716982, 'long': 10.4026103, 'speed': 0.0, 'label': 'free_time', 'place_type': 'health', 'user_id': 'user_0'}
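
A minimal sketch of one possible `transform` follows. `DROPPED_FIELDS` is a name introduced here for illustration. Note that `datetime.utcfromtimestamp` is deprecated as of Python 3.12, so on newer interpreters it may emit a warning.

```python
from datetime import datetime

DROPPED_FIELDS = {"acc", "alt", "bearing", "postime"}

def transform(element):
    """Drop unused fields and convert `time` from epoch milliseconds to a datetime."""
    row = {key: value for key, value in element.items() if key not in DROPPED_FIELDS}
    # utcfromtimestamp expects seconds, so scale the millisecond value down first.
    row["time"] = datetime.utcfromtimestamp(row["time"] / 1000)
    return row

sample = {
    "time": 1524551239484,
    "lat": 43.7068094,
    "long": 10.4036635,
    "speed": 0.0,
    "acc": 400.0,
    "alt": 0.0,
    "bearing": 0.0,
    "postime": 1524551040134,
    "label": "free_time",
    "place_type": "route",
    "user_id": "user_0",
}
print(transform(sample))
```

Because the function returns a new dictionary, it can be passed to `beam.Map` directly, as in the cell above.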

## 2.3 Calls

***TODO:*** Group the calls by `incoming`, `phone_number`, `registered` & `user_id` and count the number of rows

In [82]:
import apache_beam as beam
import json

def define_key(element):
    ### YOUR CODE HERE
    pass

def expand(element):
    ### YOUR CODE HERE
    pass

with beam.Pipeline() as p:
  (p | beam.Create([{
      'time': 1524549275699,
      'incoming': True,
      'phone_number': 'user_13',
      'registered': True,
      'user_id': 'user_0'},
     {
      'time': 1524549070890,
      'incoming': True,
      'phone_number': 'phone_958',
      'registered': True,
      'user_id': 'user_0'},
     {
      'time': 1524549030099,
      'incoming': True,
      'phone_number': 'phone_958',
      'registered': True,
      'user_id': 'user_0'},])
     | "Define_Key" >> ### YOUR CODE HERE
     | 'Count elements per key' >> beam.combiners.Count.PerKey()
     | 'Turn the key back to dictionary' >> ### YOUR CODE HERE
     | beam.LogElements())


{'incoming': True, 'phone_number': 'user_13', 'registered': True, 'user_id': 'user_0', 'count': 1}
{'incoming': True, 'phone_number': 'phone_958', 'registered': True, 'user_id': 'user_0', 'count': 2}


### Output:
{'incoming': True, 'phone_number': 'user_13', 'registered': True, 'user_id': 'user_0', 'count': 1} \
{'incoming': True, 'phone_number': 'phone_958', 'registered': True, 'user_id': 'user_0', 'count': 2}
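
One possible shape for the two helpers is sketched below. It assumes a JSON string is used as the grouping key, as the `import json` in the cell seems to suggest, and `KEY_FIELDS` is a name introduced here. A `Counter` stands in for `beam.combiners.Count.PerKey()` so that the sketch runs without Beam; in the pipeline the helpers would presumably be wired in with `beam.Map(define_key)` and `beam.Map(expand)`.

```python
import json
from collections import Counter

KEY_FIELDS = ("incoming", "phone_number", "registered", "user_id")

def define_key(element):
    """Build a (key, value) pair; the key is a JSON string of the grouping fields."""
    key = json.dumps({field: element[field] for field in KEY_FIELDS})
    return (key, 1)

def expand(element):
    """Turn a (json_key, count) pair back into a flat dictionary."""
    key, count = element
    return {**json.loads(key), "count": count}

calls = [
    {"time": 1524549275699, "incoming": True, "phone_number": "user_13",
     "registered": True, "user_id": "user_0"},
    {"time": 1524549070890, "incoming": True, "phone_number": "phone_958",
     "registered": True, "user_id": "user_0"},
    {"time": 1524549030099, "incoming": True, "phone_number": "phone_958",
     "registered": True, "user_id": "user_0"},
]

# Counter stands in for Count.PerKey(): it counts how often each key appears.
counts = Counter(key for key, _ in map(define_key, calls))
rows = [expand(item) for item in counts.items()]
print(rows)
```

Serializing the key to a string keeps it hashable and deterministic, and `time` is left out of the key so that calls at different times fall into the same group.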