# Offside Detection Video Output

In [13]:
# Idea: Remove label from everyone and only use it for offside detection?

# Check if GPU is being used
!nvidia-smi

Tue Nov 26 15:54:22 2024       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 560.94                 Driver Version: 560.94         CUDA Version: 12.6     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                  Driver-Model | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA GeForce RTX 3060      WDDM  |   00000000:01:00.0  On |                  N/A |
|  0%   44C    P8             20W /  170W |    2948MiB /  12288MiB |     12%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [14]:
%pip install -q gdown inference-gpu supervision


[notice] A new release of pip is available: 23.1.2 -> 24.3.1
[notice] To update, run: C:\Users\jcdos\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.


In [15]:
import os
import sys

# Run ONNX models on the GPU.
# Roboflow `inference` reads the *plural* variable name
# ONNXRUNTIME_EXECUTION_PROVIDERS; the singular spelling is silently ignored.
# This must be set before `inference` is imported.
os.environ["ONNXRUNTIME_EXECUTION_PROVIDERS"] = "[CUDAExecutionProvider]"

# Make the project root importable (for `common.team`, etc.).
# Guarded so re-running this cell does not add duplicate entries to sys.path.
project_root = os.path.abspath('..')
if project_root not in sys.path:
    sys.path.append(project_root)

In [16]:
import supervision as sv
from common.team import TeamClassifier
from tqdm import tqdm
import numpy as np
from inference import get_model
from dotenv import load_dotenv
import cv2
import torch
from sports.annotators.soccer import draw_pitch, draw_points_on_pitch, draw_pitch_voronoi_diagram
from sports.configs.soccer import SoccerPitchConfiguration

# Report whether torch can see a CUDA device (and which one)
cuda_ready = torch.cuda.is_available()
print(
    f"CUDA is available. GPU: {torch.cuda.get_device_name(0)}"
    if cuda_ready
    else "CUDA is not available."
)

CUDA is available. GPU: NVIDIA GeForce RTX 3060


In [17]:
# Setup paths for source & target videos
# Change to dynamic inputs for cloud deployment
SOURCE_VIDEO_PATH = "../videos/08fd33_4.mp4"
# SOURCE_VIDEO_PATH = "../videos/soccer_video_offside_3.mp4"
TARGET_VIDEO_PATH = "08fd33_4_result_1.mp4"

# Torch device: CUDA when available, otherwise CPU
# NOTE(review): DEVICE is not referenced by any cell visible in this notebook -- confirm it is still needed
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Class ids compared against `detections.class_id` from the player detection model
BALL_ID = 0
GOALKEEPER_ID = 1
PLAYER_ID = 2
REFEREE_ID = 3

# Class id written onto players that the processing loop flags as offside
OFFSIDE_ID = 4

# Frame stride used when sampling the video for team-classifier crops (every STRIDE-th frame)
STRIDE = 30

# Pitch model; its `vertices` and `edges` are used for the homography and the edge annotator
CONFIG = SoccerPitchConfiguration()

In [18]:
# Load env variables (the Roboflow API key is read from the environment / .env, never hard-coded)
load_dotenv()
ROBOFLOW_API_KEY = os.getenv("ROBOFLOW_API_KEY")

# ID of the Roboflow player/ball detection model
PLAYER_DETECTION_MODEL_ID = "football-players-detection-3zvbc/12"

# Load Player/Ball model
PLAYER_DETECTION_MODEL = get_model(
    model_id=PLAYER_DETECTION_MODEL_ID,
    api_key=ROBOFLOW_API_KEY
)

# Our own pitch model was not trained for enough epochs, so use the better-trained
# public model from Roboflow Universe instead:
# https://universe.roboflow.com/roboflow-jvuqo/football-field-detection-f07vi/model/15

# Load Pitch Detection Model (pitch keypoints)
PITCH_DETECTION_MODEL_ID ="football-field-detection-f07vi/15"
PITCH_DETECTION_MODEL = get_model(
    model_id=PITCH_DETECTION_MODEL_ID, 
    api_key=ROBOFLOW_API_KEY
)






