<a href="https://colab.research.google.com/github/akanksha-dt/asl-ml-immersion/blob/master/Apache_Beam_Pipeline_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
! pip install apache_beam[interactive] --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.8/14.8 MB[0m [31m25.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.8/139.8 kB[0m [31m13.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m13.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m39.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m37.8 MB/s[0m

In [5]:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

from datetime import datetime

In [7]:
# Pipeline that will be executed by Beam's InteractiveRunner.
p = beam.Pipeline(runner=InteractiveRunner())


In [13]:
# Multiply each number by 100 and write the results to sharded text files.
# WriteToText's second argument is file_name_suffix, which is appended verbatim
# to every shard name, so it must carry the leading dot (".txt", not "txt").
res = (
    p
    | beam.Create([1, 2, 3, 4, 5])
    | beam.Map(lambda x: x * 100)
    | beam.io.WriteToText("Result", file_name_suffix=".txt")
)

In [14]:
# Execute the pipeline; the returned PipelineResult is shown as the cell output.
p.run()



<apache_beam.runners.interactive.interactive_runner.PipelineResult at 0x7a7e39b083a0>

In [15]:
# Sample input lines for the word-splitting example.
# A list (rather than a set) keeps the lines in a fixed order and does not
# silently drop a line that happens to be repeated.
lines = [
    "Beam is great",
    "Beam is good to work",
]

In [18]:
# Split every line on single spaces; FlatMap flattens the per-line lists
# into one PCollection of individual words.
words = (
    p
    | beam.Create(lines)
    | beam.FlatMap(lambda line: line.split(" "))
)
# Display the words, then the pipeline graph.
ib.show(words)
ib.show_graph(p)

In [19]:
# Element-wise logic for ParDo is written as a subclass of beam.DoFn that
# overrides process(); the other methods are optional lifecycle hooks.
class MultiplyByTenDoFn(beam.DoFn):
  """Multiply each element by 10 and print a trace of the DoFn lifecycle.

  Hooks are listed in the order they fire during a run:
  setup -> start_bundle -> process (once per element) -> finish_bundle -> teardown.
  """

  @staticmethod
  def _timestamp():
    """Return the current local time formatted as DD/MM/YYYY HH:MM:SS."""
    return datetime.now().strftime("%d/%m/%Y %H:%M:%S")

  def setup(self):
    """Worker start-up hook, e.g. the place to open a database connection."""
    print("Worker started at ", self._timestamp())

  def start_bundle(self):
    """Hook called before process() when a bundle of elements starts."""
    print("Bundle started at ", self._timestamp())

  def process(self, element):
    """Log the element and emit it multiplied by 10.

    process() has to produce an iterable of outputs, hence the generator.
    """
    print("Processing element: %s" % element)
    yield element * 10

  def finish_bundle(self):
    """Hook called after the bundle is processed, e.g. to flush a batch query."""
    print("Bundle finished at ", self._timestamp())

  def teardown(self):
    """Worker shut-down hook, e.g. the place to close a database connection."""
    print("Worker finished at ", self._timestamp())

In [21]:
# Start a new pipeline for the ParDo example.
p = beam.Pipeline(InteractiveRunner())
# Apply the custom DoFn to each number and write the results to text shards.
# WriteToText's first argument is only a file-name prefix (the shard suffix is
# appended after it), so the extension is passed as file_name_suffix to make
# the output files actually end in ".txt".
res = (
    p
    | beam.Create([1, 2, 3, 4, 5])
    | beam.ParDo(MultiplyByTenDoFn())
    | beam.io.WriteToText("ResultParDo", file_name_suffix=".txt")
)

In [22]:
# Run the ParDo pipeline; the DoFn's lifecycle hooks print their trace as it executes.
p.run()

Worker started at  30/01/2024 15:47:53
Bundle started at  30/01/2024 15:47:53
Processing element: 1
Processing element: 2
Processing element: 3
Processing element: 4
Processing element: 5
Bundle finished at  30/01/2024 15:47:53
Worker finished at  30/01/2024 15:47:53


<apache_beam.runners.interactive.interactive_runner.PipelineResult at 0x7a7e391c73d0>

In [24]:
# (season, quantity) pairs; several seasons appear more than once.
inventory = [
    ("winter", 8), ("summer", 10), ("spring", 4),
    ("summer", 6), ("winter", 3), ("winter", 1),
    ("winter", 5), ("winter", 9), ("autumn", 8),
]

p = beam.Pipeline(runner=InteractiveRunner())

# GroupByKey gathers all quantities that share the same season key.
gbk = (
    p
    | beam.Create(inventory)
    | beam.GroupByKey()
)
ib.show(gbk)

In [25]:

# (user_id, name) records.
users = [("001", "Mark"), ("002", "John"), ("003", "Eva")]

# (user_id, product) records; a user id may occur more than once.
orders = [
    ("001", "iPhone13"),
    ("002", "iPhone14"),
    ("003", "iPad Pro"),
    ("003", "Macbook M1 Pro"),
    ("001", "Apple watch"),
]

# New pipeline for the users/orders join.
p = beam.Pipeline(runner=InteractiveRunner())

In [27]:
# Turn the in-memory lists into PCollections of (user_id, value) pairs.
users_PCollection = p | "Create users PCollection " >> beam.Create(users)
orders_PCollection = p | "Create orders PCollection " >> beam.Create(orders)

# Join the two collections on user id. Each result is
# (user_id, {"users": [...names...], "orders": [...products...]}).
joined_table_by_user_id = (
    {"users": users_PCollection, "orders": orders_PCollection}
    | "Merge Tables" >> beam.CoGroupByKey()
)

# Printing is kept as a side branch: Map(print) emits print's return value
# (None) for every element, so binding its output to joined_table_by_user_id
# would leave that name pointing at a collection of None instead of the join.
printed_joined_table = (
    joined_table_by_user_id
    | "Print Joined Table " >> beam.Map(print)
)

In [32]:
# Execute the whole pipeline; the Map(print) step prints each joined row.
p.run()
# Display the contents of joined_table_by_user_id.
ib.show(joined_table_by_user_id)
# Render the pipeline's transform graph.
ib.show_graph(p)



('001', {'users': ['Mark'], 'orders': ['iPhone13', 'Apple watch']})
('002', {'users': ['John'], 'orders': ['iPhone14']})
('003', {'users': ['Eva'], 'orders': ['iPad Pro', 'Macbook M1 Pro']})


('001', {'users': ['Mark'], 'orders': ['iPhone13', 'Apple watch']})
('002', {'users': ['John'], 'orders': ['iPhone14']})
('003', {'users': ['Eva'], 'orders': ['iPad Pro', 'Macbook M1 Pro']})
