Install required packages

In [13]:
# Install/upgrade the Kubeflow Pipelines SDK (v2), the Google Cloud pipeline
# components and the Vertex AI SDK into the environment of the running kernel.
# NOTE(review): versions are only lower-bounded, so a re-run may pull newer
# releases than the ones this notebook was last executed with (the version
# check cell below recorded kfp 2.14.6 and google-cloud-pipeline-components
# 2.21.0). Consider pinning for reproducibility.
%pip install -U "kfp>=2" "google-cloud-pipeline-components>=2" google-cloud-aiplatform


Note: you may need to restart the kernel to use updated packages.


Restart the kernel

In [14]:
import importlib, pkgutil
import IPython

# Ask the running kernel to shut down and restart so the packages installed
# above are picked up. Every variable defined so far is lost, so the cells
# below must be executed again after the restart.
IPython.get_ipython().kernel.do_shutdown(restart=True)

{'status': 'ok', 'restart': True}

In [27]:
# Sanity check after the kernel restart: import the freshly installed SDKs and
# print the versions that are actually in use.
import kfp
import google_cloud_pipeline_components

print(f"KFP SDK version: {kfp.__version__}")
# NOTE(review): the google-cloud-aiplatform version is not reported here.
# Recent releases appear to expose `google.cloud.aiplatform.__version__` —
# confirm before adding it to this check.
print(f"google-cloud-pipeline-components version: {google_cloud_pipeline_components.__version__}")

KFP SDK version: 2.14.6
google-cloud-pipeline-components version: 2.21.0


Imports

In [28]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.types import artifact_types

Project and pipeline configurations

In [40]:
# --- Google Cloud project settings -------------------------------------------
PROJECT_ID = "assignment1-476007"
REGION = "us-central1"

# --- Cloud Storage layout ----------------------------------------------------
# Bucket name (no gs:// prefix) and object path of the input CSV inside it.
DATA_BUCKET = "assignment1group3"
DATA_FILE = "data/penguins_clean.csv"

# gs:// locations used for pipeline run artifacts and for the promoted model.
PIPELINE_ROOT = "gs://assignment1group3/runs"
MODEL_DIR = "gs://assignment1group3/models"

Pipeline component: Data Ingestion

In [81]:
@dsl.component(
    # Only the storage client is needed here; pandas was installed and
    # imported but never used, which just slowed down container start-up.
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim",
)
def download_data(project_id: str,
                  bucket: str,
                  file_name: str,
                  dataset: Output[Dataset]):
    """Download one object from Cloud Storage next to the output artifact path.

    Args:
        project_id: Google Cloud project used to create the storage client.
        bucket: Name of the Cloud Storage bucket that holds the object.
        file_name: Object path inside the bucket.
        dataset: Output artifact. The file is written to
            ``dataset.path + '.csv'`` (note the added suffix); consumers must
            read from that same suffixed path.
    """
    from google.cloud import storage
    import logging
    import os
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    client = storage.Client(project=project_id)
    # Keep the `bucket` parameter as the plain name; use a separate name for
    # the blob handle instead of rebinding the argument.
    source_blob = client.bucket(bucket).blob(file_name)

    local_csv = dataset.path + '.csv'
    # Make sure the destination directory exists before the download starts.
    parent_dir = os.path.dirname(local_csv)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    source_blob.download_to_filename(local_csv)
    logging.info(f"Downloaded to {local_csv}")

Pipeline component: Preprocess data

