Load necessary prerequisites

In [None]:
!pip install ffmpeg-python

Video Processing Functions

In [None]:
import ffmpeg
import os
import csv
import subprocess
import re

def extract_frames(video_path, output_dir, data_folder, csv_path):
    """
    Extract the frames listed in a CSV file from a video by calling the ffmpeg command-line tool.

    One ffmpeg process is started per requested frame. Each frame is saved as
    ``{data_folder}_{video name without date tag}_frame_{frame number}.png``.

    :param video_path: Path to the input video file.
    :param output_dir: Path to the output folder where frames will be saved (created if missing).
    :param data_folder: Name of the folder where the original video data is stored; used as the file-name prefix.
    :param csv_path: Path to the input csv file which establishes which frames are to be extracted.
                     The first row is treated as a header; the first column holds the frame numbers.
    :raises subprocess.CalledProcessError: If ffmpeg exits with a non-zero status for any frame.
    """
    # TODO: modify to extract even unannotated frames, and just have the annotations be
    # whatever the first one is that's in the file until it is touched?

    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Read the frame numbers from the CSV file
    frame_numbers = []
    with open(csv_path, newline='') as csvfile:
        reader = csv.reader(csvfile)
        next(reader, None)  # skip the header row (tolerates a completely empty file)
        for row in reader:
            # Blank lines come back as empty rows; skip them instead of failing on row[0]
            if not row or not row[0].strip():
                continue
            # Frame numbers may be stored as floats (e.g. "12.0")
            frame_numbers.append(int(float(row[0])))

    # Get the base name of the video file (without directory, extension and date tag)
    video_basename = os.path.splitext(os.path.basename(video_path))[0]
    video_basename_no_date = re.split(r'(_\d{4}-\d{2}-\d{2})', video_basename)[0]

    # Iterate through the frame numbers and extract each frame
    for frame_number in frame_numbers:
        output_file = os.path.join(output_dir, f"{data_folder}_{video_basename_no_date}_frame_{frame_number:03d}.png")
        # NOTE(review): ffmpeg's `n` counts frames from 0 -- confirm the CSV frame numbers use the same base.
        command = [
            'ffmpeg',
            '-nostdin',                 # Never wait for keyboard input (there is no terminal in a notebook)
            '-y',                       # Overwrite an existing output file instead of prompting
            '-i', video_path,           # Input file
            '-vf', f'select=eq(n\\,{frame_number})',  # Select specific frame
            '-vsync', 'vfr',            # Variable frame rate
            '-q:v', '2',                # Quality scale (kept from the original command)
            '-frames:v', '1',           # Extract only one frame
            output_file
        ]
        try:
            subprocess.run(command, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            print(f"Error extracting frame {frame_number}: {e.stderr}")
            raise

def list_avi_files(folder_path):
    """
    Lists all .avi files directly inside the given folder.

    :param folder_path: Path to the folder
    :return: List of .avi file names (names only, not full paths). An empty list is
             returned, and the error is printed, if the folder cannot be read.
    """
    try:
        # List all entries in the folder
        entries = os.listdir(folder_path)
    except OSError as e:
        # Always hand back a list so callers can iterate over the result safely
        print(f"Could not list {folder_path}: {e}")
        return []
    # Keep only the .avi files
    return [name for name in entries if name.endswith('.avi')]


Function to convert single video .csv files into single-frame .txt files

In [None]:
import csv
import os

# Function to read the input CSV file and write each row to its own .txt file
def split_csv_to_txt(input_file_path, output_txt_folder_path, data_folder, current_fish,
                     img_width=640, img_height=480, box_size=28):
    """
    Write one annotation .txt file per data row of a single-video CSV file.

    Each output file is named ``{data_folder}_{current_fish}_frame_{frame number}.txt`` and
    contains a single line ``0 x y w h``, where x and w are divided by ``img_width`` and
    y and h are divided by ``img_height``.

    :param input_file_path: Path to the CSV file; the first row is a header and each data row
                            holds the frame number, X and Y in its first three columns.
    :param output_txt_folder_path: Folder that receives the .txt files (created if missing).
    :param data_folder: Name of the data folder, used as file-name prefix.
    :param current_fish: Name of the current fish/video, used in the file name.
    :param img_width: Frame width in pixels used for normalisation (default 640).
    :param img_height: Frame height in pixels used for normalisation (default 480).
    :param box_size: Side length in pixels of the square bounding box (default 28).
    """
    # Ensure the output directory exists
    os.makedirs(output_txt_folder_path, exist_ok=True)

    # The box dimensions are the same for every frame, so format them once
    box_width = str(box_size / img_width)
    box_height = str(box_size / img_height)

    with open(input_file_path, mode='r', newline='', encoding='utf-8') as infile:
        reader = csv.reader(infile)
        next(reader, None)  # Skip the header row

        for row in reader:
            # Blank lines come back as empty rows; skip them
            if not row:
                continue
            frame_number = int(float(row[0]))
            output_file = os.path.join(output_txt_folder_path, f'{data_folder}_{current_fish}_frame_{frame_number:03}.txt')
            # Annotation line: class id, normalised centre x/y, normalised box width/height
            new_row = ["0", str(float(row[1]) / img_width), str(float(row[2]) / img_height), box_width, box_height]
            with open(output_file, mode='w', encoding='utf-8') as outfile:
                outfile.write(' '.join(new_row) + '\n')
            print(f'Written {output_file}')


#Call the function to split the CSV
#split_csv_to_txt("drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/WT Touch Response Virginie Pt 2/Old WT 01.csv", "drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/WT Touch Response Virginie Pt 2/Old WT 01", "WT Touch Response Virginie Pt 2", "Old WT 01")

Script that extracts frames and annotations from a folder containing one Excel (.xlsx) file and multiple videos

In [None]:
import openpyxl
import csv
import os

def search_files_with_prefix(directory, prefix):
    """Return the names of the entries in ``directory`` whose name begins with ``prefix``."""
    matches = []
    for entry_name in os.listdir(directory):
        if entry_name.startswith(prefix):
            matches.append(entry_name)
    return matches

def extract_filename(input_string):
    """Return the part of ``input_string`` after its last path separator, or the string itself if it has no '/'."""
    # A string without a forward slash is handed back untouched
    if '/' not in input_string:
        return input_string
    # Otherwise keep only the tail produced by os.path.split
    return os.path.split(input_string)[1]

def extract_annotations_and_frames(data_folder: str, naming_convention='', leading_zeros=0, col_spacing=9, frame_col_spacing=1, x_col_spacing=3, y_col_spacing=4):
    '''
    Create per-frame annotation .txt files and extract the matching video frames for every fish in a data folder.

    Arguments:
      data_folder (str): the name of the folder where the data (.xlsx file and associated videos) are stored
      naming_convention (str): optional arg that details the naming convention that should be used to create the resulting files. It is also
                                used to build the prefix passed to search_files_with_prefix, in order to find the corresponding video.
                                If not provided, it is taken from the name of the first sheet in which fish data is found and then reused
                                for the rest of the run.
      leading_zeros (int): optional arg that specifies the number of leading zeros for a single-digit fish number, i.e. fish numbers are
                           zero-padded to a width of leading_zeros + 1 digits (leading_zeros=2 gives 001, 010, 100). Default is 0 (no padding).
      col_spacing (int): optional arg that specifies the total number of columns occupied by a single fish's data in the .xlsx file.
      frame_col_spacing (int): optional arg that specifies the number of the column (assuming each fish's data starts at a 0th column) that contains the frame number data.
      x_col_spacing (int): " " that contains the x coordinate data.
      y_col_spacing (int): " " that contains the y coordinate data.
    Details:
      This is the high-level function where we orchestrate all the steps necessary for obtaining annotations and frames.
      Assumptions:
      1. a folder contains exactly one XLSX file and multiple video files;
      2. the XLSX file has the exact same name as the folder it is in, but with a .xlsx suffix
      Steps:
      1. Open the XLSX file, which holds the annotations for all videos in this folder and drives the process.
      2. For each block of col_spacing columns in each sheet (one block per fish/video):
          1. collect the rows in which frame number, X and Y are all present
          2. write them to "<naming_convention><fish number>.csv" in the output folder, preceded by up to 15 extra
             frames that repeat the first annotated position
          3. search the data folder for a video whose name starts with "<naming_convention><fish number>_";
             the whole run stops if none is found
          4. write one annotation .txt file per CSV row (split_csv_to_txt) into the "Annotations" subfolder
          5. extract the listed frames from the first matching video (extract_frames) into the "Frames" subfolder
    '''
    # Number of extra frames written before the first annotated frame
    pre_roll_frames = 15

    data_dir = f'drive/MyDrive/Colab Notebooks/Zebrafish Data/{data_folder}'

    # Load the workbook (data_only=True is passed so cells yield stored values)
    xlsx_file = data_folder + '.xlsx'
    wb = openpyxl.load_workbook(f'{data_dir}/{xlsx_file}', data_only=True)

    # Ensure output directories exist
    output_dir = f'drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/{data_folder}'
    output_dir_annotations = f'{output_dir}/Annotations'
    output_dir_frames = f'{output_dir}/Frames'
    for directory in (output_dir, output_dir_annotations, output_dir_frames):
        os.makedirs(directory, exist_ok=True)

    # Track fish number for naming csv files (keeps counting across sheets and empty blocks)
    fish_number = 1

    for sheet in wb.sheetnames:
        ws = wb[sheet]

        # Assuming fish data starts at column 1
        col_start = 1

        while col_start <= ws.max_column:
            # Columns for Frame Number, X, and Y coordinates
            frame_col = col_start + frame_col_spacing
            x_col = col_start + x_col_spacing
            y_col = col_start + y_col_spacing

            # Check if the X column is within the sheet's max column range
            if x_col > ws.max_column:
                break

            # Collect data from the frame, X and Y columns
            frame_data = []
            x_data = []
            y_data = []

            # NOTE(review): the values are read from the first three columns of the
            # frame_col..y_col range (frame_col, frame_col + 1, frame_col + 2); x_col and y_col
            # only bound the range. With the default spacings this means X is read from
            # col_start + 2 and Y from col_start + 3. Left unchanged -- confirm against the
            # real sheet layout before "fixing" either the indices or the defaults.
            for row in ws.iter_rows(min_row=1, max_row=ws.max_row, min_col=frame_col, max_col=y_col):
                frame_cell = row[0]
                x_cell = row[1]
                y_cell = row[2]
                # Keep only rows in which all three values are present
                if frame_cell.value is not None and x_cell.value is not None and y_cell.value is not None:
                    frame_data.append(frame_cell.value)
                    x_data.append(x_cell.value)
                    y_data.append(y_cell.value)

            # Write to a CSV file if we have data (the three lists always have equal length)
            if frame_data:

                # Zero-pad the fish number to leading_zeros + 1 digits, e.g. leading_zeros=1 -> 01 ... 09, 10
                fish_id = str(fish_number).zfill(leading_zeros + 1)

                # Define the naming convention based on current sheet, if naming_convention not specified in func input.
                # Once set it is kept for all later blocks and sheets.
                if not naming_convention:
                    naming_convention = sheet

                print(naming_convention)

                # Name shared by the CSV file, the annotation files and the video search
                current_fish = f'{naming_convention}{fish_id}'

                # Define the CSV file name
                csv_filename = f'{current_fish}.csv'
                csv_path = os.path.join(output_dir, csv_filename)

                # Create the CSV file
                with open(csv_path, 'w', newline='') as csvfile:
                    csvwriter = csv.writer(csvfile)
                    csvwriter.writerow(['Frame Number', 'X', 'Y'])

                    # Write an extra 15 frames into the CSV, these 15 frames precede the first frame where fish moves.
                    # They reuse the first annotated position; frame numbers that would be <= 0 are dropped.
                    for offset in range(pre_roll_frames, 0, -1):
                        try:
                            frame_number = frame_data[0] - offset
                        except TypeError as e:
                            # First frame value is not numeric: report once and skip the pre-roll
                            print(f"An error occurred in creating {csv_filename}: {e}")
                            break
                        if frame_number > 0:
                            csvwriter.writerow([str(frame_number), str(x_data[0]), str(y_data[0])])

                    csvwriter.writerows(zip(frame_data, x_data, y_data))

                print(f'Saved {csv_path}')

                # Search the video folder directory for a video whose name starts with "<current_fish>_"
                video_name = current_fish + '_'
                video_file = search_files_with_prefix(data_dir, video_name)
                if video_file:
                    print(video_file)
                else:
                    print("Extraction Process Terminated. Reason: No Video Found")
                    return

                # Extract the individual frame .txt files from the video CSV file
                split_csv_to_txt(csv_path, output_dir_annotations, data_folder, current_fish)

                # Extract only the relevant frames from the (first matching) video file
                extract_frames(f'{data_dir}/{video_file[0]}', output_dir_frames, data_folder, csv_path)

            # Move to the next fish data block
            col_start += col_spacing
            fish_number += 1



Make sure Drive is properly refreshed before initiating extraction (restart runtime or run drive.mount())

In [None]:
from google.colab import drive

drive.mount("/content/drive", force_remount=True)

Run the extract_annotations_and_frames function to extract frames and annotations for all data.

In [None]:
# Example usage: process the 'WT Touch Response 30-05-24' data folder,
# telling the function that each fish's data occupies 12 columns in the workbook
extract_annotations_and_frames('WT Touch Response 30-05-24', col_spacing=12)


Count the number of frames and annotations to make sure they are the same

In [None]:
import os
from google.colab import drive

drive.mount("/content/drive", force_remount=True)

def count_files_in_directory(directory_path):
    """
    Return how many regular files sit directly inside a directory.

    Sub-directories are not counted and are not descended into.

    Parameters:
    directory_path (str): The path to the directory.

    Returns:
    int: The number of files in the directory, or 0 (after printing a message)
         if the directory is missing or cannot be read.
    """
    try:
        file_count = 0
        # Walk the directory entries and tally the ones that are files
        for name in os.listdir(directory_path):
            if os.path.isfile(os.path.join(directory_path, name)):
                file_count += 1
        return file_count
    except FileNotFoundError:
        print(f"The directory {directory_path} does not exist.")
        return 0
    except Exception as e:
        print(f"An error occurred: {e}")
        return 0

def list_directories_and_count_subdirs(directory):
    """
    Print, for every sub-directory of ``directory``, the number of files in its
    "Frames" and "Annotations" folders.

    A missing or unreadable ``directory`` is reported with a printed message.
    """
    try:
        with os.scandir(directory) as entries:
            for entry in entries:
                # Only sub-directories are reported
                if not entry.is_dir():
                    continue
                print(f"----------Sub-Directory: {entry.name}----------")
                # Count the files of each of the two expected folders
                frames_path = entry.path + "/Frames"
                annotations_path = entry.path + "/Annotations"
                frames_total = count_files_in_directory(frames_path)
                print(f'The number of files in {entry.name}/Frames is: {frames_total}')
                annotations_total = count_files_in_directory(annotations_path)
                print(f'The number of files in {entry.name}/Annotations is: {annotations_total}')
    except FileNotFoundError:
        print(f"The directory '{directory}' does not exist.")
    except PermissionError:
        print(f"Permission denied to access '{directory}'.")


Script that counts all files created by previous extract function

N.B. This ONLY gives a clean report when every subdirectory of "Zebrafish Frames and Annotations" contains both a "Frames" and an "Annotations" subdirectory. If you have already run copy_subdirectory_contents_to_master to create the master "images" and "labels" folders, those two folders have no "Frames"/"Annotations" subdirectories, so they will be reported as missing (with a count of 0).

In [None]:
 list_directories_and_count_subdirs("drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations")


Extract data from each of the data folders into two master Frames/Annotations folders

In [None]:
import os
import shutil

def _copy_dir_contents(source_dir, destination_dir):
    """Copy every file and sub-folder found directly inside ``source_dir`` into ``destination_dir``."""
    for item in os.listdir(source_dir):
        item_path = os.path.join(source_dir, item)
        if os.path.isfile(item_path):
            shutil.copy(item_path, destination_dir)
        elif os.path.isdir(item_path):
            # Merge into an already existing folder of the same name instead of failing
            shutil.copytree(item_path, os.path.join(destination_dir, item), dirs_exist_ok=True)


def copy_subdirectory_contents_to_master(master_frames_dir, master_annotations_dir, base_dir):
    """
    Gather the contents of every "<subdir>/Frames" and "<subdir>/Annotations" folder
    under ``base_dir`` into two master folders.

    Files that share a name are overwritten by the copy made last.

    :param master_frames_dir: Destination for the contents of all "Frames" folders (created if missing).
    :param master_annotations_dir: Destination for the contents of all "Annotations" folders (created if missing).
    :param base_dir: Directory whose immediate sub-directories are scanned.
    """
    # Ensure the master directories exist
    os.makedirs(master_frames_dir, exist_ok=True)
    os.makedirs(master_annotations_dir, exist_ok=True)

    # Which folder name feeds which master directory
    targets = (
        ('Frames', master_frames_dir),
        ('Annotations', master_annotations_dir),
    )

    # Walk through the immediate children of the base directory
    for subdir in os.listdir(base_dir):
        subdir_path = os.path.join(base_dir, subdir)
        if not os.path.isdir(subdir_path):
            continue

        for folder_name, master_dir in targets:
            source_dir = os.path.join(subdir_path, folder_name)
            # Only real directories are copied; a plain file with that name is ignored
            if os.path.isdir(source_dir):
                _copy_dir_contents(source_dir, master_dir)


Using the above function to extract subfolder data into master "images" and "labels" directories

In [None]:
# Destination folders for the combined dataset; both live inside base_directory
master_frames = 'drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/images'
master_annotations = 'drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/labels'
# Folder whose sub-directories each hold a "Frames" and an "Annotations" folder
base_directory = 'drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations'

# Copy every sub-directory's Frames/Annotations contents into the two master folders
copy_subdirectory_contents_to_master(master_frames, master_annotations, base_directory)

Using the "count_files_in_directory" function to ensure data extraction went smoothly.

N.B. The # of files in "images" and "labels" should be identical

In [None]:
# Report the number of files in the two master folders; the two counts are expected to match
print(f"----------Sub-Directories: images and labels----------")
# Call count_files_in_directory on each subdirectory
subdir_frames = "drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations" + "/images"
subdir_annotations = "drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations" + "/labels"
print(f'The number of files in /images is: {count_files_in_directory(subdir_frames)}')
print(f'The number of files in /labels is: {count_files_in_directory(subdir_annotations)}')

Delete any label (.txt) files that have no matching image (.png) file, if there are any

In [None]:
import os
import subprocess

def find_celibate_files(txt_directory, png_directory):
    """
    Delete every .txt file in ``txt_directory`` that has no same-named .png file in ``png_directory``.

    Each deleted file is reported with a printed message. A missing directory or a
    permission problem is printed instead of raised.

    :param txt_directory: Folder holding the annotation .txt files; unmatched files are removed from here.
    :param png_directory: Folder holding the .png images the .txt files are matched against.
    """
    try:
        # List all .txt files in the first directory
        txt_files = [f for f in os.listdir(txt_directory) if f.endswith('.txt')]

        for txt_file in txt_files:
            # Swap only the extension, so a ".txt" elsewhere in the name is left alone
            png_file = os.path.splitext(txt_file)[0] + '.png'
            png_path = os.path.join(png_directory, png_file)

            if not os.path.exists(png_path):
                print(f"No match was found for the following .txt file and it will be deleted: {txt_file}")
                # Delete from the directory that was actually scanned
                os.remove(os.path.join(txt_directory, txt_file))

    except FileNotFoundError as e:
        print(f"Error: {e}")
    except PermissionError as e:
        print(f"Permission denied: {e}")

# Example usage
find_celibate_files('drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/labels', 'drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/images')


Manually remove a file reported by "find_celibate_files" in the previous block (only needed if it was not deleted automatically)

In [None]:
#Example
!rm 'drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/labels/Tyler Double KO Touch Responses_Double KO 15_frame_298.txt'

Clear Annotations and Frames Directories (For use during testing only  --- RUN WITH EXTREME CAUTION)

In [None]:
import os
import glob


def empty_two_directories(dir1, dir2):
    """
    Delete the files of two locations under "Zebrafish Frames and Annotations" after confirmation.

    The user must type exactly ``Y`` at the prompt; any other answer cancels.

    :param dir1: Directory name or glob pattern, relative to "Zebrafish Frames and Annotations".
                 A directory name removes every file directly inside that directory;
                 a pattern (e.g. "images/*") removes the files it matches.
    :param dir2: Same as ``dir1``, for the second location.

    Only regular files are deleted; directories are always left in place.
    """
    confirmation = input("Please Confirm (Type 'Y'): ")

    if confirmation != "Y":
        print("Cancelled")
        return

    base_dir = 'drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations'
    for target in (dir1, dir2):
        target_path = f'{base_dir}/{target}'
        if os.path.isdir(target_path):
            # A plain directory name: select everything directly inside it
            candidates = glob.glob(os.path.join(glob.escape(target_path), '*'))
        else:
            # Otherwise treat the argument as a glob pattern
            candidates = glob.glob(target_path)
        for path in candidates:
            # os.remove cannot delete directories, so skip anything that is not a file
            if os.path.isfile(path):
                os.remove(path)
    print("Directories cleared")


Please Confirm (Type 'Y'): Y
Directories cleared


Code for double checking that an annotation makes sense for a given .txt/.png pair (this generates an image which is a visual representation of any image-annotation pair)

In [None]:
import os
import random
import cv2  # OpenCV for image processing

def overlay_bounding_box(labels_dir, images_dir, output_dir):
    """
    Draw the bounding box of one randomly chosen annotation onto its image and save the result.

    A random .txt file is picked from ``labels_dir``; its first line is expected to hold five
    whitespace-separated values ``label x_center y_center width height``, where the last four are
    fractions of the image size. The box is drawn in green on the same-named .png from
    ``images_dir`` and the image is written to ``output_dir`` under the same file name.

    :param labels_dir: Folder containing the annotation .txt files.
    :param images_dir: Folder containing the matching .png images.
    :param output_dir: Folder that receives the annotated image (created if missing).
    :return: None. Problems (no labels, missing or unreadable image, malformed annotation)
             are reported with a printed message and nothing is written.
    """
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Get a list of all .txt files in the labels directory
    txt_files = [f for f in os.listdir(labels_dir) if f.endswith('.txt')]

    # If there are no .txt files, return None
    if not txt_files:
        print("No .txt files found in the labels directory.")
        return None

    # Select a random .txt file
    selected_txt_file = random.choice(txt_files)

    # Derive the corresponding .png file name
    base_name = os.path.splitext(selected_txt_file)[0]
    corresponding_png_file = base_name + '.png'
    image_path = os.path.join(images_dir, corresponding_png_file)

    # Check if the corresponding .png file exists in the images directory
    if not os.path.isfile(image_path):
        print(f"Corresponding .png file not found for {selected_txt_file}.")
        return None

    # Read the first line of the selected .txt file
    with open(os.path.join(labels_dir, selected_txt_file), 'r') as file:
        line = file.readline().strip()

    # Parse the line into a list of values
    values = line.split()

    # Ensure the line contains exactly 5 columns
    if len(values) != 5:
        print(f"Unexpected format in {selected_txt_file}. Expected 5 columns, got {len(values)}.")
        return None

    # Extract the required values; the label is parsed only to validate it
    try:
        int(values[0])
        x_center = float(values[1])
        y_center = float(values[2])
        x_len = float(values[3])
        y_height = float(values[4])
    except ValueError:
        print(f"Unexpected format in {selected_txt_file}. Could not parse values: {line}")
        return None

    # Load the corresponding image; cv2.imread hands back None when the file cannot be decoded
    image = cv2.imread(image_path)
    if image is None:
        print(f"Could not read image {image_path}.")
        return None

    # Get image dimensions
    img_height, img_width = image.shape[:2]

    # Convert the normalised centre/size into pixel corner coordinates
    x_min = int((x_center - x_len / 2) * img_width)
    x_max = int((x_center + x_len / 2) * img_width)
    y_min = int((y_center - y_height / 2) * img_height)
    y_max = int((y_center + y_height / 2) * img_height)

    # Overlay the bounding box on the image (green, 2 px thick)
    cv2.rectangle(image, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)

    # Save the new image in the output directory
    output_image_path = os.path.join(output_dir, corresponding_png_file)
    cv2.imwrite(output_image_path, image)

    print(f"Image with bounding box saved to {output_image_path}")


In [None]:
# Master folders holding the annotations and images, plus a folder for the visual checks
labels_dir = 'drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/labels'
images_dir = 'drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/images'
output_dir = 'drive/MyDrive/Colab Notebooks/Zebrafish Frames and Annotations/testing_txt_png_pairs'
# Call overlay_bounding_box 8 times; each call picks its annotation at random,
# so the same pair may be chosen more than once
for i in range (0,8):
  overlay_bounding_box(labels_dir, images_dir, output_dir)