In [None]:
# install necessary packages for computer vision and deep learning.
!pip install opencv-contrib-python==4.9.0.80  # Install OpenCV for image and video processing (with extra modules)
!pip install opencv-python==4.9.0.80          # Standard OpenCV package
!pip install opencv-python-headless==4.9.0.80 # OpenCV version for servers (no display GUI)

# Install Ultralytics, the library for YOLO models
!pip install ultralytics

# Force numpy to version 1.26.4 to avoid conflicts
!pip install numpy==1.26.4 --force-reinstall

In [None]:
! kill -9 $(ps -A | grep python | awk '{print $1}')

In [3]:
import cv2
import numpy as np
from collections import defaultdict, deque
import os
from ultralytics import YOLO
import datetime

In [6]:
def calculate_angle(p1, p2, p3):
    """
    Calculate the angle between three points (p1-p2-p3) in degrees.

    Args:
        p1, p2, p3: NumPy arrays representing coordinates of three points.
    Returns:
        Angle in degrees.
    """
    v1 = p1 - p2  # Vector from p2 to p1
    v2 = p3 - p2  # Vector from p2 to p3
    cosine_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))  # Cosine of angle
    cosine_angle = np.clip(cosine_angle, -1.0, 1.0)  # Avoid numerical errors
    return np.degrees(np.arccos(cosine_angle))  # Convert to degrees

