In [54]:
!pip3 install -U --use-feature=2020-resolver kfp
import kfp
import kfp.components as comp
from kfp import dsl
from kfp import compiler
from kfp.components import func_to_container_op

import json
import time
import datetime



In [55]:
# Kubeflow Pipelines endpoint that kfp.Client connects to.
PIPELINE_HOST: str = "4dbcebe1d8f5b931-dot-asia-east1.pipelines.googleusercontent.com"
# GCS bucket that all run artifacts (accuracy CSV, model, trainer package) live under.
WORK_BUCKET: str = "gs://cse-linux-kubeflowpipelines-default"
# KFP experiment that submitted runs are filed under.
EXPERIMENT_NAME: str = "First Experiment"

In [56]:
# Pipeline component: report whether the latest model still needs deploying.
@func_to_container_op
def check_and_deploy_op(ACC_CSV_GCS_URI) -> str:
    """Return the ``deploy`` flag stored in the accuracy CSV.

    ``func_to_container_op`` runs this function inside its own container, so it
    must be self-contained: every import, and the pip install of its
    dependencies, happens in the body.

    Args:
        ACC_CSV_GCS_URI: Path/URI of the accuracy CSV. gcsfs is installed so
            that pandas can read ``gs://`` URIs.

    Returns:
        The single value in the CSV's ``deploy`` column; the pipeline compares
        it against ``"pending"``.

    Raises:
        subprocess.CalledProcessError: if installing pandas/gcsfs fails.
        ValueError: if the CSV does not contain exactly one row.
    """
    import sys, subprocess
    # check=True makes a failed install stop here with pip's own error, rather
    # than failing later at import time or when reading the gs:// URI.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas', 'gcsfs'], check=True)
    import pandas as pd
    acc_df = pd.read_csv(ACC_CSV_GCS_URI)
    if len(acc_df) != 1:
        # .item() below would fail anyway; report which file and why.
        raise ValueError(
            "expected exactly one row in %s, found %d" % (ACC_CSV_GCS_URI, len(acc_df))
        )
    return acc_df["deploy"].item()

In [57]:
@func_to_container_op
def finish_deploy_op(ACC_CSV_GCS_URI):
    """Mark the model recorded in the accuracy CSV as deployed.

    Sets the ``deploy`` column to ``"done"`` on every row and writes the CSV
    back to the same location. It does not deploy anything itself. Like every
    ``func_to_container_op`` component, it runs in its own container, so its
    imports and dependency installs live in the body.

    Args:
        ACC_CSV_GCS_URI: Path/URI of the accuracy CSV. gcsfs is installed so
            that pandas can read and write ``gs://`` URIs.

    Raises:
        subprocess.CalledProcessError: if installing pandas/gcsfs fails.
    """
    import sys, subprocess
    # check=True makes a failed install stop here instead of failing later.
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas', 'gcsfs'], check=True)
    import pandas as pd
    acc_df = pd.read_csv(ACC_CSV_GCS_URI)
    acc_df["deploy"] = "done"
    # index=False: otherwise the RangeIndex is written as an extra unnamed
    # column, read back next time as "Unnamed: 0", and another such column is
    # added on every run.
    acc_df.to_csv(ACC_CSV_GCS_URI, index=False)
    print("Successfully new model was deployed")

