<a href="https://colab.research.google.com/github/armin-tavakoli/Signal_Project/blob/main/Yet_another_copy_of_Signal_Object_Tracking_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install numpy
!pip install torch torchvision torchaudio
!pip install ultralytics
!pip install opencv-python-headless



In [None]:
from google.colab import files
import ipywidgets as widgets
from IPython.display import display

uploader = files.upload()
video_path = next(iter(uploader))

Saving person2.mp4 to person2.mp4


In [None]:
from ultralytics import YOLO

try:
    model = YOLO('yolov8n.pt')
    class_names = list(model.names.values())
except Exception as e:
    print(f"Error loading YOLO model: {e}")
    class_names = ["person", "car", "dog", "cat"]

print("Detectable objects by the model:")
for i in range(0, len(class_names), 5):
    print("  ".join(f"{name:<15}" for name in class_names[i:i+5]))

print("\n" + "="*50)
target_object_name = input("Enter the name of the target object from the list above: ").lower()
x_frames_str = input("Enter the number of initial frames (x) for object detection: ")

if target_object_name not in class_names:
    print(f"\nWarning: '{target_object_name}' is not in the list of detectable objects. The tracker may not find it.")

try:
    x_frames = int(x_frames_str)
    if x_frames <= 0: raise ValueError
    print(f"\nObject detection will run on the first {x_frames} frames to find a '{target_object_name}'.")
except ValueError:
    print("\nInvalid input. Please enter a positive integer for the number of frames.")

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt'...


100%|██████████| 6.25M/6.25M [00:00<00:00, 229MB/s]


Detectable objects by the model:
person           bicycle          car              motorcycle       airplane       
bus              train            truck            boat             traffic light  
fire hydrant     stop sign        parking meter    bench            bird           
cat              dog              horse            sheep            cow            
elephant         bear             zebra            giraffe          backpack       
umbrella         handbag          tie              suitcase         frisbee        
skis             snowboard        sports ball      kite             baseball bat   
baseball glove   skateboard       surfboard        tennis racket    bottle         
wine glass       cup              fork             knife            spoon          
bowl             banana           apple            sandwich         orange         
broccoli         carrot           hot dog          pizza            donut          
cake             chair            couch    

In [None]:
import cv2

OPENCV_TRACKERS = {
    "MOSSE": cv2.TrackerMOSSE_create,
    "KCF": cv2.TrackerKCF_create,
    "CSRT": cv2.TrackerCSRT_create
}

