<a href="https://colab.research.google.com/github/Alan-Marcus/HCCS/blob/main/03%20Analysis%20scripts%20and%20templates/SLEAP/Colab%20Notebooks/SLEAP_Movement_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**HCCS: Movement Analysis for SLEAP**

This is an example notebook to calculate the movement parameters for animals in videos recorded using the Home-Cage Camera System (HCCS; https://github.com/Alan-Marcus/HCCS) and tracked using SLEAP (https://sleap.ai).
This notebook will calculate the Total Distance Moved and Time Mobile for each animal at each time point and save them in a CSV file. Optionally, the raw movement of each animal can also be plotted.

#0. Setup

1. Upload the video analysis files and the video_details file to the analysis folder in Google Drive.
2. Run the following cells to setup the required utilities and connect your Google Drive to this session.

In [None]:
#@title Install Utilities
import h5py
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from google.colab import files
from google.colab import drive
from tqdm import tqdm
import matplotlib.pyplot as plt
from csv import writer
from csv import DictWriter
print('done')

In [None]:
#@title Connect Google Drive
drive.mount('/content/drive/')

#1. Specify Settings and Calculate Movement Parameters

In [None]:
#@title Edit Parameters
#@markdown (This step will overwrite the existing movement results file unless a different filename is specified here)
path_to_SLEAP_folder = "/content/drive/MyDrive/SLEAP" #@param {type:"string"}
video_details_file = "HCCS_example_project_video_details.csv"  #@param {type:"string"}
movement_results_filename = "HCCS_example_project-movement-results"  #@param {type:"string"}

#@markdown ---
#@markdown ### Choose missing value imputation method
missing_fill = "previous" #@param ["previous", "next", "linear"]

#@markdown ---
#@markdown ### Edit movement threshold value
#@markdown (cm; values ≤ this value are zeroed)
threshold_limit =  0.2#@param {type:"number"} 

#@markdown ---
#@markdown ### Choose to create raw movement plots
xy_movement = "Yes" #@param ["Yes", "No"]
xy_tracks = "Yes" #@param ["Yes", "No"]
dpi_value =  100#@param {type:"number"}

#---don't edit below
path_to_SLEAP_analysis_folder = path_to_SLEAP_folder + "/analysis/"
movementresults = path_to_SLEAP_analysis_folder + movement_results_filename + ".csv"
h5_list = path_to_SLEAP_analysis_folder + video_details_file

#Define filling of missing values
def fill_missing(Y, kind=missing_fill):
    """Fills missing values independently along each dimension after the first."""
    # Store initial shape.
    initial_shape = Y.shape
    # Flatten after first dim.
    Y = Y.reshape((initial_shape[0], -1))
    # Interpolate along each slice.
    for i in range(Y.shape[-1]):
        y = Y[:, i]
        # Build interpolant.
        x = np.flatnonzero(~np.isnan(y))
        f = interp1d(x, y[x], kind=kind, fill_value=np.nan, bounds_error=False)
        # Fill missing
        xq = np.flatnonzero(np.isnan(y))
        y[xq] = f(xq)
        # Fill leading or trailing NaNs with the nearest non-NaN values
        mask = np.isnan(y)
        y[mask] = np.interp(np.flatnonzero(mask), np.flatnonzero(~mask), y[~mask])
        # Save slice
        Y[:, i] = y
    # Restore to initial shape.
    Y = Y.reshape(initial_shape)
    return Y

#Define appending list to csv
def append_list_as_row(list_of_elem, file_mode):
    with open(movementresults, file_mode, newline='') as write_obj:
        # Create a writer object from csv module
        csv_writer = writer(write_obj)
        # Add contents of list as last row in the csv file
        csv_writer.writerow(list_of_elem)

#Define appending dictionary to csv
def append_dict_as_row():
    with open(movementresults, 'a', newline='') as write_obj:
        # Create a writer object from csv module
        dict_writer = DictWriter(write_obj, field_names)
        # Add dictionary as wor in the csv
        dict_writer.writerow(dict)

#Load data, iterate over each row of data to find maximum number of tracks, create movement results csv, then iterate over each row to fill missing values and calculate movement parameters
data = pd.read_csv(h5_list)
num_rows = [*range(len(data))]

print("Counting maximum number of tracks in H5 files")
max_tracks = 0
for row in tqdm(num_rows):
  #Load data
  with h5py.File(path_to_SLEAP_analysis_folder + data.loc[row,"h5_filename"], "r") as f:
    locations = f["tracks"][:].T
  if locations.shape[3] > max_tracks:
    max_tracks = locations.shape[3]
print("Maximum number of tracks:", max_tracks)

row1_contents = ['']*3 + ['Total distance moved (cm)'] + ['']*(max_tracks-1) + ['Time mobile (s)'] + ['']*(max_tracks-1) + ['Time mobile (%)'] + ['']*(max_tracks-1)
row2_contents = ['Date','Time_point','Group'] + ['Animal_' + str(i) for i in range(1, max_tracks+1)]*3 + ['Filename']
field_names = ['Date','Time_point','Group'] + ['total_distance_animal' + str(i) for i in range(1, max_tracks+1)] + ['time_mobile_s_animal' + str(i) for i in range(1, max_tracks+1)]+ ['time_mobile_pc_animal' + str(i) for i in range(1, max_tracks+1)]+ ['Filename']
append_list_as_row(row1_contents, 'w')
append_list_as_row(row2_contents, 'a')
print()
print("Created movement results file:", movementresults)

print("Calculating movement parameters")
for row in tqdm(num_rows):
  #Load data
  with h5py.File(path_to_SLEAP_analysis_folder + data.loc[row,"h5_filename"], "r") as f:
    locations = f["tracks"][:].T
  locations = fill_missing(locations)

  #Calculate movement parameters
  h5_filename = data.loc[row,"h5_filename"]
  start_frame = int(data.loc[row,"start_frame"])
  end_frame = int(data.loc[row,"end_frame"])
  pixels_per_cm = data.loc[row,"pixels_per_cm"]
  frames_per_s = data.loc[row,"fps"]

  threshold = pixels_per_cm*threshold_limit
  frame_num = list(range(start_frame-1, end_frame))   #Note: SLEAP interface reports first frame as 1 not 0, so automatically adjusted here
  animal_list = list(range(0,locations.shape[3]))   #dynamiclly creates animal_list
  dict = {}
  dict['Date'] = data.loc[row,"date"]
  dict['Time_point'] = data.loc[row,"time_point"]
  dict['Group'] = data.loc[row,"group"]
  dict['Filename'] = h5_filename

  for animal in animal_list:
    raw_distance = []
    filtered_distance = []
    time_mobile_frame = []

    for frame in frame_num:
      if frame >start_frame-1:
        calc_raw_distance = ((locations[frame, 0, 0, animal]-locations[frame-1, 0, 0, animal])**2+(locations[frame, 0, 1, animal]-locations[frame-1, 0, 1, animal])**2)**(1/2)
        if calc_raw_distance <= threshold:  #values <= to the threshold value are zeroed
          calc_filtered_distance = 0
        else:
          calc_filtered_distance = calc_raw_distance
        if calc_filtered_distance >0:
          time_mobile_frame.append(1)
        else:
          time_mobile_frame.append(0)
        raw_distance.append(calc_raw_distance)
        filtered_distance.append(calc_filtered_distance)

    dict['total_distance_animal' + str(animal+1)] = sum(filtered_distance) / pixels_per_cm
    dict['time_mobile_s_animal' + str(animal+1)] = sum(time_mobile_frame) / frames_per_s
    dict['time_mobile_pc_animal' + str(animal+1)] = sum(time_mobile_frame)*100 / (end_frame - start_frame)

  append_dict_as_row()

  #Plot centroid xy individual movement and xy tracks
  centroid_loc = locations[start_frame-1:end_frame, 0, :, :]/pixels_per_cm
  if xy_movement == "Yes":
    #xy individual movement
    plt.figure(figsize=(15,6), dpi=dpi_value)
    plt.ylim(-972/pixels_per_cm,1296/pixels_per_cm)
    plt.title('Centroid xy movement: '+h5_filename+'\n(x-axis shown >0; y-axis shown <0)')
    for animal in animal_list:
      #x movement
      p = plt.plot(centroid_loc[:,0,animal], label='animal-'+ str(animal+1))
      #y movement
      plt.plot(-1*centroid_loc[:,1,animal], p[0].get_color())
    plt.legend()
    plt.savefig(path_to_SLEAP_analysis_folder + "plots/xy-movement_" + movement_results_filename + "_" + h5_filename + '.png', bbox_inches='tight')
    plt.close()
  if xy_tracks == "Yes":
    #xy tracks
    plt.figure(figsize=(8,6), dpi=dpi_value)
    plt.xlim(0,1296/pixels_per_cm)
    plt.ylim(972/pixels_per_cm,0)
    plt.title('Centroid xy tracks: '+h5_filename)
    for animal in animal_list:
      plt.plot(centroid_loc[:,0,animal],centroid_loc[:,1,animal], label='animal-'+ str(animal+1))
    plt.legend()
    plt.savefig(path_to_SLEAP_analysis_folder + "plots/xy-tracks_" + movement_results_filename + "_" + h5_filename + '.png', bbox_inches='tight')
    plt.close()

print()
print(len(data), "videos analysed and data added to " + movementresults)
if xy_movement == "Yes":
  print(len(data), "xy_movement plots created and saved in " + path_to_SLEAP_analysis_folder + "plots/")
if xy_tracks == "Yes":
  print(len(data), "xy_tracks plots created and saved in " + path_to_SLEAP_analysis_folder + "plots/")

#2. Download output

In [None]:
#@title Download the movement results file
files.download(movementresults)

In [None]:
#@title Zip the movement plots
!cd $path_to_SLEAP_analysis_folder/plots/ && zip -r $path_to_SLEAP_folder/$movement_results_filename-plots *.png
print("Done")

In [None]:
#@title Download the zipped movement plots
files.download(path_to_SLEAP_folder + "/" + movement_results_filename + "-plots" + ".zip")