# Apache Beam: Mini Data Engineering Exercise (Colab) – Patched v2
Fixes:
- Pinned installs for Beam deps
- Correct import for `TimestampedValue`
- **Beam ML fix**: force `model_file_type=JOBLIB` and convert features to NumPy arrays

Assignment created by: **Dev Mulchandani**

In [1]:
!pip -q install --upgrade pip
!pip -q install "dill==0.3.8" "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install apache-beam scikit-learn joblib
print('Install complete. If prompted, Restart runtime and re-run.')

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.8/1.8 MB[0m [31m68.8 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m35.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
[33m  DEPRECATION: Building 'crcmod' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'crcmod'. Discussion can be found at https:/

In [2]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows, TimestampedValue
import os, datetime, numpy as np
# Record the Beam version in the cell output so readers can see what the run used.
print(f"Beam version: {beam.__version__}")

Beam version: 2.68.0


In [3]:
# Paths shared by the pipelines below, plus a tiny "fruit,quantity" sample file.
INPUT_PATH = 'input_data.txt'
OUTPUT_DIR = 'beam_outputs'

# Idempotent: re-running the cell keeps an existing output directory.
os.makedirs(OUTPUT_DIR, exist_ok=True)

SAMPLE_ROWS = ['apple,10', 'banana,7', 'pear,3', 'banana,5', 'apple,2']
with open(INPUT_PATH, 'w') as sample_file:
    # Rows are newline-separated; no trailing newline is written.
    sample_file.write('\n'.join(SAMPLE_ROWS))

print('Created', INPUT_PATH)

Created input_data.txt


In [4]:
def parse_line(line: str):
    """Parse one ``fruit,quantity`` text line into a ``(fruit, int_quantity)`` tuple.

    Raises:
        ValueError: if the line does not contain exactly one comma, or the
            quantity field is not a valid integer.
    """
    fields = line.split(',')
    # Unpacking enforces exactly two comma-separated fields.
    name, raw_count = fields
    return (name, int(raw_count))
def is_big(record):
    """Return True when the quantity of a ``(fruit, qty)`` record is at least 5."""
    (_name, count) = record
    return count >= 5
# Read the sample file, keep only records with quantity >= 5, and write them
# as "fruit:qty" lines under OUTPUT_DIR with the 'big_items' file prefix.
with beam.Pipeline(options=PipelineOptions()) as p:
    raw_lines = p | 'ReadText' >> beam.io.ReadFromText(INPUT_PATH)
    records = raw_lines | 'ParseCSV' >> beam.Map(parse_line)
    big_records = records | 'FilterBig' >> beam.Filter(is_big)
    formatted = big_records | 'Format' >> beam.Map(lambda rec: f"{rec[0]}:{rec[1]}")
    _ = formatted | 'WriteText' >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, 'big_items'))





In [5]:
class CleanSplitDoFn(beam.DoFn):
    """Split a raw ``fruit<sep>qty`` string into a ``(fruit, int_qty)`` pair.

    The separator is a comma when the element contains one, otherwise a colon.
    """

    def process(self, element):
        delimiter = ':' if ',' not in element else ','
        # Unpacking requires exactly one separator in the element.
        name, raw_qty = element.split(delimiter)
        yield (name.strip(), int(raw_qty))
class UppercaseFruitDoFn(beam.DoFn):
    """Upper-case the fruit name of a ``(fruit, qty)`` pair; qty is passed through."""

    def process(self, element):
        (name, count) = element
        shouted = name.upper()
        yield (shouted, count)
class TagDoFn(beam.DoFn):
    """Turn a ``(fruit, qty)`` pair into a dict tagged 'MANY' (qty >= 5) or 'FEW'."""

    def process(self, element):
        name, count = element
        if count >= 5:
            label = 'MANY'
        else:
            label = 'FEW'
        yield dict(fruit=name, qty=count, tag=label)
class CleanUpperTag(beam.PTransform):
    """Composite transform chaining the clean/split, upper-case, and tag steps."""

    def expand(self, pcoll):
        cleaned = pcoll | 'Clean' >> beam.ParDo(CleanSplitDoFn())
        uppercased = cleaned | 'Upper' >> beam.ParDo(UppercaseFruitDoFn())
        return uppercased | 'Tag' >> beam.ParDo(TagDoFn())
# Run the composite transform over a few in-memory rows and write each
# resulting dict (as its text representation) under OUTPUT_DIR.
with beam.Pipeline(options=PipelineOptions()) as p:
    raw_rows = p | 'CreateRaw' >> beam.Create(['apple,1','banana,9','pear,2','banana,12'])
    tagged = raw_rows | 'Composite' >> CleanUpperTag()
    _ = tagged | 'WriteJSONLike' >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, 'composite_result'))



