# Load packages

In [2]:
! pip install git+https://github.com/kubeflow/pipelines.git#subdirectory=sdk/python

Collecting git+https://github.com/kubeflow/pipelines.git#subdirectory=sdk/python
  Cloning https://github.com/kubeflow/pipelines.git to /tmp/pip-req-build-5ylhofi9
  Running command git clone -q https://github.com/kubeflow/pipelines.git /tmp/pip-req-build-5ylhofi9
Collecting absl-py<=0.11,>=0.9
  Using cached absl_py-0.11.0-py3-none-any.whl (127 kB)
Collecting google-cloud-storage<2,>=1.13.0
  Using cached google_cloud_storage-1.38.0-py2.py3-none-any.whl (103 kB)
Collecting kubernetes<13,>=8.0.0
  Using cached kubernetes-12.0.1-py2.py3-none-any.whl (1.7 MB)
Collecting requests-toolbelt<1,>=0.8.0
  Using cached requests_toolbelt-0.9.1-py2.py3-none-any.whl (54 kB)
Collecting kfp-server-api<2.0.0,>=1.1.2
  Using cached kfp_server_api-1.5.0-py3-none-any.whl
Collecting tabulate<1,>=0.8.6
  Using cached tabulate-0.8.9-py3-none-any.whl (25 kB)
Collecting Deprecated<2,>=1.2.7
  Using cached Deprecated-1.2.12-py2.py3-none-any.whl (9.5 kB)
Collecting strip-hints<1,>=0.1.8
  Using cached strip_hi

In [3]:
import json
import os
from getpass import getpass

import kfp
from kfp import compiler
from kfp import components
from kfp import dsl
from kfp.components import load_component_from_file, load_component_from_url
from kfp.onprem import use_k8s_secret

# Display the version of the KFP SDK that was imported above.
kfp.__version__

'1.6.0-rc.0'

