In [None]:
import pandas as pd
import numpy as np
from sorted_alpha import sorted_alpha
import os
from filterpy.kalman import KalmanFilter
from scipy.optimize import linear_sum_assignment
import cv2
def get_skeletons(frame_no, detection):
    """Read one per-frame detection CSV and return a box and score per skeleton.

    Columns whose name starts with ``'prob'`` are treated as confidence
    columns. The remaining columns are consumed pairwise as (x, y) coordinate
    columns, one pair per skeleton, and the i-th pair is scored with the mean
    of the i-th ``'prob'`` column.

    Parameters
    ----------
    frame_no : int
        Frame number of the file. Not used here; kept so callers do not change.
    detection : str or file-like
        Passed straight to ``pandas.read_csv`` (first column is the index).

    Returns
    -------
    list of dict
        ``{'bbox': [x1, y1, x2, y2], 'score': score}`` for every skeleton whose
        coordinates span a non-degenerate box. Skeletons without any valid
        x or y value, or whose box has zero width or height, are skipped.
    """
    data = pd.read_csv(detection, index_col=0)

    prob_columns = [col for col in data.columns if col.startswith('prob')]
    coord_columns = [col for col in data.columns if not col.startswith('prob')]

    detections = []
    # Pair every skeleton with its own score *before* filtering, so dropping a
    # degenerate skeleton can no longer shift the scores of the ones after it.
    for x_col, y_col, prob_col in zip(coord_columns[::2], coord_columns[1::2], prob_columns):
        skeleton = data[[x_col, y_col]].to_numpy(dtype=float)

        # A column made only of NaN has no min/max; such a skeleton would
        # otherwise slip through the equality test below (NaN != NaN) and
        # produce a NaN bounding box.
        if np.isnan(skeleton).all(axis=0).any():
            continue

        x1, y1 = np.nanmin(skeleton, axis=0)
        x2, y2 = np.nanmax(skeleton, axis=0)

        # Zero width or zero height: not a usable bounding box.
        if x1 == x2 or y1 == y2:
            continue

        detections.append({'bbox': [x1, y1, x2, y2], 'score': data[prob_col].mean()})

    return detections


In [None]:
def get_bb(detection):
    """Return the bounding box ``[min_x, min_y, max_x, max_y]`` of a detection.

    ``detection`` is indexed as ``detection[:, 0]`` (x values) and
    ``detection[:, 1]`` (y values). Non-finite entries (NaN, +/-inf) are ignored
    independently in each column, so a missing x no longer hides a valid y and
    a missing y no longer turns the result into NaN.

    Returns a ``float32`` array of four values. Raises ``ValueError`` (from
    NumPy) when a column contains no finite value at all.
    """
    x_values = detection[:, 0]
    y_values = detection[:, 1]
    # Each coordinate is filtered with its own finiteness mask; previously the
    # y column was filtered with the mask of the x column.
    x_values = x_values[np.isfinite(x_values)]
    y_values = y_values[np.isfinite(y_values)]
    box = [np.min(x_values), np.min(y_values), np.max(x_values), np.max(y_values)]
    return np.array(box).astype(np.float32)

In [None]:
def find_min_round(coord):
    """Return the smallest finite value in ``coord``, rounded with ``round``."""
    values = np.array(coord)
    finite_values = values[np.isfinite(values)]
    return round(finite_values.min())

def find_max_round(coord):
    """Return the largest finite value in ``coord``, rounded with ``round``."""
    values = np.array(coord)
    finite_values = values[np.isfinite(values)]
    return round(finite_values.max())

In [None]:
import pandas as pd
import numpy as np

