In [None]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

In [None]:
import os

# Restart the kernel so the packages installed above become importable.
# The restart is skipped whenever the IS_TESTING environment variable is set
# to a non-empty value.
if not os.getenv("IS_TESTING"):
    import IPython

    # True asks for a restart instead of a plain shutdown.
    IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
Check the versions of the packages you installed. The KFP SDK version should be newer than 2.0, because the install cell requests `kfp>2`.

In [None]:
# Print the version of the installed KFP SDK.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
# List the installed distributions whose name contains "aiplatform".
! pip3 freeze | grep aiplatform
# Print the version of the installed google_cloud_pipeline_components package.
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

In [None]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,ModelDeployOp)
from google_cloud_pipeline_components.types import artifact_types

In [None]:
#### Project and Pipeline Configurations

In [None]:
# The Google Cloud project that this pipeline runs in.
PROJECT_ID = "dataengineeringcourse2023"
# The region that this pipeline runs in.
REGION = "us-central1"
# A Cloud Storage URI that the pipeline's service account can access.
# The artifacts of every pipeline run are stored within this pipeline root.
PIPELINE_ROOT = "gs://temp_de2023_2124849"

In [None]:
### Pipeline Component : Train and Test Split

In [None]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.10.7-slim"
)
def train_test_split(
    dataset: Input[Dataset],
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
    test_size: float = 0.3,
    random_state: int = 42,
):
    '''Split the input CSV into a training CSV and a test CSV.

    The columns 'gender', 'ever_married', 'work_type' and 'Residence_type'
    are dropped before the rows are shuffled and split.

    Args:
        dataset: Dataset artifact whose path is read as a CSV file.
        dataset_train: Output artifact; the training rows are written to
            ``dataset_train.path + ".csv"``.
        dataset_test: Output artifact; the test rows are written to
            ``dataset_test.path + ".csv"``.
        test_size: Fraction of the rows assigned to the test set. Defaults to
            0.3, the value that used to be hard-coded.
        random_state: Seed of the shuffle, so that re-running the component on
            the same data produces the same split.
    '''
    import logging
    import sys

    import pandas as pd
    from sklearn.model_selection import train_test_split as tts

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    alldata = pd.read_csv(dataset.path, index_col=None)

    # Drop the columns that are excluded from training and evaluation.
    selected = alldata.drop(['gender', 'ever_married', 'work_type', 'Residence_type'], axis=1)

    # A fixed seed keeps the split reproducible between pipeline runs.
    train, test = tts(selected, test_size=test_size, random_state=random_state)
    logging.info(
        "Split %d rows into %d training rows and %d test rows",
        len(selected), len(train), len(test),
    )

    # The ".csv" suffix is part of the hand-off: train_lr reads its input from
    # `<artifact path>.csv`, so the files must be written under that name.
    train.to_csv(dataset_train.path + ".csv", index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv", index=False, encoding='utf-8-sig')

In [None]:
#### Pipeline Component : Training LogisticRegression

In [None]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn"],
    base_image="python:3.10.7-slim"
)
def train_lr(features: Input[Dataset], model: Output[Model]):
    '''Fit a LogisticRegression with default parameters and pickle it.

    Args:
        features: Dataset artifact; the training data is read from
            ``features.path + ".csv"`` and must contain a 'stroke' column,
            which is used as the target. All other columns are predictors.
        model: Output artifact; the fitted estimator is pickled to
            ``model.path + ".pkl"`` and its "framework" metadata is set
            to "LR".
    '''
    import pickle

    import pandas as pd
    from sklearn.linear_model import LogisticRegression

    training_frame = pd.read_csv(features.path + ".csv")
    predictors = training_frame.drop('stroke', axis=1)
    target = training_frame['stroke']

    classifier = LogisticRegression()
    classifier.fit(predictors, target)

    model.metadata["framework"] = "LR"

    # The ".pkl" suffix is what upload_model_to_gcs expects to find.
    pickle_path = model.path + ".pkl"
    with open(pickle_path, 'wb') as pickle_file:
        pickle.dump(classifier, pickle_file)


