

In [1]:
!gcloud services enable compute.googleapis.com \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Operation "operations/acat.p2-164351260402-64bd8765-1274-42a0-8aeb-b8f5a959497a" finished successfully.


In [2]:
# IMPORT THE REQUIRED LIBRARIES

from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Output,
                        Model,
                        Metrics,
                        Markdown,
                        HTML,
                        component, 
                        OutputPath, 
                        InputPath)

from kfp.v2 import compiler
from google.cloud import aiplatform as vertex_
from google.cloud.aiplatform import pipeline_jobs

from datetime import datetime
import pandas as pd

  from kfp.v2 import dsl


In [3]:
# Project-level settings shared by the cells below.
PROJECT_ID = "careful-acumen-414922"
REGION = "europe-west2"

# Cloud Storage bucket used by this project.
BUCKET_NAME = f"gs://{PROJECT_ID}-houseprice"

# Folder inside the bucket that serves as the pipeline root.
PIPELINE_ROOT = BUCKET_NAME + "/pipeline_root_houseprice/"

In [4]:
# Display the resolved bucket URI to confirm the configuration.
BUCKET_NAME

'gs://careful-acumen-414922-houseprice'

In [5]:
# Custom Docker image used as the base image of the pipeline components.
IMAGE_NAME = "training"
BASE_IMAGE = "/".join(
    (f"{REGION}-docker.pkg.dev", PROJECT_ID, "houseprice", IMAGE_NAME)
)

## Read the Dataset

