<a href="https://colab.research.google.com/github/gowripreetham/SJSU_Apache_Beam/blob/main/Apache_Beam_Data_Engineering_Exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# --- Step 1: Setup Environment ---
!pip install --quiet apache-beam[gcp]

import apache_beam as beam
print("✅ Apache Beam installed and ready to use!")


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m173.5/173.5 kB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m88.8/88.8 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m32.6 MB/s[0m eta [3

In [None]:
# --- Step 2: Pipeline I/O Example ---

# Sample hospital visits: patient_id, name, dept_id, dept_name, visit_date.
sample_data = [
    "2984641,Emily,35,cardio,2021-09-01",
    "9454384,Riikka,86,ortho,2021-07-21",
    "9266396,Fanny,86,ortho,2021-06-03",
    "5247541,Urooj,35,cardio,2021-08-21",
    "6482736,Ali,50,neuro,2021-09-12",
]

# Write one newline-terminated row per record; later steps read this file.
with open("dept_data.txt", "w", encoding="utf-8") as f:
    f.writelines(line + "\n" for line in sample_data)

with beam.Pipeline() as p:
    (
        p
        | "Read Input File" >> beam.io.ReadFromText("dept_data.txt")
        # By default WriteToText shards its output ("output_data-00000-of-00001").
        # An empty shard template plus a ".txt" suffix writes exactly one file,
        # output_data.txt, which is the name the message below reports.
        | "Write Output File" >> beam.io.WriteToText(
            "output_data", file_name_suffix=".txt", shard_name_template=""
        )
    )

print("✅ File created and processed into output_data.txt")




✅ File created and processed into output_data.txt


In [None]:
# --- Step 3: Map and Filter Example ---

def parse_line(line):
    """Convert one CSV row of dept_data.txt into a visit record dict.

    The row must split on "," into exactly five fields:
    patient_id, name, dept_id, dept_name, date. The dept_id field is
    discarded. The result has keys patient_id, name, dept and
    visit_date, in that order. Raises ValueError when the row does not
    have exactly five fields.
    """
    fields = line.split(",")
    patient_id, name, _dept_id, dept_name, date = fields
    return dict(patient_id=patient_id, name=name, dept=dept_name, visit_date=date)

# Parse every row, keep only cardio visits, and print each matching record.
with beam.Pipeline() as p:
    records = (
        p
        | "Read" >> beam.io.ReadFromText("dept_data.txt")
        | "Parse CSV" >> beam.Map(parse_line)
    )
    cardio_records = records | "Filter cardio dept" >> beam.Filter(
        lambda record: record["dept"] == "cardio"
    )
    cardio_records | "Print results" >> beam.Map(print)


{'patient_id': '2984641', 'name': 'Emily', 'dept': 'cardio', 'visit_date': '2021-09-01'}
{'patient_id': '5247541', 'name': 'Urooj', 'dept': 'cardio', 'visit_date': '2021-08-21'}


In [None]:
# --- Step 4: ParDo Example ---
class ExtractAndTagPatients(beam.DoFn):
    """DoFn mapping a CSV row to a ``(name, dept_name)`` tuple.

    Each row must split on "," into exactly five fields
    (patient_id, name, dept_id, dept_name, date); one tuple is emitted
    per row, and a malformed row raises ValueError.
    """

    def process(self, element):
        fields = element.split(",")
        _patient_id, name, _dept_id, dept_name, _date = fields
        yield name, dept_name

# Tag each row with (name, department) via the custom DoFn and print it.
with beam.Pipeline() as p:
    rows = p | "Read" >> beam.io.ReadFromText("dept_data.txt")
    tagged = rows | "Extract Name and Dept" >> beam.ParDo(ExtractAndTagPatients())
    tagged | "Print Results" >> beam.Map(print)


('Emily', 'cardio')
('Riikka', 'ortho')
('Fanny', 'ortho')
('Urooj', 'cardio')
('Ali', 'neuro')


In [None]:
# --- Step 5: Partition Example ---

def partition_fn(record, n_partitions):
    """Return the partition index for one raw CSV row of dept_data.txt.

    The row layout is patient_id, name, dept_id, dept_name, date.
    Routing is decided by the dept_name field only: "cardio" -> 0,
    "ortho" -> 1, anything else (including rows with fewer than four
    fields) -> 2.

    Args:
        record: One comma-separated text row.
        n_partitions: Partition count supplied by ``beam.Partition``.
            It is not consulted; callers must request at least 3 partitions.

    Returns:
        The partition index (0, 1 or 2).
    """
    fields = record.split(",")
    # Compare the department field exactly. A substring test on the whole
    # row would misroute rows whose name or other fields happen to contain
    # "cardio" or "ortho".
    dept_name = fields[3].strip() if len(fields) >= 4 else None
    if dept_name == "cardio":
        return 0
    if dept_name == "ortho":
        return 1
    return 2

# Split rows into cardio / ortho / other partitions and write each to its own
# file. The pipeline executes when the `with` block exits, so every transform,
# including the writes, must be applied inside it. Transforms attached after
# the block has exited are never run.
with beam.Pipeline() as p:
    results = (
        p
        | "Read data" >> beam.io.ReadFromText("dept_data.txt")
        | "Partition data" >> beam.Partition(partition_fn, 3)
    )

    cardio, ortho, others = results
    cardio | "Write cardio" >> beam.io.WriteToText("cardio_output")
    ortho | "Write ortho" >> beam.io.WriteToText("ortho_output")
    others | "Write others" >> beam.io.WriteToText("other_output")


<PCollection[[5]: Write others/Write/WriteImpl/FinalizeWrite.None] at 0x7bbc68a64e60>

In [None]:
# --- Step 6: Composite Transform Example ---
class CountVisitsTransform(beam.PTransform):
    """Composite transform that counts visits per patient.

    Input: a PCollection of CSV rows ``patient_id,name,dept_id,dept_name,date``.
    Output: a PCollection of strings ``"<name> visited <n> time(s)"``, one per
    distinct patient.
    """

    def expand(self, pcoll):
        def key_by_patient(row):
            # Key on (patient_id, name) rather than the name alone, so two
            # different patients who share a name are counted separately.
            patient_id, name = row.split(",")[:2]
            return (patient_id, name), 1

        def format_count(kv):
            (_patient_id, name), visits = kv
            return f"{name} visited {visits} time(s)"

        return (
            pcoll
            | "Pair each patient with 1" >> beam.Map(key_by_patient)
            | "Count visits per patient" >> beam.CombinePerKey(sum)
            | "Format output" >> beam.Map(format_count)
        )

# Apply the reusable composite transform and write its summary lines to disk.
with beam.Pipeline() as p:
    rows = p | "Read File" >> beam.io.ReadFromText("dept_data.txt")
    visit_counts = rows | "Apply Custom Transform" >> CountVisitsTransform()
    visit_counts | "Write Results" >> beam.io.WriteToText("composite_output")


In [None]:
# --- Step 7: Windowing Example ---
import time
from apache_beam.transforms import window

# Width of each fixed window, in seconds.
WINDOW_SIZE_SECS = 10

# Read the clock once so the +5/+10/+15 offsets below are exact. Calling
# time.time() per element lets the clock tick between calls.
# Then round down to a window boundary. FixedWindows are aligned to multiples
# of the size counted from the epoch, so this makes the grouping deterministic:
# [base, base+10) holds cardio+ortho, and [base+10, base+20) holds cardio+neuro.
base_ts = int(time.time()) // WINDOW_SIZE_SECS * WINDOW_SIZE_SECS

data_with_timestamps = [
    window.TimestampedValue("cardio", base_ts),
    window.TimestampedValue("ortho", base_ts + 5),
    window.TimestampedValue("cardio", base_ts + 10),
    window.TimestampedValue("neuro", base_ts + 15),
]

# Count occurrences of each department name within each fixed window.
with beam.Pipeline() as p:
    (
        p
        | "Create Data" >> beam.Create(data_with_timestamps)
        | "Apply Fixed Window" >> beam.WindowInto(window.FixedWindows(WINDOW_SIZE_SECS))
        | "Count per Window" >> beam.combiners.Count.PerElement()
        | "Print results" >> beam.Map(print)
    )


('cardio', 1)
('cardio', 1)
('ortho', 1)
('neuro', 1)


In [None]:
# --- Step 8: Beam ML RunInference Example (Colab Fixed Version) ---

import apache_beam as beam
import numpy as np
import urllib.request
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import ModelFileType, SklearnModelHandlerNumpy

# Download the pickled model next to the notebook, so the handler is given a
# local path instead of a gs:// URI.
# NOTE(security): unpickling runs arbitrary code. Only load model files from a
# source you trust; here that is the `apache-beam-samples` bucket.
model_url = "https://storage.googleapis.com/apache-beam-samples/run_inference/five_times_table_sklearn.pkl"
local_model_path = "five_times_table_sklearn.pkl"
urllib.request.urlretrieve(model_url, local_model_path)

model_handler = SklearnModelHandlerNumpy(
    model_uri=local_model_path,
    model_file_type=ModelFileType.PICKLE,
)

# A (3, 1) float32 array. beam.Create iterates its rows, so each element fed
# to RunInference is a one-feature array such as array([10.]).
unkeyed_data = np.array([10, 20, 30], dtype=np.float32).reshape(-1, 1)

# Run the model on each example and print the resulting PredictionResult.
with beam.Pipeline() as p:
    predictions = (
        p
        | "Create Inputs" >> beam.Create(unkeyed_data)
        | "Run Inference" >> RunInference(model_handler=model_handler)
    )
    predictions | "Show Predictions" >> beam.Map(print)


PredictionResult(example=array([10.], dtype=float32), inference=array([50.], dtype=float32), model_id='five_times_table_sklearn.pkl')
PredictionResult(example=array([20.], dtype=float32), inference=array([100.], dtype=float32), model_id='five_times_table_sklearn.pkl')
PredictionResult(example=array([30.], dtype=float32), inference=array([150.], dtype=float32), model_id='five_times_table_sklearn.pkl')


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


# ✅ Apache Beam Data Engineering Exercise

**Features Demonstrated:**
- Pipeline I/O (ReadFromText, WriteToText)
- Map, Filter
- ParDo
- Partition
- Composite transform (custom `PTransform`)
- Windowing
- Bonus: Beam ML RunInference

**Theme:** Hospital Visit Analyzer

**Execution Environment:** Google Colab (Python SDK, DirectRunner)

**Video Walkthrough Outline:**
1. Introduction & setup
2. I/O demonstration
3. Map/Filter explanation
4. ParDo custom DoFn
5. Partitioned outputs
6. Reusable composite transform
7. Windowing example
8. Bonus ML inference
