# Data Handling Workflows

Note that Parsl is not effective if multiple CPU cores aren't available because Parsl's ability to execute tasks in parallel is depenedent on the availability multiple cores.

In [1]:
import multiprocessing
print('Cores available: {}'.format(multiprocessing.cpu_count()))

Cores available: 4


### Importing Libraries and Configuration

In [2]:
import numpy as np
import random
import time

import parsl
import os
from parsl.app.app import python_app, bash_app
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel

from parsl.config import Config
from parsl.executors import HighThroughputExecutor

config = Config(
    executors=[
        HighThroughputExecutor(
            label="htex_local",
            cores_per_worker=1,
            provider=LocalProvider(
                channel=LocalChannel(),
                init_blocks=1,
                max_blocks=1,
            ),
        )
    ],
)

parsl.load(config)

<parsl.dataflow.dflow.DataFlowKernel at 0x118a343d0>

Process HTEX-Interchange:
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/site-packages/parsl/executors/high_throughput/interchange.py", line 564, in starter
    ic.start()
  File "/usr/local/lib/python3.7/site-packages/parsl/executors/high_throughput/interchange.py", line 367, in start
    self.socks = dict(poller.poll(timeout=poll_period))
  File "/usr/local/lib/python3.7/site-packages/zmq/sugar/poll.py", line 99, in poll
    return zmq_poll(self.sockets, timeout=timeout)
  File "zmq/backend/cython/_poll.pyx", line 123, in zmq.backend.cython._poll.zmq_poll
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cyt

### Map Reduce

A map reduce is a technique to execute multiple parallel jobs on a dataset to reduce the size of the dataset before executing a final function to get the result. A Map reduce is a more complicated version of synchronisation.

Let's consider a simple example where we are given multiple lists and we want to select the lists with the highest standard deviation.

![](./images/map_reduce.png)

In [3]:
# A python app to compute the standard deviation of the inputs
@python_app
def standard_deviation(inputs=[]):
    import numpy as np
    return np.std(inputs)

In [4]:
# A function to construct data that is a list of lists, each each list having 100 random numbers

def make_data():
    lists = []
    
    for _ in range(100):
        new_list = []
        
        for __ in range(100):
            new_list.append(random.random()*100)
            
        lists.append(new_list)
    return lists

our_data = make_data()

### Parallel Execution

In [None]:
# Computing the standard deviations for each list

start1 = time.time()

standard_deviations = []

for i in our_data:
    standard_deviations.append(standard_deviation(inputs=i))

# Finding the maximum standard deviation
standard_deviations = [i.result() for i in standard_deviations]
print('Maximum Standard Deviation: ', max(standard_deviations))

# Finding the list with the maximum standard deviation
maximum = max(standard_deviations)
print('Target List Number:', standard_deviations.index(maximum))

end1 = time.time()

# Hashing

We'll be using a simple hash function to store elements in our database. We'll evaluate the hash values in parallel and then store the items in those locations.

In [None]:
database = [0 for i in range(1000)]  # An empty database

In [4]:
@python_app 
def hash_function(element):
    import hashlib # importing all the libraries inside the remote function
    number = int(hashlib.md5(element).hexdigest()[:8], 16)%1000   # Creating a hash index
    return number

### Parallel Execution

In [None]:
import random

elements = []

for i in range(100):
    element = '' 
    for _ in range(5):
        element += random.choice('abcdefghijklmopqrstuvwxyz')
    element = element.encode() # Making a 5 letter element
    elements.append(element) # collecting 100 such elements

start1 = time.time()   
hashes = []
for i in elements: # Updating the database for all the elements
    hashes.append(hash_function(i))

hashes = [i.result() for i in hashes]

for i in range(len(elements)):
    database[hashes[i]] = elements[i]

end1 = time.time()

Note that this still doesn't solve the problem of overlap of elements. Chaining is the alternative here but in order to implement chaining, we have to evaluate the results which breaks the parallel thread.

## Montage Mosaic

