# **More Complex Workflows**

<table style="width:90%">
    <tr><td style="text-align:left;"><a href="./06 - Debugging.ipynb">Previous (Debugging)</a></td><td style="text-align:right;"><a href="./08 - Hints.ipynb">Next (Hints)</a></td></tr>
</table>


In this exercise we are going to construct a workflow to process an astronomical image.

<img src="images/noao-m34-stamp.png" style="margin-left:0px;"/>
(Credit: REU program/NOIRLab/NSF/AURA)

The workflow is as follows

<table style="margin-left:0px;">
    <tr>
        <td>
            <table>
                <thead>
                    <tr>
                        <th style="text-align:left">&nbsp;</th>
                        <th style="text-align:left">&nbsp;</th>
                        <th style="text-align:left">Count</th>
                        <th style="text-align:left">Time</th>
                    </tr>
                </thead>
                <tbody>
                    <tr>
                        <td style="text-align:left">1.</td>
                        <td style="text-align:left">Split the color image into red, green, and blue layers</td>
                        <td style="text-align:left">1</td>
                        <td style="text-align:left">&lt;1 sec</td>
                    </tr><tr>
                        <td style="text-align:left">2.</td>
                        <td style="text-align:left">Search for objects</td>
                        <td style="text-align:left">3</td>
                        <td style="text-align:left">few sec</td>
                    </tr><tr>
                        <td style="text-align:left">3.</td>
                        <td style="text-align:left">Cut out the objects (create sub images)</td>
                        <td style="text-align:left">1000's</td>
                        <td style="text-align:left">&lt;1 sec</td>
                    </tr><tr>
                        <td style="text-align:left">4.</td>
                        <td style="text-align:left">Fit the objects with a model</td>
                        <td style="text-align:left">1000's</td>
                        <td style="text-align:left">&lt;1 sec</td>
                    </tr><tr>
                        <td style="text-align:left">5.</td>
                        <td style="text-align:left">Generate an image with the fits removed</td>
                        <td style="text-align:left">3</td>
                        <td style="text-align:left">few sec</td>
                    </tr><tr>
                        <td style="text-align:left">6.</td>
                        <td style="text-align:left">Create a catalog of objects</td>
                        <td style="text-align:left">1</td>
                        <td style="text-align:left">~1 sec</td>
                    </tr>
                </tbody>
            </table>
        </td><td style="text-align: left;">
            <img src="images/parsl_flow_thumb2.png" style="margin-left: 20px;"/>
        </td>
    </tr>
</table>
    

All of the processing code is already written and a serialized version of the workflow is at the bottom. The exercise is to assemble the code fragments into a Parsl workflow and run it on your machine. One thing to watch out for is you don't want to over parallelize things. i.e. When does the overhead of creating a parallel job surpass any gain you get from parallelization?

In [None]:
import skimage.io
import numpy as np
import skimage.feature as feature
import scipy.optimize as opt
import matplotlib.pyplot as mpl
import math
from astropy.io import fits
from matplotlib.patches import Ellipse
from mpl_toolkits.axes_grid1 import make_axes_locatable
import pandas as pd


Below is the configuration you will need. It is set up for only a local run, but if you have access to an HPC system then you could add an Executor for that system as well.

In [None]:
import parsl
from parsl.executors.threads import ThreadPoolExecutor
from parsl.config import Config
from parsl.app.app import python_app
from parsl.data_provider.files import File

local_executor = ThreadPoolExecutor(max_threads=3, label='local_tpex')

parsl.clear()
myconfig = Config(executors=[local_executor])

parsl.load(myconfig)

In [None]:
def intersect(c1: list, c2: list, radius: float) -> bool:
    """ Function that determines if two circles intersect
    
        c1: two element list containing the coordinates of the center of a circle
        c2: two element list containing the coordinates of the center of a circle
        radius: float, the radius of the circles
        
        Returns: bool, True if the circles intersect, False otherwise
        
    """
    d = math.sqrt((c1[0] - c2[0]) * (c1[0] - c2[0]) +
                  (c1[1] - c2[1]) * (c1[1] - c2[1]));

    if d < (2*radius * .9):
        return True
    return False



In [None]:
def split_image(imname: str) -> tuple:
    """ Split an RGB image into its components
    
        imname: str, the name of the image to read
        
        Returns: tuple of np.ndarray of shape [:,:]
    """
    # read the input image in (the RGB values go from 0 to 255)
    myimage = skimage.io.imread(imname).astype(np.uint8)

    r = myimage[:,:,0]
    g = myimage[:,:,1]
    b = myimage[:,:,2]
    return r, g, b


