### Installation
Install the packages required for executing this notebook.

In [2]:
import os

# The Vertex AI Workbench Notebook product has specific requirements
IS_WORKBENCH_NOTEBOOK = os.getenv("DL_ANACONDA_HOME") and not os.getenv("VIRTUAL_ENV")
IS_USER_MANAGED_WORKBENCH_NOTEBOOK = os.path.exists(
    "/opt/deeplearning/metadata/env_version"
)

# Vertex AI Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_WORKBENCH_NOTEBOOK:
    USER_FLAG = "--user"

! pip3 install --upgrade google-cloud-aiplatform {USER_FLAG} -q
! pip3 install -U google-cloud-storage {USER_FLAG} -q
! pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade -q

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.14 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.
google-cloud-pipeline-components 1.0.24 requires google-cloud-storage<2,>=1.20.0, but you have google-cloud-storage 2.5.0 which is incompatible.

## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [3]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Check the versions of the packages you installed. The KFP SDK version should be >=1.6.

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"

KFP SDK version: 1.8.14
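
The minimum-version check can also be done programmatically. The sketch below is one possible way to compare a version string against the 1.6 minimum. The helper names `version_tuple` and `meets_minimum` are hypothetical, and the code assumes plain dotted numeric versions such as `1.8.14` (pre-release suffixes are not handled). Comparing tuples of integers avoids the pitfall of comparing strings, where `"1.10"` sorts before `"1.6"`.

```python
def version_tuple(version: str) -> tuple:
    """Convert a dotted numeric version such as '1.8.14' into (1, 8, 14)."""
    return tuple(int(part) for part in version.split("."))


MIN_KFP_VERSION = (1, 6)  # minimum stated in this notebook


def meets_minimum(version: str, minimum: tuple = MIN_KFP_VERSION) -> bool:
    """Return True if the version is at least the given minimum."""
    return version_tuple(version) >= minimum


print(meets_minimum("1.8.14"))  # True
print(meets_minimum("1.5.9"))   # False
```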


In [26]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2.components import importer_node
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [16]:
# The Google Cloud project that this pipeline runs in.
project_id = "de2022-362409"
# The region that this pipeline runs in.
region = "us-west1"
# A Cloud Storage URI that your pipeline's service account can access.
# The artifacts of your pipeline runs are stored under this pipeline root.
pipeline_root_path = "gs://de_jads_temp"

#### Create Pipeline Components

A component can be created from a Python function (inline) or from a container. We will first try inline Python functions.

Step 1: Define the Python function.

Step 2: Use **kfp.components.create_component_from_func** to build the component. This function takes four parameters. The cells below use the **@component** decorator from **kfp.v2.dsl**, which accepts the same three optional parameters.

**1. func**: The Python function to convert.

**2. base_image**: (Optional.) The Docker container image to run this function in.

**3. output_component_file**: (Optional.) Writes your component definition to a file.

**4. packages_to_install**: (Optional.) A list of versioned Python packages to install before running your function.

We also need to consider how parameters are passed between components. Simple parameters such as integers, strings, tuples, dicts, and lists are passed by value. Large datasets or complex configurations are passed as files: we annotate the Python function's parameters to mark them as input or output files for the component.

Refer to https://www.kubeflow.org/docs/components/pipelines/sdk/python-function-components/ for more information.
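
To make the file-passing idea concrete, the sketch below mimics it in plain Python without KFP. The names `FakeArtifact`, `produce`, and `consume` are hypothetical stand-ins: in a real pipeline, KFP creates the artifact objects and fills in their `path`, whereas here a temporary directory plays that role. The producer writes to `artifact.path + ".csv"` and the consumer reads the same location, which is the convention the components in this notebook follow.

```python
import csv
import os
import tempfile
from dataclasses import dataclass, field


@dataclass
class FakeArtifact:
    """Hypothetical stand-in for a KFP artifact: a path plus free-form metadata."""
    path: str
    metadata: dict = field(default_factory=dict)


def produce(rows: list, out: FakeArtifact) -> None:
    """Write rows to a file derived from the artifact path (the 'output file')."""
    with open(out.path + ".csv", "w", newline="") as handle:
        csv.writer(handle).writerows(rows)
    out.metadata["rows"] = len(rows)


def consume(inp: FakeArtifact) -> int:
    """Read the file written by the producer and return the number of data rows."""
    with open(inp.path + ".csv", newline="") as handle:
        return sum(1 for _ in csv.reader(handle)) - 1  # subtract the header row


with tempfile.TemporaryDirectory() as workdir:
    dataset = FakeArtifact(path=os.path.join(workdir, "dataset"))
    produce([["age", "class"], [50, 1], [31, 0]], dataset)
    n_rows = consume(dataset)  # a small value like this count can travel by value
    print(n_rows)  # 2
```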

#### Pipeline Component : Data Ingestion

In [17]:
@component(
    packages_to_install=["pandas", "pyarrow", "scikit-learn", "google-cloud-storage"],
    base_image="python:3.10.7-slim",
    output_component_file="data_ingestion.yaml"
)
def download_data(project_id: str, bucket: str, file_name: str, dataset_train: Output[Dataset], dataset_test: Output[Dataset]):
    '''download data'''
    from google.cloud import storage
    import pandas as pd
    import logging 
    import sys
    from sklearn.model_selection import train_test_split as tts
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    
    # Download the file from a Google Cloud Storage bucket
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(bucket)
    blob = bucket.blob(file_name)
    local_path = '/tmp/'+ file_name
    blob.download_to_filename(local_path)
    logging.info('Downloaded Data!')
    
    # The `squeeze` argument was removed from read_csv in pandas 2.0 and is not needed here
    alldata = pd.read_csv(local_path, index_col=None)
    train, test = tts(alldata, test_size=0.3)
    train.to_csv(dataset_train.path + ".csv" , index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv" , index=False, encoding='utf-8-sig')

#### Pipeline Component : Training LogisticRegression

In [18]:
@component(
    packages_to_install=['google-cloud-storage', 'pandas', 'joblib', 'scikit-learn'],
    base_image="python:3.10.7-slim",
    output_component_file="train_lr_model.yaml"
)
def train_lr(features: Input[Dataset], model: Output[Model]):
    '''train a LogisticRegression with default parameters'''
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    import joblib

    data = pd.read_csv(features.path + ".csv")
    model_lr = LogisticRegression()
    model_lr.fit(data.drop('class', axis=1), data['class'])
    model.metadata["framework"] = "LR"
    # Save the fitted model next to the artifact path with a .pkl suffix
    file_name = model.path + ".pkl"
    with open(file_name, 'wb') as file:
        joblib.dump(model_lr, file)


#### Pipeline Component : Model Evaluation

In [19]:
@component(
    packages_to_install=[
        "pandas",
        "scikit-learn",  # install name on PyPI; the "sklearn" alias is deprecated
        "joblib",
    ],
    base_image="python:3.10.7-slim",
    output_component_file="model_evaluation.yaml",
)
def lr_model_evaluation(
    test_set:  Input[Dataset],
    model_lr: Input[Model],
    thresholds_dict_str: str,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics]
) -> NamedTuple("output", [("approval", str)]):

    import pandas as pd
    from sklearn.metrics import roc_curve, confusion_matrix, accuracy_score
    import json
    import joblib
    
    def threshold_check(val1, val2):
        cond = "false"
        if val1 >= val2 :
            cond = "true"
        return cond

    data = pd.read_csv(test_set.path+".csv")
    
    # Load the saved model with joblib
    m_filename = model_lr.path + ".pkl"
    model = joblib.load(m_filename)

    # Split the test set into features and target
    x_test = data.drop(columns=["class"])
    y_target = data['class']
    y_pred = model.predict(x_test)

    y_scores = model.predict_proba(x_test)[:, 1]
    fpr, tpr, thresholds = roc_curve(
         y_true=data['class'].to_numpy(), y_score=y_scores, pos_label=True
    )
    metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())  
    
    metrics.log_confusion_matrix(
       ["False", "True"],
       confusion_matrix(
           data['class'], y_pred
       ).tolist(), 
    )
    
    accuracy = accuracy_score(data['class'], y_pred.round())
    thresholds_dict = json.loads(thresholds_dict_str)
    model_lr.metadata["accuracy"] = float(accuracy)
    kpi.log_metric("accuracy", float(accuracy))
    # Parse the threshold as a float: int(0.8) truncates to 0 and would approve every model
    approval = threshold_check(float(accuracy), float(thresholds_dict['roc']))
    return (approval,)
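
The approval gate at the end of the evaluation component can be sketched on its own in plain Python. The helper name `approve` is hypothetical; the `roc` key and the `"true"`/`"false"` string outputs follow the component above. The threshold is parsed as a float, because casting a fractional threshold such as 0.8 to `int` truncates it to 0, and then any accuracy passes the check.

```python
import json


def approve(accuracy: float, thresholds_dict_str: str) -> str:
    """Return "true" if accuracy reaches the threshold stored under the 'roc' key."""
    threshold = float(json.loads(thresholds_dict_str)["roc"])
    return "true" if accuracy >= threshold else "false"


print(approve(0.85, '{"roc": 0.8}'))  # true
print(approve(0.60, '{"roc": 0.8}'))  # false
print(int(0.8))  # 0, which is why an int cast would approve every model
```

Strings are returned rather than booleans because the pipeline condition compares this output against the literal `"true"`.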

### Upload Model and Metrics to Google Bucket 

In [20]:
@component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim",
    output_component_file="model_upload.yaml"
)
def upload_model(project_id: str, model_repo: str, model: Input[Model]):
    '''upload model and metrics'''
    from google.cloud import storage   
    import logging 
    import sys
    
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)    
  
    # upload the model to GCS
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(model_repo)
    blob = bucket.blob('model.pkl') 
    blob.upload_from_filename(model.path + '.pkl')   
    
    
    print("Saved the model to GCP bucket : " + model_repo)

### Deploy the model at Vertex AI
We can use Google's pre-built Kubeflow components such as EndpointCreateOp, ModelUploadOp, and ModelDeployOp to deploy the models on Vertex AI.

***This is only for testing. In your assignment, please use custom serving applications and CI/CD pipelines to deploy models. We should be able to deploy a given model to any production environment, and CI/CD pipelines are the best solution for that.***

<img src="imgs/EnterpriseMlOps_Model_Deployment.png">

source: https://cloud.redhat.com/blog/enterprise-mlops-reference-design

https://cloud.google.com/vertex-ai/docs/pipelines/components-introduction

https://cloud.google.com/vertex-ai/docs/pipelines/gcpc-list

### Enable Artifact Registry API

https://cloud.google.com/artifact-registry/docs/enable-service

#### Define the Pipeline

In [58]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="diabetes-prdictor-training-pipeline-lab5ex1",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str, thresholds_dict_str:str):
    
    
    di_op = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename
    )
     
    training_lr_job_run_op = train_lr(features=di_op.outputs["dataset_train"]
    )
    
    model_evaluation_op = lr_model_evaluation(
        test_set=di_op.outputs["dataset_test"],
        model_lr=training_lr_job_run_op.outputs["model"],
        thresholds_dict_str=thresholds_dict_str,  # deploy the model only if its performance is above the threshold
    )
    
    with dsl.Condition(
        model_evaluation_op.outputs["approval"]=="true",
        name="approve-model",
    ):
        upload_model_op = upload_model(
            project_id=project_id,
            model_repo=model_repo,
            model=training_lr_job_run_op.outputs['model']
        )    
        
        import_unmanaged_model_task = importer_node.importer(
            artifact_uri= "gs://model_repo_de2022",
            artifact_class=artifact_types.UnmanagedContainerModel,
            metadata={
                "containerSpec": {
                    "imageUri": "europe-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
                },
            },
        ).after(upload_model_op)      
       
        
        model_upload_op = gcc_aip.ModelUploadOp(
            project=project_id,
            display_name="diabetes-prediction-model",
            unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
        ).after(import_unmanaged_model_task)

        # model_upload_op.after(import_unmanaged_model_task)
        
        # Use Google Cloud Pipeline Components to deploy the model.
        create_endpoint_op = gcc_aip.EndpointCreateOp(
            project=project_id,
            display_name="diabetes-prediction-service",
        ).after(model_upload_op)      
        
        model_deploy_op = gcc_aip.ModelDeployOp(
            model=model_upload_op.outputs["model"],
            endpoint=create_endpoint_op.outputs['endpoint']
        )    
        
        # model_deploy_op.after(model_deploy_op)
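
The execution order of the tasks follows from their data dependencies and the explicit `.after()` calls. The sketch below is not KFP code: it uses the standard library `graphlib` module to show the order implied by the pipeline above. The task labels are shortened names chosen for illustration, and the condition is modeled simply as a dependency on the evaluation step.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each task maps to the set of tasks it must wait for.
dependencies = {
    "download_data": set(),
    "train_lr": {"download_data"},
    "lr_model_evaluation": {"download_data", "train_lr"},
    "upload_model": {"lr_model_evaluation", "train_lr"},  # inside the condition
    "importer": {"upload_model"},
    "model_upload": {"importer"},
    "create_endpoint": {"model_upload"},
    "model_deploy": {"model_upload", "create_endpoint"},
}

# Every task depends on its predecessor in the chain, so the order is unique.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Because each step waits for the previous one, the deployment steps only start once the model has been approved and uploaded.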

#### Compile the pipeline into a JSON file

In [59]:
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='diabetes_prdictor_training_pipeline_lab5ex1.json')

#### Submit the pipeline run

In [None]:

job = aip.PipelineJob(
    display_name="diabetes-predictor-lab5ex1",
    enable_caching=False,
    template_path="diabetes_prdictor_training_pipeline_lab5ex1.json",
    pipeline_root=pipeline_root_path,
    parameter_values={
        'project_id': project_id,  # make sure to use your project id
        'data_bucket': 'data_de2022',  # make sure to use your data bucket name
        'trainset_filename': 'training_set.csv',  # make sure to upload this file to your data bucket from DE2022/lab4/data
        'model_repo': 'model_repo_de2022',  # make sure to use your model bucket name
        'thresholds_dict_str': '{"roc":0.8}'
    }
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/958343845263/locations/us-central1/pipelineJobs/diabetes-prdictor-training-pipeline-lab5ex1-20221013102820
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/958343845263/locations/us-central1/pipelineJobs/diabetes-prdictor-training-pipeline-lab5ex1-20221013102820')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/diabetes-prdictor-training-pipeline-lab5ex1-20221013102820?project=958343845263
PipelineJob projects/958343845263/locations/us-central1/pipelineJobs/diabetes-prdictor-training-pipeline-lab5ex1-20221013102820 current state:
PipelineState.PIPELINE_STATE_RUNNING
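
In this notebook the thresholds are passed to the pipeline as a JSON string. As a small sketch, that string can be built from a dictionary with `json.dumps` instead of being typed by hand, which avoids quoting mistakes. The variable names below are illustrative only.

```python
import json

thresholds = {"roc": 0.8}
thresholds_dict_str = json.dumps(thresholds)
print(thresholds_dict_str)  # {"roc": 0.8}

# The evaluation component recovers the dictionary with json.loads.
recovered = json.loads(thresholds_dict_str)
print(recovered == thresholds)  # True
```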