# Enter your gateway load balancer address and the auth token from the session cookie
[Use this Chrome extension to read the token from the cookie](https://chrome.google.com/webstore/detail/editthiscookie/fngmhnnpilhplaeedifhccceomclgfbg?hl=en)

![image.png](./image.png)

In [7]:
# Update the load balancer address and the namespace for your cluster.
ingress_gateway = 'http://istio-ingressgateway.istio-system.svc.cluster.local'
namespace = "kubeflow-user-example-com"

# The session cookie is a credential, so it is not stored in the notebook.
# It is read from the AUTHSERVICE_SESSION environment variable when that is set,
# and prompted for (without echoing the input) otherwise.
_AUTH_COOKIE_NAME = "authservice_session"
_session_token = (
    os.environ.get("AUTHSERVICE_SESSION")
    or getpass(f"{_AUTH_COOKIE_NAME} cookie value: ")
).strip()

# Accept either the bare cookie value or a pasted "authservice_session=<value>" pair.
if _session_token.startswith(_AUTH_COOKIE_NAME + "="):
    _session_token = _session_token[len(_AUTH_COOKIE_NAME) + 1:]
if not _session_token:
    raise ValueError(
        "No session cookie provided: set AUTHSERVICE_SESSION or enter the "
        "authservice_session cookie value when prompted."
    )

auth = f"{_AUTH_COOKIE_NAME}={_session_token}"

In [8]:
# Build the Pipelines API client; requests go through the ingress gateway and
# carry the session cookie for authentication.
pipelines_endpoint = f"{ingress_gateway}/pipeline"
client = kfp.Client(cookies=auth, host=pipelines_endpoint)

In [21]:
# Use the experiment object returned by create_experiment instead of taking
# whatever happens to be first in the namespace listing, which is not
# guaranteed to be 'Default' once other experiments exist.
# The namespace is passed explicitly, matching the list_experiments call.
my_experiment = client.create_experiment('Default', namespace=namespace)

# Kept so later cells can still inspect every experiment in the namespace.
experiments = client.list_experiments(namespace=namespace)

my_experiment

{'created_at': datetime.datetime(2021, 5, 14, 13, 36, 40, tzinfo=tzlocal()),
 'description': None,
 'id': '3ebf8f54-1a72-4c1e-8f41-878acdbba58d',
 'name': 'Default',
 'resource_references': [{'key': {'id': 'kubeflow-user-example-com',
                                  'type': 'NAMESPACE'},
                          'name': None,
                          'relationship': 'OWNER'}],
 'storage_state': 'STORAGESTATE_AVAILABLE'}

### Using the DSL for model archiver and TorchServe integration
#### An init container is used for MAR file generation

In [22]:
# Constants read by the deployment and prediction steps of the pipeline.
deploy = "torchserve"
model = "cifar10"

# Host name of the inference service: <deploy>.<namespace>.example.com
isvc_name = ".".join([deploy, namespace, "example.com"])

# Sample request payload sent to the deployed model.
input_req = "https://kubeflow-dataset.s3.us-east-2.amazonaws.com/cifar10_input/input.json"

In [23]:
# Load the reusable pipeline components from their YAML definitions.
# Every component is loaded through the same `components.` entry point.
prepare_tensorboard_op = components.load_component_from_file(
    "./examples/cifar10/yaml/tensorboard/component.yaml"
)
prep_op = components.load_component_from_file("./examples/cifar10/yaml/pre_process/component.yaml")
train_op = components.load_component_from_file("./examples/cifar10/yaml/train/component.yaml")
deploy_op = components.load_component_from_file(
    "./examples/cifar10/yaml/deploy/component.yaml"
)
pred_op = components.load_component_from_file(
    "./examples/cifar10/yaml/prediction/component.yaml"
)

In [24]:
# Component used by the pipeline to push files into MinIO buckets.
minio_component_path = "./examples/cifar10/yaml/minio/component.yaml"
minio_op = components.load_component_from_file(minio_component_path)

In [28]:
# Kubernetes secret that holds the MinIO access/secret key pair.
_MINIO_SECRET_NAME = "mlpipeline-minio-artifact"


def _tensorboard_pod_template(minio_endpoint):
    """Return the JSON pod template passed to the TensorBoard component.

    The template only sets container environment variables: the AWS key pair
    is referenced from the MinIO secret, ``S3_ENDPOINT`` is set to
    ``minio_endpoint`` and ``S3_USE_HTTPS`` / ``S3_VERIFY_SSL`` are set to '0'.
    """
    def secret_env(env_name, secret_key):
        # One env entry whose value is read from the MinIO secret.
        return {
            'name': env_name,
            'valueFrom': {
                'secretKeyRef': {
                    'name': _MINIO_SECRET_NAME,
                    'key': secret_key
                }
            }
        }

    return json.dumps({
        'spec': {
            'containers': [{
                'env': [
                    secret_env('AWS_ACCESS_KEY_ID', 'accesskey'),
                    secret_env('AWS_SECRET_ACCESS_KEY', 'secretkey'),
                    {'name': 'AWS_REGION', 'value': 'minio'},
                    {'name': 'S3_ENDPOINT', 'value': f'{minio_endpoint}'},
                    {'name': 'S3_USE_HTTPS', 'value': '0'},
                    {'name': 'S3_VERIFY_SSL', 'value': '0'},
                ]
            }],
        },
    })


def _minio_upload(bucket_name, folder_name, input_path, filename, upstream, display_name):
    """Create a MinIO upload step that runs after ``upstream``.

    The MinIO credentials are exposed to the step as ``MINIO_SECRET_KEY`` and
    ``MINIO_ACCESS_KEY`` environment variables taken from the MinIO secret.
    """
    return (
        minio_op(
            bucket_name=bucket_name,
            folder_name=folder_name,
            input_path=input_path,
            filename=filename,
        )
        .apply(
            use_k8s_secret(
                secret_name=_MINIO_SECRET_NAME,
                k8s_secret_key_to_env={
                    "secretkey": "MINIO_SECRET_KEY",
                    "accesskey": "MINIO_ACCESS_KEY",
                },
            )
        )
        .after(upstream)
        .set_display_name(display_name)
    )


def _inference_task(inference_type, upstream, display_name):
    """Create a request step against the deployed model that runs after ``upstream``.

    NOTE(review): the session cookie in ``auth`` is passed as a plain argument
    and therefore ends up in the compiled pipeline package.
    """
    return (
        pred_op(
            host_name=isvc_name,
            input_request=input_req,
            cookie=auth,
            url=ingress_gateway,
            model=model,
            inference_type=inference_type,
        )
        .after(upstream)
        .set_display_name(display_name)
    )


@dsl.pipeline(name="Training pipeline", description="Sample training job test")
def pytorch_cifar10(minio_endpoint='http://minio-service.kubeflow:9000',
    log_bucket='mlpipeline',
    log_dir=f'tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}',
    mar_path=f'mar/{dsl.RUN_ID_PLACEHOLDER}/model-store',
    config_prop_path=f'mar/{dsl.RUN_ID_PLACEHOLDER}/config',
    model_uri = f's3://mlpipeline/mar/{dsl.RUN_ID_PLACEHOLDER}',
    tf_image='jagadeeshj/tb_plugin:v1.8'):
    """CIFAR-10 pipeline: TensorBoard, preprocess, train, upload, deploy, predict, explain.

    Args:
        minio_endpoint: Value of ``S3_ENDPOINT`` in the TensorBoard pod template.
        log_bucket: Bucket that holds the TensorBoard logs.
        log_dir: Folder inside ``log_bucket`` for the TensorBoard logs.
        mar_path: Folder the ``cifar10_test.mar`` file is uploaded to.
        config_prop_path: Folder the ``config.properties`` file is uploaded to.
        model_uri: ``storageUri`` of the InferenceService predictor.
        tf_image: Image passed to the TensorBoard component.
    """
    prepare_tb_task = prepare_tensorboard_op(
        log_dir_uri=f's3://{log_bucket}/{log_dir}',
        image=tf_image,
        pod_template_spec=_tensorboard_pod_template(minio_endpoint),
    ).set_display_name("Visualization")

    prep_task = (
        prep_op()
        .after(prepare_tb_task)
        .set_display_name("Preprocess & Transform")
    )
    train_task = (
        train_op(input_data=prep_task.outputs['output_data'], profiler="pytorch")
        .after(prep_task)
        .set_display_name("Training")
    )

    # Push the TensorBoard events into the same bucket the viewer reads from
    # (``log_dir_uri`` above); previously the bucket was hard-coded here.
    _minio_upload(
        bucket_name=log_bucket,
        folder_name=log_dir,
        input_path=train_task.outputs["tensorboard_root"],
        filename="",
        upstream=train_task,
        display_name="Tensorboard Events Pusher",
    )
    minio_mar_upload = _minio_upload(
        bucket_name="mlpipeline",
        folder_name=mar_path,
        input_path=train_task.outputs["checkpoint_dir"],
        filename="cifar10_test.mar",
        upstream=train_task,
        display_name="Mar Pusher",
    )
    minio_config_upload = _minio_upload(
        bucket_name="mlpipeline",
        folder_name=config_prop_path,
        input_path=train_task.outputs["checkpoint_dir"],
        filename="config.properties",
        upstream=train_task,
        display_name="Config Pusher",
    )

    # Turn the pipeline parameter into its placeholder string before formatting.
    model_uri = str(model_uri)
    isvc_yaml = '''
    apiVersion: "serving.kubeflow.org/v1beta1"
    kind: "InferenceService"
    metadata:
      name: {}
      namespace: {}
    spec:
      predictor:
        serviceAccountName: sa
        pytorch:
          storageUri: {}
          resources:
            limits:
              memory: 4Gi
    '''.format(deploy, namespace, model_uri)

    # Deploy only once BOTH the model archive and config.properties have been
    # uploaded; waiting on the archive alone lets the deployment race the
    # config upload.
    deploy_task = deploy_op(
        action='apply',
        inferenceservice_yaml=isvc_yaml
    ).after(minio_mar_upload, minio_config_upload).set_display_name("Deployer")

    pred_task = _inference_task("predict", deploy_task, "Prediction")
    _inference_task("explain", pred_task, "Explanation")

In [29]:
# Compile the pipeline function into a package, with type checking enabled.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pytorch_cifar10,
    package_path='pytorch.tar.gz',
    type_check=True,
)

In [30]:
# Submit the compiled package as a run in the selected experiment.
run = client.run_pipeline(
    experiment_id=my_experiment.id,
    job_name='pytorch-cifar10',
    pipeline_package_path='pytorch.tar.gz',
)