### Installation
Install the packages required for executing this notebook.

In [1]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

Collecting kfp>2
  Downloading kfp-2.14.6-py3-none-any.whl.metadata (4.4 kB)
Collecting google-cloud-pipeline-components>2
  Downloading google_cloud_pipeline_components-2.21.0-py3-none-any.whl.metadata (5.7 kB)
Collecting google-cloud-aiplatform
  Downloading google_cloud_aiplatform-1.124.0-py2.py3-none-any.whl.metadata (45 kB)
     ---------------------------------------- 0.0/45.2 kB ? eta -:--:--
     ------------------------------------ --- 41.0/45.2 kB 2.0 MB/s eta 0:00:01
     ---------------------------------------- 45.2/45.2 kB 1.1 MB/s eta 0:00:00
Collecting click==8.1.8 (from kfp>2)
  Downloading click-8.1.8-py3-none-any.whl.metadata (2.3 kB)
Collecting click-option-group==0.5.7 (from kfp>2)
  Downloading click_option_group-0.5.7-py3-none-any.whl.metadata (5.8 kB)
Collecting docstring-parser<1,>=0.7.3 (from kfp>2)
  Downloading docstring_parser-0.17.0-py3-none-any.whl.metadata (3.5 kB)
Collecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0dev,>=1.31.5 (from kfp>2)
 

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
streamlit 1.32.0 requires packaging<24,>=16.8, but you have packaging 25.0 which is incompatible.
streamlit 1.32.0 requires protobuf<5,>=3.20, but you have protobuf 6.33.0 which is incompatible.
streamlit 1.32.0 requires tenacity<9,>=8.1.0, but you have tenacity 9.1.2 which is incompatible.


## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [None]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

: 

## Check the versions of the packages you installed. The KFP SDK version should be >2.

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Python was not found; run without arguments to install from the Microsoft Store, or disable this shortcut from Settings > Apps > Advanced app settings > App execution aliases.
'grep' is not recognized as an internal or external command,
operable program or batch file.
Python was not found; run without arguments to install from the Microsoft Store, or disable this shortcut from Settings > Apps > Advanced app settings > App execution aliases.


In [2]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [3]:
#The Google Cloud project that this pipeline runs in.
PROJECT_ID = "data-engineering-1-473218"
# The region that this pipeline runs in
REGION = "europe-west4"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
PIPELINE_ROOT = "gs://ass1_temp_bucket/"

### Train Model

In [4]:
@dsl.component(
    packages_to_install=["pandas","tensorflow","scikit-learn", "fsspec","gcsfs"],
    base_image="python:3.10.7-slim"
)
def train_model(data_bucket:str, output_model: Output[Model]):
    """
    Function takes data file from the data bucket and trains a simple MLP model on it.
    """
    import random
    import pandas as pd
    from tensorflow import keras
    from sklearn.model_selection import train_test_split
    random.seed(67)

    # 1. Load dataset from data bucket
    #url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00267/data_banknote_authentication.txt"
    cols = ["variance", "skewness", "curtosis", "entropy", "class"]
    #df = pd.read_csv(url, header=None, names=cols)
    
    df = pd.read_csv(f"gs://{data_bucket}/data_banknote_authentication.txt", header=None, names=cols)

    X = df[["variance", "skewness", "curtosis", "entropy"]].values
    y = df["class"].values

    # 2. Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # 3. Normalize inside the model (so we don’t need a separate scaler)
    normalizer = keras.layers.Normalization()
    normalizer.adapt(X_train)

    # 4. Build simple MLP model
    model = keras.Sequential([
        normalizer,
        keras.layers.Dense(8, activation="relu"),
        keras.layers.Dense(4, activation="relu"),
        keras.layers.Dense(1, activation="sigmoid")
    ])

    model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])

    # 5. Train model
    model.fit(X_train, y_train, epochs=15, batch_size=8, validation_split=0.2, verbose=1)

    # 6. Evaluate model
    loss, acc = model.evaluate(X_test, y_test, verbose=0)
    print(f" Test accuracy: {acc:.3f}")

    # 7. Save model in the same folder
    model.save(output_model.path + ".keras")
    
    # 8. Add metadata
    metadata = {
        "accuracy": acc,
        "algo": "MLP",
        "file_type": ".keras"
    }
        
    # 9. Attach metadata to Vertex artifact (for pipelines)
    output_model.metadata.update(metadata)

### Compare models

In [5]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def compare_model(new_model: Input[Model], model_bucket_metadata: str) -> str:
    """
    Function compares local model to existing one in model bucket
    :returns: "NEW" if accuracy of the new model is better, "EXISTING" if the old one is better.
    """
    import json, tempfile, os
    from google.cloud import storage
    
    # Get new model accuracy
    new_accuracy = float(new_model.metadata.get("accuracy", 0))

    # Get old model accuracy
    bucket_name, blob_path = model_bucket_metadata.replace("gs://", "").split("/", 1)
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_path)
    
    # Temporarily download the older model from the model bucket
    tmp = tempfile.NamedTemporaryFile(delete=False)
    blob.download_to_filename(tmp.name)
    with open(tmp.name, "r") as f:
        old_metadata = json.load(f)
        
    old_accuracy = float(old_metadata["accuracy"])
    
    os.remove(tmp.name)
    # Check whether the new model outperforms the old one
    decision = "NEW" if new_accuracy > old_accuracy else "EXISTING"
    return decision


