In [24]:
import kfp
import kfp.components as comp
from kfp.components import InputPath, OutputPath
import kfp.dsl as dsl
from kfp.aws import use_aws_secret
from typing import NamedTuple
from itertools import product


In [25]:
# Kubeflow v1.1.0 does not yet support in-cluster communication from a notebook
# to Kubeflow Pipelines. As a workaround, an auth-service session cookie can be
# passed to the KFP client. See:
# https://www.kubeflow.org/docs/aws/pipeline/#authenticate-kubeflow-pipeline-using-sdk-inside-cluster
#
# authservice_session = 'authservice_session=<cookie>'

# Change this to your own namespace.
namespace = 'fabiano-alencar'

client = kfp.Client()
# The result is discarded here; presumably a smoke test that the client can
# reach the Pipelines API for this namespace.
client.list_experiments(namespace=namespace)

# Path handed to the pipeline as its `data_path` argument.
DATA_PATH = '/mnt'

## Component: Load Raw Data

In [44]:
def load_raw_data(data_path,output_data_path: OutputPath()):
    """Download the labelled front-side document images from MinIO.

    Every object in bucket ``ialab`` whose key starts with the labelled-images
    prefix is downloaded twice: once to ``output_data_path`` and once to
    ``<data_path>/data/<file name>``.

    Args:
        data_path: Directory under which the ``data`` sub-directory is created
            and filled with the downloaded files.
        output_data_path: File path of the component output. Each download
            overwrites it, so after the loop it holds the last object listed.

    Note:
        No credentials are passed to boto3 here, so they have to come from
        boto3's default lookup (the pipeline in this notebook applies
        ``use_aws_secret`` to this task).
    """
    # Imports live inside the function because func_to_container_op ships only
    # the function body to the container.
    import os

    import boto3
    from botocore.client import Config

    # Create the target directory under the given data_path (not a hard-coded
    # '/mnt/data'); exist_ok makes repeated runs against the same volume safe.
    target_dir = os.path.join(data_path, 'data')
    os.makedirs(target_dir, exist_ok=True)

    s3 = boto3.resource('s3',
                        endpoint_url='http://minio-service.kubeflow:9000',
                        config=Config(signature_version='s3v4'))

    print(data_path)

    bucket = s3.Bucket('ialab')
    folder_prefix = 'dados/labelled/frente-verso/cnh-rg-oab-cpf/frente'

    for s3_file in bucket.objects.filter(Prefix=folder_prefix):
        file_object = s3_file.key
        file_name = file_object.split('/')[-1]
        if not file_name:
            # Keys ending in '/' are folder placeholders: they have no file
            # name, so the download target would be the directory itself.
            continue
        print('Downloading file {} ...'.format(file_object))
        # Component output file (overwritten on every iteration).
        bucket.download_file(file_object, output_data_path)
        # Copy on the data_path volume.
        bucket.download_file(file_object, os.path.join(target_dir, file_name))

In [45]:
# Turn load_raw_data into a pipeline component factory. The component runs on
# the python:3.7-slim image with boto3 listed as a package to install.
test_minio_op = comp.func_to_container_op(
    load_raw_data,
    packages_to_install=['boto3'],
    base_image='python:3.7-slim',
)

In [46]:
def process_data(data_path):
    """Read every image found in ``<data_path>/data`` with OpenCV.

    The function first installs a set of apt packages, then imports ``cv2``,
    collects all ``*.png``, ``*.jpg`` and ``*.gif`` files in
    ``<data_path>/data`` and reads them with ``cv2.imread``.

    Args:
        data_path: Directory whose ``data`` sub-directory holds the images.

    Returns:
        None. The decoded images are only held in a local list.
    """
    import glob
    import subprocess

    def install():
        """Install apt packages before cv2 is imported (best effort).

        Presumably these are the shared libraries the OpenCV wheel needs at
        import time on a slim image -- TODO confirm the exact set.
        """
        if subprocess.call(['apt-get', 'update']) != 0:
            print('WARNING: apt-get update failed')
        packages = ['ffmpeg', 'libxext6', 'libsm6', 'libfontconfig1',
                    'libxrender1', 'libgl1-mesa-glx']
        # One apt-get call per package so a single unavailable package does
        # not stop the others from installing; failures are reported instead
        # of being silently ignored.
        for package in packages:
            if subprocess.call(['apt-get', 'install', package, '-y']) != 0:
                print('WARNING: apt-get install {} failed'.format(package))

    install()

    # Imported only after install() has run.
    import cv2

    ext = ['png', 'jpg', 'gif']    # Add image formats here

    files = []
    for e in ext:
        files.extend(glob.glob(data_path+"/data/" + '*.' + e))
    print(files)

    images = []
    for file in files:
        image = cv2.imread(file)
        if image is None:
            # cv2.imread returns None instead of raising when a file cannot
            # be read or decoded; report it rather than keeping a None entry.
            print('WARNING: could not read image {}'.format(file))
            continue
        images.append(image)

