<a href="https://colab.research.google.com/github/Prachii26/ApacheBeam/blob/main/Apache_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment
### Submitted By: Prachi Gupta
### SJSU ID: 019106594

In [14]:
# === Cell 1: Environment setup for Apache Beam demo (Colab) ===================
# Install required packages
!pip -q install --upgrade apache-beam scikit-learn

# --- Imports & basic config
import os, sys, platform, random
import numpy as np
from datetime import datetime

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Seed both random number generators so the synthetic data is repeatable
np.random.seed(42)
random.seed(42)

# Working directories (local to the Colab VM)
BASE_DIR = "/content/beam_demo"
DATA_DIR = f"{BASE_DIR}/data"
OUT_DIR  = f"{BASE_DIR}/out"
for d in (DATA_DIR, OUT_DIR):
    # makedirs also creates BASE_DIR, the shared parent of both directories
    os.makedirs(d, exist_ok=True)

# Interactive Beam is optional: it helps inspect PCollections in notebooks,
# but the demo runs without it, so any failure here only flips the flag.
try:
    from apache_beam.runners.interactive import interactive_beam as ib
    ib.watch(locals())
except Exception:
    INTERACTIVE_READY = False
else:
    INTERACTIVE_READY = True

# Run every pipeline locally with the DirectRunner
beam_options = PipelineOptions(flags=["--runner=DirectRunner"])

# Helper: clean output directory between runs (we'll use this later)
def reset_out_dir():
    """Remove OUT_DIR with everything in it (if it exists) and recreate it empty."""
    from shutil import rmtree

    out_dir_present = os.path.exists(OUT_DIR)
    if out_dir_present:
        rmtree(OUT_DIR)
    os.makedirs(OUT_DIR, exist_ok=True)

# Summary printout: a single print call, one setting per line
print(
    "✅ Setup complete",
    f"Python:        {platform.python_version()}",
    f"Apache Beam:   {beam.__version__}",
    f"Interactive:   {INTERACTIVE_READY}",
    f"BASE_DIR:      {BASE_DIR}",
    f"DATA_DIR:      {DATA_DIR}",
    f"OUT_DIR:       {OUT_DIR}",
    sep="\n",
)


✅ Setup complete
Python:        3.12.12
Apache Beam:   2.68.0
Interactive:   True
BASE_DIR:      /content/beam_demo
DATA_DIR:      /content/beam_demo/data
OUT_DIR:       /content/beam_demo/out


In [15]:
# === Cell 2: Create a small synthetic dataset (transactions.csv) ==============
# Columns: event_time (ISO8601), user_id, amount, category
# We'll treat event_time as the event timestamp later for windowing.

import csv
from datetime import datetime, timedelta
import numpy as np
import os, random

# Local imports so this block runs on its own: `timezone` replaces the
# deprecated utcnow() call, `islice` bounds the preview read.
from datetime import timezone
from itertools import islice

# Make results reproducible
random.seed(42)
np.random.seed(42)

N = 200  # number of rows
categories = ["grocery", "electronics", "fashion", "travel", "utilities", "other"]

# Start ten minutes in the past. datetime.utcnow() is deprecated (Python 3.12+
# emits a DeprecationWarning), so take an aware UTC "now" and drop tzinfo to keep
# the naive-UTC values that the "Z" suffix written below relies on.
# NOTE: start_time depends on the wall clock, so the timestamps differ between
# runs even though the seeded gaps, users, amounts and categories do not.
start_time = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(minutes=10)
times = []
t = start_time
for i in range(N):
    # irregular gaps between 1-20 seconds to make windows interesting
    t += timedelta(seconds=random.randint(1, 20))
    times.append(t)

user_ids = np.random.randint(1, 31, size=N)  # 30 users
# positive skew for amounts; clamp to [1, 500]
amounts = np.clip(np.random.lognormal(mean=2.5, sigma=0.8, size=N), 1, 500)
cats = np.random.choice(categories, size=N, p=[0.25,0.15,0.2,0.15,0.15,0.1])

