In [123]:
import numpy as np
import math
import matplotlib.pyplot as plt
import pandas as pd
import os

In [124]:
def process_csv(file):
    """Modifies row/column structure of an input DLC coordinate CSV"""
    df = pd.read_csv(file, encoding='utf-8')

    # extract the first row (body parts) and second row (coordinate labels)
    first_row, second_row = df.iloc[0, 1:].values, df.iloc[1, 1:].values

    # create new column names
    new_columns = [f"{first_row[i]}_{second_row[i]}" for i in range(len(first_row))]
    new_columns = ['index'] + new_columns
    df.columns = new_columns

    # drop original first two rows
    df = df.drop([0, 1]).reset_index(drop=True)

    # drop rows in which all cell values are zero except index
    df = df.astype(float)
    df = df.loc[~(df.iloc[:, 1:].eq(0)).all(axis=1)]

    return df

In [125]:
def correct_angle(angle):
    """ Adjusts angle values so that angle > pi is wrapped back into the range -pi to pi"""
    thresh = np.pi
    ang = np.where(angle > thresh, angle - 2 * thresh, np.where(angle < -thresh, angle + 2 * thresh, angle))

    return ang

In [126]:
def frame_cleaner(df, low_likelihood_thresh=0.9, frame_len_thresh=120, low_likelihood_frame_len=10, crit_low_likelihood_thresh=0.1):
  """
  Cleans DLC coordinate CSVs to a specified length and splits sequences with repeatedly low likelihoods
    Args: 
      DLC coordinate file processed by process_csv()
      frame_len_thresh (int): minimum # frames for a valid sequence
      low_likelihood_thresh (float): threshold by which to consider a frame 'low likelihood'
      low_likelihood_frame_len (int): # consecutive low-likelihood frames required to drop a section
      crit_low_likelihood_thresh (float): criticially low-likelihood threshold to drop frames entirely
    Returns: cleaned dataframe
  """
  # set critically low likelihood points to NaN
  df.loc[df['nose_likelihood'] < crit_low_likelihood_thresh, 
          ['nose_x', 'nose_y', 'dorsal_x', 'dorsal_y', 'caudal_x', 'caudal_y']] = np.nan
  
  valid_data = []  # valid segments
  segment_id = 0   
  buffer = []      # temp storage for the current valid segment
  counter = 0      # consecutive low-likelihood frames
  in_bad_segment = False  # if in a low-likelihood section

  for index, row in df.iterrows():
      # check if row bad (either all NaN for body parts OR nose_likelihood < threshold)
      is_bad_row = row[['nose_x', 'nose_y', 'dorsal_x', 'dorsal_y', 'caudal_x', 'caudal_y']].isna().all() or row['nose_likelihood'] < low_likelihood_thresh

      if is_bad_row:
        counter += 1  # increase consecutive counter
        if counter >= low_likelihood_frame_len and not in_bad_segment:
            in_bad_segment = True  # turn bad segment pointer on

            # save current buffer as a segment if it meets the frame len thresh
            if len(buffer) >= frame_len_thresh:
                valid_segment = pd.DataFrame(buffer)
                valid_segment['segment'] = segment_id
                valid_data.append(valid_segment)
                segment_id += 1  # increment segment id

            buffer = []  # reset buffer
      else:
        counter = 0  # reset counter, we found a good frame
        in_bad_segment = False  # mark that we're in a valid segment
        row['index'] = index # track index
        buffer.append(row)  # store row in buffer

  # store the last valid segment if it's long enough
  if len(buffer) >= frame_len_thresh:
      valid_segment = pd.DataFrame(buffer)
      valid_segment['segment'] = segment_id
      valid_data.append(valid_segment)

  # Combine valid segments into a single dataframe
  if valid_data:
      cleaned_df = pd.concat(valid_data).reset_index(drop=True)
      return cleaned_df
  else:
      return pd.DataFrame() 

