# Try Apache Beam - Python

In this notebook, you set up your development environment and work through a simple example using the [DirectRunner](https://beam.apache.org/documentation/runners/direct/). You can explore other runners with the [Beam Capability Matrix](https://beam.apache.org/documentation/runners/capability-matrix/).

To navigate through different sections, use the table of contents: from the **View** drop-down list, select **Table of contents**.

To run a code cell, click the **Run cell** button at the top left of the cell, or select the cell and press **`Shift+Enter`**. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see [Welcome to Colaboratory!](https://colab.sandbox.google.com/notebooks/welcome.ipynb).

# Setup

First, set up your environment: install `apache-beam` (for example, with `pip install apache-beam`) and download the Iris dataset, a small CSV file, from GitHub to your local file system. The pipeline below reads this file as its input.
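Shell commands such as `mkdir -p` and `curl` are not available, or behave differently, on every platform (the Windows command prompt, for example, treats `-p` as a folder name). A minimal sketch of the same setup using only Python's standard library, assuming the same URL and destination as the setup cell; `download_file` is a hypothetical helper name, not part of Beam.

```python
import os
import urllib.request

IRIS_URL = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"


def download_file(url, dest):
    """Download `url` to `dest`, creating the parent directory if needed.

    Hypothetical helper: a portable alternative to `mkdir -p` + `curl`.
    """
    parent = os.path.dirname(dest)
    if parent:
        # Equivalent of `mkdir -p`; does nothing if the directory exists
        os.makedirs(parent, exist_ok=True)
    # Copies the resource at `url` into the local file `dest`
    urllib.request.urlretrieve(url, dest)
    return dest


# Usage (requires network access):
# download_file(IRIS_URL, os.path.join("data", "iris.csv"))
```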

In [19]:
import os

# Run and print a shell command.
def run(cmd):
    print('>> {}'.format(cmd))
    !{cmd}
    print('')

# Create the data directory. os.makedirs works on any OS, whereas
# `mkdir -p` fails in the Windows shell (it treats -p as a folder name).
os.makedirs('data', exist_ok=True)

# Download the Iris dataset
run('curl -o data/iris.csv https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv')


>> curl -o data/iris.csv https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  3858  100  3858    0     0  32563      0 --:--:-- --:--:-- --:--:-- 33258

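# Average sepal and petal length by species

The pipeline below reads `data/iris.csv`, parses each row into a `(species, (sepal_length, petal_length))` pair, drops rows that cannot be parsed, groups the pairs by species, computes the average sepal and petal length for each species, and writes one line per species to text files under `outputs_iris/`.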
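To sanity-check the results without Beam, the same parse, filter, group, and average steps can be sketched in plain Python. This is a minimal sketch, not part of the original notebook: `reference_averages` and the `sample` rows are hypothetical, and it assumes the column order of the seaborn `iris.csv` (sepal length in column 0, petal length in column 2, species in column 4).

```python
import csv
from collections import defaultdict


def reference_averages(lines):
    """Plain-Python stand-in for the Beam pipeline (hypothetical helper).

    Skips the header, parses (species, (sepal_length, petal_length)),
    drops invalid rows, groups by species, and averages each group.
    """
    groups = defaultdict(list)
    for row in csv.reader(lines[1:]):  # lines[0] is the header row
        try:
            species = row[4].strip()
            sepal, petal = float(row[0]), float(row[2])
        except (IndexError, ValueError):
            continue  # same effect as the pipeline's 'Filter Valid Rows' step
        groups[species].append((sepal, petal))
    # One (avg_sepal, avg_petal) tuple per species, like compute_average
    return {
        species: (
            sum(v[0] for v in vals) / len(vals),
            sum(v[1] for v in vals) / len(vals),
        )
        for species, vals in groups.items()
    }


sample = [
    "sepal_length,sepal_width,petal_length,petal_width,species",
    "5.1,3.5,1.4,0.2,setosa",
    "4.9,3.0,1.4,0.2,setosa",
    "7.0,3.2,4.7,1.4,versicolor",
    "not,a,valid,row,setosa",  # non-numeric row, skipped
]
print(reference_averages(sample))
```

Passing it all lines of `data/iris.csv` should reproduce the pipeline's averages before the two-decimal rounding applied in the 'Format Results' step.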

In [23]:
import apache_beam as beam
import csv
import os

inputs_pattern = os.path.abspath("data/iris.csv")  # Full path to the input CSV
outputs_prefix = os.path.abspath("outputs_iris/avg_lengths")  # Prefix for output files

# Ensure the output directory exists
os.makedirs("outputs_iris", exist_ok=True)

# Helper functions
def parse_iris(line):
    """Parse a CSV line and return (species, (sepal_length, petal_length)).

    Returns None for malformed rows so they can be filtered out later.
    """
    try:
        fields = next(csv.reader([line]))
        species = fields[4].strip()
        sepal_length = float(fields[0].strip())
        petal_length = float(fields[2].strip())
        return species, (sepal_length, petal_length)
    except (IndexError, ValueError):
        # Too few columns or a non-numeric measurement
        return None

def compute_average(values):
    """Return (avg_sepal_length, avg_petal_length) for a list of pairs."""
    n = len(values)
    if n == 0:
        return (0.0, 0.0)
    sum_sepal = sum(v[0] for v in values)
    sum_petal = sum(v[1] for v in values)
    return (sum_sepal / n, sum_petal / n)

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:

    # Read the CSV (skipping the header row), parse each line,
    # and drop rows that parse_iris could not parse.
    iris_data = (
        pipeline
        | 'Read CSV' >> beam.io.ReadFromText(inputs_pattern, skip_header_lines=1)
        | 'Parse CSV' >> beam.Map(parse_iris)
        | 'Filter Valid Rows' >> beam.Filter(lambda x: x is not None)
    )

    # Group the (sepal, petal) pairs by species and average each group.
    avg_lengths = (
        iris_data
        | 'Group by Species' >> beam.GroupByKey()
        | 'Compute Average Lengths' >> beam.Map(lambda kv: (kv[0], compute_average(list(kv[1]))))
    )

    # Format one line per species and write the lines to files under outputs_prefix.
    (
        avg_lengths
        | 'Format Results' >> beam.Map(lambda kv: f"Species: {kv[0]}, AvgSepalLength: {kv[1][0]:.2f}, AvgPetalLength: {kv[1][1]:.2f}")
        | 'Write Results' >> beam.io.WriteToText(outputs_prefix)
    )

# Print the first 20 lines of each output file. WriteToText adds a shard
# suffix (e.g. "-00000-of-00001") to outputs_prefix, so match it with glob
# instead of hardcoding a machine-specific path.
import glob
import itertools

for output_file in sorted(glob.glob(glob.escape(outputs_prefix) + "-*")):
    with open(output_file) as f:
        for line in itertools.islice(f, 20):
            print(line.rstrip())


Species: setosa, AvgSepalLength: 5.01, AvgPetalLength: 1.46
Species: versicolor, AvgSepalLength: 5.94, AvgPetalLength: 4.26
Species: virginica, AvgSepalLength: 6.59, AvgPetalLength: 5.55
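By default, `WriteToText` names its output files `<prefix>-SSSSS-of-NNNNN`, where `SSSSS` is the zero-padded shard index and `NNNNN` the total number of shards; with a single shard, the file is `avg_lengths-00000-of-00001`. The runner chooses the shard count unless you set the `num_shards` argument. A small sketch that builds the expected names for a known shard count; `expected_shard_names` is a hypothetical helper, and it assumes the default shard name template.

```python
def expected_shard_names(prefix, num_shards, suffix=""):
    """Return the file names WriteToText produces with the default
    "-SSSSS-of-NNNNN" shard template (hypothetical helper).

    `suffix` plays the role of WriteToText's `file_name_suffix`.
    """
    return [
        f"{prefix}-{i:05d}-of-{num_shards:05d}{suffix}"
        for i in range(num_shards)
    ]


print(expected_shard_names("avg_lengths", 1))  # ['avg_lengths-00000-of-00001']
```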

