In [None]:
!pip install -q ultralytics supervision

In [None]:
from ultralytics import YOLO
import supervision as sv

In [None]:
# Input clip (Kaggle DFL Bundesliga dataset path) that the later cells read frames from.
SRC_VIDEO = "/kaggle/input/-dfl-bundesliga-460-mp4-videos-in-30sec-csv/DFL Bundesliga Data Shootout/train/A1606b0e6_0/A1606b0e6_0 (14).mp4"

In [None]:
# YOLO weights loaded from a SoccerNet player-tracking checkpoint path.
# NOTE(review): `model` is not referenced by any later cell shown here (all inference goes
# through `model2`) — confirm whether this load is still needed.
model = YOLO("/kaggle/input/yolo8m_player_detection_model/pytorch/default/1/soccerNet-Player-Tracking/yolov8m_/weights/best.pt")

In [None]:
# Detector used for every `predict` call in this notebook.
# Assumes its class indices match BALL_ID / GOALKEEPER_ID / PLAYER_ID / REFEREE_ID — TODO confirm
# against the training configuration of these weights.
model2 = YOLO("/kaggle/input/trained-yolov5m-on-roboflow-annotated-data/best.pt")

In [None]:
# Class indices used to split detections by object type
# (presumably the label order model2 was trained with — verify against the dataset config).
BALL_ID, GOALKEEPER_ID, PLAYER_ID, REFEREE_ID = 0, 1, 2, 3

In [None]:
# Hex colours shared by the ellipse annotator and both label annotators.
# Presumably each colour is selected by a detection's class_id (supervision's default
# colour lookup) — confirm against the supervision version in use.
_PALETTE_HEX = ["#cc0000", "#0066ff", "#000000"]

# Ellipse drawn for every non-ball detection.
elipse_annotator = sv.EllipseAnnotator(
    color=sv.ColorPalette.from_hex(_PALETTE_HEX),
)

# Label annotator called without explicit labels in the cells below.
class_label_annotator = sv.LabelAnnotator(
    color=sv.ColorPalette.from_hex(_PALETTE_HEX),
)

# Label annotator for tracker-id / team text, anchored at the bottom centre of each box.
label_annotator = sv.LabelAnnotator(
    color=sv.ColorPalette.from_hex(_PALETTE_HEX),
    text_position=sv.Position.BOTTOM_CENTER,
)

# Marker drawn on ball detections.
triangle_annotator = sv.TriangleAnnotator(
    color=sv.Color.from_hex('66ff33'),
    base=20,
    height=17,
)

In [None]:
# Sanity check on the first frame only: detect, track and draw every annotator once.
frame_generator = sv.get_video_frames_generator(SRC_VIDEO)
frame = next(frame_generator)

results = model2.predict(frame, conf=0.06)[0]
detections = sv.Detections.from_ultralytics(results)

tracker = sv.ByteTrack()
tracker.reset()

# The ball is drawn separately and never goes through the tracker.
ball_detections = detections[detections.class_id == BALL_ID]
all_detections = tracker.update_with_detections(
    detections[detections.class_id != BALL_ID]
)

# One text label per tracked detection: its tracker id.
labels = [str(tracker_id) for tracker_id in all_detections.tracker_id]

# Draw on a copy so `frame` itself stays untouched.
annotated_frame = frame.copy()
annotated_frame = elipse_annotator.annotate(annotated_frame, all_detections)
annotated_frame = class_label_annotator.annotate(annotated_frame, all_detections)
annotated_frame = label_annotator.annotate(annotated_frame, all_detections, labels=labels)
annotated_frame = triangle_annotator.annotate(annotated_frame, ball_detections)

sv.plot_image(annotated_frame)

In [None]:
from tqdm import tqdm

In [None]:
def get_crops(source_video, stride=24, conf=0.1):
    """Collect image crops of detected players from a sparse sample of video frames.

    Args:
        source_video: Path of the video to read.
        stride: Keep one frame out of every ``stride`` frames (default 24).
        conf: Confidence threshold passed to ``model2.predict`` (default 0.1).

    Returns:
        A list with one cropped image per detection whose class is ``PLAYER_ID``,
        in frame order. The list is empty when no player is detected.
    """
    frame_generator = sv.get_video_frames_generator(source_video, stride=stride)
    crops = []
    for frame in tqdm(frame_generator):
        res = model2.predict(frame, conf=conf, verbose=False)[0]
        detections = sv.Detections.from_ultralytics(res)
        # Only outfield players are kept; ball, goalkeepers and referees are dropped.
        detections = detections[detections.class_id == PLAYER_ID]
        crops.extend(sv.crop_image(frame, xyxy) for xyxy in detections.xyxy)
    return crops

