<a href="https://colab.research.google.com/github/AlmurshedO/Parsl_tutorial/blob/main/Parsl_tutorial_with_Dynamic_Task_Placement.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Set up Parsl

In [None]:
# this import is obly to clear output in this notebook
from IPython.display import clear_output


#installation of Parsl
!pip install parsl

# clean cell output
clear_output()

# Import Parsl components

In [None]:
#parsl imports
from parsl.providers import AdHocProvider, AWSProvider, LocalProvider 
from parsl.channels import LocalChannel
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_query
from parsl.config import Config
from parsl import load, clear, set_file_logger, logging
from parsl.app.app import python_app

# import time and sleep
from time import time, sleep 

# Configuring the infrastructure

Here we will have two virtual workers, i.e., `executors`, which we label `location_0` and `location_1`. In a real deployment these workers would usually run on remote machines; here both run locally, so the only difference from a remote setup is the link delay (the time data spends travelling over the network).

`HighThroughputExecutor` is an executor class that can run on a Raspberry Pi or a PC; it is lightweight and easy to configure.

In [None]:

# Infrastructure configuration: one HighThroughputExecutor per virtual location.
# An executor's `label` is the name later cells use to target it,
# e.g. `@python_app(executors=['location_1'])`.
LOCATION_LABELS = ['location_0', 'location_1']

config = Config(
    executors=[
        HighThroughputExecutor(
            # label is the name of the worker and usually is the location
            label=label,
            worker_debug=False,
            # LocalChannel runs the workers on this machine; to run on a
            # remote machine use SSHChannel instead.
            provider=LocalProvider(LocalChannel()),
        )
        # identical executors that differ only by label
        for label in LOCATION_LABELS
    ]
)

In [None]:
# Load the configuration

In [None]:
# Load the configuration into Parsl and keep a handle on the DataFlowKernel
# (DFK), the engine that manages the data flowing between workers; the
# workers in turn run the submitted functions as processes.
#
# Run this cell only once per kernel session. To reload the configuration,
# call `clear()` first and then run this cell again.
dfk = load(config)

In [None]:
# Defining a virtual service function in Parsl: @python_app turns a plain
# function into an app that is submitted to the named executor.
# This one only checks that the executor `location_1` is up.
@python_app(executors=['location_1'])
def test():
    """Return 0; a smoke test that `location_1` can run an app."""
    return 0


In [None]:
# Calling the app deploys `test` to `location_1` and hands back an AppFuture
# right away, e.g. <AppFuture at 0x7fee8b865810 state=pending>.
app_future = test()

app_future

<AppFuture at 0x7f5c81ad16d0 state=pending>

In [None]:
# .result() waits for the worker `location_1` and returns the app's value.
print(app_future.result())

0


In [None]:
# Unlike `test`, this app does not return 0 straight away: it sleeps first.
# `sleep` is imported inside the body because each app runs as a separate
# process, independent of this notebook, so it must import what it needs.
@python_app(executors=['location_1'])
def sleeping(t=5):
    """Sleep for `t` seconds (default 5) on `location_1`, then return 0."""
    from time import sleep

    sleep(t)
    return 0


In [None]:
# Calling .result() immediately after deploying blocks the controller (this
# notebook) until `location_1` sends the result back.
start = time()
app_future_sleep = sleeping(5)
result = app_future_sleep.result()
end = time()

total_time = round(end - start, 2)
print(f'it took this function {total_time}s to complete')

it took this function 5.03s to complete


In [None]:
# To avoid freezing the controller on a single app, poll the non-blocking
# `app_future_sleep.done()` and only call .result() once it reports True.
start = time()
# deploy the sleeping function to worker `location_1`
app_future_sleep = sleeping(5)

# seconds to wait between polls
POLL_INTERVAL_S = 0.5

# check every half second whether the app has finished
while not app_future_sleep.done():
    curr_time_from_start = round(time() - start, 2)
    print(f'current time from start is {curr_time_from_start}')
    # Without this pause the loop spins the CPU and floods the cell output
    # with thousands of lines while waiting.
    sleep(POLL_INTERVAL_S)

# the app is done, so this returns without blocking
result = app_future_sleep.result()
end = time()
total_time = round(end - start, 2)
print(f'it took this function {total_time}s to complete')

In [None]:
# Estimate Parsl's deployment overhead: run `sleeping` for a known time `t`
# and treat any extra wall-clock time as overhead.
t = 2  # sleeping time in seconds

start = time()
app_future_sleep = sleeping(t)
result = app_future_sleep.result()  # block until the app has finished
end = time()
total_time = end - start

# conversion factor from seconds to milliseconds
millisecond = 1000

