In [1]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Collecting google-cloud-aiplatform
  Obtaining dependency information for google-cloud-aiplatform from https://files.pythonhosted.org/packages/f6/67/734b8c73b8e708a24301b8a0a072ddfe936816896d12af4884e4f7bbd3b0/google_cloud_aiplatform-1.35.0-py2.py3-none-any.whl.metadata
  Downloading google_cloud_aiplatform-1.35.0-py2.py3-none-any.whl.metadata (27 kB)
Downloading google_cloud_aiplatform-1.35.0-py2.py3-none-any.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m50.9 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: google-cloud-aiplatform
  Attempting uninstall: google-cloud-aiplatform
    Found existing installation: google-cloud-aiplatform 1.34.0
    Uninstalling google-cloud-aiplatform-1.34.0:
      Successfully uninstalled google-cloud-aiplatform-1.34.0
[0mSuccessfully installed google-cloud-aiplatform-1.35.0
KFP SDK version: 2.3.0
google-cloud-aiplatform==1.35.0
google_cloud_pipeline_components version:

In [59]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (
    Artifact,
    Dataset,
    Input,
    Model,
    Output,
    Metrics,
    ClassificationMetrics,
    component,
    OutputPath,
    InputPath,
)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [60]:
# --- Vertex AI pipeline configuration --------------------------------------
# GCP project in which the pipeline job is created and executed.
PROJECT_ID = "de23-398309"

# Vertex AI region used for the pipeline job.
REGION = "us-central1"

# GCS prefix under which Vertex AI stores every run's artifacts; the pipeline's
# service account must be able to read and write it.
PIPELINE_ROOT = "gs://data_de2023_2065718"


#### Pipeline Component : Data Ingestion

In [61]:
@dsl.component(
    base_image="python:3.10.7-slim",
    packages_to_install=["pandas", "google-cloud-storage"],
)
def download_data(
    project_id: str, bucket: str, file_name: str, dataset: Output[Dataset]
):
    """Copy one GCS object into this component's Dataset artifact.

    Args:
        project_id: GCP project passed to the storage client.
        bucket: Name of the bucket holding the object.
        file_name: Object name inside ``bucket``.
        dataset: Output artifact; the object is written to
            ``dataset.path + ".csv"``.
    """
    from google.cloud import storage
    import pandas as pd  # noqa: F401  (installed/imported but not used here)
    import logging
    import sys

    logging.basicConfig(level=logging.INFO, stream=sys.stdout)

    # Fetch the object from the bucket straight to the artifact's local path.
    source_blob = storage.Client(project=project_id).bucket(bucket).blob(file_name)
    source_blob.download_to_filename(dataset.path + ".csv")
    logging.info("Downloaded Data!")

#### Pipeline Component : Train and Test Split

In [62]:
@dsl.component(
    base_image="python:3.10.7-slim", packages_to_install=["pandas", "scikit-learn"]
)
def train_test_split(
    dataset: Input[Dataset],
    dataset_train_X: Output[Dataset],
    dataset_test_X: Output[Dataset],
    dataset_train_y: Output[Dataset],
    dataset_test_y: Output[Dataset],
    level: float,
):
    """Split a labelled CSV into train/test features and labels.

    Args:
        dataset: Input CSV (``.path + ".csv"``) containing a ``quality`` column.
        dataset_train_X: Output CSV of training features.
        dataset_test_X: Output CSV of test features.
        dataset_train_y: Output CSV of training ``quality`` labels.
        dataset_test_y: Output CSV of test ``quality`` labels.
        level: Fraction of rows held out as the test set. The split uses a
            fixed ``random_state=6``, so it is reproducible.
    """
    import logging
    import sys

    import pandas as pd
    from sklearn.model_selection import train_test_split as tts

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    frame = pd.read_csv(dataset.path + ".csv", index_col=None)
    labels = frame["quality"]
    features = frame.drop("quality", axis=1)
    # sklearn returns [X_train, X_test, y_train, y_test] in this order.
    splits = tts(features, labels, test_size=level, random_state=6)

    targets = (dataset_train_X, dataset_test_X, dataset_train_y, dataset_test_y)
    for part, artifact in zip(splits, targets):
        part.to_csv(artifact.path + ".csv", index=False, encoding="utf-8-sig")

#### Pipeline Component : PCA


In [63]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"], base_image="python:3.10.7-slim"
)
def PCA(
    standard_features: Input[Dataset],
    pca_features: Output[Dataset],
    scaler_model: Output[Model],
    pca_model: Output[Model],
    n_components: float = 0.1,
):
    """Standardize the features, project them with PCA, and persist both transforms.

    Args:
        standard_features: Input CSV (``.path + ".csv"``) with feature columns
            plus a ``quality`` label column.
        pca_features: Output CSV with the PCA-projected features followed by the
            unchanged ``quality`` column.
        scaler_model: Output pickle (``.path + ".pkl"``) of the fitted
            ``StandardScaler``.
        pca_model: Output pickle (``.path + ".pkl"``) of the fitted sklearn ``PCA``.
        n_components: Forwarded to sklearn's ``PCA``. With ``svd_solver="full"``,
            a float in (0, 1) keeps the smallest number of components whose
            explained variance exceeds that fraction. Defaults to the value the
            pipeline has always used (0.1).
    """
    # Aliased because this component is itself named ``PCA``.
    from sklearn.decomposition import PCA as SklearnPCA
    from sklearn.preprocessing import StandardScaler

    import pandas as pd
    import logging
    import sys
    import pickle

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    data = pd.read_csv(standard_features.path + ".csv")
    features = data.drop("quality", axis=1)
    labels = data[["quality"]]

    scaler = StandardScaler()
    scaler.fit(features)
    scaled_features = pd.DataFrame(
        scaler.transform(features),
        columns=features.columns,
        index=features.index,
    )

    # NOTE(review): a fractional n_components of 0.1 keeps only enough components
    # to explain >10% of the variance, which is likely very few (possibly one).
    # Confirm this is intended; values such as 0.9 are more common.
    pca = SklearnPCA(n_components=n_components, svd_solver="full")
    pca.fit(scaled_features)
    # Share the input index so the concat aligns each row with its label
    # explicitly instead of relying on both frames having a default RangeIndex.
    pca_df = pd.DataFrame(pca.transform(scaled_features), index=features.index)
    result = pd.concat([pca_df, labels], axis=1)

    result.to_csv(pca_features.path + ".csv", index=False, encoding="utf-8-sig")

    # Save the fitted scaler so prediction can apply the identical scaling.
    with open(scaler_model.path + ".pkl", "wb") as f:
        pickle.dump(scaler, f)

    # Save the fitted PCA so prediction can apply the identical projection.
    with open(pca_model.path + ".pkl", "wb") as f:
        pickle.dump(pca, f)