In [127]:
def add_angles(df,tb_duration=60):
  """ computes body angles while ensuring consistency of left/right
  Args: 
    df: dataframe pre-processed by frame_cleaner(process_csv())
    tb_duration (int): approximate # frames of a tb cycle used to smooth nd vector & define head wagging effect
      default is 60 frames (0.4hz)
  Returns: df with body angles (theta_head, theta_flex)
    theta_head: angle of the nd vector relative to a smoothed nd vector over the course of a tb (approx. degrees head wagging)
    theta_flex: angle of flexion of the body, instantaneous nd to dc
  """
  # vectors
  nd_x = df['dorsalfin_x'] - df['nose_x']
  nd_y = df['dorsalfin_y'] - df['nose_y']
  dc_x = df['caudalfin_x'] - df['dorsalfin_x']
  dc_y = df['caudalfin_y'] - df['dorsalfin_y']

  # smooth nd (heading) vector
  nd_x_smooth = nd_x.rolling(window=int(tb_duration), center=True, min_periods=1).mean()
  nd_y_smooth = nd_y.rolling(window=int(tb_duration), center=True, min_periods=1).mean()

  # vector angles
  ang_nd = np.arctan2(nd_x, nd_y)
  ang_nd_smooth = np.arctan2(nd_x_smooth, nd_y_smooth)
  ang_dc = np.arctan2(dc_x, dc_y)

  # angles theta_head and theta_flex
  theta_head = ang_nd_smooth - ang_nd
  theta_flex = ang_nd - ang_dc ###################### change for relative vs. instantaneous #########

  # correct and convert
  df['theta_head_deg'] = np.degrees(correct_angle(theta_head))
  df['theta_flex_deg'] = np.degrees(correct_angle(theta_flex))

  # add lengths (pixels)
  nd_length = np.sqrt(nd_x**2 + nd_y**2)  # nd
  dc_length = np.sqrt(dc_x**2 + dc_y**2)  # dc
  BL = nd_length + dc_length # BL
  df['BL_pixels']  = BL

  # add instantaneous flexion and head amplitudes (body lengths)
  df['amp_flex_BL'] = np.abs(np.sin(np.radians(df['theta_flex_deg'])) * dc_length) / BL
  df['amp_head_BL'] = np.abs(np.sin(np.radians(df['theta_head_deg'])) * nd_length) / BL

  return df


In [128]:
def clean_angles(df, bio_threshold_flex=75, bio_threshold_head=60):
  """ Cleans tail angle series with biological threshold and z-score filter. 
  Args: 
    df: a DataFrame containing tail angles from add_angles()
    bio_threshold_head (float): threshold of head wag (average nd- instaneous nd) beyond which would be biologically infeasible
    bio_threshold_flex (float): threshold of tail flexion (nd-dc) beyond which would be biologically infeasible
  Returns: a cleaned DataFrame
  """
  # z-score filtering (head angle)
  Q1, Q3 = df['theta_head_deg'].quantile(0.25), df['theta_head_deg'].quantile(0.75)
  IQR = Q3 - Q1
  lower_bound, upper_bound = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR

  # threshold (head angle)
  for i, angle in enumerate(df['theta_head_deg']):
    if angle > bio_threshold_head or angle < -bio_threshold_head:
      df.at[i, 'theta_head_deg'] = np.nan
    if angle <= lower_bound or angle >= upper_bound:
      df.at[i, 'theta_head_deg'] = np.nan

  # z-score filtering (flex angle)
  Q1, Q3 = df['theta_flex_deg'].quantile(0.25), df['theta_flex_deg'].quantile(0.75)
  IQR = Q3 - Q1
  lower_bound, upper_bound = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR

  # threshold (flex angle)
  for i, angle in enumerate(df['theta_flex_deg']):
    if angle > bio_threshold_flex or angle < -bio_threshold_flex:
      df.at[i, 'theta_flex_deg'] = np.nan
    if angle <= lower_bound or angle >= upper_bound:
      df.at[i, 'theta_flex'] = np.nan

  return df

# Plotting