csv_path = os.path.join(DATA_DIR, "transactions.csv")
with open(csv_path, "w", newline="") as f:
    w = csv.writer(f)
    w.writerow(["event_time","user_id","amount","category"])
    for ts, uid, amt, cat in zip(times, user_ids, amounts, cats):
        # Cast NumPy scalars to plain Python types before writing
        w.writerow([ts.isoformat() + "Z", int(uid), round(float(amt), 2), cat])

# Quick preview: header + up to 5 rows. islice stops cleanly at end of file,
# whereas repeated next(f) would raise StopIteration if N were below 5.
print(f"✅ Wrote {N} rows to: {csv_path}")
with open(csv_path, "r") as f:
    head = [line.strip() for line in islice(f, 6)]
print("\n".join(head))


✅ Wrote 200 rows to: /content/beam_demo/data/transactions.csv
event_time,user_id,amount,category
2025-10-22T20:18:39.113498Z,7,24.49,grocery
2025-10-22T20:18:40.113498Z,20,75.93,utilities
2025-10-22T20:18:49.113498Z,29,44.63,utilities
2025-10-22T20:18:57.113498Z,15,23.55,travel
2025-10-22T20:19:05.113498Z,11,15.4,utilities


  start_time = datetime.utcnow() - timedelta(minutes=10)


In [16]:
# === Cell 3: Read CSV -> Map/Filter -> Write JSONL ============================
# Demonstrates: Pipeline I/O (ReadFromText, WriteToText), Map, Filter

import os, json, csv, glob
import apache_beam as beam

# Start from an empty output tree (helper defined in Cell 1)
reset_out_dir()

# Input CSV, plus the directory and shard prefix for this stage's output
INPUT_CSV = os.path.join(DATA_DIR, "transactions.csv")
OUT_SUBDIR = os.path.join(OUT_DIR, "01_parsed")
OUT_PREFIX = os.path.join(OUT_SUBDIR, "transactions")
os.makedirs(OUT_SUBDIR, exist_ok=True)

def parse_line(line: str):
    """Turn one CSV data line into a transaction record.

    Args:
        line: A single line of text with exactly four comma-separated fields,
            in the order event_time, user_id, amount, category.

    Returns:
        A dict with keys "event_time" (str), "user_id" (int),
        "amount" (float) and "category" (str).

    Raises:
        ValueError: If the line does not hold exactly four fields, or if
            user_id / amount cannot be converted to a number.
    """
    # csv.reader takes an iterable of lines; a one-element list parses one line
    fields = next(csv.reader([line]))
    event_time, raw_user_id, raw_amount, category = fields

    record = {"event_time": event_time}
    record["user_id"] = int(raw_user_id)
    record["amount"] = float(raw_amount)
    record["category"] = category
    return record

def is_valid(rec: dict) -> bool:
    """Return True when the record has a positive user_id, a positive amount
    and a non-empty category; the checks run in that order."""
    if not rec["user_id"] > 0:
        return False
    if not rec["amount"] > 0.0:
        return False
    return rec["category"] != ""

with beam.Pipeline(options=beam_options) as p:
    # skip_header_lines drops the CSV header so it never reaches parse_line
    raw_lines = p | "ReadCSV" >> beam.io.ReadFromText(INPUT_CSV, skip_header_lines=1)

    valid_records = (
        raw_lines
        | "ParseCSV" >> beam.Map(parse_line)          # Map
        | "FilterInvalid" >> beam.Filter(is_valid)    # Filter
    )

    # Serialize each record as one JSON document per line, in a single shard
    json_lines = valid_records | "ToJSON" >> beam.Map(json.dumps)
    _ = json_lines | "WriteJSONL" >> beam.io.WriteToText(
        OUT_PREFIX, file_name_suffix=".jsonl", num_shards=1
    )

# Preview: report the first output shard and echo at most five of its lines
out_files = sorted(glob.glob(os.path.join(OUT_SUBDIR, "transactions-*.jsonl")))
print(f"✅ Wrote parsed data to: {out_files[0] if out_files else '(none)'}")
if out_files:
    with open(out_files[0], "r") as f:
        # zip with range(5) stops after five lines or at end of file
        for i, line in zip(range(5), f):
            print(line.strip())