# time beyond `t`, in milliseconds = overhead of deploying the function here
overhead_time_to_deploy_vFunction = round((total_time - t) * millisecond, 2)
print(f'overhead of parsl deploy a function in this notebook is {overhead_time_to_deploy_vFunction} millisecond')


overhead of parsl deploy a function in this notebook is 20.24 millisecond


In [None]:
# To check that the overhead is consistent, deploy an app that does nothing,
# this time on the virtual worker `location_0`.
@python_app(executors=['location_0'])
def do_not_thing():
    """No-op app: its run time approximates the bare deployment overhead."""

do_not_thing()


<AppFuture at 0x7f5c81572410 state=pending>

In [None]:
%%timeit

# %%timeit at the top of a cell runs the cell body many times and reports timing statistics over those runs (the output format depends on the IPython version)

do_not_thing().result()


100 loops, best of 5: 21.2 ms per loop


# Dynamically choose locations
Here we pass Python functions as values, a concept from functional programming. `deply_function_to_location(location, function, *args, **kwargs)` receives two things. The first is a string `location`: the name of a location, i.e., the `label` we gave one of the executors in the configuration section. The second is any Python function `function`. It then runs `function(*args, **kwargs)` at that location.

In [None]:
# Choose the execution location at call time rather than at definition time.
def deply_function_to_location(location, function, *args, **kwargs):
    """Run ``function(*args, **kwargs)`` on the executor labelled ``location``.

    Parameters
    ----------
    location : str
        Label of the target executor, as given in the Parsl configuration
        (e.g. ``'location_0'`` or ``'location_1'``).
    function : callable
        Ordinary Python function to execute on that location.
    *args, **kwargs
        Positional and keyword arguments forwarded unchanged to ``function``
        (see https://realpython.com/python-kwargs-and-args/).

    Returns
    -------
    AppFuture
        Future whose ``.result()`` is the value ``function`` produced on
        ``location``.
    """
    # python_app wraps `function` into an app pinned to this one executor;
    # calling that app submits the work and returns its future.
    app_at_location = python_app(function, executors=[location])
    return app_at_location(*args, **kwargs)


In [None]:

# An ordinary Python function, with no Parsl decorator.
def add(x, y):
    """Return ``x + y``."""
    return x + y

# inputs of the add function
x, y = 1, 2

# deply_function_to_location() returns a Parsl AppFuture; its result is the
# output of `location_1` executing `add` with inputs x and y.
future = deply_function_to_location('location_1', add, x, y)
future.result()

3

# Create SFC Placement  with Round-Robin (Without CSP)

The SFC example is:
   

         adding -->  doubling --> tripling --> doubling --> tripling

The AppFuture of each function is passed to the next function; this is known as futures pipelining or promise pipelining. For more details, visit: https://en.wikipedia.org/wiki/Futures_and_promises

             

In [None]:
# The functions that make up the service function chain (SFC).

def adding(X):
    """Return the sum of the items of iterable ``X`` (0 when ``X`` is empty)."""
    total = 0
    for item in X:
        # explicit `total = total + item` (not `sum`/`+=`) keeps plain left-to-right addition
        total = total + item
    return total

def doubling(x):
    """Return ``x * 2``."""
    return x * 2

def tripling(x):
    """Return ``x * 3``."""
    return x * 3

# SFC pipelining: adding -->  doubling --> tripling --> doubling --> tripling
SFC = [adding, doubling, tripling, doubling, tripling]
# the SFC input
curr_input = [2, -1]

# Executor labels taken from the Parsl configuration, in definition order.
# Deriving them from `config` (rather than hard-coding the count and the
# `location_{i}` naming scheme) keeps the placement in sync if locations are
# added, removed, or renamed.
locations = [executor.label for executor in config.executors]
# `number_location` is the number of virtual workers in the configuration
number_location = len(locations)

for step, curr_function in enumerate(SFC):
    # Round-robin: `%` (modulo) wraps the index back to 0 after the last
    # location. See https://en.wikipedia.org/wiki/Modular_arithmetic and
    # https://en.wikipedia.org/wiki/Modulo_operation
    curr_location = locations[step % number_location]
    # Futures pipelining: from the second step on, `curr_input` is the previous
    # step's AppFuture, handed straight to the next function.
    curr_input = deply_function_to_location(curr_location, curr_function, curr_input)
    # printing the current placement
    print(f'Function {curr_function} placed in location {curr_location}')

last_function_output = curr_input

print(f'SFC result is {last_function_output.result()}')
  
    




Function <function adding at 0x7f5c783a4d40> placed in location location_0
Function <function doubling at 0x7f5c783a49e0> placed in location location_1
Function <function tripling at 0x7f5c783a4320> placed in location location_0
Function <function doubling at 0x7f5c783a49e0> placed in location location_1
Function <function tripling at 0x7f5c783a4320> placed in location location_0
SFC result is 36