In [6]:
# Event-time windowing demo: sum quantities per fruit within fixed 60-second windows.
#
# The base time is timezone-aware (UTC) so the epoch seconds produced by
# `.timestamp()` do not depend on the local timezone of the machine running this.
base = datetime.datetime(2025, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)

# (fruit, quantity, event_time) triples; the two apples fall in the first
# minute after `base`, the two bananas in the second.
events = [
    ('apple', 1, base + datetime.timedelta(seconds=5)),
    ('apple', 4, base + datetime.timedelta(seconds=30)),
    ('banana', 2, base + datetime.timedelta(seconds=70)),
    ('banana', 6, base + datetime.timedelta(seconds=95)),
]


def to_kv_tv(e):
    """Convert a ``(fruit, qty, datetime)`` event into a timestamped ``(fruit, qty)`` pair.

    Args:
        e: 3-tuple of fruit name, quantity, and the event's datetime.

    Returns:
        A ``TimestampedValue`` wrapping ``(fruit, qty)`` whose timestamp is the
        event time in seconds since the Unix epoch.
    """
    fruit, qty, t = e
    return TimestampedValue((fruit, qty), t.timestamp())


with beam.Pipeline(options=PipelineOptions()) as p:
    _ = (p
         | 'CreateEvents' >> beam.Create(events)
         # Assign event time in an explicit step: a Map that emits
         # TimestampedValue is the documented way to attach timestamps, rather
         # than relying on how Create handles TimestampedValue elements.
         | 'AddTimestamps' >> beam.Map(to_kv_tv)
         | 'Window1m' >> beam.WindowInto(FixedWindows(60))
         # Elements are already (fruit, qty) pairs, so they can be combined per key directly.
         | 'SumPerKey' >> beam.CombinePerKey(sum)
         | 'WriteWindowed' >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, 'windowed_totals')))



In [7]:
def split_fn(record, n_parts):
    """Partition function: index 0 for quantity >= 5, index 1 otherwise.

    Args:
        record: ``(fruit, qty)`` pair.
        n_parts: total number of partitions; accepted but not used.
    """
    (_name, count) = record
    if count >= 5:
        return 0
    return 1
# Re-read the sample file and route each (fruit, qty) record into one of two
# outputs using split_fn: partition 0 ("heavy") and partition 1 ("light").
with beam.Pipeline(options=PipelineOptions()) as p:
    parts = (p
             | 'ReadAgain' >> beam.io.ReadFromText(INPUT_PATH)
             # Reuse the parse_line helper from the first pipeline instead of an
             # inline lambda that duplicated its logic and split each line twice.
             | 'ParseAgain' >> beam.Map(parse_line)
             | 'Partition' >> beam.Partition(split_fn, 2))
    heavy, light = parts[0], parts[1]
    _ = heavy | 'WriteHeavy' >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, 'heavy'))
    _ = light | 'WriteLight' >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, 'light'))



In [8]:
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import joblib
# Train a small logistic-regression classifier on the iris dataset, save it
# with joblib, and report accuracy on the held-out 20% split.
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=0
)

clf = LogisticRegression(max_iter=200)
clf.fit(X_train, y_train)

MODEL_PATH = 'iris_logreg.joblib'
joblib.dump(clf, MODEL_PATH)

local_accuracy = accuracy_score(y_test, clf.predict(X_test))
print('Local sklearn accuracy:', local_accuracy)

Local sklearn accuracy: 1.0


In [9]:
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy, ModelFileType
from apache_beam.ml.inference.base import RunInference
# Run the saved iris model over the first 10 test rows with Beam's RunInference
# and write the resulting prediction objects as text under OUTPUT_DIR.
#
# MODEL_PATH (set in the training cell) is used instead of repeating the file
# name, so the handler always loads the file that joblib.dump actually wrote.
model_handler = SklearnModelHandlerNumpy(model_uri=MODEL_PATH, model_file_type=ModelFileType.JOBLIB)
with beam.Pipeline(options=PipelineOptions()) as p:
    preds = (p
             # One NumPy feature vector per element, as the NumPy model handler expects.
             | 'CreateFeatures' >> beam.Create([np.array(row) for row in X_test[:10]])
             | 'RunInference' >> RunInference(model_handler))
    _ = preds | 'WritePreds' >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, 'ml_predictions'))
print('Beam ML inference example done.')



Beam ML inference example done.
