# Setting up Apache Beam in Google Colab:

In [2]:
!pip install apache-beam[interactive]



# Import necessary libraries:

In [13]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows, SlidingWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode, Repeatedly

# Defining a basic pipeline:

In [25]:
def basic_pipeline():
    """Build and run a minimal Beam pipeline that doubles five integers.

    The pipeline creates the values 1 through 5, multiplies each one by two
    and passes the results to ``beam.io.WriteToText('output.txt')``. The
    pipeline is executed when the ``with`` block exits. Nothing is returned.
    """
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as pipeline:
        numbers = pipeline | beam.Create([1, 2, 3, 4, 5])
        doubled = numbers | beam.Map(lambda value: value * 2)
        # NOTE(review): per the Beam docs, WriteToText takes a file path
        # *prefix*, so expect sharded output files named after 'output.txt'
        # rather than a single file with exactly that name -- confirm.
        _ = doubled | beam.io.WriteToText('output.txt')


# **Composite Transforms:**

In [28]:
class MultiplyAndFilterEven(beam.PTransform):
    """Composite transform: double every element, then keep the even results.

    Two labelled steps are chained: "MultiplyByTwo" maps ``x`` to ``x*2`` and
    "FilterEven" keeps values where ``x % 2 == 0``. For integer inputs every
    doubled value is already even, so for such inputs the filter removes
    nothing.
    """

    def expand(self, pcoll):
        """Apply both steps to ``pcoll`` and return the resulting collection."""
        doubled = pcoll | "MultiplyByTwo" >> beam.Map(lambda value: value * 2)
        return doubled | "FilterEven" >> beam.Filter(lambda value: value % 2 == 0)

In [29]:
# Run the composite transform over a small in-memory collection.
options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    numbers = p | beam.Create([1, 2, 3, 4, 5])
    doubled_evens = numbers | MultiplyAndFilterEven()
    # Printing each element is the simplest way to see the results in Colab.
    result = doubled_evens | beam.Map(print)



2
4
6
8
10


In [36]:
!pip install pytz



# **Windowing:**

In [37]:
from datetime import datetime
import pytz

options = PipelineOptions()


def to_timestamped_value(element):
    """Turn a ``(value, "YYYY-MM-DD HH:MM:SS")`` pair into a TimestampedValue.

    The time string is parsed, tagged as UTC, and used as the event timestamp
    of ``element[0]``.
    """
    event_time = datetime.strptime(element[1], "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC)
    return beam.window.TimestampedValue(
        element[0], beam.window.Timestamp.from_utc_datetime(event_time))


with beam.Pipeline(options=options) as p:
    # Example data: (value, event time) pairs, one minute apart.
    timed_values = (
        p
        | beam.Create([(1, "2023-10-23 12:01:00"),
                       (2, "2023-10-23 12:02:00"),
                       (3, "2023-10-23 12:03:00"),
                       (4, "2023-10-23 12:04:00"),
                       (5, "2023-10-23 12:05:00")])
        | "Add event time" >> beam.Map(to_timestamped_value)
    )
    # Group the values into fixed 2-minute windows and sum each window.
    window_sums = (
        timed_values
        | 'Apply Windowing' >> beam.WindowInto(FixedWindows(2*60))
        | "Sum elements" >> beam.CombineGlobally(sum).without_defaults()
    )
    result = window_sums | beam.Map(print)



1
5
9


# **Triggers:**

In [40]:
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterCount

# NOTE(review): the flag is presumably required because a non-repeating
# AfterCount trigger is classed as unsafe (it may drop data) -- confirm
# against the Beam trigger documentation.
options = PipelineOptions(flags=['--allow_unsafe_triggers'])

with beam.Pipeline(options=options) as p:
    # Example data: (value, event time) pairs, thirty seconds apart.
    timed_values = (
        p
        | beam.Create([(1, "2023-10-23 12:01:00"),
                       (2, "2023-10-23 12:01:30"),
                       (3, "2023-10-23 12:02:00"),
                       (4, "2023-10-23 12:02:30"),
                       (5, "2023-10-23 12:03:00")])
        | beam.Map(
            lambda element: beam.window.TimestampedValue(
                element[0],
                beam.window.Timestamp.from_utc_datetime(
                    datetime.strptime(element[1], "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.UTC))))
    )
    # Fixed 2-minute windows with a count-based trigger in discarding mode,
    # then a per-window sum.
    window_sums = (
        timed_values
        | 'Apply Windowing' >> beam.WindowInto(
            FixedWindows(2*60),
            trigger=AfterCount(2),
            accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
        | "Sum elements" >> beam.CombineGlobally(sum).without_defaults()
    )
    result = window_sums | beam.Map(print)



3
12


# **ParDo:**

In [41]:
class MultiplyByTwo(beam.DoFn):
    """DoFn that emits each input element multiplied by two."""

    def process(self, element):
        """Return a one-item list holding ``element * 2``."""
        doubled = element * 2
        return [doubled]


In [42]:
# Run the MultiplyByTwo DoFn over a small in-memory collection.
options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    numbers = p | beam.Create([1, 2, 3, 4, 5])
    doubled = numbers | 'Multiply elements' >> beam.ParDo(MultiplyByTwo())
    # Print each element so the results show up in the Colab cell output.
    result = doubled | beam.Map(print)




2
4
6
8
10