### Upload Model and Metrics to Google Bucket 

In [6]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model])->str:
    """
    Function uploads model and metadata .json
    """
    from google.cloud import storage
    from urllib.parse import urlparse
    import json
    import os
    import tempfile
    

    # parse gs://...
    p = urlparse(model_repo) if model_repo.startswith("gs://") else None
    bucket_name = (p.netloc if p else model_repo.split("/",1)[0])
    prefix = (p.path.lstrip("/") if p else (model_repo.split("/",1)[1] if "/" in model_repo else ""))

    client = storage.Client(project=project_id)
    bucket = client.bucket(bucket_name)

    # model path from artifact
    ext = str(model.metadata.get("file_type", ".keras"))
    algo = str(model.metadata.get("algo", "model"))
    src_model = model.path + ext
    dst_model = "/".join(filter(None, [prefix, f"{algo}_model{ext}"]))
    bucket.blob(dst_model).upload_from_filename(src_model)

    # create & upload metadata json from artifact metadata
    meta = dict(model.metadata)
    with tempfile.NamedTemporaryFile("w", delete=False) as f:
        json.dump(meta, f, indent=4)
        tmp_meta = f.name
    dst_meta = "/".join(filter(None, [prefix, "model_metadata.json"]))
    bucket.blob(dst_meta).upload_from_filename(tmp_meta)

    return f"gs://{bucket_name}/{dst_model}"



#### Define the Pipeline

In [7]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="banknote-authentication-training-pipeline")
def pipeline(project_id: str, data_bucket: str, model_repo: str, model_bucket_metadata: str):
    """
    Building the pipeline
    """
    # New stuff
    training_mlp_job_run_op = train_model(
        data_bucket = data_bucket
    )
    
    compare_model_job_run_op = compare_model(
        new_model = training_mlp_job_run_op.outputs["output_model"],
        model_bucket_metadata = model_bucket_metadata
    )
    
    
    
    with dsl.If(compare_model_job_run_op.output == "NEW"):
        upload_model_to_gcs(
            project_id = project_id,
            model_repo = model_repo,
            model = training_mlp_job_run_op.outputs["output_model"]
        )
    """
    # Old stuff
    
    di_op = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename
    )

 
    training_mlp_job_run_op = train_mlp(
        features=di_op.outputs["dataset"]
    )
    
     
    training_lr_job_run_op = train_lr(
        features=di_op.outputs["dataset"]
    )
    
    pre_di_op = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=testset_filename
    ).after(training_mlp_job_run_op, training_lr_job_run_op)
        
        
    comp_model__op = compare_model(mlp_metrics=training_mlp_job_run_op.outputs["metrics"],
                                       lr_metrics=training_lr_job_run_op.outputs["metrics"]).after(training_mlp_job_run_op, training_lr_job_run_op)  
    
    # defining the branching condition
    with dsl.If(comp_model__op.output=="MLP"):
        predict_mlp_job_run_op = predict_mlp(
            model=training_mlp_job_run_op.outputs["out_model"],      
            features=pre_di_op.outputs["dataset"]
        )
        upload_model_mlp_to_gc_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_mlp_job_run_op.outputs['out_model']
        ).after(predict_mlp_job_run_op)
        
    with dsl.If(comp_model__op.output=="LR"):
        predict_lr_job_run_op = predict_lr(
            model=training_lr_job_run_op.outputs["out_model"],     
            features=pre_di_op.outputs["dataset"]
        )
        upload_model_lr_to_gc_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_lr_job_run_op.outputs['out_model']
        ).after(predict_lr_job_run_op) 
    """

#### Compile the pipeline into a JSON file

In [8]:
from kfp import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='cloudbuild-mlops.yaml')

#### Submit the pipeline run

In [51]:
import google.cloud.aiplatform as aip

# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account.
aip.init(
    project=PROJECT_ID,
    staging_bucket=PIPELINE_ROOT,
)

# Prepare the pipeline job
job = aip.PipelineJob(
    display_name="banknote-authentication-training-pipeline",
    enable_caching=False,
    template_path="cloudbuild-mlops.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    parameter_values={
        'project_id': PROJECT_ID, # makesure to use your project id 
        'data_bucket': 'ass1_data_bucket',  # makesure to use your data bucket name 
        'model_repo':'gs://ass1_model_bucket', # makesure to use your model bucket name
        'model_bucket_metadata': 'gs://ass1_model_bucket/model_metadata.json'
    }
)

job.run()