# SumAndPivot Combiner with Unit Test + VoLTE Streaming Snippet

This custom combiner is needed to aggregate and pivot counters from the key-value pairs obtained from the source. <b>Why is it important?</b> Apache Beam provides a number of scalable built-in combiners for processing large PCollections in a distributed manner, but it has no built-in pivot transform. The current implementation in our project's existing pipelines is based on a UDF and does not scale well enough, because it does not match Apache Beam's distributed computation patterns. The custom combiner class implemented below extends <b>beam.CombineFn</b>, which makes it possible to combine large PCollections in a flexible and distributed manner.

Questions? Let me know: bartosz.tertil@external.telekom.de

### Imports

In [1]:
import datetime
from zoneinfo import ZoneInfo

import logging
import json
import typing

import apache_beam as beam
from apache_beam import Create, Map, ParDo
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import CountCombineFn
from apache_beam.options.pipeline_options import StandardOptions


### Test data

In [2]:
sample_data = {
    "sample": [
        {
        "element_id": "AAA",
        "name": "C1",
        "value": 1,
        "timestamp": 1624026749,
        },
        {
        "element_id": "AAA",
        "name": "C1",
        "value": 7,
        "timestamp": 1624026749,
        },
        {
        "element_id": "AAA",
        "name": "C1",
        "value": 12,
        "timestamp": 1624026749,
        },
        {
        "element_id": "AAA",
        "name": "C1",
        "value": 50,
        "timestamp": 1624026749,
        },
        {
        "element_id": "AAA",
        "name": "C2",
        "value": 27,
        "timestamp": 1624026749,
        },
        {
        "element_id": "AAA",
        "name": "C3",
        "value": 105,
        "timestamp": 1624026749,
        },
        {
        "element_id": "BBB",
        "name": "C1",
        "value": 22,
        "timestamp": 1624026749,
        },
        {
        "element_id": "BBB",
        "name": "C2",
        "value": 24,
        "timestamp": 1624026749,
        },
        {
        "element_id": "BBB",
        "name": "C4",
        "value": 95,
        "timestamp": 1624026749,
        },
        
        {
        "element_id": "AAA",
        "name": "C1",
        "value": 21,
        "timestamp": 1624050708,
        },
        {
        "element_id": "AAA",
        "name": "C2",
        "value": 6,
        "timestamp": 1624050708,
        },
        {
        "element_id": "AAA",
        "name": "C3",
        "value": 98,
        "timestamp": 1624050708,
        },
        {
        "element_id": "BBB",
        "name": "C1",
        "value": 112,
        "timestamp": 1624050708,
        },
        {
        "element_id": "BBB",
        "name": "C2",
        "value": 127,
        "timestamp": 1624050708,
        },
        {
        "element_id": "BBB",
        "name": "C4",
        "value": 115,
        "timestamp": 1624050708,
        }]
}

In [3]:
sample_data.get("sample")

[{'element_id': 'AAA', 'name': 'C1', 'value': 1, 'timestamp': 1624026749},
 {'element_id': 'AAA', 'name': 'C1', 'value': 7, 'timestamp': 1624026749},
 {'element_id': 'AAA', 'name': 'C1', 'value': 12, 'timestamp': 1624026749},
 {'element_id': 'AAA', 'name': 'C1', 'value': 50, 'timestamp': 1624026749},
 {'element_id': 'AAA', 'name': 'C2', 'value': 27, 'timestamp': 1624026749},
 {'element_id': 'AAA', 'name': 'C3', 'value': 105, 'timestamp': 1624026749},
 {'element_id': 'BBB', 'name': 'C1', 'value': 22, 'timestamp': 1624026749},
 {'element_id': 'BBB', 'name': 'C2', 'value': 24, 'timestamp': 1624026749},
 {'element_id': 'BBB', 'name': 'C4', 'value': 95, 'timestamp': 1624026749},
 {'element_id': 'AAA', 'name': 'C1', 'value': 21, 'timestamp': 1624050708},
 {'element_id': 'AAA', 'name': 'C2', 'value': 6, 'timestamp': 1624050708},
 {'element_id': 'AAA', 'name': 'C3', 'value': 98, 'timestamp': 1624050708},
 {'element_id': 'BBB', 'name': 'C1', 'value': 112, 'timestamp': 1624050708},
 {'element_id

In [4]:
class TestDataSchema(typing.NamedTuple):
    element_id: str
    name: str
    value: int
    timestamp: int  # epoch seconds, matching the integer values in sample_data

In [5]:
beam.coders.registry.register_coder(TestDataSchema, beam.coders.RowCoder)

## Custom Combiner: SumAndPivotFn

Relevant documentation:
* beam documentation page: https://beam.apache.org/documentation/programming-guide/#combine
* parent class source code: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py#L1052

Methods that need to be overridden:
* <b>CombineFn.create_accumulator()</b>: Creates an empty accumulator. For example, an empty accumulator for a sum would be 0, while an empty accumulator for a product (multiplication) would be 1.
* <b>CombineFn.add_input()</b>: Called once per element. Takes an accumulator and an input element, combines them and returns the updated accumulator.
* <b>CombineFn.merge_accumulators()</b>: Several accumulators can be built in parallel, so this method merges them into a single accumulator.
* <b>CombineFn.extract_output()</b>: Allows additional calculations on the final accumulator before the result is emitted.

#### Example of MeanCombineFn combiner from Apache Beam (source code): 
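
As a minimal sketch of the four-method contract (not the verbatim Beam source), the class below keeps a `(sum, count)` accumulator the way a mean combiner typically does. It is written in plain Python without subclassing `beam.CombineFn`, so it runs without a Beam installation. The class name `MeanSketchFn` and the helper `combine_in_bundles`, which stands in for a runner, are illustrative assumptions.

```python
class MeanSketchFn:
    """Hypothetical stand-in that mirrors the CombineFn method contract."""

    def create_accumulator(self):
        # Empty accumulator: running sum and element count.
        return (0.0, 0)

    def add_input(self, accumulator, element):
        # Fold one element into the accumulator.
        total, count = accumulator
        return (total + element, count + 1)

    def merge_accumulators(self, accumulators):
        # Combine partial accumulators that were built independently.
        totals, counts = zip(*accumulators)
        return (sum(totals), sum(counts))

    def extract_output(self, accumulator):
        # The division happens only once, on the merged accumulator.
        total, count = accumulator
        return total / count if count else float("nan")


def combine_in_bundles(fn, bundles):
    """Imitate a runner: one accumulator per bundle, then merge and extract."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


mean_result = combine_in_bundles(MeanSketchFn(), [[1, 7], [12, 50]])
print(mean_result)  # 17.5
```

Keeping the sum and the count separate until `extract_output` is what makes the mean safe to compute in parallel: averaging the per-bundle means directly would give a wrong result whenever the bundles differ in size.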

## SumAndPivotFn Class - Implementation

#### Class implementation:

In [6]:
from apache_beam.typehints import with_output_types
from typing import Dict

@with_output_types(Dict[str, int])
class SumAndPivotFn(beam.CombineFn):
    """CombineFn for summing with pivoting"""
    
    def create_accumulator(self):
        return {}

    # Called once for EVERY single input element (not per group of elements):
    # adds the element's value to the running total of its counter name.
    def add_input(self, acc, element):
        acc[element.name] = acc.get(element.name, 0) + element.value
        return acc

    # combine/merge list of local accumulators into a global acc
    def merge_accumulators(self, accumulators):
        new_acc = {}
        for a in accumulators:
            for k, v in a.items():
                new_acc[k] = new_acc.get(k, 0) + v
        return new_acc

    # emit the final result here
    def extract_output(self, acc):
        return acc
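
To see what `add_input` and `merge_accumulators` do together, the sketch below replays the same logic in plain Python (no Beam dependency) on the rows of element `AAA` at timestamp 1624026749, split into two hypothetical bundles. The `Row` tuple, the bundle split and the module-level helper functions are assumptions for illustration; a real runner decides how elements are bundled.

```python
from collections import namedtuple

Row = namedtuple("Row", ["name", "value"])  # hypothetical minimal element


def add_input(acc, element):
    # Same update rule as SumAndPivotFn.add_input.
    acc[element.name] = acc.get(element.name, 0) + element.value
    return acc


def merge_accumulators(accumulators):
    # Same merge rule as SumAndPivotFn.merge_accumulators.
    merged = {}
    for acc in accumulators:
        for name, total in acc.items():
            merged[name] = merged.get(name, 0) + total
    return merged


# Assumed split of the six AAA rows into two bundles.
bundle_1 = [Row("C1", 1), Row("C1", 7), Row("C2", 27)]
bundle_2 = [Row("C1", 12), Row("C1", 50), Row("C3", 105)]

partials = []
for bundle in (bundle_1, bundle_2):
    acc = {}  # create_accumulator()
    for row in bundle:
        acc = add_input(acc, row)
    partials.append(acc)

print(partials)  # [{'C1': 8, 'C2': 27}, {'C1': 62, 'C3': 105}]
pivoted = merge_accumulators(partials)
print(pivoted)   # {'C1': 70, 'C2': 27, 'C3': 105}
```

Each counter name becomes a key (a column after the pivot), and the result does not depend on how the rows were split, because addition is associative and commutative.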

#### Formatting output rows with UTC and Europe/Berlin timestamps

In [7]:
class FormatOutputsAfterCombine(beam.DoFn):
    def process(self, element):
        import datetime
        from zoneinfo import ZoneInfo
        result = {
            "element_id": element[0].element_id,
            "timestamp": element[0].timestamp,
            # Pass an explicit tz: without it fromtimestamp() returns the worker's local time, not UTC.
            "timestamp_utc": datetime.datetime.fromtimestamp(element[0].timestamp, datetime.timezone.utc),
            "timestamp_local": datetime.datetime.fromtimestamp(element[0].timestamp, ZoneInfo("Europe/Berlin")),
            **element[1]
        }
        yield result
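
The timestamp handling can be checked in isolation. `datetime.fromtimestamp` returns a naive datetime in the machine's local time zone unless a `tz` argument is passed, so the sketch below converts the first sample timestamp with an explicit zone each time. It assumes time zone data for `Europe/Berlin` is available on the machine.

```python
import datetime
from zoneinfo import ZoneInfo

ts = 1624026749  # first timestamp in the sample data (epoch seconds)

# No tz argument: naive value that depends on the machine's local time zone.
naive_local = datetime.datetime.fromtimestamp(ts)

# Explicit zones: aware values that describe the same instant.
as_utc = datetime.datetime.fromtimestamp(ts, datetime.timezone.utc)
as_berlin = datetime.datetime.fromtimestamp(ts, ZoneInfo("Europe/Berlin"))

print(naive_local.tzinfo)     # None
print(as_utc.isoformat())     # 2021-06-18T14:32:29+00:00
print(as_berlin.isoformat())  # 2021-06-18T16:32:29+02:00
print(as_utc == as_berlin)    # True: same instant, different wall-clock time
```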

### Pipeline + Results

In [8]:
options = PipelineOptions(runner='InteractiveRunner')

In [9]:
with beam.Pipeline(options=options) as p:
    data = (p
        | 'ReadFromJSON' >> beam.Create(sample_data.get("sample"))
        | 'AssignSchema' >> beam.Map(lambda x: TestDataSchema(**x)) 
        | 'TimestampedValues' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x.timestamp))
        | 'GroupBy' >> beam.GroupBy("element_id", "timestamp")
        | 'SumAndPivotFn' >> beam.CombineValues(SumAndPivotFn())
        | 'FormatOutputsAfterCombine' >> beam.ParDo(FormatOutputsAfterCombine())
    )



In [10]:
ib.show(data)

## Unit Testing

In [11]:
import unittest
import sys

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that, equal_to

In [25]:
def run_test(out=sys.stdout, verbosity=2):
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromModule(sys.modules[__name__])
    unittest.TextTestRunner(out, verbosity=verbosity).run(suite)

### Testing Class

In [24]:
class SumAndPivotFnTest(unittest.TestCase):
    
    def test_check_pivot_and_sum(self):
        """
        Check if rows are correctly aggregated when there are more rows than expected
        """

        with TestPipeline() as p:

            expected_values = [
                {'C1': 70, 'C2': 27, 'C3': 105},
                {'C1': 22, 'C2': 24, 'C4': 95},
                {'C1': 21, 'C2': 6, 'C3': 98},
                {'C1': 112, 'C2': 127, 'C4': 115}]

            # pipeline previous steps 
            inputs = (p
                | 'ReadFromJSON' >> beam.Create(sample_data.get("sample"))
                | 'AssignSchema' >> beam.Map(lambda x: TestDataSchema(**x)) 
                | 'TimestampedValues' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x.timestamp))
                | 'GroupBy' >> beam.GroupBy("element_id", "timestamp")
            )

            # tested class
            output = inputs | beam.CombineValues(SumAndPivotFn()) | beam.Map(lambda x: x[1])
            assert_that(output, equal_to(expected_values))

In [26]:
run_test()

test_check_pivot_and_sum (__main__.SumAndPivotFnTest)
Check if rows are correctly aggregated when there are more rows than expected ... ok

----------------------------------------------------------------------
Ran 1 test in 1.109s

OK
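
The expected values used in the test can be cross-checked without Beam. The sketch below groups a subset of the sample rows by `(element_id, timestamp)` and sums `value` per `name` using only the standard library. The helper name `pivot_reference` and the shortened `rows` list are illustrative assumptions, not part of the pipeline.

```python
from collections import defaultdict


def pivot_reference(rows):
    """Group by (element_id, timestamp) and sum values per counter name."""
    grouped = defaultdict(dict)
    for row in rows:
        key = (row["element_id"], row["timestamp"])
        counters = grouped[key]
        counters[row["name"]] = counters.get(row["name"], 0) + row["value"]
    return dict(grouped)


# Subset of the sample data: three distinct (element_id, timestamp) groups.
rows = [
    {"element_id": "AAA", "name": "C1", "value": 1, "timestamp": 1624026749},
    {"element_id": "AAA", "name": "C1", "value": 7, "timestamp": 1624026749},
    {"element_id": "AAA", "name": "C1", "value": 12, "timestamp": 1624026749},
    {"element_id": "AAA", "name": "C1", "value": 50, "timestamp": 1624026749},
    {"element_id": "AAA", "name": "C2", "value": 27, "timestamp": 1624026749},
    {"element_id": "BBB", "name": "C1", "value": 22, "timestamp": 1624026749},
    {"element_id": "AAA", "name": "C1", "value": 21, "timestamp": 1624050708},
]

reference = pivot_reference(rows)
print(reference[("AAA", 1624026749)])  # {'C1': 70, 'C2': 27}
print(len(reference))                  # 3
```

Passing `sample_data["sample"]` instead of `rows` should yield the four dictionaries listed in `expected_values`, one per `(element_id, timestamp)` group.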


## Snippet with window definition for streaming processing in VoLTE (do not run)
Related to Jira task: https://jira.telekom.de/browse/PICDAA-946

In [None]:
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark

with beam.Pipeline(options=options) as p:
    data = (p
     | 'ReadFromPubSub' >> ...
     | 'AssignSchema' >> ...
     | 'AddEarlierSteps' >> ... # fix and normalize column names,
     | 'OptionallyRecalculate5MinutesSlots' >> ... # optionally recalculate timestamps into 5 minutes slots
         ...
     # and here the IMPORTANT steps for proper window settings:
     | 'TimestampedValues' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x.timestamp)) # overwrite DTS!
     | 'Fixed5MinutesWindows' >> beam.WindowInto(beam.window.FixedWindows(5*60),
             trigger=AfterWatermark(late=AfterCount(1)), # fire at the watermark, then again for every late element
             allowed_lateness=600, # wait 10 minutes for late data (needs tuning/validation)
             accumulation_mode=AccumulationMode.DISCARDING # each pane holds only the elements that arrived since the previous firing
             # accumulation_mode=AccumulationMode.ACCUMULATING # each pane holds all elements seen so far in the window; test on real data which mode works better (it affects the aggregation in later steps)
            )
            )

     # Then aggregation and pivot
     | 'GroupBy' >> beam.GroupBy("element_id", "timestamp")   # Beam also groups by window implicitly
     | 'SumAndPivotFn' >> beam.CombineValues(SumAndPivotFn())
     | 'FormatOutputsAfterCombine' >> beam.ParDo(FormatOutputsAfterCombine())

     # Other stuff: sink into BQ managed tables, save late data into GCS bucket, etc.
         ...
)
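
For intuition about which 5-minute window a record lands in, here is a minimal plain-Python sketch of fixed-window assignment, assuming windows are aligned to the Unix epoch (offset 0). The helper name `fixed_window` is hypothetical; in the pipeline this assignment is done by `beam.window.FixedWindows(5*60)`.

```python
def fixed_window(timestamp, size=5 * 60, offset=0):
    """Return the [start, end) bounds of the fixed window containing timestamp."""
    start = timestamp - (timestamp - offset) % size
    return start, start + size


# The two timestamps that occur in the sample data.
for ts in (1624026749, 1624050708):
    print(ts, fixed_window(ts))
# 1624026749 (1624026600, 1624026900)
# 1624050708 (1624050600, 1624050900)
```

The same arithmetic (`timestamp - timestamp % 300`) can be used for the optional step that recalculates timestamps into 5-minute slots, so that all records of one slot share the same `timestamp` grouping key.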