In [1]:
import os
import cv2
import numpy as np
from scipy.optimize import linear_sum_assignment
from ultralytics import YOLO
import matplotlib.pyplot as plt
import shutil
import subprocess
import matplotlib.pyplot as plt
from natsort import natsorted

In [2]:
# Configuration: every tunable constant of the notebook lives in this cell.
FRAMES_DIR = './TP3_data/frames'  # Folder holding the input frame images
INIT_FILE = './TP3_data/init.txt'  # Text file with the initial tracks
OUTPUT_FILE = 'results.txt'  # Text file receiving the tracking results
MAX_AGE = 30  # Maximum frames to keep a track alive when not visible
REIDENTIFICATION_IOU_THRESHOLD = 0.8  # IOU threshold for re-identification
CLASS_NAME = 'cup'  # Label of the object class to track
FPS_OUTPUT = 30  # Frame rate used when writing videos
OUTPUT_VIDEO = 'result_video.avi'  # Annotated video written by the tracking loop

In [3]:
def video_building(input_folder, output_video, fps):
    """Assemble the images of a folder into a single video file.

    Files ending in .png, .jpg or .jpeg (case-insensitive) are written in
    natural sort order (frame1, frame2, ..., frame10). The video frame size
    is taken from the first image.

    Args:
        input_folder: Directory containing the frame images.
        output_video: Path of the video file to create (e.g. ending in .mp4).
        fps: Frame rate handed to the video writer.

    Raises:
        ValueError: If the folder contains no image, the first image cannot
            be read, or the video writer cannot be opened.
    """
    # Collect the image files; bail out before any sorting or decoding when
    # there is nothing to encode (previously an IndexError on images[0]).
    image_names = [
        name for name in os.listdir(input_folder)
        if name.lower().endswith(('.png', '.jpg', '.jpeg'))
    ]
    if not image_names:
        raise ValueError(f"Aucune image trouvée dans le dossier : {input_folder}")
    image_names = natsorted(image_names)  # keep the natural order of the frames

    # Read the first image to obtain the frame size
    first_image_path = os.path.join(input_folder, image_names[0])
    frame = cv2.imread(first_image_path)
    if frame is None:
        raise ValueError(f"Impossible de lire la première image : {first_image_path}")

    # Unpack only the first two dimensions (the channel axis, if any, is ignored)
    height, width = frame.shape[:2]

    # Initialize the video writer ('mp4v' here; 'XVID' would suit AVI output)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))
    if not out.isOpened():
        # Without this check a missing codec or bad path would silently
        # produce no video while still reporting success.
        out.release()
        raise ValueError(f"Impossible d'ouvrir la vidéo de sortie : {output_video}")

    # Append every frame; the writer is released even if an error interrupts the loop
    try:
        for image_name in image_names:
            image_path = os.path.join(input_folder, image_name)
            frame = cv2.imread(image_path)
            if frame is None:
                # Best effort: report the unreadable image and keep going
                print(f"Erreur de lecture de l'image : {image_path}")
                continue
            out.write(frame)
    finally:
        out.release()

    print(f"Vidéo créée avec succès : {output_video}")

# Example usage: build the input video from the configured frames folder.
# The folder and frame rate come from the configuration cell so that a
# change there propagates here instead of being shadowed by local literals.
input_folder = FRAMES_DIR             # Folder containing the images
output_video = "default_dataset.mp4"  # Name of the output video
fps = FPS_OUTPUT                      # Frames per second

video_building(input_folder, output_video, fps)
INPUT_VIDEO = output_video  # The tracking cells read their frames from this file

Vidéo créée avec succès : default_dataset.mp4


In [4]:
# Instantiate the Ultralytics YOLO model from the 'yolov8n.pt' weights file
model = YOLO('yolov8n.pt')

