In [1]:
import os
import torch
import shutil
import uuid
import subprocess
from ultralytics import YOLO
import cv2
import numpy as np
from sklearn.cluster import KMeans
from collections import defaultdict
from collections import Counter

# --- Notebook configuration -------------------------------------------------
# File locations (absolute paths inside the DataSphere project).
YOLO_MODEL_PATH = "/home/jupyter/datasphere/project/train18_best_model.pt"
ULTRALYTICS_OUTPUT_DIR = "/home/jupyter/datasphere/project/runs/track"
temp_input_video_path = "/home/jupyter/datasphere/project/MOT20-01-raw.mp4"

# Detection settings handed to `model.track(...)` by the analyzers below.
# NOTE(review): class index 4 is presumably "pedestrian" in the custom model -
# confirm against the training label map.
PEDESTRIAN_CLASSES = [4]
YOLO_CONFIDENCE_THRESHOLD = 0.3

# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')

### Подсчет среднего потока пешеходов через кластеризацию.

In [2]:
class YoloVideoFlowAnalyzer:
    """Track pedestrians with YOLO + ByteTrack and cluster the tracks by displacement.

    Each track is summarised by the vector from its first to its last box
    centre. K-Means splits those vectors into (at most) two groups, and an
    annotated copy of the video is written next to the input file.
    """

    # BGR colours keyed by cluster label; labels missing here are drawn white.
    _CLUSTER_COLORS = {
        -1: (128, 128, 128),
        0: (0, 255, 0),
        1: (0, 0, 255),
        2: (255, 0, 0),
        3: (255, 255, 0),
        4: (255, 0, 255),
        5: (0, 255, 255),
    }
    _FALLBACK_COLOR = (255, 255, 255)
    # Number of most recent trajectory points drawn as a trail.
    _TRAIL_LENGTH = 10

    def __init__(self):
        """Load the weights from YOLO_MODEL_PATH and move the model to DEVICE."""
        self.device = DEVICE
        self.model = YOLO(YOLO_MODEL_PATH)
        self.model.to(self.device)

    @staticmethod
    def _output_path(input_path, suffix='_clusters'):
        """Return `<input without extension><suffix>.mp4`.

        Built with `os.path.splitext` so that an input that does not end in
        `.mp4` can never map onto itself (which would make the writer
        overwrite the source video while it is being read).
        """
        root, _ext = os.path.splitext(input_path)
        return f"{root}{suffix}.mp4"

    @staticmethod
    def _read_video_properties(input_path):
        """Return `(fps, width, height)` of the video, defaulting fps to 30."""
        cap = cv2.VideoCapture(input_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS) or 30
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        finally:
            cap.release()
        return fps, width, height

    @staticmethod
    def _displacement_features(trajectories):
        """Return `(track_ids, features)`; `features[i]` is end - start of track i.

        `trajectories` maps track id -> list of `(frame_idx, cx, cy)`.
        The result always has shape `(n_tracks, 2)`, also for zero tracks.
        """
        track_ids = list(trajectories)
        rows = [
            (pts[-1][1] - pts[0][1], pts[-1][2] - pts[0][2])
            for pts in trajectories.values()
        ]
        features = np.array(rows, dtype=np.int64).reshape(-1, 2)
        return track_ids, features

    def _track(self, input_path):
        """Run the tracker over the video and collect per-track / per-frame data.

        Returns:
            trajectories: track id -> list of `(frame_idx, cx, cy)`.
            detections: frame index -> list of `(track_id, (x1, y1, x2, y2))`.

        Assumes the i-th streamed result belongs to the i-th frame OpenCV
        returns for the same file during rendering.
        """
        trajectories = defaultdict(list)
        detections = defaultdict(list)

        results = self.model.track(
            source=input_path,
            tracker='bytetrack.yaml',
            classes=PEDESTRIAN_CLASSES,
            conf=YOLO_CONFIDENCE_THRESHOLD,
            stream=True,
            device=self.device,
            # FP16 inference is a GPU feature; only request it on CUDA.
            half=str(self.device).startswith('cuda'),
        )

        for frame_idx, res in enumerate(results):
            boxes = res.boxes
            # `boxes.id` is None on frames where the tracker holds no active
            # track, so it must be checked before calling `.int()`.
            if boxes is None or boxes.id is None:
                continue

            ids = boxes.id.int().cpu().tolist()
            xyxy = boxes.xyxy.cpu().numpy()
            for track_id, box in zip(ids, xyxy):
                # Plain Python ints keep the OpenCV drawing calls happy.
                x1, y1, x2, y2 = (int(v) for v in box)
                cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
                trajectories[track_id].append((frame_idx, cx, cy))
                detections[frame_idx].append((track_id, (x1, y1, x2, y2)))

        return trajectories, detections

    def _render(self, input_path, out_path, fps, size, trajectories, detections, labels):
        """Write a copy of the video with boxes, ids and trails coloured per cluster."""
        cap = cv2.VideoCapture(input_path)
        writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, size)
        try:
            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                for track_id, (x1, y1, x2, y2) in detections.get(frame_idx, []):
                    color = self._CLUSTER_COLORS.get(labels.get(track_id), self._FALLBACK_COLOR)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(frame, str(track_id), (x1, y1 - 5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

                # NOTE(review): trails are drawn for every track seen so far,
                # including tracks that have already ended - confirm intended.
                for track_id, pts in trajectories.items():
                    color = self._CLUSTER_COLORS.get(labels.get(track_id), self._FALLBACK_COLOR)
                    recent = [p for p in pts if p[0] <= frame_idx][-self._TRAIL_LENGTH:]
                    for (_, x0, y0), (_, x1, y1) in zip(recent, recent[1:]):
                        cv2.line(frame, (x0, y0), (x1, y1), color, 2)

                writer.write(frame)
                frame_idx += 1
        finally:
            # Release both handles even if drawing fails midway.
            cap.release()
            writer.release()

    def process_and_track_with_clusters(self, input_path: str):
        """Track pedestrians, cluster the tracks and write an annotated video.

        Args:
            input_path: Path of the video to analyse.

        Returns:
            Tuple `(out_path, trajectories, labels)`:
            `out_path` is the annotated video (`<input>_clusters.mp4`),
            `trajectories` maps track id -> list of `(frame_idx, cx, cy)`,
            `labels` maps track id -> K-Means cluster label (int).

        Raises:
            ValueError: If the tracker produced no tracks at all.
        """
        fps, width, height = self._read_video_properties(input_path)
        trajectories, detections = self._track(input_path)

        if not trajectories:
            raise ValueError("No pedestrian tracks found; nothing to cluster.")

        track_ids, features = self._displacement_features(trajectories)

        # K-Means cannot fit more clusters than samples (single-track videos).
        n_clusters = min(2, len(features))
        kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(features)
        labels = {tid: int(lbl) for tid, lbl in zip(track_ids, kmeans.labels_)}

        out_path = self._output_path(input_path)
        self._render(input_path, out_path, fps, (width, height),
                     trajectories, detections, labels)

        return out_path, trajectories, labels

In [None]:
# Run the K-Means trajectory clustering on the notebook's input video.
# The path comes from the shared `temp_input_video_path` setting rather than a
# second hard-coded copy of the same string, so all sections analyse one file.
processor = YoloVideoFlowAnalyzer()
video, trajs, labels = processor.process_and_track_with_clusters(temp_input_video_path)

In [6]:
# `labels` maps track id -> cluster label, so this tallies tracks per cluster.
cluster_counts = Counter(labels.values())

print("Количество пешеходов в каждом кластере:")
for cluster_id in cluster_counts:
    count = cluster_counts[cluster_id]
    print(f"Кластер {cluster_id}: {count} пешеходов")

Количество пешеходов в каждом кластере:
Кластер 0: 283 пешеходов
Кластер 1: 21 пешеходов


### Подсчет среднего потока пешеходов через оптический поток

In [5]:
class YoloOpticalFlowAnalyzer:
    """Cluster per-detection optical-flow vectors sampled at tracked box centres.

    For every tracked box the dense Farneback flow is sampled at the box
    centre. All sampled vectors are split by K-Means into (at most) two groups
    and drawn as coloured arrows into a copy of the video.
    """

    # BGR colours keyed by cluster label; labels missing here are drawn white.
    _CLUSTER_COLORS = {
        0: (0, 255, 0),
        1: (0, 0, 255),
        2: (255, 0, 0),
        3: (255, 255, 0),
        4: (255, 0, 255),
        5: (0, 255, 255),
    }
    _FALLBACK_COLOR = (255, 255, 255)
    # Flow vectors are multiplied by this factor so the arrows are visible.
    _ARROW_SCALE = 10

    def __init__(self):
        """Load the weights from YOLO_MODEL_PATH and move the model to DEVICE."""
        self.device = DEVICE
        self.model = YOLO(YOLO_MODEL_PATH)
        self.model.to(self.device)

    @staticmethod
    def _output_path(input_path, suffix='_optflow_clusters'):
        """Return `<input without extension><suffix>.mp4`.

        Built with `os.path.splitext` so that an input that does not end in
        `.mp4` can never map onto itself (which would make the writer
        overwrite the source video while it is being read).
        """
        root, _ext = os.path.splitext(input_path)
        return f"{root}{suffix}.mp4"

    @staticmethod
    def _split_labels_by_frame(all_labels, counts):
        """Cut the flat label sequence into consecutive chunks of the given sizes."""
        chunks = []
        offset = 0
        for count in counts:
            chunks.append(all_labels[offset:offset + count])
            offset += count
        return chunks

    def _sample_flow(self, input_path):
        """Run the tracker and sample the dense flow at every tracked box centre.

        Returns:
            `(fps, (width, height), frame_positions, frame_vectors)` where the
            two lists hold, per processed frame, the `(cx, cy)` centres and the
            matching `[dx, dy]` flow vectors.

        Raises:
            RuntimeError: If the first frame cannot be read.

        The first frame is consumed before the loop, so result k is paired
        with the flow from frame k to frame k+1 (assuming the k-th streamed
        result belongs to the k-th frame of the file).
        """
        cap = cv2.VideoCapture(input_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS) or 30
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            ret, prev_frame = cap.read()
            if not ret:
                raise RuntimeError("Cannot read first frame")
            prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

            frame_positions = []
            frame_vectors = []

            results = self.model.track(
                source=input_path,
                tracker='bytetrack.yaml',
                classes=PEDESTRIAN_CLASSES,
                conf=YOLO_CONFIDENCE_THRESHOLD,
                stream=True,
                device=self.device,
                # FP16 inference is a GPU feature; only request it on CUDA.
                half=str(self.device).startswith('cuda'),
            )

            for res in results:
                ret, frame = cap.read()
                if not ret:
                    break

                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                current_vectors = []
                current_positions = []

                boxes = res.boxes
                # `boxes.id` is None on frames without an active track; such
                # frames contribute no vectors instead of crashing on `.int()`.
                if boxes is not None and boxes.id is not None:
                    flow = cv2.calcOpticalFlowFarneback(
                        prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0
                    )
                    for box in boxes.xyxy.cpu().numpy():
                        x1, y1, x2, y2 = (int(v) for v in box)
                        cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
                        if 0 <= cx < width and 0 <= cy < height:
                            dx, dy = flow[cy, cx]
                            current_vectors.append([dx, dy])
                            current_positions.append((cx, cy))

                prev_gray = gray
                # Appended for every frame (even when empty) so list index == frame index.
                frame_vectors.append(current_vectors)
                frame_positions.append(current_positions)
        finally:
            cap.release()

        return fps, (width, height), frame_positions, frame_vectors

    def _render(self, input_path, out_path, fps, size,
                frame_positions, frame_vectors, frame_labels):
        """Write the frames that have flow samples, with one arrow per sample."""
        cap = cv2.VideoCapture(input_path)
        writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, size)
        try:
            for positions, vectors, labels in zip(frame_positions, frame_vectors, frame_labels):
                ret, frame = cap.read()
                if not ret:
                    break

                for (cx, cy), (dx, dy), lbl in zip(positions, vectors, labels):
                    color = self._CLUSTER_COLORS.get(int(lbl), self._FALLBACK_COLOR)
                    end_point = (int(cx + dx * self._ARROW_SCALE),
                                 int(cy + dy * self._ARROW_SCALE))
                    cv2.arrowedLine(frame, (cx, cy), end_point, color, 2, tipLength=0.3)

                writer.write(frame)
        finally:
            # Release both handles even if drawing fails midway.
            cap.release()
            writer.release()

    def analyze_with_optical_flow(self, input_path: str):
        """Cluster optical-flow vectors of tracked pedestrians and render them.

        Args:
            input_path: Path of the video to analyse.

        Returns:
            Tuple `(out_path, all_labels, flow_vectors)`:
            `out_path` is the annotated video (`<input>_optflow_clusters.mp4`),
            `all_labels` holds one K-Means label per sampled flow vector
            (i.e. per detection per frame, not per pedestrian),
            `flow_vectors` is the `(n, 2)` array of sampled `[dx, dy]` vectors.

        Raises:
            RuntimeError: If the first frame cannot be read.
            ValueError: If no flow vector could be sampled.
        """
        fps, size, frame_positions, frame_vectors = self._sample_flow(input_path)

        flow_vectors = np.array([vec for vectors in frame_vectors for vec in vectors])
        if len(flow_vectors) == 0:
            raise ValueError("No optical flow vectors extracted.")

        # K-Means cannot fit more clusters than samples.
        n_clusters = min(2, len(flow_vectors))
        kmeans = KMeans(n_clusters=n_clusters, random_state=0).fit(flow_vectors)
        all_labels = kmeans.labels_

        frame_labels = self._split_labels_by_frame(
            all_labels, [len(vectors) for vectors in frame_vectors]
        )

        # The writer is only opened once there is something to draw, so a
        # failed analysis no longer leaves an empty output file behind.
        out_path = self._output_path(input_path)
        self._render(input_path, out_path, fps, size,
                     frame_positions, frame_vectors, frame_labels)

        return out_path, all_labels, flow_vectors

In [None]:
# Run the optical-flow analysis on the notebook's input video.
# The path comes from the shared `temp_input_video_path` setting rather than a
# second hard-coded copy of the same string, so all sections analyse one file.
processor = YoloOpticalFlowAnalyzer()
video_path, labels, vectors = processor.analyze_with_optical_flow(temp_input_video_path)

In [11]:
# NOTE(review): `labels` here holds one K-Means label per sampled flow vector
# (one per detection per frame), so these totals count detections rather than
# unique pedestrians.
cluster_counts = Counter(labels)

print("Количество пешеходов в каждом кластере:")
for cluster_id in cluster_counts:
    count = cluster_counts[cluster_id]
    print(f"Кластер {cluster_id}: {count} пешеходов")

Количество пешеходов в каждом кластере:
Кластер 1: 2192 пешеходов
Кластер 0: 9215 пешеходов


### Подсчет среднего потока пешеходов через координаты кадров

Подсчет количества пешеходов, движущихся влево, вправо, вниз или вверх.

In [15]:
class YoloVideoFlowAnalyzer:
    """Track pedestrians with YOLO + ByteTrack and label each track by direction.

    A track's direction is derived from the vector between its first and its
    last box centre: the dominant axis decides between left/right and up/down.

    NOTE: this class shares its name with the K-Means analyzer defined earlier
    in the notebook; running this cell replaces that definition in the kernel.
    """

    # BGR colours per direction label; unknown labels are drawn white.
    _DIRECTION_COLORS = {
        "left": (0, 0, 255),
        "right": (0, 255, 0),
        "up": (255, 0, 0),
        "down": (255, 255, 0),
        "stationary": (128, 128, 128),
    }
    _FALLBACK_COLOR = (255, 255, 255)
    # Number of most recent trajectory points drawn as a trail.
    _TRAIL_LENGTH = 10

    def __init__(self):
        """Load the weights from YOLO_MODEL_PATH and move the model to DEVICE."""
        self.device = DEVICE
        self.model = YOLO(YOLO_MODEL_PATH)
        self.model.to(self.device)

    @staticmethod
    def _output_path(input_path, suffix='_direction_clusters'):
        """Return `<input without extension><suffix>.mp4`.

        Built with `os.path.splitext` so that an input that does not end in
        `.mp4` can never map onto itself (which would make the writer
        overwrite the source video while it is being read).
        """
        root, _ext = os.path.splitext(input_path)
        return f"{root}{suffix}.mp4"

    @staticmethod
    def _classify_direction(dx, dy, min_displacement=0.0):
        """Map a displacement `(dx, dy)` in image coordinates to a direction label.

        Returns "stationary" when neither axis moved by more than
        `min_displacement` pixels; otherwise the dominant axis wins, with ties
        going to the vertical axis. Image y grows downwards, so `dy > 0` is
        "down". A negative `min_displacement` disables the stationary class.
        """
        if max(abs(dx), abs(dy)) <= min_displacement:
            return "stationary"
        if abs(dx) > abs(dy):
            return "right" if dx > 0 else "left"
        return "down" if dy > 0 else "up"

    @staticmethod
    def _read_video_properties(input_path):
        """Return `(fps, width, height)` of the video, defaulting fps to 30."""
        cap = cv2.VideoCapture(input_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS) or 30
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        finally:
            cap.release()
        return fps, width, height

    def _track(self, input_path):
        """Run the tracker over the video and collect per-track / per-frame data.

        Returns:
            trajectories: track id -> list of `(frame_idx, cx, cy)`.
            detections: frame index -> list of `(track_id, (x1, y1, x2, y2))`.

        Assumes the i-th streamed result belongs to the i-th frame OpenCV
        returns for the same file during rendering.
        """
        trajectories = defaultdict(list)
        detections = defaultdict(list)

        results = self.model.track(
            source=input_path,
            tracker='bytetrack.yaml',
            classes=PEDESTRIAN_CLASSES,
            conf=YOLO_CONFIDENCE_THRESHOLD,
            stream=True,
            device=self.device,
            # FP16 inference is a GPU feature; only request it on CUDA.
            half=str(self.device).startswith('cuda'),
        )

        for frame_idx, res in enumerate(results):
            boxes = res.boxes
            # `boxes.id` is None on frames where the tracker holds no active
            # track, so it must be checked before calling `.int()`.
            if boxes is None or boxes.id is None:
                continue

            ids = boxes.id.int().cpu().tolist()
            xyxy = boxes.xyxy.cpu().numpy()
            for track_id, box in zip(ids, xyxy):
                # Plain Python ints keep the OpenCV drawing calls happy.
                x1, y1, x2, y2 = (int(v) for v in box)
                cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
                trajectories[track_id].append((frame_idx, cx, cy))
                detections[frame_idx].append((track_id, (x1, y1, x2, y2)))

        return trajectories, detections

    def _render(self, input_path, out_path, fps, size,
                trajectories, detections, direction_labels):
        """Write a copy of the video with boxes, labels and trails per direction."""
        cap = cv2.VideoCapture(input_path)
        writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, size)
        try:
            frame_idx = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                for track_id, (x1, y1, x2, y2) in detections.get(frame_idx, []):
                    direction = direction_labels.get(track_id, "unknown")
                    color = self._DIRECTION_COLORS.get(direction, self._FALLBACK_COLOR)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(frame, f"{track_id} {direction}", (x1, y1 - 5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

                # NOTE(review): trails are drawn for every track seen so far,
                # including tracks that have already ended - confirm intended.
                for track_id, pts in trajectories.items():
                    direction = direction_labels.get(track_id, "unknown")
                    color = self._DIRECTION_COLORS.get(direction, self._FALLBACK_COLOR)
                    recent = [p for p in pts if p[0] <= frame_idx][-self._TRAIL_LENGTH:]
                    for (_, x0, y0), (_, x1, y1) in zip(recent, recent[1:]):
                        cv2.line(frame, (x0, y0), (x1, y1), color, 2)

                writer.write(frame)
                frame_idx += 1
        finally:
            # Release both handles even if drawing fails midway.
            cap.release()
            writer.release()

    def process_and_track_with_direction_clusters(self, input_path: str,
                                                  min_displacement: float = 0.0):
        """Track pedestrians, label each track's direction and write an annotated video.

        Args:
            input_path: Path of the video to analyse.
            min_displacement: Tracks whose start-to-end displacement does not
                exceed this many pixels on either axis are labelled
                "stationary". With the default of 0 this only affects tracks
                that did not move at all (e.g. single-detection tracks), which
                were previously counted as "up".

        Returns:
            Tuple `(out_path, trajectories, direction_labels)`:
            `out_path` is the annotated video (`<input>_direction_clusters.mp4`),
            `trajectories` maps track id -> list of `(frame_idx, cx, cy)`,
            `direction_labels` maps track id -> direction label.
        """
        fps, width, height = self._read_video_properties(input_path)
        trajectories, detections = self._track(input_path)

        direction_labels = {}
        direction_counts = {"left": 0, "right": 0, "up": 0, "down": 0, "stationary": 0}

        for t_id, pts in trajectories.items():
            _, start_x, start_y = pts[0]
            _, end_x, end_y = pts[-1]
            direction = self._classify_direction(
                end_x - start_x, end_y - start_y, min_displacement
            )
            direction_labels[t_id] = direction
            direction_counts[direction] += 1

        print("Direction counts:", direction_counts)

        out_path = self._output_path(input_path)
        self._render(input_path, out_path, fps, (width, height),
                     trajectories, detections, direction_labels)

        return out_path, trajectories, direction_labels

In [None]:
# Label every tracked pedestrian with a movement direction for the shared input video.
processor = YoloVideoFlowAnalyzer()
video, trajs, directions = processor.process_and_track_with_direction_clusters(
    input_path=temp_input_video_path,
)

In [22]:
# `directions` maps track id -> direction label, so this tallies tracks per direction.
cluster_counts = Counter(directions.values())

print("Количество пешеходов в каждом кластере:")
for cluster_id in cluster_counts:
    count = cluster_counts[cluster_id]
    print(f"Кластер {cluster_id}: {count} пешеходов")

Количество пешеходов в каждом кластере:
Кластер down: 58 пешеходов
Кластер right: 120 пешеходов
Кластер left: 53 пешеходов
Кластер up: 73 пешеходов
