# EnTK Seals Pipeline notebook.

This notebook provides a prototype implementation of the Seal use case, as described in the [Seal Execution Model](https://docs.google.com/document/d/1E79LfwXG1ZJ1fTQiGsDvSggE6BJSDEqAyHKcCxjqgoY/edit?ts=5af5d13e). Each cell of the notebook creates a necessary component of the pipeline definition.

In [1]:
import os

from radical.entk import Pipeline, Stage, Task, AppManager, ResourceManager

## Pipeline Definition

The next cell defines the prototype pipeline for the Seal use case. The pipeline has two stages, and each stage has a single task.
The first stage executes the prediction and the second executes the blob detection.

Still to be added are [Stage number 0](https://docs.google.com/document/d/1E79LfwXG1ZJ1fTQiGsDvSggE6BJSDEqAyHKcCxjqgoY/edit?ts=5af5d13e) and a final stage that aggregates the results. Also, the single task in each stage should be split into multiple tasks based on the number of images.
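
One way to split the work by number of images is to group the image paths into fixed-size batches and create one task per batch. The following is a minimal sketch of that grouping step only. The helper name `split_images`, the batch size, and the file names are assumptions made for illustration and do not appear in the Seal code.

```python
def split_images(image_paths, images_per_task):
    """Group image paths into consecutive batches, one batch per task."""
    if images_per_task < 1:
        raise ValueError('images_per_task must be at least 1')
    return [image_paths[i:i + images_per_task]
            for i in range(0, len(image_paths), images_per_task)]


# Hypothetical file names, used only for illustration.
images = ['img_%03d.tif' % i for i in range(5)]
batches = split_images(images, images_per_task=2)

for idx, batch in enumerate(batches):
    print('task %d -> %s' % (idx, batch))
```

Each batch could then be passed to its own `Task` inside a stage, so that the tasks of one stage can run concurrently.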

In [2]:
def generate_pipeline(name, stages):  # Generate the prediction and blob-detection pipeline

    # Create a Pipeline object
    p = Pipeline()
    p.name = name

    for s_cnt in range(stages):


        if s_cnt == 0:
            # Create a Stage object
            s0 = Stage()
            s0.name = 'Stage %s'%s_cnt
            # Create Task 1, prediction
            t1 = Task()
            t1.name = 'Predictor'
            t1.pre_exec = ['module load psc_path/1.1',
                           'module load slurm/default',
                           'module load intel/17.4',
                           'module load python3',
                           'module load cuda',
                           'mkdir -p classified_images/crabeater',
                           'mkdir -p classified_images/weddel',
                           'mkdir -p classified_images/pack-ice',
                           'mkdir -p classified_images/other',
                           'source /pylon5/mc3bggp/paraskev/pytorchCuda/bin/activate'
                          ]
            t1.executable = 'python3'   # Assign executable to the task   
            # Assign arguments for the task executable
            t1.arguments = ['pt_predict.py','-class_names','crabeater','weddel','pack-ice','other']
            t1.link_input_data = ['/pylon5/mc3bggp/paraskev/seal_test/nn_model.pth.tar',
                                  '/pylon5/mc3bggp/paraskev/nn_images',
                                  '/pylon5/mc3bggp/paraskev/seal_test/test_images'
                                  ]
            t1.upload_input_data = ['pt_predict.py','sealnet_nas_scalable.py']
            t1.cpu_reqs = {'processes': 1,'threads_per_process': 1, 'thread_type': 'OpenMP'}
            t1.gpu_reqs = {'processes': 1,'threads_per_process': 1, 'thread_type': 'OpenMP'}
        
            s0.add_tasks(t1)    
            # Add Stage to the Pipeline
            p.add_stages(s0)
        else:
            # Create a Stage object
            s1 = Stage()
            s1.name = 'Stage %s'%s_cnt
            # Create Task 2, blob detection
            t2 = Task()
            t2.pre_exec = ['module load psc_path/1.1',
                           'module load slurm/default',
                           'module load intel/17.4',
                           'module load python3',
                           'module load cuda',
                           'module load opencv',
                           'source /pylon5/mc3bggp/paraskev/pytorchCuda/bin/activate',
                           'mkdir -p blob_detected'
                         ]
            t2.name = 'Blob_detector'         
            t2.executable = ['python3']   # Assign executable to the task   
            # Assign arguments for the task executable
            t2.arguments = ['blob_detector.py']
            t2.upload_input_data = ['blob_detector.py']
            t2.link_input_data = ['$Pipeline_%s_Stage_%s_Task_%s/classified_images'%(p.uid, s0.uid, t1.uid)]
            t2.download_output_data = ['blob_detected/']  # Download resulting images
            t2.cpu_reqs = {'processes': 1,'threads_per_process': 1, 'thread_type': 'OpenMP'}
            t2.gpu_reqs = {'processes': 1, 'threads_per_process': 1, 'thread_type': 'OpenMP'}
            s1.add_tasks(t2)
            # Add Stage to the Pipeline
            p.add_stages(s1)

    return p
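
The `link_input_data` entry of the second task points at the output directory of the first task through a placeholder string built from the pipeline, stage, and task uids. A minimal sketch of a helper that builds such a string is given here. The format is copied from the cell above; `task_data_ref` is a hypothetical name that is not part of EnTK, and the uid values are placeholders modeled on the task names in the execution log.

```python
def task_data_ref(pipeline_uid, stage_uid, task_uid, path):
    """Build a reference to `path` relative to another task's directory.

    The '$Pipeline_..._Stage_..._Task_...' format is the one used in
    generate_pipeline; this helper only assembles the string.
    """
    return '$Pipeline_%s_Stage_%s_Task_%s/%s' % (
        pipeline_uid, stage_uid, task_uid, path.lstrip('/'))


# Placeholder uids, used only for illustration.
ref = task_data_ref('radical.entk.pipeline.0000',
                    'radical.entk.stage.0000',
                    'radical.entk.task.0000',
                    'classified_images')
print(ref)
```

Keeping the format in one place means that a pipeline with more stages would not need to repeat the string template for every dependency.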

## Pipeline generation

The pipeline is now defined, so we create a pipeline object. Although it currently has only two stages, the number of stages may change depending on the specifics of the application. Making the number of stages an input parameter allows for that extension.


In [3]:
p = generate_pipeline(name='Pipeline 1', stages=2)

## Resource description and acquisition

We define a dictionary with the following values:
```
{'resource': The resource on which to execute the pipeline, e.g. 'xsede.bridges' for Bridges,
 'walltime': The amount of time the resources are needed,
 'cpus': Number of CPUs needed,
 'gpus': Number of GPUs needed,
 'schema': Way to access the resource without a password. We recommend gsissh,
 'project': Project to charge,
 'queue': The queue to submit to, for example GPU-small
}
```
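
Because a misspelled or missing key is only reported once the resource request is made, it can help to check the dictionary first. The sketch below is one possible check, not part of EnTK. The name `check_resource_description` is hypothetical, and treating all seven keys as required is an assumption based on the list above.

```python
# Keys taken from the description above; treating all as required is an assumption.
REQUIRED_KEYS = ('resource', 'walltime', 'cpus', 'gpus',
                 'schema', 'project', 'queue')


def check_resource_description(res_dict):
    """Return a list of problems found in a resource description."""
    problems = ['missing key: %s' % key
                for key in REQUIRED_KEYS if key not in res_dict]
    # Numeric fields should be non-negative integers.
    for key in ('walltime', 'cpus', 'gpus'):
        if key in res_dict:
            value = res_dict[key]
            if not isinstance(value, int) or value < 0:
                problems.append('%s must be a non-negative integer' % key)
    return problems


example = {'resource': 'xsede.bridges',
           'walltime': 30,
           'cpus': 12,
           'gpus': 2,
           'schema': 'gsissh',
           'project': 'mc3bggp',
           'queue': 'GPU-small'}

print(check_resource_description(example))
```

An empty list means that no problem was found by this check. It does not confirm that the resource, project, or queue names are valid on the target machine.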

After the dictionary is created, we acquire the resources by creating a `ResourceManager`.

---
Instructions on how to install gsissh on Ubuntu can be found [here](https://github.com/vivek-bala/docs/blob/master/misc/gsissh_setup_stampede_ubuntu_xenial.sh).

In [4]:
res_dict = {'resource': 'xsede.bridges',
             'walltime': 30,
             'cpus': 12,
             'gpus' : 2,
             'schema' : 'gsissh',
             'project': 'mc3bggp',
             'queue' : 'GPU-small'
    }
    
# Create Resource Manager
rman = ResourceManager(res_dict)


## Execution

To execute the pipeline, we create an application manager and assign to it the resource manager created previously. We also assign the generated pipeline.

Finally, we ask the application manager to run the application and wait for it to finish.

In [None]:
# Create Application Manager
appman = AppManager(port=32773)

# Assign resource manager to the Application Manager
appman.resource_manager = rman

# Assign the workflow as a set of Pipelines to the Application Manager
appman.assign_workflow(set([p]))

# Run the Application Manager
appman.run()

new session: [rp.session.js-157-203.jetstream-cloud.org.iparask.017696.0004]
database   : [mongodb://giannis:giannis@149.165.168.81:32768/entk]            ok
create pilot manager                                                          ok
submit 1 pilot(s)
        .                                                                     ok
Syncing task radical.entk.task.0000 with state SCHEDULING
Synced task radical.entk.task.0000 with state SCHEDULING
Syncing task radical.entk.task.0000 with state SCHEDULED
Synced task radical.entk.task.0000 with state SCHEDULED
create unit manager                                                           ok
add 1 pilot(s)                                                                ok
submit 1 unit(s)
        .

2018-06-14 08:32:55,922: radical.entk.wfprocessor.0000: wfprocessor                     : dequeue-thread : ERROR   : Unable to receive message from completed queue: 
2018-06-14 08:32:55,930: radical.entk.wfprocessor.0000: wfprocessor                     : dequeue-thread : ERROR   : Error in dequeue-thread: 


Traceback (most recent call last):
  File "/home/iparask/miniconda2/envs/EnTKGPU/lib/python2.7/site-packages/radical/entk/appman/wfprocessor.py", line 476, in _dequeue
    method_frame, header_frame, body = mq_channel.basic_get(queue=self._completed_queue[0])
  File "/home/iparask/miniconda2/envs/EnTKGPU/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 2032, in basic_get
    no_ack=no_ack)
  File "/home/iparask/miniconda2/envs/EnTKGPU/lib/python2.7/site-packages/pika/channel.py", line 360, in basic_get
    self._validate_channel_and_callback(callback)
  File "/home/iparask/miniconda2/envs/EnTKGPU/lib/python2.7/site-packages/pika/channel.py", line 1362, in _validate_channel_and_callback
    raise exceptions.ChannelClosed()
ChannelClosed



Exception in thread dequeue-thread:
Traceback (most recent call last):
  File "/home/iparask/miniconda2/envs/EnTKGPU/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/home/iparask/miniconda2/envs/EnTKGPU/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/iparask/miniconda2/envs/EnTKGPU/lib/python2.7/site-packages/radical/entk/appman/wfprocessor.py", line 596, in _dequeue
    raise Error(text=ex)
Error: Error: 



Syncing task radical.entk.task.0001 with state EXECUTED
Synced task radical.entk.task.0001 with state EXECUTED
Syncing task radical.entk.task.0001 with state DEQUEUEING
Synced task radical.entk.task.0001 with state DEQUEUEING
Syncing task radical.entk.task.0001 with state DEQUEUED
Synced task radical.entk.task.0001 with state DEQUEUED
Syncing task radical.entk.task.0001 with state DONE
Synced task radical.entk.task.0001 with state DONE
wait for 1 pilot(s)

2018-06-14 08:33:28,443: radical.entk.resource_manager.0000: MainProcess                     : pmgr.0000.subscriber._state_sub_cb: ERROR   : Pilot has completed
