## Very important to know:
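- Apache Beam is a unified programming model for building batch and streaming data-processing pipelines: you read a (possibly very large) dataset, transform it, and write out the result. The pipeline definition does not execute anything by itself; a runner (for example Apache Spark, Apache Flink, Google Cloud Dataflow, or the local DirectRunner) is the engine that actually executes the job.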

# Structure of Beam code:
- Create a pipeline.
- Input the data as a PCollection.
- Transform the data.
- Display or write the results, then run the pipeline.

# Apache Beam pipeline

In [2]:
import apache_beam as beam
from apache_beam import Create, Map

p1 = beam.Pipeline()
find_cubes = (
    p1
    | "Create Element" >> Create([1, 5, 8])  # Step 1: create a PCollection from the list [1, 5, 8]
    | "Find Cubes" >> Map(lambda x: x ** 3)  # Step 2: map each number to its cube
    | "Print Result" >> Map(print)           # Step 3: print each cube
)
# Create also accepts other iterables, such as:
# beam.Create({'foo': 'bar'})
# beam.Create(range(10))
p1.run()



1
125
512


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x75d978ab9c60>

# Flatten

Flatten is a transformation that merges multiple PCollections of the same type into a single PCollection. Beam does not guarantee any element order in the result, as the output below shows.

In [3]:
import apache_beam as beam

even = {2, 4, 6, 8}
odd = {1, 3, 5, 7, 9}

p1 = beam.Pipeline()
even_p1 = p1 | "Create Even" >> beam.Create(even)
odd_p1 = p1 | "Create Odd" >> beam.Create(odd)
# Flatten takes a tuple (or list) of PCollections and merges them into one
flat_out = (even_p1, odd_p1) | "Merge" >> beam.Flatten() | "Print" >> beam.Map(print)
p1.run()


1
3
5
7
9
8
2
4
6


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x75d978896320>

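# Map/FlatMap
Map applies a function to each element and emits exactly one output element per input. FlatMap applies a function that returns an iterable and emits every item of that iterable as a separate element, so one input can yield zero, one, or many outputs.

For the input ["foo bar", "hello world"]:
- Map(str.split) outputs [["foo", "bar"], ["hello", "world"]]: one list per input, so the result stays nested.
- FlatMap(str.split) outputs ["foo", "bar", "hello", "world"]: the lists are flattened into individual words.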


In [13]:
import apache_beam as beam
from apache_beam import Create, Map, FlatMap

p1 = beam.Pipeline()


flatten_elements = (p1 | 
            "Create Element" >> Create(["foo bar", "hello world"]) 
            | "Split Words" >> FlatMap(str.split)  # each word becomes its own element
            | "Print" >> Map(print)
            )

p1.run() 

foo
bar
hello
world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x75d976e29ab0>

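# Filter

Filter keeps only the elements for which the given predicate returns True; all other elements are dropped.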

In [14]:
import apache_beam as beam
from apache_beam import Create, Map, Filter

p1 = beam.Pipeline()

filter_even = (p1 | 
            "Create Element" >> Create(range(20)) 
            | "Find Even numbers" >> Filter(lambda x: x % 2 == 0)
            | "Print" >> Map(print)
            )

p1.run() 

0
2
4
6
8
10
12
14
16
18


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x75d976e2b2b0>

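# Keys, Values, ToString

When a PCollection holds (key, value) pairs (here created from a dict, whose items become 2-tuples), beam.Keys() extracts the keys, beam.Values() extracts the values, and beam.ToString.Element() converts each element to a string.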

In [15]:
import apache_beam as beam
from apache_beam import Create, Map

p1 = beam.Pipeline()

dict_elements = (p1 | 
            "Create Element" >> Create({"foo": "bar","hello": "world"}) 
            | "All Keys" >> beam.Keys()
            | "To String" >> beam.ToString.Element()  # keys are already strings, so this changes nothing here
            | "Print" >> Map(print)
            )

p1.run() 

foo
hello


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x75d976e2ab00>

# KvSwap

KvSwap takes a PCollection of (key, value) pairs and swaps each pair into (value, key).

In [16]:
import apache_beam as beam
from apache_beam import Create, Map, KvSwap, FlatMap

p1 = beam.Pipeline()

swap_elements = (p1 | 
            "Create Element" >> Create({"foo": "bar","hello": "world"}) 
            | "Swap elements" >> KvSwap()
            # (Optional) | "Split Pairs" >> FlatMap(lambda kv: kv)  # emit the two items of each swapped pair as separate elements
            | "Print" >> Map(print)
            )

p1.run() 

('bar', 'foo')
('world', 'hello')


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x75d976ec5630>

# Partition

Partition splits one PCollection into a fixed number n of PCollections. The partitioning function is called as fn(element, n) and must return the index (0 to n - 1) of the partition the element belongs to.

# Aggregation

In [32]:
import apache_beam as beam
from apache_beam import Create, Map, combiners

p1 = beam.Pipeline()

count_elements = (p1 | 
            "Create Element" >> Create({"foo": "bar","hello": "world"}) 
            | "Count" >> combiners.Count.PerKey()  # count values per key; dict keys are unique, so every count is 1
            | "Print" >> Map(print)
            )

p1.run() 

('foo', 1)
('hello', 1)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x75d976caa530>

In [None]:
import apache_beam as beam
from apache_beam import Create, Map, CombineValues

p1 = beam.Pipeline()
combine_values = (p1 | 
            "Create Element" >> Create({"one+one": [1,1],"one+three": [1,3]}) 
            | "Sum Values" >> CombineValues(sum)  # sum the list of values for each key
            | "Print" >> Map(print)
            )

p1.run() 

('one+one', 2)
('one+three', 4)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x75d976a1b730>