This Python script has been inspired from the [Montage Wrapper Documentation](https://montage-wrapper.readthedocs.io/en/v0.9.5) and the [tutorial](http://montage.ipac.caltech.edu/docs/first_mosaic_tutorial.html) for the Montage Mosaic.

In [13]:
import montage_wrapper as montage
montage.set_mpi_command('mpiexec -n {n_proc} {executable}')

from montage_wrapper.mpi import MPI_COMMAND
MPI_COMMAND

ValueError: MPI command does not include {n_proc}

### First Part

In [7]:
import os
cwd = os.getcwd()
from parsl.data_provider.files import File

In [8]:
!tar xvf Kimages.tar

x Kimages/
x Kimages/aK_asky_000928s0130044.fits
x Kimages/aK_asky_000928s0130056.fits
x Kimages/aK_asky_000928s0130068.fits
x Kimages/aK_asky_000928s0130080.fits
x Kimages/aK_asky_000928s0130092.fits
x Kimages/aK_asky_000928s0130103.fits
x Kimages/aK_asky_000928s0130115.fits
x Kimages/aK_asky_990502s1300162.fits
x Kimages/aK_asky_990502s1300174.fits
x Kimages/aK_asky_990502s1300186.fits
x Kimages/aK_asky_990502s1300198.fits
x Kimages/aK_asky_990502s1300209.fits
x Kimages/aK_asky_990502s1300221.fits
x Kimages/aK_asky_990502s1310044.fits
x Kimages/aK_asky_990502s1310056.fits
x Kimages/aK_asky_990502s1310068.fits
x Kimages/aK_asky_990502s1310080.fits
x Kimages/aK_asky_990502s1310092.fits
x Kimages/aK_asky_990502s1310103.fits
x Kimages/aK_asky_990502s1310115.fits
x Kimages/aK_asky_990502s1320162.fits
x Kimages/aK_asky_990502s1320174.fits
x Kimages/aK_asky_990502s1320186.fits
x Kimages/aK_asky_990502s1320198.fits
x Kimages/aK_asky_990502s1320209.fits
x Kimages/aK_asky_990502s1320221.fits
x

In [9]:
montage.mImgtbl(os.path.join(cwd,'Kimages/'),
                File(os.path.join(cwd,'Kimages.tbl')))

stat : OK
count : 91
badfits : 0
badwcs : 0

In [10]:
montage.mMakeHdr(File(os.path.join(cwd,'Kimages.tbl')),
                 File(os.path.join(cwd,'Ktemplate.hdr')))

stat : OK
msg : Cube columns exist but are either blank or inconsistent. Outputting 2D only.
count : 91
ncube : 0
naxis1 : 6110
naxis2 : 6857
clon : 275.185261
clat : -16.248875
lonsize : 1.697222
latsize : 1.904722
posang : 359.953803
lon1 : 276.072612
lat1 : -17.199892
lon2 : 274.296309
lat2 : -17.198517
lon3 : 274.306471
lat3 : -15.294203
lon4 : 276.065648
lat4 : -15.295564

In [11]:
os.mkdir(os.path.join(cwd,'Kprojdir/'))

In [20]:
montage.mProjExec(File(os.path.join(cwd,'Kimages.tbl')),
                  File(os.path.join(cwd,'Ktemplate.hdr')),
                  os.path.join(cwd,'Kprojdir/'),
                  File(os.path.join(cwd,'stats.tbl')), mpi=True, n_proc=2)

Exception: b'--------------------------------------------------------------------------\nmpiexec was unable to find the specified executable file, and therefore\ndid not launch the job.  This error was first reported for process\nrank 0; it may have occurred for other processes as well.\n\nNOTE: A common cause for this error is misspelling a mpiexec command\n      line parameter option (remember that mpiexec interprets the first\n      unrecognized command line token as the executable).\n\nNode:       Sohits-MacBook-Pro\nExecutable: mProjExecMPI\n--------------------------------------------------------------------------\n'

In [None]:
montage.mImgtbl(os.path.join(cwd,'Kprojdir/'),
                File(os.path.join(cwd,'images.tbl')))

In [None]:
montage.mAdd( File(os.path.join(cwd,'images.tbl')), 
              File(os.path.join(cwd,'Ktemplate.hdr')), 
              File(os.path.join(cwd,'m17_uncorrected.fits')), mpi=True, n_proc=2)

In [None]:
!mViewer -ct 1 -gray m17_uncorrected.fits -1s max gaussian-log -out m17_uncorrected.png

![](./images/m17_uncorrected.png)

### Second Part

In [None]:
montage.mOverlaps(File(os.path.join(cwd,'images.tbl')),
                  File(os.path.join(cwd,'diffs.tbl')))

In [None]:
os.mkdir(os.path.join(cwd,'diffdir/'))

In [None]:
montage.mDiffExec(File(os.path.join(cwd,'diffs.tbl')), 
                  File(os.path.join(cwd,'Ktemplate.hdr')), 
                  os.path.join(cwd,'diffdir/'),
                  proj_dir=os.path.join(cwd,'Kprojdir/'))

In [None]:
montage.mFitExec(File(os.path.join(cwd,'diffs.tbl')), 
                 File(os.path.join(cwd,'fits.tbl')), 
                 os.path.join(cwd,'diffdir/'))

In [None]:
montage.mBgModel(File(os.path.join(cwd,'images.tbl')), 
                 File(os.path.join(cwd,'fits.tbl')), 
                 File(os.path.join(cwd,'corrections.tbl')))

In [None]:
os.mkdir(os.path.join(cwd,'corrdir'))

In [None]:
montage.mBgExec(File(os.path.join(cwd,'images.tbl')), 
                File(os.path.join(cwd,'corrections.tbl')), 
                os.path.join(cwd,'corrdir'), 
                proj_dir=os.path.join(cwd,'Kprojdir'))

In [None]:
montage.mAdd(File(os.path.join(cwd,'images.tbl')), 
             File(os.path.join(cwd,'Ktemplate.hdr')), 
             File(os.path.join(cwd,'m17.fits')))

In [None]:
!mViewer -ct 1 -gray m17.fits -1s max gaussian-log -out m17.png

![](./images/m17.png)