In [24]:
def process_and_merge_tracks(mot_file_path, video_path):
    """Process and merge overlapping tracks based on the provided MOT file and video."""

    def interpolate_tracks(tracking_data):
        """
        Fill the gaps inside every track by linear interpolation.

        For each distinct object_id one row is produced for every frame between
        the track's first and last detection. x, y, width, height, class,
        confidence and visibility of the inserted rows are linearly interpolated
        from the surrounding detections. 'class' is rounded back to an integer
        and the first row of every track is re-labelled with class 3.

        Parameters:
        - tracking_data (pd.DataFrame): detections with integer 'frame_id', plus
          'object_id' and the seven columns listed above.

        Returns:
        - pd.DataFrame: gap-free tracks, concatenated track by track, with a
          fresh 0..n-1 index. An empty input yields an empty frame that keeps
          the input columns.
        """
        columns_to_interpolate = ['x', 'y', 'width', 'height', 'class', 'confidence', 'visibility']

        # Collect the per-track frames and concatenate once at the end; growing a
        # DataFrame with pd.concat inside the loop copies all earlier rows each time.
        per_track_frames = []

        for obj_id in tracking_data['object_id'].unique():
            # Extract rows corresponding to the current object ID
            obj_data = tracking_data[tracking_data['object_id'] == obj_id]

            # Find the first and last frame for this object
            first_frame = obj_data['frame_id'].min()
            last_frame = obj_data['frame_id'].max()

            # One row per frame in [first_frame, last_frame]
            all_frames = pd.DataFrame({'frame_id': range(first_frame, last_frame + 1)})
            all_frames['object_id'] = obj_id

            # Left merge: frames without a detection get NaN in every data column
            merged_data = pd.merge(all_frames, obj_data, on=['frame_id', 'object_id'], how='left')

            # Assign the interpolated values back explicitly. Calling
            # merged_data[col].interpolate(inplace=True) operates on a temporary
            # and leaves merged_data untouched when pandas Copy-on-Write is active.
            merged_data[columns_to_interpolate] = merged_data[columns_to_interpolate].interpolate()

            # Interpolation can produce fractional classes; round to the nearest integer
            merged_data['class'] = merged_data['class'].round().astype(int)

            # Mark the first detection of the track with class 3
            first_detection_index = merged_data[merged_data['frame_id'] == first_frame].index[0]
            merged_data.at[first_detection_index, 'class'] = 3

            per_track_frames.append(merged_data)

        if not per_track_frames:
            # Nothing to interpolate: keep the schema so callers can still select columns
            return tracking_data.iloc[0:0].reset_index(drop=True)

        return pd.concat(per_track_frames).reset_index(drop=True)


    def draw_bounding_boxes_on_video(video_path, mot_data, output_video_path):
        """
        Write a copy of the video with every bounding box in mot_data drawn on it.

        The n-th frame read from the video (counting from 1) is annotated with
        all rows of mot_data whose 'frame_id' equals n. Boxes of class 1 are
        drawn with colour (255, 0, 0), all others with (0, 0, 255); the
        object_id is written just above each box.

        Parameters:
        - video_path (str): path of the input video.
        - mot_data (pd.DataFrame): rows with 'frame_id', 'object_id', 'x', 'y',
          'width', 'height' and 'class'.
        - output_video_path (str): path of the annotated mp4 to create.

        Raises:
        - IOError: if the input video cannot be opened. Raised before any output
          file is created, so a failed run does not leave behind an empty video.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            cap.release()
            raise IOError(f"Could not open video: {video_path}")

        out = None
        try:
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Initialize video writer
            out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

            # Split the table by frame once instead of scanning it for every frame
            boxes_by_frame = {key: rows for key, rows in mot_data.groupby('frame_id')}

            frame_id = 1
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                frame_data = boxes_by_frame.get(frame_id)
                if frame_data is not None:
                    for _, row in frame_data.iterrows():
                        x, y, w, h = int(row['x']), int(row['y']), int(row['width']), int(row['height'])
                        track_id = int(row['object_id'])
                        cls = int(row['class'])

                        # Draw bounding box and ID
                        color = (255, 0, 0) if cls == 1 else (0, 0, 255)
                        cv2.rectangle(frame, (x, y), (x + w, y + h), color, 1)
                        # Text is positioned just above the bounding box
                        cv2.putText(frame, str(track_id), (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

                out.write(frame)
                frame_id += 1
        finally:
            # Always free the capture and finalise the writer, even on error
            cap.release()
            if out is not None:
                out.release()

    def subtract_median_background_with_morph(video_path, output_path, kernel_size=1, iterations=1):
        """
        Write a background-subtracted version of a video.

        All frames are read into memory, the per-pixel median over time is taken
        as the background, and each frame is replaced by the absolute difference
        to that background, converted to grayscale, eroded and dilated, and
        converted back to BGR before being written.

        Parameters:
        - video_path: path to the input video
        - output_path: path to the output video
        - kernel_size: size of the square structuring element used for
          erosion/dilation (the default 1x1 kernel leaves the image unchanged)
        - iterations: number of times erosion/dilation is applied

        Raises:
        - ValueError: if no frame can be read from video_path. Raised before the
          output file is created.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            # Get video properties
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Read all frames; the median needs the whole clip at once
            frames = []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                frames.append(frame)
        finally:
            cap.release()

        if not frames:
            raise ValueError(f"No frames could be read from video: {video_path}")

        frames = np.array(frames)

        # Per-pixel temporal median serves as the background estimate
        median_frame = np.median(frames, axis=0).astype(dtype=np.uint8)

        # Structuring element for erosion and dilation
        kernel = np.ones((kernel_size, kernel_size), np.uint8)

        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
        try:
            # Iterate over the frames that were actually decoded. The frame count
            # reported by the container can differ from that number, so indexing
            # by it can either overrun the array or silently drop frames.
            for frame in frames:
                subtracted_frame = cv2.absdiff(frame, median_frame)

                # Morphology is applied on a single-channel image
                gray_frame = cv2.cvtColor(subtracted_frame, cv2.COLOR_BGR2GRAY)

                # Erosion followed by dilation removes small features
                eroded_frame = cv2.erode(gray_frame, kernel, iterations=iterations)
                dilated_frame = cv2.dilate(eroded_frame, kernel, iterations=iterations)

                # Convert back to BGR for saving
                out.write(cv2.cvtColor(dilated_frame, cv2.COLOR_GRAY2BGR))
        finally:
            out.release()
        
    def check_overlap(rect1, rect2):
        """
        Tell whether two axis-aligned rectangles intersect.

        Each rectangle is an (x, y, width, height) tuple. Rectangles that merely
        touch along an edge or at a corner count as overlapping.
        """
        left_a, top_a, wide_a, tall_a = rect1
        left_b, top_b, wide_b, tall_b = rect2

        # Two boxes are disjoint exactly when one lies strictly beyond the other
        # along at least one axis.
        apart_horizontally = left_a + wide_a < left_b or left_b + wide_b < left_a
        apart_vertically = top_a + tall_a < top_b or top_b + tall_b < top_a
        return not (apart_horizontally or apart_vertically)
    
    def check_for_duplicate_ids(tracking_data):
        """
        Report rows of tracking_data that share both frame_id and object_id.

        One line is printed for every row that belongs to such a group (so a
        pair of duplicates produces two lines). The data is not modified and
        nothing is returned.
        """
        # keep=False flags every member of a duplicated group, not just the repeats
        duplicates = tracking_data[tracking_data.duplicated(subset=['frame_id', 'object_id'], keep=False)]
        for _, row in duplicates.iterrows():
            print(f"Duplicate ID detected: Track ID {row['object_id']} in Frame {row['frame_id']}.")

    def track_object(trackID, start_frame, end_frame, tracking_data, video_path):
        """
        Follow a track's last bounding box through later frames until it meets the start of another track.

        Starting from the box of `trackID` at `start_frame`, the box is moved frame by
        frame to the best-scoring shift within a +/-10 pixel search window. After each
        move the box is tested against the rows of other tracks in that frame whose
        class is 3; the search stops at the first overlap.

        Parameters:
        - trackID: object_id of the track to extend.
        - start_frame: frame whose detection of trackID supplies the initial box.
        - end_frame: last frame (inclusive) to search.
        - tracking_data (pd.DataFrame): all detections; read only here.
        - video_path (str): video to read pixels from.

        Returns:
        - (overlap_detected_5, new_track_id, tracked_data): whether an overlap was found,
          the object_id of the overlapping track (None if none was found), and a
          DataFrame with one row per searched frame. Its 'object_id' column is set to
          new_track_id, i.e. to None when no overlap was found.

        Raises:
        - IndexError: from `.iloc[0]` when tracking_data has no row for trackID at start_frame.

        NOTE(review): overlap_detected_5 is first assigned inside the frame loop, so the
        final `return` raises UnboundLocalError if start_frame >= end_frame.
        NOTE(review): the results of cap.read() are never checked; presumably a failed
        read yields frame=None and cvtColor raises - confirm.
        """
        # NOTE(review): frames_without_merge is incremented below but never read.
        frames_without_merge = 0
        tracked_centroids = []
        new_track_id = None
        cap = cv2.VideoCapture(video_path)

        # Row of this track at start_frame; its x/y are updated in place as the box moves
        initial_frame_data = tracking_data[(tracking_data['object_id'] == trackID) & (tracking_data['frame_id'] == start_frame)].iloc[0]
        # NOTE(review): CAP_PROP_POS_FRAMES is a 0-based index, while
        # draw_bounding_boxes_on_video numbers the first frame 1 - possible
        # off-by-one between frame_id and the frame read here; confirm.
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        _, frame = cap.read()
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # Patch under the initial box; only its shape is used below
        initial_BB = gray_frame[int(initial_frame_data['y']):int(initial_frame_data['y'] + initial_frame_data['height']), 
                                int(initial_frame_data['x']):int(initial_frame_data['x'] + initial_frame_data['width'])]

        # Gaussian window centred on the box, sigma = a quarter of the box size per axis
        shape = initial_BB.shape
        sigma_x = shape[1] * .25
        sigma_y = shape[0] * .25
        y, x = np.ogrid[-shape[0]//2:shape[0]//2, -shape[1]//2:shape[1]//2]
        weight_matrix = np.exp(-(x**2 / (2.0 * sigma_x**2) + y**2 / (2.0 * sigma_y**2)))

        # Candidate row/column shifts: -10..10 pixels on each axis
        vector_r = range(-10,11)
        vector_c = range(-10,11)
        r = len(vector_r)
        c = len(vector_c)
        # One score per candidate shift. Allocated once and NOT reset between frames,
        # so shifts skipped by the bounds check keep the (smoothed) score of an earlier frame.
        cosine_similarity_matrix = np.zeros((r, c))
        
        frame_height, frame_width = gray_frame.shape
        
        consecutive_overlap_count = 0
        for frame_number in range(start_frame + 1, end_frame + 1):
            frames_without_merge += 1
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
            _, frame = cap.read()
            gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Score every candidate shift of the current box in this frame
            for i in range(r):
                for j in range(c):
                    row_shift = vector_r[i]
                    col_shift = vector_c[j]
                    
                    # Shifted box, clipped to the frame
                    y_start = max(0, int(initial_frame_data['y'])+row_shift)
                    y_end = min(frame_height, int(initial_frame_data['y'] + initial_frame_data['height'])+row_shift)
                    x_start = max(0, int(initial_frame_data['x'])+col_shift)
                    x_end = min(frame_width, int(initial_frame_data['x'] + initial_frame_data['width'])+col_shift)
                    
                    if y_start < y_end and x_start < x_end:
                        next_frame_BB_shifted = gray_frame[y_start:y_end, x_start:x_end]
                        
                        # Crop the window to the (possibly clipped) patch; the crop always
                        # keeps the top-left part of the window
                        current_weight_matrix = weight_matrix[:next_frame_BB_shifted.shape[0], :next_frame_BB_shifted.shape[1]]
                        
                        # Despite the name this is not a cosine similarity: it is minus twice the
                        # sum of squared, window-weighted pixel values, so a lower score means
                        # more weighted intensity under the shifted box.
                        cosine_similarity = - 2 * np.sum((next_frame_BB_shifted * current_weight_matrix)**2) 
                        cosine_similarity_matrix[i,j] = cosine_similarity

            # Smooth the score surface before picking the best shift
            cosine_similarity_matrix = gaussian_filter(cosine_similarity_matrix, sigma=1)

            # Best shift = first position holding the minimum score
            min_cosine_similarity = np.min(cosine_similarity_matrix)
            min_cosine_similarity_index = np.where(cosine_similarity_matrix == min_cosine_similarity)
            row_shift = vector_r[min_cosine_similarity_index[0][0]]
            col_shift = vector_c[min_cosine_similarity_index[1][0]]

            # Move the box by the chosen shift
            initial_frame_data['x'] += col_shift
            initial_frame_data['y'] += row_shift

            # Record the moved box; the trailing constants fill the
            # 'confidence', 'class' and 'visibility' columns (1, 2, 1)
            tracked_centroids.append([
                frame_number,
                trackID,
                initial_frame_data['x'],
                initial_frame_data['y'],
                initial_frame_data['width'],
                initial_frame_data['height'],
                1, 2, 1
            ])

            # NOTE(review): the frame drawn on here is never written or displayed in this function
            cv2.rectangle(frame, (int(initial_frame_data['x']), int(initial_frame_data['y'])), 
                          (int(initial_frame_data['x'] + initial_frame_data['width']), int(initial_frame_data['y'] + initial_frame_data['height'])), 
                          (0, 255, 0), 2)

            # Other tracks present in this frame; only their class-3 rows are candidates
            active_tracks = tracking_data[(tracking_data['frame_id'] == frame_number) & (tracking_data['object_id'] != trackID)]
            overlap_detected_5 = False
            overlap_detected = False
            for _, track in active_tracks.iterrows():
                x, y, w, h = track['x'], track['y'], track['width'], track['height']
                if (track['class'] == 3) and check_overlap(
                    (initial_frame_data['x'], initial_frame_data['y'], initial_frame_data['width'], initial_frame_data['height']),
                    (x, y, w, h)
                ):
                    overlap_detected = True
                    overlapping_trackID = track['object_id']
                    break

            if overlap_detected:
                consecutive_overlap_count += 1
            else:
                consecutive_overlap_count = 0

            # With a threshold of 1 the search ends on the first overlapping frame
            if consecutive_overlap_count >= 1:
                new_track_id = overlapping_trackID
                overlap_detected_5 = True
                #print(f"Overlap detected for trackID: {trackID} with {overlapping_trackID} at frame: {frame_number}.")
                break

        cap.release()

        # Rows for the bridged frames, relabelled with the id of the track that was reached
        tracked_data = pd.DataFrame(tracked_centroids, columns=['frame_id', 'object_id', 'x', 'y', 'width', 'height','confidence', 'class', 'visibility'])
        tracked_data['object_id'] = new_track_id
        return overlap_detected_5, new_track_id, tracked_data

    def handle_overlap(tracking_data, track_id_1, track_id_2, tracked_data):
        """
        Merge track_id_1 into track_id_2 and splice in the bridging rows.

        Rows of track_id_1 are relabelled to track_id_2 directly in the passed
        tracking_data, then tracked_data is appended. Where the result holds more
        than one row for the same (frame_id, object_id), the rows of class 2 in
        that group are removed.

        Returns:
        - pd.DataFrame: the combined table (a new object; its index may have gaps).
        """
        # Relabel the first track in place
        belongs_to_first = tracking_data['object_id'] == track_id_1
        tracking_data.loc[belongs_to_first, 'object_id'] = track_id_2

        # Append the intermediate tracked locations
        combined = pd.concat([tracking_data, tracked_data], ignore_index=True)

        # Within clashing (frame, id) groups, discard the class-2 rows
        clashes = combined[combined.duplicated(subset=['frame_id', 'object_id'], keep=False)]
        surplus_labels = clashes.index[clashes['class'] == 2]
        return combined.drop(surplus_labels)
    
    from scipy.spatial import distance

    def correct_duplicate_ids(tracking_data):
        """
        Give a different object_id to surplus rows that share an id within one frame.

        Frames are visited in order of first appearance. For every id that occurs
        more than once in a frame, the first row keeps the id and each further
        row is relabelled: with the replacement that id received in the
        immediately preceding visited frame if there was one, otherwise with the
        next unused id (starting at max(object_id) + 1).

        The passed DataFrame is modified in place and also returned.
        """
        fresh_id = tracking_data['object_id'].max() + 1
        carried_over = {}  # replacements handed out in the previously visited frame

        for frame in tracking_data['frame_id'].unique():
            in_frame = tracking_data[tracking_data['frame_id'] == frame]
            repeated_mask = in_frame.duplicated(subset='object_id', keep=False)
            assigned_here = {}  # replacements handed out in this frame

            for clashing_id in in_frame[repeated_mask]['object_id'].unique():
                # Every row after the first one carrying this id gets relabelled
                surplus_labels = in_frame[in_frame['object_id'] == clashing_id].index[1:]
                for row_label in surplus_labels:
                    if clashing_id in carried_over:
                        replacement = carried_over[clashing_id]
                    else:
                        replacement = fresh_id
                        fresh_id += 1

                    tracking_data.at[row_label, 'object_id'] = replacement
                    assigned_here[clashing_id] = replacement

            carried_over = assigned_here

        return tracking_data
            
    def remove_short_tracks(tracked_data, min_detections=60):
        """
        Drop every track that has fewer than min_detections rows.

        Parameters:
        - tracked_data (pd.DataFrame): tracking table with an 'object_id' column.
        - min_detections (int): smallest number of rows a track needs to be kept.

        Returns:
        - pd.DataFrame: a new dataframe without the short tracks; the input is
          left untouched and surviving rows keep their index labels.
        """
        rows_per_track = tracked_data['object_id'].value_counts()
        short_ids = rows_per_track[rows_per_track < min_detections].index

        # Remove by index label
        doomed_labels = tracked_data[tracked_data['object_id'].isin(short_ids)].index.tolist()
        return tracked_data.drop(doomed_labels)



    def remove_static_tracks(tracked_data, window_size=20, move_threshold=10):
        """
        Drop tracks whose smoothed centre never leaves its starting position.

        For each track the box centre is averaged over the first window_size rows
        to obtain a reference point. A rolling mean of the centre (same window) is
        then compared against that reference; the track is kept only if the
        rolling mean is at some point more than move_threshold pixels away. Tracks
        with fewer than window_size usable rows never produce a rolling mean and
        are therefore removed as well. Each processed track id is printed.

        Parameters:
        - tracked_data (pd.DataFrame): tracking table with 'object_id', 'x', 'y',
          'width' and 'height'.
        - window_size (int): number of rows in the reference window and in the
          rolling window.
        - move_threshold (int): distance in pixels the rolling mean must exceed.

        Returns:
        - pd.DataFrame: a new dataframe with the static tracks removed.
        """
        static_labels = []

        for track_id in tracked_data['object_id'].unique():
            print(track_id)
            own_rows = tracked_data[tracked_data['object_id'] == track_id]
            track = own_rows.copy()

            # Box centres
            track['center_x'] = track['x'] + track['width'] / 2
            track['center_y'] = track['y'] + track['height'] / 2

            # Reference point: mean centre over the first window_size rows
            ref_x = track['center_x'].iloc[:window_size].mean()
            ref_y = track['center_y'].iloc[:window_size].mean()

            # Rolling mean of the centre; the first window_size - 1 rows are NaN
            track['ma_center_x'] = track['center_x'].rolling(window=window_size).mean()
            track['ma_center_y'] = track['center_y'].rolling(window=window_size).mean()

            # Discards every row containing a NaN in any column
            track.dropna(inplace=True)

            offset = np.sqrt((track['ma_center_x'] - ref_x) ** 2 +
                             (track['ma_center_y'] - ref_y) ** 2)
            has_moved = bool((offset > move_threshold).any())

            if not has_moved:
                # Take the labels from the original table, not from the trimmed copy
                static_labels.extend(own_rows.index.tolist())

        # errors='ignore' tolerates labels that are not present
        return tracked_data.drop(static_labels, errors='ignore')

    def process_tracks(tracking_data, total_frames, video_path):
        """
        Try to extend every track that ends early and merge it into the track it runs into.

        For each object_id present at the start, a track whose last frame lies
        before total_frames - 1 is followed forward with track_object; when that
        reports an overlap the two tracks are merged with handle_overlap.
        Diagnostic messages are printed whenever 'object_id' contains NaN.

        Returns:
        - pd.DataFrame: the updated table sorted by frame_id, then object_id.
        """

        def warn_if_ids_missing(table):
            """Print a message when any object_id is NaN."""
            if table['object_id'].isna().any():
                print("NaN values detected in process_tracks.")

        final_frame = total_frames - 1

        # Snapshot of the ids before any merging takes place
        ids_at_start = tracking_data['object_id'].unique()
        warn_if_ids_missing(tracking_data)

        for track_id in ids_at_start:
            check_for_duplicate_ids(tracking_data)

            rows_of_track = tracking_data[tracking_data['object_id'] == track_id]
            last_frame_of_track = rows_of_track['frame_id'].max()

            # Skip tracks that already reach the end. An id that was merged away
            # earlier has no rows left, its max is NaN, and it is skipped too.
            if not (last_frame_of_track < final_frame):
                continue

            merged, target_id, bridge_rows = track_object(
                track_id, last_frame_of_track, final_frame, tracking_data, video_path
            )
            if merged:
                tracking_data = handle_overlap(tracking_data, track_id, target_id, bridge_rows)
                if tracking_data['object_id'].isna().any():
                    print(f"NaN values detected after handling overlap for track_id: {track_id}.")

            warn_if_ids_missing(tracking_data)

        warn_if_ids_missing(tracking_data)

        return tracking_data.sort_values(by=['frame_id', 'object_id'])

    def load_video_properties(video_path):
        """Return the frame count the video reports (the only property read here)."""
        capture = cv2.VideoCapture(video_path)
        reported_count = capture.get(cv2.CAP_PROP_FRAME_COUNT)
        capture.release()
        return int(reported_count)

    # ---- Pipeline body of process_and_merge_tracks ----
    # Load the MOT file (headerless, comma-separated by read_csv's default) and
    # fill the gaps inside each track.
    column_names = ['frame_id', 'object_id', 'x', 'y', 'width', 'height', 'confidence', 'class', 'visibility']
    tracking_data = pd.read_csv(mot_file_path, header=None, names=column_names)
    
    tracking_data = interpolate_tracks(tracking_data)

    # Diagnostic check after loading tracking data
    if tracking_data['object_id'].isna().any():
        print("NaN values detected after loading the tracking data.")
    
    total_frames = load_video_properties(video_path)
    
    # Process and merge overlapping tracks
    # 1. Apply subtract_median_background_with_morph to the input video
    # Keep the path of the untouched video; video_path is repointed below.
    original_video_path = video_path
    median_output_path = video_path.rsplit('.', 1)[0] + "_median.mp4"
    
    # The background-subtracted video is cached next to the input and reused if present.
    # Either way, track merging below runs on the background-subtracted video.
    if not os.path.exists(median_output_path):
        subtract_median_background_with_morph(video_path, median_output_path)
        video_path = median_output_path
    else:
        print(f"{median_output_path} already exists. Skipping background subtraction.")
        video_path = median_output_path
    updated_tracking_data = process_tracks(tracking_data, total_frames, video_path)

    # Correct duplicate IDs
    updated_tracking_data = correct_duplicate_ids(updated_tracking_data)
    
    # Remove short tracks
    updated_tracking_data = remove_short_tracks(updated_tracking_data, min_detections=60)
    
    # Remove static tracks
    updated_tracking_data = remove_static_tracks(updated_tracking_data, move_threshold=20)
    
    # Diagnostic check after processing tracks
    if updated_tracking_data['object_id'].isna().any():
        print("NaN values detected after processing the tracks.")

    # 2. Draw bounding boxes on the output video
    # Boxes are drawn on the original footage, not on the background-subtracted copy.
    output_video_path = original_video_path.rsplit('.', 1)[0] + "_output.mp4"
    draw_bounding_boxes_on_video(original_video_path, updated_tracking_data, output_video_path)
    return updated_tracking_data


# Test the refactored function
from scipy.spatial.distance import cosine
import cv2
import numpy as np
import pandas as pd
from scipy.ndimage import gaussian_filter
import matplotlib.pyplot as plt
column_names = ['frame_id', 'object_id', 'x', 'y', 'width', 'height', 'confidence', 'class', 'visibility']
tracking_data_path= "/Users/benmartin/dev/results/240hz/gt 2/gt.txt"
video_path = "/Users/benmartin/dev/results/240hz/vids/54.mp4"
#refactored_result = process_and_merge_tracks(tracking_data_path, video_path)
#refactored_result.head()



In [30]:
def interpolate_tracks(tracking_data):
    """
    Fill the gaps inside every track by linear interpolation.

    For each distinct object_id one row is produced for every frame between the
    track's first and last detection. x, y, width, height, class, confidence and
    visibility of the inserted rows are linearly interpolated from the
    surrounding detections. 'class' is rounded back to an integer and the first
    row of every track is re-labelled with class 2.

    Parameters:
    - tracking_data (pd.DataFrame): detections with integer 'frame_id', plus
      'object_id' and the seven columns listed above.

    Returns:
    - pd.DataFrame: gap-free tracks, concatenated track by track, with a fresh
      0..n-1 index. An empty input yields an empty frame that keeps the input
      columns.
    """
    columns_to_interpolate = ['x', 'y', 'width', 'height', 'class', 'confidence', 'visibility']

    # Collect the per-track frames and concatenate once at the end; growing a
    # DataFrame with pd.concat inside the loop copies all earlier rows each time.
    per_track_frames = []

    for obj_id in tracking_data['object_id'].unique():
        # Extract rows corresponding to the current object ID
        obj_data = tracking_data[tracking_data['object_id'] == obj_id]

        # Find the first and last frame for this object
        first_frame = obj_data['frame_id'].min()
        last_frame = obj_data['frame_id'].max()

        # One row per frame in [first_frame, last_frame]
        all_frames = pd.DataFrame({'frame_id': range(first_frame, last_frame + 1)})
        all_frames['object_id'] = obj_id

        # Left merge: frames without a detection get NaN in every data column
        merged_data = pd.merge(all_frames, obj_data, on=['frame_id', 'object_id'], how='left')

        # Assign the interpolated values back explicitly. Calling
        # merged_data[col].interpolate(inplace=True) operates on a temporary and
        # leaves merged_data untouched when pandas Copy-on-Write is active.
        merged_data[columns_to_interpolate] = merged_data[columns_to_interpolate].interpolate()

        # Interpolation can produce fractional classes; round to the nearest integer
        merged_data['class'] = merged_data['class'].round().astype(int)

        # Set the 'class' of the first detection for this track to 2
        first_detection_index = merged_data[merged_data['frame_id'] == first_frame].index[0]
        merged_data.at[first_detection_index, 'class'] = 2

        per_track_frames.append(merged_data)

    if not per_track_frames:
        # Nothing to interpolate: keep the schema so callers can still select columns
        return tracking_data.iloc[0:0].reset_index(drop=True)

    return pd.concat(per_track_frames).reset_index(drop=True)

# Load tracking data and video properties
column_names = ['frame_id', 'object_id', 'x', 'y', 'width', 'height', 'confidence', 'class', 'visibility']
mot_file_path="/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/331/right/BotSort.txt"
tracking_data = pd.read_csv(mot_file_path, header=None, names=column_names)

tracking_data = interpolate_tracks(tracking_data)


In [186]:

def draw_bounding_boxes_on_video(video_path, mot_data, output_video_path):
    """
    Write a copy of the video with every bounding box in mot_data drawn on it.

    The n-th frame read from the video (counting from 1) is annotated with all
    rows of mot_data whose 'frame_id' equals n. Boxes of class 1 are drawn with
    colour (255, 0, 0), all others with (0, 0, 255); the object_id is written
    just above each box.

    Parameters:
    - video_path (str): path of the input video.
    - mot_data (pd.DataFrame): rows with 'frame_id', 'object_id', 'x', 'y',
      'width', 'height' and 'class'.
    - output_video_path (str): path of the annotated mp4 to create.

    Raises:
    - IOError: if the input video cannot be opened. Raised before any output
      file is created, so a failed run does not leave behind an empty video.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open video: {video_path}")

    out = None
    try:
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Initialize video writer
        out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

        # Split the table by frame once instead of scanning it for every frame
        boxes_by_frame = {key: rows for key, rows in mot_data.groupby('frame_id')}

        frame_id = 1
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_data = boxes_by_frame.get(frame_id)
            if frame_data is not None:
                for _, row in frame_data.iterrows():
                    x, y, w, h = int(row['x']), int(row['y']), int(row['width']), int(row['height'])
                    track_id = int(row['object_id'])
                    cls = int(row['class'])

                    # Draw bounding box and ID
                    color = (255, 0, 0) if cls == 1 else (0, 0, 255)
                    cv2.rectangle(frame, (x, y), (x + w, y + h), color, 1)
                    # Text is positioned just above the bounding box
                    cv2.putText(frame, str(track_id), (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            out.write(frame)
            frame_id += 1
    finally:
        # Always free the capture and finalise the writer, even on error
        cap.release()
        if out is not None:
            out.release()
video_path = "/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/331/right/331_right.mp4"
column_names = ['frame_id', 'object_id', 'x', 'y', 'width', 'height', 'confidence', 'class', 'visibility']
mot_file_path="/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/331/right/BotSort.txt"
tracking_data = pd.read_csv(mot_file_path, header=None, names=column_names)

draw_bounding_boxes_on_video(video_path, tracking_data, "/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/331/right/331_right_org.mp4")

In [25]:
import os
def process_all_videos_in_tracking_dir(root_path, specific_video=None):
    """
    Run process_and_merge_tracks on every <root>/<name>/<side>/<name>_<side>.mp4.

    For each sub-directory of root_path and each side ('left', 'right') the
    video '<name>_<side>.mp4' is processed together with 'BotSort.txt' from the
    same folder, and the resulting table is saved there as 'updated_gt.txt'
    (space-separated, no header). A video is skipped when its
    '<name>_<side>_output.mp4' already exists or when the video file is missing.
    Errors raised while processing one video are printed and do not stop the
    batch.

    Parameters:
    - root_path (str): directory containing one sub-directory per recording.
    - specific_video (str | None): if given, only the entry whose
      "<name>,<side>" label equals this string is processed.
    """
    for sub_folder_name in os.listdir(root_path):
        sub_folder_path = os.path.join(root_path, sub_folder_name)

        # Plain files in the root (e.g. .DS_Store) are not recording folders
        if not os.path.isdir(sub_folder_path):
            continue

        for side in ['left', 'right']:
            if specific_video and specific_video != f"{sub_folder_name},{side}":
                continue
            print(f"Processing {sub_folder_path},{side}")

            # Paths for the video, the tracking data and the annotated output video
            side_dir = os.path.join(sub_folder_path, side)
            video_path = os.path.join(side_dir, f'{sub_folder_name}_{side}.mp4')
            mot_file_path = os.path.join(side_dir, 'BotSort.txt')
            output_video_path = os.path.join(side_dir, f'{sub_folder_name}_{side}_output.mp4')

            # An existing output video marks this recording as already processed
            if os.path.exists(output_video_path):
                print(f"Output video {output_video_path} already exists. Skipping this video.")
                continue
            print(video_path)

            if not os.path.exists(video_path):
                continue

            # Batch boundary: report the failure and carry on with the next video
            try:
                updated_tracking_data = process_and_merge_tracks(mot_file_path, video_path)

                # Save the updated MOT tracking data in the same directory
                updated_mot_path = os.path.join(side_dir, 'updated_gt.txt')
                updated_tracking_data.to_csv(updated_mot_path, index=False, header=False, sep=' ')
            except Exception as e:
                print(f"An error occurred while processing video: {video_path}. Error: {e}")


# Set the path to the 'tracking' directory
tracking_dir_path = '/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking'
process_all_videos_in_tracking_dir(tracking_dir_path)
#process_all_videos_in_tracking_dir(tracking_dir_path, specific_video="134,left")



Processing /Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/61,left
/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/61/left/61_left.mp4
/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/61/left/61_left_median.mp4 already exists. Skipping background subtraction.
2.0
12.0
16.0
19.0
8.0
Processing /Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/61,right
/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/61/right/61_right.mp4
/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/61/right/61_right_median.mp4 already exists. Skipping background subtraction.
1.0
2.0
4.0
9.0
Processing /Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@g

In [99]:
video_path="/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/433/left/433_left.mp4"

os.path.exists(video_path)

True

In [162]:
import shutil
from pathlib import Path

def save_tracks_to_mot_format_from_txt(input_txt_path: Path, save_path: Path) -> None:
    """
    Package a comma-separated MOT track file as a zipped `gt` folder.

    The archive `<save_path>.zip` contains `gt/labels.txt` (the single label
    "fish") and `gt/gt.txt` (the tracks). The temporary `gt` folder created
    under `save_path` is removed once the archive has been written.

    Parameters:
    - input_txt_path (Path or str): comma-separated track file, one detection per line.
    - save_path (Path or str): directory used as the archive root; the zip is
      written next to it as `<save_path>.zip`.

    Raises:
    - FileNotFoundError: if `input_txt_path` does not exist.
    - ValueError: if one of the first seven fields of a line is not an integer literal.
    """
    # Ensure the input and save paths are pathlib Path objects
    input_txt_path = Path(input_txt_path)
    save_path = Path(save_path)

    # Read the tracks from the input txt file, ignoring blank lines
    # (a trailing empty line would otherwise crash the int() conversion below).
    with open(input_txt_path, "r") as file:
        mot_tracks = [line.strip().split(",") for line in file if line.strip()]

    # 1. Create a folder named `gt`.
    track_folder = save_path / "gt"
    track_folder.mkdir(parents=True, exist_ok=True)

    # 2. Inside the `gt` folder, create a file named `labels.txt` and write "fish" into it.
    with open(track_folder / "labels.txt", "w") as wf:
        wf.write("fish")

    # 3. Save the tracks as a text file named `gt.txt` inside the `gt` folder.
    with open(track_folder / "gt.txt", "w") as file:
        for track in mot_tracks:
            # Convert the first seven items of each track to integers and pass
            # the remaining items through untouched. Slicing at 7 on both sides
            # keeps every column exactly once.
            leading_ints = [int(item) for item in track[:7]]
            combined_track = leading_ints + list(track[7:])
            file.write(",".join(map(str, combined_track)) + "\n")

    # 4. Zip the `gt` folder (archive root is `save_path`, so entries start with `gt/`).
    shutil.make_archive(str(save_path), 'zip', root_dir=str(save_path), base_dir='gt')

    # Remove the `gt` folder after zipping; only the archive is kept.
    shutil.rmtree(track_folder)

# Example usage:
# NOTE: both paths below are placeholders. Until `input_txt_path` points at a real
# track file, the call raises FileNotFoundError when it tries to open the input.
input_txt_path = Path("/path/to/your/input.txt")
save_path = Path("/path/to/your/save/location")
save_tracks_to_mot_format_from_txt(input_txt_path, save_path)


FileNotFoundError: [Errno 2] No such file or directory: '/path/to/your/input.txt'

In [83]:
from collections import defaultdict
import datetime
import math
def mot_to_cvat_xml_adaptive_v2(mot_file_path, video_name, width, height, movement_threshold=20, keyframe_frequency=10):
    """
    Convert a MOT format file to CVAT XML for videos, choosing keyframes adaptively.

    Every detection becomes a <box>. A box is flagged as a keyframe when it is the
    first or last detection of its track, or when its (0-based) frame number is a
    multiple of `keyframe_frequency` and its top-left corner has moved more than
    `movement_threshold` away from the previous keyframe of the same track.

    Parameters:
    - mot_file_path (str or path-like): path to the comma-separated MOT file. Each
      line starts with frame, track id, left, top, width, height; any further
      columns are ignored. Blank lines are skipped.
    - video_name (str): name of the video.
    - width (int): width of the video frames.
    - height (int): height of the video frames.
    - movement_threshold (float): threshold distance to decide if an object has moved significantly.
    - keyframe_frequency (int): frequency at which frames should be checked for significant movement.

    Returns:
    - str: path to the converted CVAT XML file (input path with its extension
      replaced by "_cvat.xml").

    Raises:
    - ValueError: if a non-blank line has fewer than six fields or a non-numeric field.
    """
    # Function-scope imports so the cell does not depend on another cell
    # having imported these names first.
    import os
    import xml.etree.ElementTree as ET

    mot_file_path = os.fspath(mot_file_path)

    # Read the MOT file and group detections by track id
    detections = defaultdict(list)
    with open(mot_file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue  # tolerate blank / trailing empty lines
            # Only the first six columns are needed; MOT files carry 9 or 10.
            frame_id, track_id, left, top, w, h = map(float, line.split(',')[:6])
            detections[int(track_id)].append({
                'frame_id': int(frame_id) - 1,  # MOT frames are 1-based, CVAT frames 0-based
                'left': left,
                'top': top,
                'width': w,
                'height': h
            })

    # Generate the CVAT XML for video
    timestamp = str(datetime.datetime.now())
    root = ET.Element("annotations")
    meta = ET.SubElement(root, "meta")
    ET.SubElement(meta, "task", {
        "id": "unknown",
        "name": video_name,
        "size": str(sum(len(track_detections) for track_detections in detections.values())),
        "mode": "interpolation",
        "overlap": "0",
        "bugtracker": "",
        "created": timestamp,
        "updated": timestamp,
        "start_frame": "0",
        "stop_frame": "unknown",
        "frame_filter": "",
        "z_order": "0"
    })
    original_size = ET.SubElement(meta, "original_size")
    ET.SubElement(original_size, "height").text = str(height)
    ET.SubElement(original_size, "width").text = str(width)

    # For each track in the MOT file
    for track_id, track_detections in detections.items():
        # Sort detections by frame_id
        track_detections.sort(key=lambda det: det['frame_id'])

        # The first and last detection are always keyframes
        track_detections[0]['keyframe'] = True
        track_detections[-1]['keyframe'] = True

        # Walk the remaining detections and promote those that moved far enough
        # since the most recent keyframe.
        anchor = track_detections[0]
        for detection in track_detections[1:]:
            if detection['frame_id'] % keyframe_frequency != 0:
                continue
            distance_moved = math.hypot(
                detection['left'] - anchor['left'],
                detection['top'] - anchor['top'],
            )
            if distance_moved > movement_threshold:
                detection['keyframe'] = True
                anchor = detection

        # Add track data to XML
        track = ET.SubElement(root, "track", {
            "id": str(track_id),
            "label": "fish",
            "source": "manual"
        })

        for detection in track_detections:
            box_attr = {
                "frame": str(detection['frame_id']),
                "outside": "0",
                "occluded": "0",
                "keyframe": "1" if detection.get('keyframe', False) else "0",
                "xtl": str(detection['left']),
                "ytl": str(detection['top']),
                "xbr": str(detection['left'] + detection['width']),
                "ybr": str(detection['top'] + detection['height'])
            }
            ET.SubElement(track, "box", box_attr)

    # Save the XML next to the input. splitext only touches the final extension,
    # so ".txt" inside a directory name is left alone and an input without a
    # ".txt" suffix is never overwritten.
    output_file_path = os.path.splitext(mot_file_path)[0] + "_cvat.xml"
    tree = ET.ElementTree(root)
    tree.write(output_file_path)

    return output_file_path

# Bare reference to the function object; it has no effect unless it is the
# last expression of a notebook cell.
mot_to_cvat_xml_adaptive_v2

# NOTE(review): `mot_to_cvat_xml_adaptive` is not defined in this cell. This bare
# reference raises NameError unless an earlier cell defined it; confirm or remove.
mot_to_cvat_xml_adaptive
# Convert one ground-truth MOT file to CVAT XML.
# NOTE(review): height is 1280 here but 1080 in the mot_to_cvat_xml example; confirm the real frame size.
gtpath="/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/8/gt 2/gt.txt"
output_path = mot_to_cvat_xml_adaptive_v2(mot_file_path=gtpath, video_name="video_name", width=1920, height=1280)



In [161]:
from collections import defaultdict
import csv
import xml.etree.ElementTree as ET

def mot_to_cvat_xml(mot_file_path, video_name, width, height, keyframe_frequency=10):
    """
    Convert a MOT format file to CVAT XML for videos.

    Every detection becomes a <box>; a box is flagged as a keyframe when its
    (0-based) frame number is a multiple of `keyframe_frequency`, or when it is
    the first or last detection of its track.

    Args:
    - mot_file_path (str or path-like): path to the input MOT format file
      (comma-separated; the first six columns are frame, track id, x, y, w, h,
      further columns are ignored, blank rows are skipped).
    - video_name (str): name of the video.
    - width (int): width of the frames in the video.
    - height (int): height of the frames in the video.
    - keyframe_frequency (int, optional): frequency for keyframes. Default is 10.

    Returns:
    - str: path to the converted CVAT XML file (input path with its extension
      replaced by "_converted.xml").

    Raises:
    - ValueError: if a non-blank row has fewer than six fields or a non-numeric field.
    """
    # Function-scope import so the cell does not rely on `os` being imported elsewhere.
    import os

    mot_file_path = os.fspath(mot_file_path)

    # Read the MOT file and group boxes (frame, xtl, ytl, xbr, ybr) by track id
    detections = defaultdict(list)
    with open(mot_file_path, 'r', newline='') as file:
        for row in csv.reader(file, delimiter=','):
            if not any(field.strip() for field in row):
                continue  # blank or whitespace-only line
            frame, track_id, x, y, w, h = row[:6]
            detections[track_id].append(
                (int(frame) - 1, float(x), float(y), float(x) + float(w), float(y) + float(h))
            )

    # Highest 0-based frame index that carries an annotation (0 for an empty file).
    last_frame = max(
        (box[0] for boxes in detections.values() for box in boxes),
        default=0,
    )

    # Create XML
    root = ET.Element("annotations")
    ET.SubElement(root, "version").text = "1.1"

    meta = ET.SubElement(root, "meta")
    task = ET.SubElement(meta, "task")
    ET.SubElement(task, "id").text = "unknown"
    ET.SubElement(task, "name").text = video_name
    # NOTE(review): this is the last annotated frame index, not a frame count — confirm which one consumers expect.
    ET.SubElement(task, "size").text = str(last_frame)
    ET.SubElement(task, "mode").text = "interpolation"
    ET.SubElement(task, "overlap").text = "0"
    ET.SubElement(task, "bugtracker").text = ""
    ET.SubElement(task, "flipped").text = "False"
    ET.SubElement(task, "labels").append(ET.Element("label", name="fish"))
    ET.SubElement(task, "segments").append(
        ET.Element("segment", id="0", start="0", stop=str(last_frame))
    )
    # One <original_size> holding <width>/<height> as element text. Passing
    # text=... to ET.Element would create an XML attribute called "text" instead.
    original_size = ET.SubElement(task, "original_size")
    ET.SubElement(original_size, "width").text = str(width)
    ET.SubElement(original_size, "height").text = str(height)

    for track_id, boxes in detections.items():
        # Sort by frame so "first" and "last" really are the track's endpoints
        # even if the input file is not ordered by frame.
        boxes.sort(key=lambda box: box[0])
        first_frame, final_frame = boxes[0][0], boxes[-1][0]

        track = ET.SubElement(root, "track", id=str(track_id), label="fish")
        for frame, xtl, ytl, xbr, ybr in boxes:
            is_keyframe = (
                frame % keyframe_frequency == 0
                or frame == first_frame
                or frame == final_frame
            )
            ET.SubElement(
                track, "box",
                frame=str(frame), xtl=str(xtl), ytl=str(ytl), xbr=str(xbr), ybr=str(ybr),
                outside="0", occluded="0", keyframe="1" if is_keyframe else "0",
            )

    tree = ET.ElementTree(root)
    # splitext only strips the final extension, so a dot in a directory name is left alone.
    output_file_path = os.path.splitext(mot_file_path)[0] + "_converted.xml"
    tree.write(output_file_path)

    return output_file_path

# Example usage: convert one ground-truth MOT file (hard-coded, machine-specific path)
# with keyframe_frequency=20 instead of the default 10.
example_output_path = mot_to_cvat_xml("/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/8/gt 2/gt.txt", "video", 1920, 1080, keyframe_frequency=20)
# Bare expression so the notebook displays the returned output path.
example_output_path


'/Users/benmartin/Library/CloudStorage/GoogleDrive-btmarti25@gmail.com/My Drive/Projects/LoomExp2022/tracking/8/gt 2/gt_converted.xml'