In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as T
import numpy as np
import cv2
from scipy.optimize import linear_sum_assignment


ImportError: cannot import name 'letterbox' from 'ultralytics.utils.ops' (c:\Users\tahmi\Documents\Work\PlayerIden25_Liatai\CrossCamPlayer25\.venv\Lib\site-packages\ultralytics\utils\ops.py)

In [21]:

def letterbox(im, new_shape=(640, 640), color=(114, 114, 114), auto=False, scaleFill=False, stride=32):
    shape = im.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Compute scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleFill:
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]

    if auto:
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)
    elif scaleFill:
        dw, dh = 0, 0

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    # Resize and add border
    if shape[::-1] != new_unpad:
        im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR)
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    im = cv2.copyMakeBorder(im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)

    return im, ratio, (dw, dh)

In [2]:
MODEL = '../models/best.pt'
BROADCAST = '../videos/broadcast.mp4'
TACTICAM = '../videos/tacticam.mp4'

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
DEVICE

device(type='cpu')

In [4]:
import torch
model = torch.load(MODEL, weights_only=False)
print(model)

model_instance = model['model']
for name, param in model_instance.named_parameters():
    print(f"Parameter: {name}, Shape: {param.shape}")


{'epoch': -1, 'best_fitness': None, 'model': DetectionModel(
  (model): Sequential(
    (0): Conv(
      (conv): Conv2d(3, 80, kernel_size=(6, 6), stride=(2, 2), padding=(2, 2), bias=False)
      (bn): BatchNorm2d(80, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
      (act): SiLU(inplace=True)
    )
    (1): Conv(
      (conv): Conv2d(80, 160, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (bn): BatchNorm2d(160, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
      (act): SiLU(inplace=True)
    )
    (2): C3(
      (cv1): Conv(
        (conv): Conv2d(160, 80, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn): BatchNorm2d(80, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (cv2): Conv(
        (conv): Conv2d(160, 80, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn): BatchNorm2d(80, eps=0.001, momentum=0.03, affine=True, track_running_stats=True

## 


In [5]:
class TemporalEncoding(nn.Module):
    def __init__(self, out_channels=128):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(2, 64), 
            nn.ReLU(),
            nn.Linear(64, out_channels),
            nn.ReLU()
        )
        
    def forward(self, trajectories):
        # (batch_size, seq_len, 2)
        batch_size, seq_len, _ = trajectories.shape
        
        flat_trajs = trajectories.view(-1, 2)  # (batch_size * seq_len, 2)
        encoded = self.encoder(flat_trajs)
        
        encoded = encoded.view(batch_size, seq_len, -1)
        return torch.mean(encoded, dim=1) 

## Feat extraction model

In [6]:
class PlayerEmbedder(nn.Module):
    def __init__(self):
        super().__init__()
        # visual
        self.cnn = models.resnet18(pretrained=True)
        self.cnn.fc = nn.Identity()
        
        # motion
        self.temporal_encoder = TemporalEncoding(out_channels=128)
        
        # feat projector
        self.projector = nn.Sequential(
            nn.Linear(512 + 128, 256),
            nn.ReLU(),
            nn.Linear(256, 128)
        )

    def forward(self, crops, trajectories):
        # vis feats
        vis_features = self.cnn(crops)
        
        # motion feats
        motion_features = self.temporal_encoder(trajectories)
        
        # concat + project
        combined = torch.cat((vis_features, motion_features), dim=1)
        return self.projector(combined)

In [7]:
class PlayerMatcher(nn.Module):
    def __init__(self, embedder):
        super().__init__()
        self.embedder = embedder
        self.similarity = nn.CosineSimilarity(dim=2)

    def forward(self, broadcast_inputs, tacticam_inputs):
        broadcast_emb = self.embedder(*broadcast_inputs)
        tacticam_emb = self.embedder(*tacticam_inputs)
        return self.similarity(
            broadcast_emb.unsqueeze(1),
            tacticam_emb.unsqueeze(0)
        )


## Mapping

In [8]:
def preprocess_frame(frame):
        transform = T.Compose([
            T.ToTensor(),
        ])
        return transform(frame).unsqueeze(0)

In [25]:

class PlayerMapper:
    def __init__(self, device='cpu', detector=None, img_size=640, use_half=True):
        self.device = device
        self.detector = detector.to(device)
        self.img_size = img_size
        self.half = use_half and torch.cuda.is_available()

        # Set model precision
        if self.half:
            self.detector.half()
        else:
            self.detector.float()

        self.embedder = PlayerEmbedder().to(device)
        self.matcher = PlayerMatcher(self.embedder).to(device)

        self.buffer = {
            'broadcast': {'crops': [], 'positions': []},
            'tacticam': {'crops': [], 'positions': []}
        }
        self.trajectories = {'broadcast': {}, 'tacticam': {}}
        self.next_id = {'broadcast': 0, 'tacticam': 0}
        self.frame_count = 0

    def process_frame(self, in_frame, view):
        # Letterbox resize & pad
        img, ratio, pad = letterbox(in_frame, new_shape=self.img_size)
        img = img[:, :, ::-1].transpose(2, 0, 1)
        img = np.ascontiguousarray(img)

        tensor = torch.from_numpy(img).to(self.device)
        tensor = tensor.float() / 255.0
        if self.half:
            tensor = tensor.half()
        tensor = tensor.unsqueeze(0)

        # Inference
        preds = self.detector(tensor)
        # preds[0] may be a Results object or raw tensor
        output = preds[0]
        if hasattr(output, 'boxes'):
            dets = output.boxes.xyxy.cpu().numpy()
            classes = output.boxes.cls.cpu().numpy()
        else:
            arr = output.cpu().numpy()
            dets = arr[:, :4]
            classes = arr[:, 5]

        # Filter class 0 (players)
        players = dets[classes == 0]

        frame_positions = []
        frame_crops = []
        for x1, y1, x2, y2 in players.astype(int):
            cx, cy = (x1 + x2) / 2, (y1 + y2) / 2
            crop = in_frame[y1:y2, x1:x2]
            frame_crops.append(crop)
            frame_positions.append((cx, cy))

        self.buffer[view]['crops'].append(frame_crops)
        self.buffer[view]['positions'].append(frame_positions)
        self.update_trajectories(view, frame_positions)

        return frame_crops, frame_positions

    def update_trajectories(self, view, current_positions):
        if not self.trajectories[view]:
            for pos in current_positions:
                self.trajectories[view][self.next_id[view]] = [pos]
                self.next_id[view] += 1
            return

        traj_ids = list(self.trajectories[view].keys())
        cost = np.zeros((len(traj_ids), len(current_positions)))
        for i, tid in enumerate(traj_ids):
            last = self.trajectories[view][tid][-1]
            for j, pos in enumerate(current_positions):
                cost[i, j] = np.linalg.norm(np.array(last) - np.array(pos))

        rows, cols = linear_sum_assignment(cost)
        for i, j in zip(rows, cols):
            tid = traj_ids[i]
            self.trajectories[view][tid].append(current_positions[j])
            if len(self.trajectories[view][tid]) > 5:
                self.trajectories[view][tid].pop(0)

        matched = set(cols)
        for j, pos in enumerate(current_positions):
            if j not in matched:
                self.trajectories[view][self.next_id[view]] = [pos]
                self.next_id[view] += 1

    def get_trajectories(self, view):
        return list(self.trajectories[view].values())

    def preprocess_crops(self, crops):
        tensors = []
        for crop in crops:
            if crop.size == 0:
                c = np.zeros((256,128,3), np.uint8)
            else:
                c = cv2.resize(crop, (128,256))
            t = torch.tensor(c).permute(2,0,1).float()/255.0
            tensors.append(t)
        return torch.stack(tensors)

    def prepare_trajectories(self, trajectories):
        seqs = []
        for traj in trajectories:
            if len(traj) < 5:
                padded = [traj[0]]*(5-len(traj)) + traj
            else:
                padded = traj[-5:]
            seqs.append(padded)
        return torch.tensor(seqs, dtype=torch.float32)

    def match_players(self):
        b_trajs = self.get_trajectories('broadcast')
        t_trajs = self.get_trajectories('tacticam')
        b_crops = self.buffer['broadcast']['crops'][-1]
        t_crops = self.buffer['tacticam']['crops'][-1]

        b_inputs = (self.preprocess_crops(b_crops).to(self.device),
                    self.prepare_trajectories(b_trajs).to(self.device))
        t_inputs = (self.preprocess_crops(t_crops).to(self.device),
                    self.prepare_trajectories(t_trajs).to(self.device))

        with torch.no_grad():
            sim = self.matcher(b_inputs, t_inputs)
        cost = 1 - sim.cpu().numpy()
        rows, cols = linear_sum_assignment(cost)

        b_ids = list(self.trajectories['broadcast'].keys())
        t_ids = list(self.trajectories['tacticam'].keys())
        return {b_ids[r]: t_ids[c] for r,c in zip(rows,cols)}

    def process_frames(self, broadcast_frame, tacticam_frame):
        self.process_frame(broadcast_frame, 'broadcast')
        self.process_frame(tacticam_frame, 'tacticam')
        self.frame_count += 1
        if self.frame_count % 5 == 0:
            return self.match_players()
        return None


In [22]:
class PlayerMapper:
    def __init__(self, device='cpu', detector=None, img_size=640, use_half=True):
        self.device = device
        self.detector = detector
        self.img_size = img_size
        self.half = use_half and torch.cuda.is_available()

        self.detector = self.detector.to(self.device)
        if self.half:
            self.detector.half()
        else:
            self.detector.float()

        self.embedder = PlayerEmbedder().to(device)
        self.matcher = PlayerMatcher(self.embedder).to(device)

        self.buffer = {
            'broadcast': {'crops': [], 'positions': []},
            'tacticam': {'crops': [], 'positions': []}
        }
        self.trajectories = {'broadcast': {}, 'tacticam': {}}
        self.next_id = {'broadcast': 0, 'tacticam': 0}
        self.frame_count = 0

    def process_frame(self, in_frame, view):
        # Letterbox resize + pad to ensure consistent shape
        img, ratio, pad = letterbox(in_frame, new_shape=self.img_size)
        img = img[:, :, ::-1].transpose(2, 0, 1)  # BGR->RGB, HWC->CHW
        img = np.ascontiguousarray(img)

        tensor = torch.from_numpy(img).to(self.device)
        tensor = tensor.float() / 255.0
        if self.half:
            tensor = tensor.half()
        tensor = tensor.unsqueeze(0)  # add batch dimension

        # Inference
        results = self.detector(tensor)[0]
        detections = results.boxes.xyxy.cpu().numpy()
        classes = results.boxes.cls.cpu().numpy()

        # Filter only player class (assumed cls==0)
        players = detections[classes == 0]

        frame_positions = []
        frame_crops = []
        for x1, y1, x2, y2 in players.astype(int):
            # map back to original coordinates if needed (using ratio/pad)
            cx, cy = (x1 + x2) / 2, (y1 + y2) / 2
            crop = in_frame[y1:y2, x1:x2]
            frame_crops.append(crop)
            frame_positions.append((cx, cy))

        self.buffer[view]['crops'].append(frame_crops)
        self.buffer[view]['positions'].append(frame_positions)
        self.update_trajectories(view, frame_positions)

        return frame_crops, frame_positions

    # (update_trajectories, get_trajectories, preprocess_crops, prepare_trajectories, match_players remain unchanged)

    def process_frames(self, broadcast_frame, tacticam_frame):
        self.process_frame(broadcast_frame, 'broadcast')
        self.process_frame(tacticam_frame, 'tacticam')
        self.frame_count += 1
        if self.frame_count % 5 == 0:
            return self.match_players()
        return None

    
    def update_trajectories(self, view, current_positions):
        # init with new ids for each
        if not self.trajectories[view]:
            for pos in current_positions:
                self.trajectories[view][self.next_id[view]] = [pos]
                self.next_id[view] += 1
            return
        
        # matching positions == trajectories
        cost_matrix = np.zeros((len(self.trajectories[view]), len(current_positions)))
        traj_ids = list(self.trajectories[view].keys())
        
        for i, traj_id in enumerate(traj_ids):
            last_pos = self.trajectories[view][traj_id][-1]
            for j, pos in enumerate(current_positions):
                cost_matrix[i, j] = np.linalg.norm(np.array(last_pos) - np.array(pos))
        
        # Algo_1
        row_ind, col_ind = linear_sum_assignment(cost_matrix)
        
        # upd traj
        for i, j in zip(row_ind, col_ind):
            traj_id = traj_ids[i]
            self.trajectories[view][traj_id].append(current_positions[j])
            if len(self.trajectories[view][traj_id]) > 5:
                self.trajectories[view][traj_id].pop(0)
        
        # new traj
        matched_indices = set(col_ind)
        for j, pos in enumerate(current_positions):
            if j not in matched_indices:
                self.trajectories[view][self.next_id[view]] = [pos]
                self.next_id[view] += 1
    
    def get_trajectories(self, view):
        return [traj for traj in self.trajectories[view].values()]
    
    def match_players(self):
        broadcast_trajs = self.get_trajectories('broadcast')
        tacticam_trajs = self.get_trajectories('tacticam')
        
        broadcast_crops = self.buffer['broadcast']['crops'][-1]
        tacticam_crops = self.buffer['tacticam']['crops'][-1]
        
        broadcast_inputs = (
            self.preprocess_crops(broadcast_crops),
            self.prepare_trajectories(broadcast_trajs)
        )
        tacticam_inputs = (
            self.preprocess_crops(tacticam_crops),
            self.prepare_trajectories(tacticam_trajs)
        )
        
        broadcast_inputs = (
            broadcast_inputs[0].to(self.device),
            broadcast_inputs[1].to(self.device)
        )
        tacticam_inputs = (
            tacticam_inputs[0].to(self.device),
            tacticam_inputs[1].to(self.device)
        )
        
        with torch.no_grad():
            similarity = self.matcher(broadcast_inputs, tacticam_inputs)
        
        cost_matrix = 1 - similarity.cpu().numpy()
        row_ind, col_ind = linear_sum_assignment(cost_matrix)
        
        # mapping: {broadcast_id: tacticam_id}
        broadcast_ids = list(self.trajectories['broadcast'].keys())
        tacticam_ids = list(self.trajectories['tacticam'].keys())
        return {broadcast_ids[row]: tacticam_ids[col] for row, col in zip(row_ind, col_ind)}
    
    def preprocess_crops(self, crops):
        transformed = []
        for crop in crops:
            if crop.size == 0:
                crop = np.zeros((256, 128, 3), dtype=np.uint8)
            else:
                crop = cv2.resize(crop, (128, 256))
            crop = torch.tensor(crop).permute(2, 0, 1).float() / 255.0
            transformed.append(crop)
        return torch.stack(transformed)
    
    def prepare_trajectories(self, trajectories):
        padded = []
        for traj in trajectories:
            if len(traj) < 5:
                padded_traj = [traj[0]] * (5 - len(traj)) + traj
            else:
                padded_traj = traj[-5:]
            padded.append(padded_traj)
        return torch.tensor(padded, dtype=torch.float32)
    
    def process_frames(self, broadcast_frame, tacticam_frame):
        self.process_frame(broadcast_frame, 'broadcast')
        self.process_frame(tacticam_frame, 'tacticam')
        self.frame_count += 1
        
        if self.frame_count % 5 == 0:
            return self.match_players()
        return None


In [26]:
def solve():
    mapper = PlayerMapper(device=DEVICE, detector=model_instance)
    
    broadcast_cap = cv2.VideoCapture(BROADCAST)
    tacticam_cap = cv2.VideoCapture(TACTICAM)
    
    frame_count = 0
    while True:
        ret_b, b_frame = broadcast_cap.read()
        ret_t, t_frame = tacticam_cap.read()
        
        if not ret_b or not ret_t:
            break
        
        mapping = mapper.process_frames(b_frame, t_frame)
        if mapping is not None:
            print(f"Frame {frame_count}: Player Mapping")
            for b_id, t_id in mapping.items():
                print(f"  Broadcast {b_id} → Tacticam {t_id}")
        
        frame_count += 1
    
    broadcast_cap.release()
    tacticam_cap.release()

In [27]:
solve()

IndexError: boolean index did not match indexed array along axis 1; size of axis is 4 but size of corresponding boolean axis is 8400