In [87]:
# 04_kubeflow_pipeline.ipynb
# MLOps Pipeline with Kubeflow on Vertex AI

"""
This notebook creates a production ML pipeline that:
1. Ingests data from GCS
2. Engineers features
3. Trains XGBoost model
4. Evaluates against threshold
5. Registers model to Vertex AI Model Registry
6. Deploys if performance meets criteria

Pipeline Architecture:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Ingest    │───▶│  Feature    │───▶│   Train     │
│    Data     │    │ Engineering │    │   Model     │
└─────────────┘    └─────────────┘    └─────────────┘
                                            │
                                            ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Deploy    │◀───│  Register   │◀───│  Evaluate   │
│  (if pass)  │    │   Model     │    │   Model     │
└─────────────┘    └─────────────┘    └─────────────┘
"""

# Install Kubeflow Pipelines SDK
!pip install kfp google-cloud-aiplatform --quiet

print("✓ Kubeflow Pipelines SDK installed")

✓ Kubeflow Pipelines SDK installed


In [88]:
# Cell 2: Imports and Pipeline Components
from kfp import dsl
from kfp.dsl import component, Output, Input, Dataset, Model, Metrics
from google.cloud import aiplatform

# Initialize Vertex AI
PROJECT_ID = "transaction-forecast-mlops"
REGION = "us-central1"
BUCKET = "gs://transaction-forecast-data"

aiplatform.init(project=PROJECT_ID, location=REGION)

print(f"Project: {PROJECT_ID}")
print(f"Region: {REGION}")
print(f"Bucket: {BUCKET}")
print("\n✓ Vertex AI initialized")

Project: transaction-forecast-mlops
Region: us-central1
Bucket: gs://transaction-forecast-data

✓ Vertex AI initialized


In [89]:
# Cell 3: Data Ingestion Component
@component(
    base_image="python:3.10",
    packages_to_install=["pandas", "google-cloud-storage"]
)
def ingest_data(
    bucket_name: str,
    output_data: Output[Dataset]
):
    """Load raw data from GCS and prepare for feature engineering."""
    import pandas as pd
    from google.cloud import storage
    
    # Download data
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    bucket.blob('processed_data/daily_volumes_clean.csv').download_to_filename('/tmp/data.csv')
    
    # Load and validate
    df = pd.read_csv('/tmp/data.csv')
    print(f"✓ Loaded {len(df)} rows")
    print(f"  Date range: {df['date'].min()} to {df['date'].max()}")
    
    # Save to output path
    df.to_csv(output_data.path, index=False)
    print(f"✓ Data saved to {output_data.path}")

In [90]:
# Cell 4: Feature Engineering Component
@component(
    base_image="python:3.10",
    packages_to_install=["pandas", "numpy", "holidays"]
)
def engineer_features(
    input_data: Input[Dataset],
    output_data: Output[Dataset]
):
    """Create all features needed for XGBoost model."""
    import pandas as pd
    import numpy as np
    import holidays
    
    df = pd.read_csv(input_data.path)
    df['date'] = pd.to_datetime(df['date'])
    
    # Temporal features
    df['day_of_week'] = df['date'].dt.dayofweek
    df['month'] = df['date'].dt.month
    df['quarter'] = df['date'].dt.quarter
    df['day_of_month'] = df['date'].dt.day
    df['week_of_year'] = df['date'].dt.isocalendar().week.astype(int)
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
    df['is_month_start'] = df['date'].dt.is_month_start.astype(int)
    df['is_month_end'] = df['date'].dt.is_month_end.astype(int)
    
    # Lag features
    for lag in [1, 7, 14, 30]:
        df[f'lag_{lag}'] = df['transaction_volume'].shift(lag)
    
    # Rolling features
    for window in [7, 14, 30]:
        df[f'rolling_mean_{window}'] = df['transaction_volume'].rolling(window).mean()
    df['rolling_std_7'] = df['transaction_volume'].rolling(7).std()
    df['rolling_std_30'] = df['transaction_volume'].rolling(30).std()
    df['rolling_min_7'] = df['transaction_volume'].rolling(7).min()
    df['rolling_max_7'] = df['transaction_volume'].rolling(7).max()
    
    # Holiday features
    br_holidays = holidays.Brazil(years=[2016, 2017, 2018])
    df['is_holiday'] = df['date'].dt.date.apply(lambda x: 1 if x in br_holidays else 0)
    
    # Trend features
    df['days_since_start'] = (df['date'] - df['date'].min()).dt.days
    df['transaction_growth'] = df['transaction_volume'].pct_change()
    df['momentum_7'] = df['transaction_volume'].diff(7)
    
    # Fill NaN
    df = df.fillna(method='ffill').fillna(0)
    
    print(f"✓ Created {len(df.columns)} features")
    df.to_csv(output_data.path, index=False)