In [89]:
@dsl.component(
    base_image="python:3.10.7-slim",
    packages_to_install=['pandas', 'scikit-learn'],
)
def preprocess(dataset: Input[Dataset],
               train_out: Output[Dataset],
               test_out: Output[Dataset]):
    """Keep the modelling columns, drop incomplete rows and split train/test.

    Args:
        dataset: Input artifact; the CSV is read from ``dataset.path + ".csv"``.
        train_out: Output artifact used as a directory; receives ``train.csv``.
        test_out: Output artifact used as a directory; receives ``test.csv``.

    The split holds out 20% of the rows, is stratified on ``species`` and uses
    a fixed random seed (42).
    """
    import logging
    import os
    import sys

    import pandas as pd
    from sklearn.model_selection import train_test_split

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Four numeric features plus the label column.
    wanted_columns = ["bill_length_mm","bill_depth_mm","flipper_length_mm","body_mass_g","species"]
    raw_frame = pd.read_csv(dataset.path + ".csv")
    complete_rows = raw_frame[wanted_columns].dropna()

    train_part, test_part = train_test_split(
        complete_rows,
        test_size=0.2,
        random_state=42,
        stratify=complete_rows["species"],
    )

    # Each output artifact is treated as a directory holding a single CSV.
    outputs = (
        (train_out, train_part, "train.csv"),
        (test_out, test_part, "test.csv"),
    )
    for artifact, frame, csv_name in outputs:
        os.makedirs(artifact.path, exist_ok=True)
        frame.to_csv(os.path.join(artifact.path, csv_name), index=False)
    

Pipeline component: Train model

In [96]:
@dsl.component(
    packages_to_install=["pandas","scikit-learn","joblib"],
    base_image="python:3.10-slim",
)
def train(
    train_ds: Input[Dataset],   # directory artifact containing train.csv
    model_art: Output[Model],   # directory artifact that receives the model files
):
    """Fit a scaler + logistic-regression pipeline and persist it.

    Args:
        train_ds: Input artifact; training rows are read from
            ``train_ds.path/train.csv``.
        model_art: Output artifact used as a directory. Receives ``model.pkl``
            (the fitted sklearn pipeline, dumped with joblib) and
            ``model_meta.json`` (``{"classes": [...]}``, the sorted species
            names).

    The species labels are integer-encoded with ``LabelEncoder`` before
    fitting, so the saved model predicts integer class indices.
    """
    import json
    import logging
    import os
    import sys

    import joblib
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import LabelEncoder
    from sklearn.preprocessing import StandardScaler

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    train_csv = os.path.join(train_ds.path, "train.csv")
    logging.info(f"[train] reading {train_csv}")

    frame = pd.read_csv(train_csv)
    features = frame[["bill_length_mm","bill_depth_mm","flipper_length_mm","body_mass_g"]]
    species = frame["species"]

    # Integer-encode the string labels; the model is trained on the indices.
    targets = LabelEncoder().fit_transform(species)

    # NOTE(review): `multi_class` looks deprecated in recent scikit-learn
    # releases — confirm against the version installed in the container.
    classifier = LogisticRegression(max_iter=500, multi_class="ovr", random_state=42)
    pipe = Pipeline(
        steps=[
            ("scaler", StandardScaler()),
            ("clf", classifier),
        ]
    )
    pipe.fit(features, targets)

    os.makedirs(model_art.path, exist_ok=True)
    model_pkl = os.path.join(model_art.path, "model.pkl")
    meta_json = os.path.join(model_art.path, "model_meta.json")

    joblib.dump(pipe, model_pkl)

    # Store the class names so consumers can map indices back to species.
    with open(meta_json, "w") as meta_file:
        json.dump({"classes": sorted(species.unique())}, meta_file)

    logging.info(f"[train] saved model to {model_pkl}")


Pipeline component: Batch prediction

