# Apache Beam in Colab — End‑to‑End Demo
*Composite Transform · Pipeline IO · ParDo · Windowing · Map · Filter · Partition · Beam ML (RunInference)*

> Runs on **Google Colab** with the **DirectRunner**.

## 1) Install & Imports

In [None]:
!pip -q install apache-beam==2.56.0 scikit-learn==1.5.2

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.testing.test_stream import TestStream

import re, os, numpy as np, pandas as pd
print("Beam version:", beam.__version__)

## 2) Hello, Beam — Minimal pipeline

In [None]:
def run_hello():
    """Run the smallest demo pipeline: create, upper-case, print.

    Builds an in-memory PCollection of four words, upper-cases every word
    with ``Map`` and prints each result.
    """
    greeting = ["hello", "beam", "from", "colab"]
    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        words = pipeline | beam.Create(greeting)
        shouted = words | beam.Map(lambda word: word.upper())
        # The print step is applied purely for its side effect.
        shouted | beam.Map(print)

run_hello()

## 3) Pipeline IO — ReadFromText & WriteToText

In [None]:
# Locations used by the IO demo: the text file it reads and the prefix
# under which WriteToText places its output.
input_path = "/content/beam_io_input.txt"
output_prefix = "/content/beam_io_output"

# Create the four-line input fixture. Lines are separated by "\n" and no
# trailing newline is written after the last one.
with open(input_path, "w", encoding="utf-8") as f:
    print(
        "Apache Beam makes data processing portable and unified",
        "Map and Filter are element-wise transforms",
        "ParDo runs user code (DoFn) on each element",
        "Windowing groups elements by event time windows",
        sep="\n",
        end="",
        file=f,
    )

def run_io():
    """Count words in ``input_path`` and write the counts to text shards.

    Each line is lower-cased, split into runs of ``a-z`` letters, paired
    with 1 and summed per word. The resulting ``(word, count)`` tuples are
    written with ``WriteToText`` under ``output_prefix``.
    """
    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        (
            pipeline
            | beam.io.ReadFromText(input_path)
            | "Lower" >> beam.Map(lambda text: text.lower())
            | "Tokens" >> beam.FlatMap(lambda text: re.findall(r"[a-z]+", text))
            | beam.Map(lambda token: (token, 1))
            | beam.CombinePerKey(sum)
            | beam.io.WriteToText(output_prefix)
        )

run_io()

## 4) Map · Filter · ParDo (DoFn)

In [None]:
class CleanAndLength(beam.DoFn):
    """DoFn that turns a sentence into ``(token, length)`` pairs.

    The element is lower-cased and split into runs of ``a-z`` letters;
    only tokens with at least four characters are emitted.
    """

    def process(self, element: str):
        """Yield ``(token, len(token))`` for every token of length >= 4."""
        tokens = re.findall(r"[a-z]+", element.lower())
        yield from ((tok, len(tok)) for tok in tokens if len(tok) >= 4)

def run_elementwise():
    """Demonstrate ParDo, Filter and Map on three sample sentences.

    ``CleanAndLength`` produces ``(token, length)`` pairs, ``Filter`` keeps
    the pairs whose length is at least 5, and ``Map`` formats each survivor
    as ``token:length`` before it is printed.
    """
    sentences = [
        "Beam combines batch and streaming.",
        "ParDo lets you run your own functions.",
        "Filter discards, Map transforms.",
    ]
    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        token_lengths = (
            pipeline
            | beam.Create(sentences)
            | "ParDoCleanLen" >> beam.ParDo(CleanAndLength())
        )
        long_tokens = token_lengths | "FilterLen>=5" >> beam.Filter(
            lambda kv: kv[1] >= 5
        )
        formatted = long_tokens | "Fmt" >> beam.Map(
            lambda kv: f"{kv[0]}:{kv[1]}"
        )
        formatted | beam.Map(print)

run_elementwise()

## 5) Composite Transform (PTransform)

In [None]:
class CleanTokenizeCount(beam.PTransform):
    """Composite transform: lower-case, tokenize and count words.

    Takes a PCollection of strings and returns ``(word, count)`` pairs,
    where words are runs of ``a-z`` letters in the lower-cased text.
    """

    def expand(self, pcoll):
        """Apply the lower-case, tokenize, pair-with-1 and sum steps."""
        lowered = pcoll | beam.Map(lambda text: text.lower())
        tokens = lowered | beam.FlatMap(
            lambda text: re.findall(r"[a-z]+", text)
        )
        # Pair every token with 1 so the per-key sum yields its frequency.
        ones = tokens | beam.Map(lambda token: (token, 1))
        return ones | beam.CombinePerKey(sum)

def run_composite():
    """Apply ``CleanTokenizeCount`` to two sentences and print the counts."""
    sentences = [
        "Composite transforms encapsulate reusable logic.",
        "Encapsulation makes pipelines cleaner.",
    ]
    pipeline = beam.Pipeline(options=PipelineOptions())
    with pipeline:
        counts = pipeline | beam.Create(sentences) | CleanTokenizeCount()
        counts | beam.Map(print)

run_composite()

## 6) Partition — split a PCollection into multiple PCollections

