### Before running this notebook, upload the Iris.csv file to your data bucket in Cloud Storage

If you also want to run the batch prediction, upload the iris_batch.csv file (or another batch input file of your choice) to the same data bucket.

In [2]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

Collecting kfp>2
  Downloading kfp-2.9.0.tar.gz (595 kB)
     ---------------------------------------- 0.0/595.6 kB ? eta -:--:--
     --------------- ---------------------- 245.8/595.6 kB 7.4 MB/s eta 0:00:01
     ----------------------------- -------- 460.8/595.6 kB 4.8 MB/s eta 0:00:01
     -------------------------------------- 595.6/595.6 kB 4.7 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting kfp-pipeline-spec==0.4.0 (from kfp>2)
  Obtaining dependency information for kfp-pipeline-spec==0.4.0 from https://files.pythonhosted.org/packages/0b/d7/422c6e40880a15745eaebf74b5ea9a9bfd8e4d3a62cbb34022917e216468/kfp_pipeline_spec-0.4.0-py3-none-any.whl.metadata
  Downloading kfp_pipeline_spec-0.4.0-py3-none-any.whl.metadata (301 bytes)
Collecting kfp-server-api<2.4.0,>=2.1.0 (from kfp>2)
  Downloading kfp_server_api-2.3.0.tar.gz (84 kB)
     ---------------------------------------- 0.0/84.0 kB ? eta -:--:--
  



In [4]:
import os

# Restart the kernel so the freshly installed packages become importable.
# Automated test runs set IS_TESTING (to any non-empty value) and skip the restart.
skip_restart = bool(os.getenv("IS_TESTING"))
if not skip_restart:
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)  # True => restart rather than a plain shutdown

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Traceback (most recent call last):
  File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'kfp'
'grep' is not recognized as an internal or external command,
operable program or batch file.
Traceback (most recent call last):
  File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'google_cloud_pipeline_components'


In [2]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,ModelDeployOp)
from google_cloud_pipeline_components.types import artifact_types

C:\Users\sande\anaconda3\lib\site-packages\numpy\.libs\libopenblas.FB5AE2TYXYH2IJRDKGDGQ3XBKLKTF43H.gfortran-win_amd64.dll
C:\Users\sande\anaconda3\lib\site-packages\numpy\.libs\libopenblas64__v0.3.21-gcc_10_3_0.dll
  from google_cloud_pipeline_components.v1.model import ModelUploadOp


In [4]:
# Google Cloud project in which the pipeline runs (replace with your project ID).
PROJECT_ID: str = "your project id"
# Region in which the pipeline runs.
REGION: str = "us-central1"
# Cloud Storage URI that the pipeline's service account can access; the artifacts
# of every pipeline run are stored underneath this root.
PIPELINE_ROOT: str = "your url to pipeline root"  # e.g., gs://temp_de2024

## First, create a component that trains an SVM model on the dataset

