In [None]:
!pip install ultralytics opencv-python numpy

In [2]:
import cv2
import numpy as np
from ultralytics import YOLO
import os

In [3]:
# from google.colab import files
# from google.colab import drive
# drive.mount('/content/drive')

In [4]:
# cd "/content/drive/MyDrive/UPenn_CIS5810_Project/FinalProject/IterationV1"

In [5]:
TARGET_PREF = 'auto' # auto, referee, teama, teamb
ENABLE_SAFE_MARGIN = False

In [6]:
MODEL_PATH = "../../models/v6/best.pt"  # fine-tuned weights
VIDEO_IN   = "../../videos/Q4_side_510-540.mp4"  # input video
VIDEO_OUT  = "../../results/demo_v1.mp4"         # output video

assert os.path.exists(MODEL_PATH), f"The weight does not exist. {MODEL_PATH}"
assert os.path.exists(VIDEO_IN),   f"The video does not exist. {VIDEO_IN}"

model = YOLO(MODEL_PATH)
names = (model.model.names if hasattr(model, "models") else model.names)
print("Classes:", names)

# 0 teamA, 1 teamB, 2 ball, 3 referee
PLAYER_IDS = {0, 1}
TEAM_A_ID  = 0
TEAM_B_ID  = 1
BALL_ID = 2
REFEREE_ID = 3
print(f"PLAYER_IDS={PLAYER_IDS}, BALL_ID={BALL_ID}, REFEREE_ID={REFEREE_ID}")

CONF_THRES = 0.20
IMGSZ = 1536
BASE_ZOOM = 1.40
MIN_ZOOM = 1.30
MAX_ZOOM = 3.5
CLOSEUP_PLAYER_FRAC = 0.3
CENTER_WEIGHT_PLAYER = 0.70
BALL_SAFE_PAD_X = 80
BALL_SAFE_PAD_Y = 80

HOLD_MAX_FRAMES = 40   # max frames to hold last ball-handler
prev_holder_id = None  # last ball-handler id
prev_center = None  # last camera center
prev_player_bb = None  # last selected bbox
no_ball_streak = 0     # consecutive no-ball counter
CAM_LIMIT_MARGIN_X = 400
CAM_LIMIT_MARGIN_Y = 900

cap = cv2.VideoCapture(VIDEO_IN)
assert cap.isOpened(), f"Cannot open the video{VIDEO_IN}"
fps = cap.get(cv2.CAP_PROP_FPS) or 30
W   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
H   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(VIDEO_OUT, cv2.VideoWriter_fourcc(*"mp4v"), fps, (W, H))

print(f"Video: {W}x{H} @ {fps:.2f} fps")

Classes: {0: 'teamA', 1: 'teamB', 2: 'ball', 3: 'referee'}
PLAYER_IDS={0, 1}, BALL_ID=2, REFEREE_ID=3
Video: 3840x2160 @ 29.97 fps


In [7]:
def _center_of_box(bb):
    x1, y1, x2, y2 = bb
    return ((x1 + x2) / 2.0, (y1 + y2) / 2.0)

def choose_target_ball_nearest(tracks, debug=False):
    balls   = [(bb, cf) for (_, bb, cls, cf) in tracks if cls == BALL_ID]
    players = [(tid, bb) for (tid, bb, cls, _) in tracks if cls in PLAYER_IDS]

    if balls and players:
        ball_bb, _ = max(balls, key=lambda x: x[1])
        bcx, bcy   = _center_of_box(ball_bb)

        tid, pbb = min(
            players,
            key=lambda x: (
                (_center_of_box(x[1])[0] - bcx) ** 2 +
                (_center_of_box(x[1])[1] - bcy) ** 2
            )
        )
        pcx, pcy = _center_of_box(pbb)
        if debug:
            print(f"[choose] ball present -> nearest player tid={tid}, ball=({bcx:.1f},{bcy:.1f})")
        return np.array([pcx, pcy], dtype=float), tid, pbb, np.array([bcx, bcy], dtype=float)

    if players:
        p_centers = np.array([_center_of_box(bb) for (_, bb) in players], dtype=float)
        centroid  = p_centers.mean(axis=0)  # (cx_mean, cy_mean)

        def dist2_to_centroid(bb):
            cx, cy = _center_of_box(bb)
            return (cx - centroid[0]) ** 2 + (cy - centroid[1]) ** 2

        tid, pbb = min(players, key=lambda x: dist2_to_centroid(x[1]))
        pcx, pcy = _center_of_box(pbb)
        if debug:
            print(f"[choose] no ball -> centroid={tuple(np.round(centroid,1))}, pick tid={tid}")
        return np.array([pcx, pcy], dtype=float), tid, pbb, None

    if debug:
        print("[choose] no players -> return None")
    return None, None, None, None

