In [3]:
# Reset any partial venv folders left behind by earlier attempts,
# so the next cell starts from an empty /content/beam_venv
!rm -rf /content/beam_venv


In [4]:
# Create a new venv with full path to current Python
# NOTE: on this runtime this fails — venv's `ensurepip --upgrade --default-pip`
# step exits non-zero (see the error output below). The virtualenv-based cell
# further down is the working approach.
!python3 -m venv /content/beam_venv


Error: Command '['/content/beam_venv/bin/python3', '-m', 'ensurepip', '--upgrade', '--default-pip']' returned non-zero exit status 1.


In [5]:
# 1) make a fresh venv
# NOTE: same failure as the previous cell (ensurepip exits non-zero), so
# deleting the folder first does not help; see the virtualenv cell below.
!rm -rf /content/beam_venv
!python3 -m venv /content/beam_venv


Error: Command '['/content/beam_venv/bin/python3', '-m', 'ensurepip', '--upgrade', '--default-pip']' returned non-zero exit status 1.


In [6]:
# install virtualenv in the base colab env; it is used below to create the
# Beam venv without going through venv's failing ensurepip step
# NOTE(review): unpinned — pin a version if exact reproducibility matters
!pip install -q virtualenv


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.0/6.0 MB[0m [31m37.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m469.0/469.0 kB[0m [31m18.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [7]:
# remove old broken env, if any
!rm -rf /content/beam_venv

# create a new one using virtualenv (this bypasses the broken ensurepip):
# virtualenv seeds pip from its own bundled wheel (`pip=bundle` in the output)
!python3 -m virtualenv /content/beam_venv


created virtual environment CPython3.12.12.final.0-64 in 517ms
  creator CPython3Posix(dest=/content/beam_venv, clear=False, no_vcs_ignore=False, global=False)
  seeder FromAppData(download=False, pip=bundle, via=copy, app_data_dir=/root/.local/share/virtualenv)
    added seed packages: pip==25.3
  activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator


In [8]:
# Sanity check: the venv should now contain python and pip entry points
!ls /content/beam_venv/bin


activate       activate.nu	 pip	   pip3.12  python3.12
activate.csh   activate.ps1	 pip3	   python
activate.fish  activate_this.py  pip-3.12  python3


In [9]:
# Upgrade packaging tools inside the venv (using the venv's own pip, not the base env's)
!/content/beam_venv/bin/pip install --quiet --upgrade pip setuptools wheel


In [10]:
# Install a pinned Apache Beam (with GCP extras) into the venv only
!/content/beam_venv/bin/pip install --quiet "apache-beam[gcp]==2.58.0"


In [11]:
%%bash
# Smoke test: run a one-element pipeline with the venv's Python. The program
# is fed on stdin (`python -`) via a quoted heredoc ('PYCODE'), so the shell
# does not expand anything inside it.
/content/beam_venv/bin/python - << 'PYCODE'
import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(["hello from virtualenv beam"])
        | "Print" >> beam.Map(print)
    )
PYCODE


hello from virtualenv beam


In [12]:
def run_beam(
    code: str,
    script_path: str = "/content/run_beam.py",
    python: str = "/content/beam_venv/bin/python",
) -> None:
    """Run a Beam program with the venv interpreter and fail the cell if it fails.

    The program text is written to ``script_path`` and executed with
    ``python`` (by default the virtualenv created earlier in this notebook).
    The child's stdout/stderr are captured and echoed once it finishes, so
    output appears after the run rather than streaming live.

    Args:
        code: Complete Python source of the program to run.
        script_path: File the program is written to before execution.
        python: Interpreter used to execute ``script_path``.

    Raises:
        subprocess.CalledProcessError: If the program exits with a non-zero
            status. (The previous ``!`` shell escape ignored the exit status,
            so a crashing pipeline still looked like a successful cell.)
    """
    import subprocess
    import sys

    with open(script_path, "w", encoding="utf-8") as script:
        script.write(code)

    result = subprocess.run(
        [python, script_path],
        capture_output=True,
        encoding="utf-8",
        errors="replace",  # never let undecodable child output mask the real result
    )
    # Echo captured output into the cell before checking the exit status,
    # so a traceback is visible when the error is raised.
    if result.stdout:
        print(result.stdout, end="")
    if result.stderr:
        print(result.stderr, end="", file=sys.stderr)
    result.check_returncode()


In [13]:
# Smoke test for run_beam: double each of [1, 2, 3] and print the results.
run_beam("""
import apache_beam as beam
with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create([1,2,3])
        | "Double" >> beam.Map(lambda x: x*2)
        | "Print" >> beam.Map(print)
    )
""")


2
4
6


In [14]:
# Map: one dict out per (store, product, qty, price) tuple in, with amount = qty * price.
# NOTE: amount is a float product, hence outputs like 3.5999999999999996.
run_beam("""
import apache_beam as beam

records = [
    ("store-1", "apple", 3, 1.2),
    ("store-2", "banana", 4, 0.8),
]

def to_amount(r):
    store, product, qty, price = r
    return {
        "store": store,
        "product": product,
        "amount": qty * price,
    }

with beam.Pipeline() as p:
    (
        p
        | "Create records" >> beam.Create(records)
        | "Map to amount" >> beam.Map(to_amount)
        | "Print" >> beam.Map(print)
    )
""")


{'store': 'store-1', 'product': 'apple', 'amount': 3.5999999999999996}
{'store': 'store-2', 'product': 'banana', 'amount': 3.2}


In [15]:
# Filter: keep only rows whose first field (store) is "store-1".
run_beam("""
import apache_beam as beam

sales = [
    ("store-1", "apple", 3, 1.2),
    ("store-2", "milk", 1, 2.5),
    ("store-1", "banana", 5, 0.8),
]

def is_store_1(x):
    return x[0] == "store-1"

with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(sales)
        | "Filter store-1" >> beam.Filter(is_store_1)
        | "Print" >> beam.Map(print)
    )
""")


('store-1', 'apple', 3, 1.2)
('store-1', 'banana', 5, 0.8)


In [16]:
# ParDo: a DoFn may emit zero or more outputs per input. Rows with qty <= 0 or
# price <= 0 emit nothing (dropped); valid rows become dicts with an amount.
run_beam("""
import apache_beam as beam

class ValidateAndCompute(beam.DoFn):
    def process(self, element):
        store, product, qty, price = element
        # drop bad rows
        if qty <= 0 or price <= 0:
            return
        yield {
            "store": store,
            "product": product,
            "amount": qty * price,
        }

sales = [
    ("store-1", "apple", 3, 1.2),
    ("store-2", "banana", -1, 0.8),  # bad
    ("store-1", "milk", 1, 2.5),
]

with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(sales)
        | "Validate + compute" >> beam.ParDo(ValidateAndCompute())
        | "Print" >> beam.Map(print)
    )
""")


{'store': 'store-1', 'product': 'apple', 'amount': 3.5999999999999996}
{'store': 'store-1', 'product': 'milk', 'amount': 2.5}


In [17]:
# Composite PTransform: CleanAndSumPerStore bundles validate -> key by store ->
# CombinePerKey(sum) into one reusable transform applied with a single label.
run_beam("""
import apache_beam as beam

class ValidateAndCompute(beam.DoFn):
    def process(self, element):
        store, product, qty, price = element
        if qty <= 0 or price <= 0:
            return
        yield {
            "store": store,
            "product": product,
            "amount": qty * price,
        }

class CleanAndSumPerStore(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | "Validate" >> beam.ParDo(ValidateAndCompute())
            | "Key by store" >> beam.Map(lambda row: (row["store"], row["amount"]))
            | "Sum per store" >> beam.CombinePerKey(sum)
        )

sales = [
    ("store-1", "apple", 3, 1.2),
    ("store-1", "banana", 5, 0.8),
    ("store-2", "milk", 1, 2.5),
]

with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(sales)
        | "Apply composite" >> CleanAndSumPerStore()
        | "Print" >> beam.Map(print)
    )
""")


('store-1', 7.6)
('store-2', 2.5)


In [18]:
# Partition: route each row to one of 3 outputs by store
# (store-1 -> 0, store-2 -> 1, anything else -> 2).
# NOTE: partition_by_store ignores n_parts, so it relies on Partition(..., 3).
run_beam("""
import apache_beam as beam

sales = [
    ("store-1", "apple", 3, 1.2),
    ("store-2", "banana", 4, 0.8),
    ("store-3", "milk", 1, 2.5),
]

def partition_by_store(element, n_parts):
    store = element[0]
    if store == "store-1":
        return 0
    if store == "store-2":
        return 1
    return 2

with beam.Pipeline() as p:
    parts = (
        p
        | "Create" >> beam.Create(sales)
        | "Partition" >> beam.Partition(partition_by_store, 3)
    )

    parts[0] | "Print S1" >> beam.Map(lambda x: print("STORE1:", x))
    parts[1] | "Print S2" >> beam.Map(lambda x: print("STORE2:", x))
    parts[2] | "Print OTHER" >> beam.Map(lambda x: print("OTHER:", x))
""")


STORE1: ('store-1', 'apple', 3, 1.2)
STORE2: ('store-2', 'banana', 4, 0.8)
OTHER: ('store-3', 'milk', 1, 2.5)


In [19]:
# Windowing: sum values per key within fixed 60s event-time windows.
# The base timestamp is aligned to a window boundary so the grouping (and the
# printed sums) do not depend on the second at which the cell is run.
run_beam("""
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
import time

WINDOW_SIZE_S = 60

# Align to a window boundary: +0s/+5s always share the first window and +65s
# always lands in the next one. An unaligned time.time() splits the first two
# events into different windows whenever it falls in the last 5s of a minute.
now_ts = int(time.time()) // WINDOW_SIZE_S * WINDOW_SIZE_S

events = [
    beam.window.TimestampedValue(("store-1", 10), now_ts),
    beam.window.TimestampedValue(("store-1", 5),  now_ts + 5),
    beam.window.TimestampedValue(("store-1", 20), now_ts + 65),
]

with beam.Pipeline() as p:
    (
        p
        | "Create events" >> beam.Create(events)
        | "Window 60s" >> beam.WindowInto(FixedWindows(WINDOW_SIZE_S))
        | "Key" >> beam.Map(lambda x: (x[0], x[1]))
        | "Sum per window" >> beam.CombinePerKey(sum)
        | "Print" >> beam.Map(print)
    )
""")


('store-1', 15)
('store-1', 20)


In [20]:
# Write three lines with WriteToText. output_path acts as a file-name prefix:
# the file actually created is beam_output.txt-00000-of-00001 (read in the next
# cell), so the path printed below is the prefix, not the exact file name.
run_beam("""
import apache_beam as beam

output_path = "/content/beam_output.txt"

with beam.Pipeline() as p:
    (
        p
        | "Create lines" >> beam.Create(["line-1", "line-2", "line-3"])
        | "Write to text" >> beam.io.WriteToText(output_path)
    )

print("wrote to", output_path)
""")


wrote to /content/beam_output.txt


In [21]:
!cat /content/beam_output.txt-00000-of-00001


line-1
line-2
line-3


In [23]:
!git clone https://github.com/anuradha1105/apache_beam/
%cd apache_beam


Cloning into 'apache_beam'...
remote: Enumerating objects: 6, done.
remote: Counting objects: 100% (6/6), done.
remote: Compressing objects: 100% (3/3), done.
Receiving objects: 100% (6/6), done.
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0 (from 0)
/content/apache_beam


2) Create the sample input file (sales_input.txt)

In [25]:
# Write the 20-row sample sales file used by the I/O demos below
# (comma-separated; fields line up with the (store, product, qty, price)
# tuples used in the other demos) and report where it went.
import os

data_dir = "/content/data"
sales_input_path = os.path.join(data_dir, "sales_input.txt")

sales_text = """store-1,apple,3,1.2
store-1,banana,5,0.8
store-2,apple,2,1.2
store-3,milk,1,2.5
store-2,orange,4,1.1
store-1,milk,1,2.5
store-3,apple,6,1.2
store-2,banana,2,0.8
store-3,yogurt,3,1.5
store-1,bread,2,2.0
store-2,bread,1,2.0
store-1,juice,4,1.8
store-3,juice,2,1.8
store-2,milk,5,2.5
store-1,apple,2,1.2
store-3,banana,4,0.8
store-2,apple,5,1.2
store-1,milk,2,2.5
store-2,orange,3,1.1
store-3,milk,2,2.5
"""

# exist_ok makes the cell safe to re-run
os.makedirs(data_dir, exist_ok=True)
with open(sales_input_path, "w") as out_file:
    out_file.write(sales_text)

print(f"created {sales_input_path}")


created /content/data/sales_input.txt


3) MAP demo

In [26]:
# Map: exactly one output per input. Each tuple becomes a dict that keeps qty
# and price and adds amount = qty * price.
run_beam("""
import apache_beam as beam

sample_sales = [
    ("store-1", "apple", 3, 1.2),
    ("store-2", "banana", 4, 0.8),
    ("store-3", "milk", 1, 2.5),
]

def to_amount(row):
    store, product, qty, price = row
    return {
        "store": store,
        "product": product,
        "qty": qty,
        "price": price,
        "amount": qty * price,
    }

with beam.Pipeline() as p:
    (
        p
        | "Create sales" >> beam.Create(sample_sales)
        | "Add amount" >> beam.Map(to_amount)
        | "Print after map" >> beam.Map(lambda x: print("AFTER MAP:", x))
    )
""")


AFTER MAP: {'store': 'store-1', 'product': 'apple', 'qty': 3, 'price': 1.2, 'amount': 3.5999999999999996}
AFTER MAP: {'store': 'store-2', 'product': 'banana', 'qty': 4, 'price': 0.8, 'amount': 3.2}
AFTER MAP: {'store': 'store-3', 'product': 'milk', 'qty': 1, 'price': 2.5, 'amount': 2.5}


4) FILTER demo

