<a href="https://colab.research.google.com/github/ArshanBhanage/Apache-Beam/blob/main/apache_beam_assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Apache Beam — Colab Notebook
**Covers:** composite transform, pipeline I/O, `ParDo`, windowing, `Map`, `Filter`, `Partition`

This notebook demonstrates core Apache Beam features in Python on the DirectRunner. It includes:

- **Batch pipeline** reading from text, cleaning & tokenizing, then writing outputs
- **Composite transform** (`PTransform`) that chains multiple transforms
- **Elementwise transforms**: `Map`, `Filter`, `FlatMap`
- **`ParDo`** using a custom `DoFn`
- **`Partition`** to split a `PCollection` into multiple categories
- **Pipeline I/O** via `ReadFromText` and `WriteToText`
- **Streaming-style windowing** example: a bounded `Create` source whose elements carry synthetic event timestamps, grouped into fixed windows with early triggering


In [None]:

!pip -q install "apache-beam[gcp]"


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m173.5/173.5 kB[0m [31m11.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m88.8/88.8 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m36.4 MB/s[0m eta [3

In [None]:

import os, re, glob, json, time, shutil, random, string
from datetime import datetime
from pathlib import Path
import sys
sys.argv = sys.argv[:1]

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode, AfterProcessingTime, Repeatedly, AfterCount

# Local working directories for the sample input and all pipeline outputs.
DATA_DIR = Path('/content/data')
OUT_DIR = Path('/content/output')
DATA_DIR.mkdir(parents=True, exist_ok=True)
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Small sample corpus read by the batch and side-input pipelines below.
text_path = DATA_DIR / 'sentences.txt'
# ReadFromText decodes its input as UTF-8 by default, so write the file with
# that encoding explicitly instead of relying on the platform default.
text_path.write_text(
    "Apache Beam makes data processing portable and scalable.\n"
    "This is a tiny local dataset for our Beam assignment.\n"
    "Windowing lets us reason about unbounded event streams.\n"
    "ParDo DoFn can output zero, one, or many elements.\n"
    "Map, Filter, and Partition are handy elementwise transforms.\n",
    encoding="utf-8",
)

print(f"Created sample input file at: {text_path}")


Created sample input file at: /content/data/sentences.txt



## Composite Transform: `CleanAndTokenize`
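This `PTransform` chains `Map`/`FlatMap`/`Filter` steps to lowercase each line, replace every character other than `a`–`z` and whitespace (punctuation, digits, accented letters) with a space, and split the result into words.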


In [None]:

class CleanAndTokenize(beam.PTransform):
    """Composite transform turning raw text lines into lowercase word tokens.

    Each line is lowercased first. Then every character that is not ``a``-``z``
    or whitespace (punctuation, digits, ...) is replaced by a space. Finally the
    line is split on whitespace and any empty tokens are dropped.
    """

    def expand(self, pcoll):
        # Lowercase before the regex: the character class only keeps a-z.
        lowered = pcoll | "Lowercase" >> beam.Map(lambda line: line.lower())
        letters_only = lowered | "OnlyLetters" >> beam.Map(
            lambda line: re.sub(r"[^a-z\s]", " ", line)
        )
        tokens = letters_only | "SplitWords" >> beam.FlatMap(lambda line: line.split())
        return tokens | "FilterEmpty" >> beam.Filter(lambda tok: bool(tok.strip()))



## `ParDo` with a custom `DoFn`
We tag each word with a simple category (short/medium/long) and emit `(category, word)` pairs.


In [None]:

class TagWordLength(beam.DoFn):
    """Tag every word with a coarse length bucket.

    Emits exactly one ``(tag, word)`` pair per input word. The tag is:

    * ``"short"`` for fewer than 4 characters,
    * ``"medium"`` for 4-7 characters,
    * ``"long"`` for 8 or more characters.
    """

    def process(self, word: str):
        size = len(word)
        if size >= 8:
            bucket = "long"
        elif size >= 4:
            bucket = "medium"
        else:
            bucket = "short"
        # Key-value output so downstream steps can group or count by tag.
        yield bucket, word



## Batch Pipeline (Text I/O, Map, Filter, ParDo, Partition, WriteToText)
This pipeline:
1. **Reads** lines from our local text file (pipeline I/O).
2. Applies the **composite transform** to clean & tokenize.
3. Uses **Filter** to drop single-character tokens, then **Map** to pair each remaining word with the number 1 (for counting).
4. Runs a **ParDo** (`TagWordLength`) to categorize each word.
5. Uses **Partition** to split words into *short*, *medium*, and *long* groups.
6. **Writes** results to `/content/output/` as text files, including a per-word count computed with `CombinePerKey(sum)`.


In [None]:
def by_length_partition(word, n_partitions):
    """Return the ``beam.Partition`` index for *word* based on its length.

    The indices are 0 for short words (< 4 chars), 1 for medium words (4-7 chars)
    and 2 for long words (8+ chars). ``n_partitions`` is required by the
    ``beam.Partition`` callback signature but is not used here.
    """
    # Lower bounds of the "medium" and "long" buckets. Counting how many bounds
    # the length reaches yields 0, 1 or 2.
    bucket_starts = (4, 8)
    return sum(len(word) >= start for start in bucket_starts)

batch_output_prefix = str(OUT_DIR / "batch_words")

# Clean old outputs so stale shards from a previous run are not displayed.
for f in glob.glob(batch_output_prefix + "*"):
    os.remove(f)

options = PipelineOptions(
    save_main_session=True,
)
with beam.Pipeline(options=options) as p:
    lines = p | "ReadText" >> beam.io.ReadFromText(str(text_path))

    words = (
        lines
        | "CleanAndTokenize" >> CleanAndTokenize()
        | "DropTiny" >> beam.Filter(lambda w: len(w) >= 2)   # Filter: drop 1-char tokens
    )

    # Map: (word, 1)
    word_ones = words | "PairWithOne" >> beam.Map(lambda w: (w, 1))

    # ParDo: tag by length -> (tag, word). Write the pairs out so the ParDo
    # result is consumed rather than computed and thrown away.
    tagged = words | "TagWithParDo" >> beam.ParDo(TagWordLength())
    _ = (
        tagged
        | "FormatTagged" >> beam.Map(lambda kv: f"{kv[0]}\t{kv[1]}")
        | "WriteTagged" >> beam.io.WriteToText(batch_output_prefix + "_tagged")
    )

    # Partition into 3 buckets (short / medium / long)
    short, medium, long = words | "PartitionByLength" >> beam.Partition(by_length_partition, 3)

    # Write out each part
    _ = short  | "WriteShort"  >> beam.io.WriteToText(batch_output_prefix + "_short")
    _ = medium | "WriteMedium" >> beam.io.WriteToText(batch_output_prefix + "_medium")
    _ = long   | "WriteLong"   >> beam.io.WriteToText(batch_output_prefix + "_long")

    # Also produce a simple word count and write it as "word<TAB>count" lines.
    _ = (
        word_ones
        | "CombineCounts" >> beam.CombinePerKey(sum)
        | "FormatCounts"  >> beam.Map(lambda kv: f"{kv[0]}\t{kv[1]}")
        | "WriteCounts"   >> beam.io.WriteToText(batch_output_prefix + "_counts")
    )

print("Batch pipeline finished. Check /content/output for results.")






Batch pipeline finished. Check /content/output for results.


In [None]:

# Preview the first few lines of every batch output shard.
preview_limit = 10

print("=== Batch Outputs ===")
for path in sorted(glob.glob(str(OUT_DIR / "batch_words*"))):
    print("\n---", path, "---")
    try:
        with open(path, encoding="utf-8") as f:
            for i, line in enumerate(f):
                # Print "..." only when an unshown line actually exists; a
                # shard with exactly `preview_limit` lines is shown in full.
                if i == preview_limit:
                    print("...")
                    break
                print(line.rstrip())
    except (OSError, UnicodeDecodeError) as e:
        # Report the real failure instead of silently hiding it.
        print(f"Could not read {path}: {e}")


=== Batch Outputs ===

--- /content/output/batch_words_counts-00000-of-00001 ---
apache	1
beam	2
makes	1
data	1
processing	1
portable	1
and	2
scalable	1
this	1
is	1
...

--- /content/output/batch_words_long-00000-of-00001 ---
processing
portable
scalable
assignment
windowing
unbounded
elements
partition
elementwise
transforms
...

--- /content/output/batch_words_medium-00000-of-00001 ---
apache
beam
makes
data
this
tiny
local
dataset
beam
lets
...

--- /content/output/batch_words_short-00000-of-00001 ---
and
is
for
our
us
can
one
or
map
and
...



## Streaming-Style Windowing (Fixed windows + Early triggers)
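Here we simulate a small stream. `beam.Create` produces the integers 0–49, and each element `i` is stamped with an event time `i` seconds after the start, i.e. one element per second of event time.
We apply **fixed windows** of 10 seconds with an `AfterWatermark` trigger. Its early firing (`AfterCount(5)`) can emit partial results before the window closes. We then **count** elements per window. Because the accumulation mode is `DISCARDING`, each pane reports only the elements that arrived since the previous firing, so an on-time pane that follows early firings may report a count of 0.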

Each output line is a JSON object containing the window's start and end times (UTC) and the count for that pane.


In [None]:
class FormatWithWindow(beam.DoFn):
    """Render a ``(key, value)`` pair as one JSON line tagged with its window.

    The JSON object holds ``window_start`` and ``window_end`` (ISO-8601 strings
    from the element's window bounds converted to UTC datetimes), followed by a
    single ``key: value`` entry.
    """

    def process(self, kv, window=beam.DoFn.WindowParam):
        key, value = kv
        record = {
            "window_start": window.start.to_utc_datetime().isoformat(),
            "window_end": window.end.to_utc_datetime().isoformat(),
        }
        record[key] = value
        yield json.dumps(record)

stream_output_prefix = str(OUT_DIR / "stream_counts")

# Clean old outputs so stale shards from a previous run are not displayed.
for f in glob.glob(stream_output_prefix + "*"):
    os.remove(f)

WINDOW_SECONDS = 10
# Choose ONE base event time for the whole run, aligned to a window boundary.
# Element i is then stamped exactly base_ts + i seconds, and the 50 elements fill
# five complete windows. Calling time.time() inside the Map instead would make
# each timestamp depend on when that element happened to be processed.
base_ts = (int(time.time()) // WINDOW_SECONDS) * WINDOW_SECONDS

stream_opts = PipelineOptions(
    streaming=True,
    save_main_session=True,
)
with beam.Pipeline(options=stream_opts) as p:
    # 50 sequential integers, one per second of event time, to simulate a stream.
    events = (
        p
        | "CreateSequence" >> beam.Create(range(50))
        # base_ts is passed as an extra Map argument so the value is fixed here.
        | "StampEventTime" >> beam.Map(
            lambda i, base: beam.window.TimestampedValue(i, base + i), base_ts
        )
    )

    _ = (
        events
        | "ToOnes" >> beam.Map(lambda x: ("count", 1))
        | "Fixed10s" >> beam.WindowInto(
            FixedWindows(WINDOW_SECONDS),
            # Speculative early panes driven by AfterCount(5), plus an on-time
            # pane when the watermark passes the end of the window.
            trigger=AfterWatermark(early=AfterCount(5)),
            # DISCARDING: each pane holds only elements since the previous
            # firing, so an on-time pane after early firings can report 0.
            accumulation_mode=AccumulationMode.DISCARDING,
        )
        | "SumPerWindow" >> beam.CombinePerKey(sum)
        | "FormatWin" >> beam.ParDo(FormatWithWindow())
        | "WriteStreamOut" >> beam.io.WriteToText(stream_output_prefix)
    )

print("Streaming windowing pipeline finished. See /content/output for stream results.")

Streaming windowing pipeline finished. See /content/output for stream results.


In [None]:

# Preview the first few lines of every streaming output shard.
preview_limit = 14

print("=== Streaming Outputs ===")
for path in sorted(glob.glob(str(OUT_DIR / "stream_counts*"))):
    print("\n---", path, "---")
    try:
        with open(path, encoding="utf-8") as f:
            for i, line in enumerate(f):
                # Print "..." only when an unshown line actually exists; a
                # shard with exactly `preview_limit` lines is shown in full.
                if i == preview_limit:
                    print("...")
                    break
                print(line.rstrip())
    except (OSError, UnicodeDecodeError) as e:
        # Report the real failure instead of silently hiding it.
        print(f"Could not read {path}: {e}")


=== Streaming Outputs ===

--- /content/output/stream_counts-00000-of-00001 ---
{"window_start": "2025-10-19T23:28:50", "window_end": "2025-10-19T23:29:00", "count": 6}
{"window_start": "2025-10-19T23:29:00", "window_end": "2025-10-19T23:29:10", "count": 10}
{"window_start": "2025-10-19T23:29:10", "window_end": "2025-10-19T23:29:20", "count": 10}
{"window_start": "2025-10-19T23:29:20", "window_end": "2025-10-19T23:29:30", "count": 10}
{"window_start": "2025-10-19T23:29:30", "window_end": "2025-10-19T23:29:40", "count": 10}
{"window_start": "2025-10-19T23:28:50", "window_end": "2025-10-19T23:29:00", "count": 0}
{"window_start": "2025-10-19T23:29:00", "window_end": "2025-10-19T23:29:10", "count": 0}
{"window_start": "2025-10-19T23:29:10", "window_end": "2025-10-19T23:29:20", "count": 0}
{"window_start": "2025-10-19T23:29:20", "window_end": "2025-10-19T23:29:30", "count": 0}
{"window_start": "2025-10-19T23:29:30", "window_end": "2025-10-19T23:29:40", "count": 0}
{"window_start": "2025-10-


### (Optional) Bonus: Side Input example
We pass a set of stopwords as a side input to filter tokens.


In [None]:

side_output_prefix = str(OUT_DIR / "side_input_demo")
# Clean old outputs so stale shards from a previous run are not displayed.
for f in glob.glob(side_output_prefix + "*"):
    os.remove(f)

stopwords = {"a", "an", "the", "and", "or", "is", "of", "to", "us"}

with beam.Pipeline(options=PipelineOptions(save_main_session=True)) as p:
    # A one-element PCollection holding the whole stopword set. AsSingleton
    # passes that single element to every Filter call as `sw`.
    stopwords_pc = p | "CreateStopwords" >> beam.Create([stopwords])

    # Every step gets an explicit label, matching the other pipelines above.
    _ = (
        p
        | "ReadText" >> beam.io.ReadFromText(str(text_path))
        | "CleanAndTokenize" >> CleanAndTokenize()
        | "DropStopwords" >> beam.Filter(
            lambda w, sw: w not in sw,
            sw=beam.pvalue.AsSingleton(stopwords_pc),
        )
        | "WriteSideInputDemo" >> beam.io.WriteToText(side_output_prefix)
    )

print("Side input demo complete. Check /content/output for 'side_input_demo*' files.")


Side input demo complete. Check /content/output for 'side_input_demo*' files.