def _ball_center_from_tracks(tracks):
    balls = [(bb, cf) for (_, bb, cls, cf) in tracks if cls == BALL_ID]
    if not balls:
        return None
    ball_bb, _ = max(balls, key=lambda x: x[1])
    return np.array(_center_of_box(ball_bb), dtype=float)

def _nearest_to_point(items_tid_bb, point_xy):
    """items_tid_bb: [(tid, bb), ...] ；point_xy: (x,y)"""
    tid, bb = min(
        items_tid_bb,
        key=lambda x: ( _center_of_box(x[1])[0]-point_xy[0] )**2 + ( _center_of_box(x[1])[1]-point_xy[1] )**2
    )
    return tid, bb

def _centroid_of_bboxes(items_tid_bb):
    if not items_tid_bb:
        return None
    centers = np.array([_center_of_box(bb) for (_, bb) in items_tid_bb], dtype=float)
    return centers.mean(axis=0)

def choose_target_by_pref(tracks, pref='auto', debug=False):
    pref = (pref or 'auto').lower()
    ball_center = _ball_center_from_tracks(tracks)

    if pref == 'referee':
        refs = [(tid, bb) for (tid, bb, cls, _) in tracks if cls == REFEREE_ID]
        if refs:
            if ball_center is not None:
                tid, bb = _nearest_to_point(refs, ball_center)
                cx, cy = _center_of_box(bb)
                if debug:
                    print(f"[pref=referee] nearest-to-ball ref tid={tid}")
                return np.array([cx, cy], dtype=float), tid, bb, ball_center
            else:
                centroid = _centroid_of_bboxes(refs)
                tid, bb = _nearest_to_point(refs, centroid)
                cx, cy = _center_of_box(bb)
                if debug:
                    print(f"[pref=referee] no ball -> centroid pick ref tid={tid}")
                return np.array([cx, cy], dtype=float), tid, bb, None
        if debug:
            print("[pref=referee] no referee -> fallback auto")
        return choose_target_ball_nearest(tracks, debug=debug)

    if pref in ('teama', 'teamb'):
        team_cls = 0 if pref == 'teama' else 1
        team_players = [(tid, bb) for (tid, bb, cls, _) in tracks if cls == team_cls]
        if team_players:
            centroid = _centroid_of_bboxes(team_players)
            tid, bb = _nearest_to_point(team_players, centroid)
            if debug:
                print(f"[pref={pref}] centroid center, zoom ref tid={tid}")
            return centroid.astype(float), tid, bb, ball_center
        if debug:
            print(f"[pref={pref}] no team players -> fallback auto")
        return choose_target_ball_nearest(tracks, debug=debug)

    return choose_target_ball_nearest(tracks, debug=debug)