In [47]:
# Turn process_data into a pipeline component factory.
# Only one OpenCV wheel is installed: the opencv-python project documents that
# its package variants must not be combined in one environment, and
# opencv-contrib-python already contains the main modules. glob2 is not listed
# because process_data imports the standard-library glob module.
process_opencv_op = comp.func_to_container_op(process_data,
                                             base_image='python:3.7-slim',
                                             packages_to_install=['opencv-contrib-python'])

## Creating a Pipeline

In [52]:
from kubernetes.client.models import V1EnvVar


@dsl.pipeline(
    name='Training pipeline',
    description='Downloads labelled document images from MinIO to a shared volume and reads them with OpenCV.'
)
def training_pipeline(data_path):
    """Two-step pipeline: download images from MinIO, then read them with OpenCV.

    Args:
        data_path: Pipeline parameter passed to both components and used as
            the key (mount path) when attaching the shared volume.
    """
    # Proxy settings applied to both tasks; the in-cluster MLflow and MinIO
    # hosts are excluded from proxying through no_proxy.
    http_proxy = V1EnvVar(name='http_proxy', value='http://10.190.88.59:8888')
    https_proxy = V1EnvVar(name='https_proxy', value='http://10.190.88.59:8888')
    no_proxy = V1EnvVar(name='no_proxy', value='mlflow.mlflow,minio-service.kubeflow')

    # Volume shared between the components, backed by the PVC named
    # "workspace-server1" (no new volume is created via dsl.VolumeOp).
    kubeflow_pvc = dsl.PipelineVolume(pvc="workspace-server1", name="workspace-server1")

    # Each component factory call returns a task; the configuration calls are
    # chained on it. Parentheses replace backslash continuations so that no
    # dangling continuation is left at the end of a chain.
    load_raw_data_task = (
        test_minio_op(data_path)
        .set_display_name('Load Raw Data')
        .add_env_variable(http_proxy)
        .add_env_variable(https_proxy)
        .add_env_variable(no_proxy)
        .apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))
        .add_pvolumes({data_path: kubeflow_pvc})
    )

    # Attaching the first task's pvolume (rather than kubeflow_pvc directly)
    # is presumably what orders this step after the download -- see the KFP
    # PipelineVolume documentation.
    process_opencv_task = (
        process_opencv_op(data_path)
        .set_display_name('Process Images')
        .add_env_variable(http_proxy)
        .add_env_variable(https_proxy)
        .add_env_variable(no_proxy)
        .add_pvolumes({data_path: load_raw_data_task.pvolume})
    )
    
    

## Creating a Pipeline Run

In [53]:

# Pipeline parameter values and the experiment the run is filed under.
arguments = dict(data_path=DATA_PATH)
experiment_name = 'minio_test'

# Submit a pipeline run
client.create_run_from_pipeline_func(
    training_pipeline,
    experiment_name=experiment_name,
    namespace=namespace,
    arguments=arguments,
)

RunPipelineResult(run_id=d84456f3-d948-4a5e-bc67-5b528f851073)

## Uploading the Pipeline to be reusable by others

In [48]:
# Compile the pipeline function into the package file 'workflow.yaml'.
kfp.compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path='workflow.yaml',
)


#client.upload_pipeline(pipeline_package_path='workflow.yaml',
#                             pipeline_name='Electric Power Consumption Forecasting Training Pipeline.')