In [7]:
pip install --upgrade pip

Note: you may need to restart the kernel to use updated packages.


In [1]:
!pip show kfp

Name: kfp
Version: 2.7.0
Summary: Kubeflow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: 
License: 
Location: /opt/conda/lib/python3.10/site-packages
Requires: click, docstring-parser, google-api-core, google-auth, google-cloud-storage, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, PyYAML, requests-toolbelt, tabulate, urllib3
Required-by: kfp-kubernetes


In [4]:
!pip install click docstring-parser  google-api-core google-auth google-cloud-storage kfp-pipeline-spec kfp-server-api kubernetes protobuf PyYAML requests-toolbelt tabulate urllib3



In [28]:
!pip install --upgrade kfp



In [45]:
!pip install kfp-kubernetes



In [1]:
!sudo apt-get install google-cloud-cli-gke-gcloud-auth-plugin

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
google-cloud-cli-gke-gcloud-auth-plugin is already the newest version (476.0.0-0).
0 upgraded, 0 newly installed, 0 to remove and 2 not upgraded.


In [23]:
# Fetch the endpoint and auth data for GKE cluster `cluster-1` (zone us-central1-a,
# project mtp-yusuf) and generate a kubeconfig entry for it.
# NOTE(review): cluster, zone and project are hard-coded here rather than taken
# from the configuration variables defined in later cells.
!gcloud container clusters get-credentials cluster-1 --zone us-central1-a --project mtp-yusuf

Fetching cluster endpoint and auth data.
kubeconfig entry generated for cluster-1.


In [21]:
# Capture the output of `gcloud config get-value project`. IPython's `!` assignment
# returns the command output as a list of lines, so the first line is taken as the
# project ID.
project = !gcloud config get-value project
project_id = project[0]
# Bare last expression: the notebook displays the resolved project ID.
project_id

'mtp-yusuf'

In [22]:
# --- Naming ---------------------------------------------------------------
# DATANAME and NOTEBOOK are combined into the GCS artifact path, the pipeline
# name and the PipelineJob display name further down.
DATANAME = "myModel"
NOTEBOOK = "mynotebook"

# --- Location -------------------------------------------------------------
# Region handed to aiplatform.init().
REGION = "us-central1"

# --- Resources ------------------------------------------------------------
# Machine type string; not referenced by the cells visible in this notebook.
DEPLOY_COMPUTE = "n1-standard-2"

In [23]:
from google.cloud import aiplatform
from datetime import datetime
import kfp
from kfp import compiler
#import kfp.v2.dsl as dsl

from google.cloud import bigquery
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import json
import numpy as np

In [24]:
# Initialise the Vertex AI SDK with the project and region configured above.
aiplatform.init(
    location=REGION,
    project=project_id,
)

# BigQuery client created with default constructor arguments.
bq = bigquery.Client()

In [25]:
# Read the active gcloud account (core.account). The `!` assignment returns a list
# of output lines, so the name is first bound to that list and then rebound to its
# first line (a plain string). It is later passed to pipeline.run() as the
# service account.
service_account = !gcloud config list --format='value(core.account)' 
service_account = service_account[0]
# Bare last expression: the notebook displays the resolved account.
service_account

'1042216787272-compute@developer.gserviceaccount.com'

In [26]:
# GCS bucket that holds the pipeline's data and compiled artifacts.
BUCKET = 'mtp-yusuf-kubeflowpipelines-default'

# Second-resolution stamp (YYYYMMDDHHMMSS) taken when this cell runs; it is used
# to make pipeline names and artifact folders unique per run.
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

# gs://<bucket>/<dataname>/models/<notebook> -- root for this notebook's artifacts.
URI = "gs://" + "/".join((BUCKET, DATANAME, "models", NOTEBOOK))

# Local staging directory for compiled pipeline files.
DIR = "temp/" + NOTEBOOK

In [27]:
!rm -rf {DIR}
!mkdir -p {DIR}

In [28]:
import kfp
import kfp.components as comp
import requests
import kfp.dsl as dsl