def compute_zoom_target(player_bb, ball_center, center_xy, W, H,
                        closeup_frac=CLOSEUP_PLAYER_FRAC,
                        min_zoom=MIN_ZOOM, max_zoom=MAX_ZOOM,
                        pad_x=BALL_SAFE_PAD_X,
                        pad_y=BALL_SAFE_PAD_Y):
    if player_bb is None:
        return BASE_ZOOM

    x1, y1, x2, y2 = player_bb
    bbox_h = max(1.0, (y2 - y1))
    zoom_from_bbox = float(np.clip((closeup_frac * H) / bbox_h, min_zoom, max_zoom))

    if ball_center is not None and center_xy is not None:
        cx, cy = float(center_xy[0]), float(center_xy[1])
        dx = abs(ball_center[0] - cx)
        dy = abs(ball_center[1] - cy)

        need_w = 2*dx + 2*pad_x

        need_h = 2*dy + 2*pad_y

        zoom_bound_w = W / need_w if need_w > 0 else max_zoom
        zoom_bound_h = H / need_h if need_h > 0 else max_zoom
        zoom_from_bbox = min(zoom_from_bbox, zoom_bound_w, zoom_bound_h)

    return float(np.clip(zoom_from_bbox, min_zoom, max_zoom))

def crop_like_ptz(frame, center_xy, zoom,
                  limit_x=CAM_LIMIT_MARGIN_X,
                  limit_y=CAM_LIMIT_MARGIN_Y,
                  use_limit=ENABLE_SAFE_MARGIN):
    h, w = frame.shape[:2]
    zoom = max(1.0, float(zoom))

    view_w = int(w / zoom)
    view_h = int(h / zoom)

    cx, cy = float(center_xy[0]), float(center_xy[1])

    raw_x1 = cx - view_w / 2
    raw_y1 = cy - view_h / 2

    if use_limit:
        min_x = limit_x
        max_x = w - limit_x - view_w

        if max_x < min_x:
            valid_center_x = w / 2
            x1 = int(valid_center_x - view_w / 2)
        else:
            x1 = int(np.clip(raw_x1, min_x, max_x))

        min_y = limit_y
        max_y = h - limit_y - view_h

        if max_y < min_y:
            valid_center_y = h / 2
            y1 = int(valid_center_y - view_h / 2)
        else:
            y1 = int(np.clip(raw_y1, min_y, max_y))

    else:
        x1 = int(raw_x1)
        y1 = int(raw_y1)

    x1 = int(np.clip(x1, 0, w - view_w))
    y1 = int(np.clip(y1, 0, h - view_h))

    roi = frame[y1:y1+view_h, x1:x1+view_w]
    return cv2.resize(roi, (w, h))

def _find_bbox_by_id(tracks, tid):
    for (id_, bb, cls, _) in tracks:
        if id_ == tid:
            return bb
    return None

In [8]:
class SmoothPTZ:
    """
    center: 2D critically-damped spring + speed/acc limits
    zoom  : EMA
    """
    def __init__(self, W, H, fps,
                 k_center=10.0, # spring stiffness
                 zeta=0.90, # damping ratio
                 max_speed_px=1200.0, # pixels/sec
                 max_accel_px=4000.0, # pixels/sec^2
                 deadband_px=4.0, # deadband
                 zoom_tau_s=0.25, # time constant
                 zoom_min=1.0, zoom_max=3.5):
        self.W, self.H = W, H
        self.dt = 1.0 / max(1.0, float(fps))
        # center state
        self.c = np.array([W/2.0, H/2.0], dtype=float)  # position
        self.v = np.zeros(2, dtype=float) # velocity
        self.k = float(k_center)
        self.zeta = float(zeta)
        self.max_speed = float(max_speed_px)
        self.max_accel = float(max_accel_px)
        self.deadband = float(deadband_px)
        # zoom state
        self.z = 1.0
        self.zoom_min, self.zoom_max = float(zoom_min), float(zoom_max)
        # alpha = 1 - exp(-dt/tau)
        self.zoom_alpha = 1.0 - np.exp(-self.dt / float(zoom_tau_s))

    def step(self, target_center, target_zoom):
        # --- center: critically damped spring with clamps ---
        e = np.asarray(target_center, dtype=float) - self.c
        if np.hypot(e[0], e[1]) < self.deadband:
            e[:] = 0.0
        # a = k*e - 2*zeta*sqrt(k)*v
        a = self.k * e - 2.0 * self.zeta * np.sqrt(self.k) * self.v
        a_norm = np.hypot(a[0], a[1])
        if a_norm > self.max_accel:
            a *= (self.max_accel / a_norm)
        self.v += a * self.dt
        v_norm = np.hypot(self.v[0], self.v[1])
        if v_norm > self.max_speed:
            self.v *= (self.max_speed / v_norm)
        self.c += self.v * self.dt
        self.c[0] = float(np.clip(self.c[0], 0, self.W))
        self.c[1] = float(np.clip(self.c[1], 0, self.H))

        # --- zoom: EMA ---
        tz = float(np.clip(target_zoom, self.zoom_min, self.zoom_max))
        self.z += self.zoom_alpha * (tz - self.z)
        self.z = float(np.clip(self.z, self.zoom_min, self.zoom_max))

        return self.c.copy(), self.z

