This notebook demonstrates how to use Cloudknot to parallelize a tracking method. Example Cloudknot functions are provided in the knotlets module, but users must build their own functions for this step to work properly.

In [1]:
import os
import diff_classifier.imagej as ij
import boto3
import os.path as op
import diff_classifier.aws as aws
import cloudknot as ck
import diff_classifier.knotlets as kn
import numpy as np

First, I define the naming convention for my files and list the exceptions: videos that weren't generated or are missing and will be skipped in the analysis. In this case, I was analyzing data collected in tissue slices. Videos are named according to the pup number, the slice number, the hemisphere, and the video number (e.g., P1_S2_R_0010).

In [2]:
folder = '01_18_Experiment'

missing = []
for i in range(10, 15):
    missing.append("P1_S2_R_00{}".format(i))

for i in range(10, 15):
    missing.append("P2_S3_L_00{}".format(i))
    
for i in range(0, 15):
    missing.append("P3_S3_L_{}".format("%04d" % i))
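The naming convention above can also be checked programmatically, which catches typos in prefixes before any machines are started. The sketch below is not part of diff_classifier: NAME_RE and parse_name are hypothetical helpers, and they assume base names without the .tif extension, with an optional _row_col suffix for the 4 × 4 tiles the analysis function produces.

```python
import re

# Hypothetical pattern for the naming convention: pup, slice, hemisphere,
# 4-digit video number, plus an optional tile row and column.
NAME_RE = re.compile(
    r"^(?P<pup>P\d+)_(?P<slice>S\d+)_(?P<hemi>[LR])_(?P<video>\d{4})"
    r"(?:_(?P<row>\d)_(?P<col>\d))?$"
)


def parse_name(name):
    """Split a video or tile base name into its fields; raise on a mismatch."""
    match = NAME_RE.match(name)
    if match is None:
        raise ValueError("Unexpected file name: {}".format(name))
    fields = match.groupdict()
    fields["video"] = int(fields["video"])
    for key in ("row", "col"):
        if fields[key] is not None:
            fields[key] = int(fields[key])
    return fields


print(parse_name("P1_S2_R_0010"))
# {'pup': 'P1', 'slice': 'S2', 'hemi': 'R', 'video': 10, 'row': None, 'col': None}
```

Running every entry of missing through parse_name is a quick way to confirm the exception list uses the same format as the generated prefixes.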

In [3]:
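to_track = {}        # run name -> list of video prefixes to analyze
knot = {}            # run name -> Cloudknot Knot object
result_futures = {}  # run name -> results returned by Knot.map
start_knot = 812     # counter used to give each knot a new name

# Subset of the experiment to analyze in this run
pups = ["P1"]
slices = ["S1"]
hemis = ["R"]
vids = 3  # number of videos per slice and hemisphere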
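The same nomenclature drives the job list built in the Knot cell further down. As a sketch, the hypothetical helper prefixes_by_run groups the video prefixes by run name (pup and hemisphere) in the same order as that cell and skips missing videos; converting missing to a set makes each membership test constant-time instead of a scan of the list.

```python
from itertools import product


def prefixes_by_run(pups, slices, hemis, vids, missing):
    """Group video prefixes by run name '<pup>_<hemi>', skipping missing videos."""
    missing = set(missing)  # O(1) membership tests instead of scanning a list
    runs = {}
    for pup, hemi in product(pups, hemis):
        run_name = "{}_{}".format(pup, hemi)
        prefixes = []
        for slic, vid in product(slices, range(vids)):
            prefix = "{}_{}_{}_{:04d}".format(pup, slic, hemi, vid)
            if prefix not in missing:
                prefixes.append(prefix)
        runs[run_name] = prefixes
    return runs
```

With this notebook's settings, prefixes_by_run(pups, slices, hemis, vids, missing) returns {'P1_R': ['P1_S1_R_0000', 'P1_S1_R_0001', 'P1_S1_R_0002']}.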

The function defined below is sent to each machine the user requests. Each machine analyzes a single video and uploads the resulting outputs to S3. In this case, the input files are stored only temporarily in a private bucket.
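Because every machine uploads its outputs to S3, a rerun can skip work that is already finished. As a rough sketch of the order of checks the function performs (features file, then split tiles, then per-tile trajectories), the hypothetical helper stages_to_run replaces the S3 calls with a plain set of existing keys. It only illustrates the control flow and is not part of diff_classifier.

```python
def stages_to_run(prefix, existing_keys, remote_folder):
    """Return the pipeline stages still needed for one video.

    existing_keys is a set of S3 keys already present; it stands in for the
    s3.head_object / aws.download_s3 attempts in the real function.
    """
    def key(name):
        return "{}/{}".format(remote_folder, name)

    if key("features_{}.csv".format(prefix)) in existing_keys:
        return []  # features already computed: nothing left to do

    tiles = ["{}_{}_{}".format(prefix, r, c) for r in range(4) for c in range(4)]
    stages = []
    if not all(key(t + ".tif") in existing_keys for t in tiles):
        stages.append("split")
    if not all(key("Traj_{}.csv".format(t)) in existing_keys for t in tiles):
        stages.append("track")
    stages.append("msds_and_features")  # always recomputed if features are missing
    return stages
```

For a video with no outputs at all, stages_to_run('P2_S2_R_0005', set(), '01_18_Experiment/P2') returns ['split', 'track', 'msds_and_features'].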

The following function is divided into four sections, each performing a different part of the analysis:

* **parameter prediction**: A regression model predicts the quality threshold used by TrackMate for each image, based on a training dataset of images whose quality cutoffs were determined manually beforehand in the TrackMate GUI. If analyzing a large number of samples, the user should build a similar training dataset.

* **splitting section**: Splits each video into a 4 × 4 grid of smaller videos to make the analysis feasible.

* **tracking section**: Tracks each of the smaller videos using a TrackMate script.

* **MSDs and features calculations**: Calculates MSDs and relevant features and outputs associated files and images.
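The splitting step and the later stitching of tile coordinates can be illustrated with NumPy. This is a minimal sketch, assuming a (frames, height, width) stack that divides exactly into a 4 × 4 grid of 512-pixel tiles; ij.partition_im may differ in its details. split_into_tiles and to_global are hypothetical helpers. to_global reproduces the coordinate shift applied in the MSD section, which moves tile coordinates into the full image and, assuming the tracker reports y increasing downward as in image coordinates, flips y so that it increases toward the top of the full image.

```python
import numpy as np

TILE = 512   # tile edge length in pixels (ires in the analysis function)
N_TILES = 4  # 4 x 4 grid of tiles


def split_into_tiles(stack, tile=TILE, n=N_TILES):
    """Split a (frames, height, width) stack into {(row, col): sub-stack}."""
    _, height, width = stack.shape
    if height != tile * n or width != tile * n:
        raise ValueError("this sketch assumes an exact n*tile square image")
    return {
        (row, col): stack[:, row * tile:(row + 1) * tile, col * tile:(col + 1) * tile]
        for row in range(n)
        for col in range(n)
    }


def to_global(x, y, row, col, tile=TILE, n=N_TILES):
    """Map tile-local (x, y) to full-image coordinates, matching
    X = x + tile*col and Y = tile - y + tile*(n - 1 - row)."""
    return x + tile * col, tile - y + tile * (n - 1 - row)


print(to_global(10.0, 100.0, row=0, col=1))  # (522.0, 1948.0)
```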

In [4]:
def download_split_track_msds(prefix):
    """
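    1. Checks whether the features file already exists on S3; if so, does nothing.
    2. If not, downloads the split tiles, or downloads and splits the full video if
       the tiles are not available.
    3. For each tile, downloads existing trajectories, or tracks the tile if none exist.
    4. Calculates MSDs and features, uploads them to S3, and generates plots.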
    """

    import matplotlib as mpl
    mpl.use('Agg')
    import diff_classifier.aws as aws
    import diff_classifier.utils as ut
    import diff_classifier.msd as msd
    import diff_classifier.features as ft
    import diff_classifier.imagej as ij
    import diff_classifier.heatmaps as hm

    from scipy.spatial import Voronoi
    import scipy.stats as stats
    from shapely.geometry import Point
    from shapely.geometry.polygon import Polygon
    import matplotlib.cm as cm
    import os
    import os.path as op
    import numpy as np
    import numpy.ma as ma
    import pandas as pd
    import boto3
    import random

    #Building predictor for tracking parameters
    ###############################################################################################
    
    folder = '01_18_Experiment'
    missing = []
    for i in range(10, 15):
        missing.append("P1_S2_R_00{}".format(i))

    for i in range(10, 15):
        missing.append("P2_S3_L_00{}".format(i))

    for i in range(0, 15):
        missing.append("P3_S3_L_{}".format("%04d" % i))

    pups = ["P3", "P2", "P1"]
    slices = ["S1", "S2", "S3"]
    folder = '01_18_Experiment'

    hemis = ["R", "L"]
    vids = 15
    tnum=20 #number of training datasets
    
    to_track = []
    for pup in pups:
        for hemi in hemis:
            for slic in slices:
                for vid in range(0, vids):
                    pref = "{}_{}_{}_{}".format(pup, slic, hemi, "%04d" % vid)
                    if pref not in missing:
                        for row in range(0, 4):
                            for col in range(0, 4):
                                to_track.append("{}_{}_{}".format(pref, row, col))

    y = np.array([1.5, 833.6, 9.24, 4.5, 3.3, 3.4, 2.85, 2.75, 3.7, 2.45,
                  2.6, 3.1, 3.25, 2.85, 2.05, 2.9, 5.4, 5.0, 2.7, 530])
    
    # Creates a regression object based on a training dataset of input images and quality
    # cutoffs determined manually by tracking in the TrackMate GUI.
    regress = ij.regress_sys(folder, to_track, y, tnum, have_output=True)

    #Splitting section
    ###############################################################################################
    remote_folder = "01_18_Experiment/{}".format(prefix.split('_')[0])
    local_folder = os.getcwd()
    ires = 512
    frames = 651
    filename = '{}.tif'.format(prefix)
    remote_name = remote_folder+'/'+filename
    local_name = local_folder+'/'+filename

    msd_file = 'msd_{}.csv'.format(prefix)
    ft_file = 'features_{}.csv'.format(prefix)

    s3 = boto3.client('s3')

    names = []
    for i in range(0, 4):
        for j in range(0, 4):
            names.append('{}_{}_{}.tif'.format(prefix, i, j))

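    try:
        # If the features file is already on S3, this video has been fully analyzed.
        obj = s3.head_object(Bucket='ccurtis7.pup', Key=remote_folder+'/'+ft_file)
    except Exception:

        try:
            # Reuse previously split tiles if all of them are on S3...
            for name in names:
                aws.download_s3(remote_folder+'/'+name, name)
        except Exception:
            # ...otherwise download the full video, split it, and upload the tiles.
            aws.download_s3(remote_name, local_name)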
            names = ij.partition_im(local_name)
            for name in names:
                aws.upload_s3(name, remote_folder+'/'+name)
                print("Done with splitting.  Should output file of name {}".format(remote_folder+'/'+name))

        #Tracking section
        ################################################################################################
        for name in names:
            outfile = 'Traj_' + name.split('.')[0] + '.csv'
            local_im = op.join(local_folder, name)

            row = int(name.split('.')[0].split('_')[4])
            col = int(name.split('.')[0].split('_')[5])

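            try:
                # Reuse an existing trajectory file if one is already on S3.
                aws.download_s3(remote_folder+'/'+outfile, outfile)
            except Exception:
                # Otherwise predict the TrackMate quality threshold for this tile and track it.
                quality = ij.regress_tracking_params(regress, name.split('.')[0], regmethod='PassiveAggressiveRegressor')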
                
                if row==3:
                    y = 485
                else:
                    y = 511

                ij.track(local_im, outfile, template=None, fiji_bin=None, radius=4.5, threshold=0.,
                         do_median_filtering=True, quality=quality, x=511, y=y, ylo=1, median_intensity=300.0, snr=0.0,
                         linking_max_distance=8.0, gap_closing_max_distance=10.0, max_frame_gap=2,
                         track_displacement=10.0)

                aws.upload_s3(outfile, remote_folder+'/'+outfile)
            print("Done with tracking.  Should output file of name {}".format(remote_folder+'/'+outfile))


        #MSD and features section
        #################################################################################################
        files_too_big = False
        size_limit = 10  # MB

        for name in names:
            outfile = 'Traj_' + name.split('.')[0] + '.csv'
            # Check the trajectory file (not the image tile) against the size limit.
            file_size_MB = op.getsize(outfile)/1000000
            if file_size_MB > size_limit:
                files_too_big = True

        if files_too_big:
            print('One or more of the {} trajectory files exceeds {}MB in size.  Will not continue with MSD calculations.'.format(
                  prefix, size_limit))
        else:
            counter = 0
            for name in names:
                row = int(name.split('.')[0].split('_')[4])
                col = int(name.split('.')[0].split('_')[5])

                filename = "Traj_{}_{}_{}.csv".format(prefix, row, col)
                local_name = local_folder+'/'+filename

                to_add = ut.csv_to_pd(local_name)
                # Shift tile-local coordinates into full-image coordinates (y is flipped).
                to_add['X'] = to_add['X'] + ires*col
                to_add['Y'] = ires - to_add['Y'] + ires*(3-row)

                if counter == 0:
                    merged = msd.all_msds2(to_add, frames=frames)
                else:
                    # Offset Track_IDs so they stay unique across tiles.
                    if merged.shape[0] > 0:
                        to_add['Track_ID'] = to_add['Track_ID'] + max(merged['Track_ID']) + 1
                    # DataFrame.append was removed in pandas 2.0; pd.concat does the same job.
                    merged = pd.concat([merged, msd.all_msds2(to_add, frames=frames)])
                    print('Done calculating MSDs for row {} and col {}'.format(row, col))
                counter = counter + 1

            merged.to_csv(msd_file)
            aws.upload_s3(msd_file, remote_folder+'/'+msd_file)
            merged_ft = ft.calculate_features(merged)
            merged_ft.to_csv(ft_file)

            aws.upload_s3(ft_file, remote_folder+'/'+ft_file)

            #Plots
            features = ('AR', 'D_fit', 'alpha', 'MSD_ratio', 'Track_ID', 'X', 'Y', 'asymmetry1', 'asymmetry2', 'asymmetry3',
                        'boundedness', 'efficiency', 'elongation', 'fractal_dim', 'frames', 'kurtosis', 'straightness', 'trappedness')
            vmin = (1.36, 0.015, 0.72, -0.09, 0, 0, 0, 0.5, 0.049, 0.089, 0.0069, 0.65, 0.26, 1.28, 0, 1.66, 0.087, -0.225)
            vmax = (3.98, 2.6, 2.3, 0.015, max(merged_ft['Track_ID']), 2048, 2048, 0.99, 0.415, 0.53,
                    0.062, 3.44, 0.75, 1.79, 650, 3.33, 0.52, -0.208)
            die = {'features': features,
                   'vmin': vmin,
                   'vmax': vmax}
            di = pd.DataFrame(data=die)
            for i in range(0, di.shape[0]):
                hm.plot_heatmap(prefix, feature=di['features'][i], vmin=di['vmin'][i], vmax=di['vmax'][i])
                hm.plot_scatterplot(prefix, feature=di['features'][i], vmin=di['vmin'][i], vmax=di['vmax'][i])

            hm.plot_trajectories(prefix)
            try:
                hm.plot_histogram(prefix)
            except ValueError:
                print("Couldn't plot histogram.")
            hm.plot_particles_in_frame(prefix)
            gmean1, gSEM1 = hm.plot_individual_msds(prefix, alpha=0.05)
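The MSD calculation itself is done by msd.all_msds2, whose internals are not shown in this notebook. For reference, the quantity being computed is the mean squared displacement. The hypothetical helper below computes the time-averaged MSD of a single gap-free trajectory with NumPy; it is a sketch of the definition, not a reimplementation of diff_classifier, and it does not handle the gaps that TrackMate tracks can contain (max_frame_gap=2 above).

```python
import numpy as np


def msd_single_track(x, y, max_lag=None):
    """Time-averaged MSD of one gap-free 2D trajectory.

    MSD(n) = mean over t of (x[t+n] - x[t])**2 + (y[t+n] - y[t])**2,
    for lags n = 1..max_lag (in frames).
    """
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    if max_lag is None:
        max_lag = len(x) - 1
    if not 1 <= max_lag < len(x):
        raise ValueError("max_lag must be between 1 and len(x) - 1")
    lags = np.arange(1, max_lag + 1)
    msd = np.array([np.mean((x[n:] - x[:-n]) ** 2 + (y[n:] - y[:-n]) ** 2) for n in lags])
    return lags, msd


lags, msd_vals = msd_single_track([0, 1, 1], [0, 0, 1])
print(msd_vals)  # [1. 2.]
```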

Cloudknot requires a Docker image to run on each machine. This image contains all the dependencies the code needs to run. The base image used here, 'arokem/python3-fiji:0.3', is publicly available; it essentially just includes a Fiji install in the correct location. Cloudknot builds the final image on top of it, adding the Python requirements and the GitHub installs.

Note: run "docker system prune" to clear out unused Docker data (stopped containers, dangling images, and build cache) before creating a new Docker image.

In [None]:
github_installs = ('https://github.com/ccurtis7/diff_classifier.git',)  # trailing comma makes this a tuple
my_image = ck.DockerImage(func=download_split_track_msds, base_image='arokem/python3-fiji:0.3', github_installs=github_installs)

# Read the Dockerfile that Cloudknot generated (useful for inspection)
with open(my_image.docker_path) as docker_file:
    docker_string = docker_file.read()

# requirements.txt sits in the same directory as the Dockerfile
req_path = op.join(op.split(my_image.docker_path)[0], 'requirements.txt')
with open(req_path) as req:
    req_string = req.read()

# Replace the last three characters of the first line with '5.28'
# (a manual version pin on the first package listed)
first_newline = req_string.find('\n')
new_req = req_string[0:first_newline-3] + '5.28' + req_string[first_newline:]
with open(req_path, 'w') as req_overwrite:
    req_overwrite.write(new_req)
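The cell above pins a version by slicing off the last three characters of the first line of requirements.txt and appending '5.28', which silently depends on the order of the file and on the length of the existing version string. A more explicit alternative, sketched below, rewrites the line for a named package. pin_requirement is a hypothetical helper, and the notebook does not say which package is being pinned, so any package name passed to it here is a placeholder.

```python
import re


def pin_requirement(req_text, package, version):
    """Return requirements.txt text with `package` pinned to `version`.

    Other lines are left untouched; raises ValueError if `package` is absent.
    Names are compared case-insensitively (pip's '-'/'_' equivalence is not handled).
    """
    lines = req_text.splitlines()
    for i, line in enumerate(lines):
        # Package name is everything before the first specifier, space, extra, or marker
        name = re.split(r"[<>=!~\s\[;]", line.strip(), maxsplit=1)[0]
        if name.lower() == package.lower():
            lines[i] = "{}=={}".format(package, version)
            break
    else:
        raise ValueError("{} not found in requirements".format(package))
    return "\n".join(lines) + "\n"
```

Used in place of the slicing line, this would read new_req = pin_requirement(req_string, 'package-name', '5.28'), with 'package-name' replaced by the intended package.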

In [None]:
#Test Docker Image
my_image.build("0.1", image_name="test_image")

This is where the commands are actually sent to AWS to start machines and begin the analysis. The core of this step is the Knot class (ck.Knot). The user specifies a few essentials:

* **name**: The user-defined name of the knot of machines to be started. Used to identify jobs in AWS.
* **docker_image**: The Docker image used to initialize each machine.
* **memory**: The desired memory for each job, in MiB (32000 here).
* **resource_type**: Set to "SPOT" so that cheaper spot instances are used instead of on-demand instances.
* **bid_percentage**: The maximum spot price, as a percentage of the on-demand price. I set it to 100% to make sure a machine is obtained in each case; you can lower this.
* **image_id**: The ID of the Amazon Machine Image (AMI) used to launch the instances.
* **pars_policies**: The AWS policies to attach. Here AmazonS3FullAccess gives each machine access to the required S3 bucket (and to every other S3 bucket in the account).

In [None]:
for pup in pups:
    for hemi in hemis:
        # One knot per pup/hemisphere combination
        run_name = '{}_{}'.format(pup, hemi)
        to_track[run_name] = []
        for slic in slices:
            for vid in range(0, vids):
                prefix = "{}_{}_{}_{}".format(pup, slic, hemi, "%04d" % vid)
                if prefix not in missing:
                    to_track[run_name].append(prefix)

        test_length = len(to_track[run_name])
        print('Number of nodes to be loaded: {}'.format(test_length))

        knot[run_name] = ck.Knot(name='download_and_track_{}'.format(start_knot),
                                 docker_image=my_image,
                                 memory=32000,
                                 resource_type="SPOT",
                                 bid_percentage=100,
                                 image_id='ami-6d8a7510',
                                 pars_policies=('AmazonS3FullAccess',))
        # Submit one job per video prefix
        result_futures[run_name] = knot[run_name].map(to_track[run_name])
        start_knot = start_knot + 1  # use a new knot name for the next run
        print('Next knot name: {}'.format(start_knot))

After the analysis, it is good practice to shut down all the resources that were started by calling each knot's clobber() method. This can also be done manually in the AWS Batch console.

In [None]:
for key in knot:
    knot[key].clobber()
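If one clobber() call raises an exception, the loop above stops and the remaining knots keep their AWS resources. A slightly more defensive sketch, assuming only that each knot exposes clobber() as used above, records failures and keeps going; clobber_all is a hypothetical helper, not part of Cloudknot.

```python
def clobber_all(knots):
    """Call clobber() on every knot, continuing past failures.

    Returns {run_name: exception} for the knots that could not be clobbered.
    """
    failures = {}
    for run_name, k in knots.items():
        try:
            k.clobber()
        except Exception as err:  # keep going so one failure doesn't strand the rest
            failures[run_name] = err
    return failures
```

Calling failures = clobber_all(knot) and printing failures shows which knots, if any, still need to be cleaned up by hand in the AWS console.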

In [5]:
download_split_track_msds('P2_S2_R_0005')

This call to matplotlib.use() has no effect because the backend has already
been chosen; matplotlib.use() must be called *before* pylab, matplotlib.pyplot,
or matplotlib.backends is imported for the first time.

The backend was *originally* set to 'module://ipykernel.pylab.backend_inline' by the following code:
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/usr/local/lib/python2.7/dist-packages/ipykernel_launcher.py", line 16, in <module>
    app.launch_new_instance()
  File "/usr/local/lib/python2.7/dist-packages/traitlets/config/application.py", line 658, in launch_instance
    app.start()
  File "/usr/local/lib/python2.7/dist-packages/ipykernel/kernelapp.py", line 478, in start
    self.io_loop.start()
  File "/usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py", line 177, in start
    super(ZMQIOLoop, self).sta