In [92]:
%pip install tqdm pandas
import os
import cv2
from pathlib import Path
from tqdm import tqdm
import pandas as pd

Note: you may need to restart the kernel to use updated packages.


In [93]:
# ---------- CONFIG ----------
DATA_ROOT = "TU-DAT"
INPUT_ROOT = os.path.join(DATA_ROOT, "Final_videos")
OUTPUT_ROOT = os.path.join(DATA_ROOT, "Final Videos_processed")
EXCEL_PATH = "accident_times_positive_videos.xlsx"  # your Excel file

TARGET_FPS = 12
TARGET_HEIGHT = 481
PRE_SEC = 3.0   # seconds before accident (for long videos)
POST_SEC = 3.0  # seconds after accident (for long videos)
# -----------------------------

In [94]:

def ensure_dir(path: str):
    Path(path).mkdir(parents=True, exist_ok=True)


In [95]:
def load_excel_annotations(excel_path):
    """Load Excel and return dict: video_name -> {'is_longer than 6 sec': bool, 'accident_time': int or None}"""
    df = pd.read_excel(excel_path)
    mapping = {}
    for _, row in df.iterrows():
        video_name = row['video_name'].strip()
        is_longer = row['is_longer than 6 sec'] == 1  # True/False
        accident_time = float(row['accident_time']) if pd.notna(row['accident_time']) else None
        mapping[video_name] = {'is_longer than 6 sec': is_longer, 'accident_time': accident_time}
    return mapping

In [96]:
def compute_time_window(acc_info, video_duration_sec):
    """Compute (start_sec, end_sec) based on Excel info"""
    if not acc_info['is_longer than 6 sec']:
        # Short video: use entire duration
        return 0.0, video_duration_sec
    
    if acc_info['accident_time'] is None:
        # Long video but no accident time: center 6s window
        mid = video_duration_sec / 2.0
        half = 3.0  # 6s total
        return max(0.0, mid - half), min(video_duration_sec, mid + half)
    
    # Long video with accident time: [T-PRE, T+POST]
    start = max(0.0, acc_info['accident_time'] - PRE_SEC)
    end = min(video_duration_sec, acc_info['accident_time'] + POST_SEC)
    return start, end

In [97]:
def preprocess_video(in_path: str, out_path: str, acc_info: dict):
    cap = cv2.VideoCapture(in_path)
    if not cap.isOpened():
        print(f"[WARN] Cannot open {in_path}")
        return

    orig_fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if orig_fps <= 0 or total_frames <= 0:
        print(f"[WARN] Invalid metadata: {in_path} (fps={orig_fps}, frames={total_frames})")
        cap.release()
        return

    video_duration = total_frames / orig_fps
    start_time, end_time = compute_time_window(acc_info, video_duration)
    clip_duration = end_time - start_time
    target_frames = int(TARGET_FPS * clip_duration)

    if target_frames < 10:  # minimum 10 frames
        print(f"[WARN] Clip too short ({target_frames} frames): {in_path}")
        cap.release()
        return 
    # Convert to frame indices
    start_frame = int(start_time * orig_fps)
    end_frame = int(end_time * orig_fps)
    cap.set(cv2.CAP_PROP_POS_FRAMES, max(0, start_frame))

    # Get frame size from first frame
    ret, frame = cap.read()
    if not ret:
        print(f"[WARN] Cannot read first frame: {in_path}")
        cap.release()
        return

    orig_h, orig_w = frame.shape[:2]
    new_h = TARGET_HEIGHT
    new_w = int(orig_w * (new_h / orig_h))
    size = (new_w, new_h)
    # Setup output video
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    ensure_dir(os.path.dirname(out_path))
    out = cv2.VideoWriter(out_path, fourcc, TARGET_FPS, size)

    # Write first frame
    frame_resized = cv2.resize(frame, size, interpolation=cv2.INTER_AREA)
    out.write(frame_resized)
    frames_written = 1

    # Sample frames with stride
    frame_stride = max(1, round(orig_fps / TARGET_FPS))
    current_frame_idx = start_frame + frame_stride

    while current_frame_idx < end_frame and frames_written < target_frames:
        cap.set(cv2.CAP_PROP_POS_FRAMES, current_frame_idx)
        ret, frame = cap.read()
        if not ret:
            break
        frame_resized = cv2.resize(frame, size, interpolation=cv2.INTER_AREA)
        out.write(frame_resized)
        frames_written += 1
        current_frame_idx += frame_stride

    cap.release()
    out.release()
    strategy = "full" if not acc_info['is_longer than 6 sec'] else "around_accident" if acc_info['accident_time'] else "center"
    print(f"[OK] {Path(in_path).name:<25} -> {Path(out_path).name} | {frames_written} frames | strategy: {strategy}")

In [98]:
def process_folder(input_folder: str, output_folder: str, acc_map: dict):
    """Process one folder (positive videos or negative videos)"""
    in_dir = os.path.join(INPUT_ROOT, input_folder)
    out_dir = os.path.join(OUTPUT_ROOT, output_folder)
    ensure_dir(out_dir)

    if not os.path.isdir(in_dir):
        print(f"[WARN] Folder not found: {in_dir}")
        return
    videos = [f for f in os.listdir(in_dir) if f.lower().endswith((".mp4", ".avi", ".mov", ".mkv"))]
    
    for video_name in tqdm(videos, desc=f"Processing {input_folder}"):
        in_path = os.path.join(in_dir, video_name)
        out_name = Path(video_name).stem + "_proc.mp4"
        out_path = os.path.join(out_dir, out_name)
        
        # Get annotation info (default to no accident time if missing)
        acc_info = acc_map.get(video_name, {'is_longer than 6 sec': True, 'accident_time': None})
        preprocess_video(in_path, out_path, acc_info)

In [None]:
def main():
    print("Loading Excel annotations...")
    acc_map = load_excel_annotations(EXCEL_PATH)
    print(f"Loaded {len(acc_map)} videos from Excel")
    
    # Process both folders
    process_folder("Positive_Vidoes", "positive", acc_map)
    process_folder("Negative_Videos", "negative", acc_map)
    
    print("\nâœ… Preprocessing complete!")
    print(f"Output: {OUTPUT_ROOT}/")

if __name__ == "__main__":
    main()

Loading Excel annotations...


ImportError: Missing optional dependency 'openpyxl'.  Use pip or conda to install openpyxl.