# Create Kubeflow Pipeline



In [1]:
import kfp
client = kfp.Client(host='https://5ab1dd08e55a522c-dot-europe-west1.pipelines.googleusercontent.com')
import kfp.components as comp
from kfp.components import create_component_from_func
import kfp.dsl as dsl

In [2]:
%%bash
#!/bin/bash -e
image_name='sfeir-data'
zone='europe-west1-c'
cluster='cluster-1 '

gcloud container clusters get-credentials $cluster --zone $zone --project $image_name

Fetching cluster endpoint and auth data.
kubeconfig entry generated for cluster-1.


## Define kubeflow components
### 1- Docker image components 

In [3]:
with open("./components/image_tfrecord.yaml", "r") as f:
    image_tfrecord_components = f.read()
print(image_tfrecord_components)

name: image_to_tfrecords
description: Transform image data to TfRecords.

inputs:
- {name: images path, type: String, description: 'GCS path for images files'}
- {name: tfrecords path, type: String, description: 'GCS path for tfrecords files'}
- {name: target size, type: Integer, description: 'Size of the training images'}

implementation:
  container:
    image: gcr.io/sfeir-data/image_to_tfrecords
    command: [
      python3, 
      # Path of the program inside the container
      /preprocess.py,
      --input-path,
      {inputValue: images path},
     --output-path, 
      {inputValue: tfrecords path},
     --target-size, 
      {inputValue: target size},
    ]



In [4]:
create_step_convert_tfrecords = comp.load_component_from_text(image_tfrecord_components)

### 2- Python based components

In [5]:
def preprocess_flag(preprocess: bool) -> bool:
    """
    Print arguments
    """
    print("Preprocess ",preprocess)
    return preprocess

print_preprocess = comp.func_to_container_op(preprocess_flag)

In [6]:

def createTraningJob(training_data: str,
                     validation_data: str,
                     project: str,
                     location: str,
                     bucket: str,
                     batch_size: str = '50',
                     validation_batch_size: str = '20',
                     training_ds_size : str = '25000',
                     validation_ds_size: str = '5000',
                     img_height: str = '255',
                     img_width: str = '255',
                     nb_classes: str = '5',
                     display_name:str = 'quickdraw_training'
                    )-> str:
    """
    Create the training job into Vertex training and launch it 
    :param training_data:  (str) GCS path to the training dataset Tfrecords,
    :param validation_data: (str) GCS path to the validation dataset Tfrecords,
    :param project: (str) Name of the Google cloud project,
    :param location: (str) Training location in vetex (europe-west1,...) ,
    :param bucket: (str) GCS bucket to store data during the training,
    :param batch_size: (str) Training Batch size '50',
    :param validation_batch_size: (str) Validation batch size defaut '20',
    :param  training_ds_size: (str) training dataset size default 25000,
    :param  validation_ds_size: (str) validation dataset size default 5000,
    :param  img_height: (str) image height size default 255,
    :param  img_width: (str) image widht size default 255,
    :param  nb_classes: (str) number of class default '5',
    :param  display_name:(str)Vertex job display name default'quickdraw_training'
    :return: GCS path for the trained model
    """

    from datetime import datetime 
    import google.cloud.aiplatform as aip
    
    display_job_name = display_name
    staging_bucket = bucket+"staging/"+display_job_name
    model_path = bucket+"gcs_model_data/"+display_job_name
    
    env_var = {'GCS_TRAINING_DATA': training_data,
               'GCS_VALIDATION_DATA':validation_data,
                'GCS_MODEL_DATA_PATH': model_path
                }
    
  
    #flo-test-devoxx/gcs_model_data/quickdraw_training_20220414_071940/model
    
    job = aip.CustomPythonPackageTrainingJob(
        display_name=display_job_name,
        python_package_gcs_uri= 'gs://devoxx_poc/vertex_job_code/quickdraw_classifier-0.0.1.tar.gz',
        python_module_name="quickdraw_classifier.training",
        container_uri='europe-docker.pkg.dev/vertex-ai/training/tf-gpu.2-8:latest',
        model_serving_container_image_uri='europe-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest',
        project=project,
        location=location,
        staging_bucket=bucket
    )
    
    CMDARGS = [
        '--batch_size='+batch_size,
        '--validation_batch_size='+validation_batch_size,
        '--training_ds_size='+training_ds_size,
        '--validation_ds_size='+validation_ds_size,
        '--img_height='+img_height,
        '--img_width='+img_width,
        '--nb_classes='+nb_classes
    ]
    
    print(CMDARGS)
    
    model = job.run(
        args=CMDARGS,
        environment_variables=env_var,
        sync=True,
        replica_count=1,
        machine_type='n1-standard-8',
        accelerator_type='NVIDIA_TESLA_K80',
        accelerator_count=1,
        base_output_dir=bucket)
    
    return model.name