#### Pipeline Component : Training-RF 

In [64]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"], base_image="python:3.10.7-slim"
)
def train_rf(
    train_features_X: Input[Dataset],
    test_features_X: Input[Dataset],
    train_features_y: Input[Dataset],
    test_features_y: Input[Dataset],
    out_model: Output[Model],
) -> NamedTuple("outputs", metrics=dict):
    """Grid-search a RandomForestClassifier and report hold-out metrics.

    Args:
        train_features_X: Training features CSV (``.path + ".csv"``).
        test_features_X: Test features CSV.
        train_features_y: Single-column training labels CSV.
        test_features_y: Single-column test labels CSV.
        out_model: Output model artifact. The fitted ``GridSearchCV`` object is
            pickled to ``.path + ".pkl"`` and tagged with ``file_type``,
            ``algo`` ("rf_gs") and ``best_params`` metadata.

    Returns:
        ``outputs(metrics=...)`` holding accuracy, weighted precision/recall/F1
        and mean absolute error on the test split.
    """
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import (
        accuracy_score,
        precision_score,
        recall_score,
        f1_score,
        mean_absolute_error,
    )
    from sklearn.model_selection import GridSearchCV

    import pandas as pd
    import logging
    import sys
    import pickle

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    X_train = pd.read_csv(train_features_X.path + ".csv")
    # Labels are stored as a one-column CSV; squeeze to a Series so sklearn gets
    # the 1-D target it expects (avoids DataConversionWarning on fit).
    y_train = pd.read_csv(train_features_y.path + ".csv").squeeze("columns")

    logging.info(X_train.columns)

    parameters = {
        "n_estimators": [100, 250],
        "criterion": ["gini", "entropy", "log_loss"],
        "min_samples_split": [2, 5, 10],
        "min_samples_leaf": [1, 5, 10],
    }

    rf = RandomForestClassifier(random_state=6)
    rf_gs = GridSearchCV(rf, parameters)
    rf_gs.fit(X_train, y_train)
    best_params = rf_gs.best_params_

    X_test = pd.read_csv(test_features_X.path + ".csv")
    y_test = pd.read_csv(test_features_y.path + ".csv").squeeze("columns")

    # Predicting Test Set
    y_pred = rf_gs.predict(X_test)
    # zero_division=0 yields the same numbers as sklearn's default ("warn" acts
    # as 0) but without warnings for classes that are never predicted. float()
    # keeps the dict plainly JSON-serializable for the KFP output.
    metrics_dict = {
        "accuracy": float(accuracy_score(y_test, y_pred)),
        "precision": float(
            precision_score(y_test, y_pred, average="weighted", zero_division=0)
        ),
        "recall": float(
            recall_score(y_test, y_pred, average="weighted", zero_division=0)
        ),
        "f1_score": float(
            f1_score(y_test, y_pred, average="weighted", zero_division=0)
        ),
        "mae": float(mean_absolute_error(y_test, y_pred)),
    }

    logging.info(metrics_dict)

    out_model.metadata["file_type"] = ".pkl"
    out_model.metadata["algo"] = "rf_gs"
    out_model.metadata["best_params"] = best_params

    # Save the model
    m_file = out_model.path + ".pkl"
    with open(m_file, "wb") as f:
        pickle.dump(rf_gs, f)

    outputs = NamedTuple("outputs", metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component : Training-GBC 

In [65]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"], base_image="python:3.10.7-slim"
)
def train_gbc(
    train_features_X: Input[Dataset],
    test_features_X: Input[Dataset],
    train_features_y: Input[Dataset],
    test_features_y: Input[Dataset],
    out_model: Output[Model],
) -> NamedTuple("outputs", metrics=dict):
    """Grid-search a GradientBoostingClassifier and report hold-out metrics.

    Args:
        train_features_X: Training features CSV (``.path + ".csv"``).
        test_features_X: Test features CSV.
        train_features_y: Single-column training labels CSV.
        test_features_y: Single-column test labels CSV.
        out_model: Output model artifact. The fitted ``GridSearchCV`` object is
            pickled to ``.path + ".pkl"`` and tagged with ``file_type``,
            ``algo`` ("gbc_gs") and ``best_params`` metadata.

    Returns:
        ``outputs(metrics=...)`` holding accuracy, weighted precision/recall/F1
        and mean absolute error on the test split.
    """
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.metrics import (
        accuracy_score,
        precision_score,
        recall_score,
        f1_score,
        mean_absolute_error,
    )
    from sklearn.model_selection import GridSearchCV

    import pandas as pd
    import logging
    import sys
    import pickle

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    X_train = pd.read_csv(train_features_X.path + ".csv")
    # Labels are stored as a one-column CSV; squeeze to a Series so sklearn gets
    # the 1-D target it expects (avoids DataConversionWarning on fit).
    y_train = pd.read_csv(train_features_y.path + ".csv").squeeze("columns")

    logging.info(X_train.columns)

    parameters = {
        "n_estimators": [100, 250],
        "criterion": ["friedman_mse", "squared_error"],
        "min_samples_split": [2, 5, 10],
        "min_samples_leaf": [1, 5, 10],
    }

    gbc = GradientBoostingClassifier(random_state=6)
    gbc_gs = GridSearchCV(gbc, parameters)
    gbc_gs.fit(X_train, y_train)
    best_params = gbc_gs.best_params_

    X_test = pd.read_csv(test_features_X.path + ".csv")
    y_test = pd.read_csv(test_features_y.path + ".csv").squeeze("columns")

    # Predicting Test Set
    y_pred = gbc_gs.predict(X_test)
    # zero_division=0 yields the same numbers as sklearn's default ("warn" acts
    # as 0) but without warnings for classes that are never predicted. float()
    # keeps the dict plainly JSON-serializable for the KFP output.
    metrics_dict = {
        "accuracy": float(accuracy_score(y_test, y_pred)),
        "precision": float(
            precision_score(y_test, y_pred, average="weighted", zero_division=0)
        ),
        "recall": float(
            recall_score(y_test, y_pred, average="weighted", zero_division=0)
        ),
        "f1_score": float(
            f1_score(y_test, y_pred, average="weighted", zero_division=0)
        ),
        "mae": float(mean_absolute_error(y_test, y_pred)),
    }

    logging.info(metrics_dict)

    out_model.metadata["file_type"] = ".pkl"
    out_model.metadata["algo"] = "gbc_gs"
    out_model.metadata["best_params"] = best_params

    # Save the model
    m_file = out_model.path + ".pkl"
    with open(m_file, "wb") as f:
        pickle.dump(gbc_gs, f)

    outputs = NamedTuple("outputs", metrics=dict)
    return outputs(metrics_dict)