✅ Wrote parsed data to: /content/beam_demo/out/01_parsed/transactions-00000-of-00001.jsonl
{"event_time": "2025-10-22T20:18:39.113498Z", "user_id": 7, "amount": 24.49, "category": "grocery"}
{"event_time": "2025-10-22T20:18:40.113498Z", "user_id": 20, "amount": 75.93, "category": "utilities"}
{"event_time": "2025-10-22T20:18:49.113498Z", "user_id": 29, "amount": 44.63, "category": "utilities"}
{"event_time": "2025-10-22T20:18:57.113498Z", "user_id": 15, "amount": 23.55, "category": "travel"}
{"event_time": "2025-10-22T20:19:05.113498Z", "user_id": 11, "amount": 15.4, "category": "utilities"}


In [17]:
# === Cell 4: ParDo (DoFn) + Composite transform ===============================
# Demonstrates: ParDo via a custom DoFn, and a Composite PTransform
# Input: transactions.csv
# Output: enriched JSONL in OUT_DIR/02_enriched

import os, json, csv, glob
from typing import Dict, Iterable
import apache_beam as beam

# Input CSV, plus the directory and shard prefix for the enriched output
INPUT_CSV = os.path.join(DATA_DIR, "transactions.csv")
OUT_SUBDIR = os.path.join(OUT_DIR, "02_enriched")
OUT_PREFIX = os.path.join(OUT_SUBDIR, "transactions_enriched")
os.makedirs(OUT_SUBDIR, exist_ok=True)

# --- Re-define parse/validate for self-contained execution in this cell -------
def parse_line(line: str) -> Dict:
    """Parse a single four-field CSV line into a typed transaction dict.

    The fields are expected in the order event_time, user_id, amount,
    category. user_id is converted with int() and amount with float(); a line
    with a different number of fields, or with non-numeric user_id/amount,
    raises ValueError.
    """
    (event_time, user_id_text, amount_text, category) = next(csv.reader([line]))
    return dict(
        event_time=event_time,
        user_id=int(user_id_text),
        amount=float(amount_text),
        category=category,
    )

def is_valid(rec: Dict) -> bool:
    """Accept a record only if user_id > 0, amount > 0.0 and category is non-empty."""
    if rec["user_id"] > 0 and rec["amount"] > 0.0:
        return rec["category"] != ""
    return False

# --- ParDo: enrich each record with derived features --------------------------
class EnrichDoFn(beam.DoFn):
    """ParDo function that adds derived fields to a transaction record.

    Keys set on the emitted copy:
      * "amount_tier":   "vip" (>= 200), "high" (>= 100), "med" (>= 50), else "low"
      * "category":      converted to str, stripped and lower-cased
      * "user_bucket":   user_id modulo 10 (0..9), intended for later partitioning
      * "is_high_value": True when amount >= 100.0
    """

    def process(self, rec: Dict) -> Iterable[Dict]:
        """Yield one enriched shallow copy of ``rec``; the input is not mutated."""
        enriched = dict(rec)
        amount = enriched.get("amount", 0.0)

        # Thresholds are checked from highest to lowest; the first match wins
        for floor, label in ((200, "vip"), (100, "high"), (50, "med")):
            if amount >= floor:
                enriched["amount_tier"] = label
                break
        else:
            enriched["amount_tier"] = "low"

        # Normalize the category text, then derive the modulo-10 user bucket
        raw_category = enriched.get("category", "")
        enriched["category"] = str(raw_category).strip().lower()
        enriched["user_bucket"] = int(enriched["user_id"]) % 10

        # Boolean flag that shares the "high" tier threshold
        enriched["is_high_value"] = amount >= 100.0

        yield enriched

# --- Composite transform: parse -> validate -> enrich -------------------------
class ParseValidateEnrich(beam.PTransform):
    """Composite transform chaining parse_line, is_valid and EnrichDoFn."""

    def expand(self, pcoll):
        """Apply parse -> filter -> enrich to ``pcoll`` and return the result."""
        parsed = pcoll | "ParseCSV" >> beam.Map(parse_line)
        valid = parsed | "FilterInvalid" >> beam.Filter(is_valid)
        return valid | "Enrich" >> beam.ParDo(EnrichDoFn())