In [27]:
# Filter: keep only rows for which is_store_1 returns True (store == "store-1").
run_beam("""
import apache_beam as beam

sample_sales = [
    ("store-1", "apple", 3, 1.2),
    ("store-2", "banana", 4, 0.8),
    ("store-1", "milk", 1, 2.5),
    ("store-3", "milk", 2, 2.5),
]

def is_store_1(row):
    return row[0] == "store-1"

with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(sample_sales)
        | "Filter store-1" >> beam.Filter(is_store_1)
        | "Print after filter" >> beam.Map(lambda x: print("AFTER FILTER:", x))
    )
""")


AFTER FILTER: ('store-1', 'apple', 3, 1.2)
AFTER FILTER: ('store-1', 'milk', 1, 2.5)


5) ParDo demo

In [28]:
# ParDo: ValidateAndCompute yields nothing for rows with qty <= 0 or
# price <= 0, so the store-2 row with qty -1 is dropped from the output.
run_beam("""
import apache_beam as beam

class ValidateAndCompute(beam.DoFn):
    def process(self, element):
        store, product, qty, price = element
        # drop bad rows
        if qty <= 0 or price <= 0:
            return
        yield {
            "store": store,
            "product": product,
            "amount": qty * price,
        }

sales = [
    ("store-1", "apple", 3, 1.2),
    ("store-2", "banana", -1, 0.8),  # bad
    ("store-1", "milk", 1, 2.5),
]

with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(sales)
        | "Validate + compute" >> beam.ParDo(ValidateAndCompute())
        | "Print after pardo" >> beam.Map(lambda x: print("AFTER PARDO:", x))
    )
""")


