In [70]:
import kfp
from kfp import dsl
from kubernetes.client.models import V1EnvVar

In [71]:
def setup_volume_op(resource_name="dataset-pvc", size="1Gi"):
    """Create the ReadWriteOnce volume claim shared by every pipeline step.

    Args:
        resource_name: Base name of the PersistentVolumeClaim. Kubernetes
            object names may only contain lowercase alphanumerics, '-' and
            '.', so the default is hyphenated rather than ``dataset_pvc``
            (whether the SDK sanitizes the name depends on the kfp version).
        size: Requested storage capacity, e.g. ``"1Gi"``; raise it when the
            dataset or trained model does not fit.

    Returns:
        The ``dsl.VolumeOp``; downstream steps mount its ``.volume``.
    """
    return dsl.VolumeOp(
        name="Creation of Volume",
        resource_name=resource_name,
        modes=dsl.VOLUME_MODE_RWO,
        size=size,
    )

In [72]:
def download_dataset_op(url, volume, data_path):
    """Copy the dataset from Cloud Storage onto the shared volume.

    Args:
        url: Source URI handed to ``gsutil`` (e.g. ``gs://bucket/file.csv``).
        volume: Volume mounted at ``/mnt`` in this step.
        data_path: Destination file path inside the container. It must be
            under ``/mnt`` for later steps, which mount the same volume, to
            see it.

    Returns:
        The ``dsl.ContainerOp``; downstream steps use its ``.pvolume``.
    """
    op = dsl.ContainerOp(
        name='Download Dataset',
        image='google/cloud-sdk:272.0.0',
        command=['sh', '-c'],
        # `gsutil cp` replaces the former `gsutil cat $0 | tee $1`. A POSIX sh
        # pipeline returns only the last command's (tee's) exit status, which
        # hid gsutil failures and let training run on an empty file. tee also
        # echoed the entire dataset into the pod log. The positional
        # parameters are quoted so paths are not word-split or globbed.
        arguments=['gsutil cp "$0" "$1"', url, data_path],
        pvolumes={"/mnt": volume}
    )
    op.container.set_memory_limit('2G')
    op.container.set_cpu_limit('1')
    op.container.set_memory_request('1G')
    op.container.set_cpu_request('0.5')
    return op

In [73]:
def train_op(volume, trained_path, data_path, gpu_limit=None):
    """Run ``python3 train.py`` from the training image on the shared volume.

    Input and output locations reach the script through the environment
    variables ``DATA_PATH`` and ``TRAINED_MODEL_PATH``. The op declares
    ``/mlpipeline-metrics.json`` as its metrics output; presumably train.py
    writes that file (not verifiable from here).

    Args:
        volume: Volume mounted at ``/mnt`` (the download step's ``.pvolume``).
        trained_path: Where the script should write the model (under ``/mnt``).
        data_path: Where the dataset was downloaded (under ``/mnt``).
        gpu_limit: Optional GPU limit (e.g. ``'2'``). The default ``None``
            keeps the step CPU-only, as before; the original cluster had no
            GPUs available.

    Returns:
        The ``dsl.ContainerOp``; downstream steps use its ``.pvolume``.
    """
    op = dsl.ContainerOp(
        name='Train ML',
        image='davidzeng/ml_kube:metrics-v5',
        command=['sh', '-c'],
        arguments=['python3 train.py'],
        pvolumes={'/mnt': volume},
        file_outputs={'MLPipeline Metrics': '/mlpipeline-metrics.json'}
    )
    op.container.add_env_variable(V1EnvVar('TRAINED_MODEL_PATH', trained_path))
    op.container.add_env_variable(V1EnvVar('DATA_PATH', data_path))
    op.container.set_memory_limit('4G')
    op.container.set_cpu_limit('10')
    op.container.set_memory_request('2G')
    op.container.set_cpu_request('5')
    # Opt-in GPU support replaces the previously commented-out call.
    if gpu_limit is not None:
        op.container.set_gpu_limit(gpu_limit)
    return op

In [74]:
def upload_model_op(volume, trained_path, storage_path):
    """Copy the trained model from the shared volume to Cloud Storage.

    Args:
        volume: Volume mounted at ``/mnt`` (the training step's ``.pvolume``).
        trained_path: Model file on the volume to upload.
        storage_path: Destination handed to ``gsutil cp``
            (e.g. ``gs://bucket/``).

    Returns:
        The ``dsl.ContainerOp`` for the upload step.
    """
    op = dsl.ContainerOp(
        name='Upload Trained Model',
        image='google/cloud-sdk:272.0.0',
        command=['sh', '-c'],
        # Quote the positional parameters so pipeline-supplied paths are
        # passed to gsutil intact (no word splitting / shell globbing).
        arguments=['gsutil cp "$0" "$1"', trained_path, storage_path],
        pvolumes={"/mnt": volume}
    )
    op.container.set_memory_limit('2G')
    op.container.set_cpu_limit('1')
    op.container.set_memory_request('1G')
    op.container.set_cpu_request('0.5')
    return op

In [75]:
def check_op(volume, trained_path):
    """Sanity-check the trained model by running ``wc`` on it.

    ``wc`` exits non-zero when the path cannot be read, so this step fails
    if training did not produce the file.

    Args:
        volume: Volume mounted at ``/mnt`` (the training step's ``.pvolume``).
        trained_path: Model file on the volume to inspect.

    Returns:
        The ``dsl.ContainerOp`` for the check step.
    """
    op = dsl.ContainerOp(
        name='Check Trained Model',
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        # Quoted so a path containing spaces or glob characters reaches wc
        # as a single argument.
        arguments=['wc "$0"', trained_path],
        pvolumes={"/mnt": volume}
    )
    op.container.set_memory_limit('2G')
    op.container.set_cpu_limit('1')
    op.container.set_memory_request('1G')
    op.container.set_cpu_request('0.5')
    return op

In [76]:
@dsl.pipeline(name='Machine learning pipeline', description='A pipeline.')
def sequential_pipeline(url='gs://ml_kube_bucket/kc_house_data.csv',
                        trained_path='/mnt/trained_model',
                        data_path='/mnt/dataset.csv',
                        storage_path='gs://ml_kube_bucket/'):
    """Provision a volume, download the data, train, then upload and check.

    Every step mounts the same volume at ``/mnt``, so ``trained_path`` and
    ``data_path`` must point under ``/mnt`` for the steps to share files.
    Each step is chained to the previous one through the ``.pvolume`` it
    hands on. The upload and check steps both consume the training step's
    volume and have no ordering between themselves.
    """
    # Ops register themselves with the enclosing pipeline when constructed,
    # so the last two results do not need to be bound to names.
    pvc = setup_volume_op()
    download = download_dataset_op(url, pvc.volume, data_path)
    training = train_op(download.pvolume, trained_path, data_path)
    upload_model_op(training.pvolume, trained_path, storage_path)
    check_op(training.pvolume, trained_path)

In [77]:
if __name__ == '__main__':
    # Inside a Jupyter kernel __name__ is '__main__', so executing this cell
    # compiles the pipeline into the package file ml_kube.yaml.
    kfp.compiler.Compiler().compile(
        pipeline_func=sequential_pipeline,
        package_path='ml_kube.yaml',
    )