In [109]:
@dsl.component(
    base_image="python:3.10-slim",
    packages_to_install=["pandas","joblib", "scikit-learn"],
)
def batch_predict(
    model_art: Input[Model],      # expects model_art.path/model.pkl (+ optional model_meta.json)
    features_ds: Input[Dataset],  # expects features_ds.path/test.csv
    predictions_out: Output[Dataset],  # writes predictions_out.path/preds.csv
):
    """Run batch predictions on the feature rows and store them as preds.csv.

    Args:
        model_art: Input artifact directory containing ``model.pkl`` and,
            when available, ``model_meta.json`` with a ``"classes"`` list.
        features_ds: Input artifact directory containing ``test.csv`` with the
            four feature columns.
        predictions_out: Output artifact directory; receives ``preds.csv``.

    Output columns:
        prediction: The raw model output (integer class index, since the
            model was fitted on label-encoded targets).
        prediction_label: The species name for each prediction. Only written
            when ``model_meta.json`` is present and every prediction is a
            valid index into its ``"classes"`` list.
    """
    import os, sys, logging, json, joblib
    import pandas as pd

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    feat_csv  = os.path.join(features_ds.path, "test.csv")
    model_pkl = os.path.join(model_art.path, "model.pkl")
    meta_json = os.path.join(model_art.path, "model_meta.json")

    logging.info(f"[predict] reading features={feat_csv}")
    logging.info(f"[predict] loading model={model_pkl}")

    X = pd.read_csv(feat_csv)[["bill_length_mm","bill_depth_mm","flipper_length_mm","body_mass_g"]]
    model = joblib.load(model_pkl)
    preds = model.predict(X)

    result = pd.DataFrame({"prediction": preds})

    # The model emits encoded class indices. When the class list saved at
    # training time is available, add a human-readable label column so the
    # predictions are usable without the encoder. The original "prediction"
    # column is left untouched for existing consumers.
    if os.path.exists(meta_json):
        with open(meta_json, "r") as f:
            classes = json.load(f).get("classes", [])
        indices = [int(p) for p in preds]
        if classes and all(0 <= i < len(classes) for i in indices):
            result["prediction_label"] = [classes[i] for i in indices]
        else:
            logging.warning("[predict] class metadata does not match predictions; labels skipped")

    os.makedirs(predictions_out.path, exist_ok=True)
    out_csv = os.path.join(predictions_out.path, "preds.csv")
    result.to_csv(out_csv, index=False)
    logging.info(f"[predict] wrote {out_csv}")

Pipeline component: Evaluate and promote

In [114]:
@dsl.component(
    base_image="python:3.10-slim",
    packages_to_install=["pandas","joblib","fsspec","gcsfs", 'scikit-learn'],
)
def evaluate_and_promote(
    test_ds: Input[Dataset],   # expects test_ds.path/test.csv
    model_art: Input[Model],   # expects model_art.path/model.pkl and model_meta.json
    metrics: Output[Metrics],
    model_dir: str,            # e.g. gs://assignment1group3/models
):
    """Compute test accuracy and copy the model files to a stable GCS location.

    Args:
        test_ds: Input artifact directory containing ``test.csv`` with the
            four feature columns and the ``species`` label column.
        model_art: Input artifact directory containing ``model.pkl`` and
            ``model_meta.json`` (with a ``"classes"`` list).
        metrics: Output metrics artifact; receives the ``accuracy`` metric.
        model_dir: Destination prefix (e.g. a ``gs://`` path). Receives
            ``model.pkl``, ``model_meta.json`` and ``metrics.json``.

    Raises:
        ValueError: From ``LabelEncoder.transform`` if the test set contains a
            species that is not listed in the model metadata.

    Note:
        Promotion is unconditional: the files are copied regardless of the
        accuracy value.
    """
    import os, sys, logging, json, joblib
    import pandas as pd
    from sklearn.metrics import accuracy_score
    import fsspec
    from sklearn.preprocessing import LabelEncoder

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    test_csv  = os.path.join(test_ds.path, "test.csv")
    model_pkl = os.path.join(model_art.path, "model.pkl")
    meta_json = os.path.join(model_art.path, "model_meta.json")

    logging.info(f"[eval] reading test={test_csv}")
    logging.info(f"[eval] loading model={model_pkl}")

    df = pd.read_csv(test_csv)
    X = df[["bill_length_mm","bill_depth_mm","flipper_length_mm","body_mass_g"]]
    y = df["species"]

    model = joblib.load(model_pkl)

    with open(meta_json, 'r') as f:
        meta = json.load(f)
    classes = meta['classes']

    # Rebuild the label -> index mapping used at training time. LabelEncoder
    # sorts the classes it is fitted on, so fitting on the saved class list
    # gives the same indices as fitting on the training labels.
    le = LabelEncoder()
    le.fit(classes)
    y_encoded = le.transform(y)

    predictions = model.predict(X)
    # The model predicts encoded integer indices, so accuracy must be scored
    # against the encoded labels. Scoring against the raw species strings
    # compares strings with integers and can never match.
    acc = float(accuracy_score(y_encoded, predictions))
    metrics.log_metric("accuracy", acc)
    logging.info(f"[eval] accuracy={acc:.4f}")

    # Promote model files
    proj_model_dir = model_dir.rstrip("/")
    fs = fsspec.filesystem("gcs")
    fs.put(model_pkl, f"{proj_model_dir}/model.pkl")
    fs.put(meta_json, f"{proj_model_dir}/model_meta.json")
    with fs.open(f"{proj_model_dir}/metrics.json", "w") as f:
        json.dump({"accuracy": acc}, f)

    logging.info(f"[eval] promoted model to {proj_model_dir}")



