In [None]:
# import required libraries
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, ClassificationMetrics,
                        Metrics, component)
import os
import re
from pathlib import Path

from datetime import date
from datetime import timedelta
from dateutil.relativedelta import relativedelta

import google
from google.oauth2 import credentials
from google.oauth2 import service_account
from google.oauth2.service_account import Credentials
from google.cloud import storage
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components.v1.batch_predict_job import \
    ModelBatchPredictOp as batch_prediction_op
from typing import NamedTuple

In [None]:
# Name of the Cloud Storage bucket from which the pipeline root URI is built
BUCKET_NAME='divg-josh-pr-d1cc3a-default'
# Region passed as `location` when the pipeline job is created
REGION = "northamerica-northeast1"
# gs:// URI used as `pipeline_root` by the demo pipeline and its job
PIPELINE_ROOT = f"gs://{BUCKET_NAME}"

In [None]:
@component(
    base_image="northamerica-northeast1-docker.pkg.dev/cio-workbench-image-np-0ddefe/wb-platform/pipelines/kubeflow-pycaret:latest",
    output_component_file="component_one.yaml",
)
def input_integer(num: int) -> int:
    """Pass an integer through unchanged.

    Args:
        num: The integer to forward.

    Returns:
        The same value that was received.
    """
    return num

In [None]:
@component(
    base_image="northamerica-northeast1-docker.pkg.dev/cio-workbench-image-np-0ddefe/wb-platform/pipelines/kubeflow-pycaret:latest",
    output_component_file="component_two.yaml",
)
def double_square(
    num: int
) -> NamedTuple(
    "Outputs",
    [
        ("value", int),  # the input, echoed back
        ("double", int),
        ("square", int)
    ],
):
    """Compute the double and the square of an integer.

    Args:
        num: The integer to transform.

    Returns:
        A 3-tuple ``(num, num * 2, num * num)`` matching the declared
        ``value``, ``double`` and ``square`` outputs, in that order.
    """
    # Build the result once, then unpack it only for the log line below.
    outputs = (num, num * 2, num * num)
    _, double, square = outputs

    print(f"input value: {num}, double: {double}, square: {square}")
    return outputs

In [None]:
@component(
    base_image="northamerica-northeast1-docker.pkg.dev/cio-workbench-image-np-0ddefe/wb-platform/pipelines/kubeflow-pycaret:latest",
    output_component_file="component_three.yaml",
)
def show_results(
    num: int,
    double: int,
    square: int
) -> str:
    """Format, print and return a one-sentence summary of the results.

    Args:
        num: The original integer.
        double: The value reported as the double of ``num``.
        square: The value reported as the square of ``num``.

    Returns:
        The summary sentence that was printed.
    """
    end_str = f"The double of {num} is {double}, and the square of {num} is {square}"

    # Print the summary after the header; previously only the header line
    # was written, so the step log never contained the actual result.
    print("Here is the output: ")
    print(end_str)

    return end_str

In [None]:
@dsl.pipeline(
    name="demo-pipeline",
    description="vertex pipeline example",
    pipeline_root=PIPELINE_ROOT,
)
# Change the default of `num` here to update the pipeline output
def pipeline(num: int = 5):
    """Demo pipeline: echo an integer, double/square it, then summarise.

    Args:
        num: The integer fed to both the echo step and the double/square step.
    """
    # Component arguments are passed by keyword so each value is bound to
    # the intended input by name rather than by position.

    # ----- component 1 --------
    input_integer_op = input_integer(num=num)

    # ----- component 2 --------
    double_square_op = double_square(num=num)

    # ----- component 3 --------
    # Consuming the upstream outputs already makes this step depend on both
    # earlier steps, so no explicit `.after()` ordering is required.
    show_results(
        num=input_integer_op.output,
        double=double_square_op.outputs["double"],
        square=double_square_op.outputs["square"],
    )


In [None]:
import google.oauth2.credentials
# NOTE(review): `json` is not referenced anywhere in this cell
import json

# Capture the output lines of `gcloud auth print-access-token` using IPython's `!` shell syntax
token = !gcloud auth print-access-token
# Wrap the first captured line (expected to be the access token) in an OAuth2 credentials object
CREDENTIALS = google.oauth2.credentials.Credentials(token[0])

# Compile the `pipeline` function into a job spec written to pipeline.json
compiler.Compiler().compile(
   pipeline_func=pipeline, package_path="pipeline.json"
)

