## Imports

In [None]:
# Pin the KFP SDK to 1.x: this notebook uses v1-only APIs (dsl.ContainerOp,
# dsl.InputArgumentPath, kfp.gcp) that no longer exist in kfp 2.x, so an
# unpinned upgrade breaks every cell below.
# %pip (unlike !pip) installs into the environment of the running kernel.
# Restart the kernel after this cell so the pinned version is imported.
%pip install -q "kfp>=1.8,<2"

In [None]:
import kfp  # the Pipelines SDK.  
from kfp import compiler
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.components as comp
from kfp.dsl.types import Integer, GCSPath, String
import kfp.notebook
import time

import logging

# Raise the root logger's threshold to ERROR to reduce log noise in cell output.
logging.root.setLevel(logging.ERROR)

### Create the bucket 

Create the bucket from the Cloud Console, or from here by running the cell below (only if it does not already exist).

In [None]:
# GCS bucket that holds the exported data and the trained model artifacts.
BUCKET_NAME = "ribtdap-ds-aiplatform-kubeflow"

In [None]:
# Create the bucket only if it does not exist yet. `gsutil ls -b` exits
# non-zero for a missing bucket, while `gsutil mb` fails on an existing one,
# so guarding mb with ls makes this cell safe to re-run.
!gsutil ls -b gs://{BUCKET_NAME} > /dev/null 2>&1 || gsutil mb gs://{BUCKET_NAME}

### Pipeline client 

In [None]:
import kfp

# TODO(review): set this to your Kubeflow Pipelines endpoint; left empty here.
KFP_HOST = ''
client = kfp.Client(host=KFP_HOST)

### Constants

In [None]:
# Parameters
# TODO(review): fill in the GCP project used by the BigQuery, deploy and
# prediction calls below.
PROJECT_ID = ''

# Storage locations derived from the bucket created above.
BUCKET_URI = 'gs://{}'.format(BUCKET_NAME)
DATA_GCS_PATH = '{}/data.csv'.format(BUCKET_URI)

# AI Platform model identity and the runtime/Python versions passed to the
# deploy step.
MODEL_NAME = 'model_aiplatform_kubeflow'
MODEL_VERSION = '{}_v1'.format(MODEL_NAME)
RUNTIME_VERSION = '2.1'
PYTHON_VERSION = '3.7'

# Metadata attached to the compiled pipeline.
PIPELINE_NAME = 'POC Pipeline'
PIPELINE_DESCRIPTION = 'First POC'

In [None]:
# Prebuilt Kubeflow components. Note: despite the IMG_ prefix, these two are
# URLs of component.yaml specs, loaded below with comp.load_component_from_url.
IMG_BQ = (
    'https://raw.githubusercontent.com/kubeflow/pipelines/'
    '01a23ae8672d3b18e88adf3036071496aca3552d'
    '/components/gcp/bigquery/query/component.yaml'
)
IMG_DEPLOY = (
    'https://raw.githubusercontent.com/kubeflow/pipelines/'
    '1.0.0'
    '/components/gcp/ml_engine/deploy/component.yaml'
)

# Custom images built and pushed to Container Registry by the env.sh script,
# pinned by digest so every run uses exactly the same image.
IMG_PREPROCESS = (
    'gcr.io/ysance-datascience/poc-preprocess'
    '@sha256:7c4bd09d41b4c7957ba8c66d620938efc17fca1019e29b3c0793b380f0dae9cf'
)
IMG_TRAIN = (
    'gcr.io/ysance-datascience/poc-train'
    '@sha256:4b93c1bb10b846099b84a9dec5698f959613d0f901c0c9f644e639dc029325d9'
)
IMG_TEST = (
    'gcr.io/ysance-datascience/poc-test'
    '@sha256:eb59934ba45df9423f8b0713f013a4a53bec6f9fa42ce30f2b942a72d6952b8b'
)

### Download data

Define the query and a `download` function that runs it with the prebuilt BigQuery component and writes the result to GCS.

In [None]:
# BigQuery SQL run by the download step: 1000 trips with a positive distance
# and fare, exposing passenger_count and trip_distance plus fare_amount renamed
# to `label` (presumably the training target — confirm against the train image).
# ORDER BY rand() draws a different sample on every run, so the exported data
# (and everything downstream) is not reproducible across runs.
# NOTE(review): `nyc-tlc.yellow.trips` may no longer be served as a public
# dataset; `bigquery-public-data.new_york_taxi_trips` is the maintained copy — confirm.
QUERY = """
        SELECT passenger_count, trip_distance , fare_amount as label
        FROM `nyc-tlc.yellow.trips`
        WHERE trip_distance > 0 AND fare_amount > 0
        ORDER BY rand()
        LIMIT 1000
        """

