# Apache Beam

This notebook covers what you need to know to start building your own pipeline with Apache Beam.


### Basic concepts

Apache Beam is a multi-language framework (Python, Java, Go, ...) that helps you build and test data processing pipelines.
Apache Beam is very portable: pipelines built with Beam can run on multiple back-ends, including Dataflow (Google Cloud Platform), Spark, and even your local machine (DirectRunner). In Beam parlance, these back-ends are called *runners*.

Beam stands for *B*atch and str*eam*, which means that Apache Beam supports both batch and streaming processing. This is one of the greatest advantages of using Beam: the same code can be used for both batch and streaming processing, and it can run on top of multiple back-ends.

That being said, let's dive into the components that make up a Beam pipeline:

- **Pipeline:** A Pipeline encapsulates your entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data.

- **PCollection:** A PCollection represents a distributed data set. PCollections are the inputs and outputs of each step in your pipeline. Because Beam supports both batch and streaming, the data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source.

- **PTransform:** A PTransform represents a data processing operation, or a step, in your pipeline. Every PTransform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces zero or more output PCollection objects.

### Create a Pipeline

To create a Beam pipeline, first create an instance of the Beam SDK class **Pipeline** and set some **configuration options**.

In [None]:
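import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical placeholder values: replace them with your own GCP project, bucket and region.
project = 'my-gcp-project'
bucket = 'my-bucket'
region = 'us-central1'

argv = [
    '--project={0}'.format(project),
    '--job_name=...',
    '--save_main_session',
    '--staging_location=gs://{0}/...'.format(bucket),
    '--temp_location=gs://{0}/.../temp/'.format(bucket),
    '--setup_file=./setup.py',
    '--autoscaling_algorithm=THROUGHPUT_BASED',
    '--max_num_workers=8',
    '--region={}'.format(region),
    '--runner=DataflowRunner'
]

# Parse the command-line style flags into a PipelineOptions object.
options = PipelineOptions(argv)
with beam.Pipeline(options=options) as pipeline:
    pass  # build your pipeline here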



### Create a PCollection

The most common way to create a PCollection is to read data from an external source. Each external source has an I/O adapter that has a **Read** Transform. To read data you should apply that transform to the **Pipeline object** itself.

Some characteristics of PCollection elements:

- They must all be of the same type, which can be any type.
- They are immutable: a transform never modifies a PCollection, it produces a new one.
- They can be bounded or unbounded.
- Each element has an associated timestamp.

In [None]:
# Read from Text
lines = pipeline | 'ReadMyFile' >> beam.io.ReadFromText(
    'gs://some/inputData.txt')

# Read from BigQuery
flights = pipeline | 'flights:read' >> beam.io.ReadFromBigQuery(
                    query='SELECT * FROM dsongcp.flights', use_standard_sql=True)

### PTransforms

A PTransform requires you to provide processing logic in the form of a function (called user code) that is applied to each element of a PCollection.
The graph below illustrates the immutability of PCollections: the same PCollection (table rows) is used as the input of two PTransforms at the same time.

<img align="center" src="./PColl_is_immutable.png" style=" width:500px; padding: 10px; " >


In [None]:
[PCollection of database table rows] = [Database Table Reader] | [Read Transform]
[PCollection of 'A' names] = [PCollection of database table rows] | [Transform A]
[PCollection of 'B' names] = [PCollection of database table rows] | [Transform B]

#### Core Beam transforms

##### **ParDo**: 

ParDo is the Beam transform for generic parallel processing. It takes each element in an input PCollection, performs some processing on that element, and emits zero, one, or multiple elements to an output PCollection. It's useful for tasks such as:

- **Filtering a data set**
- **Formatting or type-converting each element in a data set.**
- **Extracting parts of each element in a data set**: for example, you might want to discard some fields in your data set and keep only those that are relevant to your use case.
- **Performing computations on each element in a data set**: for example, running a model prediction on each element of a batch data set.

ParDo needs a DoFn object, which contains the core transformation that your data will undergo. That logic is coded in the DoFn's **process** method, which returns (or yields) zero or more output elements for each input element.

**Map** (one-to-one mapping) and **FlatMap** (one-to-many mapping) are shorthand transforms built on ParDo.

In [None]:
# The input PCollection of Strings.
words = ...

# The DoFn to perform on each element in the input PCollection.

class ComputeWordLengthFn(beam.DoFn):
  def process(self, element):
    return [len(element)]



# Apply a ParDo to the PCollection "words" to compute lengths for each word.
word_lengths = words | beam.ParDo(ComputeWordLengthFn())

##### **GroupByKey**

The Beam transform for collections of key/value pairs. It's best suited for aggregating data that has something (the key) in common.
When applied to a collection, GroupByKey outputs one pair per unique key, in the following format:

*Key*, *[ List of values ]*

Note that if you apply GroupByKey to an unbounded PCollection that still uses the default global window and the default trigger, Beam rejects the pipeline at construction time (the Java SDK, for example, throws an IllegalStateException). This is because GroupByKey needs to collect all the data for a particular key before grouping it, and an unbounded PCollection never ends, so the single global window would never be complete. It's also worth mentioning that GroupByKey groups elements per key *and* per window, according to the PCollection's windowing strategy.



In [None]:
# The input PCollection of (`string`, `int`) tuples.
words_and_counts = ...


grouped_words = words_and_counts | beam.GroupByKey()

##### **CoGroupByKey**

Used to join collections that have the same *Key*.

*In the Beam SDK for Python, CoGroupByKey accepts a dictionary of keyed PCollections as input. As output, CoGroupByKey creates a single output PCollection that contains one key/value tuple for each key in the input PCollections. Each key's value is a dictionary that maps each tag to an iterable of the values under the key in the corresponding PCollection.*

In [None]:
emails_list = [
    ('amy', 'amy@example.com'),
    ('carl', 'carl@example.com'),
    ('julia', 'julia@example.com'),
    ('carl', 'carl@email.com'),
]
phones_list = [
    ('amy', '111-222-3333'),
    ('james', '222-333-4444'),
    ('amy', '333-444-5555'),
    ('carl', '444-555-6666'),
]

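with beam.Pipeline() as p:
    emails = p | 'CreateEmails' >> beam.Create(emails_list)
    phones = p | 'CreatePhones' >> beam.Create(phones_list)
    # Join both keyed PCollections; the dictionary keys become the tags in the output.
    results = {'emails': emails, 'phones': phones} | beam.CoGroupByKey()

# Expected contents of `results` (the order of keys, and of values within each list, may vary):
expected_results = [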
    (
        'amy',
        {
            'emails': ['amy@example.com'],
            'phones': ['111-222-3333', '333-444-5555']
        }),
    (
        'carl',
        {
            'emails': ['carl@email.com', 'carl@example.com'],
            'phones': ['444-555-6666']
        }),
    ('james', {
        'emails': [], 'phones': ['222-333-4444']
    }),
    ('julia', {
        'emails': ['julia@example.com'], 'phones': []
    }),
]

##### **Requirements for writing user code for Beam transforms**


In general, your user code must fulfill at least these requirements:

- Your function object must be serializable.
- Your function object must be thread-compatible, and be aware that the Beam SDKs are not thread-safe.

#### Side Inputs

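Side inputs are additional inputs that a DoFn can access, alongside the main input PCollection, each time it processes an element. They are useful when a DoFn needs extra data that cannot be hard-coded because it has to be determined at runtime (for example, a value computed from another PCollection).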

### Pipeline I/O

Beam provides read and write transforms for a number of common data storage types.

In [None]:
# Reading 
lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')

# Writing
output | beam.io.WriteToText('gs://some/outputData')

#### Reading from multiple locations

Reading from multiple input files requires that the file names follow a pattern that can be captured by a glob operator.
Note that glob operators are filesystem-specific and obey filesystem-specific consistency models.

In this example, we use the glob operator (`*`) to read from all the input files that have an **input-** prefix and a **.csv** suffix.


In [None]:
lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText(
    'path/to/input-*.csv')

#### Writing to multiple output files

Writing to multiple output files is the default behavior of write transforms in Beam. When you pass an output file name to a write transform, it is used as a prefix for the output files it creates (one per shard). You can also provide a suffix for each file.

In [None]:
filtered_words | 'WriteToText' >> beam.io.WriteToText(
    '/path/to/numbers', file_name_suffix='.csv')

### **Windowing**

Windowing subdivides a PCollection according to the timestamps of its individual elements. Each element of a PCollection can belong to one or more windows, according to the PCollection's windowing function, and each individual window contains a finite number of elements. Grouping transforms then work on a per-window basis. This is especially useful for unbounded data.

**Note:** Beam's default windowing behavior is to assign all the elements of a PCollection to a single global window and discard late data. Before using a grouping transform such as *GroupByKey* on an unbounded PCollection, you must do at least one of the following:

- Set a non-global windowing function.
- Set a non-default trigger.
  

#### **Windowing with bounded PCollections:**

Sources of bounded data (such as text files) typically give every element the same timestamp, so to use windowing with a bounded data set you should first assign your own timestamps to its elements. Windowing affects the way your pipeline processes data and applies PTransforms. In the figure below, the ParDo transform gets applied multiple times per key, once for each window.

<div style="text-align: center;">
    <img src="./aggregation_windowing.png" style=" width:600px; padding: 10px; ">
</div>




#### **Windowing Functions**

Beam provides multiple windowing functions:


- **Fixed time windows (FTW):**
  
FTW is the simplest windowing function in Beam. It captures all the elements within a given non-overlapping time interval. For example, with a 30-second window duration, all elements with timestamp values from 0:00:00 up to (but not including) 0:00:30 fall into the first window, elements with timestamps from 0:00:30 up to (but not including) 0:01:00 belong to the second window, and so on.

<div style="text-align: center;">
    <img src="./FTW.png" style=" width:600px; padding: 10px; ">
</div>


In [None]:
from apache_beam import window
fixed_windowed_items = (
    items | 'window' >> beam.WindowInto(window.FixedWindows(30)))

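The window assignment rule described above is simple arithmetic. The plain-Python function below illustrates that arithmetic only; it is not Beam's internal code. `fixed_window` is a hypothetical name, and its optional `offset` mirrors the `offset` argument of `window.FixedWindows`.

```python
def fixed_window(timestamp, size, offset=0):
    """Return the [start, end) fixed window containing `timestamp` (all in seconds)."""
    # Round the timestamp down to the nearest window boundary.
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


# With 30-second windows, 0:00:29 is in the first window and 0:00:30 starts the second.
print(fixed_window(29, 30))  # (0, 30)
print(fixed_window(30, 30))  # (30, 60)
```
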
- **Sliding time window (STW):**
  
The only difference between FTW and STW is that STW windows can *overlap*. For example, you can have a 60-second window **duration** with a new window starting every 30 seconds. The frequency at which new windows start is called the **period**. In the example below, the window duration is 60 seconds and the period is 30 seconds.

<div style="text-align: center;">
    <img src="./STW.png" style=" width:600px; padding: 10px; ">
</div>


In [None]:
from apache_beam import window
sliding_windowed_items = (
    items | 'window' >> beam.WindowInto(window.SlidingWindows(60, 30)))

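A plain-Python illustration shows why, with a 60-second duration and a 30-second period, every element lands in two windows (duration / period). `sliding_windows` is a hypothetical helper and is not Beam's implementation.

```python
def sliding_windows(timestamp, size, period, offset=0):
    """Return every [start, end) sliding window that contains `timestamp` (seconds)."""
    # Latest window start at or before the timestamp, aligned to the period.
    start = timestamp - (timestamp - offset) % period
    windows = []
    # Step back one period at a time while the window still covers the timestamp.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return windows


print(sliding_windows(45, size=60, period=30))  # [(30, 90), (0, 60)]
print(sliding_windows(10, size=60, period=30))  # [(0, 60), (-30, 30)]
```
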
- **Session windows:**
  
The session windowing strategy is best suited for data that is irregularly distributed with respect to time. It works on a per-key basis and requires a minimum gap duration: when no new data arrives for a key within that gap, the current session ends, and the next element for that key starts a new window. Note, in the figure below, that each key ends up with its own set of windows.

<div style="text-align: center;">
    <img src="./SW.png" style=" width:600px; padding: 10px; ">
</div>

In [None]:
from apache_beam import window
session_windowed_items = (
    items | 'window' >> beam.WindowInto(window.Sessions(10 * 60)))

- **The single global window:**

Beam's default windowing strategy is the single global window with late data discarded. Note that this strategy is also used by default for unbounded PCollections, so pay attention to it when applying PTransforms such as *GroupByKey* and *Combine*, which need all the data in a window before they can emit a result.

In [None]:
from apache_beam import window
global_windowed_items = (
    items | 'window' >> beam.WindowInto(window.GlobalWindows()))

#### **Watermarks and late data**

The watermark defines what is considered late data. It is the system's notion of when all data in a certain window can be expected to have arrived in your pipeline. Any data that arrives after the watermark has passed the end of a window is considered late data.

*But why do we need watermarks?*

Beam uses data timestamps to divide PCollection elements into windows. However, data isn't guaranteed to arrive in a pipeline in time order, or at predictable intervals. For example, say you have 5-minute fixed windows and a watermark that assumes 30 s of lag between the data timestamps (event time) and the time the data arrives in your pipeline (processing time). If a data record whose timestamp belongs to the first window (from 0:00 up to 4:59) arrives at 5:38, that record is considered **late data**.

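You can allow late data by setting an allowed lateness when you set your PCollection's windowing strategy. In the Python SDK, this is the `allowed_lateness` argument of `beam.WindowInto` (the Java SDK uses `.withAllowedLateness`).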

In [None]:
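import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.utils.timestamp import Duration

pc = ...  # the initial PCollection
# trigger_fn, accumulation_mode and timestamp_combiner are placeholders for your own choices.
windowed = pc | beam.WindowInto(
    FixedWindows(60),
    trigger=trigger_fn,
    accumulation_mode=accumulation_mode,
    timestamp_combiner=timestamp_combiner,
    allowed_lateness=Duration(seconds=2 * 24 * 60 * 60))  # 2 days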

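To make the late-data example concrete, the function below classifies a single record under a deliberately simplified model in which the watermark is just processing time minus a fixed lag. Real runners estimate the watermark from the data source, so treat `classify` (a hypothetical helper) as an illustration of the rule, not of how Beam computes watermarks. Times are in seconds, so 5:38 is 338.

```python
def classify(event_time, arrival_time, window_size, watermark_lag, allowed_lateness=0):
    """Classify one record under a simplified, fixed-lag watermark model (seconds)."""
    # End (exclusive) of the fixed window that the event time falls into.
    window_end = event_time - event_time % window_size + window_size
    # Simplification: the watermark trails processing time by a constant lag.
    watermark = arrival_time - watermark_lag
    if watermark < window_end:
        return 'on time'
    if watermark < window_end + allowed_lateness:
        return 'late, accepted'
    return 'late, dropped'


# 5-minute windows, 30 s of lag, an event at 0:02 arriving at 5:38.
print(classify(120, 338, 300, 30))                       # late, dropped
print(classify(120, 338, 300, 30, allowed_lateness=60))  # late, accepted
print(classify(120, 320, 300, 30))                       # on time
```
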
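#### **Assigning timestamps**

The cell below parses each event's `WHEELS_OFF` field into a datetime and attaches it to the element as its Beam timestamp with `beam.window.TimestampedValue`. Events without a parseable time are dropped.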

In [None]:
import datetime as dt

import apache_beam as beam

# Assumption: the layout of string timestamps once any 'T' separator is replaced by a space.
DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def to_datetime(event_time):
    # In BigQuery, this is a datetime.datetime. In JSON, it's a string;
    # sometimes it has a T separating the date and time, sometimes it doesn't.
    if isinstance(event_time, str):
        event_time = dt.datetime.strptime(event_time.replace('T', ' '), DATETIME_FORMAT)
    # Assumption: naive datetimes are in UTC. Without tzinfo, .timestamp() would use local time.
    if event_time.tzinfo is None:
        event_time = event_time.replace(tzinfo=dt.timezone.utc)
    return event_time


def assign_timestamp(event):
    # Beam timestamps are Unix (epoch) times: the number of seconds elapsed
    # since 00:00:00 UTC on 1 January 1970.
    try:
        event_time = to_datetime(event['WHEELS_OFF'])
        yield beam.window.TimestampedValue(event, event_time.timestamp())
    except (KeyError, TypeError, ValueError, AttributeError):
        # Skip events whose WHEELS_OFF field is missing or cannot be parsed.
        pass


events = events | 'assign_time' >> beam.FlatMap(assign_timestamp)

### **Triggers**

Beam uses triggers to determine when to emit the aggregated results of each window (each emitted result is called a *pane*). With Beam's default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates that all data has arrived, and discards all subsequent data for that window.

**Note:** Triggers are different from allowed lateness. Allowed lateness controls how long late data is still accepted into a window, while triggers control when each stage of the pipeline processes the data collected so far and emits a result.

Beam provides a number of pre-built triggers that you can use to change this default behavior:

- **Event time trigger**

The *AfterWatermark* trigger operates on event time. *AfterWatermark* fires only when the watermark passes the end of a window based on the timestamp assigned to each element.

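You can also configure the trigger to fire before (early) or after (late) the end of a window. In the example below, Beam emits early results 60 s of processing time after the first element of a pane arrives, an on-time result when the watermark passes the end of the window, and a late result every time late data arrives.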

In [None]:
from apache_beam.transforms.trigger import AfterCount, AfterProcessingTime, AfterWatermark

AfterWatermark(
    early=AfterProcessingTime(delay=1 * 60), late=AfterCount(1))

- **Processing time triggers**

The AfterProcessingTime trigger is useful for triggering early results from a window, particularly a window with a large time frame such as a single global window. It fires after a certain amount of processing time has passed since data was received.

- **Data-driven triggers**

Beam provides one data-driven trigger, *AfterCount*. It fires after a certain number of elements have been collected. This can be particularly useful when working with a single global window.

- **Setting a trigger**

A trigger is set in the *WindowInto* transform for a PCollection. It also requires setting the *AccumulationMode* parameter, which can take two values: **ACCUMULATING** or **DISCARDING**.

When a trigger fires, it emits the current contents of the window as a pane. When *AccumulationMode* is set to *ACCUMULATING*, each pane contains all the data received so far in the window, including data already emitted in earlier panes. When it is set to *DISCARDING*, each pane contains only the data that arrived since the previous firing, so the panes are mutually exclusive.

In [None]:
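from apache_beam import WindowInto
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime
from apache_beam.transforms.window import FixedWindows

pcollection | WindowInto(
    FixedWindows(1 * 60),
    trigger=AfterProcessingTime(1 * 60),
    accumulation_mode=AccumulationMode.DISCARDING)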

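The difference between the two accumulation modes can be illustrated in plain Python. The hypothetical `panes` function below simulates the sums emitted by successive firings of a trigger for one window, given the elements that arrived before each firing. It models the semantics only and is not Beam code.

```python
def panes(firings, mode):
    """Return the sum emitted at each trigger firing for a single window.

    `firings` lists, for each firing, the elements that arrived since the previous one.
    """
    emitted, seen = [], []
    for new_elements in firings:
        if mode == 'DISCARDING':
            # Only the elements that arrived since the previous firing.
            pane = list(new_elements)
        else:  # 'ACCUMULATING'
            # Everything received in the window so far, including earlier panes.
            seen.extend(new_elements)
            pane = list(seen)
        emitted.append(sum(pane))
    return emitted


firings = [[1, 2], [3], [4, 5]]
print(panes(firings, 'ACCUMULATING'))  # [3, 6, 15]
print(panes(firings, 'DISCARDING'))    # [3, 3, 9]
```

With DISCARDING, the emitted values add up to the window total (3 + 3 + 9 = 15). With ACCUMULATING, the last pane alone already holds that total.
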
- **Composite triggers**

Depending on your use case, you might want to use composite triggers, which are built from the ones described above.
For more information, see: https://beam.apache.org/documentation/programming-guide/#composite-triggers