In [1]:
import cv2
from ultralytics import YOLO
import supervision as sv
import pickle
import os
import numpy as np
from filterpy.kalman import KalmanFilter
from collections import defaultdict

def read_vid(vid_path):
    cap = cv2.VideoCapture(vid_path)
    frames = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
    cap.release()
    return frames

def save_vid(output_video_frames, output_vid_path):
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    frame_height, frame_width = output_video_frames[0].shape[:2]
    out = cv2.VideoWriter(output_vid_path, fourcc, 24, (frame_width, frame_height))
    for frame in output_video_frames:
        out.write(frame)
    out.release()

def get_center_of_bbox(bbox):
    x1, y1, x2, y2 = bbox
    return (x1 + x2) / 2, (y1 + y2) / 2

def get_bbox_width(bbox):
    return bbox[2] - bbox[0]

class Tracker:
    def __init__(self, modelpath):
        self.model = YOLO(modelpath)
        self.tracker = sv.ByteTrack()
        self.track_id_mapping = {}
        self.next_track_id = 1

        self.ball_kf = self.initialize_kalman_filter()
        self.ball_detected = False
        self.ball_bbox_size = 10
        self.missed_frames = 0
        self.max_missed_frames = 10

        self.player_positions = defaultdict(lambda: defaultdict(list))
        self.ball_positions = []

        self.possession_counts = {'team_1': [], 'team_2': []}

        self.total_passes = {'team_1': 0, 'team_2': 0}  # عدد التمريرات لكل فريق
        self.previous_ball_holder = None  # حامل الكرة السابق

        self.pixel_to_km = 0.0001

        self.current_ball_holder = None

        self.total_possession_frames = 0

    def initialize_kalman_filter(self):
        kf = KalmanFilter(dim_x=4, dim_z=2)
        kf.x = np.array([0., 0., 0., 0.])
        kf.F = np.array([[1, 0, 1, 0],
                         [0, 1, 0, 1],
                         [0, 0, 1, 0],
                         [0, 0, 0, 1]])
        kf.H = np.array([[1, 0, 0, 0],
                         [0, 1, 0, 0]])

        kf.P *= 1000.
        kf.Q *= 0.1
        kf.R *= 10
        return kf

    def remap_track_id(self, original_id):
        if original_id not in self.track_id_mapping:
            self.track_id_mapping[original_id] = self.next_track_id
            self.next_track_id += 1
            if self.next_track_id > 99:
                self.next_track_id = 1
        return self.track_id_mapping[original_id]

    def detect_frames(self, frames):
        batch_size = 20
        detections = []
        for i in range(0, len(frames), batch_size):
            detections_batch = self.model.predict(frames[i:i + batch_size], conf=0.05)
            detections += detections_batch
        return detections

    def get_obj_tracks(self, frames, read_from_stub=False, stub_path=None):
        if read_from_stub and stub_path is not None and os.path.exists(stub_path):
            with open(stub_path, 'rb') as f:
                tracks = pickle.load(f)
            return tracks

        detections = self.detect_frames(frames)

        tracks = {
            "team_1": [],
            "team_2": [],
            "goalkeepers": [],
            "referees": [],
            "ball": []
        }

        for frame_num, detection in enumerate(detections):
            cls_names = detection.names
            detection_supervision = sv.Detections.from_ultralytics(detection)

            detection_with_tracks = self.tracker.update_with_detections(detection_supervision)

            tracks["team_1"].append({})
            tracks["team_2"].append({})
            tracks["goalkeepers"].append({})
            tracks["referees"].append({})
            tracks["ball"].append({})

            ball_detected_in_frame = False

            for frame_detection in detection_with_tracks:
                bbox = frame_detection[0].tolist()
                cls_id = frame_detection[3]
                original_track_id = frame_detection[4]
                track_id = self.remap_track_id(original_track_id)

                class_name = cls_names[cls_id]

                if class_name == "team_1":
                    tracks["team_1"][frame_num][track_id] = {'bbpx': bbox}
                    x_center, y_center = get_center_of_bbox(bbox)
                    self.player_positions['team_1'][track_id].append((frame_num, x_center, y_center))
                elif class_name == "team_2":
                    tracks["team_2"][frame_num][track_id] = {'bbpx': bbox}
                    x_center, y_center = get_center_of_bbox(bbox)
                    self.player_positions['team_2'][track_id].append((frame_num, x_center, y_center))
                elif class_name == "Goalkeeper":
                    tracks["goalkeepers"][frame_num][track_id] = {'bbpx': bbox}
                elif class_name == "referee":
                    tracks["referees"][frame_num][track_id] = {'bbpx': bbox}

            ball_center = None
            if 'ball' in tracks and len(detection_supervision.xyxy) > 0:
                for i in range(len(detection_supervision.xyxy)):
                    bbox = detection_supervision.xyxy[i].tolist()
                    cls_id = int(detection_supervision.class_id[i])
                    class_name = cls_names[cls_id]

                    if class_name == 'ball':
                        x_center, y_center = get_center_of_bbox(bbox)
                        tracks['ball'][frame_num][1] = {'bbpx': bbox, 'center': (x_center, y_center)}
                        if not self.ball_detected:
                            self.ball_kf.x[:2] = [x_center, y_center]
                            self.ball_kf.x[2:] = [0., 0.]
                            self.ball_detected = True
                        else:
                            self.ball_kf.update(np.array([x_center, y_center]))
                        ball_detected_in_frame = True
                        self.missed_frames = 0
                        self.ball_positions.append((frame_num, x_center, y_center))
                        ball_center = (x_center, y_center)
                        break

            if not ball_detected_in_frame and self.ball_detected:
                self.missed_frames += 1
                if self.missed_frames <= self.max_missed_frames:
                    self.ball_kf.predict()
                    x_pred, y_pred = self.ball_kf.x[:2]
                    bbox = [
                        x_pred - self.ball_bbox_size,
                        y_pred - self.ball_bbox_size,
                        x_pred + self.ball_bbox_size,
                        y_pred + self.ball_bbox_size
                    ]
                    tracks['ball'][frame_num][1] = {'bbpx': bbox, 'center': (x_pred, y_pred), 'predicted': True}
                    self.ball_positions.append((frame_num, x_pred, y_pred))
                    ball_center = (x_pred, y_pred)
                else:
                    self.ball_detected = False
                    self.ball_kf = self.initialize_kalman_filter()
                    self.current_ball_holder = None
            elif not self.ball_detected:
                self.current_ball_holder = None

            if ball_center:
                closest_team, closest_player = self.find_closest_player(ball_center, frame_num)
                if closest_team:
                    self.total_possession_frames += 1
                    self.possession_counts[closest_team].append(1)
                    other_team = 'team_1' if closest_team == 'team_2' else 'team_2'
                    self.possession_counts[other_team].append(0)

                    previous_holder = self.current_ball_holder
                    self.current_ball_holder = (closest_team, closest_player)

                    # حساب التمريرات الصحيحة
                    if previous_holder is not None and previous_holder != self.current_ball_holder:
                        prev_team, prev_player = previous_holder
                        if prev_team == closest_team:
                            # تمريرة صحيحة
                            self.total_passes[closest_team] += 1
                    else:
                        pass  # لا تغيير

                else:
                    self.current_ball_holder = None
                    self.possession_counts['team_1'].append(0)
                    self.possession_counts['team_2'].append(0)
            else:
                self.current_ball_holder = None
                self.possession_counts['team_1'].append(0)
                self.possession_counts['team_2'].append(0)

        if stub_path is not None:
            with open(stub_path, 'wb') as f:
                pickle.dump(tracks, f)
        return tracks

    def find_closest_player(self, ball_center, frame_num):
        min_distance = float('inf')
        closest_team = None
        closest_player = None
        for team in ['team_1', 'team_2']:
            players = self.player_positions[team]
            for player_id, positions in players.items():
                position = next((pos for pos in positions if pos[0] == frame_num), None)
                if position:
                    x, y = position[1], position[2]
                    distance = np.hypot(ball_center[0] - x, ball_center[1] - y)
                    if distance < min_distance:
                        min_distance = distance
                        closest_team = team
                        closest_player = player_id
        return closest_team, closest_player

    def draw_ellipse(self, frame, bbox, color, track_id=None, predicted=False):
        y2 = int(bbox[3])
        x_center, _ = get_center_of_bbox(bbox)
        width = get_bbox_width(bbox)

        if predicted:
            line_type = cv2.LINE_AA
            thickness = 1
        else:
            line_type = cv2.LINE_4
            thickness = 2

        cv2.ellipse(
            frame,
            center=(int(x_center), int(y2)),
            axes=(int(width), int(0.35 * width)),
            angle=0.0,
            startAngle=45,
            endAngle=235,
            color=color,
            thickness=thickness,
            lineType=line_type
        )

        if track_id is not None:
            x1_text = int(x_center) - 10
            y1_text = int(y2 + 25)

            cv2.putText(
                frame,
                f"{track_id}",
                (x1_text, y1_text),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.4,
                color,
                1
            )

        return frame

    def draw_annotations(self, video_frames, tracks):
        output_video_frames = []
        num_frames = len(video_frames)
        for frame_num, frame in enumerate(video_frames):
            frame = frame.copy()

            team_1_dict = tracks["team_1"][frame_num]
            team_2_dict = tracks["team_2"][frame_num]
            goalkeepers_dict = tracks["goalkeepers"][frame_num]
            referees_dict = tracks["referees"][frame_num]
            ball_dict = tracks["ball"][frame_num]

            for track_id, player in team_1_dict.items():
                if 'bbpx' in player:
                    color = (0, 0, 200)
                    frame = self.draw_ellipse(frame, player["bbpx"], color, track_id)

            for track_id, player in team_2_dict.items():
                if 'bbpx' in player:
                    color = (200, 0, 0)
                    frame = self.draw_ellipse(frame, player["bbpx"], color, track_id)

            for track_id, goalkeeper in goalkeepers_dict.items():
                if 'bbpx' in goalkeeper:
                    color = (0, 200, 200)
                    frame = self.draw_ellipse(frame, goalkeeper["bbpx"], color, track_id)

            for track_id, referee in referees_dict.items():
                if 'bbpx' in referee:
                    color = (0, 200, 0)
                    frame = self.draw_ellipse(frame, referee["bbpx"], color, track_id)

            for _, ball in ball_dict.items():
                if 'bbpx' in ball:
                    predicted = ball.get('predicted', False)
                    if predicted:
                        color = (0, 165, 255)
                    else:
                        color = (200, 200, 200)
                    frame = self.draw_ellipse(frame, ball["bbpx"], color, predicted=predicted)

            team_1_possession = sum(self.possession_counts['team_1'][:frame_num+1])
            team_2_possession = sum(self.possession_counts['team_2'][:frame_num+1])
            total_possession = team_1_possession + team_2_possession
            if total_possession > 0:
                team_1_percentage = (team_1_possession / total_possession) * 100
                team_2_percentage = (team_2_possession / total_possession) * 100
            else:
                team_1_percentage = 0
                team_2_percentage = 0

            font_scale = 0.6
            thickness = 1

            cv2.putText(
                frame,
                f"Team 1 Possession: {team_1_percentage:.2f}%",
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                font_scale,
                (0, 0, 200),
                thickness
            )
            cv2.putText(
                frame,
                f"Team 2 Possession: {team_2_percentage:.2f}%",
                (10, 60),
                cv2.FONT_HERSHEY_SIMPLEX,
                font_scale,
                (200, 0, 0),
                thickness
            )

            # عرض عدد التمريرات لكل فريق
            cv2.putText(
                frame,
                f"Team 1 Passes: {self.total_passes['team_1']}",
                (10, 90),
                cv2.FONT_HERSHEY_SIMPLEX,
                font_scale,
                (0, 0, 200),
                thickness
            )
            cv2.putText(
                frame,
                f"Team 2 Passes: {self.total_passes['team_2']}",
                (10, 120),
                cv2.FONT_HERSHEY_SIMPLEX,
                font_scale,
                (200, 0, 0),
                thickness
            )

            output_video_frames.append(frame)

        return output_video_frames

