<a href="https://colab.research.google.com/github/ParsaKeerthana/ApacheBeam_DataEngineering_Assignment/blob/main/Apache_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Run and print a shell command.
def run(cmd: str) -> None:
  """Echo ``cmd`` prefixed with '>> ', execute it as a shell command, then print a blank line.

  Relies on IPython's ``!{cmd}`` shell escape, so it only works inside a
  Jupyter/Colab kernel, not as plain Python.
  """
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
# NOTE(review): the install is unpinned; consider pinning a version
# (e.g. `%pip install -q apache-beam==X.Y.Z`) so Restart & Run All is reproducible.
run('pip install --quiet apache-beam')

# Copy the input file into the local file system.
# NOTE(review): these commented-out lines fetch kinglear.txt, which no visible cell
# reads; the pipelines in this notebook read water_potability.csv, which must
# already be present in the working directory.
#run('mkdir -p data')
#run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')

>> pip install --quiet apache-beam



## ParDo

In [None]:
import apache_beam as beam

class SplitWaterRow(beam.DoFn):
    """DoFn that turns one CSV line into a single list of its comma-separated fields."""

    def process(self, element):
        fields = element.split(',')
        # A DoFn returns an iterable of outputs; this one always emits exactly one.
        return [fields]

class FilterPotableWater(beam.DoFn):
    """DoFn that keeps only rows whose last field is the string '1'.

    Input: a list of string fields (one parsed CSV row); the last field is
    presumably the potability label (TODO confirm against the dataset schema).
    Output: the same row when its last field is '1', otherwise nothing.
    """

    def process(self, element):
        # Return an explicit (possibly empty) list, like the other DoFns in this
        # cell, instead of falling off the end and implicitly returning None.
        return [element] if element[-1] == '1' else []

class PairWaterRows(beam.DoFn):
    """DoFn that maps every incoming row to the constant pair ("Potable", 1)."""

    def process(self, element):
        # Row content is ignored: each row contributes a count of 1 under a single key.
        pair = ("Potable", 1)
        return [pair]

class Counting(beam.DoFn):
    """DoFn that sums the grouped values of one key: (key, values) -> (key, sum(values))."""

    def process(self, element):
        key, values = element
        total = sum(values)
        return [(key, total)]

# Count potable samples with hand-written DoFns:
# read -> drop header -> split -> keep potable rows -> pair with 1 -> group -> sum -> write.
p2 = beam.Pipeline()

potable_water_count = (
    p2
    | 'ReadFromText' >> beam.io.ReadFromText('water_potability.csv')
    # Drop the header line (the one starting with "ph,").
    | 'SkipHeader' >> beam.Filter(lambda line: not line.startswith("ph,"))
    | 'SplitWaterRow' >> beam.ParDo(SplitWaterRow())
    | 'FilterPotableWater' >> beam.ParDo(FilterPotableWater())
    | 'PairWaterRows' >> beam.ParDo(PairWaterRows())
    | 'GroupByKey' >> beam.GroupByKey()
    | 'SumUsingParDo' >> beam.ParDo(Counting())
    # Written as sharded files named parddo_output.txt-SSSSS-of-NNNNN.
    | 'WriteToText' >> beam.io.WriteToText('parddo_output.txt')
)

# Block until the pipeline has finished so the output shards exist before the
# next cell reads them (run() alone is not guaranteed to be synchronous).
p2.run().wait_until_finish()


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7deeb8454220>

In [None]:
# Reading parddo_output.txt. WriteToText writes sharded files named
# "parddo_output.txt-SSSSS-of-NNNNN" relative to the working directory, so collect
# every shard instead of hard-coding "/content/...-00000-of-00001" (which only
# works on Colab and only when the runner produced a single shard).
import glob

parddo_shards = sorted(glob.glob('parddo_output.txt-*-of-*'))
assert parddo_shards, "no parddo_output.txt shards found; run the ParDo pipeline cell first"