#### Pipeline Component : Prediction-RF


In [66]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"], base_image="python:3.10.7-slim"
)
def predict_rf(
    model: Input[Model],
    features: Input[Dataset],
    scaler_model: Input[Model],
    pca_model: Input[Model],
    results: Output[Dataset],
):
    """Score a raw prediction set with the selected random-forest model.

    The raw features go through the same fitted scaler and PCA used during
    training and are then classified. The input rows are written back out with
    an extra ``pclass`` column holding the predicted label.

    Args:
        model: Pickled fitted estimator (``.path + ".pkl"``), e.g. from ``train_rf``.
        features: CSV (``.path + ".csv"``) with the raw feature columns and,
            optionally, a ``quality`` column (dropped before scoring if present).
        scaler_model: Pickled fitted ``StandardScaler``.
        pca_model: Pickled fitted ``PCA``.
        results: Output CSV: the input rows plus the ``pclass`` column.
    """
    import pandas as pd
    import pickle
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Defined inside the component because KFP ships only this function's source.
    def _load_pickle(artifact):
        # The context manager closes the file handle. These pickles come from
        # upstream steps of this pipeline; never point this at untrusted files,
        # since unpickling can execute arbitrary code.
        with open(artifact.path + ".pkl", "rb") as fh:
            return pickle.load(fh)

    df = pd.read_csv(features.path + ".csv")

    # Loading the saved model and preprocessing transforms
    model_rf = _load_pickle(model)
    scaler = _load_pickle(scaler_model)
    pca = _load_pickle(pca_model)

    # errors="ignore" also accepts an unlabeled prediction set; a labeled one is
    # handled exactly as before.
    X_test = df.drop(columns="quality", errors="ignore")

    # transform the test data
    scaled_features = pd.DataFrame(
        scaler.transform(X_test),
        columns=X_test.columns,
        index=X_test.index,
    )
    X_test_pca = pd.DataFrame(pca.transform(scaled_features), index=X_test.index)
    # The classifier was fitted on PCA columns read back from CSV, so it may
    # carry string feature names ("0", "1", ...). Reuse them so sklearn does not
    # warn that X lacks the feature names seen during fit.
    trained_names = getattr(model_rf, "feature_names_in_", None)
    if trained_names is not None and len(trained_names) == X_test_pca.shape[1]:
        X_test_pca.columns = list(trained_names)

    y_pred = model_rf.predict(X_test_pca)
    logging.info(y_pred)

    df_complete = df.copy()
    df_complete["pclass"] = y_pred.tolist()
    df_complete.to_csv(results.path + ".csv", index=False, encoding="utf-8-sig")

