###   Task Parallelism Dataflow
![Classify UFO](https://docs.google.com/drawings/d/1PdOkhLhW1ddAlKnEMPN3cXkgdUCsWgf_OitOnpRTb2Y/pub?w=1901&h=1041)

In [1]:
import time
import pandas as pd
import numpy as np
import dataflowkit.datasets as D
import dataflowkit.recipes as R
import dataflowkit.graphs as G
from dataflowkit.utils.print_time import print_time, print_end
from multiprocessing import Process, Queue, cpu_count

#### Implement the logic of recipes

In [2]:
class SubClassifier1Recipe(R.BaseRecipe):
    """Placeholder sub-classifier that always reports the 'Diamond' tag."""

    def execute(self, ins, outs):
        """Simulate an expensive classification and save the resulting tag.

        Args:
            ins: Input datasets; not read by this recipe.
            outs: Output datasets; ``outs['tag']`` receives ``('Diamond',)``.
        """
        result = ('Diamond',)
        # Stand-in for real classification logic: iterating over 1e7 NumPy
        # elements only burns CPU time.
        for _ in np.arange(1e7):
            pass
        outs['tag'].save(result)

In [3]:
class SubClassifier2Recipe(R.BaseRecipe):
    """Placeholder sub-classifier that always reports the 'Egg' tag."""

    def execute(self, ins, outs):
        """Simulate an expensive classification and save the resulting tag.

        Args:
            ins: Input datasets; not read by this recipe.
            outs: Output datasets; ``outs['tag']`` receives ``('Egg',)``.
        """
        result = ('Egg',)
        # Stand-in for real classification logic: iterating over 1e7 NumPy
        # elements only burns CPU time.
        for _ in np.arange(1e7):
            pass
        outs['tag'].save(result)

In [4]:
class ClassifierRecipe(R.BaseRecipe):
    """Run both sub-classifiers concurrently and combine their tags.

    The recipe relies on the module-level ``mrf`` factory, both to create the
    sub-recipes and to wait for their asynchronous executions.
    """

    def __init__(self):
        # NOTE(review): the base-class initializer is not invoked here --
        # confirm that R.BaseRecipe does not need it.
        self._sub_classifier_1 = mrf.create(SubClassifier1Recipe)
        self._sub_classifier_2 = mrf.create(SubClassifier2Recipe)

    def execute(self, ins, outs):
        """Execute the two sub-classifiers and save the concatenated tags.

        Args:
            ins: Input datasets; not read by this recipe.
            outs: Output datasets; ``outs['tagged_data']`` receives the
                result of ``tag1 + tag2``.
        """
        first_tag, second_tag = D.InMemory(), D.InMemory()

        # Task parallelism: launch both sub-classifiers, then block until
        # the factory reports that both have finished.
        pending = [
            self._sub_classifier_1.async_execute(ins={}, outs={'tag': first_tag}),
            self._sub_classifier_2.async_execute(ins={}, outs={'tag': second_tag}),
        ]
        mrf.wait(pending)

        # Aggregate the results of the two sub-classifiers.
        combined = first_tag.load() + second_tag.load()
        outs['tagged_data'].save(combined)

In [5]:
class DataCleansingRecipe(R.BaseRecipe):
    """Placeholder cleansing step that only consumes CPU time."""

    def execute(self, ins, outs):
        """Simulate a time-consuming cleansing pass.

        Args:
            ins: Input datasets; not read by this recipe.
            outs: Output datasets; nothing is written to them.
        """
        # Busy loop over 1e7 NumPy elements standing in for real work.
        for _ in np.arange(1e7):
            pass

#### Declare the job processor

In [6]:
def process_job(file_time_queue, file_queue, mrf, master_index):
    """Worker loop: cleanse and classify every file taken from ``file_queue``.

    The loop never returns; the hosting process has to be terminated from
    the outside.

    Args:
        file_time_queue: Queue that receives ``(file, 'end')`` once a file
            has been processed.
        file_queue: Queue from which pending files are taken.
        mrf: Recipe factory used to create the classifier recipe.
        master_index: Index of this worker; not used inside the function.
    """
    # The classifier recipe performs task parallelism on its own, so a single
    # slave is assigned to handle all classifications.
    cleanser = DataCleansingRecipe()
    classifier = mrf.create(ClassifierRecipe, no_slaves=1)

    while True:
        current_file = file_queue.get()

        # Fresh in-memory dataset that receives the tags for this file.
        result_sink = D.InMemory()

        # Run the graph: cleansing first, then classification.
        cleanser.execute(ins={}, outs={})
        classifier.execute(ins={}, outs={'tagged_data': result_sink})
        file_time_queue.put((current_file, 'end'))

#### Declare the files to be processed

In [7]:
# The request driver defined below acts as a client: it asks for a
# classification at varying time intervals. These queues connect it to the
# other processes:
#   file_time_queue -- (file, 'st' | 'end') events used to time each request
#   file_queue      -- files waiting to be picked up by a worker
file_time_queue = Queue()
file_queue = Queue()

def request_process():
    """Submit one file per entry of a fixed schedule of pauses.

    Before each submission the function sleeps for the scheduled number of
    seconds, then records a ``(file, 'st')`` start event on
    ``file_time_queue`` and puts the file on ``file_queue``.
    """
    # Pauses (in seconds) before each request.
    pauses = [4, 4, 4, 4, 2, 1, 1, 1, 1, 1, 1, 1, 1, 4, 4, 4, 4, 4, 4]
    # Files are identified by their position in the schedule.
    for file_id, pause in zip(np.arange(len(pauses)), pauses):
        time.sleep(pause)
        file_time_queue.put((file_id, 'st'))
        file_queue.put(file_id)

In [8]:
# Process that runs the request driver; it is only created here and is
# started later together with the other processes.
file_process = Process(target=request_process)

In [9]:
def time_mark_p():
    """Consume timing events forever and forward them to the timer helpers.

    Each event is a ``(file, marker)`` pair taken from ``file_time_queue``.
    A ``'st'`` marker is passed to ``print_time``; any other marker is passed
    to ``print_end``. Both receive the label ``'response time <file>'``.
    """
    while True:
        file, marker = file_time_queue.get()
        report = print_time if marker == 'st' else print_end
        report('response time ' + str(file))
# Process that runs time_mark_p, which consumes file_time_queue and reports
# the start/end marks of each request.
time_mark_process = Process(target=time_mark_p)

#### MasterRecipeFactory must be initialized before opening the processes

In [10]:
# Number of worker processes that will run process_job.
no_master = 2
# Factory handed to every worker process, which uses it to create recipes.
# NOTE(review): `get_inatance` looks like a misspelling of `get_instance`;
# it is left as-is because it must match the library's method name -- confirm.
mrf = G.MasterRecipeFactory.get_inatance(no_s2m_channels=10)

#### Declare the processes

In [11]:
# One worker process per master index; all of them share the same queues and
# the same recipe factory.
processes = [
    Process(target=process_job, args=(file_time_queue, file_queue, mrf, idx))
    for idx in np.arange(no_master)
]

#### Execute the processes

In [12]:
# Start order: the request driver, then the timing reporter, then each worker.
for proc in [file_process, time_mark_process] + processes:
    proc.start()

response time 0 1.348825
response time 1 1.216707
response time 2 1.241925
response time 3 1.177671
response time 4 1.1626
response time 5 1.241948
response time 6 1.386317
response time 7 1.377919
response time 8 1.343626
response time 9 1.33669
response time 10 1.319302
response time 11 1.404223
response time 12 1.226274
response time 13 1.171778
response time 14 1.232311
response time 15 1.116744
response time 16 1.112119
response time 17 1.211769
response time 18 1.070335


#### Terminate the processes

In [13]:
# Stop every worker first, then the request driver and the timing reporter.
# A plain loop is used because terminate() is called only for its side
# effect; a list comprehension would build a throwaway list of None values.
for p in processes:
    p.terminate()
file_process.terminate()
time_mark_process.terminate()