In [None]:
##### Pipeline evaluation: pipeline component

In [None]:
### Upload Model and Metrics to Google Bucket 

In [None]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    '''Upload the pickled model to a Cloud Storage bucket.

    Args:
        project_id: Project used to create the Cloud Storage client.
        model_repo: Name of the bucket that receives the file.
        model: Model artifact; the file at ``model.path + '.pkl'`` is uploaded
            as the blob 'model.pkl'.
    '''
    import logging
    import sys

    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    source_file_name = model.path + '.pkl'

    # The blob name is fixed, so each upload overwrites the previous model.
    target_blob = storage.Client(project=project_id).bucket(model_repo).blob('model.pkl')
    target_blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to {model_repo}.")

In [None]:
#### Define pipeline

In [None]:
# Define the workflow of the pipeline.
@dsl.pipeline(
    name="stroke-predictor-training-pipeline")
def pipeline(
    project_id: str,
    data_bucket: str,
    dataset_uri: str,
    model_repo: str,
    thresholds_dict_str: str,
    model_repo_uri: str,
):
    '''Train, evaluate and conditionally publish the stroke-prediction model.

    The dataset is imported, split into training and test sets, used to train
    a logistic regression, and evaluated. Only when the evaluation step
    approves the model is it uploaded to the model bucket and imported as an
    unmanaged container model.

    Args:
        project_id: Google Cloud project passed to the upload component.
        data_bucket: Not used by any step; kept so that existing callers which
            pass it keep working.
        dataset_uri: URI imported as the input Dataset artifact.
        model_repo: Name of the bucket the pickled model is uploaded to.
        thresholds_dict_str: Forwarded to the evaluation component, which
            decides whether the model is approved.
        model_repo_uri: URI of the model repository that is imported as an
            UnmanagedContainerModel after the upload, e.g.
            "gs://models_de2023_2124849" (the value that used to be
            hard-coded here).
    '''
    dataset_op = dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )

    train_test_split_op = train_test_split(dataset=dataset_op.output)

    training_lr_job_run_op = train_lr(features=train_test_split_op.outputs["dataset_train"])

    # NOTE(review): lr_model_evaluation is not defined in this cell; it has to
    # be defined by an earlier cell before the pipeline can be compiled.
    model_evaluation_op = lr_model_evaluation(
        test_set=train_test_split_op.outputs["dataset_test"],
        model_lr=training_lr_job_run_op.outputs["model"],
        thresholds_dict_str=thresholds_dict_str,
    )

    # The model is published only if the evaluation step approves it.
    # `== True` is intentional: the comparison builds the condition that KFP
    # evaluates at run time, it is not a Python truth test.
    with dsl.If(
        model_evaluation_op.outputs["approval"] == True,  # noqa: E712
        name="approve-model",
    ):
        upload_model_to_gc_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_lr_job_run_op.outputs['model']
        )

        # Import the uploaded model as an unmanaged container model; `.after`
        # makes sure the upload has finished first.
        import_unmanaged_model_task = dsl.importer(
            artifact_uri=model_repo_uri,
            artifact_class=artifact_types.UnmanagedContainerModel,
            metadata={
                "containerSpec": {
                    "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",  # see https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
                },
            },
        ).after(upload_model_to_gc_op)

    # Registering the model with ModelUploadOp and deploying it with
    # ModelDeployOp is intentionally left out: per the original author's note
    # these steps are unnecessary here. `import_unmanaged_model_task` is the
    # artifact a ModelUploadOp step would consume.

In [None]:
from kfp import compiler
# Compile the pipeline function into a YAML package in the working directory.
# NOTE(review): the file name is spelled "prdictor"; it is kept unchanged
# because other cells may refer to this exact path.
compiler.Compiler().compile(
    package_path='stroke_prdictor_training_pipeline.yaml',
    pipeline_func=pipeline,
)

In [None]:
#### Submit the pipeline run