In [1]:
# Install or upgrade the notebook's dependencies for the current user, bypassing pip's
# download cache: the KFP SDK (version above 2), Google Cloud Pipeline Components
# (version above 2) and the google-cloud-aiplatform client library.
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

Collecting kfp>2
  Downloading kfp-2.9.0.tar.gz (595 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m595.6/595.6 kB[0m [31m30.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Collecting kfp-pipeline-spec==0.4.0 (from kfp>2)
  Downloading kfp_pipeline_spec-0.4.0-py3-none-any.whl.metadata (301 bytes)
Collecting kfp-server-api<2.4.0,>=2.1.0 (from kfp>2)
  Downloading kfp_server_api-2.3.0.tar.gz (84 kB)
  Preparing metadata (setup.py) ... [?25ldone


In [101]:
import os

# Restart the kernel after the installs above. The restart is skipped when the
# IS_TESTING environment variable is set to a non-empty value.
if not os.environ.get("IS_TESTING"):
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(restart=True)

In [1]:
# Report the installed versions of the KFP SDK, google-cloud-aiplatform and
# google_cloud_pipeline_components as seen by the `python3` / `pip3` on the shell PATH.
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 2.7.0
google-cloud-aiplatform==1.70.0
google_cloud_pipeline_components version: 2.17.0


In [2]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,ModelDeployOp)
from google_cloud_pipeline_components.types import artifact_types

In [3]:
# Google Cloud project that the pipeline runs in.
PROJECT_ID = "de2024-435509"
# Region that the pipeline runs in.
REGION = "us-central1"
# Cloud Storage URI used as the pipeline root: the artifacts of every pipeline run are
# stored under it, so the pipeline's service account must be able to access it.
PIPELINE_ROOT = "gs://temp_de2024_bz"   # e.g., gs://temp_de2024

In [4]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_test_split(
    dataset: Input[Dataset],
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
    test_size: float = 0.3,
    random_state: int = 42,
):
    '''Split the input CSV into a training and a test CSV.

    Args:
        dataset: Input artifact; the CSV file is read from ``dataset.path``.
        dataset_train: Output artifact; the training rows are written to
            ``dataset_train.path + ".csv"``.
        dataset_test: Output artifact; the test rows are written to
            ``dataset_test.path + ".csv"``.
        test_size: Fraction of rows assigned to the test set (default 0.3).
        random_state: Seed for the shuffle. A fixed seed makes the split
            reproducible, so metrics from different runs are computed on the
            same test rows and can be compared.
    '''
    import logging
    import sys

    import pandas as pd
    from sklearn.model_selection import train_test_split as tts

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    alldata = pd.read_csv(dataset.path, index_col=None)
    train, test = tts(alldata, test_size=test_size, random_state=random_state)
    logging.info(
        "Split %d rows into %d train / %d test rows (random_state=%s)",
        len(alldata), len(train), len(test), random_state,
    )

    # Downstream components read these files back from `<artifact path>.csv`.
    train.to_csv(dataset_train.path + ".csv", index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv", index=False, encoding='utf-8-sig')

In [5]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2", "joblib"],
    base_image="python:3.10.7-slim"
)
def train_rf(features: Input[Dataset], rf_model: Output[Model]):
    '''Fit a TF-IDF + RandomForestClassifier pipeline on the training CSV.

    The ``message`` column (missing values replaced by "", cast to str,
    lower-cased) is the input text and the ``label`` column is the target.
    The fitted pipeline is saved with joblib to ``rf_model.path + ".joblib"``.
    '''
    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.pipeline import Pipeline

    # The training split is read from `<artifact path>.csv`.
    train_df = pd.read_csv(features.path + ".csv", encoding="utf-8")

    messages = train_df['message'].fillna("").astype(str).str.lower()
    labels = train_df['label']

    # Vectorizer and classifier are bundled so the saved object accepts raw text.
    text_classifier = Pipeline([
        ('vectorizer', TfidfVectorizer()),
        ('classifier', RandomForestClassifier()),
    ])
    text_classifier.fit(messages, labels)

    # Record the framework on the artifact, then persist the fitted pipeline.
    rf_model.metadata["framework"] = "RandomForest"
    joblib.dump(text_classifier, rf_model.path + ".joblib")


In [6]:
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2","joblib"],
    base_image="python:3.10.7-slim"
)
def train_nb(features: Input[Dataset], nb_model: Output[Model]):
    '''Fit a TF-IDF + MultinomialNB pipeline on the training CSV.

    The ``message`` column (missing values replaced by "", cast to str,
    lower-cased) is the input text and the ``label`` column is the target.
    The fitted pipeline is saved with joblib to ``nb_model.path + ".joblib"``.
    '''
    import joblib
    import pandas as pd
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.pipeline import Pipeline

    # The training split is read from `<artifact path>.csv`.
    train_df = pd.read_csv(features.path + ".csv", encoding="utf-8")

    messages = train_df['message'].fillna("").astype(str).str.lower()
    labels = train_df['label']

    # Vectorizer and classifier are bundled so the saved object accepts raw text.
    text_classifier = Pipeline([
        ('vectorizer', TfidfVectorizer()),
        ('classifier', MultinomialNB()),
    ])
    text_classifier.fit(messages, labels)

    # Record the framework on the artifact, then persist the fitted pipeline.
    nb_model.metadata["framework"] = "NaiveBayes"
    joblib.dump(text_classifier, nb_model.path + ".joblib")

In [15]:
@dsl.component(
    packages_to_install=[
        "pandas", "scikit-learn==1.3.2", "numpy", "joblib"
    ], base_image="python:3.10.7-slim"
)
def model_evaluation(
    test_set: Input[Dataset],
    model: Input[Model],
    thresholds_dict_str: str,
    metrics: Output[ClassificationMetrics],
    kpi: Output[Metrics]
) -> NamedTuple('outputs', [('approval',bool),('accuracy', float)]):
    '''Evaluate a saved text-classification pipeline on the test split.

    Args:
        test_set: Input artifact; the test CSV is read from ``test_set.path + ".csv"``.
        model: Input artifact; the fitted pipeline is loaded with joblib from
            ``model.path + ".joblib"``.
        thresholds_dict_str: JSON object string. The value under the ``"roc"``
            key is the minimum accuracy required for approval (the key is
            named "roc" but it is compared against accuracy).
        metrics: Output artifact that receives the ROC curve and confusion matrix.
        kpi: Output artifact that receives the scalar ``accuracy`` metric.

    Returns:
        ``(approval, accuracy)`` where ``approval`` is True when the accuracy
        is greater than or equal to the configured threshold.
    '''
    import json

    import joblib
    import pandas as pd
    from numpy import nan_to_num
    from sklearn.metrics import accuracy_score, confusion_matrix, roc_curve

    # Load the test dataset
    data = pd.read_csv(test_set.path + ".csv")

    # Load the saved pipeline (vectorizer + classifier). A separate name is used
    # so the `model` input artifact is not rebound to the estimator.
    fitted_pipeline = joblib.load(model.path + ".joblib")

    # Same text preprocessing as in the training components
    X_test = data['message'].fillna("").astype(str).str.lower()
    y_target = data['label']

    # Hard predictions for accuracy / confusion matrix, scores for the ROC curve
    y_pred = fitted_pipeline.predict(X_test)
    y_scores = fitted_pipeline.predict_proba(X_test)[:, 1]

    # Calculate and log ROC curve values
    fpr, tpr, thresholds = roc_curve(
        y_true=y_target, y_score=y_scores, pos_label=True
    )
    # Replace NaN / infinite threshold values with finite numbers before logging
    thresholds = nan_to_num(thresholds)
    metrics.log_roc_curve(fpr, tpr, thresholds)

    # Log confusion matrix
    metrics.log_confusion_matrix(
        ['Negative', 'Positive'],
        confusion_matrix(y_target, y_pred).tolist(),
    )

    # Calculate and log accuracy
    accuracy = float(accuracy_score(y_target, y_pred))
    kpi.log_metric("accuracy", accuracy)

    # Approval check based on the threshold. The threshold must be parsed with
    # float(): int() truncates a fractional threshold such as 0.8 to 0, which
    # would approve every model regardless of its accuracy.
    thresholds_dict = json.loads(thresholds_dict_str)
    min_accuracy = float(thresholds_dict['roc'])
    approval_value = bool(accuracy >= min_accuracy)

    outputs = NamedTuple('outputs', [('approval',bool),('accuracy', float)])
    return outputs(approval_value, accuracy)


In [16]:
@dsl.component(
    packages_to_install=[],
    base_image="python:3.10.7-slim"
)
def get_best_model_name(
    rf_accuracy: float,
    nb_accuracy: float
) -> NamedTuple('outputs', [('best_model_name', str)]):
    '''Return the name of the model with the higher accuracy.

    Yields "RandomForest" when ``rf_accuracy >= nb_accuracy`` (so ties go to
    the random forest) and "NaiveBayes" otherwise.
    '''
    if rf_accuracy >= nb_accuracy:
        winner = "RandomForest"
    else:
        winner = "NaiveBayes"

    result_type = NamedTuple('outputs', [('best_model_name', str)])
    return result_type(winner)

In [17]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    '''Upload the serialized model to a Cloud Storage bucket.

    Args:
        project_id: Google Cloud project used to create the storage client.
        model_repo: Name of the destination bucket.
        model: Input artifact; the file at ``model.path + ".joblib"`` is uploaded.

    The object is always written as ``model.joblib``, so each upload replaces
    the previously stored model in that bucket.
    '''
    import logging
    import sys

    from google.cloud import storage

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    source_file_name = model.path + '.joblib'

    # Upload the model file to GCS
    client = storage.Client(project=project_id)
    blob = client.bucket(model_repo).blob('model.joblib')
    blob.upload_from_filename(source_file_name)

    logging.info("File %s uploaded to %s.", source_file_name, model_repo)

In [18]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="sms-predictor-training-pipeline")
def pipeline(project_id: str, data_bucket: str, dataset_uri: str, model_repo: str, thresholds_dict_str:str, model_repo_uri:str):
    '''Train two SMS classifiers, evaluate both and upload the approved best one.

    Steps: import the dataset, split it into train/test, train a RandomForest
    and a Naive Bayes model, evaluate each on the test split, pick the model
    with the higher accuracy and upload it to ``model_repo`` only if that
    model was also approved by its evaluation.

    ``data_bucket`` and ``model_repo_uri`` are accepted but not used by any task.
    '''
    imported_dataset = kfp.dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )

    split_task = train_test_split(dataset=imported_dataset.output)

    # Both models are trained on the same training split.
    rf_train_task = train_rf(features=split_task.outputs["dataset_train"])
    nb_train_task = train_nb(features=split_task.outputs["dataset_train"])

    # Each model is evaluated on the same test split; a model is deployed only
    # if its performance is above the threshold.
    rf_eval_task = model_evaluation(
        test_set=split_task.outputs["dataset_test"],
        model=rf_train_task.outputs["rf_model"],
        thresholds_dict_str=thresholds_dict_str,
    )
    nb_eval_task = model_evaluation(
        test_set=split_task.outputs["dataset_test"],
        model=nb_train_task.outputs["nb_model"],
        thresholds_dict_str=thresholds_dict_str,
    )

    best_model_task = get_best_model_name(
        rf_accuracy=rf_eval_task.outputs["accuracy"],
        nb_accuracy=nb_eval_task.outputs["accuracy"],
    )

    # The explicit `== True` comparisons below build the pipeline conditions
    # from task outputs; they are not ordinary Python boolean tests.

    # Upload the RandomForest model if it is approved and has the highest accuracy
    with dsl.If(rf_eval_task.outputs["approval"] == True):  # noqa: E712
        with dsl.If(best_model_task.outputs["best_model_name"] == "RandomForest"):
            upload_model_to_gcs(
                project_id=project_id,
                model_repo=model_repo,
                model=rf_train_task.outputs['rf_model'],
            )

    # Upload the NaiveBayes model if it is approved and has the highest accuracy
    with dsl.If(nb_eval_task.outputs["approval"] == True):  # noqa: E712
        with dsl.If(best_model_task.outputs["best_model_name"] == "NaiveBayes"):
            upload_model_to_gcs(
                project_id=project_id,
                model_repo=model_repo,
                model=nb_train_task.outputs['nb_model'],
            )

