In [5]:
import cv2
import numpy as np
import pygame
from scipy.optimize import linear_sum_assignment
import random
from collections import deque
import math

In [26]:
import math
import random
from collections import deque

import cv2
import numpy as np
import pygame
from scipy.optimize import linear_sum_assignment


def bbox_center(bbox):
    x, y, w, h = bbox
    return x + w / 2.0, y + h / 2.0


class Track:
    def __init__(self, track_id, bbox, history_len=30):
        self.id = track_id
        self.bbox = bbox
        self.centroid = bbox_center(bbox)
        self.missed = 0
        self.color = (
            random.randint(64, 255),
            random.randint(64, 255),
            random.randint(64, 255),
        )
        self.history = deque(maxlen=history_len)
        self.history.append(self.centroid)


class MultiObjectTracker:
    def __init__(self, max_missed=10, max_dist=60.0):
        self.max_missed = max_missed
        self.max_dist = max_dist
        self.tracks = []
        self.next_id = 0

    def _add_track(self, bbox):
        t = Track(self.next_id, bbox)
        self.tracks.append(t)
        self.next_id += 1

    def update(self, detections):
        tracks = self.tracks

        if not tracks:
            for det in detections:
                self._add_track(det)
            return self.tracks

        if not detections:
            for t in tracks:
                t.missed += 1
            self.tracks = [t for t in tracks if t.missed <= self.max_missed]
            return self.tracks

        det_centers = [bbox_center(b) for b in detections]
        cost = np.zeros((len(tracks), len(detections)), dtype=np.float32)

        for i, t in enumerate(tracks):
            tx, ty = t.centroid
            for j, (dx, dy) in enumerate(det_centers):
                cost[i, j] = np.hypot(tx - dx, ty - dy)

        row_ind, col_ind = linear_sum_assignment(cost)

        assigned_tracks = set()
        assigned_dets = set()

        for r, c in zip(row_ind, col_ind):
            if cost[r, c] < self.max_dist:
                t = tracks[r]
                det = detections[c]
                t.bbox = det
                t.centroid = det_centers[c]
                t.missed = 0
                t.history.append(t.centroid)
                assigned_tracks.add(r)
                assigned_dets.add(c)

        for i, t in enumerate(tracks):
            if i not in assigned_tracks:
                t.missed += 1

        self.tracks = [t for t in tracks if t.missed <= self.max_missed]

        for j, det in enumerate(detections):
            if j not in assigned_dets:
                self._add_track(det)

        return self.tracks


def merge_boxes(boxes, rel_factor=0.8, min_dist_px=60, min_area=200):
    if len(boxes) <= 1:
        return boxes

    def to_xyxy(b):
        x, y, w, h = b
        return x, y, x + w, y + h

    def union_box(a, b):
        ax1, ay1, ax2, ay2 = to_xyxy(a)
        bx1, by1, bx2, by2 = to_xyxy(b)
        x1 = min(ax1, bx1)
        y1 = min(ay1, by1)
        x2 = max(ax2, bx2)
        y2 = max(ay2, by2)
        return (x1, y1, x2 - x1, y2 - y1)

    def centers_and_diag(b):
        x, y, w, h = b
        cx = x + w / 2.0
        cy = y + h / 2.0
        d = math.hypot(w, h)
        return cx, cy, d

    def is_merge_candidate(a, b):
        ax1, ay1, ax2, ay2 = to_xyxy(a)
        bx1, by1, bx2, by2 = to_xyxy(b)
        ix1 = max(ax1, bx1)
        iy1 = max(ay1, by1)
        ix2 = min(ax2, bx2)
        iy2 = min(ay2, by2)
        iw = max(0, ix2 - ix1)
        ih = max(0, iy2 - iy1)
        if iw * ih > 0:
            return True

        acx, acy, ad = centers_and_diag(a)
        bcx, bcy, bd = centers_and_diag(b)
        dist = math.hypot(acx - bcx, acy - bcy)

        thr_rel = rel_factor * max(ad, bd)
        thr = max(thr_rel, min_dist_px)
        return dist < thr

    boxes = list(boxes)
    merged = True

    while merged:
        merged = False
        used = [False] * len(boxes)
        new_boxes = []

        for i in range(len(boxes)):
            if used[i]:
                continue

            cur = boxes[i]
            changed = True
            while changed:
                changed = False
                for j in range(len(boxes)):
                    if i == j or used[j]:
                        continue
                    if is_merge_candidate(cur, boxes[j]):
                        cur = union_box(cur, boxes[j])
                        used[j] = True
                        changed = True
                        merged = True

            used[i] = True
            x, y, w, h = cur
            if w * h >= min_area:
                new_boxes.append(cur)

        boxes = new_boxes

    return boxes