In [91]:
# Cell 5: Train Model Component
@component(
    base_image="python:3.10",
    packages_to_install=["pandas", "numpy", "xgboost", "scikit-learn", "joblib", "google-cloud-storage"]
)
def train_model(
    input_data: Input[Dataset],
    model_output: Output[Model],
    metrics_output: Output[Metrics],
    random_state: int = 42
):
    """Train XGBoost model with best parameters."""
    import pandas as pd
    import numpy as np
    import joblib
    import os
    from xgboost import XGBRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
    
    df = pd.read_csv(input_data.path)
    df['date'] = pd.to_datetime(df['date'])
    
    exclude_cols = ['date', 'transaction_volume', 'day_name']
    feature_cols = [c for c in df.columns if c not in exclude_cols and c in df.select_dtypes(include=[np.number]).columns]
    
    train = df.iloc[:-14]
    test = df.iloc[-14:]
    
    X_train = train[feature_cols]
    y_train = train['transaction_volume']
    X_test = test[feature_cols]
    y_test = test['transaction_volume']
    
    model = XGBRegressor(
        n_estimators=100,
        max_depth=6,
        learning_rate=0.1,
        objective='reg:squarederror',
        random_state=random_state
    )
    
    model.fit(X_train, y_train)
    
    y_pred = model.predict(X_test)
    mae = mean_absolute_error(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mape = mean_absolute_percentage_error(y_test, y_pred) * 100
    
    print(f"✓ Model trained (random_state={random_state})")
    print(f"  MAE:  {mae:.2f}")
    print(f"  RMSE: {rmse:.2f}")
    print(f"  MAPE: {mape:.2f}%")
    
    metrics_output.log_metric("mae", mae)
    metrics_output.log_metric("rmse", rmse)
    metrics_output.log_metric("mape", mape)
    
    os.makedirs(model_output.path, exist_ok=True)
    joblib.dump(model, os.path.join(model_output.path, "model.joblib"))
    joblib.dump(feature_cols, os.path.join(model_output.path, "features.joblib"))
    print(f"✓ Model saved to {model_output.path}")

print("✓ Train component defined")

✓ Train component defined


In [92]:
# Cell 6: Evaluate Model Component
@component(
    base_image="python:3.10",
    packages_to_install=["google-cloud-aiplatform"]
)
def evaluate_model(
    metrics: Input[Metrics],
    project_id: str,
    region: str,
    model_name: str,
    initial_threshold: float = 10.0,
    max_threshold: float = 15.0,
    min_improvement: float = 0.5
) -> bool:
    """Compare challenger against champion model."""
    from google.cloud import aiplatform
    
    aiplatform.init(project=project_id, location=region)
    
    challenger_mape = metrics.metadata.get("mape", 100)
    
    print(f"EVALUATION")
    print(f"   Challenger MAPE: {challenger_mape:.2f}%")
    
    if challenger_mape > max_threshold:
        print(f"   Exceeds max threshold ({max_threshold}%) - REJECTED")
        return False
    
    models = aiplatform.Model.list(filter=f'display_name="{model_name}"')
    
    if not models:
        passed = challenger_mape <= initial_threshold
        print(f"   No champion exists")
        print(f"   Initial threshold: {initial_threshold}%")
        print(f"   {'PASSED' if passed else 'FAILED'}")
        return passed
    
    champion = models[0]
    champion_mape_str = champion.labels.get("mape", str(initial_threshold).replace(".", "_"))
    champion_mape = float(champion_mape_str.replace("_", "."))
    
    print(f"   Champion MAPE: {champion_mape:.2f}%")
    print(f"   Required improvement: {min_improvement}%")
    
    improvement = champion_mape - challenger_mape
    passed = improvement >= min_improvement
    
    print(f"   Actual improvement: {improvement:.2f}%")
    print(f"   {'CHALLENGER WINS' if passed else 'CHAMPION DEFENDED'}")
    
    return passed

print("Done")

Done


In [93]:
# Cell 7: Register Model Component
@component(
    base_image="python:3.10",
    packages_to_install=["google-cloud-aiplatform", "joblib"]
)
def register_model(
    model_input: Input[Model],
    metrics: Input[Metrics],
    project_id: str,
    region: str,
    model_name: str
) -> str:
    """Register model to Vertex AI Model Registry with metrics as labels."""
    from google.cloud import aiplatform
    
    aiplatform.init(project=project_id, location=region)
    
    mape = metrics.metadata.get("mape", 0)
    mae = metrics.metadata.get("mae", 0)
    
    model = aiplatform.Model.upload(
        display_name=model_name,
        artifact_uri=model_input.uri,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-2:latest",
        labels={
            "mape": str(round(mape, 2)).replace(".", "_"),
            "mae": str(round(mae, 2)).replace(".", "_")
        }
    )
    
    print(f"Model registered")
    print(f"  Name: {model_name}")
    print(f"  MAPE: {mape:.2f}%")
    print(f"  Resource: {model.resource_name}")
    
    return model.resource_name

print("Done")

Done


In [94]:
# Cell 8: Define the Pipeline
@dsl.pipeline(
    name="transaction-forecast-pipeline",
    description="End-to-end MLOps pipeline for transaction volume forecasting"
)
def transaction_forecast_pipeline(
    bucket_name: str = "transaction-forecast-data",
    project_id: str = "transaction-forecast-mlops",
    region: str = "us-central1",
    model_name: str = "transaction-forecast-xgboost",
    initial_threshold: float = 10.0,
    max_threshold: float = 15.0,
    min_improvement: float = 0.5,
    random_state: int = 42
):
    ingest_task = ingest_data(bucket_name=bucket_name)
    feature_task = engineer_features(input_data=ingest_task.outputs["output_data"])
    train_task = train_model(input_data=feature_task.outputs["output_data"], random_state=random_state)
    
    eval_task = evaluate_model(
        metrics=train_task.outputs["metrics_output"],
        project_id=project_id,
        region=region,
        model_name=model_name,
        initial_threshold=initial_threshold,
        max_threshold=max_threshold,
        min_improvement=min_improvement
    )
    
    with dsl.Condition(eval_task.output == True, name="check-performance"):
        register_task = register_model(
            model_input=train_task.outputs["model_output"],
            metrics=train_task.outputs["metrics_output"],
            project_id=project_id,
            region=region,
            model_name=model_name
        )

print("✓ Pipeline defined")

✓ Pipeline defined


  with dsl.Condition(eval_task.output == True, name="check-performance"):


In [95]:
# Cell 9: Compile Pipeline
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=transaction_forecast_pipeline,
    package_path="transaction_forecast_pipeline.json"
)