#### Pipeline Component : Prediction-GBC

In [67]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"], base_image="python:3.10.7-slim"
)
def predict_gbc(
    model: Input[Model],
    features: Input[Dataset],
    scaler_model: Input[Model],
    pca_model: Input[Model],
    results: Output[Dataset],
):
    """Score a raw prediction set with the selected gradient-boosting model.

    The raw features go through the same fitted scaler and PCA used during
    training and are then classified. The input rows are written back out with
    an extra ``pclass`` column holding the predicted label.

    Args:
        model: Pickled fitted estimator (``.path + ".pkl"``), e.g. from ``train_gbc``.
        features: CSV (``.path + ".csv"``) with the raw feature columns and,
            optionally, a ``quality`` column (dropped before scoring if present).
        scaler_model: Pickled fitted ``StandardScaler``.
        pca_model: Pickled fitted ``PCA``.
        results: Output CSV: the input rows plus the ``pclass`` column.
    """
    import pandas as pd
    import pickle
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Defined inside the component because KFP ships only this function's source.
    def _load_pickle(artifact):
        # The context manager closes the file handle. These pickles come from
        # upstream steps of this pipeline; never point this at untrusted files,
        # since unpickling can execute arbitrary code.
        with open(artifact.path + ".pkl", "rb") as fh:
            return pickle.load(fh)

    df = pd.read_csv(features.path + ".csv")

    # Loading the saved model and preprocessing transforms
    model_gbc = _load_pickle(model)
    scaler = _load_pickle(scaler_model)
    pca = _load_pickle(pca_model)

    # errors="ignore" also accepts an unlabeled prediction set; a labeled one is
    # handled exactly as before.
    X_test = df.drop(columns="quality", errors="ignore")

    # transform the test data
    scaled_features = pd.DataFrame(
        scaler.transform(X_test),
        columns=X_test.columns,
        index=X_test.index,
    )
    X_test_pca = pd.DataFrame(pca.transform(scaled_features), index=X_test.index)
    # The classifier was fitted on PCA columns read back from CSV, so it may
    # carry string feature names ("0", "1", ...). Reuse them so sklearn does not
    # warn that X lacks the feature names seen during fit.
    trained_names = getattr(model_gbc, "feature_names_in_", None)
    if trained_names is not None and len(trained_names) == X_test_pca.shape[1]:
        X_test_pca.columns = list(trained_names)

    y_pred = model_gbc.predict(X_test_pca)
    logging.info(y_pred)

    df_complete = df.copy()
    df_complete["pclass"] = y_pred.tolist()
    df_complete.to_csv(results.path + ".csv", index=False, encoding="utf-8-sig")

