In [1]:
# Extra flag interpolated into the `pip3 install` commands in the next cell;
# "--user" installs into the current user's site-packages instead of system-wide.
USER_FLAG = "--user"

In [2]:
# Install the exact SDK versions this notebook was written against
# (Vertex AI SDK, KFP SDK and Google Cloud pipeline components).
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0



In [3]:
import os

# Restart the kernel so the packages installed above are picked up by fresh
# imports. Skipped whenever IS_TESTING is set to a non-empty value.
if not os.getenv("IS_TESTING"):
    import IPython

    # Positional True == restart after shutdown.
    IPython.Application.instance().kernel.do_shutdown(True)

In [1]:
# Confirm the pinned SDK versions are installed.
# NOTE: `!python3` runs whichever python3 is first on PATH, which is not
# necessarily the interpreter backing this kernel.
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.9
google_cloud_pipeline_components version: 0.2.0


In [2]:
import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  niveustraining


In [3]:
# Cloud Storage bucket, as a gs:// URI, used as the base for PIPELINE_ROOT.
BUCKET_NAME = "gs://black-friday-demo-bucket"

In [4]:
import kfp
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics, Dataset, Markdown
from google.cloud import aiplatform

In [5]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline-root/"
PIPELINE_ROOT

env: PATH=/home/niveus2610-03/anaconda3/envs/demo2/bin:/home/niveus2610-03/anaconda3/condabin:/home/niveus2610-03/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin:/home/jupyter/.local/bin


'gs://black-friday-demo-bucket/pipeline-root/'

In [6]:
@component(
    base_image="python:3.8-slim",
    output_component_file="./components/get-data/component.yml",
    packages_to_install=["pandas", "fsspec", "gcsfs"]
)
def get_data(
    dataset: Output[Dataset],
    Columns: Output[Metrics],
    data_uri: str = "gs://black-friday-demo-bucket/dataset/train.csv",
):
    """Read the raw Black Friday CSV and publish it as a Dataset artifact.

    Args:
        dataset: Output artifact; the CSV is rewritten to ``dataset.uri``
            without the pandas index.
        Columns: Output metrics artifact; the column names are logged under
            the ``"columns"`` key as a stringified list.
        data_uri: Location of the source CSV. gs:// paths are read through
            gcsfs. Defaults to the demo bucket's training file.
    """
    from pandas import read_csv

    df = read_csv(data_uri)

    df.to_csv(dataset.uri, index=False)

    Columns.log_metric("columns", str(df.columns.tolist()))


