# Import Modules

In [8]:
import cv2
import os
import numpy as np

## Define and Specify paths

In [2]:
#Define my paths
def create_directory_if_not_exists(path):
    # Check if the directory exists
    if not os.path.exists(path):
        # Create the directory since it does not exist
        os.makedirs(path)
        print(f"Directory '{path}' created.")
    else:
        print(f"Directory '{path}' already exists.")

# Specify the path where you want to create the directory
train_path1 = "train_path1"
train_path2 = "train_path2"
test_path = "test_path"
create_directory_if_not_exists(train_path1)
create_directory_if_not_exists(train_path2)
create_directory_if_not_exists(test_path)


Directory 'train_path1' created.
Directory 'train_path2' created.
Directory 'test_path' created.


# Extract Video Frames for Analysis

In [3]:
def extract_frames(video_path, output_folder):
    cap = cv2.VideoCapture(video_path)
    frame_count = 0

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_path = os.path.join(output_folder, f"frame_{frame_count:04d}.png")
        cv2.imwrite(frame_path, frame)
        frame_count += 1

    cap.release()

video_train1 = "training-video-001.mp4"
video_train2 = "training-video-002.mp4"
video_test = "testing-video-001.mp4"

extract_frames(video_train1, train_path1)
extract_frames(video_train2, train_path2)
extract_frames(video_test, test_path)

# A walkthrough inside the directories

In [4]:
def count_images(path, ext=".png"):
  jpg_count = 0
  for item in os.listdir(path):
    full_path = os.path.join(path, item)
    if os.path.isfile(full_path) and item.lower().endswith(ext):
      jpg_count += 1
    elif os.path.isdir(full_path):
      jpg_count += count_images(full_path)
  return jpg_count


def look_at_dir(path):
  for dirpath, dirnames, filenames in os.walk(path):
    print(f"There are {len(dirnames)} directories and {len(filenames)} images in '{dirpath}'.")

count_train_path1 = count_images(train_path1)
count_train_path2 = count_images(train_path2)
count_test_path = count_images(test_path)

print(f"Total number of image files train_path1 folder: {count_train_path1}")
print(f"Total number of image files train_path2 folder {count_train_path2}")
print(f"Total number of image files test_path folder {count_test_path}")

Total number of image files train_path1 folder: 1208
Total number of image files train_path2 folder 1208
Total number of image files test_path folder 1208


Given that you have 1208 frames extracted from a 40-second video, we can calculate the frame rate (FPS) using the following formula:

$$FPS = \frac{\text{Number of Frames}}{\text{Video duration in seconds}} = \frac{1208}{40} = 30.2 \text{  frame rate per second}$$

Thus, the videos must be recorded in a 30 FPS

# Background Subtraction and Contour Detection

In [5]:
# Initialize background subtractor
fgbg = cv2.createBackgroundSubtractorMOG2()

# Function to calculate the centroid of a contour
def get_centroid(contour):
    M = cv2.moments(contour)
    if M['m00'] != 0:
        cx = int(M['m10'] / M['m00'])
        cy = int(M['m01'] / M['m00'])
        return (cx, cy)
    else:
        return None

