# *Advanced grouping and aggregations*

Let's start by installing and importing Beam.

In [None]:
%pip install -q apache-beam[interactive] --no-warn-conflicts

If you get an error when running the next cell, restart the runtime (either "*Runtime/Restart runtime*" in the top bar or *Ctrl+M .*) and run it again.

In [None]:
import apache_beam as beam
from apache_beam import pvalue
from apache_beam import Create, FlatMap, Map, ParDo, Filter, Flatten
from apache_beam import CombineGlobally, CombinePerKey
from apache_beam.transforms.combiners import Top, Mean, Count
from apache_beam import pvalue, window, WindowInto

import logging

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

Some of the basic combiner functions are already built-in:

- **`Count`** takes a `PCollection` and outputs the number of elements.  
- **`Top`** outputs the *n* largest/smallest of a `PCollection` given a comparison.  
- **`Mean`** outputs the arithmetic mean of a `PCollection`.

Combiners can aggregate using the whole `PCollection` or by key using methods:

- **`.Globally`** applies the combiner to the whole `PCollection`.
- **`.PerKey`** applies the combiner separately to the values of each key in a key-value `PCollection`.

In [None]:
p = beam.Pipeline(runner=InteractiveRunner())

# (country, population, continent) records, expanded into dictionaries below.
rows = [
    ("China", 1389, "Asia"),
    ("India", 1311, "Asia"),
    ("Japan", 126, "Asia"),
    ("USA", 331, "America"),
    ("Ireland", 5, "Europe"),
    ("Indonesia", 273, "Asia"),
    ("Brazil", 212, "America"),
    ("Egypt", 102, "Africa"),
    ("Spain", 47, "Europe"),
    ("Ghana", 31, "Africa"),
    ("Australia", 25, "Oceania"),
]
elements = [
    {"country": country, "population": population, "continent": continent}
    for country, population, continent in rows
]

countries = p | "Create" >> Create(elements)

# Key every record by continent so the per-key combiners group on it.
create = countries | "Map Keys" >> Map(
    lambda row: (row["continent"], row["population"]))

# Built-in combiners, applied to the whole PCollection or per key.
element_count_total = create | "Total Count" >> Count.Globally()
element_count_grouped = create | "Count Per Key" >> Count.PerKey()
top_grouped = create | "Top" >> Top.PerKey(n=2)  # the 2 largest per key
mean_grouped = create | "Mean" >> Mean.PerKey()

ib.show_graph(p)
ib.show(element_count_total, element_count_grouped, top_grouped, mean_grouped)

We can also create our own **Combiners** and apply them both `Globally` and `PerKey`

In [None]:
p = beam.Pipeline(runner=InteractiveRunner())

elements = [
    "Lorem ipsum dolor sit amet. Consectetur adipiscing elit",
    "Sed eu velit nec sem vulputate loborti",
    "In lobortis augue vitae sagittis molestie. Mauris volutpat tortor non purus elementum",
    "Ut blandit massa et risus sollicitudin auctor",
]

sentences = p | "Create" >> Create(elements)

# A plain callable can act as a combiner: it receives an iterable of the
# values to combine and returns the combined value.
combine = sentences | "Join" >> CombineGlobally(
    lambda parts: ". ".join(parts))

ib.show(combine)

In [None]:
p = beam.Pipeline(InteractiveRunner())

# (language, sentence) pairs; the sentences of each language are joined below.
elements = [
    ("Latin", "Lorem ipsum dolor sit amet. Consectetur adipiscing elit. Sed eu velit nec sem vulputate loborti"),
    ("Latin", "In lobortis augue vitae sagittis molestie. Mauris volutpat tortor non purus elementum"),
    ("English", "But as the riper should by time decease"),
    ("English", "That thereby beauty's rose might never die"),
    ("English", "From fairest creatures we desire increase"),
    ("Spanish", "tiempo que vivía un hidalgo de los de lanza en astillero, adarga antigua"),
    ("Spanish", "En un lugar de la Mancha, de cuyo nombre no quiero acordarme, no ha mucho"),
]

# The same callable combiner as before, now applied once per key (language).
combine_key = (p | "Create" >> Create(elements)
                 | "Join By Language" >> CombinePerKey(lambda x: ". ".join(x)))

ib.show(combine_key)

**Combiners** also work on a window basis

In [None]:
p = beam.Pipeline(runner=InteractiveRunner())

scores = [
    {"player": "Marina", "score": 1000, "timestamp": 0},
    {"player": "Cristina", "score": 2000, "timestamp": 10},
    {"player": "Cristina", "score": 2000, "timestamp": 50},
    {"player": "Marina", "score": 3000, "timestamp": 110},
    {"player": "Juan", "score": 2000, "timestamp": 90},
    {"player": "Cristina", "score": 2000, "timestamp": 80},
    {"player": "Juan", "score": 1000, "timestamp": 100},
]

events = p | "Create" >> Create(scores)

# Attach each record's own "timestamp" field as its event time.
stamped = events | "Add timestamps" >> Map(
    lambda event: window.TimestampedValue(event, event["timestamp"]))

# (player, score) pairs, ready for per-key combining.
create = stamped | "To KV" >> Map(
    lambda event: (event["player"], event["score"]))

