In [None]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.2.90-py3-none-any.whl.metadata (41 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/41.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.9/41.9 kB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.6-py3-none-any.whl.metadata (9.1 kB)
Downloading ultralytics-8.2.90-py3-none-any.whl (871 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m871.8/871.8 kB[0m [31m13.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.6-py3-none-any.whl (26 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.2.90 ultralytics-thop-2.0.6


In [1]:
!pip install xlsxwriter
!pip install OneEuroFilter --upgrade

Collecting xlsxwriter
  Downloading XlsxWriter-3.2.0-py3-none-any.whl.metadata (2.6 kB)
Downloading XlsxWriter-3.2.0-py3-none-any.whl (159 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/159.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m153.6/159.9 kB[0m [31m5.6 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m159.9/159.9 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: xlsxwriter
Successfully installed xlsxwriter-3.2.0
Collecting OneEuroFilter
  Downloading OneEuroFilter-0.2.1-py3-none-any.whl.metadata (3.9 kB)
Downloading OneEuroFilter-0.2.1-py3-none-any.whl (6.1 kB)
Installing collected packages: OneEuroFilter
Successfully installed OneEuroFilter-0.2.1


Functions that run the model on all videos in a folder, and create one CSV per video containing the model's predictions

In [None]:
import os

from ultralytics import YOLO
import cv2
import csv
import numpy as np

def list_avi_files(folder_path):
    """
    Lists all .avi files in the given folder.

    :param folder_path: Path to the folder
    :return: Alphabetically sorted list of .avi file names (names only, not full paths).
             An empty list is returned when the folder cannot be read, so callers can
             always iterate over the result safely.
    """
    try:
        entries = os.listdir(folder_path)
    except OSError as e:
        # Previously the error text was returned as a string, which callers would then
        # iterate over character by character. Report the problem and return no files instead.
        print(f"Could not list files in {folder_path}: {e}")
        return []

    # Sorting gives a reproducible processing order regardless of the file system
    return sorted(entry for entry in entries if entry.endswith('.avi'))

def predict_video_to_csv(directory: str, video_name: str, training_run: int, use_last: bool):
    '''
    Runs one trained YOLO model over every frame of a single video and writes the detections to
    {directory}/predicted_csvs/{video name without extension}.csv.

    The CSV has the columns 'Frame Number', 'Object Number', 'X', 'Y', 'Confidence Score', where X and Y
    are the centre of the detected bounding box. A frame with no detection, and every detection whose
    score is not above the threshold, is written as a row with NaN X/Y and a confidence of 0.

    :param directory: folder that contains the video; the output folder is created inside it
    :param video_name: file name of the video inside {directory}
    :param training_run: number N of the training run; weights are loaded from runs/detect/trainN/weights
    :param use_last: when True use 'last.pt', otherwise 'best.pt'
    '''
    video_path = os.path.join(directory, video_name)
    video_basename = os.path.splitext(video_name)[0]

    # Determine which version of train{training_run} to use.
    best_or_last = 'last' if use_last else 'best'

    model_path = os.path.join('.', 'runs', 'detect', f'train{str(training_run)}', 'weights', f'{best_or_last}.pt') #dot indicates PWD
    abs_model_path = 'drive/MyDrive/Colab Notebooks/' + model_path

    threshold = 0.2  # detections scoring at or below this are treated as missed

    cap = cv2.VideoCapture(video_path)
    try:
        ret, frame = cap.read()
        if not ret:
            # Unreadable or empty video: skip it instead of failing later on a None frame
            print(f"Could not read any frames from {video_path}; skipping")
            return

        print(abs_model_path)

        # Load a model
        model = YOLO(abs_model_path)  # load a custom model

        results_data = []
        results_frame_number = 1
        while ret:
            detections = model(frame)[0].boxes.data.tolist()

            if len(detections) == 0:
                results_data.append([results_frame_number, 1, np.nan, np.nan, 0])

            for obj_number, detection in enumerate(detections, start=1):
                x1, y1, x2, y2, score, class_id = detection
                if score > threshold:
                    # Centre of the bounding box
                    x = (x1 + x2) / 2.0
                    y = (y1 + y2) / 2.0
                    results_data.append([results_frame_number, obj_number, x, y, score])
                else:
                    results_data.append([results_frame_number, 1, np.nan, np.nan, 0])

            ret, frame = cap.read()
            results_frame_number += 1

        output_dir = os.path.join(directory, 'predicted_csvs')
        os.makedirs(output_dir, exist_ok=True)
        csv_path = os.path.join(output_dir, f'{video_basename}.csv')

        # Create the CSV file
        with open(csv_path, 'w', newline='') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow(['Frame Number', 'Object Number', 'X', 'Y', 'Confidence Score'])
            csvwriter.writerows(results_data)
    finally:
        # Always free the capture handle, even if the model or the file write fails
        cap.release()
        cv2.destroyAllWindows()

def predict_videos_to_csv(video_directory: str, training_run: int, use_last: bool):
    '''
    Runs the model on every .avi video found in video_directory, producing one prediction CSV per video.

    The model is chosen by training_run (int) and use_last (bool): with use_last=True the
    training run's 'last.pt' weights are used, otherwise its 'best.pt' weights.
    '''
    avi_names = list_avi_files(video_directory)
    for avi_name in avi_names:
        predict_video_to_csv(video_directory, avi_name, training_run, use_last)

Use predict_videos_to_csv function to create CSVs containing the model's predictions

In [None]:
#predict_videos_to_csv('path/to/video_folder', 26, False)

Functions to correct for missed frames in the model's predictions (this one is an old one, the next one is the correct one)

In [3]:
import os
import numpy as np
import pandas as pd

def correct_csv_files(directory, xy_cal):
    '''
    Cleans every prediction CSV in {directory} and writes the result, under the same file name,
    to {directory}/corrected_csvs.

    For each CSV the steps are:
      1. drop leading and trailing rows that have no X/Y detection;
      2. keep exactly one row per frame: the detection closest to the previously kept position
         (a frame with no valid detection is kept as a NaN placeholder so it can be interpolated);
      3. set X/Y to NaN where the position jumps implausibly far from the last valid position;
      4. fill the NaN values by linear interpolation.

    Files that contain no valid detection at all are skipped with a message.

    :param directory: folder containing CSVs with at least the columns 'Frame Number', 'X' and 'Y'
    :param xy_cal: calibration factor used to convert pixel coordinates to real space (mm per pixel)
    '''
    corrected_dir = os.path.join(directory, "corrected_csvs")
    os.makedirs(corrected_dir, exist_ok=True)

    def correct_detections(df, xy_cal, initial_position=(320, 240)):
        """
        Reduces the DataFrame to one row per frame.

        Among a frame's rows, the detection nearest (in real space) to the previously kept
        position is chosen; the search starts from initial_position (given in pixels).
        Frames whose rows are all NaN are kept as a single NaN placeholder row; previously
        such frames were dropped, so missed detections were never interpolated and the
        remaining rows no longer represented consecutive frames.
        """
        # Convert initial position from pixels to millimeters
        previous_x, previous_y = initial_position[0] * xy_cal, initial_position[1] * xy_cal
        corrected_rows = []

        for _, frame_data in df.groupby('Frame Number', sort=True):
            # Real-space distance of every candidate to the last kept position (NaN for missed detections)
            dists = np.sqrt((frame_data['X'] * xy_cal - previous_x)**2 + (frame_data['Y'] * xy_cal - previous_y)**2)

            if dists.notna().any():
                # idxmin ignores NaN and returns the first row on ties
                best_row = frame_data.loc[dists.idxmin()]
                previous_x, previous_y = best_row['X'] * xy_cal, best_row['Y'] * xy_cal
            else:
                best_row = frame_data.iloc[0].copy()
                best_row[['X', 'Y']] = np.nan

            corrected_rows.append(best_row)

        return pd.DataFrame(corrected_rows)

    def remove_leading_trailing_nans(df):
        """
        Drops the rows before the first and after the last row that has both X and Y.
        Returns an empty DataFrame when no row has a valid position.
        """
        has_position = (df['X'].notna() & df['Y'].notna()).to_numpy()
        valid_positions = np.flatnonzero(has_position)
        if valid_positions.size == 0:
            return df.iloc[0:0].reset_index(drop=True)

        return df.iloc[valid_positions[0]:valid_positions[-1] + 1].reset_index(drop=True)

    def sanity_check(df, xy_cal, max_distance=10, distance_increment=2):
        """
        Replaces X and Y with NaN when the real-space distance from the last valid row to the
        current row exceeds the allowed distance. The allowance is max_distance, plus
        distance_increment for every NaN row lying between the two rows being compared.

        Parameters:
            df (pd.DataFrame): DataFrame with 'X' and 'Y' columns in pixels.
            xy_cal (float): Calibration factor to convert pixel values to real space.
            max_distance (float): Allowed distance between two consecutive valid rows.
            distance_increment (float): Extra allowance per intervening NaN row.

        Returns:
            pd.DataFrame: Modified DataFrame with implausible X and Y values set to NaN.
        """
        df = df.reset_index(drop=True)  # Reset the index to ensure continuous integer indexing
        for i in range(1, len(df)):
            if pd.isna(df.loc[i, 'X']) or pd.isna(df.loc[i, 'Y']):
                continue

            # Walk back to the most recent row that still has a position, counting the gap
            previous_index = i - 1
            nan_count = 0
            while previous_index >= 0 and (pd.isna(df.loc[previous_index, 'X']) or pd.isna(df.loc[previous_index, 'Y'])):
                previous_index -= 1
                nan_count += 1

            if previous_index < 0:
                continue

            x1, y1 = df.loc[previous_index, ['X', 'Y']] * xy_cal
            x2, y2 = df.loc[i, ['X', 'Y']] * xy_cal
            max_distance_adjusted = max_distance + nan_count * distance_increment
            distance_traveled = np.sqrt((x2 - x1)**2 + (y2 - y1)**2)

            if distance_traveled > max_distance_adjusted:
                df.loc[i, ['X', 'Y']] = np.nan

        return df

    def interpolate_consecutive_nans(df):
        """
        Fills NaN values by linear interpolation along the rows.

        Because the first row always holds a valid position here, the interpolation leaves no
        NaN behind. The former follow-up pass that replaced long NaN runs with the last valid
        value therefore never had anything to do, and has been removed.

        Parameters:
            df (pd.DataFrame): DataFrame with NaN values to interpolate.

        Returns:
            pd.DataFrame: New DataFrame with interpolated values.
        """
        return df.interpolate(axis=0)

    for filename in sorted(os.listdir(directory)):
        if not filename.endswith('.csv'):
            continue

        df = pd.read_csv(os.path.join(directory, filename))

        # remove leading and trailing failed detections
        df = remove_leading_trailing_nans(df)
        if df.empty:
            print(f"Skipping {filename}: it contains no valid detections")
            continue

        # correct multiple detections
        df = correct_detections(df, xy_cal)

        # correct erroneous detections by replacing the erroneous x and y values by np.nan
        df = sanity_check(df, xy_cal)

        # correct failed detections values by linear interpolation
        df = interpolate_consecutive_nans(df)

        # Return corrected df to a csv
        filepath_out = os.path.join(corrected_dir, filename)
        df.to_csv(filepath_out, index=False)


In [None]:
#correct_csv_files("/home/jovyan/work/predicted_csvs/train26", xy_cal=0.337408)

In [None]:
#input_excel_file = '/home/jovyan/work/xlsx fish data/CRISPent inppf5 Touch Response 12-06-24.xlsx'
#output_csv_file = '/home/jovyan/work/xlsx fish data/first_swim_velocity.csv'
#find_first_velocity(input_excel_file, output_csv_file)

Use the csv files containing the (corrected) model predictions, and then calculate the desired metrics (swim duration, distance, mean velocity, max velocity)

In [26]:
import os
import re
import glob
import numpy as np
import pandas as pd
import openpyxl
from OneEuroFilter import OneEuroFilter

def calculate_velocity(x, y, frame_rate=30):
    """
    Computes the speed between each pair of consecutive (x, y) positions.

    Args:
        x (list or np.array): X position data.
        y (list or np.array): Y position data.
        frame_rate (float): Frame rate of the data (frames per second).

    Returns:
        velocity (np.array): One value per pair of consecutive points (one fewer than the number
        of points): the distance between the two points multiplied by the frame rate.
    """
    # Per-frame displacement along each axis
    step_x = np.diff(x)
    step_y = np.diff(y)

    # Straight-line distance covered between consecutive frames
    step_lengths = np.sqrt(step_x**2 + step_y**2)

    # One frame lasts 1 / frame_rate seconds, so dividing by that time is multiplying by frame_rate
    return step_lengths * frame_rate

def filter_position_data(x_data, y_data, frame_rate, fcmin=1, beta=0, dc=1):
    """
    Filters x and y position data using the OneEuroFilter.

    Args:
        x_data (list or np.array): The x position data.
        y_data (list or np.array): The y position data.
        frame_rate (float): Sampling rate of the data in Hz. Used both as the filter's frequency
            and to turn each sample index into a timestamp.
        fcmin (float): Minimum cutoff frequency passed to the filter as 'mincutoff'. Default is 1.
        beta (float): Speed coefficient passed to the filter as 'beta'. Default is 0.
        dc (float): Cutoff frequency for the derivative, passed to the filter as 'dcutoff'. Default is 1.

    Returns:
        filtered_x (np.array): Filtered x position data.
        filtered_y (np.array): Filtered y position data.
    """

    # Create OneEuroFilter
    config = {
        'freq': frame_rate,  # Hz; was hard-coded to 30 regardless of the frame_rate argument
        'mincutoff': fcmin,  # Hz
        'beta': beta,
        'dcutoff': dc
    }

    # Separate filter instances are created for the two axes
    f_x = OneEuroFilter(**config)
    f_y = OneEuroFilter(**config)

    filtered_x = []
    filtered_y = []

    # Apply the filter to the position data
    for sample_index, (x, y) in enumerate(zip(x_data, y_data)):
        timestamp = sample_index / frame_rate  # Convert index to time based on the sampling rate
        filtered_x.append(f_x(x, timestamp))
        filtered_y.append(f_y(y, timestamp))

    return np.array(filtered_x), np.array(filtered_y)

def find_start_end_rows(df, x_pos, y_pos, frame_rate):
    """
    Finds the first and last row of movement from the velocity of the filtered positions.

    Side effect: adds the columns 'X_real_filt', 'Y_real_filt' (filtered positions) and
    'vel_from_filt' (velocity of the filtered positions, 0 for the first row) to df.

    Parameters:
    df (pd.DataFrame): The dataframe to analyze; it is modified in place.
    x_pos, y_pos (str): Names of the columns holding the unfiltered x and y positions.
    frame_rate (float): Frame rate of the data in frames per second.

    Returns:
    tuple: (start_row, end_row) as positional row indices. start_row is the row just before the
    first row whose velocity reaches the threshold; end_row is the last row whose velocity
    reaches the threshold. (-1, -1) is returned when no row reaches the threshold.
    """
    if len(df) == 0:
        return (-1, -1)  # nothing to analyse

    filtered_x, filtered_y = filter_position_data(df.loc[:, x_pos].to_numpy(), df.loc[:, y_pos].to_numpy(), frame_rate, fcmin=0.1, beta=0.1, dc=1)

    velocity_filtered_pos = calculate_velocity(filtered_x, filtered_y, frame_rate)

    # Prepend a 0 for the first frame (no movement before the first frame)
    velocity_filtered_pos = np.insert(velocity_filtered_pos, 0, 0)

    # Create columns of filtered data and velocity in the df.
    # Plain arrays are assigned positionally, so this does not depend on df's index labels.
    df['X_real_filt'] = filtered_x
    df['Y_real_filt'] = filtered_y
    df['vel_from_filt'] = velocity_filtered_pos

    # Minimum filtered velocity (position units per second) to consider movement
    filtered_velocity_threshold = 10

    moving_rows = np.flatnonzero(velocity_filtered_pos >= filtered_velocity_threshold)
    if moving_rows.size == 0:
        return (-1, -1)  # -1 indicates the function failed

    # Movement starts on the row before the first above-threshold velocity
    start_row = max(int(moving_rows[0]) - 1, 0)

    # Movement ends on the last above-threshold row. The earlier reverse scan skipped row
    # start_row + 1, so a movement lasting a single frame was extended to the end of the data.
    end_row = int(moving_rows[-1])

    return start_row, end_row

def extract_metrics_from_predictions(xy_cal, frame_rate, csv_path, vid_num):
    '''
    Locates the zebrafish's movement inside one prediction CSV, computes the swim metrics and writes
    the frames of interest plus the metrics to a 'formatted_data' folder next to the input CSV.

    The start and end of movement are found by find_start_end_rows, which uses a hard-coded minimum
    velocity. If no movement is found, a message is printed and no file is written.

    {xy_cal} must be obtained manually: use ImageJ's ruler tool to measure how many pixels the diameter
    of the petri dish spans in the video, then compare that to the dish's known real-world diameter:

    xy calibration = [diameter of petri dish (in mm)] / [# of pixels]

    The units of xy_cal are mm/pixel. A normal value is approximately 0.35, but it can vary from about
    0.32 to 0.37 depending on the camera's height above the petri dish. By the formula above, a closer
    camera makes the dish span more pixels (a larger denominator) and therefore gives a smaller value.

    {frame_rate} (frames per second) is needed to turn positions into velocity:
    (pos_at_frame_B - pos_at_frame_A) / time_btn_frames.

    {vid_num} is only used in the "no movement" message.

    The output CSV holds, side by side, the per-frame columns ('Frame Number', 'X_filt', 'Y_filt',
    'X_norm', 'Y_norm', 'distance_since_last_frame', 'vel_from_filt') and two metric columns whose
    first four rows are: swim duration, swim distance, mean swim velocity, max swim velocity.
    '''
    predictions = pd.read_csv(csv_path)

    # Create output CSV path
    source_dir, csv_name = os.path.split(csv_path)
    new_csv_path = source_dir + '/formatted_data/' + csv_name
    os.makedirs(os.path.split(new_csv_path)[0], exist_ok=True)

    # create real space X and Y values in two new columns (pixels -> real space)
    predictions['X_real'] = predictions['X'] * xy_cal
    predictions['Y_real'] = predictions['Y'] * xy_cal

    # Find the rows of interest, i.e. those in which the fish is moving.
    # This call also adds the filtered position and velocity columns to the dataframe.
    start_row, end_row = find_start_end_rows(predictions, 'X_real', 'Y_real', frame_rate)

    if start_row == -1:
        print(f"No movement detected in video #{vid_num}!")
        return

    # create the distance column; the first row has no previous frame and stays NaN
    step_x = predictions['X_real_filt'].diff()
    step_y = predictions['Y_real_filt'].diff()
    predictions['distance_since_last_frame'] = np.sqrt(step_x**2 + step_y**2)

    # Create filtered X and Y that are in pixel space (real space -> pixels)
    predictions['X_filt'] = predictions['X_real_filt'] / xy_cal
    predictions['Y_filt'] = predictions['Y_real_filt'] / xy_cal

    # Keep only the frames during which the fish is moving
    moving = predictions.iloc[start_row:end_row + 1].copy()

    print("start_row:", start_row)
    print("end_row:", end_row)

    # Create normalized X and Y rows (which are used to create figures by hand after data processing)
    origin_x = moving.loc[start_row, 'X_real_filt']
    origin_y = moving.loc[start_row, 'Y_real_filt']
    moving.loc[:, 'X_norm'] = moving['X_real_filt'] - origin_x
    moving.loc[:, 'Y_norm'] = moving['Y_real_filt'] - origin_y

    # Calculate the values for the metric column
    frame_span = moving.loc[end_row, 'Frame Number'] - moving.loc[start_row, 'Frame Number']
    swim_duration = frame_span * (1 / frame_rate)
    swim_distance = moving['distance_since_last_frame'].sum()
    mean_swim_velocity = moving['vel_from_filt'].mean()
    max_swim_velocity = moving['vel_from_filt'].max()

    # Create the metrics dataframe (label in the first column, value in the second)
    metrics_df = pd.DataFrame([
        ['swim duration', swim_duration],
        ['swim distance', swim_distance],
        ['mean swim velocity', mean_swim_velocity],
        ['max swim velocity', max_swim_velocity],
    ])

    # Select the per-frame columns that are written out
    per_frame = moving[['Frame Number', 'X_filt', 'Y_filt', 'X_norm', 'Y_norm', 'distance_since_last_frame', 'vel_from_filt']]

    # Both parts are re-indexed from 0 so that they line up row by row when placed side by side
    per_frame = per_frame.reset_index(drop=True)
    metrics_df = metrics_df.reset_index(drop=True)

    # The combined table needs as many rows as the longer of the two parts
    n_rows = max(len(per_frame), len(metrics_df))
    all_columns = per_frame.columns.tolist() + metrics_df.columns.tolist()
    df_out = pd.DataFrame(index=range(n_rows), columns=all_columns)

    # Fill the combined table; rows missing from the shorter part stay empty
    df_out[per_frame.columns] = per_frame
    df_out[metrics_df.columns] = metrics_df

    df_out.to_csv(new_csv_path, index=False)

def extract_metrics_from_directory(xy_cal, frame_rate, directory):
    '''
    Runs extract_metrics_from_predictions on every CSV directly inside directory.

    The files are numbered from 1 in the order glob returns them; that number is printed
    and passed on as vid_num.
    '''
    csv_pattern = os.path.join(directory, '*.csv')

    for vid_num, csv_file in enumerate(glob.glob(csv_pattern), start=1):
        print(f"This is for csv #{vid_num}")
        extract_metrics_from_predictions(xy_cal, frame_rate, csv_file, vid_num)

def format_csv_for_imagej(df, directory, output_csv):
    '''
    Takes one formatted_data CSV file (as a pandas dataframe) as input, and arranges the columns using pandas such that the output can be read into
    ImageJ's manual tracking plugin along with its associated annotated video file.

    The result is written as a tab-separated text file to {directory}/predictions, using the name
    of output_csv with its extension replaced by '.txt'.

    :param df: dataframe with the columns 'Frame Number', 'X_filt' and 'Y_filt'
    :param directory: folder in which the 'predictions' output folder is created
    :param output_csv: name of the CSV the dataframe came from; determines the output file name
    '''
    n_rows = len(df)

    # Values are taken positionally (to_numpy) so the result does not depend on df's index labels
    output_df = pd.DataFrame({
        '': np.arange(1, n_rows + 1),              # unnamed first column: sequential numbers starting from 1
        'Track n°': 1,                             # everything belongs to track 1
        'Slice n°': df['Frame Number'].to_numpy(),
        'X': df['X_filt'].to_numpy(),
        'Y': df['Y_filt'].to_numpy(),
        'Distance': 0,                             # these three columns are written as 0
        'Velocity': 0,
        'Pixel Value': 0,
    })

    # Ensure the columns are in the correct order
    output_df = output_df[['', 'Track n°', 'Slice n°', 'X', 'Y', 'Distance', 'Velocity', 'Pixel Value']]

    # Write next to the videos named by the `directory` argument; the argument used to be
    # ignored in favour of the global MASTER_VIDEO_DIRECTORY.
    output_dir = os.path.join(directory, 'predictions')
    os.makedirs(output_dir, exist_ok=True)

    output_txt = os.path.splitext(output_csv)[0] + '.txt'

    output_filepath = os.path.join(output_dir, output_txt)
    output_df.to_csv(output_filepath, sep='\t', index=False)


def combine_csv_to_xlsx(directory, output_directory=None):
    '''
    Combines all formatted data CSVs in one directory into one large xlsx file, which contains two sheets:
    'Fish Data' with every fish's data side by side, and 'Data Overview' with one summary row per fish
    followed by the averages of the four metrics.

    For every CSV an ImageJ-readable text file is also written via format_csv_for_imagej.

    :param directory: folder containing the formatted data CSVs. A CSV is used only if it has at least
        9 columns (the 9th holds the metric values) and at least 4 rows (one per metric).
    :param output_directory: folder that receives '<folder name>_results.xlsx'. When omitted, the
        module-level MASTER_VIDEO_DIRECTORY is used, as before.
    '''
    if output_directory is None:
        output_directory = MASTER_VIDEO_DIRECTORY

    # Create the filepath for the output file
    output_file = os.path.basename(os.path.normpath(output_directory)) + '_results.xlsx'
    output_path = os.path.join(output_directory, output_file)

    metric_names = ['Swim Duration', 'Swim Distance', 'Mean Swim Velocity', 'Max Swim Velocity']

    # Define and use a natural sort function to turn the filename list into human readable form (i.e. csvs listed in order)
    def natural_sort_key(s):
        return [int(text) if text.isdigit() else text.lower() for text in re.split(r'(\d+)', s)]

    filenames = [filename for filename in os.listdir(directory) if filename.endswith(".csv")]
    filenames.sort(key=natural_sort_key)

    # Create a writer object to write to Excel file
    with pd.ExcelWriter(output_path, engine='xlsxwriter') as writer:
        overview_rows = []  # one [label, duration, distance, mean velocity, max velocity] entry per fish
        fish_number = 1
        next_startcol = 0   # leftmost free column on the 'Fish Data' sheet

        for filename in filenames:
            # Read the CSV file
            csv_path = os.path.join(directory, filename)
            df = pd.read_csv(csv_path)

            # Check if the CSV file has at least 9 columns
            if df.shape[1] < 9:
                print(f"Skipping {filename} as it has fewer than 9 columns")
                continue

            # The four metrics are read from the first four rows of the 9th column
            if len(df) < 4:
                print(f"Skipping {filename} as it has fewer than 4 rows")
                continue

            # Create a CSV that can be opened in ImageJ, for review
            format_csv_for_imagej(df, output_directory, filename)

            # Layout per fish: a label column (filled on the first row only), the data, a blank spacer column
            fish_label = 'Fish {}'.format(fish_number)
            fish_df = df.copy()
            fish_df.insert(0, 'Fish Number', [fish_label] + [''] * (len(df) - 1))
            fish_df['Blank'] = ''

            # Append the fish_df to the Excel writer, to the right of the previous fish
            fish_df.to_excel(writer, sheet_name='Fish Data', startrow=0, startcol=next_startcol, index=False, header=True)

            # Advance by the real width of this block (11 columns for the usual 9-column CSV;
            # the offset used to be hard-coded to 11)
            next_startcol += len(fish_df.columns)

            # Extract the required data for Data Overview
            overview_rows.append([fish_label] + df.iloc[:4, 8].tolist())

            fish_number += 1

        # Construct the Data Overview DataFrame
        data_overview = pd.DataFrame(overview_rows, columns=['Fish Number'] + metric_names)

        # Create the labels and values DataFrame (to_numeric keeps the mean well defined when no fish were found)
        labels_values = pd.DataFrame({
            'Label': ['average swim duration (s)', 'average swim distance (mm)', 'average mean swim velocity (mm/s)', 'average max swim velocity (mm/s)'],
            'Value': [pd.to_numeric(data_overview[name]).mean() for name in metric_names]
        })

        # Write the Data Overview and labels/values to the second sheet
        data_overview.to_excel(writer, sheet_name='Data Overview', index=False)
        labels_values.to_excel(writer, sheet_name='Data Overview', startrow=len(data_overview) + 2, index=False)


Use the functions created in this project so far to create a master function that takes a directory full of videos as input, as well as an xy_calibration value, and frame_rate, and then outputs an xlsx file containing the processed data.

In [27]:
import shutil

def videos_to_fish_data(video_directory, xy_cal, frame_rate, run_prediction=False, run_correction=False, run_metrics=False, training_run=30, use_last=False):
    '''
    Calls helper functions to direct the extraction of predictions, then corrects predictions, then extracts data from predictions,
    and finally combines the formatted data into one xlsx file.

    The first three stages are optional and off by default, so a plain call only runs the final
    combining stage on files that already exist. Switch stages on with the keyword flags instead of
    editing this function.

    :param video_directory: folder containing the videos; all intermediate folders are created inside it
    :param xy_cal: xy calibration value passed to the correction and metric stages
    :param frame_rate: frame rate passed to the metric stage
    :param run_prediction: run the model on the videos (predict_videos_to_csv)
    :param run_correction: correct the prediction CSVs (correct_csv_files)
    :param run_metrics: compute per-video metrics (extract_metrics_from_directory)
    :param training_run: training run number used when run_prediction is True
    :param use_last: use 'last.pt' instead of 'best.pt' when run_prediction is True
    '''
    if run_prediction:
        predict_videos_to_csv(video_directory, training_run, use_last)
        print("predict_videos_to_csv ran successfully")

    csv_directory = os.path.join(video_directory, 'predicted_csvs')
    if run_correction:
        correct_csv_files(csv_directory, xy_cal)
        print("correct_csv_files ran successfully")

    corrected_directory = os.path.join(csv_directory, 'corrected_csvs')
    if run_metrics:
        extract_metrics_from_directory(xy_cal, frame_rate, corrected_directory)
        print("extract_metrics_from_directory ran successfully")

    formatted_csv_directory = os.path.join(corrected_directory, 'formatted_data')
    combine_csv_to_xlsx(formatted_csv_directory)
    print("combine_metrics ran successfully")

    # The intermediate 'predicted_csvs' folder is left in place; remove it by hand if it is no longer needed.

In [28]:
from google.colab import drive

drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [29]:
!ls 'drive/MyDrive/Colab Notebooks/Zebrafish Data/TDP43 G348C Touch Response 02-08-24/WT/predicted_csvs/corrected_csvs/formatted_data'

ls: cannot access 'drive/MyDrive/Colab Notebooks/Zebrafish Data/TDP43 G348C Touch Response 02-08-24/WT/predicted_csvs/corrected_csvs/formatted_data': No such file or directory


In [30]:
# combine_csv_to_xlsx reads the module-level name MASTER_VIDEO_DIRECTORY to decide where the
# results workbook is written, so it must be assigned before videos_to_fish_data is called.
MASTER_VIDEO_DIRECTORY = 'drive/MyDrive/Colab Notebooks/Zebrafish Data/TDP43 G348C Touch Response 02-08-24/WT'
# Arguments: video folder, xy calibration (mm per pixel), frame rate (frames per second)
videos_to_fish_data(MASTER_VIDEO_DIRECTORY, 0.337408, 30)


This is for csv #1
start_row: 36
end_row: 97
This is for csv #2
start_row: 32
end_row: 55
This is for csv #3
start_row: 40
end_row: 54
This is for csv #4
start_row: 36
end_row: 74
This is for csv #5
start_row: 32
end_row: 156
This is for csv #6
start_row: 39
end_row: 61
This is for csv #7
start_row: 49
end_row: 92
This is for csv #8
start_row: 37
end_row: 175
This is for csv #9
start_row: 49
end_row: 79
This is for csv #10
start_row: 49
end_row: 137
This is for csv #11
start_row: 33
end_row: 219
This is for csv #12
start_row: 39
end_row: 88
This is for csv #13
start_row: 47
end_row: 236
This is for csv #14
start_row: 25
end_row: 65
This is for csv #15
start_row: 38
end_row: 92
This is for csv #16
start_row: 31
end_row: 89
This is for csv #17
start_row: 39
end_row: 119
This is for csv #18
start_row: 22
end_row: 86
This is for csv #19
start_row: 28
end_row: 159
This is for csv #20
start_row: 39
end_row: 157
This is for csv #21
start_row: 28
end_row: 81
This is for csv #22
start_row: 30
e

In [None]:
# Same processing for the TDP43 group. MASTER_VIDEO_DIRECTORY is reassigned first because
# combine_csv_to_xlsx reads this module-level name to decide where the results workbook is written.
MASTER_VIDEO_DIRECTORY = 'drive/MyDrive/Colab Notebooks/Zebrafish Data/TDP43 G348C Touch Response 02-08-24/TDP43'
# Arguments: video folder, xy calibration (mm per pixel), frame rate (frames per second)
videos_to_fish_data(MASTER_VIDEO_DIRECTORY, 0.337408, 30)

combine_metrics ran successfully


Code for deleting various folders at various stages of videos_to_fish_data (for testing purposes)

In [None]:
#shutil.rmtree(f'drive/MyDrive/Colab Notebooks/Zebrafish Data/TDP43 G348C Touch Response 02-08-24/WT/predicted_csvs/corrected_csvs')
#shutil.rmtree(f'drive/MyDrive/Colab Notebooks/Zebrafish Data/TDP43 G348C Touch Response 02-08-24/WT/predicted_csvs/corrected_csvs/formatted_data')

In [None]:
#shutil.rmtree(f'drive/MyDrive/Colab Notebooks/Zebrafish Data/TDP43 G348C Touch Response 02-08-24/WT/predicted_csvs/corrected_csvs')
#shutil.rmtree(f'drive/MyDrive/Colab Notebooks/Zebrafish Data/TDP43 G348C Touch Response 02-08-24/WT/predicted_csvs/corrected_csvs/formatted_data')

In [None]:
#TODO: Figure out how to package this code up and run it as independent software (i.e. without having to have jupyter notebooks, etc)