AFTER PARDO: {'store': 'store-1', 'product': 'apple', 'amount': 3.5999999999999996}
AFTER PARDO: {'store': 'store-1', 'product': 'milk', 'amount': 2.5}


6) Composite transform

In [29]:
# Composite transform: CleanAndSumPerStore = ParDo(validate + amount) ->
# (store, amount) pairs -> CombinePerKey(sum), used as a single pipeline step.
run_beam("""
import apache_beam as beam

class ValidateAndCompute(beam.DoFn):
    def process(self, element):
        store, product, qty, price = element
        if qty <= 0 or price <= 0:
            return
        yield {
            "store": store,
            "product": product,
            "amount": qty * price,
        }

class CleanAndSumPerStore(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | "Validate" >> beam.ParDo(ValidateAndCompute())
            | "Key by store" >> beam.Map(lambda row: (row["store"], row["amount"]))
            | "Sum per store" >> beam.CombinePerKey(sum)
        )

sales = [
    ("store-1", "apple", 3, 1.2),
    ("store-1", "banana", 5, 0.8),
    ("store-2", "milk", 1, 2.5),
    ("store-2", "orange", 2, 1.1),
]

with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(sales)
        | "Apply composite" >> CleanAndSumPerStore()
        | "Print composite" >> beam.Map(lambda x: print("COMPOSITE:", x))
    )
""")