# --- Run pipeline -------------------------------------------------------------
with beam.Pipeline(options=beam_options) as p:
    # Read the data rows (header skipped) and run the composite transform
    lines = p | "ReadCSV" >> beam.io.ReadFromText(INPUT_CSV, skip_header_lines=1)
    enriched = lines | "PVE" >> ParseValidateEnrich()

    # One JSON document per line, written to a single shard
    json_lines = enriched | "ToJSON" >> beam.Map(json.dumps)
    _ = json_lines | "WriteJSONL" >> beam.io.WriteToText(
        OUT_PREFIX, file_name_suffix=".jsonl", num_shards=1
    )

# Preview: report the first enriched shard and echo at most five of its lines
out_files = sorted(glob.glob(os.path.join(OUT_SUBDIR, "transactions_enriched-*.jsonl")))
print(f"✅ Enriched output: {out_files[0] if out_files else '(none)'}")
if out_files:
    with open(out_files[0], "r") as f:
        # zip with range(5) stops after five lines or at end of file
        for i, line in zip(range(5), f):
            print(line.strip())


✅ Enriched output: /content/beam_demo/out/02_enriched/transactions_enriched-00000-of-00001.jsonl
{"event_time": "2025-10-22T20:18:39.113498Z", "user_id": 7, "amount": 24.49, "category": "grocery", "amount_tier": "low", "user_bucket": 7, "is_high_value": false}
{"event_time": "2025-10-22T20:18:40.113498Z", "user_id": 20, "amount": 75.93, "category": "utilities", "amount_tier": "med", "user_bucket": 0, "is_high_value": false}
{"event_time": "2025-10-22T20:18:49.113498Z", "user_id": 29, "amount": 44.63, "category": "utilities", "amount_tier": "low", "user_bucket": 9, "is_high_value": false}
{"event_time": "2025-10-22T20:18:57.113498Z", "user_id": 15, "amount": 23.55, "category": "travel", "amount_tier": "low", "user_bucket": 5, "is_high_value": false}
{"event_time": "2025-10-22T20:19:05.113498Z", "user_id": 11, "amount": 15.4, "category": "utilities", "amount_tier": "low", "user_bucket": 1, "is_high_value": false}


In [18]:
# === Cell 5: Partition (high_value vs regular) ================================
# Demonstrates: beam.Partition
# Input: OUT_DIR/02_enriched/transactions_enriched-*.jsonl (from Cell 4)
# Output: OUT_DIR/03_partition/{high_value|regular}-*.jsonl

import os, json, glob
import apache_beam as beam

# Input: the first enriched shard written by Cell 4 (in sorted order)
ENRICHED_DIR = os.path.join(OUT_DIR, "02_enriched")
enriched_files = glob.glob(os.path.join(ENRICHED_DIR, "transactions_enriched-*.jsonl"))
enriched_files.sort()
assert enriched_files, "No enriched files found. Please run Cell 4 first."
INPUT_ENRICHED = enriched_files[0]

# Output: one shard prefix per partition, both under 03_partition
OUT_SUBDIR = os.path.join(OUT_DIR, "03_partition")
OUT_PREFIX_HIGH = os.path.join(OUT_SUBDIR, "high_value")
OUT_PREFIX_REG  = os.path.join(OUT_SUBDIR, "regular")
os.makedirs(OUT_SUBDIR, exist_ok=True)

def to_obj(line: str):
    """Decode one JSON Lines entry (a single JSON document) into a Python object."""
    decoded = json.loads(line)
    return decoded

def partition_by_value(rec, n_partitions):
    """Partition function for beam.Partition.

    Returns 0 (high_value) when rec["is_high_value"] is truthy and 1 (regular)
    otherwise, including when the key is absent. ``n_partitions`` is accepted
    but not consulted.
    """
    if rec.get("is_high_value", False):
        return 0
    return 1

