# This is a brief overview of the performance gains from porting PIV code to the cluster 

The exact programs run are below, but in brief, I have written a python MPI based parallel program to farm out PIV computations for frame pairs from PIV videos. The parallel nature of the PIV computation (i.e. each frame pair calculation is independent) allows for massive scaling up of the calculation and substantially faster run times. 

A set of videos can be processed by turning them into invidual image stacks in a folder, then spawning the parallel PIV code to run on each folder. In the test case here I ran this on 11 videos, each 1500 frames long, and on my machine with 6 cores running the best time for a single video was 108 minutes. On the cluster I processed all 11 videos in 18 minutes. 

The commands to process the videos is

    ./batch_mpi_piv_loop.sh ~/Data/TandemFanning/5_GoodRuns_140VAmp_105mmLens_Wake15khz/
    
which runs the batch script to schedule PIV calculations on all the subfolders of the path provided as an argument. The batch file in turn is just enabling the SLURM environment parameters and then calling 

    sbatch batch_mpi_piv_loop.sh {$1}
    
with {$1} being the folder input to that calc. 

## Analysis of run times for the MPI version of the PIV code 

In [6]:
import numpy as np
time2run = np.array((1310, 730.7, 404.93, 263.65))
numberCores = np.array((64, 128, 256, 512))     # have only run for these number of cores so far

In [7]:
%matplotlib nbagg 
%qtconsole

In [12]:
import matplotlib.pyplot as plt

plt.close(1)
plt.figure(1)

plt.plot(numberCores, time2run/60, 'ko-', markersize = 20, markerfacecolor = 'w', linewidth = 2);
plt.axis([0, 1000, 0, 30])

plt.xlabel('Number of cores');
plt.ylabel('Time to run (min)');
plt.title('Results for a single video called with sbatch');
plt.show()

<IPython.core.display.Javascript object>

## Code used in the PIV program

The basic shell program to schedule PIV computation

program name:

    ./batch_mpi_piv_loop.sh FOLDERLOCATION

``` bash
#!/bin/sh

# loop through all folders in a directory and start a SLURM PIV computration for them 
for FOLDER in $1*/; do
echo ${FOLDER}
 sbatch batch_mpi_piv.sbatch ${FOLDER}
 sleep 1
done
```

The PIV program call
program name: 

    /n/home03/ngravish/Source/PIV/batch_mpi_piv.sbatch

```bash
#!/bin/sh
# runs PIV code on the folder of images passed into the code 

#SBATCH -n 128 # Number of cores
#SBATCH -t 20 # Runtime in minutes
#SBATCH -p general # Partition to submit to
#SBATCH --mem-per-cpu=2000 # Memory per cpu in MB (see also --mem)

echo `basename $1`

mpirun -np 128 --mca orte_base_help_aggregate 0 python PIV_MPI_cluster.py \
                $1 \
                > /n/home03/ngravish/logs/`basename $1`_out.txt 2> /n/home03/ngravish/logs/`basename $1`_err.txt
```

And finally the actual python program which does the computation

    PIV_MPI_Cluster.py


