<a href="https://colab.research.google.com/github/ghada233/SCAI-DeepNet/blob/main/goal_keeper_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install required packages
!pip install ultralytics numpy opencv-python matplotlib gym torch

Collecting ultralytics
  Downloading ultralytics-8.3.104-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuf

In [None]:
import cv2
from ultralytics import YOLO
import numpy as np

# Load YOLO model
# Kept as a notebook-level global: process_video_debug and
# create_annotated_video both call `model(frame)` directly.
model = YOLO('yolov8n.pt')  # Using nano version for faster processing

# Process video and print debug info
def process_video_debug(video_path):
    """Run YOLO detection on every frame of a video, logging progress.

    Inference uses the notebook-level ``model`` global.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.

    Returns:
        tuple: ``(frames, detections)`` - two equal-length lists holding each
        decoded frame and the value ``model(frame, verbose=False)`` returned
        for it. Both lists are empty when the video cannot be opened or
        yields no frames.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    detections = []

    try:
        if not cap.isOpened():
            # Make the failure visible instead of silently returning nothing
            print(f"Could not open video: {video_path}")

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Run YOLO detection
            results = model(frame, verbose=False)  # Disable YOLO output

            # Store frame and detections
            frames.append(frame)
            detections.append(results)

            print(f"Processed frame {len(frames)}")
    finally:
        # Release the capture handle even if decoding or inference raises
        cap.release()

    print(f"Total frames processed: {len(frames)}")
    print(f"Total detections: {len(detections)}")
    return frames, detections

# Process video
# `detections` (one YOLO result per frame) is what extract_detection_data
# consumes in the next cell.
frames, detections = process_video_debug('/content/istockphoto-949472108-640_adpp_is.mp4')

Processed frame 1
Processed frame 2
Processed frame 3
Processed frame 4
Processed frame 5
Processed frame 6
Processed frame 7
Processed frame 8
Processed frame 9
Processed frame 10
Processed frame 11
Processed frame 12
Processed frame 13
Processed frame 14
Processed frame 15
Processed frame 16
Processed frame 17
Processed frame 18
Processed frame 19
Processed frame 20
Processed frame 21
Processed frame 22
Processed frame 23
Processed frame 24
Processed frame 25
Processed frame 26
Processed frame 27
Processed frame 28
Processed frame 29
Processed frame 30
Processed frame 31
Processed frame 32
Processed frame 33
Processed frame 34
Processed frame 35
Processed frame 36
Processed frame 37
Processed frame 38
Processed frame 39
Processed frame 40
Processed frame 41
Processed frame 42
Processed frame 43
Processed frame 44
Processed frame 45
Processed frame 46
Processed frame 47
Processed frame 48
Processed frame 49
Processed frame 50
Processed frame 51
Processed frame 52
Processed frame 53
Pr

In [None]:
def extract_detection_data(detections):
    """Reduce per-frame YOLO results to one bounding box per role.

    Boxes are taken from ``xyxyn`` so every coordinate is normalized to
    [0, 1]. That keeps real detections on the same scale as the centre
    fallback used when no ball is found, and as the [0, 1] observation
    bounds and 0.33 / 0.66 thresholds used by ``PenaltyEnv``.

    Args:
        detections: Sequence with one entry per frame. ``entry[0].boxes``
            must be iterable and yield boxes exposing ``cls``, ``conf`` and
            ``xyxyn``.

    Returns:
        list[dict]: One dict per frame with the keys ``'ball'``,
        ``'kicker'``, ``'goalkeeper'`` and ``'goalpost'``. Values are
        ``[x1, y1, x2, y2]`` lists or ``None``. ``'ball'`` is never ``None``
        (it falls back to the frame centre) and ``'goalpost'`` is always
        ``None`` because nothing here assigns it.
    """
    person_class, ball_class = 0, 32      # class ids used for person / sports ball
    person_min_conf, ball_min_conf = 0.5, 0.3

    extracted_data = []

    for det in detections:
        frame_data = {
            'ball': None,
            'kicker': None,
            'goalkeeper': None,
            'goalpost': None
        }
        best_ball_conf = None

        # Get all detected objects
        for box in det[0].boxes:
            cls = int(box.cls)
            conf = float(box.conf)

            if cls == person_class and conf > person_min_conf:
                # First qualifying person is the kicker, second the goalkeeper;
                # any further people are ignored rather than overwriting them.
                if frame_data['kicker'] is None:
                    frame_data['kicker'] = box.xyxyn[0].tolist()
                elif frame_data['goalkeeper'] is None:
                    frame_data['goalkeeper'] = box.xyxyn[0].tolist()

            elif cls == ball_class and conf > ball_min_conf:
                # Keep the most confident ball when several are detected
                if best_ball_conf is None or conf > best_ball_conf:
                    best_ball_conf = conf
                    frame_data['ball'] = box.xyxyn[0].tolist()

        # If no ball detected, use center position as fallback
        if frame_data['ball'] is None:
            frame_data['ball'] = [0.5, 0.5, 0.5, 0.5]

        extracted_data.append(frame_data)

    return extracted_data

# Flatten the raw YOLO results into one dict per frame; PenaltyEnv is built
# from this list in a later cell.
detection_data = extract_detection_data(detections)
print(f"Extracted data for {len(detection_data)} frames")

Extracted data for 646 frames


In [None]:
import gym
from gym import spaces

class PenaltyEnv(gym.Env):
    """Replay environment that steps through recorded per-frame detections.

    Each step scores the action against the ball position in the current
    frame, then advances one frame. Actions are 0 = left, 1 = center,
    2 = right; the reward is +1 when the chosen third of the x-axis contains
    the ball centre and -1 otherwise.

    Box coordinates in ``detection_data`` are expected to be normalized to
    [0, 1]: the observation bounds and the 0.33 / 0.66 reward thresholds are
    expressed on that scale.

    The class follows the older Gym API used by the rest of this notebook:
    ``reset()`` returns only the observation and ``step()`` returns a
    4-tuple ``(obs, reward, done, info)``.
    """

    def __init__(self, detection_data):
        """Store the frame list and declare the action/observation spaces.

        Args:
            detection_data: List of per-frame dicts with at least the keys
                ``'ball'`` (``[x1, y1, x2, y2]``) and ``'kicker'`` (same
                layout or ``None``).
        """
        super().__init__()
        self.detection_data = detection_data
        self.current_frame = 0

        # Action space: left, center, right
        self.action_space = spaces.Discrete(3)

        # Observation space: ball x position, ball y position, kicker x position.
        # Bounds are built as float32 so Box does not have to down-cast them
        # (integer bounds trigger a "precision lowered by casting" warning).
        self.observation_space = spaces.Box(
            low=np.zeros(3, dtype=np.float32),
            high=np.ones(3, dtype=np.float32),
            dtype=np.float32
        )

    def _frame_data(self):
        """Return the current frame's dict, clamped to the last valid frame.

        Clamping keeps ``step``/``_get_obs`` from raising ``IndexError`` when
        the data holds a single frame or when ``step`` is called again after
        the episode has finished.
        """
        last_index = len(self.detection_data) - 1
        return self.detection_data[min(self.current_frame, last_index)]

    @staticmethod
    def _center_x(box):
        """Return the horizontal midpoint of an ``[x1, y1, x2, y2]`` box."""
        return (box[0] + box[2]) / 2

    def _get_obs(self):
        """Build the ``[ball_x, ball_y, kicker_x]`` float32 observation.

        Returns the neutral ``[0.5, 0.5, 0.5]`` vector when there is no data,
        and uses 0.5 for ``kicker_x`` when the frame has no kicker box.
        """
        if len(self.detection_data) == 0:
            return np.array([0.5, 0.5, 0.5], dtype=np.float32)

        frame_data = self._frame_data()

        ball = frame_data['ball']
        ball_x = self._center_x(ball)
        ball_y = (ball[1] + ball[3]) / 2

        kicker = frame_data['kicker']
        kicker_x = self._center_x(kicker) if kicker is not None else 0.5

        return np.array([ball_x, ball_y, kicker_x], dtype=np.float32)

    def step(self, action):
        """Score ``action`` on the current frame and advance by one frame.

        Args:
            action: 0 (left), 1 (center) or 2 (right).

        Returns:
            tuple: ``(observation, reward, done, info)``. With no detection
            data the episode ends immediately with reward 0. Otherwise
            ``done`` becomes True once the last frame has been reached; that
            last frame is returned as the final observation.
        """
        if len(self.detection_data) == 0:
            return self._get_obs(), 0, True, {}

        # Simple reward logic based on ball position
        ball_x = self._center_x(self._frame_data()['ball'])

        # Determine if action was correct
        if action == 0 and ball_x < 0.33:  # left
            reward = 1
        elif action == 1 and 0.33 <= ball_x <= 0.66:  # center
            reward = 1
        elif action == 2 and ball_x > 0.66:  # right
            reward = 1
        else:
            reward = -1

        self.current_frame += 1
        done = self.current_frame >= len(self.detection_data) - 1

        return self._get_obs(), reward, done, {}

    def reset(self):
        """Rewind to the first frame and return its observation."""
        self.current_frame = 0
        return self._get_obs()

In [None]:
class QLearningAgent:
    """Tabular Q-learning agent over a uniformly binned 3-variable state.

    Each of the three state variables is expected on a [0, 1] scale and is
    mapped to one of ``n_bins`` bins; out-of-range values are clipped into
    the first or last bin.
    """

    def __init__(self, env, learning_rate=0.1, discount_factor=0.9,
                 exploration_rate=1.0, exploration_decay=0.995, n_bins=10):
        """Create the agent and an all-zero Q-table.

        Args:
            env: Environment exposing ``action_space.n`` and
                ``action_space.sample()``.
            learning_rate: Step size applied to each TD error.
            discount_factor: Weight of the bootstrapped next-state value.
            exploration_rate: Initial probability of taking a random action.
            exploration_decay: Factor applied to the exploration rate on
                every ``learn`` call.
            n_bins: Number of bins per state variable (default 10, giving a
                10x10x10xN table).
        """
        self.env = env
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = exploration_rate
        self.epsilon_decay = exploration_decay
        self.n_bins = n_bins

        # One axis per state variable plus a final axis over actions
        self.q_table = np.zeros((n_bins, n_bins, n_bins, env.action_space.n))

    def discretize_state(self, state):
        """Map a continuous state to a tuple of bin indices in [0, n_bins-1]."""
        top_bin = self.n_bins - 1
        # Scale to bin units, clip so out-of-range values land in an edge
        # bin, then truncate to an integer index.
        return tuple(int(np.clip(value * self.n_bins, 0, top_bin))
                     for value in state)

    def get_action(self, state):
        """Pick an action epsilon-greedily; always returns a plain ``int``."""
        if np.random.random() < self.epsilon:
            return int(self.env.action_space.sample())  # Explore
        discrete_state = self.discretize_state(state)
        return int(np.argmax(self.q_table[discrete_state]))

    def learn(self, state, action, reward, next_state, done=False):
        """Apply one Q-learning update and decay the exploration rate.

        Args:
            state: Observation the action was taken in.
            action: Index of the action taken.
            reward: Reward received for that action.
            next_state: Observation returned by the environment.
            done: Set True when ``next_state`` is terminal so its value is
                not bootstrapped into the target. Defaults to False, which
                keeps the previous always-bootstrap behaviour.
        """
        discrete_state = self.discretize_state(state)
        discrete_next_state = self.discretize_state(next_state)

        # Q-learning update: no future value beyond a terminal state
        future_value = 0.0 if done else np.max(self.q_table[discrete_next_state])
        td_target = reward + self.gamma * future_value
        td_error = td_target - self.q_table[discrete_state][action]
        self.q_table[discrete_state][action] += self.lr * td_error

        # Decay exploration rate (note: once per update, not once per episode)
        self.epsilon *= self.epsilon_decay

In [None]:
# Create environment only if we have detection data
if len(detection_data) > 0:
    env = PenaltyEnv(detection_data)
    agent = QLearningAgent(env)

    # Training loop: each episode replays every recorded frame once
    num_episodes = 50  # Reduced for demo purposes
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = agent.get_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.learn(state, action, reward, next_state)
            state = next_state
            total_reward += reward

        print(f"Episode {episode + 1}, Total Reward: {total_reward}, Epsilon: {agent.epsilon:.2f}")

    # Test the agent.
    # Evaluation reads the learned Q-table greedily instead of going through
    # the epsilon-greedy get_action, so no random exploratory action can leak
    # into the reported suggestions.
    state = env.reset()
    done = False
    while not done:
        action = int(np.argmax(agent.q_table[agent.discretize_state(state)]))
        print(f"Frame {env.current_frame}: Suggested action - {'left' if action == 0 else 'center' if action == 1 else 'right'}")
        state, _, done, _ = env.step(action)
else:
    print("No detection data available. Check video processing.")

Episode 1, Total Reward: 407, Epsilon: 0.04
Episode 2, Total Reward: 635, Epsilon: 0.00


  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


Episode 3, Total Reward: 645, Epsilon: 0.00
Episode 4, Total Reward: 645, Epsilon: 0.00
Episode 5, Total Reward: 645, Epsilon: 0.00
Episode 6, Total Reward: 645, Epsilon: 0.00
Episode 7, Total Reward: 645, Epsilon: 0.00
Episode 8, Total Reward: 645, Epsilon: 0.00
Episode 9, Total Reward: 645, Epsilon: 0.00
Episode 10, Total Reward: 645, Epsilon: 0.00
Episode 11, Total Reward: 645, Epsilon: 0.00
Episode 12, Total Reward: 645, Epsilon: 0.00
Episode 13, Total Reward: 645, Epsilon: 0.00
Episode 14, Total Reward: 645, Epsilon: 0.00
Episode 15, Total Reward: 645, Epsilon: 0.00
Episode 16, Total Reward: 645, Epsilon: 0.00
Episode 17, Total Reward: 645, Epsilon: 0.00
Episode 18, Total Reward: 645, Epsilon: 0.00
Episode 19, Total Reward: 645, Epsilon: 0.00
Episode 20, Total Reward: 645, Epsilon: 0.00
Episode 21, Total Reward: 645, Epsilon: 0.00
Episode 22, Total Reward: 645, Epsilon: 0.00
Episode 23, Total Reward: 645, Epsilon: 0.00
Episode 24, Total Reward: 645, Epsilon: 0.00
Episode 25, Total

In [None]:
import cv2
import numpy as np

def create_annotated_video(input_path, output_path, decision_interval=50):
    """Write a copy of a video annotated with detections and agent decisions.

    Every frame is re-run through YOLO and drawn with ``results[0].plot()``.
    Every ``decision_interval`` frames the agent is asked for an action; the
    latest decision is shown in a header banner and as a coloured bar along
    the bottom edge. A frame counter is drawn in the top-right corner.

    Relies on the notebook-level globals ``model``, ``env``, ``agent`` and
    ``detection_data``; the environment is reset at the start and stepped
    once per frame while recorded detections remain.

    Args:
        input_path: Path of the source video.
        output_path: Path of the annotated video to write (``mp4v`` codec).
        decision_interval: Number of frames between agent decisions.

    Returns:
        None. Prints a message and returns early when the input cannot be
        opened.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        # Avoid writing an empty file and reporting success
        cap.release()
        print(f"Could not open video: {input_path}")
        return

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

    # Color settings
    colors = {
        'bg': (40, 40, 50),
        'text': (250, 250, 250),
        'left': (70, 120, 200),
        'center': (70, 180, 120),
        'right': (200, 120, 70)
    }
    # Indexed by action id: 0 = left, 1 = center, 2 = right
    action_labels = ['LEFT', 'CENTER', 'RIGHT']
    action_colors = [colors['left'], colors['center'], colors['right']]

    current_frame = 0
    state = env.reset()
    action = 0  # explicit default; replaced by the first decision on frame 0
    current_decision = None
    decision_color = None
    decision_active = False

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Get YOLO detections (quiet, matching process_video_debug)
            results = model(frame, verbose=False)
            frame = results[0].plot()

            # Update decision every N frames
            if current_frame % decision_interval == 0:
                action = agent.get_action(state)
                current_decision = action_labels[action]
                decision_color = action_colors[action]
                decision_active = True

            # Display decision if active
            if decision_active:
                # Create header
                header = np.zeros((80, frame_width, 3), dtype=np.uint8)
                header[:] = colors['bg']

                # Add decision text
                cv2.putText(header, f"GOALKEEPER ACTION: {current_decision}",
                           (frame_width//2 - 200, 50),
                           cv2.FONT_HERSHEY_SIMPLEX, 1,
                           colors['text'], 2, cv2.LINE_AA)

                # Blend header
                frame[:80] = cv2.addWeighted(header, 0.8, frame[:80], 0.2, 0)

                # Add indicator bar at bottom
                cv2.rectangle(frame,
                             (0, frame_height-10),
                             (frame_width, frame_height),
                             decision_color, -1)

            # Add minimal frame counter
            cv2.putText(frame, f"{current_frame:04d}",
                       (frame_width - 100, 30),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7,
                       colors['text'], 1, cv2.LINE_AA)

            out.write(frame)

            # Advance the environment only while recorded detections remain
            if current_frame < len(detection_data) - 1:
                state, _, _, _ = env.step(action)
            current_frame += 1
    finally:
        # Always close both handles so a failure does not leave them open
        cap.release()
        out.release()

    print(f"Annotated video saved to {output_path}")

# Create the video
# Requires `env`, `agent`, `model` and `detection_data` from the earlier
# cells: create_annotated_video reads them as globals.
create_annotated_video('/content/istockphoto-949472108-640_adpp_is.mp4', 'output.mp4', decision_interval=50)

Annotated video saved to output.mp4


In [None]:
import pickle
import numpy as np

def save_agent(agent, filepath):
    """Pickle the agent's Q-table and hyperparameters to ``filepath``.

    The stored dict has the keys ``q_table``, ``epsilon``, ``learning_rate``,
    ``discount_factor``, ``exploration_decay`` and ``state_mapping``. The
    last one holds the raw bytecode bytes (``__code__.co_code``) of the
    agent's ``discretize_state`` method.

    Args:
        agent: Object exposing ``q_table``, ``epsilon``, ``lr``, ``gamma``,
            ``epsilon_decay`` and a ``discretize_state`` method.
        filepath: Destination path; opened in binary write mode.
    """
    # (key in the saved dict, attribute name on the agent)
    saved_fields = (
        ('q_table', 'q_table'),
        ('epsilon', 'epsilon'),
        ('learning_rate', 'lr'),
        ('discount_factor', 'gamma'),
        ('exploration_decay', 'epsilon_decay'),
    )
    with open(filepath, 'wb') as sink:
        snapshot = {key: getattr(agent, attr) for key, attr in saved_fields}
        # Save state processing logic
        snapshot['state_mapping'] = agent.discretize_state.__code__.co_code
        pickle.dump(snapshot, sink)

# Save the agent
# NOTE(review): the target is under /content/drive, but no Drive mount call
# is visible in this notebook - confirm Drive is mounted before running.
save_agent(agent, '/content/drive/MyDrive/penalty_kick_agent.pkl')

In [None]:
# Export YOLO model to ONNX format (best for web deployment)
# Per the export log below, the result is written as 'yolov8n.onnx' in /content.
model.export(format='onnx', imgsz=[640, 640])  # Adjust image size as needed

# Copy to Drive (cp leaves the original yolov8n.onnx in place)
!cp yolov8n.onnx "/content/drive/MyDrive/yolov8_penalty_detector.onnx"

Ultralytics 8.3.104 🚀 Python-3.11.11 torch-2.6.0+cu124 CPU (Intel Xeon 2.20GHz)

[34m[1mPyTorch:[0m starting from 'yolov8n.pt' with input shape (1, 3, 640, 640) BCHW and output shape(s) (1, 84, 8400) (6.2 MB)

[34m[1mONNX:[0m starting export with onnx 1.17.0 opset 19...
[34m[1mONNX:[0m slimming with onnxslim 0.1.49...
[34m[1mONNX:[0m export success ✅ 1.7s, saved as 'yolov8n.onnx' (12.2 MB)

Export complete (2.3s)
Results saved to [1m/content[0m
Predict:         yolo predict task=detect model=yolov8n.onnx imgsz=640  
Validate:        yolo val task=detect model=yolov8n.onnx imgsz=640 data=coco.yaml  
Visualize:       https://netron.app


In [None]:
# Deployment metadata saved alongside the pickled agent and the ONNX model.
env_config = {
    # NOTE(review): frame_size is hard-coded - confirm it matches the clip
    # that was actually processed.
    'frame_size': (1280, 720),  # Your video dimensions
    # Order matches the action ids used by PenaltyEnv.step: 0, 1, 2
    'action_meanings': ['left', 'center', 'right'],
    # Same bounds as the Box observation space declared in PenaltyEnv
    'observation_space': {
        'low': [0, 0, 0],
        'high': [1, 1, 1]
    }
}

import json
# json has no tuple type, so 'frame_size' is written as a two-element list
with open('/content/drive/MyDrive/env_config.json', 'w') as f:
    json.dump(env_config, f)

In [None]:
# Zip all necessary files
!zip -r "/content/model_package.zip" \
    "/content/drive/MyDrive/penalty_kick_agent.pkl" \
    "/content/drive/MyDrive/yolov8_penalty_detector.onnx" \
    "/content/drive/MyDrive/env_config.json"

# Download the package
# google.colab is only importable inside a Colab runtime, so this cell is
# Colab-specific.
from google.colab import files
files.download("/content/model_package.zip")

  adding: content/drive/MyDrive/penalty_kick_agent.pkl (deflated 98%)
  adding: content/drive/MyDrive/yolov8_penalty_detector.onnx (deflated 14%)
  adding: content/drive/MyDrive/env_config.json (deflated 21%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>