# Create a pipeline job from the compiled spec, using the credentials built above
job = pipeline_jobs.PipelineJob(
   display_name="demo-pipeline-job",
   template_path="pipeline.json",
   credentials = CREDENTIALS,
   pipeline_root = PIPELINE_ROOT,
   location=REGION,
   enable_caching=True # caching is recommended while testing because it reduces resource use
)

# Submit the job (NOTE(review): presumably blocks until the run finishes - confirm against the SDK docs)
job.run()

In [None]:
# @dsl.pipeline(
#     name="hello-world",
#     description="An intro pipeline",
#     pipeline_root=PIPELINE_ROOT,
# )
# # You can change the `text` and `emoji_str` parameters here to update the pipeline output
# def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
#     product_task = product_name(text)
#     emoji_task = emoji(emoji_str)
#     consumer_task = build_sentence(
#         product_task.output,
#         emoji_task.outputs["emoji"],
#         emoji_task.outputs["emoji_text"],
#     )

In [None]:
def double_square(
    num: int,
) -> NamedTuple(
    "Outputs",
    [
        ("value", int),  # the input, echoed back
        ("double", int),
        ("square", int)
    ],
):
    """Plain-Python helper that doubles and squares ``num``.

    Prints the three values and returns them as a regular tuple
    ``(num, double, square)``.

    Note: this definition reuses the name of the ``double_square`` component
    declared earlier in the notebook, so running this cell rebinds the name
    to this undecorated function.
    """
    double, square = num * 2, num * num
    print(f"input value: {num}, double: {double}, square: {square}")
    return num, double, square

In [None]:
# Call double_square with 5 to inspect its printed line and returned tuple
double_square(5)

## MLOps With Kubeflow Pipelines (Part 1)

### 1) Data Extraction

In [None]:
# Importing libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import metrics
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
import xgboost as xgb
from sklearn.metrics import roc_auc_score

# Load the full dataset object; `data` is reused by later cells (e.g. for target names)
data = load_breast_cancer()

# Feature table with the labels appended as a `target` column, for quick EDA
df = pd.DataFrame(data=data.data, columns=data.feature_names)
df["target"] = pd.Series(data.target)

# Load features (X) and labels (y) again, this time as pandas objects, for training
X, y = load_breast_cancer(return_X_y=True, as_frame=True)

# Hold out 30% of the rows as a test set; the fixed seed keeps the split repeatable
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=123,
)

df.head()

In [None]:
# Per-class mean of the radius, perimeter and area features
df.groupby('target')[['mean radius', 'mean perimeter', 'mean area']].mean()

In [None]:
# Number of rows in each target class
df['target'].value_counts()

In [None]:
# Print the class names stored on the dataset object
print(data.target_names)

# Print how many rows fall into each target class
print(df['target'].value_counts()) 

# Bar chart of the number of rows per target value
plt.title('Count of cancer type')
sns.countplot(x=df['target'])
plt.xlabel('Cancer Severity')
plt.ylabel('Count')
plt.show()

In [None]:
# Side-by-side box plots of mean radius, mean perimeter and mean concave points, grouped by target
plt.figure(figsize=(15,5))
# Panel 1 of 3: mean radius
plt.subplot(1,3,1)
sns.boxplot(x="target", y="mean radius", data=df)
# Panel 2 of 3: mean perimeter
plt.subplot(1,3,2)
sns.boxplot(x="target", y="mean perimeter", data=df)
# Panel 3 of 3: mean concave points
plt.subplot(1,3,3)
sns.boxplot(x="target", y="mean concave points", data=df)
plt.show()

In [None]:
# List the column names available in df
df.columns

In [None]:
# Import Matplotlib and Seaborn
import matplotlib.pyplot as plt
import seaborn as sns

# Scatter plot of mean radius vs. mean concave points; marker style and colour are both keyed on target
sns.relplot(x='mean radius', y='mean concave points', data=df, style='target', hue='target', kind='scatter')

# Show plot
plt.show()

In [None]:
# Plot bivariate relations between each pair of the selected features, coloured by target.
# pairplot is a figure-level seaborn function that builds its own figure, so a preceding
# plt.figure(figsize=...) call does not size it and only renders an extra empty figure;
# that call has therefore been dropped.
sns.pairplot(df, hue="target", vars = ["mean radius", "mean concave points", "mean texture", "mean smoothness"])

# Show plot
plt.show()

In [None]:
# Summary statistics for the columns of df
df.describe()

### 2) Model Training

In [None]:
# Import Libraries
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, ClassificationMetrics,
                        Metrics, component)
