# Mlperf DataEngineer (Inference Stage)

****Inference Scenarios****

| scenarios | reference app | framework | model/dataset |
| ---- | ---- | ---- | ---- |
| batch-inference-single | [scenarios/job-single](https://github.com/peiniliu/inference/tree/dev/vision/classification_and_detection/scenarios/job-single) | tensorflow | resnet/imagenet2012 |
| batch-inference-workflow | [scenarios/job-pipeline](https://github.com/peiniliu/inference/tree/dev/vision/classification_and_detection/scenarios/job-pipeline) | tensorflow | resnet/imagenet2012 |
| online-inference-single | [scenarios/service-single](https://github.com/peiniliu/inference/tree/dev/vision/classification_and_detection/scenarios/service-single) | tensorflow | resnet/imagenet2012 |
| online-inference-graph | [scenarios/service-graph](https://github.com/peiniliu/inference/tree/dev/vision/classification_and_detection/scenarios/service-graph) | tensorflow | resnet/imagenet2012 |

In [1]:
import sys
import os
sys.path.insert(0,'../..')

import scanflow
from scanflow.client import ScanflowClient
from scanflow.client import ScanflowTrackerClient
from scanflow.client import ScanflowDeployerClient

In [2]:
from scanflow.tools import env
print(env.get_env("SCANFLOW_SERVER_URI"))
print(env.get_env("SCANFLOW_TRACKER_URI"))
#print(env.get_env("SCANFLOW_TRACKER_LOCAL_URI"))
print(env.get_env("MLFLOW_S3_ENDPOINT_URL"))
print(env.get_env("AWS_ACCESS_KEY_ID"))
print(env.get_env("AWS_SECRET_ACCESS_KEY"))

http://172.30.0.50:46666
http://172.30.0.50:46667
http://172.30.0.50:43447
admin
admin123


In [3]:
# App folder
scanflow_path = "/gpfs/bsc_home/xpliu/pv/jupyterhubpeini/scanflow"
app_dir = os.path.join(scanflow_path, "examples/mlperf/dataengineer")
app_name = "mlperf"
team_name = "dataengineer"

# scanflow client
client = ScanflowClient(
              #if you defined "SCANFLOW_SERVER_URI", you dont need to provide this
              #scanflow_server_uri="http://172.30.0.50:46666",
              verbose=True)

### Scenario 1: Batch-inference-single

In [4]:
#predictor
executor1 = client.ScanflowExecutor(name='predictor-batch', 
                      mainfile='/tmp/inference/vision/classification_and_detection/python/main.py',
                      parameters={'model_name': 'mlperf-resnet',
                                  #mlperf params
                                  'dataset': 'imagenet_tflocal_preprocess',
                                  'dataset-path': '/workflow/data_imagenet',
                                  'scenario': 'Offline', # four scenarios
                                  'model': '/workflow/model/0',
                                  'model_name': 'resnet50',
                                  'inputs': 'input_image',
                                  'outputs': 'predictions/Softmax:0',
                                  'backend': 'tflocal',
                                  'mlperf_conf': '/tmp/inference/mlperf.conf',
                                  'user_conf': '/tmp/inference/vision/classification_and_detection/user.conf',
                                  'device': 'cpu',
                                  'cache_dir': '/workflow/preprocessed'
                                  'output': '/workflow/output',
                                  },
                      base_image='mlperf')


##workflow1 batch-inference-single
##-- predictor-batch
workflow1 = client.ScanflowWorkflow(name='batch-inference-single', 
                     nodes=[executor1],
                     output_dir = "/workflow")
              

### Scenario 2: Batch-inference-graph

In [4]:
#predictor
executor1 = client.ScanflowExecutor(name='download-model',
                                    mainfile='download.py',
                                    parameters={'model_name': 'mlperf-resnet',
                                                'model_uri': ''},
                                   )

executor2 = client.ScanflowExecutor(name='preprocessing-batch', 
                      mainfile='/tmp/inference/vision/classification_and_detection/python/preprocessing.py',
                      parameters={'app_name': app_name,
                                  'team_name': 'data'},
                      base_image='mlperf')

executor3 = client.ScanflowExecutor(name='predictor-batch', 
                      mainfile='predictor.py',
                      parameters={'model_name': 'mlperf-resnet',
                                  #mlperf params
                                  'dataset': 'imagenet_tflocal_preprocess',
                                  'dataset-path': '/workflow/data_imagenet',
                                  'scenario': 'Offline', # four scenarios
                                  'model': '/workflow/model/0',
                                  'model-name': 'resnet50',
                                  'inputs': 'input_image',
                                  'outputs': 'predictions/Softmax:0',
                                  'backend': 'tflocal',
                                  'mlperf_conf': '/tmp/inference/mlperf.conf',
                                  'user_conf': '/tmp/inference/vision/classification_and_detection/user.conf',
                                  'device': 'cpu',
                                  'cache_dir': '/workflow/preprocessed'
                                  'output': '/workflow/output', },
                      base_image='mlperf')

executor4 = client.ScanflowExecutor(name='postprocessing-batch', 
                      mainfile='postprocessing.py',
                      parameters={'app_name': app_name,
                                  'team_name': 'data'},
                      requirements='requirements.txt')

dependency1 = client.ScanflowDependency(dependee='download-model',
                                    depender='preprocessing-batch')
dependency2 = client.ScanflowDependency(dependee='preprocessing-batch',
                                    depender='predictor-batch')
dependency3 = client.ScanflowDependency(dependee='predictor-batch',
                                    depender='postprocessing-batch')

##workflow1 batch-inference-graph
workflow1 = client.ScanflowWorkflow(name='batch-inference-graph', 
                     nodes=[executor1, executor2, executor3, executor4],
                     edges=[dependency1, dependency2, dependency3],
                     output_dir = "/workflow")
              

### Scenario 3: Online-inference-single

In [5]:
#predictor online
service = client.ScanflowService(name='predictor-online',
                                 implementation_type = 'TENSORFLOW_SERVER',
                                 modelUri = 's3://scanflow/3/e90820f4af0b4f7a8a3264b4c49689ce/artifacts/mlperf-resnet/model',
                                 envSecretRefName = 'scanflow-secret',
                                 endpoint = {'type': 'GRPC'},
                                 parameters=[{'name': 'model_name', 'type':'STRING', 'value':'predictor-online'},
                                             {'name': 'model_input', 'type':'STRING', 'value':'input_image'},
                                             {'name': 'model_output', 'type':'STRING', 'value':'predictions/Softmax:0'}]
                                )

##workflow3 online-inference-single
##       -- predictor-online
workflow3 = client.ScanflowWorkflow(name='online-inference-single', 
                     nodes=[service],
                     output_dir = "/workflow")

### Scenario 4: Online-inference-graph

In [None]:
#predictor online
service1 = client.ScanflowService(name='preprocessing-online',
                                  )

service2 = client.ScanflowService(name='predictor-online',
                                 implementation_type = 'TENSORFLOW_SERVER',
                                 modelUri = 's3://scanflow/3/e90820f4af0b4f7a8a3264b4c49689ce/artifacts/mlperf-resnet/model',
                                 envSecretRefName = 'scanflow-secret',
                                 endpoint = {'type': 'GRPC'},
                                 parameters=[{'name': 'model_name', 'type':'STRING', 'value':'predictor-online'},
                                             {'name': 'model_input', 'type':'STRING', 'value':'input_image'},
                                             {'name': 'model_output', 'type':'STRING', 'value':'predictions/Softmax:0'}]
                                )

##workflow4 online-inference-graph
workflow4 = client.ScanflowWorkflow(name='online-inference-graph', 
                     nodes=[service],
                     output_dir = "/workflow")

In [11]:
app = client.ScanflowApplication(app_name = app_name,
                                 app_dir = app_dir,
                                 team_name = team_name,
                                 workflows=[workflow1, workflow2, workflow3, workflow4])

In [12]:
dic = app.to_dict()

14-Jun-21 23:04:26 -  INFO - Scanflowagent-tracker: {'name': 'tracker', 'template': 'monitor', 'sensors': [{'name': 'count_number_of_predictions', 'isCustom': True, 'func_name': 'count_number_of_predictions', 'trigger': {'weeks': 0, 'days': 0, 'hours': 1, 'minutes': 0, 'seconds': 0, 'start_date': None, 'end_date': None, 'timezone': None, 'jitter': None}, 'args': None, 'kwargs': None, 'next_run_time': None}], 'dockerfile': None, 'image': None}
14-Jun-21 23:04:26 -  INFO - Scanflowagent-checker: {'name': 'checker', 'template': 'analyzer', 'sensors': [{'name': 'count_number_of_newdata', 'isCustom': True, 'func_name': 'count_number_of_newdata', 'trigger': {'weeks': 0, 'days': 0, 'hours': 1, 'minutes': 0, 'seconds': 0, 'start_date': None, 'end_date': None, 'timezone': None, 'jitter': None}, 'args': None, 'kwargs': None, 'next_run_time': None}], 'dockerfile': None, 'image': None}
14-Jun-21 23:04:26 -  INFO - Scanflowagent-planner: {'name': 'planner', 'template': 'planner', 'sensors': [{'name

In [13]:
build_app = client.build_ScanflowApplication(app = app, trackerPort=46669)

14-Jun-21 23:04:27 -  INFO - Building image 172.30.0.49:5000/tracker-agent
14-Jun-21 23:04:27 -  INFO - Building image 172.30.0.49:5000/checker-agent
14-Jun-21 23:04:27 -  INFO - Building image 172.30.0.49:5000/planner-agent
14-Jun-21 23:04:27 -  INFO - Building image 172.30.0.49:5000/executor-agent
14-Jun-21 23:04:27 -  INFO - Building image 172.30.0.49:5000/load-data
14-Jun-21 23:04:27 -  INFO - Building image 172.30.0.49:5000/predictor-batch
14-Jun-21 23:04:27 -  INFO - Building image 172.30.0.49:5000/predictor-online
14-Jun-21 23:04:27 -  INFO - [+] Image [172.30.0.49:5000/predictor-online] not found in repository. Building a new one.
14-Jun-21 23:04:27 -  INFO - [+] Dockerfile: [Dockerfile_scanflow_service] was not created.
14-Jun-21 23:04:27 -  INFO - dockerfile for using None from /gpfs/bsc_home/xpliu/pv/jupyterhubpeini/scanflow/examples/mnist/dataengineer/workflows
14-Jun-21 23:04:27 -  INFO - Building image 172.30.0.49:5000/load-data
14-Jun-21 23:04:27 -  INFO - Building image

In [14]:
build_app.to_dict()

14-Jun-21 23:04:28 -  INFO - Scanflowagent-tracker: {'name': 'tracker', 'template': 'monitor', 'sensors': [{'name': 'count_number_of_predictions', 'isCustom': True, 'func_name': 'count_number_of_predictions', 'trigger': {'weeks': 0, 'days': 0, 'hours': 1, 'minutes': 0, 'seconds': 0, 'start_date': None, 'end_date': None, 'timezone': None, 'jitter': None}, 'args': None, 'kwargs': None, 'next_run_time': None}], 'dockerfile': None, 'image': '172.30.0.49:5000/tracker-agent:latest'}
14-Jun-21 23:04:28 -  INFO - Scanflowagent-checker: {'name': 'checker', 'template': 'analyzer', 'sensors': [{'name': 'count_number_of_newdata', 'isCustom': True, 'func_name': 'count_number_of_newdata', 'trigger': {'weeks': 0, 'days': 0, 'hours': 1, 'minutes': 0, 'seconds': 0, 'start_date': None, 'end_date': None, 'timezone': None, 'jitter': None}, 'args': None, 'kwargs': None, 'next_run_time': None}], 'dockerfile': None, 'image': '172.30.0.49:5000/checker-agent:latest'}
14-Jun-21 23:04:28 -  INFO - Scanflowagent-

{'app_name': 'mnist',
 'app_dir': '/gpfs/bsc_home/xpliu/pv/jupyterhubpeini/scanflow/examples/mnist/dataengineer',
 'team_name': 'dataengineer',
 'workflows': [{'name': 'batch-inference',
   'nodes': [{'name': 'load-data',
     'node_type': 'executor',
     'mainfile': 'loaddata.py',
     'parameters': {'app_name': 'mnist', 'team_name': 'data'},
     'requirements': None,
     'dockerfile': None,
     'base_image': None,
     'env': None,
     'image': '172.30.0.49:5000/load-data:latest',
     'resources': None},
    {'name': 'predictor-batch',
     'node_type': 'executor',
     'mainfile': 'predictor.py',
     'parameters': {'model_name': 'mnist_cnn',
      'input_data': '/workflow/load-data/mnist/data/mnist_sample/test_images.npy'},
     'requirements': None,
     'dockerfile': None,
     'base_image': 'modeling-cnn1',
     'env': None,
     'image': '172.30.0.49:5000/predictor-batch:latest',
     'resources': None}],
   'edges': [{'depender': 'predictor-batch',
     'dependee': 'load

In [15]:
deployerClient = ScanflowDeployerClient(user_type="incluster",
                                        deployer="seldon",
                                        k8s_config_file="/gpfs/bsc_home/xpliu/.kube/config")

In [16]:
await deployerClient.create_environment(app=build_app)

14-Jun-21 23:04:36 -  INFO - Scanflowagent-tracker: {'name': 'tracker', 'template': 'monitor', 'sensors': [{'name': 'count_number_of_predictions', 'isCustom': True, 'func_name': 'count_number_of_predictions', 'trigger': {'weeks': 0, 'days': 0, 'hours': 1, 'minutes': 0, 'seconds': 0, 'start_date': None, 'end_date': None, 'timezone': None, 'jitter': None}, 'args': None, 'kwargs': None, 'next_run_time': None}], 'dockerfile': None, 'image': '172.30.0.49:5000/tracker-agent:latest'}
14-Jun-21 23:04:36 -  INFO - Scanflowagent-checker: {'name': 'checker', 'template': 'analyzer', 'sensors': [{'name': 'count_number_of_newdata', 'isCustom': True, 'func_name': 'count_number_of_newdata', 'trigger': {'weeks': 0, 'days': 0, 'hours': 1, 'minutes': 0, 'seconds': 0, 'start_date': None, 'end_date': None, 'timezone': None, 'jitter': None}, 'args': None, 'kwargs': None, 'next_run_time': None}], 'dockerfile': None, 'image': '172.30.0.49:5000/checker-agent:latest'}
14-Jun-21 23:04:36 -  INFO - Scanflowagent-

False

In [17]:
trackerClient = ScanflowTrackerClient(scanflow_tracker_local_uri="http://172.30.0.50:46669",
                        verbose=True)

In [18]:
trackerClient.download_app_model(model_name="mlperf-resnet")

09-Jun-21 10:21:32 -  INFO - Found credentials in environment variables.
09-Jun-21 10:21:33 -  INFO - mnist--scanflow-model-datascience--{'training_dataset_len': 60000.0, 'accuracy': 0.9}--{}
09-Jun-21 10:21:34 -  INFO - mnist_cnn exists
2021/06/09 10:21:34 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: mnist_cnn, version 9


In [19]:
trackerClient.save_app_meta(build_app)

14-Jun-21 23:06:03 -  INFO - Connecting tracking server uri: http://172.30.0.50:46667
14-Jun-21 23:06:03 -  INFO - save app to artifact uri: s3://scanflow/1/09dbc72504584286802a914e2f70ac36/artifacts
14-Jun-21 23:06:03 -  INFO - Scanflowagent-tracker: {'name': 'tracker', 'template': 'monitor', 'sensors': [{'name': 'count_number_of_predictions', 'isCustom': True, 'func_name': 'count_number_of_predictions', 'trigger': {'weeks': 0, 'days': 0, 'hours': 1, 'minutes': 0, 'seconds': 0, 'start_date': None, 'end_date': None, 'timezone': None, 'jitter': None}, 'args': None, 'kwargs': None, 'next_run_time': None}], 'dockerfile': None, 'image': '172.30.0.49:5000/tracker-agent:latest'}
14-Jun-21 23:06:03 -  INFO - Scanflowagent-checker: {'name': 'checker', 'template': 'analyzer', 'sensors': [{'name': 'count_number_of_newdata', 'isCustom': True, 'func_name': 'count_number_of_newdata', 'trigger': {'weeks': 0, 'days': 0, 'hours': 1, 'minutes': 0, 'seconds': 0, 'start_date': None, 'end_date': None, 'ti

In [20]:
trackerClient.save_app_artifacts(app_name=app_name, 
                                team_name=team_name, 
                                app_dir=app_dir)

14-Jun-21 23:06:08 -  INFO - Connecting tracking server uri: http://172.30.0.50:46667
14-Jun-21 23:06:08 -  INFO - save app in /gpfs/bsc_home/xpliu/pv/jupyterhubpeini/scanflow/examples/mnist/dataengineer to artifact uri: s3://scanflow/1/c5639371ccf6485b9e3618e788169932/artifacts


In [16]:
await deployerClient.run_workflow(app_name='mlperf', 
                                  team_name='dataengineer',
                                  workflow = build_app.workflows[0])

25-May-21 15:58:22 -  INFO - [+] output dir /workflow
25-May-21 15:58:22 -  INFO - [+] Create batch-inference output PV
25-May-21 15:58:22 -  INFO - create_pv true
25-May-21 15:58:22 -  INFO - [+] Create batch-inference output PVC
25-May-21 15:58:22 -  INFO - create_pvc true
25-May-21 15:58:22 -  INFO - output dir created
25-May-21 15:58:22 -  INFO - env for executor {'AWS_ACCESS_KEY_ID': 'admin', 'AWS_SECRET_ACCESS_KEY': 'admin123', 'MLFLOW_S3_ENDPOINT_URL': 'http://minio.minio-system.svc.cluster.local:9000', 'SCANFLOW_TRACKER_URI': 'http://scanflow-tracker-service.scanflow-system.svc.cluster.local', 'SCANFLOW_SERVER_URI': 'http://scanflow-server-service.scanflow-system.svc.cluster.local', 'SCANFLOW_TRACKER_LOCAL_URI': 'http://scanflow-tracker.scanflow-mnist-dataengineer.svc.cluster.local'}
25-May-21 15:58:22 -  INFO - [+] Building workflow: [batch-inference:load-data].
25-May-21 15:58:22 -  INFO - ['--app_name', 'mnist', '--team_name', 'data']
25-May-21 15:58:22 -  INFO - [+] Buildin

OrderedDict([('apiVersion', 'argoproj.io/v1alpha1'), ('kind', 'Workflow'), ('metadata', {'name': 'batch-inference'}), ('spec', {'entrypoint': 'batch-inference', 'volumes': [OrderedDict([('name', 'outputpath'), ('persistentVolumeClaim', {'claimName': 'batch-inference'})]), OrderedDict([('name', 'scanflowpath'), ('persistentVolumeClaim', {'claimName': 'scanflow-scanflow-mnist-dataengineer'})])], 'templates': [OrderedDict([('name', 'batch-inference'), ('dag', {'tasks': [OrderedDict([('name', 'load-data'), ('template', 'load-data'), ('arguments', OrderedDict([('parameters', [{'name': 'para-load-data-0', 'value': '--app_name'}, {'name': 'para-load-data-1', 'value': 'mnist'}, {'name': 'para-load-data-2', 'value': '--team_name'}, {'name': 'para-load-data-3', 'value': 'data'}])]))]), OrderedDict([('name', 'predictor-batch'), ('dependencies', ['load-data']), ('template', 'predictor-batch'), ('arguments', OrderedDict([('parameters', [{'name': 'para-predictor-batch-0', 'value': '--model_name'}, {

True

In [None]:
await deployerClient.deploy_workflow(app_name='mlperf', 
                                  team_name='dataengineer',
                                  workflow = build_app.workflows[2])

In [None]:
await deployerClient.deploy_workflow(app_name='mlperf', 
                                  team_name='dataengineer',
                                  workflow = build_app.workflows[3])