In [7]:
@component(
    base_image="python:3.8-slim",
    output_component_file="./components/clean-data/component.yml",
    # The real PyPI distributions are numpy and scikit-learn. "np" is an
    # unrelated package, and "sklearn" is a deprecated placeholder that
    # refuses to install.
    packages_to_install=["pandas", "numpy", "scikit-learn", "fsspec", "gcsfs"]
)
def clean_data(
    dataset: Input[Dataset],
    cleaned_data: Output[Artifact],
    column_transformer: Output[Artifact],
    label_encoder: Output[Artifact],
    dataset_data: Output[Markdown]
):
    """Clean and encode the raw Black Friday data for regression.

    Steps:
        * drop User_ID, Product_ID and Product_Category_3;
        * label-encode Gender;
        * median-impute Product_Category_2;
        * one-hot encode Age, City_Category and Stay_In_Current_City_Years.

    The positional column indices below (0, 1, 3, 4, 7) assume the raw CSV's
    column order. TODO: confirm them if the source schema changes.

    Args:
        dataset: Raw CSV produced by ``get_data``.
        cleaned_data: Output CSV holding the encoded feature columns followed
            by the target as the last column.
        column_transformer: Output artifact with the pickled, fitted
            ColumnTransformer.
        label_encoder: Output artifact with the pickled, fitted Gender
            LabelEncoder.
        dataset_data: Output Markdown listing the distinct values of every
            non-Purchase column.
    """
    import json
    import numpy as np
    import pandas as pd
    from pickle import dump
    from sklearn.impute import SimpleImputer
    from sklearn.compose import ColumnTransformer
    from sklearn.preprocessing import LabelEncoder, OneHotEncoder

    # Drop the columns that are not used as features.
    df = pd.read_csv(dataset.uri).drop(['User_ID', 'Product_ID', 'Product_Category_3'], axis=1)

    # Split into independent variables (all but the last column) and the
    # dependent variable (last column).
    x = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values

    # Publish the distinct values of each feature column as a Markdown artifact.
    feature_values = {
        "dataset": [
            {col: df[col].unique().tolist()}
            for col in df.drop(['Purchase'], axis=1).columns
        ]
    }
    json_data = json.dumps(feature_values, indent=4)
    # Keep the fence at column 0. Indenting it by 4 spaces makes Markdown
    # render the ```json fence as literal text inside an indented code block.
    markdown_content = f"```json\n{json_data}\n```\n"

    with open(dataset_data.path, 'w') as f:
        f.write(markdown_content)

    # Encode the Gender column (index 0).
    le = LabelEncoder()
    x[:, 0] = le.fit_transform(x[:, 0])

    # Fill NaN values in the Product_Category_2 column (index 7) with the median.
    imputer = SimpleImputer(missing_values=np.nan, strategy='median')
    x[:, 7] = imputer.fit_transform(x[:, 7].reshape(-1, 1)).ravel()

    # One-hot encode Age, City_Category and Stay_In_Current_City_Years
    # (indices 1, 3, 4). The remaining columns are passed through and placed
    # after the encoded ones.
    ct = ColumnTransformer(transformers=[('encoder', OneHotEncoder(), [1, 3, 4])], remainder='passthrough')
    x = ct.fit_transform(x)

    # Re-attach the target as the last column so the next component can split it off.
    data = pd.concat([pd.DataFrame(x), pd.DataFrame(y)], axis=1)
    data.to_csv(path_or_buf=cleaned_data.path, index=False)

    # Persist the fitted transformers as pickle artifacts.
    with open(column_transformer.path, 'wb') as output_file:
        dump(ct, output_file)

    with open(label_encoder.path, 'wb') as output_file:
        dump(le, output_file)

In [8]:
@component(
    # numpy/scikit-learn are the real distributions ("np" is unrelated and
    # "sklearn" is a deprecated placeholder that refuses to install).
    packages_to_install=["pandas", "numpy", "scikit-learn", "fsspec", "gcsfs"],
    base_image="python:3.8-slim",
    output_component_file="./components/feature-scale/component.yml"
)
def feature_scale(
    cleaned_data: Input[Artifact],
    x_train_artifact: Output[Artifact],
    y_train_artifact: Output[Artifact],
    x_test_artifact: Output[Artifact],
    y_test_artifact: Output[Artifact],
    standard_scaler: Output[Artifact],
    test_size: float = 0.2,
    random_state: int = 42,
):
    """Split the cleaned data into train/test sets and standardize the features.

    The last column of ``cleaned_data`` is the target; every other column is a
    feature. The StandardScaler is fit on the training split only and then
    applied to the test split, so test statistics do not leak into training.

    Args:
        cleaned_data: CSV produced by ``clean_data``.
        x_train_artifact, y_train_artifact, x_test_artifact, y_test_artifact:
            Output CSVs (no index) for the scaled features and the targets.
        standard_scaler: Output artifact holding the pickled, fitted scaler.
        test_size: Fraction of rows held out for evaluation.
        random_state: Seed for the shuffle, so reruns produce the same split.
    """
    from pickle import dump
    from pandas import DataFrame, read_csv
    from sklearn.preprocessing import StandardScaler
    from sklearn.model_selection import train_test_split

    df = read_csv(cleaned_data.uri)

    x = df.iloc[:, :-1].values
    y = df.iloc[:, -1].values

    x_train, x_test, y_train, y_test = train_test_split(
        x, y, test_size=test_size, random_state=random_state
    )

    sc = StandardScaler()
    x_train = sc.fit_transform(x_train)
    x_test = sc.transform(x_test)

    DataFrame(x_train).to_csv(x_train_artifact.uri, index=False)
    DataFrame(y_train).to_csv(y_train_artifact.uri, index=False)
    DataFrame(x_test).to_csv(x_test_artifact.uri, index=False)
    DataFrame(y_test).to_csv(y_test_artifact.uri, index=False)

    with open(standard_scaler.path, 'wb') as output_file:
        dump(sc, output_file)