In [9]:
smoother = SmoothPTZ(
    W=W, H=H, fps=fps,
    k_center=10.0,
    zeta=0.90,
    max_speed_px=1200.0,
    max_accel_px=4000.0,
    deadband_px=4.0,
    zoom_tau_s=0.25,
    zoom_min=MIN_ZOOM, zoom_max=MAX_ZOOM
)

In [10]:
frame_idx = 0
while True:
    ok, frame = cap.read()
    if not ok:
        print("Video ended or read failed.")
        break

    # YOLO ByteTrack
    res = model.track(
        source=frame,
        conf=CONF_THRES,
        imgsz=IMGSZ,
        persist=True,
        verbose=False
    )[0]

    # tracks
    tracks = []
    boxes = res.boxes
    if boxes is not None and len(boxes) > 0:
        xyxy = boxes.xyxy.cpu().numpy()
        cls  = boxes.cls.cpu().numpy().astype(int)
        conf = boxes.conf.cpu().numpy()
        if boxes.id is not None:
            ids = boxes.id.cpu().numpy().astype(int)
        else:
            ids = np.arange(len(xyxy))
        for i in range(len(xyxy)):
            tracks.append((int(ids[i]), xyxy[i], int(cls[i]), float(conf[i])))

    uniq_cls = np.unique([c for (_,_,c,_) in tracks]) if tracks else []
    n_players = sum(1 for (_,_,c,_) in tracks if c in PLAYER_IDS)
    n_balls   = sum(1 for (_,_,c,_) in tracks if c == BALL_ID)

    player_center, chosen_id, player_bb, ball_center = choose_target_by_pref(
        tracks, pref=TARGET_PREF, debug=True
    )

    balls_present = (ball_center is not None)
    if balls_present:
        prev_holder_id = chosen_id
        prev_player_bb = player_bb
        no_ball_streak = 0
    else:
        no_ball_streak += 1
        held_bb = None
        if prev_holder_id is not None:
            held_bb = _find_bbox_by_id(tracks, prev_holder_id)
        if held_bb is not None:
            player_bb = held_bb
            cx, cy = _center_of_box(player_bb)
            player_center = np.array([cx, cy], dtype=float)
            chosen_id = prev_holder_id
        elif prev_center is not None and no_ball_streak <= HOLD_MAX_FRAMES:
            player_center = None  # prev_center
        else:
            pass

    if player_center is not None:
        if ball_center is not None:
            center_target = CENTER_WEIGHT_PLAYER * player_center + (1.0 - CENTER_WEIGHT_PLAYER) * ball_center
        else:
            center_target = player_center
    else:
        if (not balls_present) and (prev_center is not None) and (no_ball_streak <= HOLD_MAX_FRAMES):
            center_target = prev_center.copy()
        else:
            center_target = np.array([W/2.0, H/2.0], dtype=float)
            player_bb = None

    desired_zoom = compute_zoom_target(
        player_bb,
        ball_center if balls_present else None,
        center_target,
        W, H,
        closeup_frac=CLOSEUP_PLAYER_FRAC,
        min_zoom=MIN_ZOOM, max_zoom=MAX_ZOOM,
        pad_x=BALL_SAFE_PAD_X,
        pad_y=BALL_SAFE_PAD_Y
    )

    smooth_center, smooth_zoom = smoother.step(center_target, desired_zoom)

    if frame_idx < 5 or frame_idx % 10 == 0:
        pb_h = 0 if player_bb is None else (player_bb[3]-player_bb[1])
        pb_w = 0 if player_bb is None else (player_bb[2]-player_bb[0])
        print(
            f"[{frame_idx:04d}] uniq_cls={uniq_cls} players={n_players} balls={n_balls} "
            f"chosen_id={chosen_id} pb_size=({pb_w:.0f}x{pb_h:.0f}) "
            f"target=({center_target[0]:.1f},{center_target[1]:.1f}) "
            f"zoom_tgt={desired_zoom:.2f} "
            f"smooth=({smooth_center[0]:.1f},{smooth_center[1]:.1f}) "
            f"z={smooth_zoom:.2f}"
        )

    if boxes is not None and len(boxes) > 0:
        for i in range(len(boxes)):
            x1,y1,x2,y2 = map(int, boxes.xyxy[i].cpu().numpy())
            c = int(boxes.cls[i].cpu().numpy())
            if c == BALL_ID:
                cv2.rectangle(frame, (x1,y1), (x2,y2), (0,0,255), 2)
        if chosen_id is not None and boxes.id is not None:
            ids_arr = boxes.id.cpu().numpy().astype(int)
            for i, tid in enumerate(ids_arr):
                if tid == chosen_id:
                    x1,y1,x2,y2 = map(int, boxes.xyxy[i].cpu().numpy())
                    cv2.rectangle(frame, (x1,y1), (x2,y2), (0,255,0), 2)
                    break
        elif player_bb is not None:
            x1,y1,x2,y2 = map(int, player_bb)
            cv2.rectangle(frame, (x1,y1), (x2,y2), (0,255,0), 2)

    cv2.circle(frame, (int(smooth_center[0]), int(smooth_center[1])), 5, (0,255,255), -1)

    view = crop_like_ptz(
        frame,
        smooth_center,
        smooth_zoom,
        limit_x=CAM_LIMIT_MARGIN_X,
        limit_y=CAM_LIMIT_MARGIN_Y
    )
    out.write(view)
    prev_center = smooth_center.copy()
    prev_player_bb = player_bb
    frame_idx += 1