def get_skeletons(frame_no, detection):
    """Read one per-frame detection CSV and return a box and score per skeleton.

    Columns whose name starts with ``'prob'`` are treated as confidence
    columns. The remaining columns are consumed pairwise as (x, y) coordinate
    columns, one pair per skeleton, and the i-th pair is scored with the mean
    of the i-th ``'prob'`` column.

    Parameters
    ----------
    frame_no : int
        Frame number of the file. Not used here; kept so callers do not change.
    detection : str or file-like
        Passed straight to ``pandas.read_csv`` (first column is the index).

    Returns
    -------
    list of dict
        ``{'bbox': [x1, y1, x2, y2], 'score': score}`` for every skeleton whose
        coordinates span a non-degenerate box. Skeletons without any valid
        x or y value, or whose box has zero width or height, are skipped.
    """
    data = pd.read_csv(detection, index_col=0)

    # Split the columns into confidence columns and coordinate columns.
    prob_columns = [col for col in data.columns if col.startswith('prob')]
    coord_columns = [col for col in data.columns if not col.startswith('prob')]

    detections = []
    # Pair every skeleton with its own score *before* filtering, so dropping a
    # degenerate skeleton can no longer shift the scores of the ones after it.
    for x_col, y_col, prob_col in zip(coord_columns[::2], coord_columns[1::2], prob_columns):
        skeleton = data[[x_col, y_col]].to_numpy(dtype=float)

        # A column made only of NaN has no min/max; such a skeleton would
        # otherwise slip through the equality test below (NaN != NaN) and
        # produce a NaN bounding box.
        if np.isnan(skeleton).all(axis=0).any():
            continue

        x1, y1 = np.nanmin(skeleton, axis=0)
        x2, y2 = np.nanmax(skeleton, axis=0)

        # Zero width or zero height: not a usable bounding box.
        if x1 == x2 or y1 == y2:
            continue

        detections.append({'bbox': [x1, y1, x2, y2], 'score': data[prob_col].mean()})

    return detections  # list of {'bbox': [...], 'score': ...} dictionaries

In [None]:
# Directory holding the per-frame detection files for recording 1043_meal.
# NOTE(review): absolute, machine-specific path — consider a configurable data directory.
folder = '/Users/andrei-macpro/Documents/Data/pose/meal_openpose/1043_meal'
# Presumably the file names found in `folder`, in sorted order — confirm in the sorted_alpha module.
detections = sorted_alpha(folder)

In [None]:
# Run every detection file through get_skeletons and print the resulting list.
for detection in detections:
    # Frame number = the text after the last '_' in the file name, up to its first '.'.
    frame_no = int(detection.split('_')[-1].split('.')[0])
    processed_det = get_skeletons(frame_no, os.path.join(folder, detection))
    # Prints one list per file; this can produce a lot of output for long recordings.
    print(processed_det )

In [None]:
class OCSORTKalmanBoxTracker:
    """Kalman-filter wrapper holding one tracked bounding box.

    NOTE(review): the measurement matrix below is labelled (x, y, h, score),
    but update() feeds the raw corners [x1, y1, x2, y2] as the measurement, so
    x2 lands in the state slot labelled "h" and y2 in the slot labelled
    "score". Confirm which measurement model is intended.
    """

    def __init__(self, bbox):
        """
        Initialize the Kalman filter for a bounding box.
        bbox: [x1, y1, x2, y2] initial bounding box
        """
        # Define the Kalman filter
        self.kf = KalmanFilter(dim_x=7, dim_z=4)  # 7 states (x, y, dx, dy, h, dh, score), 4 measurements (x, y, h, score)
        
        # State transition matrix: identity plus two coupling terms
        self.kf.F = np.eye(7)
        self.kf.F[0, 2] = 1  # Velocity model for x (state 0 += state 2)
        self.kf.F[1, 3] = 1  # Velocity model for y (state 1 += state 3)
        # NOTE(review): state 5 is labelled "dh" but no F[4, 5] coupling is set, so it never feeds state 4.
        
        # Measurement matrix: the four measurement components map onto state indices 0, 1, 4 and 6
        self.kf.H = np.array([[1, 0, 0, 0, 0, 0, 0],
                              [0, 1, 0, 0, 0, 0, 0],
                              [0, 0, 0, 0, 1, 0, 0],
                              [0, 0, 0, 0, 0, 0, 1]])

        # Covariance matrices are scaled from the KalmanFilter defaults
        self.kf.P *= 10  # Initial uncertainty
        self.kf.R[2:, 2:] *= 10  # Measurement noise (last two measurement components only)
        self.kf.Q[-1, -1] *= 0.1  # Process noise (last state only)
        
        self.time_since_update = 0  # number of predict() calls since the last update()
        self.history = []  # never written to inside this class
        self.age = 0  # total number of predict() calls
        self.id = None  # never assigned inside this class
        
        # Initialize state vector from bbox
        # NOTE(review): this runs a regular Kalman update from the filter's initial state
        # (presumably zeros — confirm in filterpy), so the state is pulled towards bbox
        # rather than set to it.
        self.update(bbox)
    
    def predict(self):
        """
        Predict the next state.
        """
        self.kf.predict()
        self.time_since_update += 1
        self.age += 1
    
    def update(self, bbox):
        """
        Update the state with observed bounding box.
        bbox: [x1, y1, x2, y2]
        """
        # The four corner values are passed as the measurement vector unchanged.
        self.kf.update(np.array([bbox[0], bbox[1], bbox[2], bbox[3]]))
        self.time_since_update = 0