Pipeline definition

In [115]:
@kfp.dsl.pipeline(name="penguins-pipeline")
def penguins_pipeline(
    project_id: str,
    data_bucket: str,
    data_file: str,
    model_dir: str,
    run_id: str = "manual-run",
):
    """Wire the penguin components: ingest, preprocess, train, predict, evaluate.

    Args:
        project_id: Google Cloud project passed to the ingestion step.
        data_bucket: Bucket name holding the input CSV.
        data_file: Object path of the input CSV inside ``data_bucket``.
        model_dir: Destination prefix the evaluation step copies the model to.
        run_id: Accepted for callers that pass it; not used by any step.
    """
    # 1) Ingest the raw CSV.
    ingest_task = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=data_file,
    )

    # 2) Clean and split into train/test.
    split_task = preprocess(dataset=ingest_task.outputs["dataset"])
    held_out_set = split_task.outputs["test_out"]

    # 3) Train on the training split.
    train_task = train(train_ds=split_task.outputs["train_out"])
    trained_model = train_task.outputs["model_art"]

    # 4) Batch predictions on the held-out split.
    batch_predict(
        model_art=trained_model,
        features_ds=held_out_set,
    )

    # 5) Score the model on the held-out split and copy it to model_dir.
    evaluate_and_promote(
        test_ds=held_out_set,
        model_art=trained_model,
        model_dir=model_dir,
    )


Compile the pipeline into a YAML file

In [116]:
from kfp import compiler
# Serialize the pipeline definition to a YAML file in the working directory;
# the submission cell loads it through `template_path`.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=penguins_pipeline,
    package_path="penguins_pipeline.yaml",
)

In [117]:
import google.cloud.aiplatform as aip

# Google Cloud project and region the job is submitted to.
PROJECT_ID = "assignment1-476007"
REGION = "us-central1"

# Cloud Storage locations: SDK staging area, pipeline run artifacts and the
# promoted-model destination.
STAGING_BUCKET = "gs://assignment1group3"
PIPELINE_ROOT = "gs://assignment1group3/runs"
MODEL_DIR = "gs://assignment1group3/models"

# Input data: bucket name (no gs:// prefix) and object path inside it.
DATA_BUCKET = "assignment1group3"
DATA_FILE = "data/penguins_clean.csv"

# Credentials: set the GOOGLE_APPLICATION_CREDENTIALS environment variable to
# the path of your service account key file before this call.
aip.init(project=PROJECT_ID, location=REGION, staging_bucket=STAGING_BUCKET)

# Build the job from the compiled template; the keys of `parameter_values`
# match the parameters of `penguins_pipeline`.
job = aip.PipelineJob(
    display_name="penguins-pipeline-1",
    template_path="penguins_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    parameter_values=dict(
        project_id=PROJECT_ID,
        data_bucket=DATA_BUCKET,
        data_file=DATA_FILE,
        model_dir=MODEL_DIR,
        run_id="run1",
    ),
)

# Submit the job with preflight validations enabled.
job.run(enable_preflight_validations=True)