In [29]:
@dsl.component(
    base_image='python:3.8',
    packages_to_install=['google-cloud-storage','pandas','gcsfs']
)
def data_preparation() -> str:
    """Drop incomplete rows from the raw iris CSV and store the result in GCS.

    Reads ``data/iris.csv`` from the pipeline bucket, removes every row that
    contains a missing value and uploads the remaining rows as
    ``data/processed_dataset.csv`` in the same bucket.

    Returns:
        The ``gs://`` URI of the uploaded CSV.
    """
    import pandas as pd
    from google.cloud import storage

    bucket_name = 'mtp-yusuf-kubeflowpipelines-default'
    file_path = 'data/processed_dataset.csv'

    # Read the raw dataset directly from its gs:// URL and discard rows with NaNs.
    cleaned = pd.read_csv("gs://mtp-yusuf-kubeflowpipelines-default/data/iris.csv").dropna()

    # Serialise in memory (without the index column) and upload as a single object.
    destination = storage.Client().bucket(bucket_name).blob(file_path)
    destination.upload_from_string(cleaned.to_csv(index=False))

    return f'gs://{bucket_name}/{file_path}'

In [30]:
@dsl.component(
    base_image='python:3.8',
    packages_to_install=['google-cloud-storage','pandas','numpy','scikit-learn','gcsfs']
)
def train_test_split() -> str:
    """Split the processed dataset 70/30 and store the four arrays in GCS.

    Reads ``data/processed_dataset.csv`` from the pipeline bucket, separates the
    ``class`` column (target) from the remaining columns (features), performs a
    stratified split with a fixed random state, and writes ``X_train``, ``X_test``,
    ``y_train`` and ``y_test`` as ``data/<name>.npy`` objects.

    Returns:
        The ``gs://`` prefix under which the arrays were written.

    Raises:
        KeyError: if the dataset has no ``class`` column.
    """
    from google.cloud import storage
    import pandas as pd
    import numpy as np
    # Aliased so the sklearn helper is not confused with this component's own name.
    from sklearn.model_selection import train_test_split as split_dataset

    bucket_name = 'mtp-yusuf-kubeflowpipelines-default'

    # Load the output of the data-preparation step.
    final_data = pd.read_csv("gs://mtp-yusuf-kubeflowpipelines-default/data/processed_dataset.csv")

    target_column = 'class'
    X = final_data.loc[:, final_data.columns != target_column]
    # Select the target as a 1-D Series (not a one-column DataFrame) so the saved
    # label arrays are 1-D; scikit-learn estimators warn when fitted on a column
    # vector. This also fails fast with KeyError if the column is absent.
    y = final_data[target_column]

    # Stratified 70/30 split, reproducible through the fixed random_state.
    X_train, X_test, y_train, y_test = split_dataset(
        X, y, test_size=0.3, stratify=y, random_state=47
    )

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # Write each split as data/<name>.npy, in the same order as before.
    splits = {
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
    }
    for name, values in splits.items():
        with bucket.blob(f'data/{name}.npy').open("wb") as f:
            np.save(f, values)

    return f'gs://{bucket_name}/data/'


In [31]:
@dsl.component(
    base_image='python:3.8',
    packages_to_install=['google-cloud-storage','pandas','numpy','scikit-learn','gcsfs']
)
def training() -> str:
    """Fit a logistic-regression classifier on the stored training split.

    Loads ``data/X_train.npy`` and ``data/y_train.npy`` from the pipeline bucket,
    fits ``LogisticRegression(max_iter=500)`` and pickles the fitted model to
    ``data/model.pkl`` in the same bucket.

    Returns:
        The ``gs://`` prefix under which the model was written.
    """
    import pickle

    import numpy as np
    import pandas as pd
    from google.cloud import storage
    from sklearn.linear_model import LogisticRegression

    bucket_name = 'mtp-yusuf-kubeflowpipelines-default'
    bucket = storage.Client().bucket(bucket_name)

    def read_array(blob_name):
        # allow_pickle=True lets object-dtype arrays (e.g. string labels) load.
        with bucket.blob(blob_name).open("rb") as source:
            return np.load(source, allow_pickle=True)

    features = read_array('data/X_train.npy')
    labels = read_array('data/y_train.npy')

    # Train the classifier.
    classifier = LogisticRegression(max_iter=500)
    classifier.fit(features, labels)

    # Persist the fitted model next to the data splits.
    with bucket.blob('data/model.pkl').open("wb") as sink:
        pickle.dump(classifier, sink)

    return f'gs://{bucket_name}/data/'