parddo_results = []
for shard_path in parddo_shards:
    with open(shard_path, 'r') as f:
        parddo_results.extend(f.readlines())

parddo_results

["('Potable', 1278)\n"]

## Composite Transform

In [None]:
class CustomTransform(beam.PTransform):
    """Composite transform: sum values per key, filter the totals, format them as text.

    Input: PCollection of (key, number) pairs.
    Output: PCollection of strings produced by format_output for every
    (key, total) pair that filter_on_count accepts.
    """

    def expand(self, input_coll):
        # Stage 1: one (key, total) pair per key.
        totals = input_coll | 'Group and sum' >> beam.CombinePerKey(sum)
        # Stage 2: keep only the pairs filter_on_count accepts.
        kept = totals | 'count filter' >> beam.Filter(filter_on_count)
        # Stage 3: render each surviving pair as a line of text.
        return kept | 'Format output' >> beam.Map(format_output)

def SplitRow(element):
    """Split one CSV line into its comma-separated fields (no quoting support)."""
    fields = element.split(',')
    return fields

def filter_on_count(element):
    """Predicate for beam.Filter: keep a (name, count) pair only when count > 1.

    Args:
        element: a (name, count) pair; count must be comparable with 1.

    Returns:
        True if count > 1, else False. The previous version returned the tuple
        itself or fell through to an implicit None; the truthiness is the same,
        but a predicate should return an explicit bool.
    """
    _name, count = element
    return count > 1

def format_output(element):
    """Render a (name, count) pair as "name, count, Potable water count".

    ``name`` must already be a string; ``count`` is converted with str().
    """
    key, total = element
    parts = [key, str(total), 'Potable water count']
    return ', '.join(parts)

p = beam.Pipeline()

# Read the CSV and split each line into fields. The header line (starting with
# "ph,") is dropped explicitly, as in the ParDo cell, rather than relying on its
# last field happening not to equal '1'.
input_collection = (
    p
    | "Read from text file" >> beam.io.ReadFromText('water_potability.csv')
    | "Skip header" >> beam.Filter(lambda line: not line.startswith("ph,"))
    | "Split rows" >> beam.Map(SplitRow)
)

# Keep rows whose last field is '1', key each one as ("Potable", 1), then let
# CustomTransform sum, filter and format the counts before writing them out.
potable_count = (
    input_collection
    | 'Filter potable water' >> beam.Filter(lambda record: record[-1] == '1')
    | 'Pair with 1' >> beam.Map(lambda record: ("Potable", 1))
    | 'Apply CustomTransform' >> CustomTransform()
    | 'Write results' >> beam.io.WriteToText('potable_output.txt')
)

# Block until finished so the potable_output.txt shards exist before they are read.
p.run().wait_until_finish()


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7deeb818a470>

In [None]:
# Reading potable_output.txt. WriteToText writes sharded files named
# "potable_output.txt-SSSSS-of-NNNNN" relative to the working directory, so collect
# every shard instead of hard-coding "/content/...-00000-of-00001".
import glob

potable_shards = sorted(glob.glob('potable_output.txt-*-of-*'))
assert potable_shards, "no potable_output.txt shards found; run the composite pipeline cell first"

potable_output = []
for shard_path in potable_shards:
    with open(shard_path, 'r') as f:
        potable_output.extend(f.readlines())

potable_output


['Potable, 1278, Potable water count\n']

## Setting Up Pipeline, Adding Windowing and Triggers


In [None]:
import apache_beam as beam
from datetime import datetime
import pytz