In [None]:
class OCSORTKalmanBoxTrackerWithAccel(OCSORTKalmanBoxTracker):
    """Box tracker whose x/y velocities are additionally driven by acceleration.

    The parent builds a 7-state filter. Assigning a 9x9 ``F`` and a 4x9 ``H``
    onto that filter leaves its state vector and covariances at size 7, so the
    first ``predict()`` fails with a shape error. A new 9-state filter is built
    instead: the parent's states keep their indices (which keeps the parent's
    measurement matrix valid) and the two acceleration states are appended at
    the end.
    """

    def __init__(self, bbox):
        """
        Build the parent filter from ``bbox`` and extend it with acceleration.
        bbox: [x1, y1, x2, y2] initial bounding box
        """
        super().__init__(bbox)
        base = self.kf
        n_base = base.x.shape[0]       # number of states in the parent filter
        n_meas = base.H.shape[0]       # number of measurement components
        n_total = n_base + 2           # parent states + (ax, ay)

        kf = KalmanFilter(dim_x=n_total, dim_z=n_meas)

        # Transition: parent's model in the top-left block, then acceleration
        # feeding the velocity states (indices 2 and 3 in the parent layout).
        kf.F = np.eye(n_total)
        kf.F[:n_base, :n_base] = base.F
        kf.F[2, n_base] = 1        # x velocity += x acceleration
        kf.F[3, n_base + 1] = 1    # y velocity += y acceleration

        # Measurement: same rows as the parent, acceleration is not observed.
        kf.H = np.hstack([np.asarray(base.H, dtype=float), np.zeros((n_meas, 2))])

        # Carry over whatever the parent already estimated/configured.
        kf.x[:n_base] = np.asarray(base.x, dtype=float).reshape(n_base, 1)
        kf.P[:n_base, :n_base] = base.P
        kf.P[n_base:, n_base:] *= 10   # same initial-uncertainty scale the parent uses
        kf.Q[:n_base, :n_base] = base.Q
        kf.R = np.array(base.R, dtype=float)

        self.kf = kf

In [None]:
def iou(bbox1, bbox2):
    """
    Intersection-over-union of two axis-aligned boxes.
    Each box is indexed as [x1, y1, x2, y2]; returns 0 when the union is not positive.
    """
    # Corners of the overlapping rectangle (may be inverted when the boxes are disjoint)
    left = max(bbox1[0], bbox2[0])
    top = max(bbox1[1], bbox2[1])
    right = min(bbox1[2], bbox2[2])
    bottom = min(bbox1[3], bbox2[3])

    # Negative extents are clamped to 0 so disjoint boxes give no overlap
    overlap = max(0, right - left) * max(0, bottom - top)

    first_area = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
    second_area = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])
    total = first_area + second_area - overlap

    if total > 0:
        return overlap / total
    return 0

In [None]:
def adaptive_metric(tracker_bbox, detection_bbox, observation_point1, observation_point2, alpha=0.5):
    """
    Blend box overlap with point proximity into a single similarity value.
    tracker_bbox: [x1, y1, x2, y2] from tracker
    detection_bbox: [x1, y1, x2, y2] from detection
    observation_point1: [x, y] central observation of the tracker
    observation_point2: [x, y] central observation of the detection
    alpha: weight of the IoU term; the proximity term gets (1 - alpha)
    """
    overlap_term = iou(tracker_bbox, detection_bbox)

    # Euclidean distance between the two points, mapped to 1 / (1 + d)
    offset = np.array(observation_point1) - np.array(observation_point2)
    proximity_term = 1 / (1 + np.linalg.norm(offset))

    return alpha * overlap_term + (1 - alpha) * proximity_term