with beam.Pipeline(options=beam_options) as p:
    # Read the enriched JSONL shard back into dicts
    json_text = p | "ReadEnriched" >> beam.io.ReadFromText(INPUT_ENRICHED)
    objs = json_text | "ParseJSON" >> beam.Map(to_obj)

    # Split into two PCollections: index 0 = high_value, index 1 = regular
    parts = objs | "PartitionHighRegular" >> beam.Partition(partition_by_value, 2)
    high, reg = parts[0], parts[1]

    # Each partition is serialized and written to its own single-shard file
    high_json = high | "HighToJSON" >> beam.Map(json.dumps)
    _ = high_json | "WriteHigh" >> beam.io.WriteToText(
        OUT_PREFIX_HIGH, file_name_suffix=".jsonl", num_shards=1
    )

    reg_json = reg | "RegToJSON" >> beam.Map(json.dumps)
    _ = reg_json | "WriteReg" >> beam.io.WriteToText(
        OUT_PREFIX_REG, file_name_suffix=".jsonl", num_shards=1
    )

# Preview & simple counts
# Resolve the first shard of each partition. Check the glob results before
# indexing so a missing shard fails with an explicit message (like the input
# check earlier in this cell) rather than a bare IndexError.
high_files = sorted(glob.glob(os.path.join(OUT_SUBDIR, "high_value-*.jsonl")))
reg_files  = sorted(glob.glob(os.path.join(OUT_SUBDIR, "regular-*.jsonl")))
assert high_files, "No high_value output found. Please re-run the partition pipeline above."
assert reg_files, "No regular output found. Please re-run the partition pipeline above."
high_file = high_files[0]
reg_file  = reg_files[0]

def count_lines(path):
    """Return the number of lines in the text file at ``path``."""
    with open(path, "r") as f:
        return sum(1 for _ in f)

print("✅ Partition complete")
print(f"High-value file: {high_file} (records: {count_lines(high_file)})")
print(f"Regular  file:   {reg_file}  (records: {count_lines(reg_file)})")

# Show up to three example records from each partition, high-value first
for banner, sample_path in (
    ("\n-- High-value sample --", high_file),
    ("\n-- Regular sample --", reg_file),
):
    print(banner)
    with open(sample_path, "r") as f:
        # zip with range(3) stops after three lines or at end of file
        for i, line in zip(range(3), f):
            print(line.strip())


✅ Partition complete
High-value file: /content/beam_demo/out/03_partition/high_value-00000-of-00001.jsonl (records: 3)
Regular  file:   /content/beam_demo/out/03_partition/regular-00000-of-00001.jsonl  (records: 197)

-- High-value sample --
{"event_time": "2025-10-22T20:21:50.113498Z", "user_id": 21, "amount": 128.37, "category": "other", "amount_tier": "high", "user_bucket": 1, "is_high_value": true}
{"event_time": "2025-10-22T20:34:26.113498Z", "user_id": 26, "amount": 141.02, "category": "travel", "amount_tier": "high", "user_bucket": 6, "is_high_value": true}
{"event_time": "2025-10-22T20:37:03.113498Z", "user_id": 25, "amount": 112.97, "category": "fashion", "amount_tier": "high", "user_bucket": 5, "is_high_value": true}