#### Pipeline Component : Algorithm Selection 

In [68]:
@dsl.component(base_image="python:3.10.7-slim")
def compare_model(rf_metrics: dict, gbc_metrics: dict) -> str:
    """Pick the better of the two trained models by their ``f1_score`` metric.

    Args:
        rf_metrics: Metrics dict emitted by the random-forest training step.
        gbc_metrics: Metrics dict emitted by the gradient-boosting training step.

    Returns:
        ``"rf_gs"`` only when the random forest's ``f1_score`` is strictly
        higher; otherwise (including ties) ``"gbc_gs"``. These strings match the
        ``algo`` metadata tags set by the training components.
    """
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    for candidate_metrics in (rf_metrics, gbc_metrics):
        logging.info(candidate_metrics)

    rf_f1 = rf_metrics.get("f1_score")
    gbc_f1 = gbc_metrics.get("f1_score")
    return "rf_gs" if rf_f1 > gbc_f1 else "gbc_gs"

#### Pipeline Component : Upload Model to Google Cloud Storage

In [69]:
@dsl.component(
    base_image="python:3.10.7-slim", packages_to_install=["google-cloud-storage"]
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    """Upload a trained model file to a GCS bucket.

    The object is named ``<algo>_model<file_type>`` from the artifact's
    metadata. The local source is ``model.path + file_type``.

    Args:
        project_id: GCP project passed to the storage client.
        model_repo: Destination bucket name.
        model: Model artifact carrying ``algo`` and ``file_type`` metadata.
    """
    from google.cloud import storage
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    gcs_client = storage.Client(project=project_id)
    repo_bucket = gcs_client.bucket(model_repo)

    algo_tag = str(model.metadata["algo"])
    extension = str(model.metadata["file_type"])
    repo_bucket.blob("{}_model{}".format(algo_tag, extension)).upload_from_filename(
        model.path + extension
    )

    print("Saved the model to GCP bucket : " + model_repo)


#### Define the Pipeline

In [73]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(name="winequality-predictor-training-pipeline-v1")
def pipeline(
    project_id: str,
    data_bucket: str,
    dataset_filename: str,
    model_repo: str,
    prediction_set: str,
):
    """Train RF and GBC wine-quality classifiers, keep the better one, score and upload it.

    Args:
        project_id: GCP project passed to the GCS download/upload components.
        data_bucket: Bucket holding both the training CSV and the prediction CSV.
        dataset_filename: Object name of the labelled training CSV in data_bucket.
        model_repo: Bucket that receives the selected model file.
        prediction_set: Object name of the CSV to score, also read from data_bucket.
    """
    di_op = download_data(
        project_id=project_id, bucket=data_bucket, file_name=dataset_filename
    )

    # NOTE(review): the scaler and PCA are fitted on the whole training CSV
    # *before* the train/test split below, so the test metrics used for model
    # selection are not fully independent of preprocessing. Consider splitting
    # first and fitting the preprocessing on the training split only.
    pca_job_run_op = PCA(standard_features=di_op.outputs["dataset"])

    # Hold out 20% of the PCA-transformed rows for evaluation.
    tts_job_run_op = train_test_split(
        dataset=pca_job_run_op.outputs["pca_features"], level=0.2
    )

    training_rf_job_run_op = train_rf(
        train_features_X=tts_job_run_op.outputs["dataset_train_X"],
        train_features_y=tts_job_run_op.outputs["dataset_train_y"],
        test_features_X=tts_job_run_op.outputs["dataset_test_X"],
        test_features_y=tts_job_run_op.outputs["dataset_test_y"],
    )

    training_gbc_job_run_op = train_gbc(
        train_features_X=tts_job_run_op.outputs["dataset_train_X"],
        train_features_y=tts_job_run_op.outputs["dataset_train_y"],
        test_features_X=tts_job_run_op.outputs["dataset_test_X"],
        test_features_y=tts_job_run_op.outputs["dataset_test_y"],
    )

    # The explicit .after() duplicates the ordering already implied by the two
    # metrics inputs; it is redundant but harmless.
    comp_model__op = compare_model(
        rf_metrics=training_rf_job_run_op.outputs["metrics"],
        gbc_metrics=training_gbc_job_run_op.outputs["metrics"],
    ).after(training_rf_job_run_op, training_gbc_job_run_op)

    prediction_set_op = download_data(
        project_id=project_id, bucket=data_bucket, file_name=prediction_set
    )

    # defining the branching condition
    # compare_model returns either "rf_gs" or "gbc_gs", so exactly one of the
    # two branches below runs: predict with the winner, then upload it.
    with dsl.If(comp_model__op.output == "rf_gs"):
        predict_rf_job_run_op = predict_rf(
            model=training_rf_job_run_op.outputs["out_model"],
            features=prediction_set_op.outputs["dataset"],
            scaler_model=pca_job_run_op.outputs["scaler_model"],
            pca_model=pca_job_run_op.outputs["pca_model"],
        )
        upload_model_rf_to_gc_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_rf_job_run_op.outputs["out_model"],
        ).after(predict_rf_job_run_op)

    with dsl.If(comp_model__op.output == "gbc_gs"):
        predict_gbc_job_run_op = predict_gbc(
            model=training_gbc_job_run_op.outputs["out_model"],
            features=prediction_set_op.outputs["dataset"],
            scaler_model=pca_job_run_op.outputs["scaler_model"],
            pca_model=pca_job_run_op.outputs["pca_model"],
        )
        upload_model_gbc_to_gc_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_gbc_job_run_op.outputs["out_model"],
        ).after(predict_gbc_job_run_op)