In [32]:
@dsl.component(
    base_image='python:3.8',
    packages_to_install=['google-cloud-storage','pandas','numpy','scikit-learn']
)
def predictions() -> str:
    """Predict class labels for the test features and store them in GCS.

    Loads the pickled model from ``data/model.pkl`` and the test features from
    ``data/X_test.npy`` in the pipeline bucket, calls ``predict`` and writes the
    result to ``data/y_pred.npy``.

    Returns:
        The ``gs://`` prefix under which the predictions were written.
    """
    from google.cloud import storage
    import numpy as np
    import pickle

    bucket_name = 'mtp-yusuf-kubeflowpipelines-default'

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # NOTE(security): pickle.load can execute arbitrary code embedded in the file.
    # This is acceptable only while data/model.pkl is written exclusively by the
    # training step of this pipeline and the bucket is not writable by others.
    model_blob = bucket.blob('data/model.pkl')
    with model_blob.open("rb") as f:
        logistic_reg_model = pickle.load(f)

    # Load X_test from GCS.
    X_test_blob = bucket.blob('data/X_test.npy')
    with X_test_blob.open("rb") as f:
        X_test = np.load(f, allow_pickle=True)
    print(f"Loaded model and X_test with shape {X_test.shape}")

    # Predict on test data.
    y_pred = logistic_reg_model.predict(X_test)
    print(f"Predicted {len(y_pred)} labels")

    # Save predictions to GCS.
    y_pred_blob = bucket.blob('data/y_pred.npy')
    with y_pred_blob.open("wb") as f:
        np.save(f, y_pred)
    print("Saved predictions to data/y_pred.npy")

    return f'gs://{bucket_name}/data/'


In [33]:
@dsl.component(
    base_image='python:3.8',
    packages_to_install=['google-cloud-storage','numpy', 'pandas', 'scikit-learn']
)
def prediction_probability() -> str:
    """Compute class probabilities for the test features and store them in GCS.

    Loads the pickled model from ``data/model.pkl`` and the test features from
    ``data/X_test.npy`` in the pipeline bucket, calls ``predict_proba`` and
    writes the result to ``data/y_pred_prob.npy``.

    Returns:
        The ``gs://`` prefix under which the probabilities were written.
    """
    import pickle

    import numpy as np
    import pandas as pd
    from google.cloud import storage

    bucket_name = 'mtp-yusuf-kubeflowpipelines-default'
    bucket = storage.Client().bucket(bucket_name)

    # Restore the fitted classifier produced by the training step.
    with bucket.blob('data/model.pkl').open("rb") as model_file:
        logistic_reg_model = pickle.load(model_file)

    # Fetch the held-out feature matrix.
    with bucket.blob('data/X_test.npy').open("rb") as features_file:
        X_test = np.load(features_file, allow_pickle=True)

    # One row of class probabilities per test sample.
    y_pred_prob = logistic_reg_model.predict_proba(X_test)

    with bucket.blob('data/y_pred_prob.npy').open("wb") as out_file:
        np.save(out_file, y_pred_prob)

    return f'gs://{bucket_name}/data/'