In [6]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="get_data.yaml"
)
def get_houseprice_data(
    filepath: str,
    dataset_train: Output[Dataset],
):
    """Read the raw training CSV and store it as a pipeline dataset artifact.

    Args:
        filepath: Folder (path or URI) that contains ``train.csv``. Trailing
            slashes are ignored.
        dataset_train: Output artifact that receives the data as CSV, written
            without the index column.
    """
    import pandas as pd

    # Strip trailing slashes so that "folder/" does not turn into
    # "folder//train.csv", which object stores treat as a different key.
    source_csv = filepath.rstrip('/') + '/train.csv'
    df_train = pd.read_csv(source_csv)

    df_train.to_csv(dataset_train.path, index=False)

  @component(
  def get_houseprice_data(


## Data Preprocessing

In [7]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="preprocessing.yaml"
)
def preprocess_houseprice_data(
    train_df: Input[Dataset],
    dataset_train_preprocessed: Output[Dataset],
):
    """Apply the project preprocessing pipeline to the raw training data.

    Args:
        train_df: Input artifact pointing at the raw training CSV.
        dataset_train_preprocessed: Output artifact that receives the
            preprocessed data as CSV, written without the index column.
    """
    import pandas as pd
    from src.data_preprocessing.preprocessing import data_preprocessing_pipeline

    raw_frame = pd.read_csv(train_df.path)

    # Per the author's note, data_preprocessing_pipeline works on a copy of the
    # frame: it removes the id column, converts columns to the correct dtype,
    # subtracts YearSold from temporal features and cosine-transforms cyclic
    # features.
    preprocessed_frame = data_preprocessing_pipeline(raw_frame)

    preprocessed_frame.to_csv(dataset_train_preprocessed.path, index=False)

  @component(
  def preprocess_houseprice_data(


## Train Test Split

In [8]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="train_test_split.yaml",
)
def train_test_split(
    dataset_in: Input[Dataset],
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
    test_size: float = 0.2,
):
    """Split a dataset artifact into train and test artifacts.

    Args:
        dataset_in: Input artifact pointing at the CSV to split.
        dataset_train: Output artifact that receives the training rows as CSV.
        dataset_test: Output artifact that receives the test rows as CSV.
        test_size: Value forwarded to scikit-learn's ``test_size`` argument.
    """
    import pandas as pd
    # Aliased so the scikit-learn helper is not confused with this component,
    # which carries the same name.
    from sklearn.model_selection import train_test_split as split_frame

    full_frame = pd.read_csv(dataset_in.path)

    # Fixed seed keeps the split identical from run to run.
    train_part, test_part = split_frame(full_frame, test_size=test_size, random_state=42)

    for part, artifact in ((train_part, dataset_train), (test_part, dataset_test)):
        part.to_csv(artifact.path, index=False)

  @component(
  def train_test_split(dataset_in: Input[Dataset],


## Train the Model

In [9]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="model_training.yaml"
)
def train_houseprice(
    dataset_train: Input[Dataset],
    dataset_test: Input[Dataset],
    best_params: Output[Markdown],
    shap_summary_plot: Output[HTML],
    model: Output[Model],
):
    """Fit the house-price model and export its parameters, SHAP plot and pickle.

    Args:
        dataset_train: Input artifact with the training split (CSV).
        dataset_test: Input artifact with the test split (CSV).
        best_params: Output artifact that receives ``str(best_params)`` of the
            fitted model.
        shap_summary_plot: Output artifact that receives an HTML page embedding
            the SHAP summary plot.
        model: Output artifact that receives a pickled dict with the keys
            ``pipeline``, ``target`` and ``scores_dict``.
    """
    import pandas as pd
    import pickle
    import shap
    from src.modelling.train import HousePriceModel
    from src.utils.utils import get_image_data

    TARGET = 'SalePrice'

    # Load both splits produced upstream.
    train_frame = pd.read_csv(dataset_train.path)
    test_frame = pd.read_csv(dataset_test.path)

    # Author's note: hyperparameter tuning uses the training data, evaluation
    # uses the test data, and the final model is then trained on the entire
    # (train + test) dataset. The model object receives its own copy of the
    # test frame.
    house_price_model = HousePriceModel(
        test_frame.copy(),
        target=TARGET,
        n_kfold_splits=3,
        n_trials=10,
        random_state=42,
    )

    # Separate the features from the target column.
    features = train_frame.drop(TARGET, axis=1)
    labels = train_frame[TARGET]

    # Author's note: the training pipeline consists of feature engineering,
    # feature selection and training an xgboost model.
    house_price_model.fit(features, labels)

    # Persist the best hyperparameters as a text artifact.
    with open(best_params.path, "w") as params_file:
        params_file.write(str(house_price_model.best_params))

    # Draw the SHAP summary plot, then embed the rendered image in an HTML page.
    shap.summary_plot(
        house_price_model.shap_values,
        house_price_model.X_test_transformed,
        max_display=20,
    )
    shap_plot_dataurl = get_image_data()  # image data used as the <img> source
    html_content = f'<html><head></head><body><h1>Shap Summary Plot</h1>\n<img src={shap_plot_dataurl} width="97%"></body></html>'
    with open(shap_summary_plot.path, "w") as html_file:
        html_file.write(html_content)

    # Tag the artifact, then pickle the fitted pipeline with its target name and scores.
    model.metadata["framework"] = "xgboost"
    with open(model.path, 'wb') as model_file:
        pickle.dump(
            {
                "pipeline": house_price_model.model_pipeline,
                "target": house_price_model.target,
                "scores_dict": house_price_model.scores,
            },
            model_file,
        )

  @component(
  def train_houseprice(


## Evaluate the Model

In [10]:
@component(
    base_image=BASE_IMAGE,
    output_component_file="model_evaluation.yaml"
)
def evaluate_houseprice(
    houseprice_model: Input[Model],
    metrics_baseline: Output[Metrics],
    metrics_train: Output[Metrics],
    metrics_test: Output[Metrics],
):
    """Publish the scores stored in the pickled model artifact as pipeline metrics.

    Args:
        houseprice_model: Input artifact holding a pickled dict that has a
            ``scores_dict`` entry with ``baseline_scores``, ``train_scores``
            and ``test_scores`` mappings.
        metrics_baseline: Metrics artifact that receives the baseline scores.
        metrics_train: Metrics artifact that receives the training scores.
        metrics_test: Metrics artifact that receives the test scores.
    """
    import pickle

    # NOTE(review): pickle.load can execute code embedded in the file, so this
    # artifact must only ever come from a trusted producer.
    with open(houseprice_model.path, 'rb') as model_file:
        scores = pickle.load(model_file)["scores_dict"]

    # Each score group is logged to its own Metrics artifact.
    score_sinks = (
        ("baseline_scores", metrics_baseline),
        ("train_scores", metrics_train),
        ("test_scores", metrics_test),
    )
    for score_key, sink in score_sinks:
        for metric_name, value in scores[score_key].items():
            sink.log_metric(metric_name, float(value))

  @component(
  def evaluate_houseprice(


## Deploy the Model

In [11]:
@component(
    base_image=BASE_IMAGE,
    install_kfp_package=False,
    output_component_file="model_deployment.yaml",
)
def deploy_houseprice(
        serving_container_image_uri: str,
        display_name: str,
        model_endpoint: str,
        gcp_project: str,
        gcp_region: str,
        model: Input[Model],
        vertex_model: Output[Model],
        vertex_endpoint: Output[Model]
):
    """Upload the trained model to Vertex AI and deploy it to an endpoint.

    Args:
        serving_container_image_uri: Image URI of the custom serving container.
        display_name: Display name of the model in the Model Registry; also
            used as the deployed model's display name.
        model_endpoint: Display name of the endpoint to reuse or create.
        gcp_project: Project in which the endpoint and model are looked up and
            created.
        gcp_region: Region in which the endpoint and model are looked up and
            created.
        model: Input artifact with the trained model; its parent folder is
            uploaded as the model's artifact directory.
        vertex_model: Output artifact whose ``uri`` is set to the uploaded
            model's resource name.
        vertex_endpoint: Output artifact whose ``uri`` is set to the resource
            name returned by the deploy call.
    """
    from google.cloud import aiplatform as vertex_ai
    from pathlib import Path

    def create_endpoint():
        """Return the newest endpoint named ``model_endpoint``, creating one if none exists."""
        endpoints = vertex_ai.Endpoint.list(
            filter='display_name="{}"'.format(model_endpoint),
            order_by='create_time desc',
            project=gcp_project,
            location=gcp_region,
        )
        if endpoints:
            return endpoints[0]  # most recently created
        return vertex_ai.Endpoint.create(
            display_name=model_endpoint,
            project=gcp_project,
            location=gcp_region,
        )

    def upload_model():
        """Upload the model, as a new version when a model named ``display_name`` already exists."""
        listed_models = vertex_ai.Model.list(
            filter='display_name="{}"'.format(display_name),
            # Sort newest first so that index 0 really is the latest model,
            # mirroring the endpoint lookup.
            order_by='create_time desc',
            project=gcp_project,
            location=gcp_region,
        )
        # With a parent model the upload becomes a new version of it; None
        # registers a brand-new model.
        parent_model = listed_models[0].resource_name if listed_models else None
        return vertex_ai.Model.upload(
            display_name=display_name,
            parent_model=parent_model,
            artifact_uri=str(Path(model.path).parent),
            serving_container_image_uri=serving_container_image_uri,
            # Upload into the same project that was searched above instead of
            # relying on the ambient default project.
            project=gcp_project,
            location=gcp_region,
            serving_container_predict_route="/predict",
            serving_container_health_route="/health",
        )

    endpoint = create_endpoint()
    uploaded_model = upload_model()

    # Expose the registered model through the output artifact.
    vertex_model.uri = uploaded_model.resource_name

    # Deploy the uploaded model to the endpoint and route all traffic to it
    # ("0" is the traffic-split key for the model deployed by this call).
    model_deploy = uploaded_model.deploy(
        machine_type='n1-standard-4',
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=display_name,
    )

    # Expose the deployment target through the output artifact.
    vertex_endpoint.uri = model_deploy.resource_name

  @component(
  def deploy_houseprice(


## Create the Pipeline

In [12]:
# Timestamp suffix (YYYYmmddHHMMSS) used to build a unique display name.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"
DISPLAY_NAME = f"pipeline-houseprice-job{TIMESTAMP}"

In [13]:
@dsl.pipeline(
    # A name for the pipeline. Use to determine the pipeline Context.
    name="pipeline-houseprice",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    data_filepath: str = f"{BUCKET_NAME}/data",
    project: str = PROJECT_ID,
    region: str = REGION,
    display_name: str = DISPLAY_NAME,
    serving_container_image_uri: str = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/houseprice/serving_image:latest",  # custom serving container image
):
    """Wire the house-price components: load, preprocess, split, train, evaluate, deploy.

    Args:
        data_filepath: Folder that contains ``train.csv``.
        project: Project passed to the deployment step.
        region: Region passed to the deployment step.
        display_name: NOTE(review): accepted but not forwarded to any step; the
            deployment step receives the literal "houseprice" instead.
        serving_container_image_uri: Image URI of the custom serving container.
    """
    raw_data_task = get_houseprice_data(filepath=data_filepath)
    preprocess_task = preprocess_houseprice_data(
        train_df=raw_data_task.outputs["dataset_train"],
    )
    split_task = train_test_split(
        dataset_in=preprocess_task.outputs["dataset_train_preprocessed"],
    )
    training_task = train_houseprice(
        dataset_train=split_task.outputs["dataset_train"],
        dataset_test=split_task.outputs["dataset_test"],
    )

    # Evaluation and deployment both consume the trained model artifact.
    evaluate_houseprice(houseprice_model=training_task.outputs["model"])

    deploy_houseprice(
        model=training_task.outputs['model'],
        gcp_project=project,
        gcp_region=region,
        serving_container_image_uri=serving_container_image_uri,
        display_name="houseprice",
        model_endpoint="houseprice_endpoint",
    )

## Compile and Run the Pipeline

In [14]:
# Compile the pipeline function into a JSON job spec file.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="ml_houseprice.json",
)

In [15]:
# CREATE A RUN USING THE JOB SPEC FILE GENERATED
# The project is passed explicitly so the job is created in PROJECT_ID rather
# than in whatever default project the local environment happens to use.
start_pipeline = pipeline_jobs.PipelineJob(
    display_name="houseprice-pipeline",
    template_path="ml_houseprice.json",
    enable_caching=False,
    project=PROJECT_ID,
    location=REGION,
)

In [16]:
# Create the Artifact Registry Docker repository (one-time setup; uncomment to run)
#!gcloud artifacts repositories create houseprice --repository-format=docker --location=europe-west2

In [17]:
# RUN THE PIPELINE
# Submits the PipelineJob created from the compiled job spec.

start_pipeline.run()

Creating PipelineJob


InvalidArgument: 400 Invalid image URI europe-west2-docker.pkg.dev/careful-acumen-414922/houseprice/serving_image	.

## Make Predictions Using Vertex AI Endpoint

In [None]:
# Replace the placeholder with the deployed endpoint's resource name, e.g.
# "projects/<PROJECT_NUMBER>/locations/<REGION>/endpoints/<ENDPOINT_ID>".
endpoint_name = "<ENDPOINT_URI>"
# `vertex_` is the alias under which google.cloud.aiplatform is imported at the
# top of this notebook.
endpoint = vertex_.Endpoint(endpoint_name)

In [None]:
# Load the test CSV from the local data folder.
test_df = pd.read_csv('./data/test.csv')

In [None]:
# Serialise the frame as JSON Lines: one JSON object per row, one row per line.
request = test_df.to_json(orient='records', lines=True)

In [None]:
# Send one instance per row; each instance is the JSON string of that row.
# NOTE(review): assumes the serving container accepts JSON-encoded strings as
# instances — confirm against the container's /predict handler.
predictions = endpoint.predict(instances=request.splitlines())