COMPOSITE: ('store-1', 7.6)
COMPOSITE: ('store-2', 4.7)


7) Partition

In [30]:
# Partition: split one PCollection into 3 by store (store-1 -> 0,
# store-2 -> 1, others -> 2) and print each part with its own label.
# NOTE: partition_by_store ignores n_parts, so it relies on Partition(..., 3).
run_beam("""
import apache_beam as beam

sales = [
    ("store-1", "apple", 3, 1.2),
    ("store-2", "banana", 4, 0.8),
    ("store-3", "milk", 1, 2.5),
    ("store-1", "bread", 2, 2.0),
    ("store-2", "juice", 3, 1.8),
]

def partition_by_store(element, n_parts):
    store = element[0]
    if store == "store-1":
        return 0
    if store == "store-2":
        return 1
    return 2

with beam.Pipeline() as p:
    parts = (
        p
        | "Create" >> beam.Create(sales)
        | "Partition" >> beam.Partition(partition_by_store, 3)
    )

    parts[0] | "Print part 0" >> beam.Map(lambda x: print("PART 0 (store-1):", x))
    parts[1] | "Print part 1" >> beam.Map(lambda x: print("PART 1 (store-2):", x))
    parts[2] | "Print part 2" >> beam.Map(lambda x: print("PART 2 (others):", x))
""")


