# E2E Predictive MLOps Demo -- Fraud Detection

This notebook covers the **exploration and local iteration** phase of the MLOps lifecycle.
We go from raw transaction data in BigQuery to a trained XGBoost fraud detection model,
logging everything to Vertex AI Experiments.

**Dataset**: FraudFinder (public fraud detection dataset in BigQuery)  
**Tables**:
- `tx` -- raw transactions (`tx_id`, `tx_ts`, `customer_id`, `terminal_id`, `tx_amount`)
- `txlabels` -- fraud labels (`tx_id`, `tx_fraud`)
- `demographics.customers` -- customer demographics
- `demographics.terminals` -- terminal demographics

**Sections**:
1. Connect to BigQuery
2. Exploratory Data Analysis (EDA)
3. Feature Engineering
4. Write Features to BigQuery
5. Train Model Locally
6. Iterate (hyperparameter tuning)
7. Log to Vertex AI Experiments

---
## 1.1 Connect to BigQuery

Authenticate, set project/dataset, and explore the raw tables.

In [None]:
import os
import warnings

import google.auth
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import xgboost as xgb
from google.cloud import bigquery
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    classification_report,
    confusion_matrix,
    precision_recall_curve,
    roc_auc_score,
    roc_curve,
)
from sklearn.model_selection import train_test_split

warnings.filterwarnings("ignore")
sns.set_theme(style="whitegrid")
%matplotlib inline

In [None]:
# --- Configuration ---
# Project ID: set via environment variable or auto-detect from gcloud/ADC
PROJECT_ID = os.environ.get("PROJECT_ID") or os.environ.get("GOOGLE_CLOUD_PROJECT")
if PROJECT_ID is None:
    _, PROJECT_ID = google.auth.default()
    if PROJECT_ID is None:
        PROJECT_ID = "asp-test-dev"  # fallback default for testing

BQ_DATASET = "fraud_detection"
FEATURES_DATASET = "features"
FEATURES_TABLE = "fraud_features"
REGION = "us-central1"

print(f"Project ID: {PROJECT_ID}")
print(f"BQ Dataset: {BQ_DATASET}")
print(f"Region:     {REGION}")

In [None]:
# Initialize BigQuery client
bq_client = bigquery.Client(project=PROJECT_ID)

# List tables in the fraud_detection dataset
dataset_ref = bigquery.DatasetReference(PROJECT_ID, BQ_DATASET)  # Client.dataset() is deprecated
tables = list(bq_client.list_tables(dataset_ref))
print(f"Tables in {BQ_DATASET}:")
for table in tables:
    print(f"  - {table.table_id}")

In [None]:
# Preview the tx table
query_tx_preview = f"""
SELECT *
FROM `{PROJECT_ID}.{BQ_DATASET}.tx`
LIMIT 10
"""
df_tx_preview = bq_client.query(query_tx_preview).to_dataframe()
print(f"tx table preview ({len(df_tx_preview)} rows):")
df_tx_preview

In [None]:
# Preview the txlabels table
query_labels_preview = f"""
SELECT *
FROM `{PROJECT_ID}.{BQ_DATASET}.txlabels`
LIMIT 10
"""
df_labels_preview = bq_client.query(query_labels_preview).to_dataframe()
print(f"txlabels table preview ({len(df_labels_preview)} rows):")
df_labels_preview

In [None]:
# Check table sizes
for table_name in ["tx", "txlabels"]:
    query_count = f"SELECT COUNT(*) as cnt FROM `{PROJECT_ID}.{BQ_DATASET}.{table_name}`"
    result = bq_client.query(query_count).to_dataframe()
    print(f"{table_name}: {result['cnt'].iloc[0]:,} rows")

---
## 1.2 Exploratory Data Analysis (EDA)

Understand the data: basic statistics, class imbalance, and temporal patterns.

In [None]:
# Load all transactions joined with labels for EDA
# (no LIMIT or sampling is applied; add one if the table is too large for memory)
query_eda = f"""
SELECT
    t.tx_id,
    t.tx_ts,
    t.customer_id,
    t.terminal_id,
    t.tx_amount,
    l.tx_fraud
FROM `{PROJECT_ID}.{BQ_DATASET}.tx` AS t
JOIN `{PROJECT_ID}.{BQ_DATASET}.txlabels` AS l
    ON t.tx_id = l.tx_id
"""
print("Loading data from BigQuery (this may take a minute)...")
df = bq_client.query(query_eda).to_dataframe()
print(f"Loaded {len(df):,} transactions")
df.head()

In [None]:
# Basic statistics
print("=" * 60)
print("BASIC STATISTICS")
print("=" * 60)
print(f"\nShape: {df.shape}")
print(f"Date range: {df['tx_ts'].min()} to {df['tx_ts'].max()}")
print(f"Unique customers: {df['customer_id'].nunique():,}")
print(f"Unique terminals: {df['terminal_id'].nunique():,}")
print(f"\nTransaction amount statistics:")
df["tx_amount"].describe()

In [None]:
# Class imbalance check
fraud_counts = df["tx_fraud"].value_counts()
fraud_pct = df["tx_fraud"].value_counts(normalize=True) * 100

print("=" * 60)
print("CLASS DISTRIBUTION")
print("=" * 60)
print(f"\nLegitimate (0): {fraud_counts[0]:>10,}  ({fraud_pct[0]:.2f}%)")
print(f"Fraudulent (1): {fraud_counts[1]:>10,}  ({fraud_pct[1]:.2f}%)")
print(f"Imbalance ratio: 1:{fraud_counts[0] // fraud_counts[1]}")

fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Bar chart
axes[0].bar(["Legitimate", "Fraudulent"], fraud_counts.values, color=["steelblue", "coral"])
axes[0].set_title("Transaction Count by Class")
axes[0].set_ylabel("Count")
for i, v in enumerate(fraud_counts.values):
    axes[0].text(i, v + v * 0.01, f"{v:,}", ha="center", fontweight="bold")

# Pie chart
axes[1].pie(fraud_counts.values, labels=["Legitimate", "Fraudulent"],
            autopct="%1.2f%%", colors=["steelblue", "coral"], startangle=90)
axes[1].set_title("Class Distribution")

plt.tight_layout()
plt.show()

In [None]:
# Transaction amount distribution by class
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

for ax, label, title in zip(
    axes, [0, 1], ["Legitimate Transactions", "Fraudulent Transactions"]
):
    subset = df[df["tx_fraud"] == label]["tx_amount"]
    ax.hist(subset, bins=50, color="steelblue" if label == 0 else "coral", edgecolor="white")
    ax.set_title(f"{title} (n={len(subset):,})")
    ax.set_xlabel("Transaction Amount")
    ax.set_ylabel("Count")
    ax.axvline(subset.mean(), color="black", linestyle="--", label=f"Mean: {subset.mean():.2f}")
    ax.legend()

plt.tight_layout()
plt.show()

print("Amount statistics by class:")
df.groupby("tx_fraud")["tx_amount"].describe()

In [None]:
# Temporal distribution of fraud
df["tx_date"] = pd.to_datetime(df["tx_ts"]).dt.date

daily_fraud = df.groupby("tx_date").agg(
    total_tx=pd.NamedAgg(column="tx_id", aggfunc="count"),
    fraud_tx=pd.NamedAgg(column="tx_fraud", aggfunc="sum"),
).reset_index()
daily_fraud["fraud_rate"] = daily_fraud["fraud_tx"] / daily_fraud["total_tx"] * 100

fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)

axes[0].plot(daily_fraud["tx_date"], daily_fraud["total_tx"], color="steelblue", alpha=0.7)
axes[0].fill_between(daily_fraud["tx_date"], daily_fraud["total_tx"], alpha=0.3, color="steelblue")
axes[0].set_title("Daily Transaction Volume")
axes[0].set_ylabel("Number of Transactions")

axes[1].plot(daily_fraud["tx_date"], daily_fraud["fraud_rate"], color="coral", alpha=0.7)
axes[1].fill_between(daily_fraud["tx_date"], daily_fraud["fraud_rate"], alpha=0.3, color="coral")
axes[1].set_title("Daily Fraud Rate (%)")
axes[1].set_ylabel("Fraud Rate (%)")
axes[1].set_xlabel("Date")

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
# Fraud by hour of day
df["tx_hour"] = pd.to_datetime(df["tx_ts"]).dt.hour

hourly_fraud = df.groupby("tx_hour").agg(
    total_tx=pd.NamedAgg(column="tx_id", aggfunc="count"),
    fraud_tx=pd.NamedAgg(column="tx_fraud", aggfunc="sum"),
).reset_index()
hourly_fraud["fraud_rate"] = hourly_fraud["fraud_tx"] / hourly_fraud["total_tx"] * 100

fig, ax = plt.subplots(figsize=(10, 4))
ax.bar(hourly_fraud["tx_hour"], hourly_fraud["fraud_rate"], color="coral", edgecolor="white")
ax.set_title("Fraud Rate by Hour of Day")
ax.set_xlabel("Hour")
ax.set_ylabel("Fraud Rate (%)")
ax.set_xticks(range(24))
plt.tight_layout()
plt.show()

---
## 1.3 Feature Engineering (Python)

Compute rolling window features for each customer and terminal:
- `count_tx_Xd` -- number of transactions in the past X days
- `avg_tx_amount_Xd` -- average transaction amount in the past X days
- `max_tx_amount_Xd` -- maximum transaction amount in the past X days

Windows: **1 day, 7 days, 28 days, 90 days**
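
To make the window semantics concrete, here is a minimal sketch on a made-up frame (the `toy` data and the unsuffixed column names are for illustration only). It relies on pandas' default for time-based windows, where the window for a row at time `t` is `(t - X days, t]`, so the current transaction is included in its own count, average, and maximum.

```python
import pandas as pd

# Toy data for illustration only: one customer, four transactions.
toy = pd.DataFrame(
    {
        "customer_id": [1, 1, 1, 1],
        "tx_ts": pd.to_datetime(
            ["2023-01-01 10:00", "2023-01-01 18:00", "2023-01-02 12:00", "2023-01-05 09:00"]
        ),
        "tx_amount": [10.0, 30.0, 20.0, 50.0],
    }
).sort_values(["customer_id", "tx_ts"]).reset_index(drop=True)

# Time-based windows need a datetime index that is sorted within each group.
rolling = toy.set_index("tx_ts").groupby("customer_id")["tx_amount"].rolling("1D", min_periods=1)

# The rolling output follows the same row order as the sorted frame,
# so the values can be assigned by position.
toy["count_tx_1d"] = rolling.count().to_numpy()
toy["avg_tx_amount_1d"] = rolling.mean().to_numpy()
toy["max_tx_amount_1d"] = rolling.max().to_numpy()
print(toy)
```

The third transaction (2 Jan, 12:00) sees only the 18:00 transaction from the previous day plus itself, because the 10:00 transaction is more than 24 hours old.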

> **Note**: This uses pandas for feature engineering. The same logic can be implemented
> using SQL in BigQuery, BigFrames, or PySpark on Dataproc. The choice is independent
> of the rest of the MLOps stack.

In [None]:
# Ensure tx_ts is datetime and sort by timestamp
df["tx_ts"] = pd.to_datetime(df["tx_ts"])
df = df.sort_values("tx_ts").reset_index(drop=True)

print(f"Data sorted by timestamp: {df['tx_ts'].min()} to {df['tx_ts'].max()}")
print(f"Total rows: {len(df):,}")

In [None]:
def compute_rolling_features(df: pd.DataFrame, group_col: str, windows_days: list[int]) -> pd.DataFrame:
    """Compute rolling window features (count, avg, max) for a given grouping column.

    Args:
        df: DataFrame with tx_ts, tx_amount, and the group_col.
        group_col: Column to group by (e.g., 'customer_id' or 'terminal_id').
        windows_days: List of window sizes in days.

    Returns:
        DataFrame with rolling features added as new columns.
    """
    # Work on a copy sorted by group and timestamp. The groupby-rolling results
    # below come back in this same row order, so they can be assigned by position.
    # (Assumes group_col has no nulls; groupby would silently drop those rows.)
    result = df.copy()
    result = result.sort_values([group_col, "tx_ts"]).reset_index(drop=True)

    group_suffix = group_col.replace("_id", "")

    # Index by tx_ts so time-based rolling windows can be applied per entity
    grouped = result.set_index("tx_ts").groupby(group_col)["tx_amount"]

    for window in windows_days:
        print(f"  Computing {window}d window for {group_col}...")
        rolling = grouped.rolling(f"{window}D", min_periods=1)

        count_col = f"count_tx_{window}d_{group_suffix}"
        avg_col = f"avg_tx_amount_{window}d_{group_suffix}"
        max_col = f"max_tx_amount_{window}d_{group_suffix}"

        # Assign by position, not by tx_ts label: timestamps can repeat across
        # rows, and label-based alignment fails on a duplicated index.
        result[count_col] = rolling.count().to_numpy()
        result[avg_col] = rolling.mean().to_numpy()
        result[max_col] = rolling.max().to_numpy()

    return result


WINDOWS = [1, 7, 28, 90]
print(f"Windows: {WINDOWS} days")

In [None]:
# Compute customer-level rolling features
print("Computing customer-level rolling features...")
df_features = compute_rolling_features(df, "customer_id", WINDOWS)
print(f"Done. Shape: {df_features.shape}")

In [None]:
# Compute terminal-level rolling features
print("Computing terminal-level rolling features...")
df_features = compute_rolling_features(df_features, "terminal_id", WINDOWS)
print(f"Done. Shape: {df_features.shape}")

In [None]:
# Drop temporary columns and inspect the feature set
df_features = df_features.drop(columns=["tx_date", "tx_hour"], errors="ignore")

print(f"Feature table shape: {df_features.shape}")
print(f"\nColumns:")
for col in df_features.columns:
    print(f"  - {col}: {df_features[col].dtype}")

print(f"\nNull counts:")
print(df_features.isnull().sum())

df_features.head(10)

In [None]:
# Feature correlation heatmap
feature_cols = [c for c in df_features.columns if c.startswith(("count_", "avg_", "max_"))] + ["tx_amount"]

fig, ax = plt.subplots(figsize=(14, 10))
corr = df_features[feature_cols + ["tx_fraud"]].corr()
sns.heatmap(corr, annot=True, fmt=".2f", cmap="RdBu_r", center=0,
            square=True, linewidths=0.5, ax=ax)
ax.set_title("Feature Correlation Matrix")
plt.tight_layout()
plt.show()

---
## 1.4 Write Features Back to BigQuery

Write the engineered feature table to `features.fraud_features` so BigQuery remains
the single source of truth for both raw data and features.

In [None]:
# Create the features dataset if it does not exist
from google.api_core.exceptions import NotFound

features_dataset_ref = bigquery.DatasetReference(PROJECT_ID, FEATURES_DATASET)
try:
    bq_client.get_dataset(features_dataset_ref)
    print(f"Dataset '{FEATURES_DATASET}' already exists.")
except NotFound:  # only create on "not found"; let auth and network errors surface
    dataset = bigquery.Dataset(features_dataset_ref)
    dataset.location = "US"
    bq_client.create_dataset(dataset)
    print(f"Created dataset '{FEATURES_DATASET}'.")

In [None]:
# Write the feature table to BigQuery
destination_table = f"{PROJECT_ID}.{FEATURES_DATASET}.{FEATURES_TABLE}"

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # overwrite
)

print(f"Writing {len(df_features):,} rows to {destination_table}...")
job = bq_client.load_table_from_dataframe(df_features, destination_table, job_config=job_config)
job.result()  # wait for completion

# Verify
table = bq_client.get_table(destination_table)
print(f"Written successfully: {table.num_rows:,} rows, {table.num_bytes:,} bytes")

---
## 1.5 Train Model Locally

- Time-based train/test split (no future data leakage)
- Train XGBoost classifier
- Evaluate: precision, recall, AUC-ROC, confusion matrix
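
A minimal sketch of the first two ideas on made-up data: a chronological cut-off keeps every test row later than every training row, and `scale_pos_weight` is set to the ratio of negative to positive training examples, a typical starting value suggested in the XGBoost parameter documentation for imbalanced binary problems. The `toy` frame and the cut-off date are illustrative only.

```python
import pandas as pd

# Toy data for illustration only: ten daily transactions, two of them fraudulent.
toy = pd.DataFrame(
    {
        "tx_ts": pd.date_range("2023-01-01", periods=10, freq="D"),
        "tx_fraud": [0, 0, 1, 0, 0, 0, 0, 0, 1, 0],
    }
)

# Everything before the cut-off is training data; everything from it onward is test data.
split_date = pd.Timestamp("2023-01-09")
train = toy[toy["tx_ts"] < split_date]
test = toy[toy["tx_ts"] >= split_date]

# No training row is later than any test row, so the model never sees the future.
assert train["tx_ts"].max() < test["tx_ts"].min()

# Weight positives by the negative/positive ratio of the training set only.
neg = int((train["tx_fraud"] == 0).sum())
pos = int((train["tx_fraud"] == 1).sum())
scale_pos_weight = neg / pos if pos > 0 else 1.0
print(len(train), len(test), scale_pos_weight)
```

The weight is computed from the training rows alone, so the test labels do not influence how the model is fitted.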

In [None]:
# Define feature columns and label
FEATURE_COLS = [c for c in df_features.columns if c.startswith(("count_", "avg_", "max_"))] + ["tx_amount"]
LABEL_COL = "tx_fraud"

print(f"Number of features: {len(FEATURE_COLS)}")
print(f"Features: {FEATURE_COLS}")

In [None]:
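# Time-based train/test split
# Start from a fixed split date, matched to the timezone of tx_ts
# (BigQuery TIMESTAMP columns load as tz-aware, DATETIME columns as naive).
SPLIT_DATE = pd.Timestamp("2023-06-01", tz=df_features["tx_ts"].dt.tz)

# If the data does not span this date, fall back to an 80/20 time-based split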
if SPLIT_DATE < df_features["tx_ts"].min() or SPLIT_DATE > df_features["tx_ts"].max():
    SPLIT_DATE = df_features["tx_ts"].quantile(0.8)
    print(f"Adjusted split date to 80th percentile: {SPLIT_DATE}")

train_mask = df_features["tx_ts"] < SPLIT_DATE
test_mask = df_features["tx_ts"] >= SPLIT_DATE

X_train = df_features.loc[train_mask, FEATURE_COLS].fillna(0)
y_train = df_features.loc[train_mask, LABEL_COL]
X_test = df_features.loc[test_mask, FEATURE_COLS].fillna(0)
y_test = df_features.loc[test_mask, LABEL_COL]

print(f"Split date: {SPLIT_DATE}")
print(f"Train set: {len(X_train):,} rows ({y_train.mean()*100:.2f}% fraud)")
print(f"Test set:  {len(X_test):,} rows ({y_test.mean()*100:.2f}% fraud)")

In [None]:
# Train XGBoost classifier
# Compute scale_pos_weight to handle class imbalance
neg_count = (y_train == 0).sum()
pos_count = (y_train == 1).sum()
scale_pos_weight = neg_count / pos_count if pos_count > 0 else 1.0

xgb_params = {
    "max_depth": 6,
    "n_estimators": 200,
    "learning_rate": 0.1,
    "scale_pos_weight": scale_pos_weight,
    "eval_metric": "auc",
    "objective": "binary:logistic",
    "random_state": 42,
    "n_jobs": -1,
}

print("Training XGBoost with parameters:")
for k, v in xgb_params.items():
    print(f"  {k}: {v}")

model = xgb.XGBClassifier(**xgb_params)
model.fit(
    X_train, y_train,
    eval_set=[(X_test, y_test)],
    verbose=20,
)
print("\nTraining complete.")

In [None]:
# Predictions and probabilities
y_pred = model.predict(X_test)
y_prob = model.predict_proba(X_test)[:, 1]

# AUC-ROC
auc_roc = roc_auc_score(y_test, y_prob)

print("=" * 60)
print("MODEL EVALUATION")
print("=" * 60)
print(f"\nAUC-ROC: {auc_roc:.4f}")
print(f"\nClassification Report:")
print(classification_report(y_test, y_pred, target_names=["Legitimate", "Fraud"]))

In [None]:
# Confusion matrix and ROC curve
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Confusion matrix
cm = confusion_matrix(y_test, y_pred)
ConfusionMatrixDisplay(cm, display_labels=["Legitimate", "Fraud"]).plot(ax=axes[0], cmap="Blues")
axes[0].set_title("Confusion Matrix")

# ROC curve
fpr, tpr, _ = roc_curve(y_test, y_prob)
axes[1].plot(fpr, tpr, color="coral", lw=2, label=f"AUC = {auc_roc:.4f}")
axes[1].plot([0, 1], [0, 1], color="gray", linestyle="--")
axes[1].set_xlabel("False Positive Rate")
axes[1].set_ylabel("True Positive Rate")
axes[1].set_title("ROC Curve")
axes[1].legend(loc="lower right")

plt.tight_layout()
plt.show()

In [None]:
# Precision-Recall curve (more informative for imbalanced classes)
precision, recall, thresholds = precision_recall_curve(y_test, y_prob)

fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(recall, precision, color="coral", lw=2)
ax.set_xlabel("Recall")
ax.set_ylabel("Precision")
ax.set_title("Precision-Recall Curve")
ax.set_xlim([0, 1])
ax.set_ylim([0, 1])
plt.tight_layout()
plt.show()

In [None]:
# Feature importance
importance = model.feature_importances_
importance_df = pd.DataFrame({
    "feature": FEATURE_COLS,
    "importance": importance,
}).sort_values("importance", ascending=True)

fig, ax = plt.subplots(figsize=(10, 8))
ax.barh(importance_df["feature"], importance_df["importance"], color="steelblue")
ax.set_xlabel("Feature Importance (Gain)")
ax.set_title("XGBoost Feature Importance")
plt.tight_layout()
plt.show()

---
## 1.6 Iterate

Tweak features, hyperparameters, and re-run cells to improve the model.
This section demonstrates the fast feedback loop -- change parameters and see results immediately.
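
One caveat with a loop like this: when the same held-out set is used both to pick hyperparameters and to report the final score, that score tends to be optimistic. A hedged sketch of one alternative is a three-way chronological split, where tuning uses a validation slice and the test slice is touched once at the end. The helper name `chronological_split` and the 70/15/15 fractions are assumptions for illustration, not part of this notebook.

```python
import pandas as pd


def chronological_split(frame, ts_col, train_frac=0.70, valid_frac=0.15):
    """Split a frame into train/validation/test slices ordered by time."""
    ordered = frame.sort_values(ts_col).reset_index(drop=True)
    # round() avoids off-by-one cut points caused by floating-point products.
    train_end = round(len(ordered) * train_frac)
    valid_end = round(len(ordered) * (train_frac + valid_frac))
    return ordered.iloc[:train_end], ordered.iloc[train_end:valid_end], ordered.iloc[valid_end:]


# Toy data for illustration only: twenty daily rows.
toy = pd.DataFrame({"tx_ts": pd.date_range("2023-01-01", periods=20, freq="D"), "x": range(20)})
train, valid, test = chronological_split(toy, "tx_ts")
print(len(train), len(valid), len(test))
```

With a split like this, each candidate would be fitted on `train` and compared on `valid`, and only the chosen configuration would be scored on `test`.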

In [None]:
# --- Iteration example: try different hyperparameters ---
# Modify these and re-run to see the impact.

iteration_params = [
    {"max_depth": 4, "n_estimators": 100, "learning_rate": 0.05},
    {"max_depth": 6, "n_estimators": 200, "learning_rate": 0.1},
    {"max_depth": 8, "n_estimators": 300, "learning_rate": 0.1},
    {"max_depth": 6, "n_estimators": 500, "learning_rate": 0.05},
]

results = []
for i, params in enumerate(iteration_params):
    full_params = {
        **params,
        "scale_pos_weight": scale_pos_weight,
        "eval_metric": "auc",
        "objective": "binary:logistic",
        "random_state": 42,
        "n_jobs": -1,
    }
    m = xgb.XGBClassifier(**full_params)
    m.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)

    y_prob_iter = m.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, y_prob_iter)

    results.append({"run": i + 1, **params, "auc_roc": auc})
    print(f"Run {i+1}: max_depth={params['max_depth']}, n_estimators={params['n_estimators']}, "
          f"lr={params['learning_rate']} -> AUC: {auc:.4f}")

results_df = pd.DataFrame(results)
print("\nAll results:")
results_df

In [None]:
# Select the best model from the iteration
best_idx = results_df["auc_roc"].idxmax()  # idxmax returns an index label
best_params = results_df.loc[best_idx]  # so look it up with .loc, not .iloc
print(f"Best run: #{int(best_params['run'])} with AUC-ROC: {best_params['auc_roc']:.4f}")
print(f"Parameters: max_depth={int(best_params['max_depth'])}, "
      f"n_estimators={int(best_params['n_estimators'])}, "
      f"learning_rate={best_params['learning_rate']}")

# Retrain with the best parameters
best_xgb_params = {
    "max_depth": int(best_params["max_depth"]),
    "n_estimators": int(best_params["n_estimators"]),
    "learning_rate": best_params["learning_rate"],
    "scale_pos_weight": scale_pos_weight,
    "eval_metric": "auc",
    "objective": "binary:logistic",
    "random_state": 42,
    "n_jobs": -1,
}

best_model = xgb.XGBClassifier(**best_xgb_params)
best_model.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=False)

y_prob_best = best_model.predict_proba(X_test)[:, 1]
y_pred_best = best_model.predict(X_test)
best_auc = roc_auc_score(y_test, y_prob_best)

print(f"\nFinal AUC-ROC: {best_auc:.4f}")
print(f"\nClassification Report:")
print(classification_report(y_test, y_pred_best, target_names=["Legitimate", "Fraud"]))

---
## 1.7 Log to Vertex AI Experiments

Track metrics, parameters, and artifacts using the Vertex AI Experiments SDK.
This creates a traceable record of every experiment run.
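
Vertex AI resource names are restrictive. Assuming run names may only contain lowercase letters, digits, and hyphens, up to 128 characters (worth confirming against the current SDK documentation), a value such as a learning rate of `0.05` cannot be embedded verbatim. The hypothetical helper `make_run_name` below sketches one way to build a compliant name.

```python
import re


def make_run_name(max_depth: int, n_estimators: int, learning_rate: float) -> str:
    """Build a run name using only lowercase letters, digits, and hyphens.

    Hypothetical helper; the naming rule it enforces is an assumption.
    """
    raw = f"xgb-d{max_depth}-n{n_estimators}-lr{learning_rate}"
    # Replace anything outside [a-z0-9-] (for example the decimal point) with a hyphen.
    name = re.sub(r"[^a-z0-9-]", "-", raw.lower())
    return name[:128]


print(make_run_name(6, 500, 0.05))
```

Note that two configurations can still map to the same name, and re-running a cell with an existing run name may require resuming that run rather than starting a new one.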

In [None]:
from google.cloud import aiplatform

# Initialize Vertex AI
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
)

EXPERIMENT_NAME = "fraud-detection-exploration"
print(f"Initializing experiment: {EXPERIMENT_NAME}")

In [None]:
# Create or get the experiment
aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    experiment=EXPERIMENT_NAME,
)

# Log the best run
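# Run names may only contain lowercase letters, digits, and hyphens,
# so the decimal point in the learning rate is replaced with a hyphen.
RUN_NAME = (
    f"xgb-d{int(best_params['max_depth'])}-n{int(best_params['n_estimators'])}"
    f"-lr{best_params['learning_rate']}"
).replace(".", "-")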

with aiplatform.start_run(RUN_NAME) as run:
    # Log parameters
    run.log_params({
        "max_depth": int(best_params["max_depth"]),
        "n_estimators": int(best_params["n_estimators"]),
        "learning_rate": best_params["learning_rate"],
        "scale_pos_weight": round(scale_pos_weight, 2),
        "objective": "binary:logistic",
        "num_features": len(FEATURE_COLS),
        "train_size": len(X_train),
        "test_size": len(X_test),
        "split_date": str(SPLIT_DATE),
        "windows": str(WINDOWS),
    })

    # Log metrics
    from sklearn.metrics import precision_score, recall_score, f1_score

    run.log_metrics({
        "auc_roc": round(best_auc, 4),
        "precision": round(precision_score(y_test, y_pred_best), 4),
        "recall": round(recall_score(y_test, y_pred_best), 4),
        "f1": round(f1_score(y_test, y_pred_best), 4),
        "train_fraud_rate": round(y_train.mean(), 4),
        "test_fraud_rate": round(y_test.mean(), 4),
    })

    print(f"Logged run '{RUN_NAME}' to experiment '{EXPERIMENT_NAME}'")
    print(f"  AUC-ROC: {best_auc:.4f}")
    print(f"  Precision: {precision_score(y_test, y_pred_best):.4f}")
    print(f"  Recall: {recall_score(y_test, y_pred_best):.4f}")
    print(f"  F1: {f1_score(y_test, y_pred_best):.4f}")

In [None]:
# Retrieve and display all experiment runs
experiment_df = aiplatform.get_experiment_df(EXPERIMENT_NAME)
print(f"All runs for experiment '{EXPERIMENT_NAME}':")
experiment_df

---
## Next Steps

Now that we have a working model and a tracked experiment run, the next step is to
**productionize** this workflow:

1. **Refactor into Python modules** -- move the feature engineering and training logic
   from this notebook into `fraud_detector/feature_engineering.py` and `fraud_detector/training.py`.

2. **Build KFP pipelines** -- wire the modules into Vertex AI Pipelines for automated
   training and batch scoring.

3. **Set up CI/CD** -- GitHub Actions for testing, staging deployment, and production rollout.

4. **Enable monitoring** -- Vertex AI Model Monitoring for data and prediction drift.

See the project README for the full walkthrough.