In [1]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform



In [2]:
import os

# Set IS_TESTING to any non-empty value to skip the restart (e.g. in automated runs).
_skip_restart = bool(os.getenv("IS_TESTING"))

if not _skip_restart:
    # Restart the kernel so the packages installed in the previous cell are importable.
    from IPython import Application

    Application.instance().kernel.do_shutdown(True)

In [89]:
import kfp
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Output,
                        ClassificationMetrics,)

### Pipeline config

In [37]:
import os

# Each value can be overridden by an environment variable of the same name, so the
# notebook can run without editing it (and without committing project details).
# Otherwise, fill in the defaults below.

# The Google Cloud project that this pipeline runs in.
PROJECT_ID = os.environ.get("PROJECT_ID", "")
# The region that this pipeline runs in.
REGION = os.environ.get("REGION", "")
# A Cloud Storage URI that your pipelines service account can access.
# The artifacts of your pipeline runs are stored within the pipeline root.
PIPELINE_ROOT = os.environ.get("PIPELINE_ROOT", "")
# Artifact Registry repository that holds the component images.
IMAGE_REG = os.environ.get("IMAGE_REG", "")

In [98]:
@dsl.container_component
def toxic_data_ingestion(project: str, bucket: str, data_file_name: str,  features: Output[Dataset]):
    """Run the toxic-data-ingestor image to produce the `features` Dataset.

    The container entry point receives the project, bucket and file name as CLI
    flags, plus the path where it must write the `features` artifact.
    (Presumably it reads `data_file_name` from `bucket`; confirm in the image source.)
    """
    image_uri = f'{REGION}-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REG}/toxic-data-ingestor:0.0.1'
    entrypoint = ['python3', '/pipelines/component/src/component.py']
    flags = [
        '--project_id', project,
        '--bucket', bucket,
        '--file_name', data_file_name,
        '--feature_path', features.path,
    ]
    return dsl.ContainerSpec(image=image_uri, command=entrypoint, args=flags)

In [99]:
@dsl.container_component
def toxic_data_cleaner(features: Input[Dataset], cleaned_data: Output[Dataset]):
    """Run the toxic-data-cleaner image on the ingested features.

    Args:
        features: Dataset from the ingestion step; its path is passed as --raw_data_path.
        cleaned_data: Output Dataset; its path is passed as --cleaned_data_path.
    """
    registry = f'{REGION}-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REG}'
    return dsl.ContainerSpec(
        image=registry + '/toxic-data-cleaner:0.0.1',
        command=['python3', '/pipelines/component/src/component.py'],
        args=[
            '--raw_data_path', features.path,
            '--cleaned_data_path', cleaned_data.path,
        ],
    )

In [100]:
@dsl.container_component
def toxic_train_test_split(cleaned_data: Input[Dataset], train: Output[Dataset], test: Output[Dataset]):
    """Run the toxic-train-test-split image, which (per its name) splits the cleaned
    Dataset into the `train` and `test` output Datasets whose paths it receives."""
    cli = ['--clean_data_path', cleaned_data.path]
    cli += ['--train_path', train.path]
    cli += ['--test_path', test.path]
    return dsl.ContainerSpec(
        image='{}-docker.pkg.dev/{}/{}/toxic-train-test-split:0.0.1'.format(REGION, PROJECT_ID, IMAGE_REG),
        command=['python3', '/pipelines/component/src/component.py'],
        args=cli,
    )

In [101]:
@dsl.container_component
def multilabel_classifier_trainer(train: Input[Dataset], models: Output[Artifact]):
    """Run the toxic-multilabel-trainer image on the training split.

    Args:
        train: Training Dataset; its path is passed as --train_path.
        models: Output artifact; its path is passed as --models_path. The
            metrics_calculation component later opens this artifact as a tar archive.
    """
    return dsl.ContainerSpec(
        image=(
            f'{REGION}-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REG}'
            '/toxic-multilabel-trainer:0.0.1'
        ),
        command=['python3', '/pipelines/component/src/component.py'],
        args=['--train_path', train.path, '--models_path', models.path],
    )