PART 0 (store-1): ('store-1', 'apple', 3, 1.2)
PART 1 (store-2): ('store-2', 'banana', 4, 0.8)
PART 2 (others): ('store-3', 'milk', 1, 2.5)
PART 0 (store-1): ('store-1', 'bread', 2, 2.0)
PART 1 (store-2): ('store-2', 'juice', 3, 1.8)


8) Windowing

In [31]:
# 8) Windowing: sum per store within fixed 60s event-time windows and print
# each result together with its window bounds. The base timestamp is aligned
# to a window boundary so the grouping does not depend on when the cell runs.
run_beam("""
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
import time

WINDOW_SIZE_S = 60

# Align to a window boundary: +0s/+5s always share the first window and
# +65s/+70s always share the second. An unaligned time.time() splits the
# first two events into different windows whenever it falls in the last 5s
# of a minute.
now_ts = int(time.time()) // WINDOW_SIZE_S * WINDOW_SIZE_S

events = [
    beam.window.TimestampedValue(("store-1", 10), now_ts),
    beam.window.TimestampedValue(("store-1", 5),  now_ts + 5),
    beam.window.TimestampedValue(("store-1", 20), now_ts + 65),
    beam.window.TimestampedValue(("store-2", 12), now_ts + 70),
]

class DebugWindow(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        yield f"WINDOW {window.start.to_utc_datetime()} - {window.end.to_utc_datetime()} :: {element}"

with beam.Pipeline() as p:
    (
        p
        | "Create events" >> beam.Create(events)
        | "Apply 60s window" >> beam.WindowInto(FixedWindows(WINDOW_SIZE_S))
        | "Key by store" >> beam.Map(lambda x: (x[0], x[1]))
        | "Sum per window" >> beam.CombinePerKey(sum)
        | "Print windowed" >> beam.ParDo(DebugWindow())
        | "Print final" >> beam.Map(print)
    )
""")


WINDOW 2025-10-30 22:26:00 - 2025-10-30 22:27:00 :: ('store-1', 15)
WINDOW 2025-10-30 22:27:00 - 2025-10-30 22:28:00 :: ('store-1', 20)
WINDOW 2025-10-30 22:27:00 - 2025-10-30 22:28:00 :: ('store-2', 12)


9) Pipeline I/O

9.1 Read sales_input.txt

In [32]:
# 9.1) Read the sample file created above: ReadFromText yields one string per
# line (no CSV parsing here), which is printed as-is.
run_beam("""
import apache_beam as beam

input_path = "/content/data/sales_input.txt"

with beam.Pipeline() as p:
    (
        p
        | "Read input" >> beam.io.ReadFromText(input_path)
        | "Print lines" >> beam.Map(lambda x: print("LINE:", x))
    )
""")


LINE: store-1,apple,3,1.2
LINE: store-1,banana,5,0.8
LINE: store-2,apple,2,1.2
LINE: store-3,milk,1,2.5
LINE: store-2,orange,4,1.1
LINE: store-1,milk,1,2.5
LINE: store-3,apple,6,1.2
LINE: store-2,banana,2,0.8
LINE: store-3,yogurt,3,1.5
LINE: store-1,bread,2,2.0
LINE: store-2,bread,1,2.0
LINE: store-1,juice,4,1.8
LINE: store-3,juice,2,1.8
LINE: store-2,milk,5,2.5
LINE: store-1,apple,2,1.2
LINE: store-3,banana,4,0.8
LINE: store-2,apple,5,1.2
LINE: store-1,milk,2,2.5
LINE: store-2,orange,3,1.1
LINE: store-3,milk,2,2.5


9.2 Write an output file

In [33]:
run_beam("""
import apache_beam as beam

output_path = "/content/data/sales_output"

with beam.Pipeline() as p:
    (
        p
        | "Create lines" >> beam.Create(["ok-1", "ok-2", "ok-3"])
        | "Write to text" >> beam.io.WriteToText(output_path)
    )
""")

# show what was written
!cat /content/data/sales_output-00000-of-00001


ok-1
ok-2
ok-3
