In [12]:
# kfp
import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)
from kfp.v2.google.client import AIPlatformClient

# gcp
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google.cloud import storage
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

# i/o
from typing import NamedTuple
from io import StringIO
import os

# pandas & sklearn
import pandas as pd
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

In [2]:
storage_client = storage.Client()

In [3]:
USER_FLAG = "--user"

In [4]:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.10
google_cloud_pipeline_components version: 0.1.1


In [5]:
import os
PROJECT_ID = ""
# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  kedro-kubeflow-334417


In [6]:
BUCKET_NAME="gs://diab-gsbucket"
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"
PIPELINE_NAME = "diabetes_pipeline"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

env: PATH=/home/jupyter/.local/bin:/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


'gs://diab-gsbucket/pipeline_root/'

In [44]:
@component(base_image='gcr.io/kedro-kubeflow-334417/custom-kubeflow:v1')
def data_component(bucket: str, value: float, marker: int) -> int:
    import kfp
    import pandas as pd
    import sklearn
    from sklearn.model_selection import train_test_split
    from kfp.v2.google.client import AIPlatformClient
    from google.cloud import storage
    
    data = pd.read_csv('gs://iris-kfp/iris.csv') 
    '''
    df = data
    for column in df.columns:
        df[column] = (df[column] - df[column].mean()) / df[column].std() 
    
    data = df
    
    train_data = data.drop('Outcome',axis=1)
    test_data = data['Outcome']    
    
    X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(train_data, test_data, test_size = value, random_state=42)
    X_train = X_train.to_csv()
    X_test = X_test.to_csv()
    y_train = y_train.to_csv()
    y_test = y_test.to_csv()
    '''
    '''
    service account
    
    create service account
    generate key
    here, kedro-kubeflow-334417-5b30b8ecc18f.json 
    '''
    '''
    storage_client = storage.Client()
    bucket = storage_client.get_bucket('diab-gsbucket')
    d = 'data/test_train'
    d = bucket.blob(d)
    
    # 1. X_train
    d.upload_from_string(f'{X_train}.csv', 'text/csv')
    #xtrain = "gs://diab-gsbucket/data/X_train.csv"
    
    # 2. X_test
    d.upload_from_string(f'{X_test}.csv', 'text/csv')
    #xtest = "gs://diab-gsbucket/data/X_test.csv"
    
    # 3. y_train
    d.upload_from_string(f'{y_train}.csv', 'text/csv')
    #ytrain = "gs://diab-gsbucket/data/y_train.csv"
    
    # 4. y_test
    d.upload_from_string(f'{y_test}.csv', 'text/csv')
    #ytest = "gs://diab-gsbucket/data/y_test.csv"
    '''
    marker = 1
    return marker
    

In [59]:
@component(base_image='gcr.io/kedro-kubeflow-334417/custom-kubeflow:v1')
def model_component(bucket:str, xtrain:str, ytrain:str, xtest:str, ytest:str, marker: int) -> float:
    import pandas as pd    
    from sklearn.svm import SVC
    from sklearn.metrics import accuracy_score
    
    X_train = pd.read_csv(f'{bucket}/{xtrain}.csv', sep=",")
    y_train = pd.read_csv(f'{bucket}/{ytrain}.csv', sep=",")
    X_test = pd.read_csv(f'{bucket}/{xtest}.csv', sep=",")
    y_test = pd.read_csv(f'{bucket}/{ytest}.csv', sep=",")    
        
    # train model
    model = SVC(gamma='auto')
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)

    accuracy = accuracy_score(y_test, predictions)
    
    return accuracy

In [60]:
@component(base_image='gcr.io/kedro-kubeflow-334417/custom-kubeflow:v1',
           output_component_file="true_decision_component.yaml")
def true_component(accuracy:float) -> None:
    print(f'Yes!! {accuracy} is the Accuracy and its greater than the threshold')

In [61]:
@component(base_image='gcr.io/kedro-kubeflow-334417/custom-kubeflow:v1',
           output_component_file="false_decision_component.yaml")
def false_component(accuracy:float) -> None:
    print(f'No. {accuracy} is the Accuracy and its smaller than the threshold')

In [62]:
@kfp.dsl.pipeline(name = "diabetes-pipeline",
                  pipeline_root = PIPELINE_ROOT)
def diab_pipeline(
    display_name: str=f"{PIPELINE_NAME}-1",
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    marker: int = 0,
    test_train_split_ratio: float = 0.3,
    accuracy_threshold: float = 0.5,
    bucket: str = "gs://iris-kfp"
) -> None:
        
    # initiating data component
    data_op = data_component(bucket, test_train_split_ratio, marker)
    
    
    # initiating model component
    with dsl.Condition(data_op.output ==1):
        model_op = model_component(bucket, "X_train", "y_train", "X_test", "y_test", data_op.output)
    
        with dsl.Condition(model_op.output >= accuracy_threshold, name="accuracy>=50"):
            true_component(model_op.output)
        with dsl.Condition(model_op.output < accuracy_threshold, name="accuracy<50"):
            false_component(model_op.output)
    

In [56]:
compiler.Compiler().compile(
    pipeline_func=diab_pipeline, package_path="diab_pipeline.json"
)

In [57]:
api_client = AIPlatformClient(
    project_id=PROJECT_ID,
    region=REGION,
)

In [58]:
response = api_client.create_run_from_job_spec(
    "diab_pipeline.json", pipeline_root=PIPELINE_ROOT,
)