In [34]:
@dsl.component(
    base_image='python:3.8',
    packages_to_install=['google-cloud-storage','numpy', 'pandas', 'scikit-learn']
)
def metrics() -> str:
    """Print evaluation metrics for the stored predictions.

    Loads ``y_test``, ``y_pred`` and ``y_pred_prob`` from ``data/*.npy`` in the
    pipeline bucket, then prints a classification report followed by a dict of
    accuracy, micro-averaged precision and recall, and log loss, each rounded
    to two decimals. The metrics are only printed; they are not returned.

    Returns:
        The ``gs://`` data prefix of the pipeline bucket.
    """
    import numpy as np
    import pandas as pd
    from google.cloud import storage
    from sklearn.metrics import accuracy_score, precision_score, recall_score, log_loss, classification_report

    bucket_name = 'mtp-yusuf-kubeflowpipelines-default'
    bucket = storage.Client().bucket(bucket_name)

    def read_array(blob_name):
        with bucket.blob(blob_name).open("rb") as source:
            return np.load(source, allow_pickle=True)

    # Ground truth, hard predictions and predicted probabilities, in that order.
    y_test = read_array('data/y_test.npy')
    y_pred = read_array('data/y_pred.npy')
    y_pred_prob = read_array('data/y_pred_prob.npy')

    # Scalar metrics.
    acc = accuracy_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred, average='micro')
    recall = recall_score(y_test, y_pred, average='micro')
    entropy = log_loss(y_test, y_pred_prob)

    # Per-class report.
    print(classification_report(y_test, y_pred))

    # Summary dict, values rounded to two decimals.
    named_scores = (
        ('accuracy', acc),
        ('precision', prec),
        ('recall', recall),
        ('entropy', entropy),
    )
    metrics_dict = {label: round(score, 2) for label, score in named_scores}
    print("\n Model Metrics:", metrics_dict)

    return f'gs://{bucket_name}/data/'


In [35]:
import kfp.dsl as dsl

@dsl.pipeline(
    name=f'kfp-{NOTEBOOK}-{DATANAME}-{TIMESTAMP}',
    pipeline_root=f'{URI}/{TIMESTAMP}/kfp/',
)
def my_pipeline():
    # Chains the six components: prepare -> split -> train -> predict ->
    # predict probabilities -> metrics. The components exchange data through
    # fixed GCS paths rather than task outputs, so ordering is declared
    # explicitly with .after(). (Comments are used instead of a docstring so
    # the compiled pipeline description stays as it was.)
    prepare = data_preparation()
    split = train_test_split()
    train = training()
    predict = predictions()
    predict_proba = prediction_probability()
    evaluate = metrics()

    # Every step re-executes on each run; cached results are never reused.
    for step in (prepare, split, train, predict, predict_proba, evaluate):
        step.set_caching_options(False)

    # Execution order.
    split.after(prepare)
    train.after(split)
    predict.after(train, split)
    predict_proba.after(predict)
    evaluate.after(predict, predict_proba)


In [36]:
# Compile the pipeline
# Compile the pipeline definition to a YAML package in the working directory.
pipeline_file = 't1_kfp_pipeline_sm.yaml'
compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path=pipeline_file,
)

In [37]:
# Compile the same pipeline again, this time as JSON inside the local staging
# directory; this is the file that is uploaded to GCS and submitted below.
compiler.Compiler().compile(my_pipeline, f"{DIR}/{NOTEBOOK}.json")

In [38]:
# Copy the compiled JSON pipeline spec from the local staging directory to the
# run's kfp/ folder in GCS, the location PipelineJob's template_path points at.
!gsutil cp {DIR}/{NOTEBOOK}.json {URI}/{TIMESTAMP}/kfp/

Copying file://temp/mynotebook/mynotebook.json [Content-Type=application/json]...
/ [1 files][ 17.1 KiB/ 17.1 KiB]                                                
Operation completed over 1 objects/17.1 KiB.                                     


In [39]:
# Describe the Vertex AI pipeline run: it uses the spec uploaded to GCS above,
# is labelled with the notebook name, and has caching disabled.
pipeline = aiplatform.PipelineJob(
    template_path=URI + f"/{TIMESTAMP}/kfp/{NOTEBOOK}.json",
    display_name="_".join([NOTEBOOK, DATANAME, TIMESTAMP]),
    enable_caching=False,
    labels=dict(notebook=NOTEBOOK),
)