In [None]:
# Ellipse drawn under every tracked person (players, goalkeepers, referees)
# Three-colour palette; presumably indexed by class id (0 / 1 = teams, 2 = referee) -- confirm against supervision
ellipse_annotator = sv.EllipseAnnotator(
    color=sv.ColorPalette.from_hex(['#00B400', '#FFD700', '#606060']),
    thickness=2
)

# Red ellipse drawn on top of the regular one for players flagged offside
offside_annotator = sv.EllipseAnnotator(
    color=sv.ColorPalette.from_hex(['#D2122E']),
    thickness=2
)

# Triangle marker drawn over the ball
triangle_annotator = sv.TriangleAnnotator(
    color=sv.Color.from_hex('#FF1493'),
    base=20,
    height=17,
)

# Text label (red background, black text) placed at the bottom centre of a detection
label_annotator = sv.LabelAnnotator(
    color=sv.ColorPalette.from_hex(['#D2122E']),
    text_color=sv.Color.from_hex('#000000'),
    text_position=sv.Position.BOTTOM_CENTER,
)

# Vertex annotator for detected pitch keypoints
vertex_annotator = sv.VertexAnnotator(
    color=sv.Color.from_hex('#FF1493'),
    radius=8
)

# Edge annotator that connects pitch keypoints using the edges from SoccerPitchConfiguration
# NOTE(review): unlike every other colour here, this hex string has no leading '#';
# confirm sv.Color.from_hex accepts that form, or add the '#' for consistency
edge_annotator = sv.EdgeAnnotator(
    color=sv.Color.from_hex('00BFFF'),
    thickness=2,
    edges=CONFIG.edges
)

