# Introduction to Apache Beam Python SDK & Google Dataflow

![title](./image/beam_mascot.png)

## Prepared and Presented by Setia Budi

## What is Apache Beam?

Apache Beam is a unified programming model, with language-specific SDKs, for building data processing pipelines that handle both batch and stream processing. A pipeline is defined once and can then be executed on any of several supported data processing engines (called runners), such as Apache Flink, Apache Spark, and Google Cloud Dataflow. Apache Kafka is not an execution engine for Beam; it is supported as a data source and sink.

BEAM -> Batch + strEAM

## Apache Beam at a Glance

![title](./image/learner_graph.png)

## Basic Components

### Pipeline
A Pipeline encapsulates the entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data.

### PCollection
A PCollection represents a distributed data set that your Beam pipeline operates on. The data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism. PCollections are the inputs and outputs for each step in your pipeline.

### PTransform
A PTransform represents a data processing operation, or a step, in your pipeline. A PTransform typically takes one or more PCollection objects as input, applies a processing function that you provide to the elements of that PCollection, and produces zero or more output PCollection objects. Root transforms, such as those that read from a file, are applied to the pipeline itself rather than to an existing PCollection.

### I/O Transforms
Beam comes with a number of “IOs”: library PTransforms that read data from, or write data to, various external storage systems.

## Illustration for Pipeline, PCollection, and PTransform

![title](./image/pcollection_ptransform.png)

## Installation

In [1]:
!python --version

Python 3.9.18


In [None]:
!which python

In [None]:
!pip install apache-beam
!pip install "apache-beam[gcp]"

## Sample Dataset

In [2]:
!{("head -n 20 ./example/dept_data.txt")}

149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019
231555ZZ,Itoe,10,Accounts,1-01-2019
503996WI,Edouard,10,Accounts,1-01-2019
704275DC,Kyle,10,Accounts,1-01-2019
957149WC,Kyle,10,Accounts,1-01-2019
241316NX,Kumiko,10,Accounts,1-01-2019
796656IE,Gaston,10,Accounts,1-01-2019
331593PS,Beryl,20,HR,1-01-2019
560447WH,Olga,20,HR,1-01-2019
222997TJ,Leslie,20,HR,1-01-2019
171752SY,Mindy,20,HR,1-01-2019
153636AS,Vicky,20,HR,1-01-2019
745411HT,Richard,20,HR,1-01-2019
298464HN,Kirk,20,HR,1-01-2019
783950BW,Kaori,20,HR,1-01-2019
892691AR,Beryl,20,HR,1-01-2019
245668UZ,Oscar,20,HR,1-01-2019
231206QD,Kumiko,30,Finance,1-01-2019
357919KT,Wendy,30,Finance,1-01-2019


## Case 1: Simple and Not So Useful Pipeline

In [3]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | beam.io.ReadFromText("./example/dept_data.txt")
    | beam.io.WriteToText("./output/output_data")
)

p1.run()





<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eeabb3250>

In [4]:
!{("head -n 20 ./output/output_data-00000-of-00001")}

149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019
231555ZZ,Itoe,10,Accounts,1-01-2019
503996WI,Edouard,10,Accounts,1-01-2019
704275DC,Kyle,10,Accounts,1-01-2019
957149WC,Kyle,10,Accounts,1-01-2019
241316NX,Kumiko,10,Accounts,1-01-2019
796656IE,Gaston,10,Accounts,1-01-2019
331593PS,Beryl,20,HR,1-01-2019
560447WH,Olga,20,HR,1-01-2019
222997TJ,Leslie,20,HR,1-01-2019
171752SY,Mindy,20,HR,1-01-2019
153636AS,Vicky,20,HR,1-01-2019
745411HT,Richard,20,HR,1-01-2019
298464HN,Kirk,20,HR,1-01-2019
783950BW,Kaori,20,HR,1-01-2019
892691AR,Beryl,20,HR,1-01-2019
245668UZ,Oscar,20,HR,1-01-2019
231206QD,Kumiko,30,Finance,1-01-2019
357919KT,Wendy,30,Finance,1-01-2019


## Case 1.1: Simple and Not So Useful Pipeline, now with label

In [5]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "WriteOutput" >> beam.io.WriteToText("./output/output_data")
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eebc23cd0>

In [6]:
!{("head -n 20 ./output/output_data-00000-of-00001")}

149633CM,Marco,10,Accounts,1-01-2019
212539MU,Rebekah,10,Accounts,1-01-2019
231555ZZ,Itoe,10,Accounts,1-01-2019
503996WI,Edouard,10,Accounts,1-01-2019
704275DC,Kyle,10,Accounts,1-01-2019
957149WC,Kyle,10,Accounts,1-01-2019
241316NX,Kumiko,10,Accounts,1-01-2019
796656IE,Gaston,10,Accounts,1-01-2019
331593PS,Beryl,20,HR,1-01-2019
560447WH,Olga,20,HR,1-01-2019
222997TJ,Leslie,20,HR,1-01-2019
171752SY,Mindy,20,HR,1-01-2019
153636AS,Vicky,20,HR,1-01-2019
745411HT,Richard,20,HR,1-01-2019
298464HN,Kirk,20,HR,1-01-2019
783950BW,Kaori,20,HR,1-01-2019
892691AR,Beryl,20,HR,1-01-2019
245668UZ,Oscar,20,HR,1-01-2019
231206QD,Kumiko,30,Finance,1-01-2019
357919KT,Wendy,30,Finance,1-01-2019


## Case 2: Simple Pipeline with Simple Filter Transform

In [7]:
import apache_beam as beam


def split_row(element):
    return element.split(",")


def is_accounts(element):
    return element[3] == "Accounts"


p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(split_row)
    | "FilterAccounts" >> beam.Filter(is_accounts)
    | "WriteOutput" >> beam.io.WriteToText("./output/output_data")
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3ee9b1e6d0>

In [8]:
!{("head -n 20 ./output/output_data-00000-of-00001")}

['149633CM', 'Marco', '10', 'Accounts', '1-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '1-01-2019']
['231555ZZ', 'Itoe', '10', 'Accounts', '1-01-2019']
['503996WI', 'Edouard', '10', 'Accounts', '1-01-2019']
['704275DC', 'Kyle', '10', 'Accounts', '1-01-2019']
['957149WC', 'Kyle', '10', 'Accounts', '1-01-2019']
['241316NX', 'Kumiko', '10', 'Accounts', '1-01-2019']
['796656IE', 'Gaston', '10', 'Accounts', '1-01-2019']
['149633CM', 'Marco', '10', 'Accounts', '2-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '2-01-2019']
['231555ZZ', 'Itoe', '10', 'Accounts', '2-01-2019']
['503996WI', 'Edouard', '10', 'Accounts', '2-01-2019']
['704275DC', 'Kyle', '10', 'Accounts', '2-01-2019']
['957149WC', 'Kyle', '10', 'Accounts', '2-01-2019']
['241316NX', 'Kumiko', '10', 'Accounts', '2-01-2019']
['796656IE', 'Gaston', '10', 'Accounts', '2-01-2019']
['718737IX', 'Ayumi', '10', 'Accounts', '2-01-2019']
['149633CM', 'Marco', '10', 'Accounts', '3-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts'

## Case 3: Simple Filter Transform using Lambda Expression

In [9]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    | "FilterAccounts" >> beam.Filter(lambda record: record[3] == "Accounts")
    | "WriteOutput" >> beam.io.WriteToText("./output/output_data")
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3f20b56040>

In [10]:
!{("head -n 20 ./output/output_data-00000-of-00001")}

['149633CM', 'Marco', '10', 'Accounts', '1-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '1-01-2019']
['231555ZZ', 'Itoe', '10', 'Accounts', '1-01-2019']
['503996WI', 'Edouard', '10', 'Accounts', '1-01-2019']
['704275DC', 'Kyle', '10', 'Accounts', '1-01-2019']
['957149WC', 'Kyle', '10', 'Accounts', '1-01-2019']
['241316NX', 'Kumiko', '10', 'Accounts', '1-01-2019']
['796656IE', 'Gaston', '10', 'Accounts', '1-01-2019']
['149633CM', 'Marco', '10', 'Accounts', '2-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '2-01-2019']
['231555ZZ', 'Itoe', '10', 'Accounts', '2-01-2019']
['503996WI', 'Edouard', '10', 'Accounts', '2-01-2019']
['704275DC', 'Kyle', '10', 'Accounts', '2-01-2019']
['957149WC', 'Kyle', '10', 'Accounts', '2-01-2019']
['241316NX', 'Kumiko', '10', 'Accounts', '2-01-2019']
['796656IE', 'Gaston', '10', 'Accounts', '2-01-2019']
['718737IX', 'Ayumi', '10', 'Accounts', '2-01-2019']
['149633CM', 'Marco', '10', 'Accounts', '3-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts'

## Case 4: Using Google Dataflow as the runner

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

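# Placeholder values: replace the project, region, job name, and bucket with your own.
pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project-id',
    region='us-central1',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
)

# Pass the options by keyword: the first positional parameter of Pipeline is the runner.
p1 = beam.Pipeline(options=pipeline_options)

# Dataflow workers run remotely and cannot see files on the local machine,
# so the input and output paths point at Cloud Storage (placeholder bucket).
(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("gs://my-bucket/example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    | "FilterAccounts" >> beam.Filter(lambda record: record[3] == "Accounts")
    | "WriteOutput" >> beam.io.WriteToText("gs://my-bucket/output/output_data")
)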

p1.run()

## Case 5: Filter Transform with multiple arguments

In [11]:
import apache_beam as beam


def split_row(element):
    return element.split(",")


def has_role(element, role):
    return element[3] == role


p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(split_row)
    | "FilterAccounts" >> beam.Filter(has_role, "Accounts")
    # | "FilterHR" >> beam.Filter(has_role, "HR")
    | "WriteOutput" >> beam.io.WriteToText("./output/output_data")
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3f20b2ee80>

In [12]:
!{("head -n 20 ./output/output_data-00000-of-00001")}

['149633CM', 'Marco', '10', 'Accounts', '1-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '1-01-2019']
['231555ZZ', 'Itoe', '10', 'Accounts', '1-01-2019']
['503996WI', 'Edouard', '10', 'Accounts', '1-01-2019']
['704275DC', 'Kyle', '10', 'Accounts', '1-01-2019']
['957149WC', 'Kyle', '10', 'Accounts', '1-01-2019']
['241316NX', 'Kumiko', '10', 'Accounts', '1-01-2019']
['796656IE', 'Gaston', '10', 'Accounts', '1-01-2019']
['149633CM', 'Marco', '10', 'Accounts', '2-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts', '2-01-2019']
['231555ZZ', 'Itoe', '10', 'Accounts', '2-01-2019']
['503996WI', 'Edouard', '10', 'Accounts', '2-01-2019']
['704275DC', 'Kyle', '10', 'Accounts', '2-01-2019']
['957149WC', 'Kyle', '10', 'Accounts', '2-01-2019']
['241316NX', 'Kumiko', '10', 'Accounts', '2-01-2019']
['796656IE', 'Gaston', '10', 'Accounts', '2-01-2019']
['718737IX', 'Ayumi', '10', 'Accounts', '2-01-2019']
['149633CM', 'Marco', '10', 'Accounts', '3-01-2019']
['212539MU', 'Rebekah', '10', 'Accounts'

## Case 6: Simple Pipeline with Simple Aggregation Transform
###  `Count.Globally`

In [13]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    | "FilterAccounts" >> beam.Filter(lambda record: record[3] == "Accounts")
    | "Count all elements" >> beam.combiners.Count.Globally()
    | "Print result" >> beam.Map(print)
)

p1.run()

278


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eebef07f0>

## Case 7: Simple Pipeline with Simple Aggregation Transform
###  `Count.PerElement`

In [14]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    | "Extract role column" >> beam.Map(lambda record: (record[3]))
    | "Count all elements" >> beam.combiners.Count.PerElement()
    | "Print result" >> beam.Map(print)
)

p1.run()

('Accounts', 278)
('HR', 310)
('Finance', 310)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eebc23ee0>

## Case 8: Adding key for each element using `WithKeys`

In [15]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    | "AddKey" >>  beam.WithKeys(lambda record: record[3])
    | "WriteOutput" >> beam.io.WriteToText("./output/output_data")
)

p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eeab5ef40>

In [16]:
!{("head -n 20 ./output/output_data-00000-of-00001")}

('Accounts', ['149633CM', 'Marco', '10', 'Accounts', '1-01-2019'])
('Accounts', ['212539MU', 'Rebekah', '10', 'Accounts', '1-01-2019'])
('Accounts', ['231555ZZ', 'Itoe', '10', 'Accounts', '1-01-2019'])
('Accounts', ['503996WI', 'Edouard', '10', 'Accounts', '1-01-2019'])
('Accounts', ['704275DC', 'Kyle', '10', 'Accounts', '1-01-2019'])
('Accounts', ['957149WC', 'Kyle', '10', 'Accounts', '1-01-2019'])
('Accounts', ['241316NX', 'Kumiko', '10', 'Accounts', '1-01-2019'])
('Accounts', ['796656IE', 'Gaston', '10', 'Accounts', '1-01-2019'])
('HR', ['331593PS', 'Beryl', '20', 'HR', '1-01-2019'])
('HR', ['560447WH', 'Olga', '20', 'HR', '1-01-2019'])
('HR', ['222997TJ', 'Leslie', '20', 'HR', '1-01-2019'])
('HR', ['171752SY', 'Mindy', '20', 'HR', '1-01-2019'])
('HR', ['153636AS', 'Vicky', '20', 'HR', '1-01-2019'])
('HR', ['745411HT', 'Richard', '20', 'HR', '1-01-2019'])
('HR', ['298464HN', 'Kirk', '20', 'HR', '1-01-2019'])
('HR', ['783950BW', 'Kaori', '20', 'HR', '1-01-2019'])
('HR', ['892691AR', 

## Case 9: Simple Pipeline with Simple Aggregation Transform
###  `Count.PerKey`

In [17]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    | "AddKey" >>  beam.WithKeys(lambda record: record[3])
    | "Count elements per key" >> beam.combiners.Count.PerKey()
    | "Print result" >> beam.Map(print)
)

p1.run()

('Accounts', 278)
('HR', 310)
('Finance', 310)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3eebc23ca0>

## Case 10: Simple Pipeline with Simple Aggregation Transform
###  `CombineGlobally(sum)`, `CombineGlobally(min)`, `CombineGlobally(max)`

In [18]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    | "Cast to int" >> beam.Map(lambda record: (int(record[2])))
    | "Sum of all elements" >> beam.CombineGlobally(sum)
    | "Print result" >> beam.Map(print)
)

p1.run()

18280


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3f20a241f0>

## Case 11: Simple Pipeline with Simple Aggregation Transform
###  `Top.Smallest`, `Top.Largest`

In [19]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    | "Cast to int" >> beam.Map(lambda record: (int(record[2])))
    | "Top Smallest 5" >> beam.combiners.Top.Smallest(5)
    # | "Top Largest 5" >> beam.combiners.Top.Largest(5)
    | "Print result" >> beam.Map(print)
)

p1.run()

[10, 10, 10, 10, 10]


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3f20a24340>

## Case 12: Simple Pipeline with Simple Aggregation Transform
###  `CombinePerKey(sum)`, `CombinePerKey(min)`, `CombinePerKey(max)`

In [20]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    # | "Cast to int" >> beam.Map(lambda record: (record[3], int(record[2])))
    | "Map to Key Value" >> beam.Map(lambda record: (record[1], int(record[2])))
    | "Calculate elements per key" >> beam.CombinePerKey(sum)
    | "Print result" >> beam.Map(print)
)

p1.run()

('Marco', 310)
('Rebekah', 310)
('Itoe', 310)
('Edouard', 310)
('Kyle', 620)
('Kumiko', 1240)
('Gaston', 310)
('Beryl', 1240)
('Olga', 620)
('Leslie', 620)
('Mindy', 620)
('Vicky', 620)
('Richard', 620)
('Kirk', 620)
('Kaori', 1550)
('Oscar', 620)
('Wendy', 930)
('Cristobal', 930)
('Erika', 930)
('Sebastien', 930)
('Valerie', 930)
('Dolly', 930)
('Emily', 930)
('Hitomi', 930)
('Ayumi', 300)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3f213165e0>

## Case 13: Simple Pipeline with Simple Aggregation Transform
###  `Mean.Globally`

In [21]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    | "Cast to int" >> beam.Map(lambda record: (int(record[2])))
    | "Mean of all elements" >> beam.combiners.Mean.Globally()
    | "Print result" >> beam.Map(print)
)

p1.run()

20.356347438752785


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3f20ad22b0>

## Case 14: Simple Pipeline with Simple Aggregation Transform
###  `Mean.PerKey`

In [22]:
import apache_beam as beam

p1 = beam.Pipeline()

(
    p1
    | "ReadFromText" >> beam.io.ReadFromText("./example/dept_data.txt")
    | "SplitRecord" >> beam.Map(lambda record: record.split(","))
    | "Cast to int" >> beam.Map(lambda record: (record[3], int(record[2])))
    # | "Map to Key Value" >> beam.Map(lambda record: (record[1], int(record[2])))
    | "Mean elements per key" >> beam.combiners.Mean.PerKey()
    | "Print result" >> beam.Map(print)
)

p1.run()

('Accounts', 10.0)
('HR', 20.0)
('Finance', 30.0)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f3f2136a430>

## Testing on codespace