In [102]:
@dsl.container_component
def toxic_predictor(project: str, test: Input[Dataset], models: Input[Artifact], predicted_data_path: Output[Artifact]):
    """Run the toxic-predictor image on the test split with the trained models.

    The `predicted_data_path` artifact is consumed by metrics_calculation as a tar
    archive of y.csv / y_pred.csv / y_pred_proba.csv.
    """
    flag_pairs = [
        ('--project_id', project),
        ('--predict_data', test.path),
        ('--model_repo', models.path),
        ('--predicted_data_path', predicted_data_path.path),
    ]
    cli_args = [token for pair in flag_pairs for token in pair]
    # Value-less switch, always passed last.
    cli_args.append('--validation_data')

    image_uri = f'{REGION}-docker.pkg.dev/{PROJECT_ID}/{IMAGE_REG}/toxic-predictor:0.0.1'
    return dsl.ContainerSpec(
        image=image_uri,
        command=['python3', '/pipelines/component/src/component.py'],
        args=cli_args,
    )

In [103]:
@dsl.component(
    packages_to_install=['scikit-learn', 'pandas', 'numpy', 'google-cloud-storage', 'requests'],
    base_image='python:3.10',
)
def metrics_calculation(metrics: Input[Artifact],
                        models: Input[Artifact],
                        project_id: str,
                        data_bucket: str,
                        model_repo: str,
                        toxic: Output[ClassificationMetrics],
                        severe_toxic: Output[ClassificationMetrics],
                        obscene: Output[ClassificationMetrics],
                        threat: Output[ClassificationMetrics],
                        insult: Output[ClassificationMetrics],
                        identity_hate: Output[ClassificationMetrics],
                        ):
    """Score the predictions, promote better models, and log per-label metrics.

    Steps:
      1. Unpack the `metrics` tar (y.csv, y_pred.csv, y_pred_proba.csv) and compute
         accuracy (exact match over all labels) plus micro-averaged
         precision/recall/F1.
      2. Compare the F1 with the one stored in metrics.json in `data_bucket`. A
         missing metrics.json counts as "no previous model". If the new F1 is
         higher, upload the unpacked `models` tar to `model_repo`, then the new
         metrics.json, then ask the prediction API to reload its models.
      3. Log an ROC curve and a confusion matrix per label on a 10% sample.

    Each ClassificationMetrics output (toxic, severe_toxic, obscene, threat,
    insult, identity_hate) is matched to the y.csv column of the same name.

    Args:
        metrics: Tar archive written by the predictor step.
        models: Tar archive of trained model files.
        project_id: Google Cloud project for the Storage client.
        data_bucket: Bucket holding the reference metrics.json.
        model_repo: Bucket the promoted model files are uploaded to.
    """
    # Lightweight components only ship this function's source, so every import
    # and helper must live inside it.
    import json
    import logging
    import os
    import shutil
    import tarfile
    from pathlib import Path

    import pandas as pd
    import requests
    from google.cloud import exceptions, storage
    from numpy import nan_to_num
    from sklearn.metrics import (accuracy_score, confusion_matrix, f1_score,
                                 precision_score, recall_score, roc_curve)

    # Location of the file holding the prediction API's base URL (project-specific).
    API_URL_BUCKET = 'temp_de2023_group1'
    API_URL_BLOB = 'predict_api_url.text'
    API_URL_LOCAL = 'tmp/predict_api_url.text'
    # Upper bound (seconds) on waiting for the reload endpoint, so a hung API
    # cannot block this step indefinitely.
    RELOAD_TIMEOUT_S = 300

    def reload_prediction_models(client, bucket_name, filename, dest_filename):
        """Read the prediction-API base URL from GCS and POST to `<url>/reload_model`.

        A missing URL file or a failed HTTP request is logged rather than raised.
        """
        blob = client.bucket(bucket_name).blob(filename)
        try:  # Download the URL file of the prediction API.
            Path(dest_filename).parent.mkdir(parents=True, exist_ok=True)
            blob.download_to_filename(filename=dest_filename)
            logging.info('...Download url file...')
        except exceptions.NotFound:
            logging.error(f"File {filename} does not exist in bucket {bucket_name}.")
            return  # No URL, so there is no endpoint to call.
        with open(dest_filename, 'r') as f:
            # Strip whitespace (e.g. a trailing newline) and a trailing slash so
            # the endpoint path is appended cleanly.
            url = f.read().strip().rstrip('/') + '/reload_model'

        try:  # Send the model-reload request to the endpoint.
            response = requests.post(url, timeout=RELOAD_TIMEOUT_S)
            if response.status_code == 200:
                logging.info('...Models reloaded...')
            else:
                logging.error('...Something went wrong with reloading the models... '
                              f'(HTTP {response.status_code})')
        except requests.exceptions.RequestException as e:
            logging.error(f"HTTP request failed: {e}")

    def load_previous_f1(bucket):
        """Return the F1 stored in the bucket's metrics.json, or -inf if it does not exist."""
        os.makedirs('temp', exist_ok=True)
        local_file = os.path.join('temp', 'metrics.json')
        try:
            bucket.blob('metrics.json').download_to_filename(local_file)
        except exceptions.NotFound:
            logging.warning('No metrics.json in the data bucket; treating this as the first model.')
            return float('-inf')
        with open(local_file, 'r') as f:
            return json.load(f)['f1']

    def publish_models(client):
        """Unpack the `models` tar and upload each top-level entry to `model_repo`."""
        shutil.copyfile(models.path, 'models.tar')
        extraction_path = 'models'
        os.makedirs(extraction_path, exist_ok=True)
        with tarfile.open('models.tar', 'r') as tar:
            tar.extractall(path=extraction_path)
        model_bucket = client.get_bucket(model_repo)  # Fetched once, not per file.
        for file_name in os.listdir(extraction_path):
            model_bucket.blob(file_name).upload_from_filename(
                os.path.join(extraction_path, file_name))

    # --- 1. Overall metrics -------------------------------------------------
    shutil.copyfile(metrics.path, 'metrics.tar')
    with tarfile.open('metrics.tar', 'r') as tar:
        tar.extractall(path='.')

    y = pd.read_csv('y.csv')
    y_pred = pd.read_csv('y_pred.csv')
    y_pred_proba = pd.read_csv('y_pred_proba.csv')

    # Named `current_metrics` so the `metrics` input artifact is not shadowed;
    # float() keeps the values plain JSON numbers.
    current_metrics = {
        'accuracy': float(accuracy_score(y, y_pred)),
        'precision': float(precision_score(y, y_pred, average='micro')),
        'recall': float(recall_score(y, y_pred, average='micro')),
        'f1': float(f1_score(y, y_pred, average='micro')),
    }

    # --- 2. Promote the models if they beat the stored F1 ------------------
    client = storage.Client(project=project_id)
    bucket = client.get_bucket(data_bucket)
    if current_metrics['f1'] > load_previous_f1(bucket):
        # Upload the models before metrics.json so the stored F1 never refers
        # to models whose upload failed.
        publish_models(client)
        with open('metrics.json', 'w') as outfile:
            json.dump(current_metrics, outfile)
        bucket.blob('metrics.json').upload_from_filename('metrics.json')
        reload_prediction_models(client, API_URL_BUCKET, API_URL_BLOB, API_URL_LOCAL)

    # --- 3. Per-label curves on a sample ------------------------------------
    # Only a subset is visualized: the metric-logging limit is ~131 kB, while
    # the full prediction output is ~1 MB.
    sampled_indices = y.sample(frac=0.1, random_state=1).index
    y = y.loc[sampled_indices]
    y_pred = y_pred.loc[sampled_indices]
    y_pred_proba = y_pred_proba.loc[sampled_indices]

    col_to_output = {
        'toxic': toxic,
        'severe_toxic': severe_toxic,
        'obscene': obscene,
        'threat': threat,
        'insult': insult,
        'identity_hate': identity_hate,
    }

    for col in y.columns:
        output = col_to_output[col]
        fpr, tpr, thresholds = roc_curve(y_true=y[col], y_score=y_pred_proba[col], pos_label=True)
        # roc_curve may return +inf as its first threshold; nan_to_num replaces
        # non-finite values with finite numbers before they are logged.
        output.log_roc_curve(fpr.tolist(), tpr.tolist(), nan_to_num(thresholds).tolist())
        # labels=[0, 1] keeps the matrix 2x2 even if a sampled label has one class.
        matrix = confusion_matrix(y[col], y_pred[col], labels=[0, 1])
        output.log_confusion_matrix(['Negative', 'Positive'], matrix.tolist())