print("✓ Pipeline compiled to transaction_forecast_pipeline.json")

✓ Pipeline compiled to transaction_forecast_pipeline.json


In [96]:
# Cell 10: Submit Pipeline
from google.cloud import aiplatform

aiplatform.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET
)

job = aiplatform.PipelineJob(
    display_name="transaction-forecast-run-7",
    template_path="transaction_forecast_pipeline.json",
    pipeline_root=f"{BUCKET}/pipeline_root",
    enable_caching=False,
    parameter_values={
        "bucket_name": "transaction-forecast-data",
        "project_id": PROJECT_ID,
        "region": REGION,
        "model_name": "transaction-forecast-xgboost",
        "initial_threshold": 10.0,
        "max_threshold": 15.0,
        "min_improvement": 0.5,
        "random_state": 42
    }
)

print("Submitting pipeline to Vertex AI...")
print("This will take 10-15 minutes to run.\n")
job.submit()
print(f"✓ Pipeline submitted!")
print(f"  View at: https://console.cloud.google.com/vertex-ai/pipelines/runs?project={PROJECT_ID}")

Submitting pipeline to Vertex AI...
This will take 10-15 minutes to run.

Creating PipelineJob
PipelineJob created. Resource name: projects/1023117266322/locations/us-central1/pipelineJobs/transaction-forecast-pipeline-20251209033042
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/1023117266322/locations/us-central1/pipelineJobs/transaction-forecast-pipeline-20251209033042')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/transaction-forecast-pipeline-20251209033042?project=1023117266322
✓ Pipeline submitted!
  View at: https://console.cloud.google.com/vertex-ai/pipelines/runs?project=transaction-forecast-mlops
