# Main Focus of the Notebook: Running Laptrack on detections

### This notebook takes as input the dataframe which contains filtered detected spots and runs laptrack module on it. 
### A new dataframe containing tree id and track id is returned. For us main variable for tracks is track id. 

In [13]:
from os import path
import pandas as pd
from IPython.display import display
from matplotlib import pyplot as plt
from laptrack import LapTrack, ParallelBackend
import napari
from skimage import io
import time  
import os 
import sys
import zarr 
import dask.array as da

### Do not change the code in the cell below

In [14]:
# This assumes that your notebook is inside 'Jupyter Notebooks', which is at the same level as 'test_data'
base_dir = os.path.join(os.path.dirname(os.path.abspath("__file__")), '..', 'movie_data')
# base_dir = os.path.join(os.path.dirname(os.path.abspath("__file__")), '..', 'test_movie_1')


zarr_directory = 'zarr_file/all_channels_data'
zarr_full_path = os.path.join(base_dir, zarr_directory)

input_directory = 'datasets'
input_file_name = 'cleaned_spots_intensities_c3_all.pkl'
input_directory_full = os.path.join(base_dir,input_directory, input_file_name)

output_directory = 'datasets'
output_file_name = 'track_df_c3_cleaned.pkl'
output_directory_full = os.path.join(base_dir,output_directory, output_file_name)

In [15]:
cleaned_spots_df = pd.read_pickle(input_directory_full)
z2 = zarr.open(zarr_full_path, mode='r')

## Parameters to optimize 

* **max_distance:** The distance cost cutoff for the connected points in the track.
* **gap_size:** The number of consecutive detection gaps allowed 
* **gap_closing_cost_tolerance:** How permissive gap closing is


In [16]:
max_distance = 2.5 # in pixels
gap_size = 1 # in frames
gap_closing_cost_tolerance = 1 # default: 1

lt = LapTrack(
    track_dist_metric="sqeuclidean",  # The similarity metric for particles. See `scipy.spatial.distance.cdist` for allowed values.
    # the square of the cutoff distance for the "sqeuclidean" metric
    track_cost_cutoff=max_distance**2,
    gap_closing_cost_cutoff = (gap_closing_cost_tolerance*2*max_distance)**2,
    # gap_closing_cost_cutoff = (2.5*max_distance)**2,

    gap_closing_max_frame_count = gap_size,
    splitting_dist_metric="sqeuclidean",
    splitting_cost_cutoff=False,  # Enable splitting with a cost cutoff
    merging_dist_metric="sqeuclidean",
    merging_cost_cutoff=False  # Enable merging with a cost cutoff
)

In [17]:
start_time = time.time()

track_df, split_df, merge_df = lt.predict_dataframe(
    cleaned_spots_df,
    coordinate_cols=[
        "mu_x",
        "mu_y",
        "mu_z"
    ],  # the column names for the coordinates
    frame_col="frame",  # the column name for the frame (default "frame")
    only_coordinate_cols=False,  # if False, returned track_df includes columns not in coordinate_cols.
    # False will be the default in the major release.
)

end_time = time.time()

execution_time = end_time - start_time
print(f"Total execution time: {execution_time} seconds")
track_df = track_df.reset_index()

Total execution time: 32.659488916397095 seconds


In [18]:
# Report number of frames and a random color per track

import random

# Calculate the number of unique frames for each track and create a mapping from track_id to number_of_frames
frame_counts = track_df.groupby('track_id')['frame'].nunique().to_dict()

# Map the frame counts to the original dataframe, creating a new column
track_df['number_of_frames'] = track_df['track_id'].map(frame_counts)

# Assuming 'track_df' is your DataFrame
track_ids = track_df['track_id'].unique().tolist()

# Generate a list of random colors
colors = [random.randint(0, 255) for _ in range(len(track_ids))]

# Create a dictionary that maps each track id to a random color
color_dict = dict(zip(track_ids, colors))

# Now you can use this dictionary to assign colors to the tracks
track_df['color'] = track_df['track_id'].map(color_dict)
# track_df['color']

### Visualise the performance of the tracking algorithm in napari. If there is a mismatch between the tracks and your visual tracking of the spots, you can adjust the parameters at the top of this notebook (see below). 

Once you open napari adjust the contrast so that spots are clearly visible 

In [19]:
# Create a napari viewer
viewer = napari.Viewer()

