In [19]:
!pip install apache-beam[interactive]



In [18]:
import apache_beam as beam

# Confirm which Apache Beam release the kernel imported.
print(f"Apache Beam Version: {beam.__version__}")

Apache Beam Version: 2.69.0


In [17]:
!pip install pandas scikit-learn apache-beam[gcp]



In [1]:
import pandas as pd
import sklearn

# Report the pandas and scikit-learn releases available to the kernel.
print(f"Pandas Version: {pd.__version__}")
print(f"Scikit-learn Version: {sklearn.__version__}")

Pandas Version: 2.2.2
Scikit-learn Version: 1.6.1


In [2]:
!pip install apache-beam[ml] scikit-learn pandas

import apache_beam as beam
import pandas as pd
import sklearn

print("Apache Beam Version:", beam.__version__)
print("Pandas Version:", pd.__version__)
print("Scikit-learn Version:", sklearn.__version__)

Apache Beam Version: 2.69.0
Pandas Version: 2.2.2
Scikit-learn Version: 1.6.1


In [3]:
# Pipeline option handling and the interactive runner module.
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive import interactive_runner

# Reaching this line means both imports above succeeded.
print(" Apache Beam interactive mode is ready.")


 Apache Beam interactive mode is ready.


## Map & Filter Transform
This pipeline multiplies each number by 2 and then keeps only the results that are greater than 10.

In [4]:
# Values fed into the pipeline.
input_data = [1, 2, 3, 4, 5, 6, 7, 8, 9]

pipeline = beam.Pipeline(options=PipelineOptions())

# Build the graph one stage at a time: create -> double -> keep values above 10.
numbers = pipeline | "Read Input Data" >> beam.Create(input_data)     # Pipeline IO
doubled = numbers | "Multiply by 2 (Map)" >> beam.Map(lambda value: value * 2)
output = doubled | "Filter > 10" >> beam.Filter(lambda value: value > 10)

# Execute the graph and block until the run has finished.
pipeline_result = pipeline.run()
pipeline_result.wait_until_finish()

print("✅ Basic pipeline executed")




✅ Basic pipeline executed


In [5]:
class SquareFn(beam.DoFn):
    """DoFn that emits each input element multiplied by itself."""

    def process(self, element):
        """Yield a single output: ``element * element``."""
        squared = element * element
        yield squared

# Square five numbers with the custom DoFn and print every result.
with beam.Pipeline() as pipeline:
    numbers = pipeline | "Create Numbers" >> beam.Create([1, 2, 3, 4, 5])
    squares = numbers | "Apply ParDo" >> beam.ParDo(SquareFn())
    squares | "Print Results" >> beam.Map(print)


1
4
9
16
25


In [6]:
class MultiplyAndFilter(beam.PTransform):
    """Composite transform: triple every element, then keep results above 15."""

    def expand(self, pcoll):
        """Apply the multiply step and the filter step to ``pcoll`` in that order."""
        tripled = pcoll | "Multiply by 3" >> beam.Map(lambda value: value * 3)
        return tripled | "Filter > 15" >> beam.Filter(lambda value: value > 15)

# Run the composite transform over four values and print what survives the filter.
with beam.Pipeline() as pipeline:
    start_data = pipeline | "Start Data" >> beam.Create([1, 5, 7, 10])
    filtered = start_data | "Composite Transform" >> MultiplyAndFilter()
    filtered | "Print Output" >> beam.Map(print)


21
30


In [7]:
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from datetime import datetime
import time

# ✅ Attach the current wall-clock time to an element as its timestamp
def add_timestamp(x):
    """Wrap ``x`` in a ``TimestampedValue`` stamped with ``time.time()``."""
    now = time.time()
    return beam.window.TimestampedValue(x, now)

# Timestamp five values, assign them to 10-second fixed windows, and print each one.
with beam.Pipeline() as pipeline:
    stamped = (
        pipeline
        | "Create Stream Data" >> beam.Create([10, 20, 30, 40, 50])
        | "Add Timestamp" >> beam.Map(add_timestamp)
    )
    windowed = stamped | "Apply 10-second Window" >> beam.WindowInto(FixedWindows(10))
    windowed | "Print Per Window" >> beam.Map(lambda value: print("Windowed:", value))


Windowed: 10
Windowed: 20
Windowed: 30
Windowed: 40
Windowed: 50