# Attach an event-time timestamp to each row.
# NOTE(review): rows carry no index or time column, so this cannot simulate
# "one sample every 10 seconds"; each row is stamped with the wall-clock time at
# which it is processed, so all rows fall within the short span of the run.
def add_timestamp(element):
    """Wrap ``element`` in a TimestampedValue stamped with the current UTC time.

    Args:
        element: any value (here, one CSV line); passed through unchanged.

    Returns:
        beam.window.TimestampedValue holding ``element`` and the processing
        time as seconds since the Unix epoch.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now" in UTC
    # yields the same epoch seconds.
    epoch_timestamp = datetime.now(pytz.utc).timestamp()

    # Convert the epoch timestamp to a Beam Timestamp.
    timestamp = beam.window.Timestamp(seconds=epoch_timestamp)

    return beam.window.TimestampedValue(element, timestamp)


# Setting up the pipeline
p3 = beam.Pipeline()

# 1. Pipeline I/O: read water_potability.csv using a relative path, like the
#    other cells (an absolute '/content/...' path only resolves on Colab).
data = p3 | 'ReadFromText' >> beam.io.ReadFromText('water_potability.csv')

# Skip the header line (the one starting with "ph,").
filtered_data = data | 'FilterOutHeader' >> beam.Filter(lambda line: not line.startswith("ph,"))

# Attach a timestamp to each row via add_timestamp.
timestamped_data = filtered_data | 'AddTimestamp' >> beam.Map(add_timestamp)

# 2 + 3. Windowing and triggers in a single WindowInto. Applying
#    FixedWindows(300) twice (once bare, once with the trigger) is redundant.
#  - FixedWindows(300): non-overlapping 5-minute windows.
#  - Repeatedly(AfterCount(30)): fire a pane every 30 elements in a window. A bare
#    AfterCount(30) is one-shot: it finishes after its first firing, and later
#    elements in that window are dropped.
#  - DISCARDING: each pane holds only the elements since the previous firing.
# NOTE(review): windows/triggers only affect grouping steps (GroupByKey/Combine).
# This pipeline writes elements straight to text, so the output rows are the
# same with or without windowing; add e.g. a per-window count to observe panes.
windowed_data = timestamped_data | 'WindowWithTrigger' >> beam.WindowInto(
    beam.window.FixedWindows(300),
    trigger=beam.transforms.trigger.Repeatedly(beam.transforms.trigger.AfterCount(30)),
    accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
# Both names stay bound, as in the original cell.
triggered_data = windowed_data

# Writing the windowed and triggered data to an output file
triggered_data | 'WriteToText' >> beam.io.WriteToText('windowed_output.txt')

# Run and block until done: the statements below read the output files.
p3.run().wait_until_finish()

# Reading the output to display the result. Collect every shard of
# windowed_output.txt (written relative to the working directory) instead of
# assuming a single '/content/...-00000-of-00001' file.
import glob

windowed_shards = sorted(glob.glob('windowed_output.txt-*-of-*'))
assert windowed_shards, "no windowed_output.txt shards found; run the pipeline above first"

windowed_results = []
for shard_path in windowed_shards:
    with open(shard_path, 'r') as f:
        windowed_results.extend(f.readlines())

windowed_results[:10]  # Displaying the first 10 lines of the output for brevity


[',204.8904554713363,20791.318980747026,7.300211873184757,368.51644134980336,564.3086541722439,10.3797830780847,86.9909704615088,2.9631353806316407,0\n',
 '3.71608007538699,129.42292051494425,18630.057857970347,6.635245883862,,592.8853591348523,15.180013116357259,56.32907628451764,4.500656274942408,0\n',
 '8.099124189298397,224.23625939355776,19909.541732292393,9.275883602694089,,418.6062130644815,16.868636929550973,66.42009251176368,3.0559337496641685,0\n',
 '8.316765884214679,214.37339408562252,22018.417440775294,8.05933237743854,356.88613564305666,363.2665161642437,18.436524495493302,100.34167436508008,4.628770536837084,0\n',
 '9.092223456290965,181.10150923612525,17978.98633892625,6.546599974207941,310.13573752420444,398.41081338184466,11.558279443446395,31.997992727424737,4.075075425430034,0\n',
 '5.584086638456089,188.3133237696164,28748.68773904612,7.54486878877965,326.6783629116736,280.4679159334877,8.399734640152758,54.917861841994466,2.5597082275565217,0\n',
 '10.223862164528