def detect_by_frame_diff(prev_gray, gray, min_area=300):
    diff = cv2.absdiff(gray, prev_gray)
    diff = cv2.GaussianBlur(diff, (13, 13), 0)

    _, th = cv2.threshold(diff, 10, 255, cv2.THRESH_BINARY)

    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9))
    th = cv2.morphologyEx(th, cv2.MORPH_CLOSE, kernel, iterations=3)
    th = cv2.morphologyEx(th, cv2.MORPH_OPEN, kernel, iterations=1)

    contours, _ = cv2.findContours(th, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    boxes = []
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        if w * h < min_area:
            continue
        boxes.append((x, y, w, h))

    boxes = merge_boxes(boxes, rel_factor=0.8, min_dist_px=60, min_area=min_area)
    return boxes, diff


def main(video_path="flying_shapes.mp4", bg_alpha=0.01, diff_gain=3.0):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Cannot open video:", video_path)
        return

    ret, frame = cap.read()
    if not ret:
        print("Empty video")
        return

    h, w = frame.shape[:2]
    gray0 = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    prev_gray = gray0.copy()

    bg_accum = gray0.astype(np.float32)

    tracker = MultiObjectTracker(max_missed=10, max_dist=60.0)

    pygame.init()
    screen = pygame.display.set_mode((3 * w, h))
    pygame.display.set_caption("Tracking | frame diff | background")
    clock = pygame.time.Clock()
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    font = pygame.font.SysFont(None, 20)

    running = True
    while running:
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        bg_accum = (1.0 - bg_alpha) * bg_accum + bg_alpha * gray.astype(np.float32)
        bg_gray = bg_accum.astype(np.uint8)

        detections, diff = detect_by_frame_diff(prev_gray, gray)
        tracks = tracker.update(detections)
        prev_gray = gray

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        diff_vis = cv2.convertScaleAbs(diff, alpha=diff_gain, beta=0)
        diff_rgb = cv2.cvtColor(diff_vis, cv2.COLOR_GRAY2RGB)

        _, bg_bin = cv2.threshold(bg_gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        bg_rgb = cv2.cvtColor(bg_bin, cv2.COLOR_GRAY2RGB)

        surface_left = pygame.image.frombuffer(frame_rgb.tobytes(), (w, h), "RGB")
        surface_mid = pygame.image.frombuffer(diff_rgb.tobytes(), (w, h), "RGB")
        surface_right = pygame.image.frombuffer(bg_rgb.tobytes(), (w, h), "RGB")

        for t in tracks:
            x, y, bw, bh = map(int, t.bbox)
            pygame.draw.rect(surface_left, t.color, pygame.Rect(x, y, bw, bh), 2)

            if len(t.history) > 1:
                pts = [(int(px), int(py)) for (px, py) in t.history]
                pygame.draw.lines(surface_left, t.color, False, pts, 2)

            cx, cy = map(int, t.centroid)
            pygame.draw.circle(surface_left, t.color, (cx, cy), 3)

            text_surf = font.render(f"id={t.id}", True, t.color)
            surface_left.blit(text_surf, (x, max(y - 15, 0)))

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False

        keys = pygame.key.get_pressed()
        if keys[pygame.K_ESCAPE] or keys[pygame.K_q]:
            running = False

        screen.blit(surface_left, (0, 0))
        screen.blit(surface_mid, (w, 0))
        screen.blit(surface_right, (2 * w, 0))
        pygame.display.flip()
        clock.tick(fps)

    cap.release()
    pygame.quit()


if __name__ == "__main__":
    main("flying_shapes.mp4", bg_alpha=0.4, diff_gain=90.0)


In [None]:
import math
import random
from collections import deque

import cv2
import numpy as np
import pygame
from scipy.optimize import linear_sum_assignment


def bbox_center(bbox):
    x, y, w, h = bbox
    return x + w / 2.0, y + h / 2.0


class Track:
    def __init__(self, track_id, bbox, history_len=30):
        self.id = track_id
        self.bbox = bbox
        self.centroid = bbox_center(bbox)
        self.missed = 0
        self.color = (
            random.randint(64, 255),
            random.randint(64, 255),
            random.randint(64, 255),
        )
        self.history = deque(maxlen=history_len)
        self.history.append(self.centroid)


class MultiObjectTracker:
    def __init__(self, max_missed=10, max_dist=60.0):
        self.max_missed = max_missed
        self.max_dist = max_dist
        self.tracks = []
        self.next_id = 0

    def _add_track(self, bbox):
        t = Track(self.next_id, bbox)
        self.tracks.append(t)
        self.next_id += 1

    def update(self, detections):
        tracks = self.tracks

        if not tracks:
            for det in detections:
                self._add_track(det)
            return self.tracks

        if not detections:
            for t in tracks:
                t.missed += 1
            self.tracks = [t for t in tracks if t.missed <= self.max_missed]
            return self.tracks

        det_centers = [bbox_center(b) for b in detections]
        cost = np.zeros((len(tracks), len(detections)), dtype=np.float32)

        for i, t in enumerate(tracks):
            tx, ty = t.centroid
            for j, (dx, dy) in enumerate(det_centers):
                cost[i, j] = np.hypot(tx - dx, ty - dy)

        row_ind, col_ind = linear_sum_assignment(cost)

        assigned_tracks = set()
        assigned_dets = set()

        for r, c in zip(row_ind, col_ind):
            if cost[r, c] < self.max_dist:
                t = tracks[r]
                det = detections[c]
                t.bbox = det
                t.centroid = det_centers[c]
                t.missed = 0
                t.history.append(t.centroid)
                assigned_tracks.add(r)
                assigned_dets.add(c)

        for i, t in enumerate(tracks):
            if i not in assigned_tracks:
                t.missed += 1

        self.tracks = [t for t in tracks if t.missed <= self.max_missed]

        for j, det in enumerate(detections):
            if j not in assigned_dets:
                self._add_track(det)

        return self.tracks


def merge_boxes(boxes, rel_factor=0.8, min_dist_px=60, min_area=200):
    if len(boxes) <= 1:
        return boxes

    def to_xyxy(b):
        x, y, w, h = b
        return x, y, x + w, y + h

    def union_box(a, b):
        ax1, ay1, ax2, ay2 = to_xyxy(a)
        bx1, by1, bx2, by2 = to_xyxy(b)
        x1 = min(ax1, bx1)
        y1 = min(ay1, by1)
        x2 = max(ax2, bx2)
        y2 = max(ay2, by2)
        return (x1, y1, x2 - x1, y2 - y1)

    def centers_and_diag(b):
        x, y, w, h = b
        cx = x + w / 2.0
        cy = y + h / 2.0
        d = math.hypot(w, h)
        return cx, cy, d

    def is_merge_candidate(a, b):
        ax1, ay1, ax2, ay2 = to_xyxy(a)
        bx1, by1, bx2, by2 = to_xyxy(b)
        ix1 = max(ax1, bx1)
        iy1 = max(ay1, by1)
        ix2 = min(ax2, bx2)
        iy2 = min(ay2, by2)
        iw = max(0, ix2 - ix1)
        ih = max(0, iy2 - iy1)
        if iw * ih > 0:
            return True

        acx, acy, ad = centers_and_diag(a)
        bcx, bcy, bd = centers_and_diag(b)
        dist = math.hypot(acx - bcx, acy - bcy)

        thr_rel = rel_factor * max(ad, bd)
        thr = max(thr_rel, min_dist_px)
        return dist < thr

    boxes = list(boxes)
    merged = True

    while merged:
        merged = False
        used = [False] * len(boxes)
        new_boxes = []

        for i in range(len(boxes)):
            if used[i]:
                continue

            cur = boxes[i]
            changed = True
            while changed:
                changed = False
                for j in range(len(boxes)):
                    if i == j or used[j]:
                        continue
                    if is_merge_candidate(cur, boxes[j]):
                        cur = union_box(cur, boxes[j])
                        used[j] = True
                        changed = True
                        merged = True

            used[i] = True
            x, y, w, h = cur
            if w * h >= min_area:
                new_boxes.append(cur)

        boxes = new_boxes

    return boxes


def detect_from_diff(diff, min_area=500, diff_thresh=15):
    diff_blur = cv2.GaussianBlur(diff, (13, 13), 0)
    _, th = cv2.threshold(diff_blur, diff_thresh, 255, cv2.THRESH_BINARY)

    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9))
    th = cv2.morphologyEx(th, cv2.MORPH_CLOSE, kernel, iterations=3)
    th = cv2.morphologyEx(th, cv2.MORPH_OPEN, kernel, iterations=1)

    contours, _ = cv2.findContours(th, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    boxes = []
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        if w * h < min_area:
            continue
        boxes.append((x, y, w, h))

    boxes = merge_boxes(boxes, rel_factor=0.8, min_dist_px=60, min_area=min_area)
    return boxes, diff_blur, th


def main(
    video_path="flying_shapes.mp4",
    bg_alpha=0.01,   # скорость обновления фона
    diff_gain=4.0,   # усиление яркости diff для отображения
    diff_thresh=15   # порог для бинаризации diff
):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Cannot open video:", video_path)
        return

    ret, frame = cap.read()
    if not ret:
        print("Empty video")
        return

    h, w = frame.shape[:2]
    gray0 = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    # фон = бегущее среднее
    bg_accum = gray0.astype(np.float32)

    tracker = MultiObjectTracker(max_missed=10, max_dist=60.0)

    pygame.init()
    screen = pygame.display.set_mode((3 * w, h))
    pygame.display.set_caption("Tracking | frame-bg diff | background")
    clock = pygame.time.Clock()
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    font = pygame.font.SysFont(None, 20)

    running = True
    while running:
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # фон, который используем для детекции на ЭТОМ кадре
        bg_gray_det = bg_accum.astype(np.uint8)

        # diff = frame - background
        diff = cv2.absdiff(gray, bg_gray_det)

        # детекция по diff
        detections, diff_blur, th = detect_from_diff(
            diff, min_area=500, diff_thresh=diff_thresh
        )
        tracks = tracker.update(detections)

        # обновляем фон (везде, без маски) – тут уже влияет bg_alpha
        bg_accum = (1.0 - bg_alpha) * bg_accum + bg_alpha * gray.astype(np.float32)
        bg_gray_vis = bg_accum.astype(np.uint8)

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        diff_vis = cv2.convertScaleAbs(diff_blur, alpha=diff_gain, beta=0)
        diff_rgb = cv2.cvtColor(diff_vis, cv2.COLOR_GRAY2RGB)

        bg_rgb = cv2.cvtColor(bg_gray_vis, cv2.COLOR_GRAY2RGB)

        surface_left = pygame.image.frombuffer(frame_rgb.tobytes(), (w, h), "RGB")
        surface_mid = pygame.image.frombuffer(diff_rgb.tobytes(), (w, h), "RGB")
        surface_right = pygame.image.frombuffer(bg_rgb.tobytes(), (w, h), "RGB")

        for t in tracks:
            x, y, bw, bh = map(int, t.bbox)
            pygame.draw.rect(surface_left, t.color, pygame.Rect(x, y, bw, bh), 2)

            if len(t.history) > 1:
                pts = [(int(px), int(py)) for (px, py) in t.history]
                pygame.draw.lines(surface_left, t.color, False, pts, 2)

            cx, cy = map(int, t.centroid)
            pygame.draw.circle(surface_left, t.color, (cx, cy), 3)

            text_surf = font.render(f"id={t.id}", True, t.color)
            surface_left.blit(text_surf, (x, max(y - 15, 0)))

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False

        keys = pygame.key.get_pressed()
        if keys[pygame.K_ESCAPE] or keys[pygame.K_q]:
            running = False

        screen.blit(surface_left, (0, 0))
        screen.blit(surface_mid, (w, 0))
        screen.blit(surface_right, (2 * w, 0))
        pygame.display.flip()
        clock.tick(fps)

    cap.release()
    pygame.quit()


if __name__ == "__main__":
    main("flying_shapes.mp4", bg_alpha=0.0005, diff_gain=4.0, diff_thresh=15)
