In [None]:
! pip install google-cloud-aiplatform==1.16.1 kfp==1.8.20

In [None]:
from kfp.v2.dsl import Artifact, Dataset, Input, Metrics, Model, Output, component, pipeline
from kfp.v2 import compiler

# Component to get data from BigQuery (the database where the data is hosted)
@component(
    base_image="python:3.9-slim",
    packages_to_install=[
        "google-cloud-bigquery==2.34.4",
        "pandas==2.0.1",
        "db-dtypes==1.1.1"
    ]
)
def get_source_data(
    source_data: Output[Dataset],
    project_id_host_project: str = "pj-dgbe-vtk-workshop-2023-host",
    dataset_name: str = "source_data",
    table_name: str = "housing"
):
    """
    Export a BigQuery table to CSV as this component's ``source_data`` artifact.

    :param source_data:             Output Dataset artifact; the CSV (with header, no index) is written to its path
    :param project_id_host_project: GCP project that hosts the BigQuery dataset
    :param dataset_name:            BigQuery dataset containing the table
    :param table_name:              Table whose rows are exported
    """
    # Imports live inside the body: KFP executes this function in its own container.
    from google.cloud import bigquery

    client = bigquery.Client()
    source_table = client.get_table(
        bigquery.DatasetReference(project_id_host_project, dataset_name).table(table_name)
    )

    # Pull every row of the table into a pandas DataFrame, then persist it as CSV.
    rows_df = client.list_rows(source_table).to_dataframe()
    rows_df.to_csv(source_data.path, index=False, header=True)

In [None]:
# Component to process data and split it into test and train sets
@component(
    base_image="python:3.9-slim",
    packages_to_install=[
        "pandas==2.0.1",
        "scikit-learn==1.2.2"
    ]
)
def process_source_data(
    source_data: Input[Dataset],
    features_train: Output[Dataset],
    target_train: Output[Dataset],
    features_test: Output[Dataset],
    target_test: Output[Dataset],
    random_state: int = 42
):
    """
    Encode the categorical column and split the source data into train/test sets.

    ``price`` is the regression target; every other column is kept as a feature.
    ``furnishingstatus`` is a string column, so it is mapped to an integer code
    before it can be used (values outside the known categories, including
    missing values, become -1).

    :param source_data:    Input CSV with a header row that includes a ``price`` column
    :param features_train: Output CSV of training features
    :param target_train:   Output CSV of training targets (single ``price`` column)
    :param features_test:  Output CSV of test features
    :param target_test:    Output CSV of test targets (single ``price`` column)
    :param random_state:   Seed for the train/test split so reruns produce the same split
    """
    import pandas as pd
    from sklearn.model_selection import train_test_split

    # Integer codes for the string-valued furnishing status.
    furnishing_codes = {
        "furnished": 0,
        "unfurnished": 1,
        "semi-furnished": 2,
    }

    src_data = pd.read_csv(source_data.path)

    features = src_data.drop("price", axis=1)
    # Vectorised lookup; anything not in the mapping (including NaN) falls back to -1.
    features["furnishingstatus"] = (
        features["furnishingstatus"].map(furnishing_codes).fillna(-1).astype(int)
    )
    target = src_data["price"]

    # Default split proportions (25% test); seeded so the split is reproducible.
    ftrs_train, ftrs_test, trgt_train, trgt_test = train_test_split(
        features, target, random_state=random_state
    )

    for frame, artifact in (
        (ftrs_train, features_train),
        (trgt_train, target_train),
        (ftrs_test, features_test),
        (trgt_test, target_test),
    ):
        frame.to_csv(artifact.path, index=False, header=True)
    

In [None]:
@component(
    base_image="python:3.9-slim",
    packages_to_install=[
        "pandas==2.0.1", 
        "scikit-learn==1.2.2"
    ]
)
def train_model(
    features_train: Input[Dataset],
    target_train: Input[Dataset],
    features_test: Input[Dataset],
    target_test: Input[Dataset],
    model: Output[Model], 
    metrics: Output[Metrics]
):
    """
    Train a scikit-learn regressor, pickle it, and log test-set metrics.

    :param features_train: CSV of training features (header row)
    :param target_train:   CSV with a single target column (header row)
    :param features_test:  CSV of test features (header row)
    :param target_test:    CSV with a single target column (header row)
    :param model:          Output artifact; the fitted estimator is pickled to its path
    :param metrics:        Output artifact receiving R^2, MSE and the framework name
    """
    import pickle

    import pandas as pd
    from sklearn.metrics import mean_squared_error
    from sklearn.tree import DecisionTreeRegressor

    ftrs_train = pd.read_csv(features_train.path)
    ftrs_test = pd.read_csv(features_test.path)
    # Each target CSV holds exactly one column: squeeze it to a Series so estimators
    # receive a 1-D y (some sklearn estimators warn about a column-vector y).
    trgt_train = pd.read_csv(target_train.path).squeeze("columns")
    trgt_test = pd.read_csv(target_test.path).squeeze("columns")

    # TODO (workshop): replace with any other sklearn regressor.
    ml_model = DecisionTreeRegressor()
    ml_model.fit(ftrs_train, trgt_train)

    with open(model.path, "wb") as f:
        pickle.dump(ml_model, f)

    # `score` on a regressor is the coefficient of determination (R^2), not a
    # squared error, so it is logged under its real name next to an actual MSE.
    r2 = ml_model.score(ftrs_test, trgt_test)
    mse = mean_squared_error(trgt_test, ml_model.predict(ftrs_test))
    metrics.log_metric("r2 score", r2)
    metrics.log_metric("mean squared error", mse)
    metrics.log_metric("framework", "Scikit Learn")
    