In [5]:
# Resolve the numeric id of the tracked class in the model's id -> name map.
# The configured name and its plural form are both accepted (for the default
# CLASS_NAME this is 'cup' / 'cups'); the first matching entry wins.
accepted_names = {CLASS_NAME, f"{CLASS_NAME}s"}
class_id = next(
    (idx for idx, name in model.names.items() if name in accepted_names),
    None,
)

# Stop early: the tracking loop filters detections with this id
if class_id is None:
    raise ValueError(f"Class '{CLASS_NAME}' not found in the model")

In [6]:
class TrackMemory:
    """Bookkeeping of tracks that are currently visible and recently lost.

    Attributes are plain dictionaries:
      * ``active_tracks``: track_id -> bbox
      * ``disappeared_tracks``: track_id -> [last_bbox, frames_disappeared]
    """

    def __init__(self):
        self.active_tracks = {}
        self.disappeared_tracks = {}

    def update_active(self, track_id, bbox):
        """Register ``bbox`` as the current box of ``track_id``.

        A track that comes back is no longer considered disappeared.
        """
        self.active_tracks[track_id] = bbox
        self.disappeared_tracks.pop(track_id, None)

    def mark_disappeared(self):
        """Move every active track to the disappeared set with a zero counter.

        The active set is emptied afterwards; tracks seen again are restored
        through ``update_active``.
        """
        self.disappeared_tracks.update(
            {track_id: [bbox, 0] for track_id, bbox in self.active_tracks.items()}
        )
        self.active_tracks.clear()

    def cleanup(self):
        """Age every disappeared track by one frame and drop those older than MAX_AGE."""
        expired_ids = []
        for track_id, entry in self.disappeared_tracks.items():
            entry[1] += 1
            if entry[1] > MAX_AGE:
                expired_ids.append(track_id)

        # Deletion happens after the scan so the dict is not resized mid-iteration
        for track_id in expired_ids:
            del self.disappeared_tracks[track_id]

    def attempt_reidentification(self, new_bbox):
        """Return the disappeared track whose last box best overlaps ``new_bbox``.

        Only an IoU strictly above REIDENTIFICATION_IOU_THRESHOLD counts as a
        match; ``None`` is returned when no disappeared track qualifies.
        """
        matched_id = None
        top_score = REIDENTIFICATION_IOU_THRESHOLD  # floor a candidate must exceed

        for candidate_id, (last_bbox, _age) in self.disappeared_tracks.items():
            score = calculate_iou(last_bbox, new_bbox)
            if score > top_score:
                matched_id, top_score = candidate_id, score

        return matched_id


In [7]:
# Calculate IoU between two bboxes [x1, y1, x2, y2]
def calculate_iou(box1, box2):
    """Return the intersection-over-union of two boxes given as [x1, y1, x2, y2].

    The result is 0.0 when the boxes do not overlap or when the union area
    is not positive.
    """
    ax1, ay1, ax2, ay2 = box1
    bx1, by1, bx2, by2 = box2

    # Bounds of the overlapping rectangle
    left, right = max(ax1, bx1), min(ax2, bx2)
    top, bottom = max(ay1, by1), min(ay2, by2)

    # Disjoint boxes: nothing to intersect
    if right < left or bottom < top:
        return 0.0

    overlap = (right - left) * (bottom - top)

    # Union = sum of both areas minus the part counted twice
    area_a = (ax2 - ax1) * (ay2 - ay1)
    area_b = (bx2 - bx1) * (by2 - by1)
    union = area_a + area_b - overlap

    if union > 0:
        return overlap / union
    return 0.0

In [8]:
# Open the video file referenced by INPUT_VIDEO for frame-by-frame reading
cap = cv2.VideoCapture(INPUT_VIDEO)
# Fail early: a capture that did not open cannot deliver any frame
if not cap.isOpened():
    raise ValueError(f"Could not open video file: {INPUT_VIDEO}")