In [75]:
@dsl.pipeline(
    name='kubeflow-pipeline-demo',
    description='Kubeflow Pipelines demo embrassing AI Platform in Google Cloud'
)
def train_pipeline(
    PROJECT_ID,
    WORK_BUCKET,
    CSV_URI,
    #PREPROC_CSV_GCS_URI,
    ACC_CSV_GCS_URI,
    MODEL_URI,
    #MIN_ACC_PROGRESS,
    #STAGE_GCS_FOLDER,
    TRAIN_ON_CLOUD,
    AIPJOB_TRAINER_GCS_PATH,
    AIPJOB_OUTPUT_GCS_PATH
):
    """Train a model in-cluster, then mark it deployed if its flag is pending.

    Active steps (Kubeflow Pipelines v1 DSL):
      1. ``Train`` runs ``gcr.io/<PROJECT_ID>/cse-linux-train:latest`` only when
         TRAIN_ON_CLOUD is False. It is passed the CSV, checkpoint and
         accuracy URIs.
      2. ``check_and_deploy_op`` reads the ``deploy`` flag from ACC_CSV_GCS_URI.
      3. ``finish_deploy_op`` runs only when that flag is ``"pending"`` and sets
         it to ``"done"``.

    The preprocessing, AI Platform training and AI Platform deployment steps
    are commented out. As a result, WORK_BUCKET, AIPJOB_TRAINER_GCS_PATH and
    AIPJOB_OUTPUT_GCS_PATH are accepted but not used by any active step.

    Args:
        PROJECT_ID: GCP project whose gcr.io registry hosts the train image.
        WORK_BUCKET: Working bucket; referenced only by commented-out code.
        CSV_URI: Passed to the train image as ``--csv_uri``.
        ACC_CSV_GCS_URI: Accuracy CSV. Passed to the train image as
            ``--acc_uri`` and read/written by the deploy-check steps.
        MODEL_URI: Passed to the train image as ``--checkpoint_uri``.
        TRAIN_ON_CLOUD: The in-cluster ``Train`` step runs only when this is False.
        AIPJOB_TRAINER_GCS_PATH: Unused (AI Platform training is commented out).
        AIPJOB_OUTPUT_GCS_PATH: Unused (AI Platform training is commented out).
    """
    IMAGE_PREFIX = "cse-linux"
    #PREPROC_DIR = "preprocess"
    TRAIN_DIR = "train"
    MODEL_DIR = "model"  # only used by the commented-out deploy step
    
#     preprocess = dsl.ContainerOp(
#         name = "Preprocess raw data and generate new one",
#         image = "gcr.io/" + str(PROJECT_ID) + "/" + IMAGE_PREFIX + "-" + PREPROC_DIR + ":latest",
#         arguments = [
#         "--raw_csv_gcs_uri", RAW_CSV_GCS_URI,
#         "--preproc_csv_gcs_uri", PREPROC_CSV_GCS_URI
#         ]
#     )
    
    # CLI flags for the trainer. The disabled AI Platform step below reuses the
    # same list (args=json.dumps(train_args)). str() on a pipeline parameter
    # yields a placeholder that is resolved when the pipeline runs.
    train_args = [
        "--csv_uri", str(CSV_URI),
        "--checkpoint_uri", str(MODEL_URI),
        "--acc_uri", str(ACC_CSV_GCS_URI)
        #"--min_acc_progress", str(MIN_ACC_PROGRESS)
    ]

    # `== False` (rather than `not ...`) is deliberate: TRAIN_ON_CLOUD is a
    # pipeline parameter, so `==` builds a condition that the workflow
    # evaluates at run time instead of a Python bool.
    with dsl.Condition(TRAIN_ON_CLOUD == False) as check_condition1:
        train = dsl.ContainerOp(
            name ="Train",
            image = "gcr.io/" + str(PROJECT_ID) + "/" + IMAGE_PREFIX + "-" + TRAIN_DIR + ":latest",
            arguments = train_args,
            # Exposes the image's /mlpipeline-metrics.json as the KFP metrics output.
            file_outputs={"mlpipeline-metrics" : "/mlpipeline-metrics.json"}
        )
        
#     with dsl.Condition(TRAIN_ON_CLOUD == True) as check_condition2:
#         aip_job_train_op = comp.load_component_from_url("https://raw.githubusercontent.com/kubeflow/pipelines/1.0.0/components/gcp/ml_engine/train/component.yaml")
#         help(aip_job_train_op)
#         aip_train = aip_job_train_op(
#             project_id=PROJECT_ID, 
#             python_module="train.train", 
#             package_uris=json.dumps([str(AIPJOB_TRAINER_GCS_PATH)]), 
#             region="us", 
#             args=json.dumps(train_args),
#             job_dir=AIPJOB_OUTPUT_GCS_PATH, 
#             python_version="3.6",
#             runtime_version="1.15", #cf. 2.1 
#             master_image_uri="", 
#             worker_image_uri="", 
#             training_input="", 
#             job_id_prefix="", 
#             job_id="",
#             wait_interval=5
#         )
        
    # Runs regardless of TRAIN_ON_CLOUD; its string output gates the step below.
    check_deploy = check_and_deploy_op(ACC_CSV_GCS_URI)
    with dsl.Condition(check_deploy.output == "pending"):
