## Using Dask for multi-processing

This notebook shows how you might use Dask to run 3D-DAOSTORM in parallel on a single movie. The idea is similar to the approach used for SLURM: we break up the analysis by creating XML files for subsets of the frames in the movie, run the analysis on each subset in parallel, and finally combine the results into a single HDF5 file.

For this simple example movie, at least on my laptop, the parallel analysis is not actually any faster than doing the analysis serially, but it does demonstrate the idea.
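The split / analyze / merge idea can be sketched in plain Python. This is a toy illustration only: `split_frames()`, `analyze_frames()` and `merge()` are hypothetical helpers, a list of numbers stands in for the movie, and the jobs run one after another rather than through Dask. It assumes, as the approach above does, that each frame can be analyzed independently, which is why merging the per-job results gives the same answer as a single pass over the whole movie.

```python
def split_frames(n_frames, n_divisions):
    """Split frames [0, n_frames) into at most n_divisions contiguous (start, stop) ranges."""
    step = -(-n_frames // n_divisions)  # ceiling division
    return [(s, min(s + step, n_frames)) for s in range(0, n_frames, step)]


def analyze_frames(movie, start, stop):
    """Hypothetical per-job analysis: one (frame, result) pair per frame in [start, stop)."""
    return [(frame, movie[frame] * 2) for frame in range(start, stop)]


def merge(job_results):
    """Combine the per-job results into a single list ordered by frame number."""
    merged = [r for job in job_results for r in job]
    return sorted(merged, key=lambda r: r[0])


movie = list(range(100))  # stand-in for 100 frames of image data
ranges = split_frames(len(movie), 4)

# Each job only touches its own frame range, so the jobs are independent
# and could run in parallel. Here they simply run one after another.
job_results = [analyze_frames(movie, start, stop) for start, stop in ranges]
merged = merge(job_results)
print(ranges)
```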

References:
* [Dask](https://docs.dask.org/en/latest/)


### Configure environment and create test data

In [None]:
import os
# Change this to a directory where you want the test data to be created.
os.chdir("/home/hbabcock/Data/storm_analysis/jy_testing/")
print(os.getcwd())

import numpy
numpy.random.seed(1)

In [None]:
import storm_analysis.diagnostics.daostorm_3d.settings as settings
import storm_analysis.diagnostics.daostorm_3d.configure as configure
import storm_analysis.diagnostics.daostorm_3d.make_data as makeData
import storm_analysis.diagnostics.daostorm_3d.collate as collate


In [None]:
settings.photons = [[10, 1000]]
print(settings.photons)

settings.iterations = 20
settings.model = '2dfixed'
settings.n_frames = 2000
settings.peak_locations = None

In [None]:
configure.configure()

# You might want to change 'True' to 'False' if you are re-running
# the notebook without changing the movie.
if True:
    makeData.makeData()

### Create job XML files

#### Notes: 

* The number of divisions should be roughly the same as the number of workers.

In [None]:
import storm_analysis.sa_library.datareader as datareader

import storm_analysis.slurm.check_analysis as checkAnalysis
import storm_analysis.slurm.split_analysis_xml as splitAnalysisXML


In [None]:
# Figure out movie length.
mv = datareader.inferReader("test_01/test.dax")
movie_len = mv.filmSize()[2]
mv.close()

# Make working directory.
w_dir = "test_01/work_dir"
if not os.path.exists(w_dir):
    os.mkdir(w_dir)

# Delete any existing XML files.
for elt in checkAnalysis.getSortedJobXML(w_dir):
    os.remove(elt)

# Make job XML files. The number of divisions is the last argument
# to this function. Typically you will get one extra division because
# the first 10 frames are put into a job of their own; at least for
# STORM imaging, these frames tend to be quite dense.
splitAnalysisXML.splitAnalysisXML(w_dir, "dao.xml", 0, movie_len, 4)
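To see why the split usually produces one more job than requested, here is a hypothetical sketch of that division scheme. `split_with_dense_start()` is not part of storm_analysis, and the real `splitAnalysisXML()` may choose its frame boundaries differently; the sketch only illustrates giving the first 10 frames a job of their own and dividing the remaining frames among the requested number of jobs.

```python
def split_with_dense_start(n_frames, n_divisions, dense_frames=10):
    """Hypothetical: return (start, stop) frame ranges with [0, dense_frames) as its own job."""
    ranges = [(0, min(dense_frames, n_frames))]
    remaining = n_frames - ranges[0][1]
    if remaining <= 0:
        return ranges

    # Divide the remaining frames as evenly as possible.
    base, extra = divmod(remaining, n_divisions)
    start = ranges[0][1]
    for i in range(n_divisions):
        stop = start + base + (1 if i < extra else 0)
        if stop > start:  # skip empty ranges when n_divisions > remaining
            ranges.append((start, stop))
        start = stop
    return ranges


jobs = split_with_dense_start(2000, 4)
print(len(jobs), jobs)
```

For a 2000-frame movie and 4 divisions this gives 5 jobs, the first one covering frames 0 to 9.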


### Dask analysis

In [None]:
import glob

# Delete existing HDF5 files.
for elt in glob.glob(os.path.join(w_dir, "p*.hdf5")):
    os.remove(elt)
    

In [None]:
import dask

import storm_analysis.daostorm_3d.mufit_analysis as mfit

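@dask.delayed
def aJob(movie_name, mlist_name, xml_name):
    # Analyze the frames specified in xml_name, saving the localizations in mlist_name.
    mfit.analyze(movie_name, mlist_name, xml_name)
    # The localizations are written to disk, so just return a placeholder value.
    return 1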
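`@dask.delayed` makes `aJob()` lazy: calling it returns a task rather than running the analysis, and the work only happens when the tasks are passed to `dask.compute()`. The toy example below shows this with a hypothetical `toy_job()` that just sums numbers. It uses Dask's single-threaded `"sync"` scheduler so that it runs without a `Client`, which can also be handy for debugging a job function before going parallel.

```python
import dask


@dask.delayed
def toy_job(start, stop):
    """Hypothetical stand-in for aJob(): 'analyze' frames [start, stop)."""
    return sum(range(start, stop))


# Calling a delayed function only builds a task graph; nothing runs yet.
tasks = [toy_job(s, s + 25) for s in range(0, 100, 25)]
print(type(tasks[0]))

# dask.compute() executes all of the tasks and returns a tuple of results.
# "sync" runs them one at a time in this process; the notebook instead uses
# the distributed scheduler provided by the Client.
results = dask.compute(*tasks, scheduler="sync")
print(results)
```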


In [None]:
# Create local client for distributed analysis.
from dask.distributed import Client
    
# You may see errors if you specify multiple threads per worker.
client = Client(threads_per_worker=1, n_workers=4)
client

### Find localizations in parallel

In [None]:
job_xml_files = checkAnalysis.getSortedJobXML(w_dir)

jobs = []

m_name = os.path.abspath("test_01/test.dax")

# Assemble jobs.
for i, job_xml in enumerate(job_xml_files):
    
    h5_name = os.path.abspath(os.path.join(w_dir, "p_{0:d}.hdf5".format(i+1)))
    xml_name = os.path.abspath(job_xml)
    
    a_job = aJob(m_name, h5_name, xml_name)
    jobs.append(a_job)


In [None]:
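# Run jobs. The Client created above registers itself as the default
# scheduler, so scheduler="distributed" just makes this explicit.
results = dask.compute(*jobs, scheduler="distributed")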

In [None]:
# Close client.
client.close()

### Check and assemble results

This checks that all of the HDF5 files were created and merges all of them into a single HDF5 file.
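A minimal sketch of the existence check, assuming the per-job output naming used above (`p_1.hdf5`, `p_2.hdf5`, ...). `missing_outputs()` is a hypothetical helper, not how `checkAnalysis()` is implemented; it only tests whether each expected file exists.

```python
import os
import tempfile


def missing_outputs(w_dir, n_jobs):
    """Hypothetical: return the expected p_N.hdf5 files in w_dir that do not exist."""
    expected = [os.path.join(w_dir, "p_{0:d}.hdf5".format(i + 1)) for i in range(n_jobs)]
    return [name for name in expected if not os.path.exists(name)]


# Simulate three jobs where the second one failed to produce its output.
with tempfile.TemporaryDirectory() as tmp:
    for i in (1, 3):
        open(os.path.join(tmp, "p_{0:d}.hdf5".format(i)), "w").close()
    missing = [os.path.basename(m) for m in missing_outputs(tmp, 3)]

print(missing)
```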

In [None]:
import storm_analysis.slurm.merge_analysis as mergeAnalysis

In [None]:
checkAnalysis.checkAnalysis(w_dir)

In [None]:
if os.path.exists("test_01/test.hdf5"):
    os.remove("test_01/test.hdf5")
    
mergeAnalysis.mergeAnalysis(w_dir, "test_01/test.hdf5")

### Run the rest of analysis pipeline

This will do the tracking, drift correction and z value checking steps.

In [None]:
import storm_analysis.sa_utilities.track_drift_correct as trackDriftCorrect

trackDriftCorrect.trackDriftCorrect("test_01/test.hdf5", "dao.xml")