In [9]:
# Query the capture for its frame size, frame count and frame rate
width, height, total_frames = (
    int(cap.get(prop))
    for prop in (
        cv2.CAP_PROP_FRAME_WIDTH,
        cv2.CAP_PROP_FRAME_HEIGHT,
        cv2.CAP_PROP_FRAME_COUNT,
    )
)
input_fps = cap.get(cv2.CAP_PROP_FPS)  # kept as returned, not cast to int

print(f"Video info: {width}x{height}, {total_frames} frames, {input_fps} FPS")

Video info: 1920x1080, 1385 frames, 30.0 FPS


In [10]:
# Initialize the writer of the annotated output video (XVID codec, same
# frame size as the input capture, FPS_OUTPUT frames per second)
fourcc = cv2.VideoWriter_fourcc(*'XVID')
video_writer = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, FPS_OUTPUT, (width, height))
# Mirror the check done on the capture: a writer that failed to open would
# otherwise drop every frame silently and leave an empty video behind
if not video_writer.isOpened():
    raise ValueError(f"Could not open video writer for: {OUTPUT_VIDEO}")

In [11]:
# Create the TrackMemory instance that holds the active and disappeared tracks
track_memory = TrackMemory()

In [12]:
def load_initial_tracks(init_path):
    """Read the frame-0 tracks from an initialization file.

    Every non-blank line holds whitespace-separated integers in the order
    ``frame_num track_id x y width height``. Only rows whose frame number
    is 0 are used; blank lines are skipped.

    Args:
        init_path: Path of the initialization file. It may not exist.

    Returns:
        A pair ``(tracks, rows)``: ``tracks`` maps track_id to
        ``[x1, y1, x2, y2]`` and ``rows`` lists the ``(track_id, x, y, w, h)``
        tuples in file order. Both are empty when the file is missing.

    Raises:
        ValueError: If a field is not an integer, or a frame-0 row has
            fewer than six fields.
    """
    tracks, rows = {}, []
    if not os.path.exists(init_path):
        return tracks, rows

    with open(init_path, 'r') as init_file:
        for line_no, line in enumerate(init_file, start=1):
            fields = line.split()
            if not fields:
                # A blank line (e.g. at the end of the file) used to raise
                # IndexError on parts[0]; it carries no data, so skip it.
                continue
            parts = list(map(int, fields))
            if parts[0] != 0:  # Only use initialization for first frame
                continue
            if len(parts) < 6:
                raise ValueError(
                    f"{init_path}:{line_no}: expected 6 fields "
                    f"(frame track_id x y width height), got {len(parts)}"
                )
            track_id, x, y, w, h = parts[1:6]
            tracks[track_id] = [x, y, x + w, y + h]  # corner format for IoU and drawing
            rows.append((track_id, x, y, w, h))
    return tracks, rows


# Per-frame results kept for drawing: frame index -> [(track_id, x, y, w, h), ...]
tracking_results = {}
frame_idx = 0

# Load initial tracks if available
INIT_FILE = './TP3_data/init.txt'  # same value as in the configuration cell
initial_tracks, frame0_rows = load_initial_tracks(INIT_FILE)
if frame0_rows:
    tracking_results[0] = frame0_rows