-- Regular sample --
{"event_time": "2025-10-22T20:18:39.113498Z", "user_id": 7, "amount": 24.49, "category": "grocery", "amount_tier": "low", "user_bucket": 7, "is_high_value": false}
{"event_time": "2025-10-22T20:18:40.113498Z", "user_id": 20, "amount": 75.93,

In [19]:
# === Cell 6: Windowing with event time ================================
# Uses TimestampedValue instead of beam.WithTimestamps

import os, json, glob
from datetime import datetime
import apache_beam as beam
from apache_beam.transforms import window as beam_window

# Input: the first enriched shard written by Cell 4 (in sorted order)
ENRICHED_DIR = os.path.join(OUT_DIR, "02_enriched")
enriched_files = glob.glob(os.path.join(ENRICHED_DIR, "transactions_enriched-*.jsonl"))
enriched_files.sort()
assert enriched_files, "No enriched files found. Please run Cell 4 first."
INPUT_ENRICHED = enriched_files[0]

# Output: directory and shard prefix for the windowed totals
OUT_SUBDIR = os.path.join(OUT_DIR, "04_windowed")
OUT_PREFIX = os.path.join(OUT_SUBDIR, "windowed")
os.makedirs(OUT_SUBDIR, exist_ok=True)

def to_obj(line: str):
    """Parse a line of JSON text and return the resulting Python value."""
    parsed = json.loads(line)
    return parsed

def parse_event_ts(rec):
    """Return the record's event time as POSIX epoch seconds (float).

    Args:
        rec: Mapping with an "event_time" key holding an ISO 8601 string such
            as "2025-10-22T20:18:39.113498Z". A trailing "Z" or an explicit
            UTC offset is honoured; a value with no offset is treated as UTC.

    Returns:
        Seconds since the Unix epoch, as a float.

    Raises:
        KeyError: If "event_time" is missing from ``rec``.
        ValueError: If the value is not a parseable ISO 8601 string.
    """
    from datetime import timezone  # local import: the cell only imports `datetime`

    s = rec["event_time"]
    # datetime.fromisoformat() accepts a trailing "Z" only from Python 3.11 on;
    # rewriting it as an explicit offset also works on older versions.
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"
    dt = datetime.fromisoformat(s)
    # .timestamp() interprets a naive datetime in the machine's local time zone.
    # The dataset is generated in UTC, so pin a missing offset to UTC to keep the
    # result independent of where the notebook runs.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.timestamp()

def add_timestamp(rec):
    """Wrap ``rec`` in a TimestampedValue whose timestamp is the epoch-seconds
    value returned by parse_event_ts(rec)."""
    event_ts = parse_event_ts(rec)
    return beam.window.TimestampedValue(rec, event_ts)

class FormatWindowedDoFn(beam.DoFn):
    """Render a (category, total) pair and its window bounds as a JSON string."""

    def process(self, kv, window=beam.DoFn.WindowParam):
        """Yield one JSON document with window_start, window_end, category and
        total_amount (rounded to 2 decimals).

        ``window`` is requested via beam.DoFn.WindowParam; its start and end are
        rendered as ISO 8601 strings from their UTC datetimes.
        """
        category, total = kv
        start_iso, end_iso = (
            edge.to_utc_datetime().isoformat() for edge in (window.start, window.end)
        )
        payload = {
            "window_start": start_iso,
            "window_end": end_iso,
            "category": category,
            "total_amount": round(float(total), 2),
        }
        yield json.dumps(payload)

with beam.Pipeline(options=beam_options) as p:
    # Load enriched records and stamp each with its event time
    records = (
        p
        | "ReadEnriched" >> beam.io.ReadFromText(INPUT_ENRICHED)
        | "ParseJSON"    >> beam.Map(to_obj)
        | "AddTimestamps" >> beam.Map(add_timestamp)                           # event time
    )

    # Assign 60-second fixed windows, then sum amounts per category per window
    totals = (
        records
        | "FixedWindows60s" >> beam.WindowInto(beam_window.FixedWindows(60))   # windowing
        | "ToKV" >> beam.Map(lambda rec: (rec["category"], rec["amount"]))
        | "SumPerCategoryPerWindow" >> beam.CombinePerKey(sum)
    )

    # Format each (category, total) with its window bounds and write one shard
    formatted = totals | "FormatWindowed" >> beam.ParDo(FormatWindowedDoFn())
    _ = formatted | "WriteWindowed" >> beam.io.WriteToText(
        OUT_PREFIX, file_name_suffix=".jsonl", num_shards=1
    )

# Preview: report the first windowed shard and echo at most ten of its lines
out_files = sorted(glob.glob(os.path.join(OUT_SUBDIR, "windowed-*.jsonl")))
print(f"✅ Windowing complete. Output: {out_files[0] if out_files else '(none)'}")
if out_files:
    with open(out_files[0], "r") as f:
        # zip with range(10) stops after ten lines or at end of file
        for i, line in zip(range(10), f):
            print(line.strip())


✅ Windowing complete. Output: /content/beam_demo/out/04_windowed/windowed-00000-of-00001.jsonl
{"window_start": "2025-10-22T20:18:00", "window_end": "2025-10-22T20:19:00", "category": "grocery", "total_amount": 24.49}
{"window_start": "2025-10-22T20:19:00", "window_end": "2025-10-22T20:20:00", "category": "grocery", "total_amount": 11.2}
{"window_start": "2025-10-22T20:20:00", "window_end": "2025-10-22T20:21:00", "category": "grocery", "total_amount": 26.31}
{"window_start": "2025-10-22T20:22:00", "window_end": "2025-10-22T20:23:00", "category": "grocery", "total_amount": 5.07}
{"window_start": "2025-10-22T20:23:00", "window_end": "2025-10-22T20:24:00", "category": "grocery", "total_amount": 6.19}
{"window_start": "2025-10-22T20:24:00", "window_end": "2025-10-22T20:25:00", "category": "grocery", "total_amount": 27.36}
{"window_start": "2025-10-22T20:25:00", "window_end": "2025-10-22T20:26:00", "category": "grocery", "total_amount": 11.37}
{"window_start": "2025-10-22T20:27:00", "window

In [20]:
# === Cell 7: Notebook index + validation =====================================
# Summarizes what we've built, verifies outputs exist, and shows quick samples.

import os, glob, json, textwrap

def count_lines(path):
    """Count the lines of the text file at ``path`` (0 for an empty file)."""
    total = 0
    with open(path, "r") as f:
        # enumerate from 1 so the last index seen equals the line count
        for total, _ in enumerate(f, start=1):
            pass
    return total

def show_head(path, n=3):
    """Return up to the first ``n`` lines of the file at ``path``, each
    stripped of surrounding whitespace. Yields an empty list when n <= 0."""
    with open(path, "r") as f:
        # zip with range(n) stops after n lines or at end of file
        return [line.strip() for _, line in zip(range(n), f)]

# Locate artifacts from previous cells: stage key -> sorted list of shard paths
artifacts = {
    key: sorted(glob.glob(os.path.join(OUT_DIR, subdir, pattern)))
    for key, subdir, pattern in (
        ("parsed", "01_parsed", "transactions-*.jsonl"),
        ("enriched", "02_enriched", "transactions_enriched-*.jsonl"),
        ("high_value", "03_partition", "high_value-*.jsonl"),
        ("regular", "03_partition", "regular-*.jsonl"),
        ("windowed", "04_windowed", "windowed-*.jsonl"),
    )
}

# Report each stage's first shard and record count, or flag it as missing
print("✅ Artifact check")
for key, files in artifacts.items():
    if not files:
        print(f" - {key:10s}: MISSING (run the earlier cell for this stage)")
        continue
    artifact_path = files[0]
    print(f" - {key:10s}: {artifact_path}  (records: {count_lines(artifact_path)})")

# Show a few sample lines from each available artifact, in stage order
print("\n🔎 Samples (first 3 lines each):")
for key, files in artifacts.items():
    if files:
        print(f"\n--- {key.upper()} ---")
        for line in show_head(files[0], n=3):
            print(line)

✅ Artifact check
 - parsed    : /content/beam_demo/out/01_parsed/transactions-00000-of-00001.jsonl  (records: 200)
 - enriched  : /content/beam_demo/out/02_enriched/transactions_enriched-00000-of-00001.jsonl  (records: 200)
 - high_value: /content/beam_demo/out/03_partition/high_value-00000-of-00001.jsonl  (records: 3)
 - regular   : /content/beam_demo/out/03_partition/regular-00000-of-00001.jsonl  (records: 197)
 - windowed  : /content/beam_demo/out/04_windowed/windowed-00000-of-00001.jsonl  (records: 130)

🔎 Samples (first 3 lines each):

--- PARSED ---
{"event_time": "2025-10-22T20:18:39.113498Z", "user_id": 7, "amount": 24.49, "category": "grocery"}
{"event_time": "2025-10-22T20:18:40.113498Z", "user_id": 20, "amount": 75.93, "category": "utilities"}
{"event_time": "2025-10-22T20:18:49.113498Z", "user_id": 29, "amount": 44.63, "category": "utilities"}

--- ENRICHED ---
{"event_time": "2025-10-22T20:18:39.113498Z", "user_id": 7, "amount": 24.49, "category": "grocery", "amount_tier":