In [None]:
# Sample player crops from the source clip; the exploration cells below reuse them.
crops = get_crops(SRC_VIDEO)

In [None]:
# Number of player crops collected.
len(crops)

In [None]:
# Preview at most the first 100 crops on a 10x10 grid.
sv.plot_images_grid(crops[:100],grid_size=(10,10))

In [None]:
import torch
from transformers import AutoProcessor, SiglipVisionModel

# Checkpoint identifier of the SigLIP vision encoder used to embed player crops.
SIGLIP_MODEL_PATH = 'google/siglip-base-patch16-224'

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'

# The processor prepares images for the model; the model is moved to DEVICE.
EMBEDDINGS_PROCESSOR = AutoProcessor.from_pretrained(SIGLIP_MODEL_PATH)
EMBEDDINGS_MODEL = SiglipVisionModel.from_pretrained(SIGLIP_MODEL_PATH).to(DEVICE)

In [None]:
from more_itertools import chunked
import numpy as np

In [None]:
def extract_features(crops, batch_size=32):
    """Embed image crops with the SigLIP vision model.

    Each crop is converted to a PIL image, pushed through ``EMBEDDINGS_MODEL`` in
    batches, and represented by the mean of ``last_hidden_state`` over dimension 1.

    Args:
        crops: Iterable of images accepted by ``sv.cv2_to_pillow``.
        batch_size: Number of crops per forward pass (default 32).

    Returns:
        A 2-D numpy array with one row per crop. For empty input an empty
        ``(0, hidden_size)`` array is returned instead of raising.
    """
    images = [sv.cv2_to_pillow(crop) for crop in crops]
    if not images:
        # np.concatenate raises ValueError on an empty list, so return an empty
        # matrix whose width matches the model's embedding size.
        return np.empty((0, EMBEDDINGS_MODEL.config.hidden_size), dtype=np.float32)

    data = []
    # Inference only: no gradients are needed.
    with torch.no_grad():
        for batch in chunked(images, batch_size):
            inputs = EMBEDDINGS_PROCESSOR(images=batch, return_tensors="pt").to(DEVICE)
            out = EMBEDDINGS_MODEL(**inputs)
            embeddings = torch.mean(out.last_hidden_state, dim=1).cpu().numpy()
            data.append(embeddings)
    return np.concatenate(data)

In [None]:
# Embed every sampled crop; one row per crop.
crops_encodings = extract_features(crops)

In [None]:
# Inspect the embedding matrix dimensions.
crops_encodings.shape

In [None]:
import umap
from sklearn.cluster import KMeans

In [None]:
def get_team_classifiers(crops, random_state=None):
    """Fit the dimensionality reducer and the two-cluster team model on player crops.

    The crops are embedded with ``extract_features``, projected to 3 dimensions with
    UMAP, and the projections are clustered into 2 groups with KMeans.

    Args:
        crops: Player crops to fit on.
        random_state: Optional seed forwarded to both UMAP and KMeans so that the
            fit (and therefore which cluster index each team gets) can be made
            reproducible. ``None`` (default) keeps the libraries' unseeded behaviour.

    Returns:
        ``(reducer, clustering_model)``: the fitted UMAP reducer and fitted KMeans model.
    """
    reducer = umap.UMAP(n_components=3, random_state=random_state)
    clustering_model = KMeans(n_clusters=2, random_state=random_state)
    data = extract_features(crops)
    projections = reducer.fit_transform(data)
    clustering_model.fit(projections)
    return reducer, clustering_model

In [None]:
# Fit the reducer and clustering model on the sampled crops.
# (The function is named get_team_classifiers; `team_classifier` does not exist and raised NameError.)
REDUCER, CLUSTERING_MODEL = get_team_classifiers(crops)