In [9]:
@component(
    # "sklearn" is a deprecated PyPI placeholder that refuses to install;
    # the real distribution is scikit-learn.
    packages_to_install=["pandas", "scikit-learn", "fsspec", "gcsfs"],
    base_image="python:3.8-slim",
    output_component_file="./components/linear-regression/train/component.yml"
)
def linear_regression_train(
    x_train_artifact: Input[Artifact],
    y_train_artifact: Input[Artifact],
    model: Output[Model]
):
    """Fit a LinearRegression on the scaled training split and pickle it.

    Args:
        x_train_artifact: CSV of scaled training features.
        y_train_artifact: CSV of training targets.
        model: Output model artifact. The estimator is pickled to
            ``model.path + '.pkl'`` rather than ``model.path`` itself.
    """
    import pickle
    from pandas import read_csv
    from sklearn.linear_model import LinearRegression

    x_train = read_csv(x_train_artifact.uri)
    y_train = read_csv(y_train_artifact.uri)

    linear_regressor = LinearRegression()
    linear_regressor.fit(x_train, y_train)

    # With model.path ending in ".../model", this writes "model.pkl" into the
    # artifact's directory. Vertex AI's prebuilt scikit-learn serving
    # containers load a file with that name.
    # NOTE(review): the model is served by the sklearn-cpu.0-24 container, and
    # Vertex AI recommends exporting with the same scikit-learn version.
    # Consider pinning scikit-learn to 0.24.x here (and wherever the pickle is
    # loaded).
    with open(model.path + '.pkl', 'wb') as f:
        pickle.dump(linear_regressor, f)
    

In [10]:
@component(
    # "sklearn" is a deprecated PyPI placeholder that refuses to install;
    # the real distribution is scikit-learn.
    packages_to_install=["pandas", "scikit-learn", "fsspec", "gcsfs"],
    base_image="python:3.8-slim",
    output_component_file="./components/linear-regression/test/component.yml"
)
def linear_regression_test(
    linear_regression_model: Input[Model],
    x_test_artifact: Input[Artifact],
    y_test_artifact: Input[Artifact],
    MSE: Output[Metrics],
    MAE: Output[Metrics]
):
    """Evaluate the trained regressor and log MSE / MAE metrics.

    Args:
        linear_regression_model: Model artifact whose pickle is stored at
            ``linear_regression_model.path + '.pkl'``.
        x_test_artifact: CSV of held-out features.
        y_test_artifact: CSV of held-out targets.
        MSE: Output metrics artifact receiving "Mean Squared Error".
        MAE: Output metrics artifact receiving "Mean Absolute Error".
    """
    import pickle
    from pandas import read_csv
    from sklearn.metrics import mean_squared_error, mean_absolute_error

    # The pickle is produced by an upstream step of this same pipeline.
    with open(linear_regression_model.path + '.pkl', 'rb') as f:
        linear_regressor = pickle.load(f)

    x_test = read_csv(x_test_artifact.uri)
    y_test = read_csv(y_test_artifact.uri)

    y_pred = linear_regressor.predict(x_test)

    # scikit-learn metrics take (y_true, y_pred) in that order.
    mae = mean_absolute_error(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)

    MSE.log_metric("Mean Squared Error", float(mse))
    MAE.log_metric("Mean Absolute Error", float(mae))

    # NOTE(review): metadata set on an *input* artifact is probably not written
    # back by the KFP v2 executor (only outputs are). Confirm, and prefer the
    # Metrics outputs above.
    linear_regression_model.metadata["Mean Squared Error"] = float(mse)
    linear_regression_model.metadata["Mean Absolute Error"] = float(mae)