In [None]:
class OCSORTTracker:
    """Multi-object tracker: one OCSORTKalmanBoxTracker per tracked box.

    NOTE(review): the first four state entries of each tracker are used as its
    bounding box below, while the tracker class labels them (x, y, dx, dy).
    Confirm the intended state layout before relying on the scores.
    """

    def __init__(self):
        self.trackers = []

    def update(self, detections):
        """
        Advance all trackers by one frame and fold in the new detections.
        detections: list of dicts with a 'bbox' entry ([x1, y1, x2, y2])
        """
        # Predict new positions of the existing trackers
        for tracker in self.trackers:
            tracker.predict()

        # Associate detections to trackers using observation-based adaptive metric
        matches, unmatched_detections = self.associate_detections(detections)

        # Update matched trackers. The tracker expects the box itself, not the
        # whole detection dict (which would fail on bbox[0]).
        for tracker_idx, detection_idx in matches:
            self.trackers[tracker_idx].update(detections[detection_idx]['bbox'])

        # Create new trackers for unmatched detections
        for idx in unmatched_detections:
            self.trackers.append(OCSORTKalmanBoxTracker(detections[idx]['bbox']))

    def associate_detections(self, detections):
        """
        Match trackers to detections with the Hungarian algorithm.
        Returns (matches, unmatched_detections): a list of
        (tracker_index, detection_index) pairs and the detection indices left over.
        """
        iou_matrix = np.zeros((len(self.trackers), len(detections)))
        observation_matrix = np.zeros((len(self.trackers), len(detections)))

        for t, tracker in enumerate(self.trackers):
            # The filter state is a column vector; flatten it so slices are
            # plain 1-D vectors and do not broadcast against [x, y] points.
            state = np.asarray(tracker.kf.x, dtype=float).reshape(-1)
            for d, detection in enumerate(detections):
                bbox = detection['bbox']
                # adaptive_metric takes one observation point per side; the
                # detection side (its box centre) was previously not passed.
                detection_center = [(bbox[0] + bbox[2]) / 2.0, (bbox[1] + bbox[3]) / 2.0]
                iou_matrix[t, d] = iou(state[:4], bbox)
                observation_matrix[t, d] = adaptive_metric(state[:4], bbox, state[:2], detection_center)

        # Combine IoU and observation metrics
        combined_matrix = iou_matrix + observation_matrix

        # Solve the assignment problem using the Hungarian algorithm
        # (negated because linear_sum_assignment minimises cost)
        row_indices, col_indices = linear_sum_assignment(-combined_matrix)

        matches = []
        unmatched_detections = list(range(len(detections)))

        for row, col in zip(row_indices, col_indices):
            if combined_matrix[row, col] > 0:  # Ensure a valid match
                matches.append((int(row), int(col)))
                unmatched_detections.remove(col)

        return matches, unmatched_detections

In [None]:
import numpy as np
import pandas as pd
import cv2
import os
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter

class OCSORTKalmanBoxTracker:
    """Kalman-filter tracker for a single bounding box.

    State vector (7x1): (cx, cy, vx, vy, w, h, score). The measurement is
    (cx, cy, w, h); the box centre follows a constant-velocity model while
    width, height and score are modelled as constant.
    """

    def __init__(self, bbox):
        """
        Initialize the tracker with the initial bounding box.
        bbox: [x1, y1, x2, y2] initial bounding box
        """
        # Define the Kalman filter
        self.kf = KalmanFilter(dim_x=7, dim_z=4)  # 7 states (cx, cy, vx, vy, w, h, score), 4 measurements (cx, cy, w, h)

        # State transition matrix
        self.kf.F = np.eye(7)
        self.kf.F[0, 2] = 1  # cx += vx
        self.kf.F[1, 3] = 1  # cy += vy
        # There is deliberately no F[4, 5] term: state 5 is the height, not a
        # width velocity, so coupling it would add h to w on every predict().

        # Measurement matrix: picks cx, cy, w, h out of the state
        self.kf.H = np.array([[1, 0, 0, 0, 0, 0, 0],
                              [0, 1, 0, 0, 0, 0, 0],
                              [0, 0, 0, 0, 1, 0, 0],
                              [0, 0, 0, 0, 0, 1, 0]])

        # Covariance matrix
        self.kf.P *= 10  # Initial uncertainty
        self.kf.R[2:, 2:] *= 10  # Measurement noise
        self.kf.Q[-1, -1] *= 0.1  # Process noise
        self.kf.Q[4:, 4:] *= 0.1  # Process noise for width and height (also rescales the score entry)

        self.time_since_update = 0  # predict() calls since the last update()
        self.history = []           # snapshots of the state after each update()
        self.age = 0                # total predict() calls
        self.id = None
        self.lost_frames = 0  # Initialize lost_frames attribute

        # Initialize state vector from bbox
        self.init_state(bbox)

    def init_state(self, bbox):
        """
        Initialize the state vector with the bounding box.
        """
        x1, y1, x2, y2 = bbox
        cx = (x1 + x2) / 2.0  # Center x
        cy = (y1 + y2) / 2.0  # Center y
        w = x2 - x1  # Width
        h = y2 - y1  # Height
        score = 1.0  # Placeholder for the detection score
        self.kf.x = np.array([cx, cy, 0, 0, w, h, score]).reshape((7, 1))
        print("Kalman Filter initialized with bounding box:", self.kf.x.flatten())

    def predict(self):
        """
        Predict the next state.
        """
        self.kf.predict()
        self.time_since_update += 1
        self.age += 1
        print("Predicted state:", self.kf.x.flatten())

    def update(self, bbox):
        """
        Update the state with the new bounding box.
        bbox: [x1, y1, x2, y2]
        """
        x1, y1, x2, y2 = bbox
        cx = (x1 + x2) / 2.0
        cy = (y1 + y2) / 2.0
        w = x2 - x1
        h = y2 - y1
        self.kf.update([cx, cy, w, h])
        self.time_since_update = 0
        # Store a copy so later filter steps can never alter recorded history.
        self.history.append(self.kf.x.copy())
        self.lost_frames = 0  # Reset lost_frames on update
        print("Updated state with bounding box:", self.kf.x.flatten())

    def get_state(self):
        """
        Get the current bounding box estimate as [x1, y1, x2, y2].
        """
        cx, cy, _, _, w, h, _ = self.kf.x.flatten()
        x1 = cx - w / 2
        y1 = cy - h / 2
        x2 = cx + w / 2
        y2 = cy + h / 2
        return [x1, y1, x2, y2]