In [7]:
train_model = create_component_from_func(
    createTraningJob, output_component_file='./components/train_model.yaml', base_image='gcr.io/deeplearning-platform-release/tf2-cpu.2-8')


## Assemble componenent for first Pipeline definition
### Define your pipeline parameters

In [8]:
username="Florent"
display_name=username+'_quickdraw_training'
pipeline_name=username+'_quickdraw_pipeline'
experiment_name=username+'_quickdraw'

### Define your pipeline

In [9]:

@dsl.pipeline(
    name='Quickdraw classifier ',
    description='A trainer that does end-to-end distributed training for Quickdraw classifier.'
)
def kubeflow_pipeline(
    images_path:str,
    tfrecords_path:str,
    image_validation_path:str,
    tfrecords_validation_path:str,
    bucket:str,
    location:str,
    project:str,
    training_data:str,
    validation_data:str,
    batch_size: str = '50',
    validation_batch_size: str = '20',
    training_ds_size : str = '25000',
    validation_ds_size: str = '5000',
    image_size: int = 64,
    img_height: str = '64',
    img_width: str = '64',
    nb_classes: str = '5',
    display_name:str = 'quickdraw_training',
    preprocess:bool = False,
    deploy_model:bool = False
    
):
    check_preprocess = print_preprocess(preprocess)
    
    with dsl.Condition(check_preprocess.output==True,name="do-pretraitement"):
        convert_train = create_step_convert_tfrecords(
            images_path=images_path,
            tfrecords_path=tfrecords_path,
            target_size = image_size
            ).set_display_name('convert_training_data').after(check_preprocess)

        convert_valid = create_step_convert_tfrecords(
            images_path=image_validation_path,
            tfrecords_path=tfrecords_validation_path,
            target_size = image_size
            ).set_display_name('convert_validation_data').after(check_preprocess)

    train_model_step = train_model(training_data=training_data,
                    validation_data=validation_data,
                    project= project,
                    location =location,
                    bucket=bucket,
                    batch_size=batch_size,
                    validation_batch_size= validation_batch_size,
                    training_ds_size=training_ds_size,
                    validation_ds_size=validation_ds_size,
                    img_height=img_height,
                    img_width=img_width,
                    nb_classes=nb_classes,
                    display_name=display_name).after(convert_valid,convert_train).set_display_name('Training_Model')
    


### Compile Pipeline

In [10]:
kfp.compiler.Compiler().compile(
    pipeline_func=kubeflow_pipeline,
    package_path='quickdraw_pipeline.yaml')

### Upload Pipeline to Kubeflow

In [11]:
pipeline = client.upload_pipeline(
    pipeline_package_path="quickdraw_pipeline.yaml", pipeline_name=pipeline_name)
print(pipeline)