from google.cloud.aiplatform import pipeline_jobs
from typing import NamedTuple
import google
from google.oauth2 import credentials
from google.oauth2 import service_account
from google.oauth2.service_account import Credentials
from google.cloud import storage
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components.v1.batch_predict_job import \
    ModelBatchPredictOp as batch_prediction_op

# Import Dataset and Save in GCS
@component(
    base_image="northamerica-northeast1-docker.pkg.dev/cio-workbench-image-np-0ddefe/wb-platform/pipelines/kubeflow-pycaret:latest",
    output_component_file="import_data.yaml",
)
def import_data(
            project_id: str,
            dataset_id: str,
            file_bucket: str, 
    ) -> str: 
    """Load the breast cancer dataset and save it as a CSV in Cloud Storage.

    Args:
        project_id: Not used by this component's body.
        dataset_id: Used as both the folder name and the file-name prefix of the CSV.
        file_bucket: Name of the bucket the CSV is written to.

    Returns:
        The ``gs://`` path of the written CSV file.
    """
    # Imports live inside the function body, as in every component in this notebook
    import pandas as pd
    from sklearn.datasets import load_breast_cancer

    # import the entire dataset into 'data'
    data = load_breast_cancer()

    # save the data in df, including the targets
    df = pd.DataFrame(data=data.data, columns=data.feature_names)
    df['target'] = pd.Series(data.target)

    # save df in cloud storage
    save_path = f'gs://{file_bucket}/{dataset_id}/{dataset_id}_data.csv'
    # index=False: the default integer index carries no information, and writing
    # it makes a plain pd.read_csv() return it as an extra 'Unnamed: 0' column
    # that would then be treated as a feature.
    df.to_csv(save_path, index=False)

    print(f'{dataset_id}_data.csv saved in {save_path}')

    return save_path

# Load Dataset and Train Model
@component(
    base_image="northamerica-northeast1-docker.pkg.dev/cio-workbench-image-np-0ddefe/wb-platform/pipelines/kubeflow-pycaret:latest",
    output_component_file="train_model.yaml",
)
def model_training(
            project_id: str,
            dataset_id: str,
            file_bucket: str, 
            save_path: str,
            metrics: Output[Metrics],
            metricsc: Output[ClassificationMetrics]
    ) -> NamedTuple(
        "Outputs",
        [
            ("accuracy", float),  # scores are fractions, so they are declared as float
            ("roc_auc", float),
            ("f1_score", float)
        ],
    ):
    """Train an XGBoost classifier on the saved CSV and report test metrics.

    Args:
        project_id: Not used by this component's body.
        dataset_id: Not used by this component's body.
        file_bucket: Not used by this component's body.
        save_path: Path of the CSV to read; it must contain a ``target`` column.
        metrics: Output artifact that receives the scalar accuracy, ROC AUC and F1 scores.
        metricsc: Output artifact that receives the test-set confusion matrix.

    Returns:
        A tuple ``(accuracy, roc_auc, f1_score)`` of built-in floats computed
        on the held-out 30% test split.
    """
    # Only the libraries actually used are imported. In particular `sklearn.metrics`
    # is NOT imported under the name `metrics`, because that would overwrite the
    # `metrics` output artifact parameter.
    import numpy as np
    import pandas as pd
    import xgboost as xgb
    from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, roc_auc_score
    from sklearn.model_selection import train_test_split

    # Read the csv that was saved by the upstream component
    df = pd.read_csv(save_path)

    # If the CSV was written with its index, read_csv returns it as an
    # 'Unnamed: N' column; drop such columns so row numbers are not used as features.
    df = df.loc[:, ~df.columns.str.startswith('Unnamed:')]

    # X and y
    y = np.squeeze(df['target'].values)
    X = df.drop(columns='target')

    # Create the training and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=123)

    # Instantiate the XGB Classifier: xgb_model
    xgb_model = xgb.XGBClassifier(
        learning_rate=0.01,
        n_estimators=100,
        max_depth=8,
        min_child_weight=1,
        max_delta_step=1, 
        colsample_bytree=0.9,
        subsample=0.9,
        objective='binary:logistic',
        nthread=4,
        scale_pos_weight=1, 
        eval_metric='auc', 
        base_score=0.5
    )

    # Fit the classifier to the training set
    xgb_model.fit(X_train, y_train)

    # Predict labels and positive-class probabilities for X_test
    y_pred = xgb_model.predict(X_test)
    y_pred_proba = xgb_model.predict_proba(X_test)[:, 1]

    # Scores are converted to built-in floats so the values written to the
    # component outputs are plain Python numbers rather than numpy scalars.
    accuracy = float(accuracy_score(y_test, y_pred))
    print("Accuracy:", accuracy)

    roc_auc = float(roc_auc_score(y_test, y_pred_proba))
    print("ROC AUC Score:", roc_auc)

    # Stored under a distinct local name so the imported f1_score function is not shadowed
    f1 = float(f1_score(y_test, y_pred))
    print("F1 Score:", f1)

    # Populate the declared output artifacts
    metrics.log_metric("accuracy", accuracy)
    metrics.log_metric("roc_auc", roc_auc)
    metrics.log_metric("f1_score", f1)
    metricsc.log_confusion_matrix(
        ["0", "1"],
        confusion_matrix(y_test, y_pred, labels=[0, 1]).tolist(),
    )

    return (accuracy, roc_auc, f1)
    

