<a href="https://colab.research.google.com/github/PallaviVangari/DataMiningAssignment4/blob/main/ApacheBeam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install apache-beam[interactive]



In [2]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


In [3]:
def simple_pipeline():
    """Build and run a minimal pipeline that squares the integers 1 through 4.

    The squared values are handed to ``WriteToText`` with ``'output.txt'`` as
    the output path argument.
    """
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as pipeline:
        numbers = pipeline | beam.Create([1, 2, 3, 4])
        squares = numbers | beam.Map(lambda value: value ** 2)
        # Final step: attach the text sink to the squared values.
        _ = squares | beam.io.WriteToText('output.txt')


In [4]:
class MultiplyByTen(beam.DoFn):
    """DoFn that scales every incoming element by a factor of ten."""

    def process(self, element):
        """Return a one-item list holding ``element`` multiplied by 10."""
        scaled = element * 10
        return [scaled]

def pardo_example():
    """Run a pipeline that applies ``MultiplyByTen`` via ``ParDo`` to 1 through 4.

    The results are passed to ``WriteToText`` with ``'output_pardo.txt'`` as
    the output path argument.
    """
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as pipeline:
        numbers = pipeline | beam.Create([1, 2, 3, 4])
        scaled = numbers | beam.ParDo(MultiplyByTen())
        # Final step: attach the text sink to the scaled values.
        _ = scaled | beam.io.WriteToText('output_pardo.txt')


In [5]:
class FilterAndDouble(beam.PTransform):
    """Composite transform: keep the even elements, then double each of them."""

    def expand(self, pcoll):
        """Apply the filter step followed by the doubling step to ``pcoll``."""
        evens = pcoll | beam.Filter(lambda value: value % 2 == 0)
        doubled = evens | beam.Map(lambda value: value * 2)
        return doubled

def composite_transform_example():
    """Run a pipeline that pushes the integers 1 through 6 through ``FilterAndDouble``.

    The results are passed to ``WriteToText`` with ``'output_composite.txt'``
    as the output path argument.
    """
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as pipeline:
        numbers = pipeline | beam.Create([1, 2, 3, 4, 5, 6])
        transformed = numbers | FilterAndDouble()
        # Final step: attach the text sink to the composite transform's output.
        _ = transformed | beam.io.WriteToText('output_composite.txt')


In [15]:
from datetime import datetime
import time

def to_unix_timestamp(date_string, date_format='%Y-%m-%d %H:%M:%S'):
    """Convert a formatted date string to a Unix timestamp.

    Args:
        date_string: Text to parse, e.g. ``"2023-01-01 12:04:59"``.
        date_format: ``strptime`` format describing ``date_string``.

    Returns:
        float: Seconds since the Unix epoch. Fractional seconds parsed with
        ``%f`` are preserved. When the format carries a UTC offset (``%z``)
        the offset is honoured; otherwise the parsed value is interpreted in
        the machine's local timezone.

    Raises:
        ValueError: If ``date_string`` does not match ``date_format``.
    """
    dt = datetime.strptime(date_string, date_format)
    # ``datetime.timestamp()`` respects tzinfo and microseconds, whereas
    # ``time.mktime(dt.timetuple())`` treats every value as local time and
    # truncates it to whole seconds.
    return dt.timestamp()

def windowing_example():
    """Attach event timestamps to four elements and window them in 30-second buckets.

    Each ``(value, date_string)`` pair is turned into a ``TimestampedValue``
    whose timestamp comes from ``to_unix_timestamp``. The values are then
    assigned to ``FixedWindows(30)`` and passed to ``WriteToText`` with
    ``'output_windowed.txt'`` as the output path argument.
    """
    events = [
        (1, "2023-01-01 12:04:59"),
        (2, "2023-01-01 12:05:01"),
        (3, "2023-01-01 12:05:39"),
        (4, "2023-01-01 12:06:19"),
    ]
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p | beam.Create(events)
           | beam.Map(lambda element: beam.window.TimestampedValue(
               element[0], to_unix_timestamp(element[1])))
           # ``FixedWindows`` is reached through ``beam.window`` because the bare
           # name is not imported in this notebook and would raise NameError
           # on a fresh kernel.
           | beam.WindowInto(beam.window.FixedWindows(30))
           | beam.io.WriteToText('output_windowed.txt'))


In [8]:
from apache_beam.transforms.trigger import AfterCount

def trigger_example():
    """Window the integers 1 through 4 with an ``AfterCount(2)`` trigger.

    The elements are assigned to ``FixedWindows(30)`` with a discarding
    accumulation mode, then passed to ``WriteToText`` with
    ``'output_triggered.txt'`` as the output path argument.
    """
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p | beam.Create([1, 2, 3, 4])
           # ``FixedWindows`` is reached through ``beam.window`` because the bare
           # name is not imported in this notebook and would raise NameError
           # on a fresh kernel.
           | beam.WindowInto(
               beam.window.FixedWindows(30),
               trigger=AfterCount(2),
               accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
           | beam.io.WriteToText('output_triggered.txt'))


In [16]:
# Run every example in order; each call builds and runs its own pipeline.
for run_example in (
    simple_pipeline,
    pardo_example,
    composite_transform_example,
    windowing_example,
    trigger_example,
):
    run_example()


