<a href="https://colab.research.google.com/github/NickKornienko/CMPE-255-Assignment-4/blob/main/Apache_beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install 'apache-beam[interactive]'


Composite Transform:

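Strictly speaking, the pipeline itself is not a composite transform. A composite transform is a PTransform whose expand() method applies other transforms: ReadFromText, WriteToText and Count.PerKey() used below are built-in composites, and the read, extract and count steps could be wrapped into a custom one.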

Pipeline I/O:

Demonstrated by:
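ReadFromText('/content/Iris.csv'): This reads the Iris dataset from the provided CSV file, emitting one string per line (including the header row).
WriteToText('/content/output.txt'): This writes the results of the pipeline as text. By default the output is sharded, so the file gets a suffix such as -00000-of-00001 instead of being named exactly output.txt; passing shard_name_template='' produces a single file at the given path.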

ParDo:

Demonstrated by:
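beam.ParDo(ExtractSpeciesFn()): This calls ExtractSpeciesFn.process on each element (one CSV line) of the input PCollection. Each call may yield zero outputs (for the header row) or one (species, measurement) pair.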

In [22]:
# Import necessary libraries
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.combiners import Count

# Iris.csv columns: Id, SepalLengthCm, SepalWidthCm, PetalLengthCm, PetalWidthCm, Species
SPECIES_COL, PETAL_LENGTH_COL = 5, 3

# Define the ParDo function to extract species and petal length
class ExtractSpeciesFn(beam.DoFn):
    def process(self, element):
        # Splitting by commas to extract values
        parts = element.split(',')
        # Skip the header row, whose third column is the literal 'SepalWidthCm'
        if parts[2] != 'SepalWidthCm':
            yield (parts[SPECIES_COL].strip(), float(parts[PETAL_LENGTH_COL]))

# Define the pipeline
def run_pipeline():
    # No triggers are used here, so default pipeline options are enough
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | "Read CSV" >> ReadFromText('/content/Iris.csv')
         | "Extract Species and Petal Length" >> beam.ParDo(ExtractSpeciesFn())
         | "Count Species" >> Count.PerKey()
         # shard_name_template='' writes exactly /content/output.txt instead of
         # a sharded name such as /content/output.txt-00000-of-00001
         | "Write to File" >> WriteToText('/content/output.txt', shard_name_template=''))

# Run the pipeline
run_pipeline()

# Display the output
!cat /content/output.txt





We treat each row's Id (its row number) as an event timestamp in seconds so that windowing and triggers have something to act on, even though in real-world pipelines these features matter mostly for streaming data.

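Windowing: We use FixedWindows(50), which divides event time into non-overlapping 50-second windows [0, 50), [50, 100), and so on. Since the timestamps are row Ids, each window holds at most 50 consecutive rows.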

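Triggers: We use AfterCount(25), which emits a pane for a given key and window once 25 elements have arrived. Because it is not wrapped in Repeatedly, it fires at most once, and a key and window with fewer than 25 elements never fires. Beam therefore treats it as an unsafe trigger that can lose data, which is why the pipeline passes --allow_unsafe_triggers.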

In [21]:
# Import necessary libraries
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.combiners import Count
from apache_beam.transforms import window, trigger

# Iris.csv columns: Id, SepalLengthCm, SepalWidthCm, PetalLengthCm, PetalWidthCm, Species
ID_COL, SPECIES_COL, PETAL_LENGTH_COL = 0, 5, 3

# Define the ParDo function to extract species and petal length
class ExtractSpeciesFn(beam.DoFn):
    def process(self, element):
        # Splitting by commas to extract values
        parts = element.split(',')
        # Skip the header row, whose third column is the literal 'SepalWidthCm'
        if parts[2] != 'SepalWidthCm':
            # Use the row Id as a pseudo event timestamp (in seconds) for windowing
            yield window.TimestampedValue(
                (parts[SPECIES_COL].strip(), float(parts[PETAL_LENGTH_COL])),
                int(parts[ID_COL]))

# Define the pipeline
def run_pipeline():
    # AfterCount(25) may lose data, so Beam rejects it unless this flag is set
    options = PipelineOptions(flags=['--allow_unsafe_triggers'])
    with beam.Pipeline(options=options) as p:
        (p
         | "Read CSV" >> ReadFromText('/content/Iris.csv')
         | "Extract Species and Petal Length" >> beam.ParDo(ExtractSpeciesFn())
         # 50-second fixed windows; each key/window fires at most once, after 25 elements
         | "Window into Fixed Windows" >> beam.WindowInto(
               window.FixedWindows(50),
               trigger=trigger.AfterCount(25),
               accumulation_mode=trigger.AccumulationMode.DISCARDING)
         | "Count Species in Window" >> Count.PerKey()
         # Single output file at exactly this path (no shard suffix)
         | "Write to File" >> WriteToText('/content/windowed_output.txt', shard_name_template=''))

# Run the pipeline
run_pipeline()

# Display the output
!cat /content/windowed_output.txt