#### Compile the pipeline into a YAML file

In [74]:
from kfp import compiler

# Serialize the pipeline definition to a YAML package. The path must match the
# template_path used when submitting the PipelineJob.
compiler.Compiler().compile(
    package_path="winequality_predictor_training_pipeline_v1.yaml",
    pipeline_func=pipeline,
)

#### Submit the pipeline run

In [75]:
import google.cloud.aiplatform as aip

# Credentials: before initializing, point GOOGLE_APPLICATION_CREDENTIALS at a
# service-account key file (or otherwise provide Google credentials).
aip.init(location=REGION, project=PROJECT_ID)

# Build the pipeline job from the compiled YAML package, with caching disabled
# so every step re-executes, and run it.
job = aip.PipelineJob(
    display_name="winequality-predictor-v1",
    template_path="winequality_predictor_training_pipeline_v1.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    enable_caching=False,
    parameter_values={
        "project_id": PROJECT_ID,  # your GCP project ID
        "data_bucket": "data_de2023_2065718",  # bucket holding the input CSVs
        "dataset_filename": "train_data.csv",  # training CSV, uploaded to data_bucket
        "model_repo": "models_de2023_2065718",  # bucket that receives the chosen model
        "prediction_set": "predict_data.csv",  # CSV to score, also in data_bucket
    },
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/915562509454/locations/us-central1/pipelineJobs/winequality-predictor-training-pipeline-v1-20231012150943
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/915562509454/locations/us-central1/pipelineJobs/winequality-predictor-training-pipeline-v1-20231012150943')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/winequality-predictor-training-pipeline-v1-20231012150943?project=915562509454
PipelineJob projects/915562509454/locations/us-central1/pipelineJobs/winequality-predictor-training-pipeline-v1-20231012150943 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/915562509454/locations/us-central1/pipelineJobs/winequality-predictor-training-pipeline-v1-20231012150943 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/915562509454/locations/us-central1/pipelineJobs/winequality-predict