### Define the pipeline

In [104]:
# Define the workflow of the pipeline.
@dsl.pipeline(name="toxic-predictor")
def pipeline(project_id: str, data_bucket: str, trainset_filename: str, model_repo: str):
    """Train, evaluate and conditionally publish the toxic-comment classifiers.

    Step order: ingest -> clean -> train/test split -> train -> predict on the
    test split -> compute metrics. The metrics step also receives the trained
    models so it can upload them to `model_repo` when they score better.
    """
    ingest = toxic_data_ingestion(
        project=project_id,
        bucket=data_bucket,
        data_file_name=trainset_filename,
    )
    clean = toxic_data_cleaner(features=ingest.outputs['features'])
    split = toxic_train_test_split(cleaned_data=clean.outputs['cleaned_data'])
    trained = multilabel_classifier_trainer(train=split.outputs['train'])

    predicted = toxic_predictor(
        project=project_id,
        test=split.outputs['test'],
        models=trained.outputs['models'],
    )

    metrics_calculation(
        metrics=predicted.outputs['predicted_data_path'],
        models=trained.outputs['models'],
        project_id=project_id,
        data_bucket=data_bucket,
        model_repo=model_repo,
    )

### Compile the pipeline

In [105]:
from kfp import compiler

# Compile the pipeline function into a YAML package; the run cell below
# submits this file via `template_path`.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path='toxic_predictor_train_pipeline.yaml',
)

