!pip3 install kfp --upgrade --user

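# Used Cars pipeline

Builds, compiles and runs a Kubeflow pipeline that preprocesses the used-cars data, trains a model, tests it and deploys it.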

In [1]:
# Kubeflow experiment that this notebook looks up (or creates) and submits its run under.
EXPERIMENT_NAME = "used-cars"

## Imports

In [2]:
import kfp
from kfp import dsl

## Define components

In [3]:
def preprocess_op():
    """Build the data-preprocessing step of the pipeline.

    The preprocessing image is started with no command-line arguments. Its
    train/test splits are picked up from ``/app/<split>.npy`` inside the
    container and exposed as the step outputs ``X_train``, ``X_test``,
    ``y_train`` and ``y_test``.

    Returns:
        dsl.ContainerOp: the preprocessing step.
    """
    split_names = ('X_train', 'X_test', 'y_train', 'y_test')
    # Output key and file stem are the same for every split.
    split_files = {split: '/app/' + split + '.npy' for split in split_names}
    return dsl.ContainerOp(
        name='Preprocess Data',
        image='gcr.io/used-cars-298619/used_cars_pipeline_preprocessing:latest',
        arguments=[],
        file_outputs=split_files,
    )

def train_op(X_train, y_train):
    """Build the model-training step.

    Args:
        X_train: training features, forwarded to the container as ``--X_train``.
        y_train: training targets, forwarded to the container as ``--y_train``.

    Returns:
        dsl.ContainerOp: the training step; its ``model`` output is read from
        ``/app/model.pkl`` inside the container.
    """
    cli_args = ['--X_train', X_train, '--y_train', y_train]
    return dsl.ContainerOp(
        name='Train Model',
        image='gcr.io/used-cars-298619/used_cars_pipeline_training:latest',
        arguments=cli_args,
        file_outputs={'model': '/app/model.pkl'},
    )

def test_op(X_test, y_test, model):
    """Build the model-evaluation step.

    Args:
        X_test: test features, forwarded to the container as ``--X_test``.
        y_test: test targets, forwarded to the container as ``--y_test``.
        model: trained model artifact, forwarded to the container as ``--model``.

    Returns:
        dsl.ContainerOp: the evaluation step, exposing a ``mean_squared_error``
        output read from a file inside the container.
    """
    return dsl.ContainerOp(
        name='Test Model',
        # Same registry as the other steps (gcr.io/used-cars-298619); the bare
        # 'cr.io' host is not Google Container Registry and the pull would fail.
        image='gcr.io/used-cars-298619/used_cars_pipeline_test:latest',
        arguments=[
            '--X_test', X_test,
            '--y_test', y_test,
            '--model', model
        ],
        file_outputs={
            # FIXME(review): this path ends in a bare '.', which looks truncated
            # (e.g. '/app/model_result.txt'?). Confirm the file name the test
            # image actually writes before relying on this output.
            'mean_squared_error': '/app/model_result.'
        }
    )

def deploy_model_op(model):
    """Build the model-deployment step.

    Args:
        model: trained model artifact, forwarded to the container as ``--model``.

    Returns:
        dsl.ContainerOp: the deployment step (it declares no file outputs).
    """
    deploy_image = 'gcr.io/used-cars-298619/used_cars_pipeline_deploy:latest'
    return dsl.ContainerOp(
        name='Deploy Model',
        image=deploy_image,
        arguments=['--model', model],
    )

## Build the Pipeline 

In [4]:
@dsl.pipeline(
    name='Used Cars Pipeline',
    description='Used Cars pipeline that performs preprocessing, training, evaluation and model deployment.'
)
def presidential_pipeline():
    """Wire the used-cars steps together: preprocess -> train -> test -> deploy.

    The function name is historical; it is kept because the compile and run
    cells derive the package file name and run name from ``__name__``.

    Every artifact is handed to its consumer via ``dsl.InputArgumentPath``.
    Deployment runs only after the test step; the test metric itself is not
    checked before deploying.
    """
    preprocessed = preprocess_op()
    splits = preprocessed.outputs

    trained = train_op(
        dsl.InputArgumentPath(splits['X_train']),
        dsl.InputArgumentPath(splits['y_train']),
    )
    trained.after(preprocessed)

    evaluated = test_op(
        dsl.InputArgumentPath(splits['X_test']),
        dsl.InputArgumentPath(splits['y_test']),
        dsl.InputArgumentPath(trained.outputs['model']),
    )
    evaluated.after(trained)

    # Deploy consumes only the model, so this explicit ordering is what makes
    # it wait for the test step instead of running alongside it.
    deployed = deploy_model_op(dsl.InputArgumentPath(trained.outputs['model']))
    deployed.after(evaluated)

## Compile the Pipeline

In [5]:
import kfp.compiler as compiler

# Compile the pipeline function into the package submitted by the run cell.
pipeline_func = presidential_pipeline
pipeline_filename = f'{pipeline_func.__name__}.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

## Create a Kubeflow Experiment

In [6]:
client = kfp.Client(host='pipelines-api.kubeflow.svc.cluster.local:8888')

# Reuse the experiment if it already exists, otherwise create it.
# kfp.Client.get_experiment raises ValueError when no experiment has this name.
# Catching only that (instead of a bare except) keeps connection/auth errors and
# KeyboardInterrupt visible rather than masking them behind an attempted create.
try:
    experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
except ValueError:
    experiment = client.create_experiment(EXPERIMENT_NAME)

print(experiment)

{'created_at': datetime.datetime(2020, 10, 11, 20, 16, 2, tzinfo=tzlocal()),
 'description': None,
 'id': '97a5e6d3-30c0-42f3-89cd-1bb5622d5ed7',
 'name': 'presidential-election',
 'resource_references': None,
 'storage_state': None}


## Run the Pipeline

In [7]:
arguments = {}  # the pipeline function takes no parameters

run_name = f'{pipeline_func.__name__} run'
run_result = client.run_pipeline(
    experiment.id,
    run_name,
    pipeline_filename,
    arguments,
)

# Echo what was submitted, one value per line.
for submitted in (experiment.id, run_name, pipeline_filename, arguments):
    print(submitted)

97a5e6d3-30c0-42f3-89cd-1bb5622d5ed7
presidential_pipeline run
presidential_pipeline.pipeline.zip
{}