In [None]:
def predict(crops, reducer, clustering_model):
    """Assign a cluster (team) index to each crop.

    Args:
        crops: Player crops to classify.
        reducer: Fitted reducer exposing ``transform``.
        clustering_model: Fitted clustering model exposing ``predict``.

    Returns:
        The array returned by ``clustering_model.predict``, one entry per crop.
        For empty input an empty integer array is returned, so the result can be
        used as a ``class_id`` array without a float dtype sneaking in.
    """
    if len(crops) == 0:
        # Skip the embedding model entirely when there is nothing to classify.
        return np.array([], dtype=int)
    data = extract_features(crops)
    projections = reducer.transform(data)
    return clustering_model.predict(projections)

In [None]:
# Spot-check: cluster indices predicted for the first ten sampled crops.
predict(crops,REDUCER,CLUSTERING_MODEL)[:10]

In [None]:
from sklearn.neighbors import KNeighborsClassifier

In [None]:
def get_gk_team(players_detections, gk_detections):
    """Assign each goalkeeper to a team by a nearest-neighbour vote over player positions.

    A k-nearest-neighbours classifier is fitted on the bottom-centre anchor of every
    player box, labelled with ``players_detections.class_id`` (the team index), and
    then queried at each goalkeeper's bottom-centre anchor.

    Args:
        players_detections: Player detections whose ``class_id`` already holds team indices.
        gk_detections: Goalkeeper detections to classify.

    Returns:
        Integer numpy array with one team index per goalkeeper. It is empty when there
        are no goalkeepers. When there are goalkeepers but no players to vote with,
        every goalkeeper falls back to team 0.
    """
    n_goalkeepers = len(gk_detections)
    if n_goalkeepers == 0:
        return np.array([], dtype=int)

    n_players = len(players_detections)
    if n_players == 0:
        # The classifier cannot be fitted on zero samples; fall back to team 0
        # instead of crashing on a frame without detected players.
        return np.zeros(n_goalkeepers, dtype=int)

    gk_positions = gk_detections.get_anchors_coordinates(sv.Position.BOTTOM_CENTER)
    players_positions = players_detections.get_anchors_coordinates(sv.Position.BOTTOM_CENTER)

    # scikit-learn rejects n_neighbors > n_samples, so cap k at the number of players
    # (5 is the library default and is kept whenever enough players are visible).
    knn = KNeighborsClassifier(n_neighbors=min(5, n_players))
    knn.fit(players_positions, players_detections.class_id)
    gk_team_ids = knn.predict(gk_positions)

    return np.array(gk_team_ids)

In [None]:
# Annotate the whole clip frame by frame and write the result to OUTPUT_VIDEO.
OUTPUT_VIDEO = "/kaggle/working/output.mp4"

video_info = sv.VideoInfo.from_video_path(SRC_VIDEO)
video_sink = sv.VideoSink(OUTPUT_VIDEO, video_info=video_info)

# Fit the team models on crops sampled from this same clip.
random_crops = get_crops(SRC_VIDEO)
REDUCER, CLUSTERING_MODEL = get_team_classifiers(random_crops)

frame_generator = sv.get_video_frames_generator(SRC_VIDEO)
tracker = sv.ByteTrack()
tracker.reset()

with video_sink:
    for frame in tqdm(frame_generator, total=video_info.total_frames):
        # verbose=False, as in get_crops: per-frame logs would otherwise bury the progress bar.
        results = model2.predict(frame, conf=0.06, verbose=False)[0]
        detections = sv.Detections.from_ultralytics(results)

        # The ball is drawn separately and never goes through the tracker.
        ball_detections = detections[detections.class_id == BALL_ID]
        all_detections = detections[detections.class_id != BALL_ID]
        all_detections = tracker.update_with_detections(all_detections)

        player_detections = all_detections[all_detections.class_id == PLAYER_ID]
        gk_detections = all_detections[all_detections.class_id == GOALKEEPER_ID]
        ref_detections = all_detections[all_detections.class_id == REFEREE_ID]

        # From here on class_id means "team index" for players and goalkeepers.
        player_crops = [sv.crop_image(frame, xyxy) for xyxy in player_detections.xyxy]
        player_detections.class_id = predict(player_crops, REDUCER, CLUSTERING_MODEL)
        gk_detections.class_id = get_gk_team(player_detections, gk_detections)
        # Referees move from REFEREE_ID (3) to 2, the index right after the two team indices.
        ref_detections.class_id -= 1

        # Players + goalkeepers get the "tracker,team" label; referees only get the ellipse
        # and the default label.
        match_detections = sv.Detections.merge([player_detections, gk_detections])
        match_detections.class_id = match_detections.class_id.astype(int)
        all_detections = sv.Detections.merge([player_detections, gk_detections, ref_detections])
        all_detections.class_id = all_detections.class_id.astype(int)
        labels = [
            f"{tracker_id},team:{team_id}"
            for tracker_id, team_id in zip(match_detections.tracker_id, match_detections.class_id)
        ]

        # Draw on a copy so the source frame stays untouched.
        annotated_frame = frame.copy()
        annotated_frame = elipse_annotator.annotate(annotated_frame, all_detections)
        annotated_frame = class_label_annotator.annotate(annotated_frame, all_detections)
        annotated_frame = label_annotator.annotate(annotated_frame, match_detections, labels=labels)
        annotated_frame = triangle_annotator.annotate(annotated_frame, ball_detections)

        video_sink.write_frame(annotated_frame)