In [None]:
bigquery_query_op = comp.load_component_from_url(IMG_BQ)


def download(project_id, data_gcs_path, query=QUERY):
    """Build the BigQuery step that runs `query` and exports the result to GCS.

    Args:
        project_id: GCP project in which the BigQuery job runs.
        data_gcs_path: GCS path the query result is written to; the
            preprocess step reads its input from this same path.
        query: SQL to execute. Defaults to the module-level ``QUERY``.

    Returns:
        The task created by the BigQuery query component.
    """
    # Use the arguments (pipeline parameters), not the module-level
    # PROJECT_ID / DATA_GCS_PATH constants: otherwise a run launched with a
    # different data_gcs_path writes to one location while the preprocess
    # step reads from another.
    return bigquery_query_op(
        query=query,
        project_id=project_id,
        output_gcs_path=data_gcs_path,
    )

### Preprocess data

This step uses the custom preprocessing image pushed to Google Container Registry (`IMG_PREPROCESS`).

In [None]:
def preprocess_op(file_gcs_path, bucket_name=BUCKET_NAME):
    """Container step running the custom preprocessing image.

    Args:
        file_gcs_path: GCS path of the input CSV, passed as ``--file_gcs_path``.
        bucket_name: bucket name, passed as ``--bucket_name``.

    Returns:
        A ``dsl.ContainerOp`` whose ``path_train`` and ``path_test`` outputs
        are read from ``/app/df_train.csv`` and ``/app/df_test.csv``.
    """
    cli_args = ['--file_gcs_path', file_gcs_path, '--bucket_name', bucket_name]
    outputs = {
        'path_train': '/app/df_train.csv',
        'path_test': '/app/df_test.csv',
    }
    return dsl.ContainerOp(
        name='Preprocess data',
        image=IMG_PREPROCESS,
        arguments=cli_args,
        file_outputs=outputs,
    )

### Train model

This step uses the custom training image pushed to Google Container Registry (`IMG_TRAIN`).

In [None]:
def train_op(path_train, bucket_name=BUCKET_NAME):
    """Container step running the custom training image.

    Args:
        path_train: training-data argument, passed as ``--path_train``.
        bucket_name: bucket name, passed as ``--bucket_name``.

    Returns:
        A ``dsl.ContainerOp`` whose ``path_model`` output is read from
        ``/app/model.pkl``.
    """
    return dsl.ContainerOp(
        name='Train model',
        image=IMG_TRAIN,
        arguments=['--path_train', path_train, '--bucket_name', bucket_name],
        file_outputs=dict(path_model='/app/model.pkl'),
    )

### Test model

This step uses the custom test image pushed to Google Container Registry (`IMG_TEST`).

In [None]:
def test_op(path_test, path_model, bucket_name=BUCKET_NAME):
    """Container step running the custom model-test image.

    Args:
        path_test: test-data argument, passed as ``--path_test``.
        path_model: model argument, passed as ``--path_model``.
        bucket_name: bucket name, passed as ``--bucket_name``.

    Returns:
        A ``dsl.ContainerOp`` whose ``path_metrics`` and ``path_pred``
        outputs are read from ``/app/metrics.txt`` and ``/app/df_pred.csv``.
    """
    # (flag, value) pairs flattened into the container's argv, in order.
    flags = (
        ('--path_test', path_test),
        ('--path_model', path_model),
        ('--bucket_name', bucket_name),
    )
    cli_args = [token for pair in flags for token in pair]

    return dsl.ContainerOp(
        name='Test model',
        image=IMG_TEST,
        arguments=cli_args,
        file_outputs=dict(
            path_metrics='/app/metrics.txt',
            path_pred='/app/df_pred.csv',
        ),
    )

### Deploy model

This step uses a prebuilt Google Cloud component from the Kubeflow Pipelines repository (`IMG_DEPLOY`) to deploy the model on AI Platform.

In [None]:
mlengine_deploy_op = comp.load_component_from_url(IMG_DEPLOY)


def deploy(project_id, model_uri, model_id, model_version, runtime_version, python_version):
    """Build the AI Platform deploy step from the prebuilt ml_engine component.

    Args:
        project_id: GCP project that hosts the model.
        model_uri: GCS URI of the model artifacts to deploy.
        model_id: AI Platform model name.
        model_version: version name, forwarded as the component's ``version_id``.
        runtime_version: AI Platform runtime version.
        python_version: Python version of the serving runtime.

    Returns:
        The task created by the deploy component.
    """
    # The last two settings ask the component to overwrite an existing version
    # with the same name and to make this version the model's default.
    deploy_kwargs = dict(
        project_id=project_id,
        model_uri=model_uri,
        model_id=model_id,
        version_id=model_version,
        runtime_version=runtime_version,
        python_version=python_version,
        replace_existing_version=True,
        set_default=True,
    )
    return mlengine_deploy_op(**deploy_kwargs)

