### Install MLRun

In [None]:
# !pip install mlrun

### Setup MLRun Project

In [None]:
import os
from os import path, getenv
import nuclio
from mlrun import new_project, code_to_function, run_local, NewTask, mlconf, import_function, mount_v3io, new_function

# Project identity and the absolute locations used throughout the notebook:
# project sources live under ./project, pipeline artifacts under ./pipeline.
project_name = 'horovod-pipeline-template'
project_path = path.abspath('project')

# Create the MLRun project object rooted at project_path.
project = new_project(name=project_name, context=project_path)

artifact_path = path.abspath('pipeline')

# Keep an already-configured DB path; otherwise fall back to the default
# MLRun API service address.
mlconf.dbpath = mlconf.dbpath or 'http://mlrun-api:8080'

print(f'Project name: {project_name}\nProject path: {project_path}',
      f'Artifacts path: {artifact_path}\nMLRun DB path: {mlconf.dbpath}',
      sep='\n')

### Import Functions

In [None]:
# Regular MLRun job: the utility handlers (download / label) from utils.py.
project.set_function(func='utils.py', name='utils', kind='job', image='mlrun/mlrun')

# Horovod training runs as an MPIJob; its command is the training script
# inside the project directory, so the function object is built explicitly.
HOROVOD_FILE = path.join(project_path, 'training.py')
trainer_fn = new_function(name='trainer', kind='mpijob', command=HOROVOD_FILE)
project.set_function(trainer_fn)

# Model server taken from the MLRun function marketplace (hub).
project.set_function('hub://tf2_serving', 'serving')

### Pipeline

In [None]:
%%writefile {path.join(project_path, 'workflow.py')}

from kfp import dsl
from mlrun import mount_v3io
import yaml

# Toggle for the trainer: True selects the GPU image and requests one GPU
# (see init_functions).
use_gpu = False

# Function objects keyed by name ('utils', 'trainer', 'serving') as used by
# kfpipeline. Nothing in this file fills it — presumably MLRun populates it
# when it loads this workflow (confirm).
funcs = dict()

# Configure function resources
def init_functions(functions: dict, project=None, secrets=None):
    """Configure the pipeline functions before the workflow runs.

    Mounts the V3IO data layer on every function, then configures the
    Horovod trainer (image, CPU/memory requests and limits, replica count,
    optional GPU) and the serving function (environment variables, logger
    sinks, replica bounds).

    :param functions: mapping of function name to MLRun function object; must
                      contain the 'trainer' and 'serving' keys.
    :param project:   not used here; presumably accepted to match the hook
                      signature MLRun calls (confirm).
    :param secrets:   not used here; see ``project``.
    """
    # Mount V3IO data layer to pipeline components
    for fn in functions.values():
        fn.apply(mount_v3io())

    # Training function: the GPU-enabled image is chosen only when use_gpu is set.
    trainer = functions['trainer']
    trainer.spec.image = 'mlrun/ml-models-gpu' if use_gpu else 'mlrun/ml-models'
    trainer.with_requests(cpu=1, mem="3G")
    trainer.with_limits(cpu=2, mem="5G")
    trainer.spec.replicas = 1
    if use_gpu:
        trainer.gpus(1)

    # Serving function: model class, input image size and explainer toggle
    # are provided as environment variables of the function.
    serving = functions['serving']
    serving.set_env('MODEL_CLASS', 'TFModel')
    serving.set_env('IMAGE_HEIGHT', "128")
    serving.set_env('IMAGE_WIDTH', "128")
    serving.set_env('ENABLE_EXPLAINER', "False")
    # setdefault tolerates a base spec without a 'spec' section instead of
    # failing with KeyError; an existing section is updated in place.
    serving.spec.base_spec.setdefault('spec', {})['loggerSinks'] = [{'level': 'info'}]
    serving.spec.min_replicas = 1
    serving.spec.max_replicas = 1