def iou(bbox1, bbox2):
    """
    Intersection-over-union of two axis-aligned boxes.
    bbox1, bbox2: [x1, y1, x2, y2]
    Returns 0.0 when the union is not positive (e.g. two zero-area boxes) or
    is NaN, instead of dividing by zero.
    """
    x1, y1, x2, y2 = bbox1
    x1_, y1_, x2_, y2_ = bbox2

    # Corners of the overlapping rectangle
    xi1 = max(x1, x1_)
    yi1 = max(y1, y1_)
    xi2 = min(x2, x2_)
    yi2 = min(y2, y2_)

    # Negative extents mean the boxes do not overlap
    inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
    bbox1_area = (x2 - x1) * (y2 - y1)
    bbox2_area = (x2_ - x1_) * (y2_ - y1_)

    union_area = bbox1_area + bbox2_area - inter_area

    # `not (... > 0)` also catches NaN, which `<= 0` would let through.
    if not union_area > 0:
        return 0.0

    return inter_area / union_area

def adaptive_metric(bbox1, bbox2, state):
    """
    Euclidean distance between ``state`` and the first corner of ``bbox2``.
    bbox1: unused; kept for interface compatibility
    bbox2: box whose first two entries (bbox2[0], bbox2[1]) form the reference point
    state: two values (x, y); may be flat or a column vector such as a Kalman state slice
    Returns a distance (larger means farther apart), not a similarity.
    """
    # Flatten first: a (2, 1) column minus a (2,) point would broadcast to a
    # 2x2 matrix and the norm of that matrix is not the point distance.
    point = np.asarray(state, dtype=float).reshape(-1)
    reference = np.array([bbox2[0], bbox2[1]], dtype=float)
    return np.linalg.norm(point - reference)