In [None]:
def part_fn(x, n):
    """Return the partition index for ``x`` by sign.

    Args:
        x: The number to classify.
        n: The partition count supplied by the caller; it is accepted but
            not used because the result is always 0, 1 or 2.

    Returns:
        0 when ``x`` is negative, 1 when ``x`` equals zero, otherwise 2.
    """
    if x < 0:
        return 0
    if x == 0:
        return 1
    return 2

def run_partition():
    """Split the integers -5..5 into negative, zero and positive outputs.

    ``beam.Partition(part_fn, 3)`` routes every number to one of three
    PCollections. Each output is tagged with its partition name and the
    resulting ``(name, number)`` tuples are printed.
    """
    nums = list(range(-5, 6))
    with beam.Pipeline(options=PipelineOptions()) as p:
        neg, zero, pos = p | beam.Create(nums) | beam.Partition(part_fn, 3)
        # Every step needs a distinct label: three unlabeled
        # ``beam.Map(print)`` steps would all be auto-labelled "Map(print)",
        # and Beam rejects duplicate labels within one pipeline.
        neg | "TagNeg" >> beam.Map(lambda x: ("neg", x)) | "PrintNeg" >> beam.Map(print)
        zero | "TagZero" >> beam.Map(lambda x: ("zero", x)) | "PrintZero" >> beam.Map(print)
        pos | "TagPos" >> beam.Map(lambda x: ("pos", x)) | "PrintPos" >> beam.Map(print)

run_partition()

## 7) Windowing — Fixed windows with TestStream (event time)

In [None]:
def run_windowing():
    """Sum event-time-stamped values per fixed window using a TestStream.

    The stream emits value 1 at offset 0, advances the watermark to offset
    5, emits values 2 and 3 at offsets 6 and 7, and then advances the
    watermark to infinity. Elements are assigned to ``FixedWindows(5)`` and
    summed per window without a default value for empty windows; every sum
    is printed.
    """
    epoch = beam.timestamp.Timestamp(0)
    stamped = beam.window.TimestampedValue

    # TestStream's builder methods are chained in the original; here each
    # step is applied in turn, in the same order.
    stream = TestStream()
    stream = stream.add_elements([stamped(1, epoch + 0)])
    stream = stream.advance_watermark_to(epoch + 5)
    stream = stream.add_elements([stamped(2, epoch + 6), stamped(3, epoch + 7)])
    stream = stream.advance_watermark_to_infinity()

    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        windowed = pipeline | stream | "Win5s" >> beam.WindowInto(FixedWindows(5))
        sums = windowed | "Sum" >> beam.CombineGlobally(sum).without_defaults()
        sums | beam.Map(print)

run_windowing()

## 8) Beam‑ML — RunInference with scikit‑learn (Iris)

In [None]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline as SkPipeline
import joblib

# Load Iris as pandas objects and hold out a stratified 25% test split.
iris = load_iris(as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(
    iris.data,
    iris.target,
    test_size=0.25,
    random_state=42,
    stratify=iris.target,
)

# Standardize the features, then fit a logistic regression; ``fit``
# returns the pipeline itself, so the fitted model is bound directly.
sk_model = SkPipeline(
    steps=[
        ("scaler", StandardScaler()),
        ("lr", LogisticRegression(max_iter=500)),
    ]
).fit(X_train, y_train)
print("Local sklearn accuracy:", sk_model.score(X_test, y_test))

# Persist the fitted pipeline with joblib for the RunInference cell.
model_path = "/content/iris_lr.joblib"
joblib.dump(sk_model, model_path)
print("Saved:", model_path)

In [None]:
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

def run_inference_with_beam():
    """Predict the class of the first ten Iris rows with Beam RunInference.

    Loads the scikit-learn pipeline saved at ``/content/iris_lr.joblib``,
    converts each of the first ten feature rows to a float NumPy array,
    runs it through ``RunInference`` and prints the predicted class label
    as an ``int``.
    """
    # Imported here so this block brings its own dependency into scope.
    from apache_beam.ml.inference.sklearn_inference import ModelFileType

    # The model file was written with ``joblib.dump``, so the handler must
    # be told to load it with joblib; its default loader is plain pickle.
    handler = SklearnModelHandlerNumpy(
        model_uri="/content/iris_lr.joblib",
        model_file_type=ModelFileType.JOBLIB,
    )
    data = iris.data.to_numpy().tolist()[:10]
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p | beam.Create(data)
           | "ToNumpy" >> beam.Map(lambda row: np.array(row, dtype=float))
           | "Infer" >> RunInference(handler)
           # ``inference`` already holds the class label returned by the
           # model's ``predict``; taking ``argmax`` of that scalar would
           # always give 0.
           | "ToLabel" >> beam.Map(lambda pred: int(pred.inference))
           | beam.Map(print))

run_inference_with_beam()

## 9) What to record for your video
- Show **Hello, Beam** running
- Show **IO**: the input file being read and the output shards being written
- **Map/Filter/ParDo** outputs
- **Composite Transform** in action
- **Partition** results for (neg, zero, pos)
- **Windowing** with 5s windows
- **RunInference** predictions from the sklearn model