In [1]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

In [2]:
# Install the Pipelines SDK - this only needs to be run once in the environment.
# You can find the latest package at https://github.com/kubeflow/pipelines/releases
#KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.20/kfp.tar.gz'
#!pip3 install $KFP_PACKAGE --upgrade

In [19]:
# Notebook-wide configuration; edit these values to match your environment.
EXPERIMENT_NAME = 'Hellow world!'          # experiment the run is filed under
GCLOUD_SDK = 'google/cloud-sdk:latest'     # container image providing gcloud/gsutil
GCS_PATH = 'gs://chavoshi-dev-mlpipeline'  # source location in Google Cloud Storage
# Absolute path: the same value is used as the volume mount path and as the
# gsutil destination, so it must not depend on the container's working directory.
LOCAL_PATH = '/mnt/kfp_pvc'
PVC_NAME = 'task-pv-claim'                 # PersistentVolumeClaim mounted at LOCAL_PATH

# ETL from Google Cloud Storage or S3 

This section demonstrates how to copy content from cloud-based blob storage, such as GCS and S3, into a pod during the execution of a pipeline. The reference documentation is listed below.
* https://cloud.google.com/storage/docs/quickstart-gsutil
* https://cloud.google.com/storage/docs/reference/libraries


Note that, apart from the Google-specific libraries, the native TensorFlow I/O libraries can also read from and write to GCS and S3.
* https://github.com/tensorflow/examples/blob/master/community/en/docs/deploy/s3.md


Finally, many common libraries such as pandas have built-in support for accessing common cloud storage and data services such as S3, GCS, and BigQuery.
* https://cloud.google.com/bigquery/docs/pandas-gbq-migration

In [26]:
import kfp
import kfp.dsl as dsl
from kfp.gcp import use_gcp_secret
from kubernetes import client as k8s_client
from kfp import compiler
from kfp import notebook
from kfp import components as comp
from kfp.onprem import mount_pvc

### Accessing Google Cloud storage
There are four methods that can be used to access Google Cloud Storage:

* Using Gsutil
* Using Google Cloud Storage API  #TODO
* Using Tensorflow IO operations  #TODO
* Using an application-specific solution such as the pandas library integration #TODO


In [34]:
def gcs_using_gsutil_op(gcs_path:str):
    """Build a pipeline step that copies GCS content into the mounted PVC.

    The step runs ``sh -c`` in the ``GCLOUD_SDK`` image. It first activates
    the service account from the key file at
    ``/secret/gcp-credentials/user-gcp-sa.json`` (expected to be provided by
    the ``use_gcp_secret('user-gcp-sa')`` modifier applied below), then copies
    ``gcs_path`` into ``LOCAL_PATH``, where the ``PVC_NAME`` claim is mounted.

    Args:
        gcs_path: ``gs://`` URL of the object, prefix or bucket to copy.

    Returns:
        The configured ``dsl.ContainerOp``.
    """
    # `-m` is a top-level gsutil option, so it has to precede the `cp`
    # sub-command; `-r` is required when the source is a bucket or a prefix
    # rather than a single object.
    shell_script = ' && '.join([
        'gcloud auth activate-service-account'
        ' --key-file /secret/gcp-credentials/user-gcp-sa.json',
        'gsutil -m cp -r ' + gcs_path + ' ' + LOCAL_PATH,
    ])
    copy_op = dsl.ContainerOp(
        name = 'Using Secrets',
        image = GCLOUD_SDK,
        command=['sh', '-c'],
        arguments = [shell_script],
    )
    # mount_pvc adds the claim as a volume named 'workdir' and mounts it at
    # LOCAL_PATH, replacing the hand-built V1Volume / V1VolumeMount pair.
    return copy_op.apply(
        use_gcp_secret('user-gcp-sa')
    ).apply(
        mount_pvc(pvc_name=PVC_NAME,
                  volume_name='workdir',
                  volume_mount_path=LOCAL_PATH)
    )

#### Define the pipeline
The pipeline function must be decorated with the `@dsl.pipeline` decorator.

In [35]:
import kfp.dsl as dsl
@dsl.pipeline(
    name='ETL',
    description='A toy pipeline demonstrates accessing blob storage',
)
def etl_pipeline():
    """Single-step pipeline that copies the content at ``GCS_PATH`` with gsutil."""
    # gcs_using_gsutil_op returns a dsl.ContainerOp instance; it is the only
    # step of this pipeline.
    copy_step = gcs_using_gsutil_op(GCS_PATH)

#### Compile the pipeline

In [36]:
# Compile the pipeline function into a package file named after the function.
import kfp.compiler as compiler

pipeline_func = etl_pipeline
pipeline_filename = '{}.pipeline.zip'.format(pipeline_func.__name__)
compiler.Compiler().compile(pipeline_func, pipeline_filename)

#### Submit the pipeline for execution

In [37]:
# Connect to the Kubeflow Pipelines API, then get or create the experiment
# that the pipeline run will be submitted under.
import kfp

client = kfp.Client()
experiment = client.create_experiment(name=EXPERIMENT_NAME)

In [38]:
# The pipeline is submitted with an empty set of argument values.
arguments = {}

# Name the run after the pipeline function and submit the compiled package.
run_name = '{} run'.format(pipeline_func.__name__)
run_result = client.run_pipeline(
    experiment_id=experiment.id,
    job_name=run_name,
    pipeline_package_path=pipeline_filename,
    params=arguments,
)

#This link leads to the run information page. 
#Note: There is a bug in JupyterLab that modifies the URL and makes the link stop working