```python
# %% import stuff
import numpy as np
import openpiv.tools
import openpiv.process
import openpiv.scaling
import cv2
import time
import multiprocessing 
import os
import sys
import ctypes
import sys
import scipy.io as sio
from PIL import Image
import pypar as pp
import warnings

num_processors = pp.size()
rank = pp.rank()
node = pp.get_processor_name()

# MPI Constants
MASTER_PROCESS = 0
WORK_TAG = 1
DIE_TAG = 2

DeltaT = 1 # frames to skip
window_size = 24
overlap = 12



def PIVCompute(frame_a, frame_b, window_size = 24, overlap = 12):

    tmpu, tmpv, sig2noise = openpiv.pyprocess.piv(frame_a, frame_b,
                    window_size=window_size, overlap=overlap, dt=1, 
                    sig2noise_method='peak2peak', corr_method = 'direct')

    x, y = openpiv.process.get_coordinates( image_size=frame_a.shape, window_size=window_size, overlap=overlap)
    tmpu, tmpv, mask = openpiv.validation.sig2noise_val( tmpu, tmpv, sig2noise, threshold = 1.3)
    u, v = openpiv.filters.replace_outliers( tmpu, tmpv, method='localmean', max_iter=10, kernel_size=4)

    return u, v


############################################################################################
# main part of code

if __name__ == '__main__':
    warnings.filterwarnings("ignore")

    # load in the folder path, then grab just the top directory from the path by 
    # splitting the string, and stripping the trailing slash
    vidpath = sys.argv[1]
    foldername = vidpath.rstrip('/').split('/')[-1]

    print "Vidpath = " + vidpath
    print "Foldername = " + foldername


    tif_files = sorted([f for f in os.listdir(vidpath) if f.endswith('.tif')]) # Sorted is important on the cluster!

# !!!! DEBUG
    # tif_files = tif_files[0:10]


# Create list of file pairs to process
    process_list = zip(range(0,len(tif_files)-DeltaT), range(DeltaT,len(tif_files)))
    work_size = len(process_list)

    # Dispatch jobs to worker processes
    work_index = 0
    num_completed = 0

    # Master process
    if rank == MASTER_PROCESS:
        start_time = time.time()

        #
        # print(process_list)

        frame_a = np.array(Image.open(os.path.join(vidpath, tif_files[0])));
        frame_b = np.array(Image.open(os.path.join(vidpath, tif_files[1])));

        # run PIV computation to get size of matrix and x,y's
        x, y = openpiv.process.get_coordinates(image_size=frame_a.shape,
                                                 window_size=window_size, overlap=overlap)

        # pre-allocate the u, v matrices
        u = np.zeros((work_size, x.shape[0], x.shape[1]))
        v = np.zeros((work_size, x.shape[0], x.shape[1]))

        # make list of sources and the tasks sent to them
        source_list = np.zeros(num_processors)

        # Start all worker processes
        for i in range(0, min(num_processors-1, work_size)):

            proc = i+1
            source_list[proc] = work_index

            pp.send(work_index, proc, tag=WORK_TAG)
            pp.send(process_list[i], proc)
            print "Sent process list " + str(process_list[i]) + " to processor " + str(proc)

            work_index += 1

        # Receive results from each worker, and send it new data
        for i in range(num_processors-1, work_size):
            results, status = pp.receive(source=pp.any_source, tag=pp.any_tag, return_status=True)
            proc = status.source

            index = source_list[proc]
            print "index is " + str(index) + " from process " + str(proc) + " u origin value is " + str(results[0,0,0])

            # receive and parse the resulting var
            u[index,:,:] = results[0,:,:]
            v[index,:,:] = results[1,:,:]

            # re-up workers
            pp.send(work_index, proc, tag=WORK_TAG)
            pp.send(process_list[work_index], proc)
            source_list[proc] = work_index

            print "Sent work index " + str(work_index) + " to processor " + str(proc)

            # increment work_index
            work_index += 1
            num_completed += 1


        # Get results from remaining worker processes
        while num_completed < work_size:
            results, status = pp.receive(source=pp.any_source, tag=pp.any_tag, return_status=True)
            proc = status.source

            index = source_list[proc]

            print "index is " + str(index) + " from process " + str(proc) + " u origin value is " + str(results[0,0,0])

            # receive and parse the resulting var
            u[index,:,:] = results[0,:,:]
            v[index,:,:] = results[1,:,:]

            num_completed += 1

        # Shut down worker processes
        for proc in range(1, num_processors):
            print "Stopping worker process " + str(proc)
            pp.send(-1, proc, tag=DIE_TAG)

        # Package up the results to save, also save all the PIV parameters
        sio.savemat(os.path.join(vidpath, '../' + foldername + '_CLUSTER.mat'),{'x':x, 'y':y, 'u':u,                                                        'v': v, 
                                                                    'window_size':window_size,
                                                                    'overlap':overlap})
        end_time = time.time()
        print repr(end_time - start_time)


    else:
        ### Worker Processes ###
        continue_working = True
        while continue_working:

            # check if being put to sleep
            work_index, status =  pp.receive(source=MASTER_PROCESS, tag=pp.any_tag, 
                    return_status=True)

            if status.tag == DIE_TAG:
                continue_working = False

            # not being put to sleep, load in videos of interest and compute
            else:
                frame_pair, status = pp.receive(source=MASTER_PROCESS, tag=pp.any_tag, 
                    return_status=True)
                work_index = status.tag


                frame_a = np.array(Image.open(os.path.join(vidpath, tif_files[frame_pair[0]])));
                frame_b = np.array(Image.open(os.path.join(vidpath, tif_files[frame_pair[1]])));

                # Code below simulates a task running
                u, v = PIVCompute(frame_a, frame_b, window_size = window_size, overlap = overlap)
                print  "Received work frame pair " + str(frame_pair) + " u origin value is " + str(u[0,0])                 

                # package up into work array
                work_array = np.zeros((2,u.shape[0], u.shape[1]))
                work_array[0,:,:] = u
                work_array[1,:,:] = v

                result_array = work_array.copy()

                pp.send(result_array, destination=MASTER_PROCESS, tag=work_index)
        #### while
    #### if worker

    pp.finalize()

```