# Track player positions across frames
def track_players(frames_folder):
    frame_files = sorted([os.path.join(frames_folder, f) for f in os.listdir(frames_folder) if f.endswith('.png')])
    player_positions = {}

    for frame_file in frame_files:
        frame = cv2.imread(frame_file)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        fgmask = fgbg.apply(gray)
        contours, _ = cv2.findContours(fgmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        frame_centroids = []
        for contour in contours:
            if cv2.contourArea(contour) > 500:
                centroid = get_centroid(contour)
                if centroid:
                    frame_centroids.append(centroid)

        # Logic to associate centroids with player IDs
        for i, centroid in enumerate(frame_centroids):
            player_positions.setdefault(i, []).append(centroid)

        # Display frame with centroids (optional)
        for centroid in frame_centroids:
            cv2.circle(frame, centroid, 5, (0, 255, 0), -1)

    return player_positions

##Calculate Player Positions
player_positions1 = track_players(train_path1)
player_positions2 = track_players(train_path2)
player_positions_test = track_players(test_path)

In [50]:
from itertools import islice

# Get the first 5 entries from the dictionary
player_positions1 = dict(islice(player_positions1.items(), 5))
player_positions2 = dict(islice(player_positions2.items(), 5))
player_positions_test = dict(islice(player_positions_test.items(), 5))

dict_keys([0, 1, 2, 3, 4])

In [51]:
import pickle

# Save player positions to files
with open('player_positions1.pkl', 'wb') as f:
    pickle.dump(player_positions1, f)
with open('player_positions2.pkl', 'wb') as f:
    pickle.dump(player_positions2, f)
with open('player_positions_test.pkl', 'wb') as f:
    pickle.dump(player_positions_test, f)

# Visualization Attemp for the Centroids

In [7]:
import cv2
import os

def visualize_player_positions(frames_folder, player_positions):
    frame_files = sorted([os.path.join(frames_folder, f) for f in os.listdir(frames_folder) if f.endswith('.png')])
    
    for i, frame_file in enumerate(frame_files):
        frame = cv2.imread(frame_file)
        
        # Draw the centroids on the frame
        for player_id, positions in player_positions.items():
            if i < len(positions):
                centroid = positions[i]
                cv2.circle(frame, centroid, 5, (0, 255, 0), -1)  # Draw the centroid
                
                # Optionally, display player ID near the centroid
                cv2.putText(frame, str(player_id), (centroid[0] + 10, centroid[1]), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        
        # Display the frame
        cv2.imshow('Frame with Tracked Positions', frame)
        if cv2.waitKey(30) & 0xFF == 27:  # Press 'Esc' to exit
            break

    cv2.destroyAllWindows()

# Example usage (visualize the pre-calculated player positions)
visualize_player_positions(train_path1, player_positions1)
# visualize_player_positions('training_frames2', player_positions2)
# visualize_player_positions('test_frames', player_positions_test)

In [10]:
def detect_phase(frame, template, threshold=0.7):
    """Detect phase (move/stop) based on template matching."""
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    res = cv2.matchTemplate(gray_frame, template, cv2.TM_CCOEFF_NORMED)
    loc = np.where(res >= threshold)
    if len(loc[0]) > 0:
        return 'stop'
    else:
        return 'move'

# Load the template image of the doll looking at the players
template = cv2.imread('stopped.png', cv2.IMREAD_GRAYSCALE)

# Classify each frame
def classify_frames_with_template(frames_folder, template):
    frame_files = sorted([os.path.join(frames_folder, f) for f in os.listdir(frames_folder) if f.endswith('.png')])
    phases = {}

    for frame_file in frame_files:
        frame = cv2.imread(frame_file)
        phase = detect_phase(frame, template)
        phases[frame_file] = phase

    return phases

# Example usage
phases1 = classify_frames_with_template(train_path1, template)
phases2 = classify_frames_with_template(train_path2, template)
phases_test = classify_frames_with_template(test_path, template)

# Include ROI in Detection phase

In [27]:
# Load the template image (ROI)
template = cv2.imread('roi_template.png', cv2.IMREAD_GRAYSCALE)
template_height, template_width = template.shape

# Define the Region of Interest (ROI) in the frame where the doll is expected to be located
# Adjust these values based on the location of the doll in your frames
roi_x, roi_y, roi_width, roi_height = 0, 0, 500, 1080

# Function to detect the doll's phase (move/stop) using ROI
def detect_phase(frame, template, threshold=0.8):
    """Detect phase (move/stop) based on template matching within ROI."""
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    # Extract the ROI from the frame
    roi = gray_frame[roi_y:roi_y + roi_height, roi_x:roi_x + roi_width]
    
    res = cv2.matchTemplate(roi, template, cv2.TM_CCOEFF_NORMED)
    _, max_val, _, max_loc = cv2.minMaxLoc(res)

    # Adjust coordinates to the original frame
    top_left = (max_loc[0] + roi_x, max_loc[1] + roi_y)
    bottom_right = (top_left[0] + template_width, top_left[1] + template_height)

    # Visualize the matching result
    cv2.rectangle(frame, top_left, bottom_right, (0, 255, 0), 2)
    cv2.imshow('Template Matching', frame)
    cv2.waitKey(1)  # Display for a short period

    if max_val >= threshold:
        return 'stop'
    else:
        return 'move'

# Function to classify phases for each frame
def classify_frames_with_template(frames_folder, template, threshold=0.8):
    frame_files = sorted([os.path.join(frames_folder, f) for f in os.listdir(frames_folder) if f.endswith('.png')])
    phases = {}

    for frame_file in frame_files:
        frame = cv2.imread(frame_file)
        phase = detect_phase(frame, template, threshold)
        phases[frame_file] = phase

    cv2.destroyAllWindows()
    return phases

# Example usage
# Replace 'train_path1', 'train_path2', and 'test_path' with the actual paths to your frames folders
phases1 = classify_frames_with_template(train_path1, template)
phases2 = classify_frames_with_template(train_path2, template)
phases_test = classify_frames_with_template(test_path, template)

# Print the detected phases for verification
print(f"Phases (Training 1): {phases1}")
print(f"Phases (Training 2): {phases2}")
print(f"Phases (Test): {phases_test}")

Phases (Training 1): {'train_path1/frame_0000.png': 'move', 'train_path1/frame_0001.png': 'move', 'train_path1/frame_0002.png': 'move', 'train_path1/frame_0003.png': 'move', 'train_path1/frame_0004.png': 'move', 'train_path1/frame_0005.png': 'move', 'train_path1/frame_0006.png': 'move', 'train_path1/frame_0007.png': 'move', 'train_path1/frame_0008.png': 'move', 'train_path1/frame_0009.png': 'move', 'train_path1/frame_0010.png': 'move', 'train_path1/frame_0011.png': 'move', 'train_path1/frame_0012.png': 'move', 'train_path1/frame_0013.png': 'move', 'train_path1/frame_0014.png': 'move', 'train_path1/frame_0015.png': 'move', 'train_path1/frame_0016.png': 'move', 'train_path1/frame_0017.png': 'move', 'train_path1/frame_0018.png': 'move', 'train_path1/frame_0019.png': 'move', 'train_path1/frame_0020.png': 'move', 'train_path1/frame_0021.png': 'move', 'train_path1/frame_0022.png': 'move', 'train_path1/frame_0023.png': 'move', 'train_path1/frame_0024.png': 'move', 'train_path1/frame_0025.png'

In [None]:
# def calculate_distance(point1, point2):
#     """Calculate Euclidean distance between two points."""
#     return np.sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2)

# def average_rate_of_motion(player_positions):
#     """Calculate the average rate of motion per player."""
#     average_motion = {}
#     for player_id, positions in player_positions.items():
#         if len(positions) > 1:
#             distances = [calculate_distance(positions[i], positions[i + 1]) for i in range(len(positions) - 1)]
#             average_motion[player_id] = np.mean(distances)
#     return average_motion

# def relative_distance_progressed(player_positions):
#     """Calculate the cumulative distance each player manages to advance per movement cycle."""
#     cumulative_distance = {}
#     for player_id, positions in player_positions.items():
#         if len(positions) > 1:
#             distances = [calculate_distance(positions[i], positions[i + 1]) for i in range(len(positions) - 1)]
#             cumulative_distance[player_id] = np.sum(distances)
#     return cumulative_distance


# def consistency_in_motion_patterns(player_positions):
#     """Identify patterns in players’ motions."""
#     motion_patterns = {}
#     for player_id, positions in player_positions.items():
#         if len(positions) > 1:
#             distances = [calculate_distance(positions[i], positions[i + 1]) for i in range(len(positions) - 1)]
#             variations = np.std(distances)  # Standard deviation as a measure of variation
#             motion_patterns[player_id] = variations
#     return motion_patterns


# def filter_movements_by_phase(player_positions, phases):
#     """Filter out player movements during 'stop' phases."""
#     filtered_positions = {}

#     for player_id, positions in player_positions.items():
#         filtered_positions[player_id] = []
#         for i, position in enumerate(positions):
#             frame_file = f'frame_{i:04d}.png'
#             if phases.get(frame_file) == 'move':
#                 filtered_positions[player_id].append(position)

#     return filtered_positions

# # Example usage
# filtered_positions1 = filter_movements_by_phase(player_positions1, phases1)
# filtered_positions2 = filter_movements_by_phase(player_positions2, phases2)
# filtered_positions_test = filter_movements_by_phase(player_positions_test, phases_test)

# # Recalculate metrics using filtered positions
# avg_motion1 = average_rate_of_motion(filtered_positions1)
# avg_motion2 = average_rate_of_motion(filtered_positions2)
# avg_motion_test = average_rate_of_motion(filtered_positions_test)

# distance_progressed1 = relative_distance_progressed(filtered_positions1)
# distance_progressed2 = relative_distance_progressed(filtered_positions2)
# distance_progressed_test = relative_distance_progressed(filtered_positions_test)

# motion_patterns1 = consistency_in_motion_patterns(filtered_positions1)
# motion_patterns2 = consistency_in_motion_patterns(filtered_positions2)
# motion_patterns_test = consistency_in_motion_patterns(filtered_positions_test)

# print("Average Rate of Motion per Player (Training 1):", avg_motion1)
# print("Average Rate of Motion per Player (Training 2):", avg_motion2)
# print("Average Rate of Motion per Player (Test):", avg_motion_test)

# print("Relative Distance Progressed (Training 1):", distance_progressed1)
# print("Relative Distance Progressed (Training 2):", distance_progressed2)
# print("Relative Distance Progressed (Test):", distance_progressed_test)

# print("Consistency in Motion Patterns (Training 1):", motion_patterns1)
# print("Consistency in Motion Patterns (Training 2):", motion_patterns2)
# print("Consistency in Motion Patterns (Test):", motion_patterns_test)

# Metric Calculation

In [53]:
def calculate_distance(point1, point2):
    """Calculate Euclidean distance between two points."""
    return np.sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2)

def average_rate_of_motion(player_positions):
    """Calculate the average rate of motion per player."""
    average_motion = {}
    for player_id, positions in player_positions.items():
        if len(positions) > 1:
            distances = [calculate_distance(positions[i], positions[i + 1]) 
                         for i in range(len(positions) - 1)]
            average_motion[player_id] = np.mean(distances)
    return average_motion

def relative_distance_progressed(player_positions):
    """Calculate the cumulative distance each player manages to advance per movement cycle."""
    cumulative_distance = {}
    for player_id, positions in player_positions.items():
        if len(positions) > 1:
            distances = [calculate_distance(positions[i], positions[i + 1]) 
                         for i in range(len(positions) - 1)]
            cumulative_distance[player_id] = np.sum(distances)
    return cumulative_distance

def consistency_in_motion_patterns(player_positions):
    """Identify patterns in players’ motions."""
    motion_patterns = {}
    for player_id, positions in player_positions.items():
        if len(positions) > 1:
            distances = [calculate_distance(positions[i], positions[i + 1]) 
                         for i in range(len(positions) - 1)]
            variations = np.std(distances)  # Standard deviation as a measure of variation
            motion_patterns[player_id] = variations
    return motion_patterns


# Calculate metrics without filtering
avg_motion1 = average_rate_of_motion(player_positions1)
avg_motion2 = average_rate_of_motion(player_positions2)
avg_motion_test = average_rate_of_motion(player_positions_test)

distance_progressed1 = relative_distance_progressed(player_positions1)
distance_progressed2 = relative_distance_progressed(player_positions2)
distance_progressed_test = relative_distance_progressed(player_positions_test)

motion_patterns1 = consistency_in_motion_patterns(player_positions1)
motion_patterns2 = consistency_in_motion_patterns(player_positions2)
motion_patterns_test = consistency_in_motion_patterns(player_positions_test)

print("Average Rate of Motion per Player (Training 1):", avg_motion1)
print("Average Rate of Motion per Player (Training 2):", avg_motion2)
print("Average Rate of Motion per Player (Test):", avg_motion_test)

print("Relative Distance Progressed (Training 1):", distance_progressed1)
print("Relative Distance Progressed (Training 2):", distance_progressed2)
print("Relative Distance Progressed (Test):", distance_progressed_test)

print("Consistency in Motion Patterns (Training 1):", motion_patterns1)
print("Consistency in Motion Patterns (Training 2):", motion_patterns2)
print("Consistency in Motion Patterns (Test):", motion_patterns_test)

Average Rate of Motion per Player (Training 1): {0: 10.508346711911326, 1: 16.419582815011285, 2: 19.292723434254825, 3: 17.830274200965302, 4: 35.060725765241216}
Average Rate of Motion per Player (Training 2): {0: 4.635162183584238, 1: 12.269180161131974, 2: 9.093452311228448, 3: 12.29867883436366, 4: 12.35619026641398}
Average Rate of Motion per Player (Test): {0: 4.435680144474085, 1: 11.152579889903972, 2: 9.992304474710755, 3: 12.194437802381323, 4: 21.93522595310478}
Relative Distance Progressed (Training 1): {0: 11758.839970628775, 1: 18340.674004367607, 2: 21183.410330811796, 3: 18614.806265807776, 4: 35867.12245784176}
Relative Distance Progressed (Training 2): {0: 5339.706835489042, 1: 14121.826365462903, 2: 10184.666588575863, 3: 13663.832184978026, 4: 13171.698823997303}
Relative Distance Progressed (Test): {0: 5101.032166145197, 1: 12814.314293499665, 2: 11411.211710119682, 3: 13804.103592295658, 4: 24611.323519383564}
Consistency in Motion Patterns (Training 1): {0: 76.1

In [None]:
import matplotlib.pyplot as plt

def plot_player_positions_graph(player_positions, title="Player Positions Over Time"):
    """
    This function plots player positions (trajectories) on a graph over time (across frames).
    """
    plt.figure(figsize=(12, 8))
    for player_id, positions in player_positions.items():
        x_positions = [pos[0] for pos in positions]  # Extract x coordinates
        y_positions = [pos[1] for pos in positions]  # Extract y coordinates
        plt.plot(x_positions, y_positions, label=f"Player {player_id}", marker='o')
    
    plt.title(title)
    plt.xlabel("X Position")
    plt.ylabel("Y Position")
    plt.legend()
    plt.grid(True)
    plt.show()
    
# plot_player_positions_graph(player_positions1, title="Player Positions (Training 1)")
plot_player_positions_graph(player_positions2, title="Player Positions (Training 2)")
# plot_player_positions_graph(player_positions_test, title="Player Positions (Test)")


# Suggested CNN Model (for now)

In [None]:
import torch
import torch.nn as nn



class guard_classifier(nn.Module):
    def __init__(self, input_channels: int, output_shape: int):
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=input_channels,
                      out_channels=32,
                      kernel_size=(3,3),
                      stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=32,
                      out_channels=32,
                      kernel_size=(3,3),
                      stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2,2)),
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(in_channels=32,
                      out_channels=64,
                      kernel_size=(3,3),
                      stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=64,
                      out_channels=64,
                      kernel_size=(3,3),
                      stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2,2)),
        )
        self.conv_block_3 = nn.Sequential(
            nn.Conv2d(in_channels=64,
                      out_channels=128,
                      kernel_size=(3,3),
                      stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=128,
                      out_channels=128,
                      kernel_size=(3,3),
                      stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2,2)),
        )
        # Calculate the size of the flattened features after the convolutional layers
        # Assuming input size of 1920x1080, after three max pooling layers (each 2x2),
        # the size will be reduced to 240x135
        flattened_size = 128 * 240 * 135

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=flattened_size,
                      out_features=256),
            nn.ReLU(),
            nn.Linear(in_features=256,
                      out_features=256),
            nn.ReLU(),
            nn.Linear(in_features=256,
                      out_features=output_shape)
        )

    def forward(self, x: torch.Tensor):
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.conv_block_3(x)
        x = self.classifier(x)
        return x

model = guard_classifier(input_channels=3, output_shape=2)
print(model)