In [None]:
class CreativeTracker:
    def __init__(self, frame, bbox):
        self.bbox = bbox
        self.learning_rate = 0.075
        self.scale_factor = 1.05
        self.occlusion_threshold = 0.05
        self.is_occluded = False

        self.kalman = cv2.KalmanFilter(4, 2)
        self.kalman.measurementMatrix = np.array([[1, 0, 0, 0], [0, 1, 0, 0]], np.float32)
        self.kalman.transitionMatrix = np.array([[1, 0, 1, 0], [0, 1, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]], np.float32)
        self.kalman.processNoiseCov = np.eye(4, dtype=np.float32) * 0.03
        self.kalman.measurementNoiseCov = np.eye(2, dtype=np.float32) * 0.5
        x, y, w, h = bbox
        self.kalman.statePost = np.array([x + w/2, y + h/2, 0, 0], dtype=np.float32)

        self.init_correlation_filter(frame, bbox)

    def init_correlation_filter(self, frame, bbox):
        x, y, w, h = [int(v) for v in bbox]
        self.size = (w, h)
        pos = (x + w/2, y + h/2)
        patch = self._get_patch(frame, pos, self.size)
        features = np.concatenate((self._get_hog_features(patch), self._get_color_features(patch)), axis=2)
        g = self._create_gaussian_response(features.shape[:2])
        G = np.fft.fft2(g)
        F = np.fft.fft2(features, axes=(0, 1))
        F_conj = np.conj(F)
        self.H_num = G[..., np.newaxis] * F_conj
        self.H_den = np.sum(F * F_conj, axis=2) + 1e-4

    def update(self, frame):
        predicted_state = self.kalman.predict()
        predicted_pos = (predicted_state[0, 0], predicted_state[1, 0])
        response, _ = self.localize(frame, predicted_pos)
        psr = self._calculate_psr(response)

        if psr < self.occlusion_threshold:
            self.is_occluded = True
        else:
            self.is_occluded = False
            max_loc = np.unravel_index(np.argmax(response), response.shape)
            dy = max_loc[0] - response.shape[0] / 2
            dx = max_loc[1] - response.shape[1] / 2
            current_pos = (predicted_pos[0] + dx, predicted_pos[1] + dy)
            measurement = np.array([current_pos[0], current_pos[1]], dtype=np.float32)
            self.kalman.correct(measurement)
            final_pos = (self.kalman.statePost[0], self.kalman.statePost[1])
            self.update_scale(frame, final_pos)
            self.update_model(frame, final_pos)
            self.bbox = (final_pos[0] - self.size[0]/2, final_pos[1] - self.size[1]/2, self.size[0], self.size[1])

        if self.is_occluded:
            pos = (predicted_state[0, 0], predicted_state[1, 0])
            self.bbox = (pos[0] - self.size[0]/2, pos[1] - self.size[1]/2, self.size[0], self.size[1])

        return [int(v) for v in self.bbox], self.is_occluded

    def localize(self, frame, pos):
        patch = self._get_patch(frame, pos, self.size)
        features = np.concatenate((self._get_hog_features(patch), self._get_color_features(patch)), axis=2)
        F = np.fft.fft2(features, axes=(0,1))
        H = self.H_num / self.H_den[..., np.newaxis]
        R = np.fft.ifft2(np.sum(np.conj(H) * F, axis=2))
        return R.real, np.max(R.real)

    def update_model(self, frame, pos):
        patch = self._get_patch(frame, pos, self.size)
        features = np.concatenate((self._get_hog_features(patch), self._get_color_features(patch)), axis=2)
        F = np.fft.fft2(features, axes=(0,1)); F_conj = np.conj(F)
        g = self._create_gaussian_response(features.shape[:2]); G = np.fft.fft2(g)
        num = G[..., np.newaxis] * F_conj
        den = np.sum(F * F_conj, axis=2) + 1e-4
        if den.shape != self.H_den.shape:
            self.H_num = num
            self.H_den = den
        else:
            self.H_num = (1 - self.learning_rate) * self.H_num + self.learning_rate * num
            self.H_den = (1 - self.learning_rate) * self.H_den + self.learning_rate * den

    def update_scale(self, frame, pos):
        scales = [1.0/self.scale_factor, 1.0, self.scale_factor]
        best_peak = -float('inf'); best_scale = 1.0
        H_current = self.H_num / self.H_den[..., np.newaxis]
        for scale in scales:
            scaled_patch_size = (self.size[0] * scale, self.size[1] * scale)
            patch = self._get_patch(frame, pos, scaled_patch_size)
            if patch.size == 0: continue
            patch_resized = cv2.resize(patch, self.size)
            features = np.concatenate((self._get_hog_features(patch_resized), self._get_color_features(patch_resized)), axis=2)
            F = np.fft.fft2(features, axes=(0,1))
            R = np.fft.ifft2(np.sum(np.conj(H_current) * F, axis=2))
            peak_value = np.max(R.real)
            if peak_value > best_peak:
                best_peak = peak_value
                best_scale = scale
        new_w = (1 - self.learning_rate) * self.size[0] + self.learning_rate * (self.size[0] * best_scale)
        new_h = (1 - self.learning_rate) * self.size[1] + self.learning_rate * (self.size[1] * best_scale)
        self.size = (int(new_w), int(new_h))

    def _get_patch(self, frame, pos, size):
        w, h = int(size[0]), int(size[1])
        if w <= 0 or h <= 0: w, h = 1, 1
        y, x = int(pos[1]), int(pos[0])
        y_start, y_end = max(0, y - h // 2), min(frame.shape[0], y + h // 2)
        x_start, x_end = max(0, x - w // 2), min(frame.shape[1], x + w // 2)
        patch = frame[y_start:y_end, x_start:x_end]

        if patch.size == 0:
            return np.zeros((h, w, frame.shape[2]), dtype=frame.dtype)

        return cv2.resize(patch, (w, h))

    def _get_hog_features(self, patch):
        gray = cv2.cvtColor(patch, cv2.COLOR_BGR2GRAY)
        gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0); gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1)
        mag, ang = cv2.cartToPolar(gx, gy)
        bins = 9; hog = np.zeros((patch.shape[0], patch.shape[1], bins), dtype=np.float32)
        bin_indices = np.int32(bins * ang / (2 * np.pi))
        for i in range(bins): hog[..., i] = mag * (bin_indices == i)
        return hog

    def _get_color_features(self, patch):
        return (cv2.cvtColor(patch, cv2.COLOR_BGR2Lab)).astype(np.float32) / 255.0

    def _create_gaussian_response(self, size):
        h, w = int(size[0]), int(size[1]); sigma = np.sqrt(w * h) / 9.0
        rs, cs = np.mgrid[0:h, 0:w]; rs -= h // 2; cs -= w // 2
        g = np.exp(-0.5 * (rs**2 + cs**2) / sigma**2)
        return np.roll(g, (-h // 2, -w // 2), axis=(0, 1))

    def _calculate_psr(self, response_map):
        if np.all(response_map == 0): return 0.0
        max_val = np.max(response_map); y, x = np.unravel_index(np.argmax(response_map), response_map.shape)
        sidelobe = response_map.copy(); cv2.rectangle(sidelobe, (x-5, y-5), (x+5, y+5), 0, -1)
        sidelobe_pixels = sidelobe[sidelobe > 0]
        if sidelobe_pixels.size == 0: return 100.0
        sidelobe_mean = np.mean(sidelobe_pixels); sidelobe_std = np.std(sidelobe_pixels)
        if sidelobe_std < 1e-5: return 0.0
        return (max_val - sidelobe_mean) / sidelobe_std

print("Creative tracker class defined (final stable version).")

Creative tracker class defined (final stable version).


In [None]:
import torch
import cv2
from ultralytics import YOLO
import time
import numpy as np

model = YOLO('yolov8n.pt')

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    print("Error: Could not open video.")
else:
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    output_video_path = 'output_video.mp4'
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    tracker = None
    frame_count = 0
    total_processing_time = 0
    tracking_started = False

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        start_time = time.time()
        frame_count += 1
        display_text = ""

        if tracker is None:
            if frame_count <= x_frames:
                display_text = f"Frame {frame_count}/{x_frames}: Detecting '{target_object_name}'..."
                results = model(frame, stream=True, verbose=False)
                for r in results:
                    boxes = r.boxes
                    for box in boxes:
                        cls = int(box.cls[0])
                        label = model.names[cls]
                        if label == target_object_name:
                            x1, y1, x2, y2 = box.xyxy[0]
                            bbox = (int(x1), int(y1), int(x2 - x1), int(y2 - y1))
                            tracker = CreativeTracker(frame, bbox)
                            print(f"\nTarget '{target_object_name}' found at {bbox}. Starting tracker.")
                            tracking_started = True
                            total_processing_time = 0
                            break
                    if tracker:
                        break
            else:
                print(f"\nCould not find '{target_object_name}' in the first {x_frames} frames. Stopping.")
                out.write(frame)
                break
        else:
            bbox, is_occluded = tracker.update(frame)
            x, y, w, h = bbox
            if is_occluded:
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
                cv2.putText(frame, "Occluded", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
            else:
                cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        end_time = time.time()
        processing_time = end_time - start_time

        if tracker is not None:
             total_processing_time += processing_time
             tracking_frame_count = frame_count - x_frames
             if total_processing_time > 0 and tracking_frame_count > 0:
                 avg_fps_tracking = tracking_frame_count / total_processing_time
                 cv2.putText(frame, f"Tracking FPS: {avg_fps_tracking:.2f}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

        current_fps = 1.0 / processing_time if processing_time > 0 else 0
        cv2.putText(frame, f"Current FPS: {current_fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)


        if display_text:
             cv2.putText(frame, display_text, (10, height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)


        out.write(frame)

    cap.release()
    out.release()
    print(f"\nProcessing complete. Output video saved to '{output_video_path}'")


Target 'person' found at (868, 245, 122, 164). Starting tracker.

Processing complete. Output video saved to 'output_video.mp4'


In [None]:
files.download(output_video_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>