#access channel 3 only from zarr array 
dask_array = da.from_zarr(z2)

#the axis arrangement is (t,c,z,y,x)
#for the sake of improved performance only 1 channel could be imported here (if images get super large and performance issues occur)
all_channels = dask_array[:,:,:,:,:]

# Add the 4D stack to the viewer
# layer_raw = viewer.add_image(all_channels, channel_axis = 1, name = ['channel 1', 'channel 2', 'channel 3'])
layer_raw = viewer.add_image(all_channels, channel_axis = 1, name = ['channel 1', 'channel 2', 'channel 3'], contrast_limits=[110,250], interpolation3d = 'nearest', blending = 'additive', colormap = 'gray_r', visible = [False, False, True])

# # randomize the track ids and save the track ids in a list
# track_ids_shuffle = track_df['track_id'].tolist()
# random.shuffle(track_ids_shuffle)

properties = dict({
    # 'number_of_frames': dict(zip(track_df.index.tolist(), track_df['number_of_frames']))
    'track_id_rand': track_df['color'],
    'number_of_frames': track_df['number_of_frames']
    
})
# add detections here!
# also rejected ones 

tracks_layer = viewer.add_tracks(track_df[["track_id", "frame", "mu_z", "mu_y", "mu_x"]], properties=properties, color_by='track_id_rand', tail_length = 15, tail_width = 6, colormap = 'hsv', name = 'tracks')
# tracks_layer2 = viewer.add_tracks(track_df[["track_id", "frame", "mu_z", "mu_y", "mu_x"]], properties=properties, color_by='number_of_frames', colormap = 'inferno', name = 'tracks by frame count')

points_layer = viewer.add_points(cleaned_spots_df[["frame", "mu_z", "mu_y", "mu_x"]], size=2, 
                                properties=properties, name = 'Cleaned Spots', face_color = 'white', symbol = 'ring')

#  Add Bounding Box

layer_raw[0].bounding_box.visible = True
layer_raw[1].bounding_box.visible = True
layer_raw[2].bounding_box.visible = True

# Now, the dataframe 'data_with_frame_counts' contains an additional column 'number_of_frames'
# which indicates how many unique frames each track appears in.
# data_with_frame_counts['number_of_frames'].hist()

  warn(


# View a subset range of slices

In [28]:
# min and max z to show
min_z = 15
max_z = 36

viewer = napari.Viewer()

# for a limited number of z slices
image_slice = all_channels[:,:,min_z:max_z,:,:]

# for a limited number of z slices
tracks_slice = track_df[(track_df['mu_z'] > min_z) & (track_df['mu_z'] < max_z)]
tracks_slice 

properties_slice = dict({
    # 'number_of_frames': dict(zip(track_df.index.tolist(), track_df['number_of_frames']))
    'track_id_rand': tracks_slice['color'],
    'number_of_frames': tracks_slice['number_of_frames']
})

points_slice= cleaned_spots_df[(cleaned_spots_df['mu_z'] > min_z) & (cleaned_spots_df['mu_z'] < max_z)]

layer_raw = viewer.add_image(image_slice, channel_axis = 1, name = ['channel 1', 'channel 2', 'channel 3'],contrast_limits=[110,250], translate = [0,min_z,0,0], interpolation3d = 'nearest', blending = 'additive', colormap = 'gray_r', visible = [False, False, True])
tracks_layer = viewer.add_tracks(tracks_slice[["track_id", "frame", "mu_z", "mu_y", "mu_x"]], properties=properties_slice, color_by='track_id_rand', tail_length = 15, tail_width = 6, colormap = 'hsv', name = 'tracks')
points_layer = viewer.add_points(points_slice[["frame", "mu_z", "mu_y", "mu_x"]], size=2, 
                                properties=properties_slice, name = 'Cleaned Spots', face_color = 'white', symbol = 'ring')

layer_raw[0].bounding_box.visible = True
layer_raw[1].bounding_box.visible = True
layer_raw[2].bounding_box.visible = True


  warn(


## Based on your visual inspection of the tracks, adjust the tracking parameters:

* If a single track appears to be broken into multiple tracks you can increase the max_distance and/or increase the gap size and rerun tracking 

* If multiple tracks appear to be merged into a single track you can decrease the max_distance and/or decrease the gap size and rerun tracking 


In [21]:
# Save the DataFrame to a pickle file
track_df.to_pickle(output_directory_full)