In [1]:
pip install opencv-python numpy tqdm ultralytics scikit-learn


Note: you may need to restart the kernel to use updated packages.


In [1]:
import cv2
import numpy as np
import os
from glob import glob
from tqdm import tqdm
import time
from sklearn.cluster import KMeans
from ultralytics import YOLO
import warnings
import sys

In [2]:
# --- Hide YOLO and sklearn verbose output ---
class HiddenPrints:
    def __enter__(self):
        self._original_stdout = sys.stdout
        self._original_stderr = sys.stderr
        sys.stdout = open(os.devnull, 'w')
        sys.stderr = open(os.devnull, 'w')
    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout.close()
        sys.stderr.close()
        sys.stdout = self._original_stdout
        sys.stderr = self._original_stderr

warnings.filterwarnings("ignore")

In [3]:
def create_water_mask(frame):
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    lower_water = np.array([75, 40, 60])    # Adjust if needed for your canal
    upper_water = np.array([100, 200, 255])
    mask = cv2.inRange(hsv, lower_water, upper_water)
    kernel = np.ones((7,7), np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    return mask


In [4]:
def create_vehicle_mask(frame, yolo_model):
    with HiddenPrints():
        results = yolo_model(frame, verbose=False)
    mask = np.zeros(frame.shape[:2], dtype=np.uint8)
    for r in results:
        for box, cls in zip(r.boxes.xyxy, r.boxes.cls):
            if int(cls) in [2, 3, 5, 7, 1]:  # car, motorcycle, bus, truck, bicycle
                x1, y1, x2, y2 = map(int, box)
                cv2.rectangle(mask, (x1, y1), (x2, y2), 255, -1)
    return mask

In [5]:
def dual_plane_stabilization(frames_path, output_path, yolo_weights, plane1_ratio=0.7, feature_num=1000):
    print("[Init] Starting dual-plane stabilization with masking...")

    frame_files = sorted(glob(os.path.join(frames_path, "*.jpg")))
    if not frame_files:
        print("[Error] No JPG frames found!")
        return

    print(f"[Init] Found {len(frame_files)} frames")

    first_frame = cv2.imread(frame_files[0])
    h, w = first_frame.shape[:2]
    crop_margin = int(w * 0.05)
    output_res = (w-2*crop_margin, h-2*crop_margin)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, 30.0, output_res)

    orb = cv2.ORB_create(feature_num)
    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)

    print("[Init] Loading YOLO model...")
    yolo_model = YOLO(yolo_weights)

    prev_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)
    water_mask = create_water_mask(first_frame)
    vehicle_mask = create_vehicle_mask(first_frame, yolo_model)
    static_mask = cv2.bitwise_not(cv2.bitwise_or(water_mask, vehicle_mask))
    prev_kp, prev_des = orb.detectAndCompute(prev_gray, static_mask)

    start_time = time.time()
    print("[Status] Processing frames...")

    for i in tqdm(range(1, len(frame_files)), desc="Stabilizing", unit="frames"):
        curr_frame = cv2.imread(frame_files[i])
        curr_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)

        water_mask = create_water_mask(curr_frame)
        vehicle_mask = create_vehicle_mask(curr_frame, yolo_model)
        static_mask = cv2.bitwise_not(cv2.bitwise_or(water_mask, vehicle_mask))

        curr_kp, curr_des = orb.detectAndCompute(curr_gray, static_mask)
        if curr_kp is None or prev_kp is None or len(curr_kp) < 5 or len(prev_kp) < 5:
            prev_gray = curr_gray
            prev_kp, prev_des = curr_kp, curr_des
            continue

        matches = bf.match(prev_des, curr_des)
        matches = sorted(matches, key=lambda x: x.distance)[:int(len(matches)*plane1_ratio)]

        motion_vectors = [np.array(curr_kp[m.trainIdx].pt) - np.array(prev_kp[m.queryIdx].pt) for m in matches]
        if len(motion_vectors) < 2:
            prev_gray = curr_gray
            prev_kp, prev_des = curr_kp, curr_des
            continue

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            kmeans = KMeans(n_clusters=2, random_state=0, n_init=10)
            labels = kmeans.fit_predict(np.array(motion_vectors).astype(np.float32))

        # Dual-plane homography
        H1, H2 = None, None
        plane1_pts = [(prev_kp[m.queryIdx].pt, curr_kp[m.trainIdx].pt) for m, l in zip(matches, labels) if l == 0]
        if len(plane1_pts) > 4:
            pts1, pts2 = zip(*plane1_pts)
            H1, _ = cv2.findHomography(np.array(pts1), np.array(pts2), cv2.RANSAC, 5.0)
        plane2_pts = [(prev_kp[m.queryIdx].pt, curr_kp[m.trainIdx].pt) for m, l in zip(matches, labels) if l == 1]
        if len(plane2_pts) > 4:
            pts1, pts2 = zip(*plane2_pts)
            H2, _ = cv2.findHomography(np.array(pts1), np.array(pts2), cv2.RANSAC, 5.0)

        stabilized = curr_frame.copy()
        if H1 is not None and len(plane1_pts) > 4:
            warped_plane1 = cv2.warpPerspective(curr_frame, H1, (w, h))
            mask = np.zeros_like(curr_gray)
            contour = cv2.convexHull(np.array([pt[1] for pt in plane1_pts], dtype=np.int32))
            cv2.drawContours(mask, [contour], -1, 255, -1)
            stabilized = cv2.bitwise_and(warped_plane1, warped_plane1, mask=mask) + \
                         cv2.bitwise_and(stabilized, stabilized, mask=~mask)
        if H2 is not None and len(plane2_pts) > 4:
            warped_plane2 = cv2.warpPerspective(curr_frame, H2, (w, h))
            mask = np.zeros_like(curr_gray)
            contour = cv2.convexHull(np.array([pt[1] for pt in plane2_pts], dtype=np.int32))
            cv2.drawContours(mask, [contour], -1, 255, -1)
            stabilized = cv2.bitwise_and(warped_plane2, warped_plane2, mask=mask) + \
                         cv2.bitwise_and(stabilized, stabilized, mask=~mask)

        stabilized = stabilized[crop_margin:h-crop_margin, crop_margin:w-crop_margin]
        out.write(stabilized)

        prev_gray = curr_gray
        prev_kp, prev_des = curr_kp, curr_des

    out.release()
    total_time = time.time() - start_time
    print(f"\n[Complete] Processed {len(frame_files)} frames in {total_time:.1f}s")
    print(f"[Complete] Average speed: {len(frame_files)/total_time:.1f} FPS")
    print(f"[Complete] Output saved: {output_path}")

# --- USAGE EXAMPLE ---

frames_path = r"C:\Users\simar\OneDrive\Desktop\Python_stabilization\unzipped_video"
output_path = r"C:\Users\simar\OneDrive\Desktop\Python_stabilization\2_yolo+dual+mask.mp4"
yolo_weights = "yolov8n.pt"
dual_plane_stabilization(frames_path, output_path, yolo_weights)

[Init] Starting dual-plane stabilization with masking...
[Init] Found 7169 frames
[Init] Loading YOLO model...
[Status] Processing frames...


Stabilizing: 100%|██████████| 7168/7168 [1:34:46<00:00,  1.26frames/s]


[Complete] Processed 7169 frames in 5686.5s
[Complete] Average speed: 1.3 FPS
[Complete] Output saved: C:\Users\simar\OneDrive\Desktop\Python_stabilization\2_yolo+dual+mask.mp4