print("Done frames:", frame_idx)
cap.release()
out.release()
print("Saved:", os.path.abspath(VIDEO_OUT))



[choose] ball present -> nearest player tid=3, ball=(2502.4,1065.6)
[0000] uniq_cls=[0 1 2 3] players=11 balls=1 chosen_id=3 pb_size=(68x148) target=(2521.2,1063.3) zoom_tgt=3.50 smooth=(1924.5,1079.9) z=1.31
[choose] no ball -> centroid=(np.float64(1956.7), np.float64(1016.8)), pick tid=10
[0001] uniq_cls=[0 1 3] players=11 balls=0 chosen_id=3 pb_size=(56x148) target=(2529.2,1061.1) zoom_tgt=3.50 smooth=(1933.4,1079.6) z=1.59
[choose] no ball -> centroid=(np.float64(1954.8), np.float64(1016.7)), pick tid=10
[0002] uniq_cls=[0 1 3] players=11 balls=0 chosen_id=3 pb_size=(56x149) target=(2527.3,1060.3) zoom_tgt=3.50 smooth=(1946.7,1079.2) z=1.82
[choose] no ball -> centroid=(np.float64(1952.4), np.float64(1016.5)), pick tid=10
[0003] uniq_cls=[0 1 3] players=11 balls=0 chosen_id=3 pb_size=(63x148) target=(2522.7,1059.8) zoom_tgt=3.50 smooth=(1963.9,1078.6) z=2.03
[choose] no ball -> centroid=(np.float64(1949.2), np.float64(1016.5)), pick tid=10
[0004] uniq_cls=[0 1 3] players=11 balls=0