In [None]:
#cell1
# === Install (robust & light) ===
!pip -q install -U pip setuptools wheel
!pip -q install "apache-beam==2.56.0" "scikit-learn==1.3.2"
# Fallback (uncomment if needed):
# !pip -q install "apache-beam==2.53.0" "scikit-learn==1.2.2"


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/1.8 MB[0m [31m8.7 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.8/1.8 MB[0m [31m26.4 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m41.6 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ipython 7.34.0 requires jedi>=0.16, which is not installed.[0m[31m
[0m  Installing build 

In [1]:
# cell 2
# === Environment check ===
import sys, numpy as np, pandas as pd, apache_beam as beam, sklearn
from datetime import datetime
print("Python:", sys.version.split()[0])
print("NumPy:", np.__version__)
print("Pandas:", pd.__version__)
print("Apache Beam:", beam.__version__)
print("scikit-learn:", sklearn.__version__)
print("Time:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))




Python: 3.12.12
NumPy: 1.26.4
Pandas: 2.2.2
Apache Beam: 2.56.0
scikit-learn: 1.3.2
Time: 2025-10-27 02:10:58


In [2]:
# cell 3
# === Create small CSV input ===
import os, json
from pathlib import Path

DATA_DIR = Path("/content/data"); DATA_DIR.mkdir(parents=True, exist_ok=True)
OUT_DIR = Path("/content/output"); OUT_DIR.mkdir(parents=True, exist_ok=True)

rows = [
    ("2025-01-01T00:00:10","u1",12.5,"US",0),
    ("2025-01-01T00:01:25","u2",300.0,"US",1),
    ("2025-01-01T00:01:55","u3",50.0,"CA",0),
    ("2025-01-01T00:02:05","u4",700.0,"US",1),
    ("2025-01-01T00:04:40","u5",25.0,"GB",0),
    ("2025-01-01T00:05:05","u1",900.0,"US",1),
    ("2025-01-01T00:06:10","u2",10.0,"CA",0),
    ("2025-01-01T00:06:59","u3",75.5,"US",0),
    ("2025-01-01T00:09:20","u4",15.0,"US",0),
]
csv_path = "/content/data/input_transactions.csv"
with open(csv_path, "w") as f:
    f.write("ts,user,amount,country,is_fraud\n")
    for r in rows:
        f.write(f"{r[0]},{r[1]},{r[2]},{r[3]},{r[4]}\n")

!head -n 5 /content/data/input_transactions.csv


ts,user,amount,country,is_fraud
2025-01-01T00:00:10,u1,12.5,US,0
2025-01-01T00:01:25,u2,300.0,US,1
2025-01-01T00:01:55,u3,50.0,CA,0
2025-01-01T00:02:05,u4,700.0,US,1


In [4]:
# cell 4
# === Helpers: parse, timestamp, DoFn, Composite ===
import datetime as dt
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows, TimestampedValue

def parse_csv(line: str):
    if line.startswith("ts,"):
        return None
    ts, user, amount, country, is_fraud = line.split(",")
    return {"ts": ts, "user": user, "amount": float(amount), "country": country, "is_fraud": int(is_fraud)}

def to_timestamped(e: dict):
    ts = dt.datetime.fromisoformat(e["ts"])
    return TimestampedValue(e, ts.timestamp())

class AddRiskScore(beam.DoFn):
    def process(self, e):
        risk = 0.0
        if e["amount"] >= 500: risk += 0.5
        if e["country"] == "US": risk += 0.2
        if e["is_fraud"] == 1: risk += 0.3
        e["risk"] = round(min(risk, 1.0), 3)
        yield e

class CleanAndEnrich(beam.PTransform):
    def __init__(self, amount_threshold=0.0): self.amount_threshold = amount_threshold
    def expand(self, pcoll):
        return (
            pcoll
            | "ParseCSV" >> beam.Map(parse_csv)
            | "DropHeader" >> beam.Filter(lambda x: x is not None)
            | "FilterAmt" >> beam.Filter(lambda x: x["amount"] >= self.amount_threshold)
            | "AddRisk" >> beam.ParDo(AddRiskScore())
        )


In [11]:
# === Cell 5: Clean outputs + build/run pipeline (batch) ===
import os, glob, shutil
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

# Output *prefixes* (must NOT be directories)
OUTPUT_1 = "/content/output/high_value_risk"
OUTPUT_2 = "/content/output/fraud_partition"
OUTPUT_3 = "/content/output/window_sums"

# Clean any previous files/dirs that collide with prefixes
for prefix in [OUTPUT_1, OUTPUT_2, OUTPUT_3]:
    for path in glob.glob(prefix + "*"):
        if os.path.isdir(path):
            shutil.rmtree(path, ignore_errors=True)
        else:
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

# Batch mode: no --streaming flag
opts = PipelineOptions(["--runner=DirectRunner"])

with beam.Pipeline(options=opts) as p:
    lines = p | "ReadCSV" >> ReadFromText(csv_path)

    # Composite: parse -> filter -> risk
    high_value = lines | "CleanAndEnrich>=100" >> CleanAndEnrich(amount_threshold=100.0)

    # Partition: fraud vs not fraud
    def part_fn(e, n):
        return 0 if e["is_fraud"] == 1 else 1
    fraud, not_fraud = high_value | "PartitionFraud" >> beam.Partition(part_fn, 2)

    # Write outputs (prefixes). num_shards=1 for single file each.
    fraud      | "WriteFraud"    >> WriteToText(OUTPUT_2 + "_fraud", num_shards=1)
    not_fraud  | "WriteNotFraud" >> WriteToText(OUTPUT_2 + "_notfraud", num_shards=1)
    high_value | "WriteHighVals" >> WriteToText(OUTPUT_1, num_shards=1)

    # Windowing: 5-minute fixed windows; sum amount by country
    _ = (
        high_value
        | "ToTimestamped" >> beam.Map(to_timestamped)
        | "Fixed5mWindows" >> beam.WindowInto(FixedWindows(5 * 60))
        | "KV(country,amount)" >> beam.Map(lambda e: (e["country"], e["amount"]))
        | "SumPerWindow" >> beam.CombinePerKey(sum)
        | "WriteWindowSums" >> WriteToText(OUTPUT_3, num_shards=1)
    )

print("Done. Files under /content/output")
!ls -lah /content/output | sed -n '1,200p'


Done. Files under /content/output
total 20K
drwxr-xr-x 2 root root 4.0K Oct 27 02:27 .
drwxr-xr-x 1 root root 4.0K Oct 27 02:14 ..
-rw-r--r-- 1 root root  318 Oct 27 02:27 fraud_partition_fraud-00000-of-00001
-rw-r--r-- 1 root root    0 Oct 27 02:27 fraud_partition_notfraud-00000-of-00001
-rw-r--r-- 1 root root  318 Oct 27 02:27 high_value_risk-00000-of-00001
-rw-r--r-- 1 root root   29 Oct 27 02:27 window_sums-00000-of-00001


In [13]:
# === Cell 6: Preview output files ===

print("High-value sample:")
!head -n 5 /content/output/high_value_risk-00000-of-00001

print("\nFraud partition (fraud):")
!head -n 5 /content/output/fraud_partition_fraud-00000-of-00001

print("\nFraud partition (not fraud):")
!head -n 5 /content/output/fraud_partition_notfraud-00000-of-00001

print("\nWindow sums sample:")
!head -n 5 /content/output/window_sums-00000-of-00001


High-value sample:
{'ts': '2025-01-01T00:01:25', 'user': 'u2', 'amount': 300.0, 'country': 'US', 'is_fraud': 1, 'risk': 0.5}
{'ts': '2025-01-01T00:02:05', 'user': 'u4', 'amount': 700.0, 'country': 'US', 'is_fraud': 1, 'risk': 1.0}
{'ts': '2025-01-01T00:05:05', 'user': 'u1', 'amount': 900.0, 'country': 'US', 'is_fraud': 1, 'risk': 1.0}

Fraud partition (fraud):
{'ts': '2025-01-01T00:01:25', 'user': 'u2', 'amount': 300.0, 'country': 'US', 'is_fraud': 1, 'risk': 0.5}
{'ts': '2025-01-01T00:02:05', 'user': 'u4', 'amount': 700.0, 'country': 'US', 'is_fraud': 1, 'risk': 1.0}
{'ts': '2025-01-01T00:05:05', 'user': 'u1', 'amount': 900.0, 'country': 'US', 'is_fraud': 1, 'risk': 1.0}

Fraud partition (not fraud):

Window sums sample:
('US', 1000.0)
('US', 900.0)


In [19]:
# cell 7 === RunInference (sklearn) — fixed for Beam >= 2.56 ===
import numpy as np, pickle, json
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline

# tiny model
X = np.array([[12.5,1],[300,1],[50,0],[700,1],[25,0],[900,1],[10,0],[75.5,1],[15,1]], dtype=float)
y = np.array([0,1,0,1,0,1,0,0,0])
clf = make_pipeline(StandardScaler(), LogisticRegression(max_iter=200)).fit(X,y)

MODEL_PATH = "/content/model.pkl"
with open(MODEL_PATH, "wb") as f:
    pickle.dump(clf, f)

from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.options.pipeline_options import PipelineOptions

def to_features(e):
    return np.array([e["amount"], 1 if e["country"]=="US" else 0], dtype=float)

# Use predict_proba so we get class probabilities
handler = SklearnModelHandlerNumpy(MODEL_PATH)

INF_OUT = "/content/output/inference_scores"
import os; os.system("rm -f " + INF_OUT + "*")

def to_json_from_prediction_result(r):
    # r is a PredictionResult; probabilities are in r.inference
    arr = np.array(r.inference).ravel()
    # if two probs (neg, pos), take the positive class prob; else just first value
    val = float(arr[-1]) if arr.size > 1 else float(arr[0])
    return json.dumps({"proba_1": val})

with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
    _ = (
        p
        | "ReadCSV-Inf" >> ReadFromText(csv_path)
        | "Clean(>=50)+Risk" >> CleanAndEnrich(amount_threshold=50.0)
        | "ToFeatures" >> beam.Map(to_features)
        | "RunInference" >> RunInference(handler)
        | "ToJSON" >> beam.Map(to_json_from_prediction_result)
        | "WriteInf" >> WriteToText(INF_OUT, num_shards=1)
    )

!echo "Inference sample:"
!head -n 5 /content/output/inference_scores-00000-of-00001

Inference sample:
{"proba_1": 0.0}
{"proba_1": 0.0}
{"proba_1": 1.0}
{"proba_1": 1.0}
{"proba_1": 0.0}