In [None]:
# Re-sample crops and re-fit the team models (the same two calls the video-writing cell makes),
# so that callbackx below finds REDUCER / CLUSTERING_MODEL defined.
random_crops = get_crops(SRC_VIDEO)
REDUCER,CLUSTERING_MODEL = get_team_classifiers(random_crops)

In [None]:
# Fresh tracker for the process_video run; callbackx reads it as a module-level global.
tracker = sv.ByteTrack()
tracker.reset()

def callbackx(frame: np.ndarray, index: int) -> np.ndarray:
    """Detect, track, team-classify and annotate a single video frame.

    Intended as the per-frame callback handed to ``sv.process_video``. It relies on the
    module-level ``model2``, ``tracker``, ``REDUCER``, ``CLUSTERING_MODEL`` and the
    annotator objects.

    Args:
        frame: Image of the current frame.
        index: Frame index supplied by the caller; not used by this function.

    Returns:
        A copy of ``frame`` with ellipses, labels and the ball marker drawn on it.
    """
    # verbose=False, as in get_crops: avoids one log line per processed frame.
    results = model2.predict(frame, conf=0.06, verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)

    # The ball is drawn separately and never goes through the tracker.
    ball_detections = detections[detections.class_id == BALL_ID]
    all_detections = detections[detections.class_id != BALL_ID]

    all_detections = tracker.update_with_detections(all_detections)

    player_detections = all_detections[all_detections.class_id == PLAYER_ID]
    gk_detections = all_detections[all_detections.class_id == GOALKEEPER_ID]
    ref_detections = all_detections[all_detections.class_id == REFEREE_ID]

    # From here on class_id means "team index" for players and goalkeepers.
    player_crops = [sv.crop_image(frame, xyxy) for xyxy in player_detections.xyxy]
    player_detections.class_id = predict(player_crops, REDUCER, CLUSTERING_MODEL)
    gk_detections.class_id = get_gk_team(player_detections, gk_detections)
    # Referees move from REFEREE_ID (3) to 2, the index right after the two team indices.
    ref_detections.class_id -= 1

    # Players + goalkeepers get the "tracker,team" label; referees are only part of all_detections.
    match_detections = sv.Detections.merge([player_detections, gk_detections])
    match_detections.class_id = match_detections.class_id.astype(int)
    all_detections = sv.Detections.merge([player_detections, gk_detections, ref_detections])
    all_detections.class_id = all_detections.class_id.astype(int)

    labels = [
        f"{tracker_id},team:{team_id}"
        for tracker_id, team_id in zip(match_detections.tracker_id, match_detections.class_id)
    ]

    # Draw on a copy so the caller's frame stays untouched.
    annotated_frame = frame.copy()
    annotated_frame = elipse_annotator.annotate(annotated_frame, all_detections)
    annotated_frame = class_label_annotator.annotate(annotated_frame, all_detections)
    annotated_frame = label_annotator.annotate(annotated_frame, match_detections, labels=labels)
    annotated_frame = triangle_annotator.annotate(annotated_frame, ball_detections)
    return annotated_frame

In [None]:
# Run callbackx over every frame of the source clip and write the annotated video to output2.mp4.
sv.process_video(source_path=SRC_VIDEO,target_path="/kaggle/working/output2.mp4",callback=callbackx)