def main():
    vid_frames = read_vid(r'C:\Users\anwer\OneDrive\Desktop\Football\vid\WhatsApp Video 2024-10-20 at 5.36.29 PM.mp4')

    tracker = Tracker(r'C:\Users\anwer\OneDrive\Desktop\Football\Models\best.pt')
    tracks = tracker.get_obj_tracks(vid_frames, read_from_stub=False, stub_path=None)

    output_video_frames = tracker.draw_annotations(vid_frames, tracks)
    save_vid(output_video_frames, 'output_vid/output_vid.avi')

if __name__ == "__main__":
    main()



0: 384x640 1 ball, 1 referee, 13 team_1s, 11 team_2s, 39.0ms
1: 384x640 1 ball, 10 team_1s, 15 team_2s, 39.0ms
2: 384x640 1 referee, 11 team_1s, 13 team_2s, 39.0ms
3: 384x640 1 ball, 1 referee, 10 team_1s, 14 team_2s, 39.0ms
4: 384x640 1 referee, 10 team_1s, 11 team_2s, 39.0ms
5: 384x640 1 referee, 10 team_1s, 11 team_2s, 39.0ms
6: 384x640 2 Goalkeepers, 2 balls, 11 team_1s, 14 team_2s, 39.0ms
7: 384x640 1 Goalkeeper, 1 ball, 1 referee, 10 team_1s, 10 team_2s, 39.0ms
8: 384x640 1 Goalkeeper, 10 team_1s, 11 team_2s, 39.0ms
9: 384x640 2 Goalkeepers, 1 ball, 12 team_1s, 11 team_2s, 39.0ms
10: 384x640 1 Goalkeeper, 1 ball, 12 team_1s, 12 team_2s, 39.0ms
11: 384x640 7 Goalkeepers, 2 balls, 11 team_1s, 10 team_2s, 39.0ms
12: 384x640 1 ball, 12 team_1s, 16 team_2s, 39.0ms
13: 384x640 1 Goalkeeper, 2 balls, 1 referee, 10 team_1s, 9 team_2s, 39.0ms
14: 384x640 4 Goalkeepers, 3 balls, 9 team_1s, 8 team_2s, 39.0ms
15: 384x640 4 Goalkeepers, 3 balls, 1 referee, 12 team_1s, 9 team_2s, 39.0ms
16: 3