In [8]:
def partition_numbers(element, num_partitions):
    """Choose the output partition for ``element`` from its parity.

    Args:
        element: number taken from the input PCollection.
        num_partitions: partition count supplied by ``beam.Partition``.

    Returns:
        0 for even elements and 1 for odd elements, folded into
        ``range(num_partitions)`` so the index is valid even when only a
        single partition is requested.
    """
    parity = 0 if element % 2 == 0 else 1   # 0 → even, 1 → odd
    # Unchanged for two or more partitions; with one partition an odd element
    # would otherwise produce the out-of-range index 1.
    return parity % num_partitions

# Split six numbers into an even and an odd partition and print each element with its label.
with beam.Pipeline() as pipeline:
    numbers = pipeline | "Create Numbers" >> beam.Create([10, 15, 20, 25, 30, 35])
    partitions = numbers | "Partition Data" >> beam.Partition(partition_numbers, 2)

    # Index 0 receives the elements partition_numbers mapped to 0, index 1 those mapped to 1.
    even = partitions[0] | "Even Numbers" >> beam.Map(lambda value: print("Even:", value))
    odd = partitions[1] | "Odd Numbers" >> beam.Map(lambda value: print("Odd:", value))


Even: 10
Odd: 15
Even: 20
Odd: 25
Even: 30
Odd: 35


In [9]:
import apache_beam as beam
import pandas as pd

# Sample records: each name is paired with the score at the same position.
data = dict(
    name=['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    score=[85, 42, 77, 90, 56],
)

# Write the table to the CSV file read by the pipeline; the index column is left out.
df = pd.DataFrame.from_dict(data)
df.to_csv(path_or_buf='input_data.csv', index=False)


# Read the CSV (skipping its header row), keep rows whose score is above 60,
# and write one formatted line per remaining row.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Read CSV Lines" >> beam.io.ReadFromText('input_data.csv', skip_header_lines=1)
        | "Parse CSV" >> beam.Map(lambda line: line.split(','))
        | "Filter Scores > 60" >> beam.Filter(lambda x: int(x[1]) > 60)
        | "Format Output" >> beam.Map(lambda x: f"{x[0]} has score {x[1]}")
        # WriteToText shards its output by default, producing a name such as
        # 'output_results-00000-of-00001.txt'. One shard with an empty shard
        # template writes exactly 'output_results.txt', the file named below.
        | "Write Output" >> beam.io.WriteToText(
            'output_results',
            file_name_suffix=".txt",
            num_shards=1,
            shard_name_template='',
        )
    )

print("✅ Beam I/O Completed — check 'output_results.txt'")


✅ Beam I/O Completed — check 'output_results.txt'


In [13]:
import apache_beam as beam
import numpy as np
from sklearn.linear_model import LinearRegression

# ✅ Fit a linear model on five points where the target is twice the feature
X = np.array([1, 2, 3, 4, 5]).reshape(-1, 1)   # one feature column, five rows
y = 2 * X.ravel()                               # targets 2, 4, 6, 8, 10
model = LinearRegression()
model.fit(X, y)

# ✅ DoFn that runs the fitted model on each element
class PredictDoFn(beam.DoFn):
    """Score every element with the module-level ``model`` and emit a text line."""

    def process(self, element):
        """Yield one formatted string holding the input and its prediction."""
        features = [[element]]  # a single row containing a single feature
        prediction = model.predict(features)[0]
        yield f"Input: {element} → Predicted: {prediction}"

# ✅ Feed four unseen inputs through the prediction DoFn and print each result
with beam.Pipeline() as pipeline:
    inputs = pipeline | "Create Input Numbers" >> beam.Create([6, 7, 8, 9])
    predictions = inputs | "Predict Using Linear Model" >> beam.ParDo(PredictDoFn())
    predictions | "Print Results" >> beam.Map(print)


Input: 6 → Predicted: 12.0
Input: 7 → Predicted: 14.0
Input: 8 → Predicted: 16.0
Input: 9 → Predicted: 18.0


+-------------------------+
|      Input Source       |
|  (CSV / List / Stream)  |
+-----------+-------------+
            |
            v
+-------------------------+
|        Transform        |
| - Map / Filter / ParDo  |
| - Composite / Windowing |
+-----------+-------------+
            |
            v
+-------------------------+
|  Partition / ML Model   |
|  (DoFn + scikit-learn)  |
+-----------+-------------+
            |
            v
+-------------------------+
|       Output Sink       |
| (Text / CSV / Console)  |
+-------------------------+


### Conclusion
This notebook demonstrates all required Apache Beam concepts:
- Map & Filter
- ParDo (custom DoFn)
- Composite Transform
- Partition
- Windowing
- Pipeline I/O (read/write)
- Extra credit: ML prediction inside a Beam pipeline

All pipelines executed successfully in Google Colab.