In [129]:
def plot_angles(df):
  """ Plots tail angle and body flexion time series.
  Args: cleaned DataFrame with both tail angle (`theta_tail`) and body flexion (`theta_flex`).
  Returns: plot of the tail angle and body flexion time series.
  """
  # Plot setup
  plt.figure(figsize=(10, 5))
  plt.xlabel('Time (sec)')
  plt.ylabel('Angle (degrees)')

  # Plot filtered tail angle data (after cleaning)
  plt.plot(df['index'] / 24, df['theta_head_deg'], color='green', label='Head Angle', marker='.', markersize=10, alpha=0.5, zorder=1)

  # Plot filtered body flexion data (after cleaning)
  plt.plot(df['index'] / 24, df['theta_flex_deg'], color='orange', label='Body Flexion Angle', marker='.', markersize=10, alpha=0.5, zorder=2)

  # Display legend and plot
  plt.legend()
  plt.show()


# Collate and Export All Data

In [136]:
# paths for example data
file_path = "I:/documents/DLCPaper/revisions/data/kinematic/coordinates/all/07202023PANB0605DLC_resnet50_jws_full_trainingFeb13shuffle1_200000 (1).csv"
export_path = "I:/documents/DLCPaper/revisions/data/kinematic/example_data/07202023PANB0605DLC_rawangles.csv"

# convert and export
df_processed = process_csv(file_path)
#df_frame_cleaned = frame_cleaner(df_processed)
df_angles_added = add_angles(df_processed)
#df_angles_cleaned = clean_angles(df_frame_cleaned)
df_angles_added.to_csv(export_path, index=False)
#df_angles_cleaned.to_csv(export_path, index=False)

In [121]:
# loops through and reads in files in folder of cleaned files from Tail_Position_Series_Constructor.ipynb
folder_path = "I:/documents/DLCPaper/revisions/data/kinematic/coordinates/all"
exportfolder_path = "I:/documents/DLCPaper/revisions/data/kinematic/coordinates_transformed/all"

# get a list of all the files in the folder
file_list = os.listdir(folder_path)

# loop through CSV files
for file_name in file_list:
    if file_name.endswith(".csv") and not file_name.startswith("._"):
        file_path = os.path.join(folder_path, file_name)
        export_path = os.path.join(exportfolder_path, 'transf_' + file_name.split('_')[0] + '.csv')

        try:
            df_processed = process_csv(file_path)
            df_frame_cleaned = frame_cleaner(df_processed)
            df_angles_added = add_angles(df_frame_cleaned)
            df_angles_cleaned = clean_angles(df_frame_cleaned)

            df_angles_cleaned['file_name'] = file_name.split('_')[0]
            df_angles_cleaned.to_csv(export_path, index=False)
            print(f"Processed: {file_name.split('_')[0]}")

        except KeyError as e:
            print(f"KeyError: {e} - Problem in {file_name}. Skipping this file.")
            continue  # Skip this file and move to the next one

Processed: 07202023PANB0202DLC
KeyError: 'nose_x' - Problem in 07202023PANB0303DLC_resnet50_jws_full_trainingFeb13shuffle1_200000 (1).csv. Skipping this file.
KeyError: 'nose_x' - Problem in 07202023PANB0301DLC_resnet50_jws_full_trainingFeb13shuffle1_200000 (1).csv. Skipping this file.
KeyError: 'nose_x' - Problem in 07202023PANB0201DLC_resnet50_jws_full_trainingFeb13shuffle1_200000 (1).csv. Skipping this file.
Processed: 07202023PANB0204DLC
KeyError: 'nose_x' - Problem in 07202023PANB0304DLC_resnet50_jws_full_trainingFeb13shuffle1_200000 (1).csv. Skipping this file.
KeyError: 'nose_x' - Problem in 07202023PANB0306DLC_resnet50_jws_full_trainingFeb13shuffle1_200000 (1).csv. Skipping this file.
KeyError: 'nose_x' - Problem in 07202023PANB0505DLC_resnet50_jws_full_trainingFeb13shuffle1_200000 (1).csv. Skipping this file.
KeyError: 'nose_x' - Problem in 07202023PANB0302DLC_resnet50_jws_full_trainingFeb13shuffle1_200000 (1).csv. Skipping this file.
KeyError: 'nose_x' - Problem in 07202023PA