#         aip_model_deploy_op = comp.load_component_from_url("https://raw.githubusercontent.com/kubeflow/pipelines/1.0.0/components/gcp/ml_engine/deploy/component.yaml")
#         help(aip_model_deploy_op)
#         aip_model_deploy = aip_model_deploy_op(
#             model_uri=str(WORK_BUCKET) + "/" + MODEL_DIR, 
#             project_id=PROJECT_ID, 
#             model_id="", 
#             version_id="", 
#             runtime_version="2.2", #cf. 2.1 
#             python_version="3.7",
#             version="", 
#             replace_existing_version="True", 
#             set_default="True", 
#             wait_interval=5
#         )
        # With the deploy step disabled, this only flips the CSV flag to "done".
        lastStep = finish_deploy_op(ACC_CSV_GCS_URI)
 
    #check_condition1.after(preprocess)
    #check_condition2.after(preprocess)
    # Read the flag only after training has had a chance to write the accuracy
    # CSV (Train receives --acc_uri). NOTE(review): Train sits inside a
    # condition; when TRAIN_ON_CLOUD is True it is skipped. Confirm that
    # check_deploy should still run against the existing CSV in that case.
    check_deploy.after(train)
    #lastStep.after(aip_model_deploy)
    
    # "P0D" (zero max cache staleness) disables KFP step caching. These steps
    # read and write GCS state that changes between runs, so cached results
    # would be stale.
    train.execution_options.caching_strategy.max_cache_staleness = "P0D"
    #aip_train.execution_options.caching_strategy.max_cache_staleness = "P0D"
    check_deploy.execution_options.caching_strategy.max_cache_staleness = "P0D"
    #aip_model_deploy.execution_options.caching_strategy.max_cache_staleness = "P0D"
    lastStep.execution_options.caching_strategy.max_cache_staleness = "P0D"

In [76]:
# Run-time arguments for train_pipeline; keys must match its parameter names.
args = dict(
    PROJECT_ID="cse-linux",
    WORK_BUCKET=WORK_BUCKET,
    # NOTE(review): every other URI here is a full gs:// object path, but
    # CSV_URI is a bare bucket name with no scheme or object path. Confirm
    # that the train image really expects this value.
    CSV_URI="cse-linux-kubeflowpipelines-default",
    # PREPROC_CSV_GCS_URI=WORK_BUCKET + "/preprocdata/processed_train.csv",  (disabled)
    ACC_CSV_GCS_URI=WORK_BUCKET + "/latestacc/acc.csv",
    MODEL_URI=WORK_BUCKET + "/model/model.pkl",
    # MIN_ACC_PROGRESS=0.000001,  (disabled)
    # STAGE_GCS_FOLDER=WORK_BUCKET + "/stage",  (disabled)
    TRAIN_ON_CLOUD=False,
    AIPJOB_TRAINER_GCS_PATH=WORK_BUCKET + "/train/train.tar.gz",
    AIPJOB_OUTPUT_GCS_PATH=WORK_BUCKET + "/train/output/",
)

client = kfp.Client(host=PIPELINE_HOST)

# Compile to a package file and register it in the KFP UI. The run below is
# created directly from the pipeline function, independently of this upload.
pipeline_name = "train_pipelines.zip"
compiler.Compiler().compile(train_pipeline, pipeline_name)
try:
    pipeline = client.upload_pipeline(pipeline_package_path=pipeline_name, pipeline_name=pipeline_name)
    print("uploaded:" + pipeline.id)
except Exception as exc:
    # A name clash is the expected failure on re-runs, but auth and network
    # errors also end up here, so report the actual cause instead of assuming.
    print("upload skipped (pipeline may already exist): " + str(exc))

client.create_run_from_pipeline_func(
    train_pipeline,
    arguments=args,
    experiment_name=EXPERIMENT_NAME
)



already exist


RunPipelineResult(run_id=3fcca030-0424-45b0-8278-8238530edcbf)

In [None]:
# Leftover scratch cell (it only printed "hello" as a smoke test); safe to delete.