## Object Detection Pipeline Scheduled Workflow on UCS using Darknet & YOLO

This notebook implements a scheduled object detection workflow as a Kubeflow pipeline on Cisco UCS. It uses Darknet, an open-source neural network framework, and YOLO (You Only Look Once), a real-time object detection system.

## Clone Cisco Kubeflow starter pack repository

In [22]:
BRANCH_NAME="dev" #Provide git branch "master" or "dev"
! git clone -b $BRANCH_NAME https://github.com/CiscoAI/cisco-kubeflow-starter-pack.git

Cloning into 'cisco-kubeflow-starter-pack'...
remote: Enumerating objects: 612, done.
remote: Counting objects: 100% (612/612), done.
remote: Compressing objects: 100% (342/342), done.
remote: Total 6640 (delta 291), reused 439 (delta 139), pack-reused 6028
Receiving objects: 100% (6640/6640), 45.19 MiB | 48.11 MiB/s, done.
Resolving deltas: 100% (2672/2672), done.


## Install required packages

In [23]:
!pip install kfp==1.0.1 pillow==7.2.0 --user

You should consider upgrading via the '/usr/bin/python3 -m pip install --upgrade pip' command.


## Restart kernel

In [None]:
from IPython.display import display_html
display_html("<script>Jupyter.notebook.kernel.restart()</script>",raw=True)

## Import libraries

In [1]:
import os
import json
import time
import yaml
import calendar
import requests
import logging
import numpy as np
from PIL import Image, ImageDraw
import tarfile, zipfile

#Kubeflow
import kfp
import kfp_server_api 
from kfp_server_api import models, api, ApiPipelineSpec, ApiTrigger, ApiPeriodicSchedule
from kfp.aws import use_aws_secret
import kfp.compiler as compiler

#Kubernetes
from kubernetes import client

#Tensorflow
import tensorflow as tf
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession

## Load pipeline components

Declare the path to each pipeline component's YAML configuration file, then load each component into a variable for use in the pipeline.

In [2]:
path='cisco-kubeflow-starter-pack/apps/computer-vision/object-detection/onprem/pipeline/components/v2/'
component_root_dwn= path+'download/'
component_root_katib= path+'katib/'
component_root_train= path+'train/'
component_root_validate= path+'validate/'
component_root_convert_ncnn=path+'conversion_ncnn/'
component_root_cleanup=path+'cleanup/'

download_op = kfp.components.load_component_from_file(os.path.join(component_root_dwn, 'component.yaml'))
hptuning_op = kfp.components.load_component_from_file(os.path.join(component_root_katib, 'component.yaml'))
train_op = kfp.components.load_component_from_file(os.path.join(component_root_train, 'component.yaml'))
validate_op = kfp.components.load_component_from_file(os.path.join(component_root_validate, 'component.yaml'))
convert_ncnn_op=kfp.components.load_component_from_file(os.path.join(component_root_convert_ncnn, 'component.yaml'))
cleanup_op=kfp.components.load_component_from_file(os.path.join(component_root_cleanup, 'component.yaml'))
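
Loading fails with a file error if the repository was cloned to a different location or a component directory is missing. As a minimal sketch, the component paths can be built in one place and checked before loading. The names `COMPONENTS_ROOT`, `COMPONENT_DIRS` and `component_yaml_paths` are illustrative and not part of the starter pack.

```python
import os

# Same root directory and component folders as in the cell above.
COMPONENTS_ROOT = ('cisco-kubeflow-starter-pack/apps/computer-vision/'
                   'object-detection/onprem/pipeline/components/v2/')
COMPONENT_DIRS = ['download', 'katib', 'train', 'validate',
                  'conversion_ncnn', 'cleanup']


def component_yaml_paths(root=COMPONENTS_ROOT, names=COMPONENT_DIRS):
    """Map each component folder name to the path of its component.yaml."""
    return {name: os.path.join(root, name, 'component.yaml') for name in names}


component_paths = component_yaml_paths()

# Collect the definitions that are not present on disk.
missing = [path for path in component_paths.values() if not os.path.isfile(path)]
if missing:
    print('Missing component definitions:')
    for path in missing:
        print('  ' + path)
else:
    print('All %d component definitions found.' % len(component_paths))
```

If the list of missing files is empty, each path can be passed to `kfp.components.load_component_from_file` as in the cell above.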

## Define volume claim & volume mount for storage during pipeline execution

A persistent volume claim and a volume mount are defined to store entities such as the dataset and model files, and to share these stored resources between the components of the pipeline during its execution.

In [3]:
nfs_pvc = client.V1PersistentVolumeClaimVolumeSource(claim_name='nfs')
nfs_volume = client.V1Volume(name='nfs', persistent_volume_claim=nfs_pvc)
nfs_volume_mount = client.V1VolumeMount(mount_path='/mnt/', name='nfs')

## Define pipeline function

In [4]:
gpus=1 # Number of GPUs to run training

def object_detection_pipeline(
    nfs_path='/mnt/object_detection',
    s3_path="s3://object-det-test/001",    # AWS S3 bucket URL. Ex: s3://<bucket-name>/ 
    namespace='kubeflow',               # Namespace on which trained model is to be deployed for prediction
    timestamp="123456",                 # Current timestamp
    cfg_data="voc.data",                # Config file containing file name specifications of train, test and validate datasets
    cfg_file="yolov3-voc.cfg",          # Config file containing hyperparameters declarations Ex: yolov3.cfg / yolov4.cfg
    weights="yolov3-voc_50000.weights", # Weights already pre-trained for 50000 iterations. Training therefore
                                        # resumes at iteration 50000 and runs up to the max_batches limit
                                        # (say 50200) specified in cfg_file.
    trials=1,                           # Total number of trials under Katib experiment
    gpus_per_trial=1,                   # Maximum GPUs to be used for each trial
    classes_file="voc.names"            # File containing the names of object classes (such as person, bus, car, etc.)
):
    # Download component
    dwn_task = download_op(s3_path=s3_path,timestamp=timestamp,
                           cfg_data=cfg_data
                          ).apply(use_aws_secret(secret_name='aws-secret', aws_access_key_id_name='AWS_ACCESS_KEY_ID', aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'))
    dwn_task.add_volume(nfs_volume)
    dwn_task.add_volume_mount(nfs_volume_mount) 
    
    # HP tuning (Katib) component
    hptuning_task = hptuning_op(cfg_data=cfg_data,             # Config file containing file name specifications of train, test and validate datasets
                                cfg_file=cfg_file,             # Config file containing hyperparameters declarations Ex: yolov3.cfg / yolov4.cfg
                                weights=weights,               # Pre-trained weights for VOC dataset
                                trials=trials,                 # total number of trials under Katib experiment
                                timestamp=timestamp,           # Current timestamp to create unique experiment 
                                                               # Ex: object-detection-1599547688-random-588d7877f5-zvlx5
                                gpus_per_trial=gpus_per_trial, # Maximum GPUs to be used for each trial
                                )
    hptuning_task.add_volume(nfs_volume)
    hptuning_task.add_volume_mount(nfs_volume_mount)
    hptuning_task.after(dwn_task)
    
    # Train component
    train_task = train_op(cfg_data=cfg_data,          # Config file containing file name specifications of train, test and validate datasets
                          cfg_file=cfg_file,          # Config file containing hyperparameters declarations Ex: yolov3.cfg / yolov4.cfg
                          weights=weights,            # Pre-trained weights for VOC dataset
                          gpus=gpus,             
                          timestamp=timestamp
                         )
    train_task.add_volume(nfs_volume)
    train_task.add_volume_mount(nfs_volume_mount).set_gpu_limit(gpus)  #Maximum GPUs to be used for training
    train_task.after(hptuning_task)
    
    # Validation component
    validate_task = validate_op(nfs_path=nfs_path,
                                s3_path=s3_path,
                                cfg_data=cfg_data,          # Config file containing file name specifications of train, test and validate datasets
                                cfg_file=cfg_file,          # Config file containing hyperparameters declarations Ex: yolov3.cfg / yolov4.cfg
                                weights=weights,            # Pre-trained weights for VOC dataset
                                timestamp=timestamp
                                ).apply(use_aws_secret(secret_name='aws-secret', aws_access_key_id_name='AWS_ACCESS_KEY_ID', aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'))
    validate_task.add_volume(nfs_volume)
    validate_task.add_volume_mount(nfs_volume_mount)
    validate_task.after(train_task)
    
    # Darknet to ncnn conversion component
    conversion_ncnn_task=convert_ncnn_op(push_to_s3="true",  # Flag that decides whether to upload the trained weights and the
                                                    # converted model to the S3 bucket for later inference in any other
                                                    # environment, or to proceed to serving on UCS (Input: true/false)
                               s3_path=s3_path,
                               timestamp=timestamp,
                               ).apply(use_aws_secret(secret_name='aws-secret', aws_access_key_id_name='AWS_ACCESS_KEY_ID', aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'))
    conversion_ncnn_task.add_volume(nfs_volume)
    conversion_ncnn_task.add_volume_mount(nfs_volume_mount)
    conversion_ncnn_task.after(validate_task)
    
    # Clean up component
    cleanup_task = cleanup_op(nfs_path=nfs_path,
                                timestamp=timestamp
                                ).apply(use_aws_secret(secret_name='aws-secret', aws_access_key_id_name='AWS_ACCESS_KEY_ID', aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY'))
    cleanup_task.add_volume(nfs_volume)
    cleanup_task.add_volume_mount(nfs_volume_mount)
    cleanup_task.after(conversion_ncnn_task)  
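
The `timestamp` parameter defaults to the placeholder `"123456"`, while its comment describes it as the current timestamp used to create a unique experiment name. The notebook imports `calendar` and `time` but does not show how the value is produced, so the following is only a sketch of one way to generate it. The helper name `current_timestamp` is an assumption.

```python
import calendar
import time


def current_timestamp():
    """Return the current UTC time as a string of whole seconds since the epoch."""
    return str(calendar.timegm(time.gmtime()))


timestamp = current_timestamp()
print(timestamp)
```

The resulting string has the same form as the `1599547688` shown in the Katib experiment name example above and could be supplied as the pipeline's `timestamp` argument.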

## Compile pipeline function

Compile the pipeline function into a tarball (`.tar.gz`) package for the pipeline.

In [5]:
# Compile pipeline
try:
    compiler.Compiler().compile(object_detection_pipeline, 'object-detection-schedule.tar.gz')
except RuntimeError as err:
    logging.debug(err)
    logging.info("Argo workflow failed validation check but it can still be used to run experiments.")



## Definition to extract YAML from object-detection-schedule.tar.gz

In [6]:
def _extract_pipeline_yaml(package_file):
    def _choose_pipeline_yaml_file(file_list) -> str:
      yaml_files = [file for file in file_list if file.endswith('.yaml')]
      if len(yaml_files) == 0:
        raise ValueError('Invalid package. Missing pipeline yaml file in the package.')

      if 'pipeline.yaml' in yaml_files:
        return 'pipeline.yaml'
      else:
        if len(yaml_files) == 1:
          return yaml_files[0]
        raise ValueError('Invalid package. There is no pipeline.yaml file and there are multiple yaml files.')

    if package_file.endswith('.tar.gz') or package_file.endswith('.tgz'):
      with tarfile.open(package_file, "r:gz") as tar:
        file_names = [member.name for member in tar if member.isfile()]
        pipeline_yaml_file = _choose_pipeline_yaml_file(file_names)
        with tar.extractfile(tar.getmember(pipeline_yaml_file)) as f:
          return yaml.safe_load(f)
    elif package_file.endswith('.zip'):
      with zipfile.ZipFile(package_file, 'r') as zip_file:
        pipeline_yaml_file = _choose_pipeline_yaml_file(zip_file.namelist())
        with zip_file.open(pipeline_yaml_file) as f:
          return yaml.safe_load(f)
    elif package_file.endswith('.yaml') or package_file.endswith('.yml'):
      with open(package_file, 'r') as f:
        return yaml.safe_load(f)
    else:
      raise ValueError('The package_file '+ package_file + ' should end with one of the following formats: [.tar.gz, .tgz, .zip, .yaml, .yml]')
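
The `.tar.gz` branch above relies on the package containing a regular file whose name ends in `.yaml`. The sketch below illustrates that member-listing step on a throwaway package built in a temporary directory, using only the standard library. The file name `demo-pipeline.tar.gz`, the placeholder manifest and the helper `list_yaml_members` are made up for illustration.

```python
import io
import os
import tarfile
import tempfile


def list_yaml_members(package_file):
    """Return the names of the regular .yaml files inside a .tar.gz package."""
    with tarfile.open(package_file, "r:gz") as tar:
        return [m.name for m in tar if m.isfile() and m.name.endswith(".yaml")]


# Placeholder manifest content; a real package holds the compiled workflow.
manifest = b"apiVersion: argoproj.io/v1alpha1\nkind: Workflow\n"

with tempfile.TemporaryDirectory() as tmp_dir:
    demo_package = os.path.join(tmp_dir, "demo-pipeline.tar.gz")
    # Write a package with a single pipeline.yaml member.
    with tarfile.open(demo_package, "w:gz") as tar:
        info = tarfile.TarInfo(name="pipeline.yaml")
        info.size = len(manifest)
        tar.addfile(info, io.BytesIO(manifest))
    yaml_members = list_yaml_members(demo_package)

print(yaml_members)
```

Running the same helper on `object-detection-schedule.tar.gz` is a quick way to see which file `_extract_pipeline_yaml` would select.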

## Extract YAML from tar.gz file

In [7]:
pipeline_package_path="object-detection-schedule.tar.gz"
pipeline_obj = _extract_pipeline_yaml(pipeline_package_path)
pipeline_json_string = json.dumps(pipeline_obj)

In [8]:
kfp_client = kfp.Client()

## Define periodic schedule of run

In [13]:
job = kfp_server_api.models.api_job.ApiJob(
    name="object-detection-periodic-schedule",
    max_concurrency="1",            # Allow at most one run of this job at a time
    enabled=True,
    pipeline_spec=kfp_server_api.models.ApiPipelineSpec(
        workflow_manifest=pipeline_json_string,
    ),
    trigger=kfp_server_api.models.ApiTrigger(
        periodic_schedule=kfp_server_api.models.ApiPeriodicSchedule(
            interval_second="3600"  # Start a run every 3600 seconds (one hour)
        ),
    ),
)
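
The schedule interval is passed to `interval_second` as a string of seconds, which makes longer periods easy to mistype. As a small sketch, the value can be derived from `datetime.timedelta`. The helper name `interval_seconds` is an assumption, not part of the KFP API.

```python
from datetime import timedelta


def interval_seconds(**duration):
    """Convert timedelta keyword arguments into a string of whole seconds."""
    return str(int(timedelta(**duration).total_seconds()))


hourly = interval_seconds(hours=1)
daily = interval_seconds(days=1)
print(hourly, daily)
```

For example, `interval_second = interval_seconds(hours=1)` is equivalent to the `"3600"` used above.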

## Create a scheduled job

In [17]:
op_job = kfp_client.jobs.create_job(body=job)

In [12]:
op_job.id

'82602d5d-6e3d-4521-805e-43474221f4b2'

## Cleanup scheduled job

In [19]:
kfp_client.jobs.delete_job(id=op_job.id)
print('Scheduled job with job ID %s has been deleted' %op_job.id)

Scheduled job with job ID 76adfcd7-fa50-4206-9aae-1a21887b4546 has been deleted
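
Deleting by `op_job.id` only works while `op_job` still refers to the job that was created, which is easy to lose when cells are re-run. The sketch below looks jobs up by name instead. It assumes the jobs API object exposes `list_jobs(page_token=..., page_size=...)` returning a response with `jobs` and `next_page_token` attributes, as the generated `kfp_server_api` client is understood to do. The helper `find_job_ids_by_name` is hypothetical.

```python
def find_job_ids_by_name(jobs_api, name, page_size=50):
    """Collect the IDs of all scheduled jobs whose name matches exactly.

    jobs_api is assumed to behave like kfp_client.jobs.
    """
    job_ids = []
    page_token = ''
    while True:
        response = jobs_api.list_jobs(page_token=page_token, page_size=page_size)
        # The jobs attribute may be None when a page holds no jobs.
        for job in response.jobs or []:
            if job.name == name:
                job_ids.append(job.id)
        page_token = response.next_page_token
        if not page_token:
            return job_ids


# Possible usage against a live cluster (not executed here):
# for job_id in find_job_ids_by_name(kfp_client.jobs, "object-detection-periodic-schedule"):
#     kfp_client.jobs.delete_job(id=job_id)
```

Matching on the exact name keeps the cleanup from touching other scheduled jobs in the same Kubeflow deployment.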