# Sum the scores of each player inside every fixed window of size 60.
windowed = create | "FixedWindow" >> WindowInto(window.FixedWindows(60))
total_key = windowed | "Total Per Key" >> CombinePerKey(sum)

ib.show(total_key, include_window_info=True)

When using **windows** and **global combiners** we need to add `without_defaults`. This is because the default behaviour is to return a `PCollection` of one element for empty windows.



In [None]:
# Drop the player key so that all scores of a window are summed together.
scores_only = windowed | Map(lambda kv: kv[1])

# `without_defaults()` is needed because the input is windowed.
total = scores_only | "Total" >> CombineGlobally(sum).without_defaults()

ib.show(total, include_window_info=True)





---
Let's try now to create our own `Combiner`. We are going to try to make our copy of `Mean` (i.e., a `Combiner` that calculates the average).





In [None]:
# Fresh interactive pipeline for the naive-average experiment.
p = beam.Pipeline(runner=InteractiveRunner())

def average_fn(elements):
  """Return the arithmetic mean of the numbers in `elements`.

  `elements` may be any iterable; it is materialised into a list first so
  that it can be both summed and counted.
  """
  values = list(elements)
  total = sum(values)
  return total / len(values)


# Use the first 100 non-negative integers so the result can be compared with
# `sum(range(100)) / 100` (49.5), as the text and cells below this one do.
average = (p | "Create" >> Create(range(100))
             | CombineGlobally(average_fn))

ib.show(average)

We can see that the output is wrong: the average of the first 100 non-negative integers is not 93.95. But why do we get that value?



In [None]:
# Reference value: the mean of 0..99 computed directly in Python.
sum(range(100)) / 100

We are going to need to use the combiner interface:

<details><summary>Solution</summary>
<p>

```
p = beam.Pipeline(runner=InteractiveRunner())

class AverageFn(beam.CombineFn):
  """Combiner that computes the arithmetic mean of its inputs.

  The accumulator is a `(total, count)` tuple, so partial results can be
  merged without losing how many elements each one represents.
  """

  def create_accumulator(self):
    # (running total, number of elements seen so far)
    return 0, 0

  def add_input(self, accumulator, input):
    total, count = accumulator
    return total + input, count + 1

  def merge_accumulators(self, accumulators):
    merged_total = 0
    merged_count = 0
    for partial_total, partial_count in accumulators:
      merged_total += partial_total
      merged_count += partial_count
    return (merged_total, merged_count)

  def extract_output(self, final_accumulator):
    total, count = final_accumulator
    if count == 0:
      # Nothing was combined, so there is no mean to report.
      return None
    return total / count


numbers = p | "Create" >> Create(range(100))
average = numbers | CombineGlobally(AverageFn())

ib.show(average)
```
</p>
</details>

### Streaming Example

We'll see this in Dataflow

In [None]:
# Names used by this cell that the import cell at the top does not provide.
import json

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.runners import DataflowRunner
from apache_beam.transforms import trigger
from apache_beam.transforms.combiners import ToListCombineFn

# NOTE(review): `options` is not defined anywhere in this notebook. Define
# the pipeline options for your Dataflow job before running this cell
# (presumably with streaming enabled, since the source is Pub/Sub - confirm).
p = beam.Pipeline(DataflowRunner(), options)

# Public Pub/Sub topic read by the pipeline below.
topic = "projects/pubsub-public-data/topics/taxirides-realtime"

def first_and_last(element):
    """Build one output row from the two events collected for a ride.

    Args:
        element: a `(ride_id, events)` pair, where `events` is a sized
            collection of dicts with "ride_status" and "timestamp" keys.

    Returns:
        A dict with "ride_id" plus "pickup" and/or "dropoff" timestamps when
        exactly two events were collected; otherwise `None` (a warning with
        the actual number of events is logged).
    """
    ride_id = element[0]
    dictionaries = element[1]

    # Guard clause: only pairs of events are turned into a row.
    if len(dictionaries) != 2:
        logging.warning(f"Length was {len(dictionaries)}")
        return None

    output_row = {"ride_id": ride_id}
    for row in dictionaries:
        status = row["ride_status"]
        if status == "dropoff":
            output_row["dropoff"] = row["timestamp"]
        if status == "pickup":
            output_row["pickup"] = row["timestamp"]

    logging.info(f"Final row {output_row}")
    return output_row
      
            
messages = p | "Read Topic" >> ReadFromPubSub(topic=topic)

# Decode every message and drop the "enroute" updates.
rides = (messages
         | "Json Loads" >> Map(json.loads)
         | "Filter" >> Filter(lambda ride: ride["ride_status"] != "enroute"))

# KV of ride id, dict with the status and timestamp of the event.
keyed = rides | "Parse" >> Map(
    lambda ride: (ride["ride_id"],
                  {"ride_status": ride["ride_status"],
                   "timestamp": ride["timestamp"]}))

# Session windows per ride; the trigger fires repeatedly after 2 elements and
# discards what it has already emitted.
sessions = keyed | "Session window" >> WindowInto(
    window.Sessions(3600),
    trigger=trigger.Repeatedly(trigger.AfterCount(2)),
    accumulation_mode=trigger.AccumulationMode.DISCARDING)

# Collect the events of each ride into a list and turn them into one row.
pubsub = (sessions
          | "Combine" >> CombinePerKey(ToListCombineFn())
          | Map(first_and_last))


p.run()