class OCSORTTracker:
    """Multi-object tracker that keeps one OCSORTKalmanBoxTracker per box.

    Attributes
    ----------
    trackers : list
        Trackers currently alive (matched, recently lost, or newly created).
    lost_trackers : list
        Alive trackers that received no detection in the latest update.
    max_lost_frames : int
        A tracker is dropped once it has gone more than this many consecutive
        updates without a matching detection.
    match_threshold : float
        Minimum similarity for an assignment to count as a match.
    """

    def __init__(self):
        self.trackers = []
        self.lost_trackers = []
        self.max_lost_frames = 5
        self.match_threshold = 0.3

    def update(self, detections):
        """
        Advance all trackers by one frame and fold in the new detections.
        detections: list of dicts with a 'bbox' entry ([x1, y1, x2, y2])
        """
        for tracker in self.trackers:
            tracker.predict()

        matches, unmatched_detections = self.associate_detections(detections)

        matched_tracker_indices = set()
        for tracker_idx, detection_idx in matches:
            self.trackers[tracker_idx].update(detections[detection_idx]['bbox'])
            matched_tracker_indices.add(tracker_idx)

        # `matches` holds index pairs, so membership has to be tested on the
        # tracker index. New lists are built instead of removing from a list
        # while iterating over it, which used to skip (and lose) trackers.
        survivors = []
        lost = []
        for idx, tracker in enumerate(self.trackers):
            if idx in matched_tracker_indices:
                survivors.append(tracker)
                continue
            tracker.lost_frames += 1
            if tracker.lost_frames <= self.max_lost_frames:
                survivors.append(tracker)
                lost.append(tracker)

        # Every detection nobody claimed starts a new track.
        for idx in unmatched_detections:
            survivors.append(OCSORTKalmanBoxTracker(detections[idx]['bbox']))

        self.trackers = survivors
        self.lost_trackers = lost

    def associate_detections(self, detections):
        """
        Match trackers to detections with the Hungarian algorithm.
        Returns (matches, unmatched_detections): a list of
        (tracker_index, detection_index) pairs and the sorted list of
        detection indices that were not matched.
        """
        similarity = np.zeros((len(self.trackers), len(detections)))

        for t, tracker in enumerate(self.trackers):
            # Use the tracker's box estimate; the first four raw state entries
            # are (cx, cy, vx, vy), not box corners.
            tracker_bbox = [float(value) for value in tracker.get_state()]
            # adaptive_metric measures against the detection's first corner,
            # so hand it the tracker's first corner to compare like with like.
            tracker_corner = np.array(tracker_bbox[:2])
            for d, detection in enumerate(detections):
                detection_bbox = detection['bbox']
                overlap = iou(tracker_bbox, detection_bbox)
                distance = adaptive_metric(tracker_bbox, detection_bbox, tracker_corner)
                # The assignment below maximises the score, so the distance is
                # mapped to a proximity in (0, 1] instead of being added raw
                # (which rewarded far-away detections).
                similarity[t, d] = overlap + 1.0 / (1.0 + distance)

        # linear_sum_assignment minimises cost, hence the negation.
        row_indices, col_indices = linear_sum_assignment(-similarity)

        matches = []
        unmatched = set(range(len(detections)))

        for row, col in zip(row_indices, col_indices):
            if similarity[row, col] > self.match_threshold:
                matches.append((int(row), int(col)))
                unmatched.discard(int(col))

        return matches, sorted(unmatched)




In [None]:
import os
import cv2

# Example usage: replay the detections of one recording over its video and
# draw the tracked boxes. Space advances to the next frame, 'q' stops.
# NOTE(review): absolute, machine-specific paths — consider a configurable data directory.
folder = '/Users/andrei-macpro/Documents/Data/pose/meal_openpose/1047_meal'
detection_files = sorted_alpha(folder)

video_path = '/Users/andrei-macpro/Documents/Data/videos/meal_videos/1047_meal.mp4'
cap = cv2.VideoCapture(video_path)

tracker = OCSORTTracker()

print("Detection files:", detection_files)

quit_requested = False
try:
    for detection_file in detection_files:
        # Frame number = the text after the last '_' in the file name, up to its first '.'.
        frame_no = int(detection_file.split('_')[-1].split('.')[0])
        print(f'Processing detection file: {detection_file}, frame number: {frame_no}')
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)

        ret, frame = cap.read()
        if not ret:
            print(f'Failed to read frame {frame_no}')
            continue

        detections = get_skeletons(frame_no, os.path.join(folder, detection_file))
        tracker.update(detections)

        for t in tracker.trackers:
            bbox = t.get_state()

            x1, y1, x2, y2 = map(int, bbox)

            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # Draw the x, y grid
        step_size = 50  # Adjust the step size as needed
        for x in range(0, frame.shape[1], step_size):
            cv2.putText(frame, f'{x}', (x, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1, cv2.LINE_AA)
        for y in range(0, frame.shape[0], step_size):
            cv2.putText(frame, f'{y}', (10, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1, cv2.LINE_AA)

        cv2.imshow('Tracking', frame)
        # Block on this frame until the user presses space (next) or 'q' (stop).
        while True:
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                # Leave the loops instead of calling exit(), which would shut
                # down the notebook kernel and lose all state.
                quit_requested = True
                break
            elif key == 32:  # Space bar key code
                break

        if quit_requested:
            break
finally:
    # Always free the video handle and close the window, even if a frame fails.
    cap.release()
    cv2.destroyAllWindows()

In [None]:
# Re-list the detection files of `folder` (set in an earlier cell) and show
# the result as the cell output.
detection_files = sorted_alpha(folder)
detection_files