In [None]:
#tag cell with parameters
# Default for the pipeline's `project_id` parameter
PROJECT_ID =  'divg-josh-pr-d1cc3a'
# Redefines the BUCKET_NAME set in the earlier configuration cell (same value)
BUCKET_NAME='divg-josh-pr-d1cc3a-default'
# Folder name and file-name prefix of the CSV written by import_data
DATASET_ID = 'breast_cancer'
# Default for the pipeline's `resource_bucket` parameter
RESOURCE_BUCKET = 'divg-josh-pr-d1cc3a-default'
# Bucket the dataset CSV is written to; also used to build the job's pipeline root
FILE_BUCKET = 'divg-josh-pr-d1cc3a-default'
# NOTE(review): MODEL_ID is not referenced in the visible cells - confirm it is still needed
MODEL_ID = '5070'
# Region passed as `location` when the pipeline job is created
REGION = 'northamerica-northeast1'

In [None]:
# library imports
from kfp.v2 import compiler
from google.cloud.aiplatform import pipeline_jobs
@dsl.pipeline(
    name='breast-cancer-pipeline', 
    description='breast-cancer-pipeline'
    )
def pipeline(
        project_id: str = PROJECT_ID,
        region: str = REGION,
        resource_bucket: str = RESOURCE_BUCKET, 
        file_bucket: str = FILE_BUCKET
    ):
    """Breast cancer pipeline: save the dataset to Cloud Storage, then train a model on it.

    Args:
        project_id: Forwarded to both components.
        region: Accepted for interface compatibility; not used by the pipeline body.
        resource_bucket: Accepted for interface compatibility; not used by the pipeline body.
        file_bucket: Bucket the dataset CSV is written to; forwarded to both components.
    """
    # The pipeline's own parameters (not the module-level constants) are
    # forwarded, so values overridden at submission time actually reach the
    # components. No access token is fetched here: the pipeline body never
    # used one.

    # ----- create training set --------
    import_data_op = import_data(project_id=project_id,
                                 dataset_id=DATASET_ID,
                                 file_bucket=file_bucket)

    # ----- train model --------
    # Consuming import_data_op.output already orders this step after the
    # import step, so no explicit `.after()` call is required.
    model_training(project_id=project_id,
                   dataset_id=DATASET_ID,
                   file_bucket=file_bucket,
                   save_path=import_data_op.output)
    

In [None]:
import google.oauth2.credentials
# NOTE(review): `json` is not referenced anywhere in this cell
import json

# Capture the output lines of `gcloud auth print-access-token` using IPython's `!` shell syntax
token = !gcloud auth print-access-token
# Wrap the first captured line (expected to be the access token) in an OAuth2 credentials object
CREDENTIALS = google.oauth2.credentials.Credentials(token[0])

# Compile the `pipeline` function into a job spec written to pipeline.json
# (the same file name the demo pipeline cell compiles to)
compiler.Compiler().compile(
   pipeline_func=pipeline, package_path="pipeline.json"
)

# Create a pipeline job from the compiled spec, rooted at the FILE_BUCKET bucket
job = pipeline_jobs.PipelineJob(
   display_name='breast-cancer-pipeline',
   template_path="pipeline.json",
   credentials = CREDENTIALS,
   pipeline_root = f"gs://{FILE_BUCKET}",
   location=REGION,
   enable_caching=True # caching is recommended while testing because it reduces resource use
)

# Submit the job (NOTE(review): presumably blocks until the run finishes - confirm against the SDK docs)
job.run()