In [None]:
def twoD_Gaussian(point: tuple, amplitude: float, xo: float, yo: float, sigma_x: float, sigma_y: float, theta: float):
    """ Generate a 2D Gaussian 
    
        point: tuple, the coordinates to get the value of the Gaussian at
        amplitude: float, the amplitude of the Gaussian
        xo, yo: float, the center position of the Gaussian
        sigma_x, sigma_y: float, the width of the Gaussian in each direction
        theta: float, the rotation angle of the Gaussian in radians
    """
    (x, y) = point
    xo = float(xo)                                                              
    yo = float(yo)
    a = (np.cos(theta) ** 2) / (2 * sigma_x ** 2) + (np.sin(theta) ** 2) / (2 * sigma_y ** 2)   
    b = -(np.sin(2 * theta)) / (4 * sigma_x ** 2) + (np.sin(2 * theta)) / (4 * sigma_y ** 2)    
    c = (np.sin(theta) ** 2) / (2 * sigma_x ** 2) + (np.cos(theta) ** 2) / (2 * sigma_y ** 2)   
    gaussian = amplitude * np.exp(-(a * ((x - xo) ** 2) + 2 * b * (x - xo) * (y - yo) + c * ((y - yo) ** 2)))                                   
    return gaussian.ravel()

In [None]:
def find_peaks(in_img: np.ndarray, min_dist: int = 10) -> list:
    """ Find object locations by searching for bright parts of the image
    
        in_img: np.ndarray containing the image to be searched with shape [:,:]
        
        Returns: list of center coordinates
    """
    bias = 0.
    limcut = 0.6  #minimum intesity

    img = in_img.astype(np.float32)/255.
    img[img < bias] = bias
    limg = np.arcsinh(img)
    limg = limg / limg.max()
    limg[limg < limcut] = 0.

    peaks = feature.peak_local_max(limg, min_distance=min_dist, threshold_abs=limcut)
        
    skip = []
    pksize = len(peaks)
    peaklist = []
    for i in range(pksize):
        if i in skip:
            continue
        item = []
        for j in range(i + 1, pksize):
            if j in skip:
                continue
            if intersect(peaks[i], peaks[j], min_dist):
                skip.append(j)
                skip.append(i)
                item.append(peaks[j])
        if item:
            item.append(peaks[i])
            peaklist.append(item)
        else :
            peaklist.append([peaks[i]])

    return peaklist

In [None]:
def make_fits(images: list, centers: list, offsets: list) -> list:
    """ Fit a 2D Gaussian to each image, this assumes a single object is in the field of view.
    
        img: np.ndarray with shape [:,:]
        cntr: list of center coordinates of the images
        
        Retruns: list of tuples containing the best fit parameters
    """
    fit_params = pd.DataFrame({'Amplitude': pd.Series(dtype=float),
                               'Xo': pd.Series(dtype=float),
                               'Yo': pd.Series(dtype=float),
                               'sigmaX': pd.Series(dtype=float),
                               'sigmaY': pd.Series(dtype=float),
                               'theta': pd.Series(dtype=float)})
    
    for i in range(len(images)):
        try :
            tempimg = images[i].ravel()
            nx, ny = images[i].shape
            # generate the coordinate axes
            x, y = np.mgrid[:nx, :ny]
  
            initial_guess = (0.5, offsets[i][0], offsets[i][1], 2., 2., 0)
            # do the fit
            popt, pcov = opt.curve_fit(twoD_Gaussian, (x,y), tempimg, p0=initial_guess, 
                bounds=([0, 0, 0, 0, 0, 0], [1., nx, ny, nx, ny, 6.28]))
            amp, x, y, dx, dy, theta = popt
            x += centers[i][0] - offsets[i][0]
            y += centers[i][1] - offsets[i][1]
            
            fit_params = pd.concat([fit_params, pd.DataFrame({'Amplitude':[amp], 'Xo':[x], 'Yo':[y], 'sigmaX':[dx], 'sigmaY':[dy], 'theta':[theta]})], ignore_index=True)

        except Exception as ex:
            pass

    return fit_params