In [None]:
@component(
    base_image='python:3.9-slim',
    packages_to_install=[
        'google-cloud-aiplatform==1.16.1', 
        'kfp-server-api==1.8'
    ]
)
def deploy_model(
    trained_model: Input[Model],
    project_id: str,
    region: str,
    endpoint_name: str,
    model_display_name: str
):
    """
    Upload a pickled scikit-learn model to Vertex AI and deploy it to an endpoint.

    The endpoint is looked up by display name and created when none exists. The
    newly deployed model receives 100% of the endpoint's traffic.

    :param trained_model:       Model artifact (pickled sklearn estimator) to deploy
    :param project_id:          Project ID used to init AI Platform
    :param region:              Region used to init AI Platform
    :param endpoint_name:       Display name of the endpoint to reuse or create
    :param model_display_name:  Display name for the uploaded model and its deployment
    """
    import google.cloud.aiplatform as aip

    def extract_or_create_endpoint(endpoint_display_name: str):
        """
        Return the first Vertex AI Endpoint with the given display name, creating one if none exists.

        :param endpoint_display_name: display name of the Vertex AI Endpoint
        :return: created/retrieved endpoint object
        """
        for existing in aip.Endpoint.list():
            if existing.display_name == endpoint_display_name:
                return existing
        return aip.Endpoint.create(display_name=endpoint_display_name)

    aip.init(
        project=project_id,
        location=region,
    )

    endpoint = extract_or_create_endpoint(endpoint_name)

    # Name the uploaded model so it can be found by `model_display_name`.
    model = aip.Model.upload_scikit_learn_model_file(
        model_file_path=trained_model.path,
        display_name=model_display_name,
    )

    # NOTE(review): traffic_split only routes traffic to this new deployment; models
    # deployed by earlier runs presumably stay on the endpoint (with 0% traffic)
    # until undeployed - confirm and clean up if replicas should not accumulate.
    model.deploy(
        endpoint=endpoint,
        machine_type="n1-standard-4",
        min_replica_count=1,
        max_replica_count=1,
        accelerator_type=None,
        accelerator_count=None,
        traffic_split={"0": 100},
        deployed_model_display_name=model_display_name
    )
    

In [None]:
@pipeline(name="pipeline-sklearn")
def my_pipeline(
    project_id: str,
    region: str,
    endpoint_name: str,
    model_display_name: str
):
    # Pipeline graph: BigQuery extract -> preprocess & split -> train -> deploy.
    # The four pipeline parameters are only consumed by the deploy step.
    extract_task = get_source_data()

    split_task = process_source_data(
        source_data=extract_task.outputs["source_data"]
    )

    split_outputs = split_task.outputs
    train_task = train_model(
        features_train=split_outputs["features_train"],
        target_train=split_outputs["target_train"],
        features_test=split_outputs["features_test"],
        target_test=split_outputs["target_test"],
    )

    deploy_model(
        trained_model=train_task.outputs["model"],
        project_id=project_id,
        region=region,
        endpoint_name=endpoint_name,
        model_display_name=model_display_name,
    )

In [None]:
from datetime import datetime

from google.cloud import aiplatform

# Run configuration. Defined at module level (not inside the __main__ guard) because
# `endpoint_predict_sample` further down uses `project_id` and `region` as defaults.
model_display_name = "predict-house-prices"
project_id = "dt-vtk-workshop-1"
region = "europe-west1"
pipeline_root_bucket = "gs://gcs-be-dt-vtk-workshop-1-pipelines"
endpoint_name = "endp-predict-house-prices"

if __name__ == "__main__":
    # Compile the pipeline definition to a JSON template, then submit it to Vertex AI.
    compiler.Compiler().compile(my_pipeline, package_path="pipeline.json")

    run = aiplatform.PipelineJob(
        project=project_id,
        location=region,
        display_name=model_display_name,
        template_path="pipeline.json",
        # Timestamped so each submission gets a unique job id.
        job_id=f"test-pipeline-{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}",
        enable_caching=True,
        pipeline_root=pipeline_root_bucket,
        parameter_values={
            "project_id": project_id,
            "region": region,
            "endpoint_name": endpoint_name,
            "model_display_name": model_display_name
        },
    )

    run.submit()

In [None]:
# Use the model to do a prediction

def endpoint_predict_sample(
    to_predict: list,
    project: str = project_id, 
    location: str = region, 
    endpoint: str = "1833536794386235392" # example: "4580732567082237952"
):
    """
    Send instances to a deployed Vertex AI endpoint; print and return the response.

    :param to_predict: list of instances, each a list of feature values
    :param project:    GCP project of the endpoint (default bound to ``project_id`` when this cell runs)
    :param location:   region of the endpoint (default bound to ``region`` when this cell runs)
    :param endpoint:   ID of the endpoint to query
    :return: the response object returned by ``Endpoint.predict``
    """
    aiplatform.init(project=project, location=location)

    response = aiplatform.Endpoint(endpoint).predict(instances=to_predict)
    print(response)
    return response



In [None]:
# NOTE(review): each instance must supply one value per training feature column, in the
# column order of the features CSV (which includes the integer-coded `furnishingstatus`);
# confirm this sample's values match that layout.
df_sample_requests_list = [[3450,1,1,1,True,False,False,True,False,0,False]]
# endpoint_predict_sample already prints the response, so it is not printed again here.
prediction = endpoint_predict_sample(df_sample_requests_list)