# Main tracking pass: read the capture frame by frame, track the target class
# with the YOLO tracker, remap ids through track_memory, and write both the
# results file and the annotated video.
# Each results line has the form: "frame_idx track_id x y width height".
with open(OUTPUT_FILE, 'w') as f_out:
    # Process first frame with initial tracks if available.
    # In that case the first frame is annotated from the initial tracks only;
    # it is not passed to the model.
    if initial_tracks and frame_idx == 0:
        ret, frame = cap.read()
        if ret:
            # Write initial tracks to results file and register them as active
            for track_id, bbox in initial_tracks.items():
                x1, y1, x2, y2 = bbox
                width_box = x2 - x1
                height_box = y2 - y1
                f_out.write(f"{frame_idx} {track_id} {x1} {y1} {width_box} {height_box}\n")
                track_memory.update_active(track_id, bbox)
            
            # Draw bounding boxes
            # NOTE(review): width_box / height_box are computed but not used in this loop.
            for track_id, (x1, y1, x2, y2) in initial_tracks.items():
                width_box = x2 - x1
                height_box = y2 - y1
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"ID {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
                            0.5, (0, 255, 0), 2)
            
            video_writer.write(frame)
            frame_idx += 1
    
    # Process the rest of the frames (all frames when there are no initial tracks)
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No more frames could be read
            break
            
        # Use YOLO's track method with persistence, restricted to the target class
        results = model.track(frame, persist=True, classes=class_id, tracker="bytetrack.yaml")
        
        # Before processing new detections, mark all current tracks as potentially disappeared.
        # This empties track_memory.active_tracks; tracks detected again below are
        # restored through update_active.
        track_memory.mark_disappeared()
        
        # Process tracking results
        if results and results[0].boxes:
            boxes = results[0].boxes
            
            # Check if track_ids exist (detections without ids are ignored)
            if hasattr(boxes, 'id') and boxes.id is not None:
                track_ids = boxes.id.int().cpu().tolist()
                
                for i, box in enumerate(boxes.xyxy.cpu().numpy()):
                    # Corner coordinates truncated to integers
                    x1, y1, x2, y2 = map(int, box)
                    width_box = x2 - x1
                    height_box = y2 - y1
                    bbox = [x1, y1, x2, y2]
                    
                    # Get track ID from YOLOv8 tracker
                    track_id = track_ids[i]
                    
                    # Try to re-identify with a disappeared track if this is a new detection.
                    # NOTE(review): active_tracks was just emptied by mark_disappeared(), so
                    # this condition holds for every detection whose id has not already been
                    # registered earlier in this same frame. Re-identification is therefore
                    # attempted against the previous frame's tracks too -- confirm intended.
                    if track_id not in track_memory.active_tracks:
                        reidentified_id = track_memory.attempt_reidentification(bbox)
                        if reidentified_id is not None:
                            # Reuse the id of the matching disappeared track
                            track_id = reidentified_id
                    
                    # Update track memory
                    track_memory.update_active(track_id, bbox)
                    
                    # Store for visualization and output
                    tracking_results.setdefault(frame_idx, []).append((track_id, x1, y1, width_box, height_box))
                    f_out.write(f"{frame_idx} {track_id} {x1} {y1} {width_box} {height_box}\n")
        
        # Cleanup and update tracks that have disappeared (ages them by one frame
        # and drops those older than MAX_AGE)
        track_memory.cleanup()
        
        # Draw bounding boxes on the frame
        if frame_idx in tracking_results:
            for (track_id, x, y, w, h) in tracking_results[frame_idx]:
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
                cv2.putText(frame, f"ID {track_id}", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX,
                            0.5, (0, 255, 0), 2)
        
        # Write frame to output video (frames without detections are written unannotated)
        video_writer.write(frame)
        
        # Show progress every 10 frames
        if frame_idx % 10 == 0:
            print(f"Processing frame {frame_idx}/{total_frames}")
        
        frame_idx += 1



0: 384x640 1 cup, 63.9ms
Speed: 3.8ms preprocess, 63.9ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 cup, 52.0ms
Speed: 2.5ms preprocess, 52.0ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 cup, 45.8ms
Speed: 1.7ms preprocess, 45.8ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 cup, 48.1ms
Speed: 2.0ms preprocess, 48.1ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 cup, 49.9ms
Speed: 1.8ms preprocess, 49.9ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 cup, 46.1ms
Speed: 1.6ms preprocess, 46.1ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 cup, 47.2ms
Speed: 1.8ms preprocess, 47.2ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 cup, 45.4ms
Speed: 2.2ms preprocess, 45.4ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 cu

In [13]:
# Release resources: close the input capture and finalize the output video
cap.release()
video_writer.release()
# Report where the two outputs were written
print(f"Video saved to {OUTPUT_VIDEO}")
print(f"Tracking results saved to {OUTPUT_FILE}")

Video saved to result_video.avi
Tracking results saved to results.txt