In [None]:
def make_cutouts(in_img: np.ndarray, peaks: list, radius: int = 10) -> tuple:
    """ Create a small subimage around the given coordinates. If there is more than 
        one set of coordinates, then the subimage will be made to contain all of the,
        useful for non-point sources.
        
        in_img: np.ndarray, the original image
        peaks: list of lists containing grouped peaks
        
        Returns: tuple containing the subimages as np.ndarray's, the subimage center
                 coordinates in the frame of the original, and the offset to the lower
                 left corner
    """
    img = in_img.astype(np.float32)/255.
    images = []
    centers = []
    offsets = []
    for peak in peaks:
        # initialize the boundaries
        xmin = img.shape[0]
        ymin = img.shape[1]
        xmax = 0
        ymax = 0
        xmedian = 0
        ymedian = 0
    
        # loop through each set of coordinates and adjust the bounds to contain them all
        for c in peak:
            xmedian += c[0]
            ymedian += c[1]
            xmin = min(c[0], xmin)
            ymin = min(c[1], ymin)
            xmax = max(c[0], xmax)
            ymax = max(c[1], ymax)
        # add a small buffer space
        xmin = int(max(xmin - radius, 0))
        ymin = int(max(ymin - radius, 0))
        xmax = int(min(xmax + radius + 1, img.shape[0]))
        ymax = int(min(ymax + radius + 1, img.shape[1]))
    
        # cut out the sub image
        images.append(img[xmin:xmax, ymin:ymax])
        center = (xmedian / len(peak), ymedian / len(peak))
        centers.append(center)
        offsets.append((center[0] - xmin, center[1] - ymin))
    return images, centers, offsets

In [None]:
def make_image(in_img: np.ndarray, fit: list, label: str) -> None:
    """ Generate an image with sources subtracted out from the original"""
    img = in_img.astype(np.float32)/255.
    model = img.copy()
    model.fill(0.)
    x, y = np.mgrid[:img.shape[0], :img.shape[1]]
    fit = fit.reset_index()
    for i, p in fit.iterrows():
        model += twoD_Gaussian((x,y), p['Amplitude'], p['Xo'], p['Yo'], p['sigmaX'], p['sigmaY'], p['theta']).reshape(img.shape[0],img.shape[1])
    
    f, axs = mpl.subplots(1,2, sharey=True)
    f.set_figwidth(14)
    f.set_figheight(6)
    axs[0].imshow(img, vmin=0., vmax=1.)
    p = axs[1].imshow(img-model, vmin = 0., vmax = 1.)
    d = make_axes_locatable(axs[1])
    cx = d.append_axes("right", size="10%", pad="2%")
    mpl.colorbar(p, cax=cx)
    f.suptitle("Band: " + label)
    mpl.savefig(label + "_compare.png", format='png')

In [None]:
def make_catalog(tables, labels):
    hdus = [fits.PrimaryHDU()]
    counts = {}
    for i, v in enumerate(tables):
        counts[labels[i]] = v['Amplitude'].values.shape[0]
        col1 = fits.Column(name='Amp', format='E', array=v['Amplitude'].values)
        col2 = fits.Column(name='Xo', format='E', array=v['Xo'].values)
        col3 = fits.Column(name='Yo', format='E', array=v['Yo'].values)
        col4 = fits.Column(name='sigmaX', format='E', array=v['sigmaX'].values)
        col5 = fits.Column(name='sigmaY', format='E', array=v['sigmaY'].values)
        col6 = fits.Column(name='theta', format='E', array=v['theta'].values)
    
        hdu = fits.BinTableHDU.from_columns([col1, col2, col3, col4, col5, col6])
        hdu.header['band'] = labels[i]
        hdus.append(hdu)
    hdul = fits.HDUList(hdus)
    hdul.writeto('parameters.fits', overwrite=True)
    print("Found these object counts")
    for i, v in counts.items():
        print(f"{i}: {v}")

In [None]:

lbls = ["red", "green", "blue"]
params = []
for i, img in enumerate(split_image("images/noao-m34.png")):
    points = find_peaks(img)
    images, centers, offsets = make_cutouts(img, points)
    fit = make_fits(images, centers, offsets)
    params.append(fit)
    make_image(img, fit, lbls[i])
    myi = skimage.io.imread(lbls[i] + "_compare.png").astype(np.uint8)
    mpl.imshow(myi)
    mpl.show()

make_catalog(params, lbls)


Here are some suggestions
 - You will need to convert some functions to Apps
 - You may need to convert some of the above functions into inner/nested functions of other function(s)
 - The workflow produces output files that will need to be accounted for in the Parsl app
 - There is more than one solution

Click <a href="./08 - Hints.ipynb">here</a> for some hints.
<br>Click <a href="./09 - Solution1.ipynb">here</a> for one solution.
<br>Click <a href="./10 - Solution2.ipynb">here</a> for another solution.

### Put your solution in the code cell below

In [None]:
for img in ['red', 'green', 'blue']:
    myi = skimage.io.imread(img + "_compare.png").astype(np.uint8)
    mpl.imshow(myi)
    mpl.show()

<P><br>
<table style="width:90%">
    <tr><td style="text-align:left;"><a href="./06 - Debugging.ipynb">Previous (Debugging)</a></td><td style="text-align:right;"><a href="./08 - Hints.ipynb">Next (Hints)</a></td></tr>
</table>