In [41]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2'],
    base_image="python:3.10.7-slim"
)
def train_svm(features: Input[Dataset], out_model: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train an SVM classifier with default hyperparameters.

    The CSV behind ``features`` must contain a ``Species`` label column. An ``Id``
    column, if present, is dropped because it is a row identifier, not a feature.
    20% of the rows (random_state=42) are held out to measure accuracy.

    Args:
        features: Dataset artifact pointing at the training CSV.
        out_model: Model artifact; the pickled model is written to ``out_model.path + ".pkl"``.

    Returns:
        NamedTuple whose ``metrics`` field is {"accuracy": <held-out accuracy>}.
    '''
    # Imports live inside the function: KFP lightweight components run only this
    # function's source in the container, not the rest of the notebook.
    import logging
    import pickle
    import sys

    import pandas as pd
    from sklearn import svm
    from sklearn.model_selection import train_test_split

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    df = pd.read_csv(features.path)
    # Drop the row identifier; tolerate datasets that do not have one.
    df = df.drop(columns=['Id'], errors='ignore')
    logging.info(df.columns)

    x_train, x_test, y_train, y_test = train_test_split(
        df.drop('Species', axis=1),
        df['Species'],
        test_size=0.20,
        random_state=42,
    )
    model_svm = svm.SVC()
    model_svm.fit(x_train, y_train)

    # Held-out accuracy is the value compare_model uses to choose a model.
    metrics_dict = {"accuracy": model_svm.score(x_test, y_test)}
    logging.info(metrics_dict)

    out_model.metadata["file_type"] = ".pkl"
    out_model.metadata["algorithm"] = "svm"
    # Save with a ".pkl" suffix; upload_model_to_gcs reads from the same suffixed path.
    with open(out_model.path + ".pkl", 'wb') as f:
        pickle.dump(model_svm, f)

    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

## Then train a decision tree on the dataset

In [42]:
@dsl.component(
    packages_to_install=['pandas', 'scikit-learn==1.3.2'],
    base_image="python:3.10.7-slim"
)
def train_dt(features: Input[Dataset], out_model: Output[Model]) -> NamedTuple('outputs', metrics=dict):
    '''Train a Decision Tree classifier with default hyperparameters.

    The CSV behind ``features`` must contain a ``Species`` label column. An ``Id``
    column, if present, is dropped because it is a row identifier, not a feature.
    20% of the rows (random_state=42) are held out to measure accuracy.

    Args:
        features: Dataset artifact pointing at the training CSV.
        out_model: Model artifact; the pickled model is written to ``out_model.path + ".pkl"``.

    Returns:
        NamedTuple whose ``metrics`` field is {"accuracy": <held-out accuracy>}.
    '''
    # Imports live inside the function: KFP lightweight components run only this
    # function's source in the container, not the rest of the notebook.
    import logging
    import pickle
    import sys

    import pandas as pd
    from sklearn import tree
    from sklearn.model_selection import train_test_split

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    df = pd.read_csv(features.path)
    # Drop the row identifier; tolerate datasets that do not have one.
    df = df.drop(columns=['Id'], errors='ignore')
    logging.info(df.columns)

    x_train, x_test, y_train, y_test = train_test_split(
        df.drop('Species', axis=1),
        df['Species'],
        test_size=0.20,
        random_state=42,
    )
    # DecisionTreeClassifier randomly permutes features at each split, so fix the
    # seed to make the accuracy (and therefore the model comparison) reproducible.
    model_dt = tree.DecisionTreeClassifier(random_state=42)
    model_dt.fit(x_train, y_train)

    # Held-out accuracy is the value compare_model uses to choose a model.
    metrics_dict = {"accuracy": model_dt.score(x_test, y_test)}
    logging.info(metrics_dict)

    out_model.metadata["file_type"] = ".pkl"
    out_model.metadata["algorithm"] = "dt"
    # Save with a ".pkl" suffix; upload_model_to_gcs reads from the same suffixed path.
    with open(out_model.path + ".pkl", 'wb') as f:
        pickle.dump(model_dt, f)

    outputs = NamedTuple('outputs', metrics=dict)
    return outputs(metrics_dict)

## Then we compare the two models' performance to decide which one to use

In [43]:
@dsl.component(
    base_image="python:3.10.7-slim"
)
def compare_model(svm_metrics: dict, dt_metrics: dict) -> str:
    '''Choose the more accurate of the two trained models.

    Args:
        svm_metrics: Metrics dict produced by train_svm; must contain "accuracy".
        dt_metrics: Metrics dict produced by train_dt; must contain "accuracy".

    Returns:
        "SVM" if the SVM is strictly more accurate, otherwise "DT" (ties go to DT).

    Raises:
        KeyError: if either dict has no "accuracy" entry.
    '''
    import logging
    import sys

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logging.info(svm_metrics)
    logging.info(dt_metrics)
    # Index directly so a missing key fails with a clear KeyError rather than the
    # confusing `None > None` TypeError that dict.get() would lead to.
    return "SVM" if svm_metrics["accuracy"] > dt_metrics["accuracy"] else "DT"

## Then we upload the model to Google Cloud Storage

In [44]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model], model_name: str):
    '''Upload a pickled model to Cloud Storage as ``model.pkl`` at the bucket root.

    Args:
        project_id: Project used to create the Cloud Storage client.
        model_repo: Name of the destination bucket (a bucket name, not a gs:// URI).
        model: Model artifact whose pickle was written to ``model.path + ".pkl"``.
        model_name: Label of the uploaded model (e.g. "SVM" or "DT"); only logged.
    '''
    import logging
    import sys

    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    source_file_name = model.path + '.pkl'
    # Vertex AI's prebuilt scikit-learn serving container loads a file named
    # model.pkl (or model.joblib) from the model directory, hence the fixed name.
    client = storage.Client(project=project_id)
    blob = client.bucket(model_repo).blob('model.pkl')
    blob.upload_from_filename(source_file_name)

    logging.info("Model %s: file %s uploaded to %s.", model_name, source_file_name, model_repo)

## Then we create the complete pipeline

In [45]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="iris-predictor-training-pipeline-v2")
def pipeline(project_id: str, data_bucket: str, dataset_uri: str, model_repo: str, model_repo_uri: str):
    '''Train SVM and decision-tree models on Iris and deploy the more accurate one.

    Args:
        project_id: Google Cloud project for the model upload, registry and endpoint.
        data_bucket: Name of the data bucket. Not used by any step; kept so existing
            run configurations that pass it keep working.
        dataset_uri: gs:// URI of the training CSV.
        model_repo: Name of the bucket the selected model is uploaded to.
        model_repo_uri: gs:// URI of that same bucket; imported as the model directory.
    '''
    # Load the dataset.
    dataset_op = kfp.dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )

    # Train both candidate models on the same data.
    training_svm_job_run_op = train_svm(features=dataset_op.output)
    training_dt_job_run_op = train_dt(features=dataset_op.output)

    # Compare held-out accuracy of the two models.
    comp_model_op = compare_model(
        svm_metrics=training_svm_job_run_op.outputs["metrics"],
        dt_metrics=training_dt_job_run_op.outputs["metrics"],
    ).after(training_svm_job_run_op, training_dt_job_run_op)

    def _upload_register_and_deploy(model_artifact, model_label: str):
        """Upload `model_artifact` to GCS, register it and deploy it to a new endpoint.

        Called once inside each condition branch below, so every task that depends on
        the chosen model lives in the same branch as that model's upload.
        """
        upload_model_to_gc_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=model_artifact,
            model_name=model_label,
        )

        # Wrap the uploaded model directory as an UnmanagedContainerModel so that
        # ModelUploadOp can register it with a prebuilt serving container.
        import_unmanaged_model_task = dsl.importer(
            artifact_uri=model_repo_uri,
            artifact_class=artifact_types.UnmanagedContainerModel,
            metadata={
                "containerSpec": {
                    # see https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
                    "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest",
                },
            },
        ).after(upload_model_to_gc_op)

        # Upload the model to the model registry.
        model_upload_op = ModelUploadOp(
            project=project_id,
            display_name=f"iris-prediction-model-{model_label}",
            unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
        ).after(import_unmanaged_model_task)

        # Create an endpoint for online predictions.
        create_endpoint_op = EndpointCreateOp(
            project=project_id,
            display_name="iris-prediction-service",
        ).after(model_upload_op)

        # Deploy the registered model to the endpoint with all traffic.
        ModelDeployOp(
            model=model_upload_op.outputs["model"],
            endpoint=create_endpoint_op.outputs['endpoint'],
            deployed_model_display_name=f"iris-prediction-model-{model_label}",
            dedicated_resources_machine_type="n1-standard-2",
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            traffic_split={"0": 100},
        ).after(create_endpoint_op)

    # Deploy only the winning model. A task outside a condition cannot consume a
    # task created inside it, so the whole upload/deploy chain is built per branch.
    with dsl.If(comp_model_op.output == 'SVM'):
        _upload_register_and_deploy(training_svm_job_run_op.outputs['out_model'], 'SVM')
    with dsl.Else():
        _upload_register_and_deploy(training_dt_job_run_op.outputs['out_model'], 'DT')

## Then we compile the pipeline into a YAML file, which is used to run the pipeline

In [46]:
from kfp import compiler

# Compile the pipeline definition into the YAML package that the pipeline job runs.
PIPELINE_PACKAGE_PATH = 'iris_predictor_training_pipeline.yaml'
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path=PIPELINE_PACKAGE_PATH,
)

## Then we run the pipeline job

In [47]:
import google.cloud.aiplatform as aip

# Bucket names used by this run; replace them with your own buckets.
# All gs:// URIs below are derived from these, so they cannot point at different buckets.
DATA_BUCKET = 'data_de2024_2083033'     # holds Iris.csv (and the batch input file)
MODEL_BUCKET = 'models_de2024_2083033'  # receives the selected model.pkl

# Before initializing, make sure credentials are available: either set the
# GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of your
# service-account key file, or run `gcloud auth application-default login`.
aip.init(
    project=PROJECT_ID,
    location=REGION,
)

# Prepare the pipeline job.
job = aip.PipelineJob(
    display_name="iris-predictor",
    enable_caching=False,
    template_path="iris_predictor_training_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    parameter_values={
        'project_id': PROJECT_ID,
        'data_bucket': DATA_BUCKET,
        'dataset_uri': f'gs://{DATA_BUCKET}/Iris.csv',
        'model_repo': MODEL_BUCKET,
        'model_repo_uri': f'gs://{MODEL_BUCKET}',
    }
)

job.run()

DefaultCredentialsError: Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.

## Quick check that the model is registered

In [None]:
DISPLAY_NAME = "iris-prediction-model"
! gcloud ai models list --region={REGION} --filter={DISPLAY_NAME}

## Request an online prediction from the endpoint for a sample instance

In [None]:
ENDPOINT_NAME = "iris-prediction-service"
instance = [[6.7, 3.3, 5.7, 2.5]]  # Prediction request inputs

# Find the most recently created endpoint with this display name through the SDK.
# This replaces parsing captured gcloud output by line position and piping through
# `tail`, which is not available on Windows.
_endpoints = aip.Endpoint.list(
    filter=f'display_name="{ENDPOINT_NAME}"',
    order_by="create_time desc",
    project=PROJECT_ID,
    location=REGION,
)
if not _endpoints:
    raise RuntimeError(
        f"No endpoint named {ENDPOINT_NAME!r} found in {REGION}; has the pipeline finished?"
    )
ENDPOINT_ID = _endpoints[0].name  # short endpoint ID, as gcloud's ENDPOINT_ID column

def endpoint_predict(
    project: str, location: str, instances: list, endpoint: str
):
    """Send an online prediction request to a Vertex AI endpoint.

    Args:
        project: Google Cloud project ID.
        location: Region the endpoint lives in.
        instances: Feature rows to score, e.g. [[6.7, 3.3, 5.7, 2.5]].
        endpoint: Endpoint identifier passed to ``aip.Endpoint``.

    Returns:
        The response object returned by ``Endpoint.predict``.
    """
    aip.init(project=project, location=location)
    target = aip.Endpoint(endpoint)
    return target.predict(instances=instances)


endpoint_predict(PROJECT_ID, REGION, instance, ENDPOINT_ID)

## Run a batch prediction on a file of input data

In [None]:
# Define variables
job_display_name = "iris-prediction-batch-prediction-job"
# Display name of the registered model to use. The pipeline registers either
# "iris-prediction-model-SVM" or "iris-prediction-model-DT"; adjust if needed.
MODEL_NAME = "iris-prediction-model-DT"
ENDPOINT_NAME = "iris-prediction-service"
BUCKET_URI = "gs://data_de2024_2083033"
input_file_name = "iris_batch.csv"

# Get the ID of the most recently registered model with this display name via the
# SDK, instead of splitting a line of the table printed by `gcloud ai models list`.
_models = aip.Model.list(
    filter=f'display_name="{MODEL_NAME}"',
    order_by="create_time desc",
    project=PROJECT_ID,
    location=REGION,
)
print(_models)
if not _models:
    raise RuntimeError(f"No model named {MODEL_NAME!r} found in {REGION}.")
MODEL_ID = _models[0].name  # short model ID

model_resource_name = f'projects/{PROJECT_ID}/locations/{REGION}/models/{MODEL_ID}'
print(model_resource_name)
gcs_source = [f"{BUCKET_URI}/{input_file_name}"]
gcs_destination_prefix = f"{BUCKET_URI}/output"

def batch_prediction_job(
    project: str,
    location: str,
    model_resource_name: str,
    job_display_name: str,
    gcs_source: typing.Union[str, typing.List[str]],
    gcs_destination_prefix: str,
    machine_type: str,
    starting_replica_count: int = 1,  # The number of nodes this batch prediction job starts with.
    max_replica_count: int = 1,
):
    """Run a Vertex AI batch prediction job on CSV input and wait for it to finish.

    Args:
        project: Google Cloud project ID.
        location: Region of the model.
        model_resource_name: Resource name of the registered model.
        job_display_name: Display name for the batch prediction job.
        gcs_source: gs:// URI, or list of URIs, of the CSV input file(s).
        gcs_destination_prefix: gs:// prefix under which the results are written.
        machine_type: Machine type of the prediction nodes (must be provided).
        starting_replica_count: Number of nodes the job starts with.
        max_replica_count: Maximum number of nodes the job may scale to.

    Returns:
        The finished BatchPredictionJob.
    """
    aip.init(project=project, location=location)

    model = aip.Model(model_resource_name)

    # Use the arguments rather than notebook globals, so the function works for any
    # input/output location and honours the requested replica counts.
    prediction_job = model.batch_predict(
        job_display_name=job_display_name,
        instances_format='csv',
        gcs_source=gcs_source,
        gcs_destination_prefix=gcs_destination_prefix,
        machine_type=machine_type,
        starting_replica_count=starting_replica_count,
        max_replica_count=max_replica_count,
    )
    prediction_job.wait()
    print(prediction_job.display_name)
    print(prediction_job.state)
    return prediction_job


batch_prediction_job(PROJECT_ID, REGION, model_resource_name, job_display_name, gcs_source, gcs_destination_prefix, machine_type="n1-standard-2")