In [None]:
# Submit the job under the account resolved earlier; the call logs the job's
# resource name, console link and state updates while it runs.
response = pipeline.run(service_account=service_account)

Creating PipelineJob
PipelineJob created. Resource name: projects/1042216787272/locations/us-central1/pipelineJobs/kfp-mynotebook-mymodel-20240618165810-20240618165821
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/1042216787272/locations/us-central1/pipelineJobs/kfp-mynotebook-mymodel-20240618165810-20240618165821')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/kfp-mynotebook-mymodel-20240618165810-20240618165821?project=1042216787272
PipelineJob projects/1042216787272/locations/us-central1/pipelineJobs/kfp-mynotebook-mymodel-20240618165810-20240618165821 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1042216787272/locations/us-central1/pipelineJobs/kfp-mynotebook-mymodel-20240618165810-20240618165821 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/1042216787272/locations/us-central1/pipelineJobs/kfp-mynotebook-mymodel-20240618165810-

In [52]:
pip install tabulate


Note: you may need to restart the kernel to use updated packages.


In [53]:
import pandas as pd
from tabulate import tabulate
from termcolor import colored

# Read the iris dataset straight from the pipeline bucket.
df = pd.read_csv('gs://mtp-yusuf-kubeflowpipelines-default/data/iris.csv')

# Five rows drawn at random (unseeded, so the selection differs between runs).
random_sample = df.sample(n=5)

# tabulate expects plain lists: column names as headers, one list per row.
headers = random_sample.columns.tolist()
table_data = random_sample.values.tolist()

# Wrap every cell in cyan colour codes.
colored_table_data = []
for row in table_data:
    colored_table_data.append([colored(cell, 'cyan') for cell in row])

# Render as a grid table and print it.
table = tabulate(colored_table_data, headers=headers, tablefmt='grid')
print(table)



+----------------+---------------+----------------+---------------+-----------------+
|   sepal-length |   sepal-width |   petal-length |   petal-width | class           |
|            [36m5.8[0m |           [36m2.6[0m |            [36m4[0m   |           [36m1.2[0m | [36mIris-versicolor[0m |
+----------------+---------------+----------------+---------------+-----------------+
|            [36m5[0m   |           [36m3.3[0m |            [36m1.4[0m |           [36m0.2[0m | [36mIris-setosa[0m     |
+----------------+---------------+----------------+---------------+-----------------+
|            [36m5[0m   |           [36m3[0m   |            [36m1.6[0m |           [36m0.2[0m | [36mIris-setosa[0m     |
+----------------+---------------+----------------+---------------+-----------------+
|            [36m7.3[0m |           [36m2.9[0m |            [36m6.3[0m |           [36m1.8[0m | [36mIris-virginica[0m  |
+----------------+---------------+------------

In [54]:
import pandas as pd

# Read the iris dataset straight from the pipeline bucket.
df = pd.read_csv('gs://mtp-yusuf-kubeflowpipelines-default/data/iris.csv')

# Five rows drawn at random (unseeded, so the selection differs between runs).
random_sample = df.sample(n=5)


def highlight_row(row):
    """Return one light-blue background CSS declaration per cell of ``row``."""
    return ['background-color: #e6f7ff' for _ in range(len(row))]


# Apply the style row-wise (axis=1 passes each row to the function).
styled_sample = random_sample.style.apply(highlight_row, axis=1)

# Bare last expression: the notebook renders the styled table.
styled_sample


Unnamed: 0,sepal-length,sepal-width,petal-length,petal-width,class
19,5.1,3.8,1.5,0.3,Iris-setosa
50,7.0,3.2,4.7,1.4,Iris-versicolor
61,5.9,3.0,4.2,1.5,Iris-versicolor
139,6.9,3.1,5.4,2.1,Iris-virginica
100,6.3,3.3,6.0,2.5,Iris-virginica