In [None]:
def extract_crops(source_video_path: str):
    """Collect top-half player crops from a video for fitting the team classifier.

    Only the top half of every player box is kept: it contains the torso and
    jersey, while the bottom half (shorts, legs, grass) adds noise that causes
    the team classifier to misclassify players.

    Args:
        source_video_path: Path of the video to sample frames from.

    Returns:
        A list of image arrays, one top-half crop per detected player.
    """
    # Sample every STRIDE-th frame of the video that was actually requested
    # (previously the global SOURCE_VIDEO_PATH was used and the argument ignored).
    frame_generator = sv.get_video_frames_generator(source_path=source_video_path, stride=STRIDE)

    crops = []

    for frame in tqdm(frame_generator, desc="collection crops"):

        # Detect objects with a minimum confidence, convert to supervision format,
        # merge overlapping boxes, then keep only the player class.
        result = PLAYER_DETECTION_MODEL.infer(frame, confidence=0.3)[0]
        detections = sv.Detections.from_inference(result)
        detections = detections.with_nmm(threshold=0.5, class_agnostic=True)
        detections = detections[detections.class_id == PLAYER_ID]

        for xyxy in detections.xyxy:
            crop = sv.crop_image(frame, xyxy)
            top_half_crop = crop[:crop.shape[0] // 2, :]

            # A degenerate (zero/one pixel tall) box yields an empty image;
            # skip it rather than feeding an empty array to the classifier.
            if top_half_crop.size == 0:
                continue

            crops.append(top_half_crop)

    return crops

In [None]:
def new_resolve_goalkeepers_team_id(goalkeeper_detections: sv.Detections, view_transformer, pitch_length: float = 12000) -> np.ndarray:
    """Assign each goalkeeper to a team based on which goal line they are nearer to.

    The goalkeepers' bottom-centre anchors are projected onto the 2D pitch with
    ``view_transformer``. A goalkeeper strictly closer to the right goal line
    (x == ``pitch_length``) gets team id 1, otherwise team id 0.

    Args:
        goalkeeper_detections: Detections containing only goalkeepers.
        view_transformer: Object exposing ``transform_points`` that maps frame
            coordinates to pitch coordinates.
        pitch_length: X extent of the 2D pitch in pitch units (defaults to the
            12000 used elsewhere in this notebook).

    Returns:
        Integer array with one team id (0 or 1) per goalkeeper; empty when no
        goalkeeper was detected.
    """
    goalkeepers_xy = goalkeeper_detections.get_anchors_coordinates(sv.Position.BOTTOM_CENTER)
    pitch_goalkeeper_xy = view_transformer.transform_points(goalkeepers_xy)

    # No goalkeepers detected: return an empty *integer* array so downstream
    # class-id handling does not receive a float array.
    if len(pitch_goalkeeper_xy) == 0:
        return np.array([], dtype=int)

    # Compare distance to the right goal line against distance to the left one.
    keeper_x = pitch_goalkeeper_xy[:, 0]
    closer_to_right = np.abs(pitch_length - keeper_x) < np.abs(keeper_x)
    return closer_to_right.astype(int)

In [None]:
def resolve_team_directions(players_detections, pitch_players_xy, goalkeepers_detections, pitch_goalkeeper_xy):
    """Relabel player team ids so that team 0 is the team on the left of the pitch.

    A centre x-position is estimated for each team from its players (and its
    goalkeeper when one was detected). If team 0's centre lies to the right of
    team 1's, the 0/1 class ids of all players are swapped in place.

    Args:
        players_detections: Object with a ``class_id`` numpy array of team ids
            (0 or 1), one per player. Modified in place when a swap is needed.
        pitch_players_xy: (n, 2) array of player positions on the 2D pitch,
            aligned with ``players_detections.class_id``.
        goalkeepers_detections: Object with a ``class_id`` numpy array of
            goalkeeper team ids, one per goalkeeper.
        pitch_goalkeeper_xy: (m, 2) array of goalkeeper positions on the 2D
            pitch, aligned with ``goalkeepers_detections.class_id``.

    Returns:
        ``players_detections.class_id`` after the optional swap.
    """
    team_0_xy = pitch_players_xy[players_detections.class_id == 0]
    team_1_xy = pitch_players_xy[players_detections.class_id == 1]

    # Balance the team sizes before averaging: uneven detections otherwise bias
    # the centres and can make team 0 look like it defends the right side.
    # Team 0 loses its right-most players, team 1 its left-most players.
    surplus = len(team_0_xy) - len(team_1_xy)
    for _ in range(max(surplus, 0)):
        team_0_xy = np.delete(team_0_xy, np.argmax(team_0_xy[:, 0]), axis=0)
    for _ in range(max(-surplus, 0)):
        team_1_xy = np.delete(team_1_xy, np.argmin(team_1_xy[:, 0]), axis=0)

    team_0_keepers_xy = pitch_goalkeeper_xy[goalkeepers_detections.class_id == 0]
    team_1_keepers_xy = pitch_goalkeeper_xy[goalkeepers_detections.class_id == 1]

    if len(team_0_keepers_xy) == 0 and len(team_1_keepers_xy) == 0:
        print("No goalkeepers detected")

    def center_x(players_xy, keepers_xy):
        # Average the x of whatever anchors exist (player centroid, keeper
        # centroid). Only x is used, and a missing keeper is simply left out
        # instead of being treated as a keeper standing at x == 0.
        anchors = [xy[:, 0].mean() for xy in (players_xy, keepers_xy) if len(xy) > 0]
        return float(np.mean(anchors)) if anchors else 0.0

    team_0_center_x = center_x(team_0_xy, team_0_keepers_xy)
    team_1_center_x = center_x(team_1_xy, team_1_keepers_xy)

    # Team 0 must be the left team: swap the labels when it sits to the right.
    if team_0_center_x > team_1_center_x:
        class_ids = players_detections.class_id
        class_ids[:] = np.where(class_ids == 0, 1, 0)

    return players_detections.class_id

In [None]:
def get_last_defenders(players_detections: sv.Detections, pitch_players_xy: np.ndarray) -> list:
    """Find the deepest outfield player of each team on the 2D pitch.

    Team 0 is assumed to defend the left side, so its last defender is the
    team 0 player with the smallest x. Team 1 defends the right side, so its
    last defender is the team 1 player with the largest x.

    Args:
        players_detections: Player detections whose ``class_id`` holds the
            team id (0 or 1) of each player.
        pitch_players_xy: (n, 2) array of player pitch positions, aligned with
            ``players_detections.class_id``.

    Returns:
        ``[left_defender_index, right_defender_index]`` as indices into
        ``pitch_players_xy``. An index of 999 means the team had no player.
    """
    # Start from +/- infinity instead of the pitch bounds: homography noise can
    # project a player slightly outside [0, 12000], and such a player must
    # still be selectable as the last defender.
    left_defender_x = np.inf
    right_defender_x = -np.inf
    left_defender_id = 999
    right_defender_id = 999

    for i, position in enumerate(pitch_players_xy):
        x_pos = position[0]
        team_id = players_detections.class_id[i]

        # Strict comparisons keep the first player found on ties.
        if team_id == 0 and x_pos < left_defender_x:
            left_defender_x = x_pos
            left_defender_id = i
        elif team_id == 1 and x_pos > right_defender_x:
            right_defender_x = x_pos
            right_defender_id = i

    return [left_defender_id, right_defender_id]



In [None]:
def check_offsides(players_detections, pitch_players_xy, last_defender_ids) -> list:
    """Return the indices of players standing in an offside position.

    Team 0 defends the left side and team 1 the right side. A team 1 player is
    flagged when they are further left than team 0's last defender; a team 0
    player is flagged when they are further right than team 1's last defender.
    Only the position is checked -- the ball and the moment of the pass are
    not taken into account.

    Args:
        players_detections: Object whose ``class_id`` holds the team id (0 or
            1) of each player.
        pitch_players_xy: (n, 2) array of player pitch positions, aligned with
            ``players_detections.class_id``.
        last_defender_ids: ``[left_defender_index, right_defender_index]``;
            an index outside ``range(n)`` means that team has no defender.

    Returns:
        List of indices into ``pitch_players_xy`` of offside players.
    """
    player_count = len(pitch_players_xy)
    left_defender = last_defender_ids[0]   # Team 0 defender
    right_defender = last_defender_ids[1]  # Team 1 defender

    # A missing defender is reported with an out-of-range index; indexing with
    # it would raise IndexError, so that side is simply skipped.
    has_left_line = 0 <= left_defender < player_count
    has_right_line = 0 <= right_defender < player_count
    left_line_x = pitch_players_xy[left_defender][0] if has_left_line else None
    right_line_x = pitch_players_xy[right_defender][0] if has_right_line else None

    offside_ids = []

    for i in range(player_count):
        team_id = players_detections.class_id[i]
        x_pos = pitch_players_xy[i][0]

        # Team 1 attacks the left goal: offside when beyond team 0's last defender
        if has_left_line and team_id == 1 and x_pos < left_line_x:
            offside_ids.append(i)

        # Team 0 attacks the right goal: offside when beyond team 1's last defender
        elif has_right_line and team_id == 0 and x_pos > right_line_x:
            offside_ids.append(i)

    return offside_ids

In [None]:
class ViewTransformer:
    """Map 2D points from one plane to another with a homography.

    Used to project frame (image) coordinates onto the 2D pitch, or the other
    way round, depending on which point set is passed as ``source``.
    """

    def __init__(self, source: np.ndarray, target: np.ndarray):
        """Estimate the homography that maps ``source`` points onto ``target``.

        Args:
            source: (n, 2) array of points in the source plane.
            target: (n, 2) array of the corresponding points in the target plane.

        Raises:
            ValueError: If the arrays differ in shape, are not (n, 2), contain
                fewer than 4 correspondences, or no homography could be found.
        """
        source = np.asarray(source, dtype=np.float32)
        target = np.asarray(target, dtype=np.float32)

        if source.shape != target.shape:
            raise ValueError("Source and target must have the same shape")
        if source.ndim != 2 or source.shape[1] != 2:
            raise ValueError("Source and target must have shape (n, 2)")
        # A homography has 8 degrees of freedom -> at least 4 point pairs are required
        if len(source) < 4:
            raise ValueError("At least 4 point correspondences are required")

        self.m, _ = cv2.findHomography(source, target)

        # findHomography can return None when the estimation fails
        if self.m is None:
            raise ValueError("Homography matrix could not be calculated")

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        """Apply the homography to a set of 2D points.

        Args:
            points: (n, 2) array of points in the source plane.

        Returns:
            (n, 2) float32 array of the transformed points. An empty input is
            returned unchanged.

        Raises:
            ValueError: If ``points`` is non-empty and not of shape (n, 2).
        """
        if points.size == 0:
            return points

        if points.ndim != 2 or points.shape[1] != 2:
            raise ValueError("Points must have shape (n, 2)")

        # cv2.perspectiveTransform expects shape (n, 1, 2); drop the extra axis afterwards
        points = points.reshape(-1, 1, 2).astype(np.float32)
        points = cv2.perspectiveTransform(points, self.m)
        return points.reshape(-1, 2).astype(np.float32)

: 

In [None]:
# Fit the team classifier on jersey (top-half) crops sampled from the source video
crops = extract_crops(SOURCE_VIDEO_PATH)
team_classifier = TeamClassifier()
team_classifier.fit(crops)

# Track people across frames
tracker = sv.ByteTrack()
tracker.reset()

# Video source and the sink the annotated video is written to
video_info = sv.VideoInfo.from_video_path(SOURCE_VIDEO_PATH)
video_sink = sv.VideoSink(TARGET_VIDEO_PATH, video_info=video_info)
frame_generator = sv.get_video_frames_generator(source_path=SOURCE_VIDEO_PATH)

# Pitch keypoints in 2D pitch coordinates (x runs from 0 to 12000)
pitch_vertices = np.array(CONFIG.vertices)

# Keypoints below this confidence are not used for the homography
KEYPOINT_CONFIDENCE = 0.5
# A homography needs at least four point correspondences
MIN_HOMOGRAPHY_POINTS = 4

# Run the offside logic on every frame of the video
with video_sink:
    for frame in tqdm(frame_generator, total=video_info.total_frames, desc="Processing frames"):

        # Run player detection model
        player_result = PLAYER_DETECTION_MODEL.infer(frame, confidence=0.3)[0]
        player_detections = sv.Detections.from_inference(player_result)

        # Run pitch detection model
        pitch_result = PITCH_DETECTION_MODEL.infer(frame, confidence=0.3)[0]
        key_points = sv.KeyPoints.from_inference(pitch_result)

        # Ball detections, padded so the marker is easier to see
        ball_detections = player_detections[player_detections.class_id == BALL_ID]
        ball_detections.xyxy = sv.pad_boxes(xyxy=ball_detections.xyxy, px=10)

        # Everything except the ball: merge overlapping boxes and update the tracker
        all_detections = player_detections[player_detections.class_id != BALL_ID]
        all_detections = all_detections.with_nmm(threshold=0.5, class_agnostic=True)
        all_detections = tracker.update_with_detections(all_detections)

        # Split into players, goalkeepers and referees
        players_detections = all_detections[all_detections.class_id == PLAYER_ID]
        goalkeepers_detections = all_detections[all_detections.class_id == GOALKEEPER_ID]
        referee_detections = all_detections[all_detections.class_id == REFEREE_ID]

        # Referees move from class 3 to class 2 so they map onto the third palette colour
        referee_detections.class_id -= 1

        # Predict each player's team from the TOP HALF of the crop, matching the
        # crops the classifier was fitted on in extract_crops (the bottom half
        # causes misclassification). max(1, ...) keeps at least one pixel row so
        # predictions stay aligned one-to-one with the detections.
        player_crops = []
        for xyxy in players_detections.xyxy:
            crop = sv.crop_image(frame, xyxy)
            player_crops.append(crop[:max(1, crop.shape[0] // 2), :])
        players_detections.class_id = team_classifier.predict(player_crops)

        # Keep only confidently detected pitch keypoints
        confident_keypoints = None
        if key_points.confidence is not None and len(key_points.confidence) > 0:
            confident_keypoints = key_points.confidence[0] > KEYPOINT_CONFIDENCE

        offside_ids = []

        # Offside analysis needs a frame -> pitch homography. When too few
        # keypoints are visible the frame is still annotated and written, just
        # without offside information, instead of aborting the whole video.
        if confident_keypoints is not None and np.count_nonzero(confident_keypoints) >= MIN_HOMOGRAPHY_POINTS:
            frame_reference_points = key_points.xy[0][confident_keypoints]
            pitch_reference_points = pitch_vertices[confident_keypoints]

            # Transformer from frame (image) coordinates to 2D pitch coordinates
            view_transformer = ViewTransformer(
                source=frame_reference_points,
                target=pitch_reference_points,
            )

            # Player positions on the 2D pitch
            frame_player_xy = players_detections.get_anchors_coordinates(sv.Position.BOTTOM_CENTER)
            pitch_players_xy = view_transformer.transform_points(frame_player_xy)

            # Goalkeeper positions on the 2D pitch
            frame_goalkeeper_xy = goalkeepers_detections.get_anchors_coordinates(sv.Position.BOTTOM_CENTER)
            pitch_goalkeeper_xy = view_transformer.transform_points(frame_goalkeeper_xy)

            # Resolve the team of each goalkeeper from the side of the pitch they stand on
            goalkeepers_detections.class_id = new_resolve_goalkeepers_team_id(goalkeepers_detections, view_transformer)

            # Team 0 should always be the left side, team 1 the right side.
            # The result belongs on the tracked players (players_detections),
            # not on the raw model output (player_detections).
            players_detections.class_id = resolve_team_directions(
                players_detections, pitch_players_xy, goalkeepers_detections, pitch_goalkeeper_xy
            )

            # The last defender of each team defines the offside line for the opposing team
            last_defender_ids = get_last_defenders(players_detections, pitch_players_xy)

            # Only the offside *position* is checked; the moment the ball is
            # played is not taken into account.
            offside_ids = check_offsides(players_detections, pitch_players_xy, last_defender_ids)

        # Mark offside players with their own class id
        for i in offside_ids:
            print(f"Player {i} is offside")
            players_detections.class_id[i] = OFFSIDE_ID

        # Merge players, goalkeepers and referees back together
        all_detections = sv.Detections.merge([players_detections, goalkeepers_detections, referee_detections])

        # Ensure class_id values are integers
        all_detections.class_id = np.array(all_detections.class_id, dtype=int)

        # Separate offside players and label only them
        offsides_detections = all_detections[all_detections.class_id == OFFSIDE_ID]
        labels = ["Offsides"] * len(offsides_detections)

        # Ellipses for everyone, red ellipse + label for offside players, triangle for the ball
        annotated_frame = frame.copy()
        annotated_frame = ellipse_annotator.annotate(annotated_frame, all_detections)
        annotated_frame = offside_annotator.annotate(annotated_frame, offsides_detections)
        annotated_frame = label_annotator.annotate(annotated_frame, offsides_detections, labels=labels)
        annotated_frame = triangle_annotator.annotate(annotated_frame, ball_detections)

        # Append the annotated frame to the output video
        video_sink.write_frame(annotated_frame)



collection crops: 25it [00:05,  4.66it/s]
Embedding extraction: 16it [00:46,  2.91s/it]
Processing frames:   0%|          | 0/750 [00:00<?, ?it/s]
Embedding extraction: 0it [00:00, ?it/s]
Embedding extraction: 1it [00:01,  1.72s/it]
Processing frames:   0%|          | 1/750 [00:02<28:50,  2.31s/it]
Embedding extraction: 0it [00:00, ?it/s]
Embedding extraction: 1it [00:01,  1.76s/it]
Processing frames:   0%|          | 2/750 [00:04<26:57,  2.16s/it]
Embedding extraction: 0it [00:00, ?it/s]
Embedding extraction: 1it [00:01,  1.82s/it]
Processing frames:   0%|          | 3/750 [00:06<27:43,  2.23s/it]
Embedding extraction: 0it [00:00, ?it/s]
Embedding extraction: 1it [00:01,  1.73s/it]
Processing frames:   1%|          | 4/750 [00:08<27:18,  2.20s/it]
Embedding extraction: 0it [00:00, ?it/s]
Embedding extraction: 1it [00:01,  1.75s/it]
Processing frames:   1%|          | 5/750 [00:10<27:06,  2.18s/it]
Embedding extraction: 0it [00:00, ?it/s]
Embedding extraction: 1it [00:01,  1.65s/it]
Pr