# Windowing -- _Tour of Beam_

Sometimes we want to [aggregate](https://beam.apache.org/documentation/transforms/python/overview/#aggregation) data, for example with `GroupByKey` or `Combine`, over certain intervals, like hourly or daily, instead of aggregating the entire `PCollection` of data all at once.

We might want to emit a [moving average](https://en.wikipedia.org/wiki/Moving_average) as we're processing data.

Maybe we want to analyze the user experience for a certain task in a web app. In that case, it would be useful to group the app events into sessions of activity.

Or we could be running a streaming pipeline where the data never ends, so how can we aggregate it at all?

_Windows_ in Beam allow us to process only certain data intervals at a time.
In this notebook, we go through different ways of windowing our pipeline.


```python
import apache_beam as beam

def human_readable_window(window) -> str:
    """Formats a window object into a human readable string."""
    if isinstance(window, beam.window.GlobalWindow):
        return str(window)
    return f"{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}"

class PrintElementInfo(beam.DoFn):
    """Prints an element with its Window information."""
    def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        print(f"[{human_readable_window(window)}] {timestamp.to_utc_datetime()} -- {element}")
        yield element

@beam.ptransform_fn
def PrintWindowInfo(pcollection):
    """Prints the Window information with how many elements landed in that window."""
    class PrintCountsInfo(beam.DoFn):
        def process(self, num_elements, window=beam.DoFn.WindowParam):
            print(f">> Window [{human_readable_window(window)}] has {num_elements} elements.")
            yield num_elements

    return (
        pcollection
        # without_defaults() is required when the input is not globally windowed:
        # it skips the default count of 0, so empty windows produce no output.
        | "Count elements per window" >> beam.combiners.Count.Globally().without_defaults()
        | "Print counts info" >> beam.ParDo(PrintCountsInfo())
    )
```

### Creating some data to use in the examples

```python
import time
from apache_beam.options.pipeline_options import PipelineOptions

def to_unix_time(time_str: str, time_format='%Y-%m-%d %H:%M:%S') -> int:
    """Converts a time string to Unix time."""
    # Note: time.mktime interprets the parsed time as local time, so the
    # resulting timestamps (and the outputs below) depend on the machine's time zone.
    time_tuple = time.strptime(time_str, time_format)
    return int(time.mktime(time_tuple))

@beam.ptransform_fn
@beam.typehints.with_input_types(beam.pvalue.PBegin)
@beam.typehints.with_output_types(beam.window.TimestampedValue)
def AstronomicalEvents(pipeline):
    return (
        pipeline
        | "Create data" >> beam.Create([
            ('2021-03-20 03:37:00', 'March Equinox 2021'),
            ('2021-04-26 22:31:00', 'Super full moon'),
            ('2021-05-11 13:59:00', 'Micro new moon'),
            ('2021-05-26 06:13:00', 'Super full moon, total lunar eclipse'),
            ('2021-06-20 22:32:00', 'June Solstice 2021'),
            ('2021-08-22 07:01:00', 'Blue moon'),
            ('2021-09-22 14:21:00', 'September Equinox 2021'),
            ('2021-11-04 15:14:00', 'Super new moon'),
            ('2021-11-19 02:57:00', 'Micro full moon, partial lunar eclipse'),
            ('2021-12-04 01:43:00', 'Super new moon'),
            ('2021-12-18 10:35:00', 'Micro full moon'),
            ('2021-12-21 09:59:00', 'December Solstice 2021'),
        ])
        # Wrapping each element in a TimestampedValue sets its event timestamp,
        # which is what windowing uses to assign the element to windows.
        | "With timestamps" >> beam.MapTuple(
            lambda timestamp, element:
                beam.window.TimestampedValue(element, to_unix_time(timestamp))
        )
    )


# Let's see what the data looks like.
beam_options = PipelineOptions(flags=[], type_check_additional='all')
with beam.Pipeline(options=beam_options) as pipeline:
    (
        pipeline
        | "Astronomical events" >> AstronomicalEvents()
        | "Print element" >> beam.Map(print)
    )
```

```
March Equinox 2021
Super full moon
Micro new moon
Super full moon, total lunar eclipse
June Solstice 2021
Blue moon
September Equinox 2021
Super new moon
Micro full moon, partial lunar eclipse
Super new moon
Micro full moon
December Solstice 2021
```
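In the windowing outputs below, the printed UTC times are one hour earlier than the strings in the data (for example, `03:37` becomes `02:37`). That is what `time.mktime` produces on a machine whose local time zone is one hour ahead of UTC, since it interprets the parsed time as local time. If you want timestamps that don't depend on the machine, a minimal sketch is to parse the strings as UTC with `calendar.timegm` instead. `to_unix_time_utc` is a hypothetical helper, not part of the original notebook.

```python
import calendar
import time

def to_unix_time_utc(time_str: str, time_format: str = '%Y-%m-%d %H:%M:%S') -> int:
    """Converts a time string to Unix time, treating it as UTC (hypothetical helper)."""
    # time.strptime parses the string into a struct_time; calendar.timegm
    # treats that struct_time as UTC, unlike time.mktime, which treats it as local time.
    return calendar.timegm(time.strptime(time_str, time_format))

print(to_unix_time_utc('2021-03-20 03:37:00'))
```

Using it in `AstronomicalEvents` in place of `to_unix_time` would make the printed UTC timestamps match the input strings regardless of where the pipeline runs.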


### Fixed time windows

```python
import apache_beam as beam
from datetime import timedelta

# Fixed-sized windows of approximately 3 months.
window_size = timedelta(days=3*30).total_seconds()  # in seconds
print(f"window_size: {window_size} seconds")

with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Astronomical events" >> AstronomicalEvents()
        | "Fixed windows" >> beam.WindowInto(beam.window.FixedWindows(window_size))
        | "Print element info" >> beam.ParDo(PrintElementInfo())
        | "Print window info" >> PrintWindowInfo()
    )
```

```
window_size: 7776000.0 seconds
[2021-01-03 00:00:00 - 2021-04-03 00:00:00] 2021-03-20 02:37:00 -- March Equinox 2021
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-04-26 21:31:00 -- Super full moon
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-11 12:59:00 -- Micro new moon
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-26 05:13:00 -- Super full moon, total lunar eclipse
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-06-20 21:32:00 -- June Solstice 2021
[2021-07-02 00:00:00 - 2021-09-30 00:00:00] 2021-08-22 06:01:00 -- Blue moon
[2021-07-02 00:00:00 - 2021-09-30 00:00:00] 2021-09-22 13:21:00 -- September Equinox 2021
[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-11-04 14:14:00 -- Super new moon
[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-11-19 01:57:00 -- Micro full moon, partial lunar eclipse
[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-04 00:43:00 -- Super new moon
[2021-09-30 00:00:00 - 2021-12-29 00:00:00] 2021-12-18 09:35:00 -- Micro full moon
[2021-09
```
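Notice that the first window starts on 2021-01-03 rather than on January 1st. A minimal plain-Python sketch of why, assuming `FixedWindows` uses its default offset of 0: windows are aligned to multiples of the window size counted from the Unix epoch, so an element with timestamp `ts` lands in the window starting at `ts - (ts - offset) % size`. (2021-01-03 is exactly 207 × 90 days after 1970-01-01.) The helpers `fixed_window` and `fmt` are illustrative names, not Beam APIs.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

def fixed_window(ts: float, size: float, offset: float = 0.0) -> Tuple[float, float]:
    """Returns the [start, end) fixed window that contains timestamp ts (Unix seconds)."""
    # Window starts are multiples of `size` from the epoch, shifted by `offset`.
    start = ts - (ts - offset) % size
    return start, start + size

def fmt(ts: float) -> str:
    """Formats Unix seconds as a UTC date-time string."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

window_size = timedelta(days=3*30).total_seconds()
# Timestamp of "March Equinox 2021" as printed in the output above (UTC).
equinox = datetime(2021, 3, 20, 2, 37, tzinfo=timezone.utc).timestamp()
start, end = fixed_window(equinox, window_size)
print(f"[{fmt(start)} - {fmt(end)}]")  # [2021-01-03 00:00:00 - 2021-04-03 00:00:00]
```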

### Sliding time windows

```python
import apache_beam as beam
from datetime import timedelta

# Sliding windows of approximately 3 months every month.
window_size = timedelta(days=3*30).total_seconds()  # in seconds
window_period = timedelta(days=30).total_seconds()  # in seconds
print(f"window_size: {window_size} seconds")
print(f"window_period: {window_period} seconds")

with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Astronomical events" >> AstronomicalEvents()
        | "Sliding windows" >> beam.WindowInto(
            beam.window.SlidingWindows(window_size, window_period)
        )
        | "Print element info" >> beam.ParDo(PrintElementInfo())
        | "Print window info" >> PrintWindowInfo()
    )
```

```
window_size: 7776000.0 seconds
window_period: 2592000.0 seconds
[2021-03-04 00:00:00 - 2021-06-02 00:00:00] 2021-03-20 02:37:00 -- March Equinox 2021
[2021-02-02 00:00:00 - 2021-05-03 00:00:00] 2021-03-20 02:37:00 -- March Equinox 2021
[2021-01-03 00:00:00 - 2021-04-03 00:00:00] 2021-03-20 02:37:00 -- March Equinox 2021
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-04-26 21:31:00 -- Super full moon
[2021-03-04 00:00:00 - 2021-06-02 00:00:00] 2021-04-26 21:31:00 -- Super full moon
[2021-02-02 00:00:00 - 2021-05-03 00:00:00] 2021-04-26 21:31:00 -- Super full moon
[2021-05-03 00:00:00 - 2021-08-01 00:00:00] 2021-05-11 12:59:00 -- Micro new moon
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-11 12:59:00 -- Micro new moon
[2021-03-04 00:00:00 - 2021-06-02 00:00:00] 2021-05-11 12:59:00 -- Micro new moon
[2021-05-03 00:00:00 - 2021-08-01 00:00:00] 2021-05-26 05:13:00 -- Super full moon, total lunar eclipse
[2021-04-03 00:00:00 - 2021-07-02 00:00:00] 2021-05-26 05:13:00 -- Super full m
```
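In the output above, each element appears in three windows, because the window size (90 days) is three times the period (30 days). A minimal plain-Python sketch of that assignment, assuming the default offset of 0: the latest window containing `ts` starts at `ts - (ts - offset) % period`, and earlier windows step back one `period` at a time for as long as they still contain `ts`. The helpers `sliding_windows` and `fmt` are illustrative names, not Beam APIs.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

def sliding_windows(ts: float, size: float, period: float,
                    offset: float = 0.0) -> List[Tuple[float, float]]:
    """Returns every [start, end) sliding window that contains ts, latest first."""
    start = ts - (ts - offset) % period  # latest window start at or before ts
    windows = []
    while start > ts - size:  # [start, start + size) still contains ts
        windows.append((start, start + size))
        start -= period
    return windows

def fmt(ts: float) -> str:
    """Formats Unix seconds as a UTC date string."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d')

size = timedelta(days=3*30).total_seconds()
period = timedelta(days=30).total_seconds()
# Timestamp of "March Equinox 2021" as printed in the output above (UTC).
equinox = datetime(2021, 3, 20, 2, 37, tzinfo=timezone.utc).timestamp()
for start, end in sliding_windows(equinox, size, period):
    print(f"[{fmt(start)} - {fmt(end)}]")
# Prints:
# [2021-03-04 - 2021-06-02]
# [2021-02-02 - 2021-05-03]
# [2021-01-03 - 2021-04-03]
```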

### Session windows

Session windows group elements into bursts of activity, like the web app sessions mentioned at the beginning. Instead of a window size, we specify a gap size in seconds, which is the maximum period of inactivity before a session window closes.

For example, say we specify a gap size of 30 days. The first event opens a new session window, since no windows are open yet. If the next event happens within 30 days, for example 20 days after the previous event, the session window extends to cover it as well. If no new events arrive for 30 days, the session window closes and is emitted.

```python
import apache_beam as beam
from datetime import timedelta

# Sessions divided by approximately 1 month gaps.
gap_size = timedelta(days=30).total_seconds()  # in seconds
print(f"gap_size: {gap_size} seconds")

with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Astronomical events" >> AstronomicalEvents()
        | "Session windows" >> beam.WindowInto(beam.window.Sessions(gap_size))
        | "Print element info" >> beam.ParDo(PrintElementInfo())
        | "Print window info" >> PrintWindowInfo()
    )
```

```
gap_size: 2592000.0 seconds
```
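The gap rule described above can be sketched in plain Python without Beam. As a simplified model of session windows, each event opens a window `[t, t + gap)`, and windows that overlap are merged into one session. The function name `sessions` is illustrative, not a Beam API, and the timestamps are parsed as UTC with `calendar.timegm` so the result doesn't depend on the machine's time zone.

```python
import calendar
import time
from datetime import timedelta
from typing import List, Tuple

def sessions(timestamps: List[int], gap: float) -> List[Tuple[float, float, int]]:
    """Groups timestamps into (start, end, count) sessions separated by at least `gap` seconds."""
    merged = []  # each entry is [start, end, count]
    for ts in sorted(timestamps):
        if merged and ts < merged[-1][1]:
            # This event's window [ts, ts + gap) overlaps the open session: extend it.
            merged[-1][1] = max(merged[-1][1], ts + gap)
            merged[-1][2] += 1
        else:
            # No overlap: the previous session (if any) is closed; open a new one.
            merged.append([ts, ts + gap, 1])
    return [(start, end, count) for start, end, count in merged]

dates = [
    '2021-03-20 03:37:00', '2021-04-26 22:31:00', '2021-05-11 13:59:00',
    '2021-05-26 06:13:00', '2021-06-20 22:32:00', '2021-08-22 07:01:00',
    '2021-09-22 14:21:00', '2021-11-04 15:14:00', '2021-11-19 02:57:00',
    '2021-12-04 01:43:00', '2021-12-18 10:35:00', '2021-12-21 09:59:00',
]
timestamps = [calendar.timegm(time.strptime(d, '%Y-%m-%d %H:%M:%S')) for d in dates]
gap_size = timedelta(days=30).total_seconds()

for start, end, count in sessions(timestamps, gap_size):
    day = lambda t: time.strftime('%Y-%m-%d', time.gmtime(t))
    print(f"[{day(start)} - {day(end)}] {count} events")
```

With a 30-day gap, this sketch groups the twelve events into five sessions of 1, 4, 1, 1, and 5 events. For example, the Blue moon and the September Equinox end up in separate sessions because they are just over 30 days apart.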