In [19]:
from kfp import compiler

# Compile the pipeline function into the YAML template that the job-submission
# cell loads through `template_path`.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path='sms_prdictor_training_pipeline.yaml',
)

In [20]:
import google.cloud.aiplatform as aip

# Before initializing, make sure the GOOGLE_APPLICATION_CREDENTIALS environment
# variable is set to the path of your service account.
aip.init(project=PROJECT_ID, location=REGION)

# Prepare the pipeline job from the compiled template. Caching is disabled.
job = aip.PipelineJob(
    display_name="sms-predictor",
    template_path="sms_prdictor_training_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    enable_caching=False,
    parameter_values={
        "project_id": PROJECT_ID,                                  # use your own project id
        "data_bucket": "data_de2024_bz",                           # use your own data bucket name
        "dataset_uri": "gs://data_de2024_bz/SMS_preporcessed.csv",
        "model_repo": "models_de2024_bz",                          # use your own model bucket name
        "thresholds_dict_str": '{"roc":0.8}',
        "model_repo_uri": "gs://models_de2024_bz",                 # use your own model bucket name
    },
)

# Submit the run.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/471717444362/locations/us-central1/pipelineJobs/sms-predictor-training-pipeline-20241025152119
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/471717444362/locations/us-central1/pipelineJobs/sms-predictor-training-pipeline-20241025152119')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/sms-predictor-training-pipeline-20241025152119?project=471717444362
PipelineJob projects/471717444362/locations/us-central1/pipelineJobs/sms-predictor-training-pipeline-20241025152119 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/471717444362/locations/us-central1/pipelineJobs/sms-predictor-training-pipeline-20241025152119 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/471717444362/locations/us-central1/pipelineJobs/sms-predictor-training-pipeline-20241025152119 current state:
PipelineStat