{'created_at': datetime.datetime(2022, 4, 15, 14, 18, 45, tzinfo=tzlocal()),
 'default_version': {'code_source_url': None,
                     'created_at': datetime.datetime(2022, 4, 15, 14, 18, 45, tzinfo=tzlocal()),
                     'description': None,
                     'id': '3696db47-b28a-47bf-b12c-5020b8ef2a2b',
                     'name': 'Florent_quickdraw_pipeline',
                     'package_url': None,
                     'parameters': [{'name': 'images_path', 'value': None},
                                    {'name': 'tfrecords_path', 'value': None},
                                    {'name': 'image_validation_path',
                                     'value': None},
                                    {'name': 'tfrecords_validation_path',
                                     'value': None},
                                    {'name': 'bucket', 'value': None},
                                    {'name': 'location', 'value': None},
                     

### Define arguments for the training 

In [12]:

images_path="gs://devoxx_poc/raw_images/training_data/*/*.png"
tfrecords_path="gs://devoxx_poc/tfrecord_data/training_data/"
image_validation_path="gs://devoxx_poc/raw_images/validation_data/*/*.png"
tfrecords_validation_path="gs://devoxx_poc/tfrecord_data/validation_data/"

bucket="gs://flo-test-devoxx/"
location= "europe-west1"
project=  "sfeir-data"
training_data='gs://devoxx_poc/tfrecord_data/training_data/'
validation_data='gs://devoxx_poc/tfrecord_data/validation_data/'
image_size=128




args = {'images_path':images_path,
        'tfrecords_path':tfrecords_path,
        'image_validation_path':image_validation_path,
        'tfrecords_validation_path':tfrecords_validation_path,
        'bucket':bucket,
        'location':location,
        'project':project,
        'training_data':training_data,
        'validation_data':validation_data,
        'display_name':display_name,
        'preprocess': True,
        'deploy_model':True,
        'image_size':image_size}

### Create an experiment.

In [13]:
try:
    experiment=client.get_experiment(experiment_name=experiment_name) 
except :
    print("Experience not already exist")
    experiment = client.create_experiment(name=experiment_name) 

Experience not already exist


In [14]:
print(experiment)

{'created_at': datetime.datetime(2022, 4, 15, 14, 19, 3, tzinfo=tzlocal()),
 'description': None,
 'id': '68d41e0f-ad09-4340-b872-a451e38dff5f',
 'name': 'Florent_quickdraw',
 'resource_references': None,
 'storage_state': 'STORAGESTATE_AVAILABLE'}


### Launch pipeline into the Experiment 

In [15]:
job = client.run_pipeline(job_name=display_name ,experiment_id = experiment.id, params=args,pipeline_id = pipeline.id)

{'created_at': datetime.datetime(2022, 4, 15, 14, 19, 18, tzinfo=tzlocal()),
 'description': None,
 'error': None,
 'finished_at': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=tzlocal()),
 'id': 'ca474bbe-95d5-47e5-8ac6-9b2fe3f33f37',
 'metrics': None,
 'name': 'Florent_quickdraw_training',
 'pipeline_spec': {'parameters': [{'name': 'images_path',
                                   'value': 'gs://devoxx_poc/raw_images/training_data/*/*.png'},
                                  {'name': 'tfrecords_path',
                                   'value': 'gs://devoxx_poc/tfrecord_data/training_data/'},
                                  {'name': 'image_validation_path',
                                   'value': 'gs://devoxx_poc/raw_images/validation_data/*/*.png'},
                                  {'name': 'tfrecords_validation_path',
                                   'value': 'gs://devoxx_poc/tfrecord_data/validation_data/'},
                                  {'name': 'bucket',
              

## Assemble componenent for second Pipeline definition
### Create Deployement task 

In [16]:

def deploy_model(
    project: str,
    region: str,
    model_name:str
    
):
    from google.cloud import aiplatform
    aiplatform.init(project=project, location=region)
    
    endpoint_name = model_name+"_endpoint"
    
    def create_endpoint():
        endpoints = aiplatform.Endpoint.list(
        filter='displayName="{}"'.format(endpoint_name),
        order_by='create_time desc',
        project=project, 
        location=region,
        )
        if len(endpoints) > 0:
            endpoint = endpoints[0]  # most recently created
        else:
            endpoint = aiplatform.Endpoint.create(
            display_name=endpoint_name, project=project, location=region
        )
    endpoint = create_endpoint()   
    
    
    #Import a model programmatically
    """
    model_upload = aiplatform.Model.upload(
        display_name = model_name, 
        artifact_uri = model.uri.replace("model", ""),
        serving_container_image_uri =  serving_container_image_uri,
        serving_container_health_route=f"/v1/models/{model_name}",
        serving_container_predict_route=f"/v1/models/{model_name}:predict",
        serving_container_environment_variables={
        "model_name": model_name,
    },       
    )"""
    
    model = aiplatform.Model(model_name=model_name)
    
    model_deploy = model.deploy(
        machine_type="n1-standard-4", 
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=model_name,
    )

    # Save data to the output params
    return model_deploy.resource_name

In [17]:
deploy = create_component_from_func(
    deploy_model, output_component_file='./components/model_deployment.yaml', base_image='gcr.io/deeplearning-platform-release/tf2-cpu.2-8')


### Create pipeline 

In [22]:
@dsl.pipeline(
    name='Quickdraw classifier_full',
    description='A trainer that does end-to-end distributed training for Quickdraw classifier.'
)
def kubeflow_pipeline(
    images_path:str,
    tfrecords_path:str,
    image_validation_path:str,
    tfrecords_validation_path:str,
    bucket:str,
    location:str,
    project:str,
    training_data:str,
    validation_data:str,
    batch_size: str = '50',
    validation_batch_size: str = '20',
    training_ds_size : str = '25000',
    validation_ds_size: str = '5000',
    image_size: int = 64,
    img_height: str = '64',
    img_width: str = '64',
    nb_classes: str = '5',
    display_name:str = 'quickdraw_training',
    preprocess:bool = False,
    deploy_model:bool = False
    
):
    check_preprocess = print_preprocess(preprocess)
    
    with dsl.Condition(check_preprocess.output==True,name="do-pretraitement"):
        convert_train = create_step_convert_tfrecords(
            images_path=images_path,
            tfrecords_path=tfrecords_path,
            target_size = image_size
            ).set_display_name('convert_training_data').after(check_preprocess)

        convert_valid = create_step_convert_tfrecords(
            images_path=image_validation_path,
            tfrecords_path=tfrecords_validation_path,
            target_size = image_size
            ).set_display_name('convert_validation_data').after(check_preprocess)

    train_model_step = train_model(training_data=training_data,
                    validation_data=validation_data,
                    project= project,
                    location =location,
                    bucket=bucket,
                    batch_size=batch_size,
                    validation_batch_size= validation_batch_size,
                    training_ds_size=training_ds_size,
                    validation_ds_size=validation_ds_size,
                    img_height=img_height,
                    img_width=img_width,
                    nb_classes=nb_classes,
                    display_name=display_name).after(convert_valid,convert_train).set_display_name('Training_Model')
    
    with dsl.Condition(deploy_model==True,name="deploy-model"):
        deploy_model_op = deploy(
        project=project,
        region=location,
        model_name=train_model_step.output
        ).after(train_model_step)
    


In [23]:
kfp.compiler.Compiler().compile(
    pipeline_func=kubeflow_pipeline,
    package_path='quickdraw_pipeline_full.yaml')

### Update the pipeline version

In [24]:
pipeline_full =client.upload_pipeline_version(pipeline_package_path='quickdraw_pipeline_full.yaml',
        pipeline_version_name=pipeline_name+"_full",
        pipeline_id=pipeline.id,)
print(pipeline_full)

{'code_source_url': None,
 'created_at': datetime.datetime(2022, 4, 15, 14, 21, 18, tzinfo=tzlocal()),
 'description': None,
 'id': '0017746f-c623-4cd2-8285-45a5a15e54b8',
 'name': 'Florent_quickdraw_pipeline_full',
 'package_url': None,
 'parameters': [{'name': 'images_path', 'value': None},
                {'name': 'tfrecords_path', 'value': None},
                {'name': 'image_validation_path', 'value': None},
                {'name': 'tfrecords_validation_path', 'value': None},
                {'name': 'bucket', 'value': None},
                {'name': 'location', 'value': None},
                {'name': 'project', 'value': None},
                {'name': 'training_data', 'value': None},
                {'name': 'validation_data', 'value': None},
                {'name': 'batch_size', 'value': '50'},
                {'name': 'validation_batch_size', 'value': '20'},
                {'name': 'training_ds_size', 'value': '25000'},
                {'name': 'validation_ds_size', 'valu

### Launch Pipeline

In [25]:
client.run_pipeline(job_name=display_name ,experiment_id = experiment.id, params=args,pipeline_id = pipeline_full.id)

ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Date': 'Fri, 15 Apr 2022 14:22:45 GMT', 'Vary': 'Origin', 'X-Content-Type-Options': 'nosniff', 'X-Frame-Options': 'SAMEORIGIN', 'X-Powered-By': 'Express', 'X-Xss-Protection': '0', 'Transfer-Encoding': 'chunked', 'Set-Cookie': 'S=cloud_datalab_tunnel=euuAgNZbV0VHJ_pIhzQHZ0cZYiPnAfaHZ8EEsrGRWS0; Path=/; Max-Age=3600'})
HTTP response body: {"error":"Validate create run request failed.: Get pipelineId failed.: ResourceNotFoundError: Pipeline 0017746f-c623-4cd2-8285-45a5a15e54b8 not found.","code":5,"message":"Validate create run request failed.: Get pipelineId failed.: ResourceNotFoundError: Pipeline 0017746f-c623-4cd2-8285-45a5a15e54b8 not found.","details":[{"@type":"type.googleapis.com/api.Error","error_message":"Pipeline 0017746f-c623-4cd2-8285-45a5a15e54b8 not found.","error_details":"Validate create run request failed.: Get pipelineId failed.: ResourceNotFoundError: Pipeline 0017746f-c623-4cd2-8285-45a5a15e54b8 not found."}]}