### Define pipeline

In [None]:
@dsl.pipeline(
    name=PIPELINE_NAME,
    description=PIPELINE_DESCRIPTION
)
def pipeline(
    project_id,
    bucket_name,
    data_gcs_path,
    model_uri,
    model_id,
    model_version,
    runtime_version,
    python_version
):
    """POC pipeline: download -> preprocess -> train -> test -> deploy.

    Every argument is a pipeline parameter supplied when the run is
    submitted (see the ``params`` given to ``client.run_pipeline``).
    The function only wires tasks together; its return value is not used.
    """
    download_task = download(project_id, data_gcs_path)

    # The raw CSV is exchanged through GCS (data_gcs_path), not through a task
    # output, so the ordering after download must be declared explicitly.
    preprocess_task = preprocess_op(
        file_gcs_path=data_gcs_path,
        bucket_name=bucket_name,
    ).after(download_task)

    train_task = train_op(
        path_train=dsl.InputArgumentPath(preprocess_task.outputs['path_train']),
        bucket_name=bucket_name,
    ).after(preprocess_task)

    test_task = test_op(
        path_test=dsl.InputArgumentPath(preprocess_task.outputs['path_test']),
        path_model=dsl.InputArgumentPath(train_task.outputs['path_model']),
        bucket_name=bucket_name,
    ).after(train_task)

    # Deploy only once the test step has completed, so a failing test step
    # stops the run before a new model version is published. (Ordering deploy
    # after train let it run in parallel with test.)
    deploy(
        project_id=project_id,
        model_uri=model_uri,
        model_id=model_id,
        model_version=model_version,
        runtime_version=runtime_version,
        python_version=python_version,
    ).after(test_task)


### Submit the pipeline for execution

This will:
- create the experiment and compile the whole pipeline into a `tar.gz` package that you can share so anyone can reproduce the work;
- then run the pipeline with the input parameters given below.

In [None]:
PIPELINE_PACKAGE_PATH = 'poc_pipeline.tar.gz'

exp = client.create_experiment(name='POC Experiment')
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=PIPELINE_PACKAGE_PATH,
)

In [None]:
# Run-time values for every parameter declared by `pipeline`.
run_params = {
    'project_id': PROJECT_ID,
    'bucket_name': BUCKET_NAME,
    'data_gcs_path': DATA_GCS_PATH,
    'model_uri': BUCKET_URI + '/trained_model/',
    'model_id': MODEL_NAME,
    'model_version': MODEL_VERSION,
    'runtime_version': RUNTIME_VERSION,
    'python_version': PYTHON_VERSION,
}

run = client.run_pipeline(
    experiment_id=exp.id,
    job_name='POC Loreal',
    pipeline_package_path='poc_pipeline.tar.gz',
    params=run_params,
)

### Use the deployed model to predict (online prediction)

#### Predict from shell

See the README file.

#### Predict from Notebook

In [None]:
import googleapiclient.discovery


def predict_json(project, model, instances, version=None):
    """Request online predictions from a model deployed on AI Platform.

    Args:
        project (str): GCP project that owns the deployed model.
        model (str): name of the deployed model.
        instances (list): one entry per prediction instance, in the input
            format the deployed model expects (e.g. a list of feature rows).
        version (str, optional): model version to target. When omitted, the
            request is sent to the model resource itself.

    Returns:
        The ``predictions`` field of the API response.

    Raises:
        RuntimeError: if the response contains an ``error`` field.
    """
    # Credentials come from the environment, e.g.
    # GOOGLE_APPLICATION_CREDENTIALS=<path_to_service_account_file>.
    ml_service = googleapiclient.discovery.build('ml', 'v1')

    resource = 'projects/{}/models/{}'.format(project, model)
    if version is not None:
        resource = '{}/versions/{}'.format(resource, version)

    request = ml_service.projects().predict(
        name=resource,
        body={'instances': instances},
    )
    response = request.execute()

    if 'error' in response:
        raise RuntimeError(response['error'])
    return response['predictions']

In [None]:
# Rows presumably follow the query's feature order [passenger_count,
# trip_distance] — TODO confirm against the training image.
instances = [
    [1, 14],
    [3, 14],
    [1, 10],
]
result = predict_json(PROJECT_ID, MODEL_NAME, instances, version=MODEL_VERSION)
result

### Clean models

You can delete the model version and then the model from this notebook or from a terminal by uncommenting and running the commands below, or from the Cloud Console under `AI Platform -> Models`.

In [None]:
#!gcloud ai-platform versions delete $MODEL_VERSION --model $MODEL_NAME --quiet
#!gcloud ai-platform models delete $MODEL_NAME --quiet