def analyze_soccer_video(input_video, output_video):
    """
    Analyze a soccer video to detect and count shots based on player pose and ball position.

    Args:
        input_video: Path to the input video file.
        output_video: Path to save the output video with annotations.
    """
    # Load YOLO models for pose and ball detection
    pose_detector = YOLO("/content/drive/MyDrive/Colab Notebooks/soccer_prj/soccer/yolo11x-pose.pt")
    ball_detector = YOLO("/content/drive/MyDrive/Colab Notebooks/soccer_prj/soccer/soccer_ball.pt")

    # Open the input video
    video_reader = cv2.VideoCapture(input_video)
    if not video_reader.isOpened():
        print("Error: Could not open video.")
        return

    # Get video properties
    fps = video_reader.get(cv2.CAP_PROP_FPS)  # Frames per second
    width = int(video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))  # Video width
    height = int(video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))  # Video height
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Video codec
    video_writer = cv2.VideoWriter(output_video, fourcc, fps, (width, height))  # Output video writer

    # Set font scaling for text display based on video size
    base_scale = height / 500
    base_thickness = max(1, int(height / 300))
    count_scale = min(base_scale * 1.2, 2.0)  # Scale for shot count text
    count_thickness = base_thickness  # Thickness for shot count text
    angle_scale = base_scale * 0.6  # Scale for angle text
    angle_thickness = max(1, base_thickness - 1)  # Thickness for angle text
    count_y_pos = int(height / 10)  # Y-position for shot count display

    # Initialize variables for tracking shots and player data
    shot_count = 0  # Total number of shots detected
    frame_count = 0  # Current frame number
    left_knee_angles = defaultdict(lambda: deque(maxlen=int(fps / 3)))  # Store left knee angles
    right_knee_angles = defaultdict(lambda: deque(maxlen=int(fps / 3)))  # Store right knee angles
    last_shot_timestamps = {}  # Timestamps of last shot for each person
    last_shot_frames = {}  # Frame numbers of last shot for each person
    is_shooting = defaultdict(bool)  # Flag to track if a person is shooting
    shooting_frame_counts = defaultdict(int)  # Count consecutive shooting condition frames
    MIN_SHOOT_FRAMES = 2  # Minimum frames for a valid shot
    BALL_CONF_THRESHOLD = 0.4  # Confidence threshold for ball detection
    MIN_BALL_FOOT_DISTANCE = 50  # Min distance between ball and foot (pixels)
    MAX_BALL_FOOT_DISTANCE = 200  # Max distance between ball and foot (pixels)
    MIN_TIME_BETWEEN_SHOTS = 2  # Min time between shots (seconds)
    MIN_FRAMES_BETWEEN_SHOTS = int(fps * 2)  # Min frames between shots (2 seconds)
    BALL_DISTANCE_FRAMES = 5  # Frames to maintain valid ball-foot distance
    last_known_ball_position = None  # Last known position of the ball
    ball_position_buffer = deque(maxlen=10)  # Buffer for recent ball positions
    ball_foot_distances = defaultdict(lambda: deque(maxlen=BALL_DISTANCE_FRAMES))  # Track ball-foot distances
    frames_since_last_ball = float('inf')  # Frames since last ball detection

    while True:
        # Read the next frame from the video
        ret, frame = video_reader.read()
        if not ret:
            break  # Exit if no more frames

        # Get current time for timestamp comparisons
        current_time = datetime.datetime.now()

        # Detect poses using YOLO pose model
        pose_results = pose_detector(frame)
        annotated_frame = pose_results[0].plot()  # Get frame with pose annotations

        # Detect the ball using YOLO ball model
        ball_detected = False
        ball_results = ball_detector(frame)
        for box in ball_results[0].boxes:
            if int(box.cls) == 0 and float(box.conf) >= BALL_CONF_THRESHOLD:
                bbox = box.xyxy.cpu().numpy()[0]
                ball_center = np.array([(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2])  # Ball center
                ball_position_buffer.append(ball_center)
                last_known_ball_position = ball_center
                ball_detected = True
                frames_since_last_ball = 0
                # Draw a yellow rectangle around the ball
                cv2.rectangle(annotated_frame, (int(bbox[0]), int(bbox[1])),
                              (int(bbox[2]), int(bbox[3])), (0, 255, 255), 2)
                # Display ball confidence score
                cv2.putText(annotated_frame, f"Ball ({float(box.conf):.2f})",
                            (int(bbox[0]), int(bbox[1]) - 10), cv2.FONT_HERSHEY_SIMPLEX,
                            angle_scale, (0, 255, 255), angle_thickness)
        if not ball_detected:
            frames_since_last_ball += 1  # Increment frames since last ball detection

        # Use last known ball position if within 30 frames
        ball_position = last_known_ball_position if last_known_ball_position is not None and frames_since_last_ball < 30 else None

        # Analyze poses for each detected person
        if pose_results[0].keypoints is not None:
            keypoints = pose_results[0].keypoints.data.cpu().numpy()
            for person_idx, kps in enumerate(keypoints):
                if kps.shape[0] < 17:
                    continue  # Skip if not enough keypoints

                # Calculate knee angles (left: hip-knee-ankle, right: hip-knee-ankle)
                left_knee_angle = calculate_angle(kps[11, :2], kps[13, :2], kps[15, :2]) \
                    if all(kps[i, 2] > 0.3 for i in [11, 13, 15]) else None
                right_knee_angle = calculate_angle(kps[12, :2], kps[14, :2], kps[16, :2]) \
                    if all(kps[i, 2] > 0.3 for i in [12, 14, 16]) else None

                # Store knee angles for pattern analysis
                if left_knee_angle is not None:
                    left_knee_angles[person_idx].append(left_knee_angle)
                if right_knee_angle is not None:
                    right_knee_angles[person_idx].append(right_knee_angle)

                # Check distance between ball and support foot
                ball_distance_condition = False
                left_foot_ball_dist = float('inf')
                right_foot_ball_dist = float('inf')
                support_foot_dist = float('inf')
                if ball_position is not None:
                    if kps[15, 2] > 0.3:  # Left ankle confidence
                        left_foot_ball_dist = np.linalg.norm(ball_position - kps[15, :2])
                    if kps[16, 2] > 0.3:  # Right ankle confidence
                        right_foot_ball_dist = np.linalg.norm(ball_position - kps[16, :2])
                    support_foot_dist = min(left_foot_ball_dist, right_foot_ball_dist)
                    if MIN_BALL_FOOT_DISTANCE <= support_foot_dist <= MAX_BALL_FOOT_DISTANCE:
                        ball_foot_distances[person_idx].append(support_foot_dist)
                    else:
                        ball_foot_distances[person_idx].clear()
                    ball_distance_condition = len(ball_foot_distances[person_idx]) >= BALL_DISTANCE_FRAMES

                # Check knee angle patterns (indicates shooting motion)
                left_angle_pattern = False
                right_angle_pattern = False
                if len(left_knee_angles[person_idx]) >= 2:
                    angles = list(left_knee_angles[person_idx])
                    diffs = np.diff(angles)
                    for i in range(len(diffs) - 1):
                        if diffs[i] < -10 and diffs[i + 1] > 10:  # Decrease then increase
                            left_angle_pattern = True
                            break
                    if diffs[-1] > 15:  # Large increase in last frame
                        left_angle_pattern = True
                if len(right_knee_angles[person_idx]) >= 2:
                    angles = list(right_knee_angles[person_idx])
                    diffs = np.diff(angles)
                    for i in range(len(diffs) - 1):
                        if diffs[i] < -10 and diffs[i + 1] > 10:  # Decrease then increase
                            right_angle_pattern = True
                            break
                    if diffs[-1] > 15:  # Large increase in last frame
                        right_angle_pattern = True

                # Determine if shooting conditions are met
                left_shot_condition = left_angle_pattern and (ball_distance_condition or not ball_detected)
                right_shot_condition = right_angle_pattern and (ball_distance_condition or not ball_detected)
                shooting_condition = left_shot_condition or right_shot_condition

                # Check time and frame constraints to prevent double-counting
                can_shoot = True
                if person_idx in last_shot_frames:
                    frames_since_last_shot = frame_count - last_shot_frames[person_idx]
                    if frames_since_last_shot < MIN_FRAMES_BETWEEN_SHOTS:
                        can_shoot = False
                        shooting_frame_counts[person_idx] = 0

                if person_idx in last_shot_timestamps:
                    time_since_last_shot = (current_time - last_shot_timestamps[person_idx]).total_seconds()
                    if time_since_last_shot < MIN_TIME_BETWEEN_SHOTS:
                        can_shoot = False
                        shooting_frame_counts[person_idx] = 0

                # Count shots if conditions are met
                if shooting_condition:
                    shooting_frame_counts[person_idx] += 1
                else:
                    shooting_frame_counts[person_idx] = 0

                if (shooting_frame_counts[person_idx] >= MIN_SHOOT_FRAMES and
                    not is_shooting[person_idx] and can_shoot):
                    is_shooting[person_idx] = True
                    shot_count += 1
                    last_shot_timestamps[person_idx] = current_time
                    last_shot_frames[person_idx] = frame_count
                    shooting_frame_counts[person_idx] = 0

                if not shooting_condition:
                    is_shooting[person_idx] = False

                # Draw knee angles on the frame
                if left_knee_angle:
                    pt = tuple(kps[13, :2].astype(int))
                    cv2.putText(annotated_frame, f"LK:{left_knee_angle:.1f}", pt,
                                cv2.FONT_HERSHEY_SIMPLEX, angle_scale, (0, 255, 0), angle_thickness)
                if right_knee_angle:
                    pt = tuple(kps[14, :2].astype(int))
                    cv2.putText(annotated_frame, f"RK:{right_knee_angle:.1f}", pt,
                                cv2.FONT_HERSHEY_SIMPLEX, angle_scale, (0, 255, 0), angle_thickness)

        # Display the shot count on the frame
        cv2.putText(annotated_frame, f"Shoot Count: {shot_count}", (10, count_y_pos),
                    cv2.FONT_HERSHEY_SIMPLEX, count_scale, (0, 255, 0), count_thickness)

        # Write the annotated frame to the output video
        video_writer.write(annotated_frame)
        frame_count += 1

    # Release video resources
    video_reader.release()
    video_writer.release()

if __name__ == "__main__":
    # Define input and output directories
    input_dir = "/content/drive/MyDrive/Colab Notebooks/soccer_prj/soccer/soccer_input"
    output_dir = "/content/drive/MyDrive/Colab Notebooks/soccer_prj/soccer/soccer_output"
    os.makedirs(output_dir, exist_ok=True)  # Create output directory if it doesn't exist

    # Process all video files in the input directory
    video_files = [f for f in os.listdir(input_dir) if f.lower().endswith(('.mp4', '.avi', '.mov'))]
    for video_file in video_files:
        input_path = os.path.join(input_dir, video_file)
        output_path = os.path.join(output_dir, f"optimized_{video_file}")
        print(f"Starting processing for video: {input_path}")
        analyze_soccer_video(input_path, output_path)
        print(f"Finished processing for video: {input_path}")

Starting processing for video: /content/drive/MyDrive/Colab Notebooks/soccer_prj/soccer/soccer_input/real_video.mp4

0: 384x640 1 person, 3149.9ms
Speed: 20.6ms preprocess, 3149.9ms inference, 31.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 2546.5ms
Speed: 5.2ms preprocess, 2546.5ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 2609.6ms
Speed: 5.0ms preprocess, 2609.6ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 3587.0ms
Speed: 4.3ms preprocess, 3587.0ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 3229.2ms
Speed: 13.2ms preprocess, 3229.2ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 ball, 2477.2ms
Speed: 4.2ms preprocess, 2477.2ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 2595.4ms
Speed: 5.1ms preprocess, 2595.4ms inference, 1.3ms postpro