# Create a Kubeflow Pipelines pipeline
@dsl.pipeline(
    name='Image classification demo',
    description='Train an Image Classification TF Algorithm using MLRun'
)
def kfpipeline(target_path='images',
               archive_url='http://iguazio-sample-data.s3.amazonaws.com/catsndogs.zip',
               checkpoints_dir='models/checkpoints',
               model_name='cat_vs_dog_tfv2',
               epochs=1,
               batch_size=256):
    """Download, label, train on and serve the cats-vs-dogs image data set.

    Steps: ``download`` (utils ``open_archive``) -> ``label`` (utils
    ``categories_map_builder``) -> ``train`` (the Horovod ``trainer``) ->
    deploy of the trained model on the ``serving`` function.

    :param target_path:     passed to ``open_archive`` as ``target_path``
                            (presumably the extraction directory).
    :param archive_url:     URL of the image archive, passed as an input.
    :param checkpoints_dir: passed to the trainer as ``checkpoints_dir``.
    :param model_name:      key the trained model is deployed under.
    :param epochs:          number of training epochs.
    :param batch_size:      training batch size.
    """
    # step 1: download images
    open_archive = funcs['utils'].as_step(name='download',
                                          handler='open_archive',
                                          params={'target_path': target_path},
                                          inputs={'archive_url': archive_url},
                                          outputs=['content'])

    # step 2: label images found in the 'cats_n_dogs' folder of the
    # downloaded content
    source_dir = str(open_archive.outputs['content']) + '/cats_n_dogs'
    label = funcs['utils'].as_step(name='label',
                                   handler='categories_map_builder',
                                   params={'source_dir': source_dir},
                                   outputs=['categories_map', 'file_categories'])

    # step 3: train the model on the labeled images
    train = funcs['trainer'].as_step(
        name='train',
        params={'epochs': epochs,
                'checkpoints_dir': checkpoints_dir,
                'data_path': source_dir,
                'model_dir': 'tfmodels',
                'batch_size': batch_size},
        inputs={'categories_map': label.outputs['categories_map'],
                'file_categories': label.outputs['file_categories']},
        outputs=['model'])

    # step 4: deploy the trained model using the nuclio serving function;
    # the returned step object is not needed afterwards.
    funcs['serving'].deploy_step(models={model_name: train.outputs['model']})


### Save and Run the Pipeline

In [None]:
# Register workflow.py under the name 'main', then save the project.
workflow_file = 'workflow.py'
project.set_workflow('main', workflow_file)
project.save()

In [None]:
# Run the 'main' workflow. Each run's artifacts go to a sub-folder of the
# artifact_path defined in the setup cell; '{{workflow.uid}}' is passed as a
# literal placeholder (presumably substituted with the workflow id at run
# time — confirm). dirty=True is forwarded to project.run unchanged.
run_id = project.run(
    'main',
    arguments={},
    artifact_path=path.join(artifact_path, '{{workflow.uid}}'),
    dirty=True)

In [None]:
from mlrun import wait_for_pipeline_completion

# Wait up to one hour for the pipeline run to complete; the trailing ';'
# keeps the notebook from echoing the return value.
pipeline_timeout = 60 * 60
wait_for_pipeline_completion(run_id, timeout=pipeline_timeout);

## Test the serving function

After the function has been deployed, we can test it as a regular REST endpoint using `requests`.

In [None]:
import requests
from PIL import Image
from io import BytesIO
import matplotlib.pyplot as plt

### Define test params

In [None]:
# Testing event: download a sample cat image, keeping both the raw bytes
# (cat_image, sent to the server later) and the decoded PIL image (img).
cat_image_url = 'https://s3.amazonaws.com/iguazio-sample-data/images/catanddog/cat.102.jpg'
# A timeout keeps the cell from hanging forever on an unresponsive host.
response = requests.get(cat_image_url, timeout=30)
# Fail fast on HTTP errors rather than handing an error page to PIL, which
# would only report that it cannot identify the image file.
response.raise_for_status()
cat_image = response.content
img = Image.open(BytesIO(cat_image))

print('Test image:')
plt.imshow(img)

### Test the Serving Function (with an Image URL)

In [None]:
import json

# Endpoint of the deployed serving function, addressed as
# nuclio-<project name>-<function name>:8080 (presumably the in-cluster
# service name — confirm for your deployment).
addr = 'http://nuclio-{}-{}:8080'.format(project.name, project.func('serving').metadata.name)

# Must match the model_name the pipeline deployed the model under.
model_name = 'cat_vs_dog_tfv2'
headers = {'Content-type': 'image/jpeg'}
url = addr + f"/{model_name}/predict"

# Ask the server to fetch the image itself by sending its URL in a JSON body.
# NOTE(review): the body is JSON while Content-type says image/jpeg — confirm
# the serving function detects the payload by content, not by header.
response = requests.post(url=url,
                         data=json.dumps({'data_url': cat_image_url}),
                         headers=headers)
print(response.content.decode('utf-8'))

In [None]:
%%timeit 
# Timed body: one full round trip of the image-URL request, JSON encoding included.
payload = {'data_url': cat_image_url}
requests.post(url, data=json.dumps(payload), headers=headers)

### Test the Serving Function (with a JPEG Image)

In [None]:
# Send the raw image bytes downloaded earlier directly as the request body.
headers = {'Content-type': 'image/jpeg'}
response = requests.post(url, data=cat_image, headers=headers)
body_text = response.content.decode('utf-8')
print(body_text)

In [None]:
%%timeit
# Timed body: one full round trip of the raw-bytes request.
requests.post(url, data=cat_image, headers=headers)