In [11]:
@component(
    # Only the Vertex AI SDK is used here. The deprecated "sklearn" PyPI
    # placeholder was unused and refuses to install.
    packages_to_install=['google-cloud-aiplatform'],
    base_image="python:3.8-slim",
    output_component_file="./components/linear-regression/deploy/component.yml"
)
def deploy(
    Model: Input[Model],
    model: Output[Model],
    project: str = 'niveustraining',
    region: str = 'us-central1',
):
    """Upload the trained model to Vertex AI and deploy it to a new endpoint.

    Args:
        Model: Trained model artifact. Its parent directory, which holds
            ``model.pkl``, is uploaded as the model's ``artifact_uri``. Inside
            this function the name shadows the ``Model`` artifact class.
        model: Output artifact whose uri is set to the resource name of the
            object returned by ``Model.deploy`` (the endpoint).
        project: Google Cloud project in which to create the endpoint and model.
        region: Vertex AI region for the endpoint and model.
    """
    import time
    from google.cloud import aiplatform

    TIME_STAMP = str(int(time.time()))
    DISPLAY_NAME = f"blackfriday-price-prediction-{TIME_STAMP}"
    MODEL_NAME = f"black-friday-model-{TIME_STAMP}"
    ENDPOINT_NAME = f"black-friday-endpoint-{TIME_STAMP}"

    aiplatform.init(project=project, location=region)

    endpoint = aiplatform.Endpoint.create(
        display_name=ENDPOINT_NAME,
        project=project,
        location=region
    )

    # Model.uri ends in ".../model", and the pickle sits beside it as
    # "model.pkl". Upload the containing directory by stripping only the last
    # path segment; str.replace("model", "") would also remove any other
    # "model" substring in the URI (bucket, run or step names).
    artifact_dir = Model.uri.rsplit("/", 1)[0] + "/"

    model_upload = aiplatform.Model.upload(
        display_name=DISPLAY_NAME,
        artifact_uri=artifact_dir,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
        serving_container_health_route=f"/v1/models/{MODEL_NAME}",
        serving_container_predict_route=f"/v1/models/{MODEL_NAME}:predict",
        serving_container_environment_variables={
            "MODEL_NAME": MODEL_NAME,
        }
    )

    model_deploy = model_upload.deploy(
        machine_type="n1-standard-4",
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=DISPLAY_NAME,
    )

    model.uri = model_deploy.resource_name

In [12]:
# Use the namespaced decorator. This cell rebinds the bare name `pipeline` to
# the pipeline function, so re-running it with `@pipeline(...)` would fail.
@dsl.pipeline(
    name="black-friday",
    pipeline_root=PIPELINE_ROOT
)
def pipeline():
    """Black Friday purchase regression: load, clean, scale, train, evaluate, deploy."""
    get_data_task = get_data()
    clean_data_task = clean_data(get_data_task.outputs["dataset"])
    feature_scale_task = feature_scale(clean_data_task.outputs["cleaned_data"])
    linear_regression_train_task = linear_regression_train(
        feature_scale_task.outputs["x_train_artifact"],
        feature_scale_task.outputs["y_train_artifact"],
    )
    # Evaluate on the held-out split, not the data the model was fit on.
    linear_regression_test_task = linear_regression_test(
        linear_regression_train_task.outputs["model"],
        feature_scale_task.outputs["x_test_artifact"],
        feature_scale_task.outputs["y_test_artifact"],
    )
    # Deploy only once the evaluation step has completed.
    deployment_task = deploy(linear_regression_train_task.outputs["model"])
    deployment_task.after(linear_regression_test_task)

In [13]:
# Compile the pipeline function into the JSON spec that the PipelineJob cell loads.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="pipeline.json",
)



In [14]:
from datetime import datetime

# Local-time YYYYMMDDHHMMSS suffix that keeps PipelineJob ids unique per run.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [None]:
# Configure a Vertex AI PipelineJob from the compiled spec. Nothing is sent
# until job.submit(). The timestamp makes the job id unique per run.
job = aiplatform.PipelineJob(
    enable_caching=True,
    job_id=f"black-friday-{TIMESTAMP}",
    template_path="pipeline.json",
    display_name="black-friday-pipeline",
)

In [371]:
# Create the pipeline run on Vertex AI. submit() returns once the job is
# created; use job.run() to wait for it to finish.
job.submit()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/866354246469/locations/us-central1/pipelineJobs/black-friday-20220114110557
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/866354246469/locations/us-central1/pipelineJobs/black-friday-20220114110557')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/black-friday-20220114110557?project=866354246469