### Run the pipeline

In [88]:
import os

import google.cloud.aiplatform as aip

# Service-account key file used only when no credentials are configured yet.
# An existing GOOGLE_APPLICATION_CREDENTIALS setting is left untouched. If the
# key file is absent, the client libraries use the other Application Default
# Credentials sources instead (e.g. `gcloud auth application-default login` or
# the VM metadata server). Keep key files out of version control.
SERVICE_ACCOUNT_KEY_FILE = "assignment1-402316-dc4baf177723.json"
if "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ and os.path.isfile(SERVICE_ACCOUNT_KEY_FILE):
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = SERVICE_ACCOUNT_KEY_FILE

aip.init(
    project=PROJECT_ID,
    location=REGION,
)

job = aip.PipelineJob(
    display_name="toxic-predictor-train-pipeline",
    template_path="toxic_predictor_train_pipeline.yaml",
    # Re-execute every step on each run instead of reusing cached outputs.
    enable_caching=False,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        'project_id': PROJECT_ID,
        'data_bucket': 'data_de2023_group1',
        'trainset_filename': 'train.csv',
        'model_repo': 'models_de2023_group1',
    },
)

# Blocks until the pipeline run finishes, printing its state periodically.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-20231025233918
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-20231025233918')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/toxic-predictor-20231025233918?project=791882449847
PipelineJob projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-20231025233918 current state:
3
PipelineJob projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-20231025233918 current state:
3
PipelineJob projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-20231025233918 current state:
3
PipelineJob projects/791882449847/locations/us-central1/pipelineJobs/toxic-predictor-20231025233918 current state:
3
PipelineJob projects/791882449847/locations/us-central1/pip