# Spot tracking pipeline
This notebook handles a single tiff or nd2 file, with layout [T, Y, X] (single position & timelapse dataset, assumed to be single-channel).

## Key pipeline components:
- Data loading
  - ND2 parsing by [nd2](https://github.com/tlambert03/nd2) or tiff loading (tifffile)
  - (optional) flatfield illumination correction
- Spot detection by Laplacian of Gaussian [scikit-image](https://github.com/scikit-image/scikit-image)
- Spot tracking by [LapTrack](https://github.com/yfukai/laptrack)
  - Custom metric, integrating distance & intensity difference
- Spot intensity extraction + background subtraction (inspired by [TransTrack](https://github.com/TanenbaumLab/TransTrack))
- Track filtering (optional)
  - Mask overlap (manual)
  - Track feature threshold
    - Length
    - Spot density
    - Start window
  - Manual toggle
- Data visualization with Napari
- Spot track isolation (crops) focussed on track ends
  - Annotation

- Batch mode for all automated steps (loop over image files)

In [None]:
# Default libs
import os, sys
import re
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
from functools import reduce

# Parallel processing
import ray
from ray.util.multiprocessing import Pool
pool = Pool()

# Data loading
from imageio import imread_v2
import nd2

# Visualization
import napari
import matplotlib.pyplot as plt
import distinctipy

# Detection and tracking
from skimage.feature import blob_log
from laptrack import LapTrack
    
# Measurements
from skimage import measure
from sklearn.neighbors import NearestNeighbors

from scipy.spatial import distance_matrix
from scipy.spatial.distance import cdist
from skimage.segmentation import expand_labels

# find path to function imports
from pathlib import Path
path_imports = str(Path(os.getcwd()).resolve().parents[0]) + '/src/'
sys.path.append(path_imports)

# import external function
import importlib
import FlatDarkField
import SpotGeneric
import TrackFilter
import LinkPointToObject
import Grid

# Functions to use

In [None]:
# Image load tiff
def load_image(path, flatfield, img_ff, img_df):
    '''
    Load an image file (all slices) and optionally apply flatfield correction.

    The reader is chosen from the file extension, compared case-insensitively:
    extensions starting with ".tif" are read with imread_v2, ".nd2" is read
    with nd2.imread.

    Parameters
    ----------
    path : path of the image file.
    flatfield : when truthy, the loaded image is passed through
        FlatDarkField.ffdf_series together with img_ff and img_df.
    img_ff, img_df : flatfield and darkfield reference images; only used when
        flatfield is truthy.

    Returns
    -------
    The loaded (and, if requested, corrected) image.

    Raises
    ------
    ValueError
        If the file extension is neither tiff nor nd2.
    '''
    # splitext yields '' for names without a dot, so an extension-less file
    # such as 'tiffany' is rejected instead of being mistaken for a tiff.
    extension = os.path.splitext(os.path.basename(path))[1].lower()

    if extension.startswith('.tif'):
        img = imread_v2(path)
    elif extension == '.nd2':
        img = nd2.imread(path)
    else:
        raise ValueError('Invalid file extension')

    if flatfield:
        img = FlatDarkField.ffdf_series(img, img_ff, img_df)
    return img

In [None]:
def filter_time_window_below_per_group(df, group_col, value_col, window):
    '''
    Keep, per group, the rows within `window` of the group maximum.

    For every group of `group_col` the maximum of `value_col` is determined,
    and only rows with `value_col >= max - window` are retained.

    Parameters
    ----------
    df : pandas DataFrame to filter.
    group_col : column to group by.
    value_col : column holding the value that is compared to the group maximum.
    window : size of the window below the group maximum (inclusive).

    Returns
    -------
    DataFrame with the retained rows, ordered group by group (groupby order),
    rows within a group keeping their original order and index. For input
    without any group an empty frame with the columns of `df` is returned.
    '''
    # Collect the per-group selections and concatenate once; concatenating
    # onto a growing frame inside the loop is quadratic and starts from a
    # column-less empty DataFrame.
    kept_groups = [
        group[group[value_col] >= group[value_col].max() - window]
        for _, group in df.groupby(group_col)
    ]
    if not kept_groups:
        # pd.concat refuses an empty list; keep the schema of the input
        return df.iloc[0:0]
    return pd.concat(kept_groups)

# File paths

In [None]:
# Root folder of the dataset; all output folders live underneath it
path_base = '/dummy/'

# Output folders, one per result type
path_analysis_spots = path_base + "/Spots/"
path_analysis_tracks = path_base + "/Tracks/"
path_features = path_base + "/Features/"
path_figures = path_base + "/Figures/"
path_movies = path_base + "/Movies/"

# Create the output folders up front; existing folders are left untouched
for path_output in (
    path_analysis_spots,
    path_analysis_tracks,
    path_features,
    path_figures,
    path_movies,
):
    os.makedirs(path_output, exist_ok=True)

In [None]:
# Should flatfield correction be executed? When False the flatfield/darkfield
# reference images are not read and load_image skips the correction step.
flatfield = False #True

## Settings    

In [None]:
# Detection settings
# LoG detection threshold: 10**-3 for flatfield-corrected data, typically 30-75 without correction.
# NOTE(review): flatfield is set to False in the cell above while this value is the
# flatfield-corrected one — confirm the two settings match your data.
detection_threshold = 10**-3
measurement_radius = 4 # radius (px) used to measure spot intensities

# Tracking settings
distance_threshold = 20 # cost cutoff for frame-to-frame linking (px)
frame_gap_frames = 2 # maximum frame gap: passed as gapMax to the relative-intensity metric; the tracker uses this value minus 2 for gap closing
frame_gap_distance = 20 # cost cutoff for gap closing (px)
max_relIntDiff = .5 # relative intensity difference at or above which a link is rejected

# Output filtering & arrangement
track_length_minimum = 3 # passed to the track length filter
track_density_maximum = 1200 # passed to the local spot density filter
threshold_include_max_outside = .1 # threshold used by the inclusion (1 - value) and exclusion (value) mask filters
frame_threshold = 10 # frame number delimiting the track start window
retain_before_frame = True # keep tracks starting before (True) or after (False) frame_threshold

track_index_rearange_length = True # re-number track ids by track length (export only)

# Measurement settings
bbox_extent = 8 # size (width & height) of the intensity extraction box; scale this with pixel size and spot size in the images
coef_var_thr = .4 # coefficient-of-variation threshold for including the background measurement boxes surrounding a spot detection. Better be a little stringent here! (based on histogram)

# Flat field definitions

In [None]:
# Load the flatfield (ff) and darkfield (df) reference images.
# NOTE(review): the paths below are hard-coded to a local drive; adjust per machine.
path_ff = "G:/Group Tanenbaum/Lab Microscopy/misc/Flat field correction images/Flatfieldimages_Harry_20230608/"
path_ff_GFP = path_ff + "488_8-6-2023_normalizedflatfieldimage_100ms.tif"
path_df = path_ff + "darkfieldimage__100ms.tif"

if flatfield:
    # squeeze() drops singleton dimensions of the flatfield image
    img_ff_GFP = imread_v2(path_ff_GFP).squeeze()
    # darkfield is rounded and stored as uint16
    img_df = np.round(imread_v2(path_df)).astype(np.uint16)
    img_ff = img_ff_GFP
else:
    # placeholders so that load_image can still be called with both arguments
    img_ff = []
    img_df = []

# Data loading (nd2 or tiff)
! a single file/dataset is assumed to be single position and single channel

In [None]:
# List image files in the directory. The extension check is case-insensitive and
# includes '.tiff' (load_image reads it as well). The list is sorted because
# os.listdir returns entries in arbitrary order, which would make img_files[0]
# differ between runs/machines.
img_files = sorted(
    i for i in os.listdir(path_base)
    if i.lower().endswith(('.tif', '.tiff', '.nd2'))
)
print(img_files)

In [None]:
# Select the image file to analyze: the first entry of the file listing.
# path_base is expected to end with a path separator, as the file name is
# appended directly.
filename = img_files[0]
path_img = path_base + filename
print(filename)

In [None]:
# Load the selected movie; flatfield correction is applied only when `flatfield` is True
img = load_image(path_img, flatfield, img_ff, img_df)

In [None]:
# Initiate the napari viewer that all layers below are added to
viewer = napari.Viewer()

In [None]:
# Load the movie into the viewer as layer 'image' (additive blending)
viewer.add_image(img, name = 'image', blending = 'additive')

# Spot detection

In [None]:
# Run spot detection on the whole movie with the configured LoG threshold
# and report the number of detected spots
spots = SpotGeneric.detect_spots(img = img, thresholdLoG = detection_threshold)
print(len(spots))

In [None]:
# Count the detections per movie frame (column 0 of `spots` holds the frame
# number) and plot the counts over time
spot_frames = spots[:, 0]
list_n_spots = [sum(spot_frames == frame_idx) for frame_idx in range(img.shape[0])]

fig_counts, ax_counts = plt.subplots(figsize = (5, 2))
ax_counts.plot(list_n_spots)
ax_counts.set_title('n-spots per frame')
ax_counts.set_xlabel('frame')
ax_counts.set_ylabel('count')
plt.show()

In [None]:
# Add the detections to the viewer as red, unfilled circles
viewer.add_points(spots, name = 'Spots', size = 10, face_color = 'transparent', border_color = 'red', border_width = 0.15)

# Spot feature extraction
Used for tracking

In [None]:
# Measure the spot intensities, then derive the pairwise relative-intensity
# table that the custom tracking metric looks up
spots, spots_int = SpotGeneric.measure_spots_intensities(
    spots = spots,
    radius = measurement_radius,
    img = img,
)
spots_int_rel, spots_in_range = SpotGeneric.metric_spots_relative_intensity(
    spots = spots,
    spotsIntensity = spots_int,
    gapMax = frame_gap_frames,
)

# Table handed to the tracker: one row per spot, 'label' being the row number
# of the spot and 'frame' cast to integer
coordinate_df = (
    pd.DataFrame(spots, columns = ['frame', 'centroid0', 'centroid1'])
    .assign(label = np.arange(len(spots)))
    .astype({'frame': 'int'})
)

# Tracking
Tracking metric = Euclidean distance * (1 + absolute relative intensity difference)

A = mean intensity parent  
B = mean intensity link candidate  

|A - B| / A

In [None]:
# Laptrack custom tracking metric
def LAP_link_metric_intensity(c1, c2):
    '''
    Linking cost between two spots, each given as a (frame, label) pair.

    The pair is looked up in the global table `spots_int_rel`, indexed by
    (frame1, frame2, label1, label2) with the earlier frame first. The cost is
    the "distanceScaled" entry of that row; it is infinite when the pair is not
    in the table or when its "relIntDiff" is at or above `max_relIntDiff`.
    '''
    no_link = np.float32("inf")

    # put the two spots in chronological order before building the lookup key
    earlier, later = (c2, c1) if c1[0] > c2[0] else (c1, c2)
    (frame1, label1), (frame2, label2) = earlier, later

    ind = (frame1, frame2, label1, label2)
    if ind not in spots_int_rel.index:
        # No candidates found
        return no_link

    candidate = spots_int_rel.loc[ind]
    if candidate["relIntDiff"] >= max_relIntDiff:
        # Too large difference in intensity
        return no_link

    # Valid result
    return candidate["distanceScaled"]

# Configure the LapTrack linker. The custom intensity-aware metric is used for
# both frame-to-frame linking and gap closing; splitting and merging get False
# as cost cutoff (presumably disabling them — see the LapTrack API docs).
# NOTE(review): gap_closing_max_frame_count is frame_gap_frames-2, which is 0
# for the frame_gap_frames = 2 set in the settings cell (and negative for
# smaller values) — confirm that this offset is intended.
lap_tracker = LapTrack(
    track_dist_metric = LAP_link_metric_intensity,
    track_cost_cutoff = distance_threshold,
    gap_closing_dist_metric =  LAP_link_metric_intensity,
    gap_closing_max_frame_count = frame_gap_frames-2,
    gap_closing_cost_cutoff = frame_gap_distance,
    splitting_cost_cutoff = False,
    merging_cost_cutoff = False,
    parallel_backend = 'ray'
)

In [None]:
# Run the tracker. The "coordinates" handed to the metric are (frame, label)
# pairs, not positions; only_coordinate_cols = False keeps the other columns
# of coordinate_df in the result.
track_df, split_df, merge_df = lap_tracker.predict_dataframe(
    coordinate_df, coordinate_cols = ["frame", "label"], only_coordinate_cols = False
)

In [None]:
# Histograms of the first and last frame of every track
frames_per_track = track_df.groupby('track_id')['frame_y']
track_starts = frames_per_track.min().values
track_ends = frames_per_track.max().values

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 3))
panels = (
    (ax1, 'track starting frame', track_starts, 'starts'),
    (ax2, 'track ending frame', track_ends, 'ends'),
)
for panel_ax, panel_title, panel_frames, panel_label in panels:
    panel_ax.set_title(panel_title)
    panel_ax.hist(panel_frames, bins=50, label=panel_label)
    panel_ax.set_xlabel('frame')
    panel_ax.set_ylabel('n tracks')

plt.show()

In [None]:
# Load the tracks into napari; columns are given in the order
# (track id, frame, row coordinate, column coordinate)
viewer.add_tracks(track_df[["track_id", "frame_y", "centroid0", "centroid1"]],
                  tail_length=1, tail_width=4,
                  colormap='hsv')

In [None]:
# Turn the tracked points into a label map, colored by track_id
mask = np.zeros(img.shape, dtype = np.uint16)

# Stamp one pixel per tracked spot; the label is track_id + 1 so that track 0
# does not coincide with the background value 0
for _, spot in track_df.iterrows():
    voxel = (int(spot['frame_y']), int(spot['centroid0']), int(spot['centroid1']))
    mask[voxel] = spot['track_id'] + 1

# Grow the single-pixel labels frame by frame; expand_labels lets nearby
# points get non-overlapping fill areas
expanded_labels = np.array([expand_labels(mask_frame, distance = 5) for mask_frame in mask])

viewer.add_labels(expanded_labels, name = 'Spots by Track_ID')

# Intensity extraction & background subtraction

In [None]:
# Extract intensities from the image frames.
# reset_index gives `tracks` a 0..n-1 row index, which is used below to write
# into the matching rows of data_frames.
tracks = track_df.reset_index()
# One entry per tracked spot: 25 boxes of bbox_extent**2 pixels each
# (presumably a 5x5 grid of boxes around the spot — TODO confirm against
# SpotGeneric.extract_bbox_grid_intensity)
data_frames = np.zeros(shape = [len(tracks), 25, bbox_extent**2])

# Process the movie frame by frame: all spots of one frame are extracted at once
list_frames = np.unique(tracks['frame_y'].values)
for frame in list_frames:
    spots_df = tracks[tracks['frame_y'] == frame]
    spots_frame = spots_df[['centroid0', 'centroid1']].values.astype('int')
    data_frames[spots_df.index] = SpotGeneric.extract_bbox_grid_intensity(img[frame], spots_frame, bbox_extent)

# Split the box grid into spot and background boxes and derive peak intensity,
# background, background-subtracted peak and the coefficient of variation per box
bbox_indices = SpotGeneric.index_bbox_grid_tiers()
intensity_peak, intensity_bg, intensity_peaksub, coef = SpotGeneric.parse_bbox_grid_intensity(data_frames, bbox_indices, thr_bg = coef_var_thr)

# NaN background marks spots for which no valid background value was obtained
print('n of spots with invalid background:', np.sum(np.isnan(intensity_bg)))

# Histogram of the coefficient of variation, with the chosen threshold as a red dashed line
plt.hist(coef.flatten(), bins = 100)
plt.title('coeficient of variation all bboxes (including spots)')
plt.ylabel('count')
plt.xlabel('coeficient of variation')
plt.axvline(x=coef_var_thr, color='r', linestyle='--')
plt.show()

In [None]:
# QC figure: overlaid histograms of raw peak, background and
# background-subtracted peak intensities on shared bins (log-scaled counts)
bins = np.linspace(0, round(np.max(intensity_peak)), 100)

plt.figure(figsize = (10, 4))
histogram_series = (
    (intensity_peak, 'peak'),
    (intensity_bg, 'bg'),
    (intensity_peaksub, 'peak_bg'),
)
for signal, signal_name in histogram_series:
    plt.hist(signal, label = signal_name, bins = bins, alpha = .5)
plt.legend()
plt.yscale('log')

plt.ylabel('count')
plt.xlabel('intensity value')
plt.title('spot & background signal histogram')
plt.show()

In [None]:
# Join the intensity data to the tracking table. Both share the 0..n-1 row
# index, so the join is row by row.
intensity_columns = pd.DataFrame({
    'intensity': intensity_peak,
    'background': intensity_bg,
    'intensity_bg': intensity_peaksub
})
tracks_join = tracks.join(intensity_columns)

In [None]:
# Plot the background intensity over time for the five tracks with the most observations
unique_track_ids, counts = np.unique(tracks_join['track_id'].values, return_counts=True)
largest_group_ids = unique_track_ids[np.argsort(counts)[-5:]]
track_sample = tracks_join[tracks_join['track_id'].isin(largest_group_ids)]

for sample_id in largest_group_ids:
    sample = track_sample[track_sample['track_id'] == sample_id]
    plt.plot(sample['frame'], sample['background'], label = sample_id)
plt.title('background intensity stability')
plt.xlabel('frame')
plt.ylabel('intensity')
plt.legend()
plt.show()

In [None]:
# Plot the intensity traces of the longest track (first one in case of a tie)
array, counts = np.unique(tracks_join['track_id'].values, return_counts=True)
longest_track_id = array[np.argmax(counts)]
track_sample = tracks_join[tracks_join['track_id'] == longest_track_id]

trace_columns = (
    ('background', 'bg'),
    ('intensity', 'int'),
    ('intensity_bg', 'corr'),
)
for trace_column, trace_label in trace_columns:
    plt.plot(track_sample['frame'], track_sample[trace_column], label = trace_label)
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)
plt.xlabel('frame')
plt.ylabel('intensity')
plt.show()

In [None]:
# QC: overlay the coefficient-of-variation result of all bboxes on the images.
# This draws all bboxes (including the spot itself) whose coefficient of
# variation is at or beyond the threshold set.

img_heatmap = np.zeros_like(img, dtype = coef.dtype)
for i, mask_slice in enumerate(img_heatmap):
    # rows of the tracking table that belong to frame i
    indexes = tracks_join['frame_y'] == i
    spots_current = tracks_join[indexes][['centroid0', 'centroid1']].values.astype('int')
    # boolean per box: True where the box exceeds the threshold
    # (drop the comparison to draw the raw coefficient values instead)
    values = coef[np.where(indexes)] >= coef_var_thr

    # NOTE(review): the return value is only bound to the loop variable.
    # mask_slice is a view into img_heatmap, so the heatmap is filled only if
    # draw_bbox_grid_mask modifies its first argument in place — confirm in SpotGeneric.
    mask_slice = SpotGeneric.draw_bbox_grid_mask(mask_slice, spots_current, bbox_extent, values, True)

viewer.add_image(img_heatmap, name = 'coefVar', colormap = 'inferno', blending = 'additive', opacity = .75, contrast_limits = [0.0, 2.0])

# Export all tracks to csv

In [None]:
# Export table of all tracks with columns track_id, x, y, t
# (centroid1 is exported as x, centroid0 as y, frame_y as t)
export_columns = {
    'centroid1': 'x',
    'centroid0': 'y',
    'frame_y': 't'
}
df_export = tracks_join[['track_id', *export_columns]].rename(columns = export_columns)

if track_index_rearange_length:
    # Re-number track_id by track length: value_counts sorts by descending
    # count, so the track with the most rows becomes 1, the next one 2, ...
    key_counts = df_export['track_id'].value_counts()
    rank_mapping = dict(zip(key_counts.index, range(1, len(key_counts) + 1)))
    df_export['track_id'] = df_export['track_id'].map(rank_mapping)

df_export.to_csv(path_base + filename + "_tracks_all.csv")

# Mask-based filtering setup
The labels layer will be used for filtering at a later stage.

In [None]:
# Empty 2D (single-frame sized) label layers to draw the inclusion and
# exclusion regions in; they are read back by the mask filters below
mask_inclusion = viewer.add_labels(name = 'Inclusion', data = np.zeros(img.shape[1:], dtype = bool), blending = 'additive')
mask_exclusion = viewer.add_labels(name = 'Exclusion', data = np.zeros(img.shape[1:], dtype = bool), blending = 'additive')

!!! Now start drawing mask(s) in the viewer

In [None]:
# Inclusion mask filter. Tracks are filtered against the drawn mask with
# threshold 1 - threshold_include_max_outside; without a mask layer, or with
# an empty one, every track id is accepted.
try:
    mask_inclusion
except NameError:
    print('no inclusion mask data, anything will be accepted')
    tracks_inclusion_cutoff = tracks_join['track_id'].unique()
else:
    if not np.max(mask_inclusion.data):
        print('no mask drawn, but mask exists, anything will be accepted')
        tracks_inclusion_cutoff = tracks_join['track_id'].unique()
    else:
        inclusion_threshold = 1 - threshold_include_max_outside
        tracks_inclusion_cutoff = TrackFilter.track_filter_mask(
            tracks_join, spots, mask_inclusion.data, threshold = inclusion_threshold)

In [None]:
# Exclusion mask filter. Tracks are filtered against the drawn mask with
# inside = False and threshold threshold_include_max_outside; without a mask
# layer, or with an empty one, every track id is accepted.
try:
    mask_exclusion
except NameError:
    print('no exclusion mask data, anything will be accepted')
    tracks_exclusion_cutoff = tracks_join['track_id'].unique()
else:
    if not np.max(mask_exclusion.data):
        print('no mask drawn, but mask exists, anything will be accepted')
        tracks_exclusion_cutoff = tracks_join['track_id'].unique()
    else:
        tracks_exclusion_cutoff = TrackFilter.track_filter_mask(
            tracks_join,
            spots,
            mask_exclusion.data,
            inside = False,
            threshold = threshold_include_max_outside,
        )

# Criteria track filtering
- Track length (frame number, ! ignores gaps)
- Local spot density (quantile-based, time agnostic)
- Overlap with inclusion or exclusion mask

In [None]:
# Track length filter (number of observations); the result is intersected
# with the other track id allow lists further below
tracks_length_cutoff = TrackFilter.track_filter_min_length(tracks_join, track_length_minimum)

In [None]:
# Local spot density filter per track, limited by track_density_maximum
tracks_density_cutoff = TrackFilter.track_filter_density(tracks_join, spots, track_density_maximum, supplement=True)

In [None]:
# Track start window filter: retain_before_frame selects whether tracks
# starting before or after frame_threshold are kept
tracks_start_window = TrackFilter.track_filter_start_window(tracks_join, retain_before_frame, frame_threshold)

In [None]:
# Combine the track_id allow lists: a track is retained only if it passes
# every filter (intersection of all lists)
track_id_allow_lists = (
    tracks_inclusion_cutoff,
    tracks_exclusion_cutoff,
    tracks_length_cutoff,
    tracks_density_cutoff,
    tracks_start_window,
)
tracks_retain = reduce(np.intersect1d, track_id_allow_lists)

print('n tracks to retain: ' + str(len(tracks_retain)))

# Apply the filter to the tracking table
track_df_filter = tracks_join[tracks_join['track_id'].isin(tracks_retain)]

In [None]:
# Visualize the filtered tracks in napari as a label map colored by track_id
mask = np.zeros(img.shape, dtype = np.uint16)

# Stamp one pixel per retained spot; the label is track_id + 1 so that
# track 0 does not coincide with the background value 0
for _, spot in track_df_filter.iterrows():
    voxel = (int(spot['frame_y']), int(spot['centroid0']), int(spot['centroid1']))
    mask[voxel] = spot['track_id'] + 1

# Grow the single-pixel labels frame by frame
expanded_labels = np.array([expand_labels(mask_frame, distance = 5) for mask_frame in mask])

track_labelmap = viewer.add_labels(expanded_labels, name = 'Filtered Tracks')

In [None]:
# Write the filtered tracks to file with columns track_id, x, y, t
# (centroid1 is exported as x, centroid0 as y, frame_y as t)
export_columns = {
    'centroid1': 'x',
    'centroid0': 'y',
    'frame_y': 't'
}
df_export = track_df_filter[['track_id', *export_columns]].rename(columns = export_columns)

if track_index_rearange_length:
    # Re-number track_id by track length: value_counts sorts by descending
    # count, so the track with the most rows becomes 1, the next one 2, ...
    key_counts = df_export['track_id'].value_counts()
    rank_mapping = dict(zip(key_counts.index, range(1, len(key_counts) + 1)))
    df_export['track_id'] = df_export['track_id'].map(rank_mapping)

df_export.to_csv(path_base + filename + "_tracks_filter.csv")

## Manually select tracks for export
This will create two layers in the Napari viewer, and allows users to manually select/toggle individual tracks for inclusion in export.
- 'track toggle' points layer: All tracks visualized as circles colored by track identity. When this layer is active the user can click individual points to toggle track inclusion.
- 'tracks selected' tracks layer: All toggled tracks, starts as an empty layer, the user should not interact with this.

! This starts from a tracking table with no filtering applied in the above steps.

In [None]:
# Init the toggle layer: every tracked spot as an unfilled circle whose border
# color encodes its track_id (70 visually distinct colors)
colormap = distinctipy.get_colors(70)

track_points = viewer.add_points(
    data = tracks_join[['frame_y', 'centroid0', 'centroid1']].values,
    name = 'track toggle',
    features = {'track_id': tracks_join['track_id'].values},
    border_color = 'track_id',
    face_color = 'transparent',
    border_colormap = colormap,
    border_width = .15,
    size = 10)
# track_points.mode = 'select'

# "Empty" tracks layer that receives the toggled tracks. It is initialised
# with a single all-zero placeholder row (track id 0); the cells reading this
# layer skip row 0 for that reason.
tracks_include = viewer.add_tracks(
    data = np.zeros((1,4)),
    features = {'track_id': np.zeros((1,))},
    name = 'tracks selected',
    color_by = 'track_id',
    tail_length = 10, tail_width = 3,
    colormap = 'hsv')

# Make the toggle layer the active one so that clicks land on it
viewer.layers.selection.active = track_points

In [None]:
# init user interactive framework
def toggle_track_in_layer(df, idTrack, destination, reindex = True, verbose = False):
    '''
    Toggle one track in a tracks layer: remove it if present, add it otherwise.

    Row 0 of `destination.data` is treated as the placeholder row the layer was
    created with. It is never matched against `idTrack` and never removed, so
    track id 0 can be toggled like any other track (previously the all-zero
    placeholder made track 0 look "already present").

    Parameters
    ----------
    df : tracking table with columns "track_id", "frame_y", "centroid0", "centroid1".
    idTrack : id of the track to toggle.
    destination : layer-like object whose `data` attribute is an (n, 4) array
        with the track id in column 0.
    reindex : when True, `idTrack` is decremented by one first (for ids taken
        from a label map, where labels are track_id + 1).
    verbose : print diagnostic information.

    Returns
    -------
    None; `destination.data` is replaced with the updated array. Ids that do
    not occur in `df` are ignored.
    '''
    if reindex:
        idTrack = idTrack - 1

    # ids currently toggled on, placeholder row excluded
    toggled_ids = destination.data[1:, 0]
    is_present = idTrack in toggled_ids
    is_valid = idTrack in df['track_id'].values

    if verbose:
        print(idTrack)
        print('yet present', is_present)
        print('valid id', is_valid)
        print(np.unique(toggled_ids).astype('int'))

    # remove if included
    if is_present:
        # + 1 converts positions within toggled_ids back to rows of destination.data
        rows = np.where(toggled_ids == idTrack)[0] + 1
        destination.data = np.delete(destination.data, rows, axis=0)
        return

    # add if not included yet
    if is_valid:
        track_rows = df[df['track_id'] == idTrack][["track_id", "frame_y", "centroid0", "centroid1"]].values
        destination.data = np.vstack((destination.data, track_rows))

# Register the click handler on the toggle layer. The decorator appends the
# function to the layer's mouse-drag callbacks.
@track_points.mouse_drag_callbacks.append
def click(layer, event):
    '''Toggle the track of the clicked point in the 'tracks selected' layer.'''
    if event.type != "mouse_press":
        return

    # index of the point under the cursor, None when no point was hit
    point_index = layer.get_value(
        event.position,
        view_direction=event.view_direction,
        dims_displayed=event.dims_displayed,
        world=True,
    )
    if point_index is None:
        return

    # the feature holds the real track_id, hence no re-indexing
    label = layer.features['track_id'][point_index]
    toggle_track_in_layer(tracks_join, label, tracks_include, reindex = False)

napari.run()

!!! Now select the tracks you want to have in the Napari viewer.
Have the 'track toggle' layer active and click points.
Enabled tracks should become visible in the 'tracks selected' layer with track tail.

In [None]:
# List all currently toggled tracks; row 0 of the layer data is the
# placeholder row the layer was created with, hence the [1:]
print(np.unique(tracks_include.data[1:, 0]).astype('int'))

In [None]:
# Export data for the toggled tracks

# Ids currently in the 'tracks selected' layer (row 0 is the placeholder row)
toggled_track_ids = np.unique(tracks_include.data[1:, 0]).astype('int')
track_df_toggle = tracks_join[tracks_join['track_id'].isin(toggled_track_ids)]

# Write to file with columns track_id, x, y, t
# (centroid1 is exported as x, centroid0 as y, frame_y as t)
export_columns = {
    'centroid1': 'x',
    'centroid0': 'y',
    'frame_y': 't'
}
df_export = track_df_toggle[['track_id', *export_columns]].rename(columns = export_columns)

if track_index_rearange_length:
    # Re-number track_id by track length: value_counts sorts by descending
    # count, so the track with the most rows becomes 1, the next one 2, ...
    key_counts = df_export['track_id'].value_counts()
    rank_mapping = dict(zip(key_counts.index, range(1, len(key_counts) + 1)))
    df_export['track_id'] = df_export['track_id'].map(rank_mapping)

df_export.to_csv(path_base + filename + "_tracks_toggle.csv")

# Single track spot cropping, track ends focus
Observe the fate of track ends.

In [None]:
# Settings for the track-end crops
frame_window = 10 # number of frames before and after the track end to isolate
frame_limit = frame_window # a track must end at least this many frames before the end of the timelapse to be used

In [None]:
# Last frame of every track according to the tracking table
track_ends = track_df.groupby('track_id')['frame_y'].max().reset_index()

# Exclude tracks that end too close to the end of the timelapse
last_allowed_end = img.shape[0] - frame_limit
track_ends_before = track_ends[track_ends['frame_y'] <= last_allowed_end]
print(len(track_ends_before))

# Tracking table restricted to the remaining tracks
track_df_before = track_df[track_df['track_id'].isin(track_ends_before['track_id'].values)].reset_index()

In [None]:
# Lag segments: per track, the observations within frame_window frames of the
# track's last observed frame
track_df_before_lag = filter_time_window_below_per_group(track_df_before, 'track_id', 'frame', frame_window)
print(len(track_df_before_lag))

In [None]:
# Lead segments: the frames after the track end, virtually extended at the
# last known position

# Merging on the shared columns (track id and end frame) leaves only the last
# observation of every track; its frame becomes the start of the extension
track_df_before_ends = track_df_before.merge(track_ends_before).rename(columns={'frame_y':'frame_start'})
track_df_before_ends['frame_end'] = track_df_before_ends['frame_start'] + frame_window + 1


def _lead_frames(row):
    '''Frames following the track end, clipped to the length of the movie.'''
    stop = int(row['frame_end']) if row['frame_end'] <= img.shape[0] else img.shape[0]
    return list(range(int(row['frame_start']) + 1, stop))


# One list of frame numbers per track ...
track_df_before_ends['frame'] = track_df_before_ends.apply(_lead_frames, axis=1)

# ... exploded so that every frame gets its own row
df_expanded = track_df_before_ends.explode('frame').reset_index(drop=True)

# The helper columns are no longer needed
track_df_before_lead = df_expanded.drop(columns=['frame_start', 'frame_end'])
print(len(track_df_before_lead))

In [None]:
# Concatenate the lag (before the end) and lead (after the end) segment tables
track_df_before_windows = pd.concat([track_df_before_lag, track_df_before_lead])
print(len(track_df_before_windows))

# Make the frame number relative to the track end: frame_mark is the last
# frame of the track, so frame_rel is 0 at the end, negative before and
# positive after it
track_df_before_windows = track_df_before_windows.merge(track_ends.rename(columns={'frame_y': 'frame_mark'}), how = 'left', on = 'track_id')
track_df_before_windows['frame_rel'] = track_df_before_windows['frame'] - track_df_before_windows['frame_mark']
track_df_before_windows.head()

In [None]:
# # verify all track end windows on image level (circles on events to be isolated)
# viewer.add_points(track_df_before_windows[['frame', 'centroid0', 'centroid1']],
#                   name = 'track end markings',
#                   face_color='transparent',
#                   border_color='white',
#                   border_width=.2)

In [None]:
# Cropping table for all events from the full data

# Crop buffer in pixels: the crop extends `buffer` px to each side of the spot,
# giving tiles of 2 * buffer px
buffer = 50

img_width = img.shape[-1]
img_height = img.shape[-2]

table_centering = track_df_before_windows.copy()
table_centering.rename(columns={'centroid1': 'x', 'centroid0': 'y', 'frame': 't'}, inplace=True)

# Consecutive 0-based number per track_id; used as position of the track in
# the stabilized movie
table_centering['index'] = table_centering.groupby('track_id').ngroup()

# Build crop & padding table: integer pixel position of the spot
table_centering['x_int'] = table_centering['x'].round(0).astype(int)
table_centering['y_int'] = table_centering['y'].round(0).astype(int)

# Min and max X and Y at `buffer` distance from the track centroid
# (may lie outside of the image at this point)
table_centering['x_min'] = table_centering['x_int'] - buffer
table_centering['x_max'] = table_centering['x_int'] + buffer
table_centering['y_min'] = table_centering['y_int'] - buffer
table_centering['y_max'] = table_centering['y_int'] + buffer

# Padding in X and Y: the number of pixels by which the crop window sticks out
# of the image on each side. Must be computed before the window is clipped.
table_centering['pad_left'] = np.abs(np.clip(table_centering['x_min'], None, 0))
table_centering['pad_right'] = np.abs(np.clip(table_centering['x_max'] - img_width, 0, None))
table_centering['pad_top'] = np.abs(np.clip(table_centering['y_min'], None, 0))
table_centering['pad_bottom'] = np.abs(np.clip(table_centering['y_max'] - img_height, 0, None))

# Clip min and max to the image dimensions
table_centering['x_min'] = np.clip(table_centering['x_min'], 0, None)
table_centering['y_min'] = np.clip(table_centering['y_min'], 0, None)
table_centering['x_max'] = np.clip(table_centering['x_max'], None, img_width)
table_centering['y_max'] = np.clip(table_centering['y_max'], None, img_height)

# Sort by frame and export the columns needed for cropping as a plain array;
# the crop loop relies on this exact column order
crop_table = table_centering.sort_values(by = ['t']).reset_index(drop = True)
crop_table_np = crop_table[['index', 't', 'frame_rel', 'x_min', 'x_max', 'y_min', 'y_max', 'pad_left', 'pad_right', 'pad_top', 'pad_bottom']].to_numpy()
# print(table_centering.tail())

In [None]:
# Init the empty array the crops are pasted into: relative time (window before
# + track end + window after) x track x tile.
# NOTE(review): the dict lists 'X' before 'Y' while the crops pasted later are
# indexed [y, x]; harmless here because both sizes are buffer * 2.
sizes_stab = {
    'T': frame_window*2 + 1,
    'P': len(track_ends_before),
    'X': buffer * 2,
    'Y': buffer * 2}
print(sizes_stab)

stabilized_movie_img = np.zeros(shape = list(sizes_stab.values()), dtype = np.uint16)

In [None]:
# Crop the imaging data and paste the tiles into the stabilized movie,
# one row of the crop table at a time
for crop_row in tqdm(crop_table_np):
    # column layout: index, t, frame_rel, 4 crop bounds, 4 padding widths
    index, t, t_rel = crop_row[:3]
    x_min, x_max, y_min, y_max = crop_row[3:7]
    padding = crop_row[7:]

    # Crop (window already clipped to the image)
    img_crop = img[t, y_min:y_max, x_min:x_max]

    # Pad the tile with zeros where the window stuck out of the image
    if np.sum(padding) > 0:
        pad_left, pad_right, pad_top, pad_bottom = padding
        img_crop = np.pad(img_crop, ((pad_top, pad_bottom), (pad_left, pad_right)), mode = 'constant')

    # Paste the tile; relative frames are shifted by frame_window to start at 0
    stabilized_movie_img[t_rel + frame_window, index] = img_crop

# Store array to disk
np.save(path_movies + filename + "_Stabilized.npy", stabilized_movie_img)

In [None]:
# Visualize the individual track results in a new viewer
# (this rebinds `viewer`; the layers of the first viewer are not carried over)
viewer = napari.Viewer()

# Contrast is scaled from 0 to the maximum measured peak intensity
layerImage = viewer.add_image(stabilized_movie_img,
                              blending = 'additive',
                              contrast_limits=[0, np.max(intensity_peak)],#[np.quantile(stabilized_movie_img, q = .1), np.quantile(stabilized_movie_img, q = .995)],
                              visible = True,
                              gamma = 1)
viewer.dims.axis_labels = ('Time', 'Track', 'Y', 'X')

# Reference circle at the tile center, i.e. the (last known) spot position
viewer.add_points([buffer, buffer],
                  size=5,
                  opacity=.5,
                  face_color='transparent',
                  border_color='red',
                  border_width=.1)

In [None]:
# Include a text overlay per tile: track_id & actual frame number

# Rectangle that carries the text: a strip above the tile (rows -5..0)
# spanning the full tile width
minr = -5
minc = 0
maxr = 0
maxc = buffer*2

table_centering['overlay'] = 'track: ' + table_centering['track_id'].astype('str') + ' frame: ' + table_centering['t'].astype('str')

# Detailed text styling; currently unused, the shapes layer below is given
# the plain column name instead
text_parameters = {
    'string': '{overlay}',
    'size': 12,
    'color': 'yellow',
    'anchor': 'center',
    'translation': [0, 0],
    'opacity': 1
}

bbox_rect = np.array(
    [[minr, minc], [maxr, minc], [maxr, maxc], [minr, maxc]]
)

# One rectangle per table row: the (relative time, track) position is
# prepended to the 2D corner coordinates
overlay = []
for i, row in table_centering.iterrows():
    overlay.append(np.insert(bbox_rect, [0, 0], [row['frame_rel'] + frame_window, row['index']], axis = 1))

overlay = np.stack(overlay)

# Invisible rectangles, only the text taken from the 'overlay' column is shown
viewer.add_shapes(
    overlay,
    face_color = 'transparent',
    edge_color = 'transparent',
    edge_width = 1,
    properties = table_centering,
    # text = text_parameters,
    text = 'overlay',
    name = 'overlay track info',
    opacity = 0.75
)

## Annotation framework
- points (maybe bind to annotated quadrants?)
  - has this one been evaluated/seen?
  - what category of event is this
  - how many daughters are observed
- join to index table & export
- load data back to viewer (even if different filtering criteria)

### Categorical True/False annotation

In [None]:
# Time-agnostic point annotation layer (3 dimensions, starts empty) for the
# True/False annotation; the layer is put in 'add' mode so that clicks create points
annot_bool = viewer.add_points(
    data = np.zeros(shape = [0, 3]),
    name = 'observed',
    ndim = 3,
    face_color = 'green',
    border_color = 'transparent'
)
annot_bool.mode = 'add'

In [None]:
# Fetch the annotations from the viewer: the first coordinate of a point is
# read as the track 'index', the other two as the position inside the tile
# (retains relative XY, is it needed??)
annotation = pd.DataFrame(annot_bool.data.astype(int), columns= ['index', 'annot_y', 'annot_x'])
annotation['observed'] = True
print(annotation.tail())

# Deduplicate categorical annotations: keep the last point per track index
annotation = annotation.groupby('index').last()

# Bind the annotation to the full table (one row per track: the track-end frame)
table = crop_table[crop_table['frame_rel'] == 0].merge(annotation, how = 'left', on = 'index')

# Show all annotated events in the table
table[table['observed'] == True]

### Count annotation (multipoint)

In [None]:
# Time-agnostic point annotation layer (3 dimensions, starts empty) for the
# count annotation; several points may be placed per track
annot_count = viewer.add_points(
    data = np.zeros(shape = [0, 3]),
    name = 'count',
    ndim = 3,
    face_color = 'blue',
    border_color = 'transparent'
)
annot_count.mode = 'add'

In [None]:
# Fetch the annotations from the viewer: the first coordinate of a point is
# read as the track 'index', the other two as the position inside the tile
annotation = pd.DataFrame(annot_count.data.astype(int), columns= ['index', 'annot_y', 'annot_x'])
print(annotation.tail())

# Count the annotations per track (the XY position per annotation is discarded)
annotation = annotation.groupby('index').size().reset_index(name='count')

# Bind the annotation to the full table (one row per track: the track-end frame)
table = crop_table[crop_table['frame_rel'] == 0].merge(annotation, how = 'left', on = 'index')

# Show all annotated events in the table
table[table['count'] > 0]

# Batch mode tracking
- all movies in the same folder
- single position per file
- no napari viewer output
- no manual filtering (mask or toggling)

In [None]:
# List image files in the directory. The extension check is case-insensitive and
# includes '.tiff' (load_image reads it as well). The list is sorted because
# os.listdir returns entries in arbitrary order, so the batch then processes
# the files in a reproducible order.
img_files = sorted(
    i for i in os.listdir(path_base)
    if i.lower().endswith(('.tif', '.tiff', '.nd2'))
)
print(img_files)

In [None]:
# Laptrack custom tracking metric
def LAP_link_metric_intensity(c1, c2):
    """Linking cost between two spots, looked up in the global `spots_int_rel`.

    Parameters
    ----------
    c1, c2
        Each unpacks to a ``(frame, label)`` pair identifying one spot.

    Returns
    -------
    The ``"distanceScaled"`` entry stored for the pair, or
    ``np.float32("inf")`` when the pair is absent from `spots_int_rel` or its
    ``"relIntDiff"`` is greater than or equal to the global `max_relIntDiff`.
    """
    (frame1, label1), (frame2, label2) = c1, c2
    if frame1 > frame2:
        # always query with the earlier frame first
        frame1, label1, frame2, label2 = frame2, label2, frame1, label1

    # the lookup key combines both frame numbers and both labels
    key = (frame1, frame2, label1, label2)
    if key not in spots_int_rel.index:
        # No candidates found
        return np.float32("inf")

    candidate = spots_int_rel.loc[key]
    if candidate["relIntDiff"] >= max_relIntDiff:
        # Too large difference in intensity
        return np.float32("inf")

    # Valid result
    return candidate["distanceScaled"]

# LapTrack instance shared by all files; both the frame-to-frame linking and
# the gap closing use the custom intensity-aware metric defined above
lap_tracker = LapTrack(
    # frame-to-frame linking
    track_dist_metric = LAP_link_metric_intensity,
    track_cost_cutoff = distance_threshold,
    # gap closing
    # NOTE(review): the frame gap is reduced by 2 here while the relative
    # intensity table is built with gapMax = frame_gap_frames; confirm the offset
    gap_closing_dist_metric = LAP_link_metric_intensity,
    gap_closing_cost_cutoff = frame_gap_distance,
    gap_closing_max_frame_count = frame_gap_frames - 2,
    # splitting and merging are switched off
    merging_cost_cutoff = False,
    splitting_cost_cutoff = False,
    parallel_backend = 'ray'
)

In [None]:
# Batch mode all files in folder
def _export_tracks(track_table, path_csv):
    """Write the track coordinates of `track_table` to the csv file `path_csv`.

    The columns track_id / centroid1 / centroid0 / frame_y are selected and
    renamed to track_id / x / y / t. When the notebook-level flag
    `track_index_rearange_length` is truthy, track_id is renumbered by row
    count: the track with the most rows becomes 1, the next one 2, etc.

    Returns the exported DataFrame.
    """
    df_export = track_table[['track_id', 'centroid1', 'centroid0', 'frame_y']].copy()

    df_export.rename(columns={
        'centroid1': 'x',
        'centroid0': 'y',
        'frame_y': 't'
    }, inplace = True)

    if track_index_rearange_length:
        # number of rows per track_id, sorted from most to fewest rows
        key_counts = df_export['track_id'].value_counts()

        # rank based on the counts (largest group gets index 1)
        rank_mapping = {key: rank + 1 for rank, key in enumerate(key_counts.index)}

        # replace every track_id by its rank
        df_export['track_id'] = df_export['track_id'].map(rank_mapping)

    df_export.to_csv(path_csv)
    return df_export


for filename in tqdm(img_files):
    # Load image
    path_img = path_base + filename
    img = load_image(path_img, flatfield, img_ff[0], img_df)

    # Spot detection
    spots = SpotGeneric.detect_spots(img = img, thresholdLoG = detection_threshold)

    # Spot feature extraction
    # NOTE: spots_int_rel is (re)assigned here for every file because the
    # custom metric LAP_link_metric_intensity reads it as a global
    spots, spots_int = SpotGeneric.measure_spots_intensities(spots = spots, radius = measurement_radius, img = img)
    spots_int_rel, spots_in_range = SpotGeneric.metric_spots_relative_intensity(spots = spots, spotsIntensity = spots_int, gapMax = frame_gap_frames)
    coordinate_df = pd.DataFrame(spots, columns = ['frame', 'centroid0', 'centroid1'])
    coordinate_df['label'] = np.arange(len(spots))
    coordinate_df['frame'] = coordinate_df['frame'].astype('int')

    # Tracking: the "coordinates" handed to the metric are (frame, label) pairs
    track_df, split_df, merge_df = lap_tracker.predict_dataframe(
        coordinate_df, coordinate_cols=["frame", "label"], only_coordinate_cols=False
    )

    # extract intensities and background subtraction
    # reset_index gives a 0..n-1 row index, used below to address data_frames
    tracks = track_df.reset_index()
    # NOTE(review): 25 is hard-coded; presumably the number of grid cells
    # returned by extract_bbox_grid_intensity - confirm
    data_frames = np.zeros(shape = [len(tracks), 25, bbox_extent**2])

    list_frames = np.unique(tracks['frame_y'].values)
    for frame in list_frames:
        # all tracked spots of this frame, with integer pixel coordinates
        spots_df = tracks[tracks['frame_y'] == frame]
        spots_frame = spots_df[['centroid0', 'centroid1']].values.astype('int')
        data_frames[spots_df.index] = SpotGeneric.extract_bbox_grid_intensity(img[frame], spots_frame, bbox_extent)

    bbox_indices = SpotGeneric.index_bbox_grid_tiers()
    intensity_peak, intensity_bg, intensity_peaksub, coef = SpotGeneric.parse_bbox_grid_intensity(data_frames, bbox_indices, thr_bg = coef_var_thr)

    # join intensity data to tracking table
    tracks_join = tracks.join(
        pd.DataFrame({
            'intensity': intensity_peak,
            'background': intensity_bg,
            'intensity_bg': intensity_peaksub
        })
    )

    # write all tracks to file
    df_export = _export_tracks(tracks_join, path_base + filename + "_tracks_all.csv")

    # Track length filter (n obs)
    tracks_length_cutoff = TrackFilter.track_filter_min_length(tracks_join, track_length_minimum)
    # Local spot density per track
    tracks_density_cutoff = TrackFilter.track_filter_density(tracks_join, spots, track_density_maximum, supplement=True)
    # Tracks initiate window filter (before or after frame number)
    tracks_start_window = TrackFilter.track_filter_start_window(tracks_join, retain_before_frame, frame_threshold)
    # Concatenate and parse track_id allow lists.
    # Only the allow lists computed above for THIS file are intersected:
    # batch mode applies no manual (mask / toggle) filtering, and allow lists
    # left over from the interactive session hold track ids of another movie.
    tracks_retain = reduce(
        np.intersect1d, (
            tracks_length_cutoff,
            tracks_density_cutoff,
            tracks_start_window
            )
        )

    print('n tracks to retain: ' + str(len(tracks_retain)))

    # actual tracking table filter
    track_df_filter = tracks_join[np.isin(tracks_join['track_id'].values, tracks_retain)]

    # write filtered tracks to file
    df_export = _export_tracks(track_df_filter, path_base + filename + "_tracks_filter.csv")