We used the Car Accident Detection and Prediction (CADP) dataset from Carnegie Mellon University and the University of Tokyo.
From the manually annotated part of the dataset, we extracted the original CCTV videos.
Using the provided annotation file, we identified the time intervals where accidents occur.
For each video, we selected one frame per second within those accident intervals and saved them as accident images.
All extracted frames were saved into a structured folder for further model training and analysis.


In [8]:
import json
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from pathlib import Path
import shutil

In [6]:
# Sanity check: compare the .mp4 files found on disk with the video names that
# appear as top-level keys in the annotation JSON, then report the counts.
videos_dir = "raw_data/manual/videos"
annotations_path = "raw_data/manual/annotations_1531762138.1303267.json"

# Collect the names of all .mp4 files in the videos directory.
video_files = set()
for entry in os.listdir(videos_dir):
    if entry.endswith(".mp4"):
        video_files.add(os.path.basename(entry))

# The annotation file is a JSON object; iterating it yields its keys.
with open(annotations_path, "r") as f:
    annotations = json.load(f)
json_keys = set(annotations)

# Videos present in both places, plus the leftovers on either side.
intersection = video_files & json_keys
missing_in_videos = json_keys.difference(video_files)
missing_in_json = video_files.difference(json_keys)

print(f"Total video files: {len(video_files)}")
print(f"Total entries in JSON: {len(json_keys)}")
print(f"Matched files: {len(intersection)}\n")


Total video files: 230
Total entries in JSON: 226
Matched files: 226



In [None]:
import os
import json
import cv2
from pathlib import Path
import math

# Extract one frame per second from the annotated accident interval of every
# video and save the frames as JPEG images under <output_root>/Accident.
videos_dir = Path("raw_data/manual/videos")
annotations_path = Path("raw_data/manual/annotations_1531762138.1303267.json")
output_root = Path("dataset_accident_frames")

# Built once instead of on every saved frame.
accident_dir = output_root / "Accident"
accident_dir.mkdir(parents=True, exist_ok=True)

with open(annotations_path, "r") as f:
    annotations = json.load(f)

# os.listdir returns entries in arbitrary order; sorting makes repeated runs
# process the videos in the same order.
for video_name in sorted(os.listdir(videos_dir)):
    if not video_name.endswith(".mp4"):
        continue

    video_path = videos_dir / video_name

    # Only the first annotated object of a video is consulted. Videos without
    # an annotation entry, or whose first object has no keyframes, are skipped.
    ann = annotations.get(video_name, [])
    if not ann or not ann[0].get("keyframes"):
        continue

    keyframes = ann[0]["keyframes"]
    starts = [k["frame"] for k in keyframes if k.get("state") == "Start"]
    ends = [k["frame"] for k in keyframes if k.get("state") == "End"]

    if not starts or not ends:
        continue

    # Only the first "Start" and the first "End" keyframe define the interval.
    # NOTE(review): the keyframe "frame" value is used as a timestamp in
    # seconds here -- confirm against the annotation format.
    start_sec = float(starts[0])
    end_sec = float(ends[0])

    cap = cv2.VideoCapture(str(video_path))
    try:
        if not cap.isOpened():
            print(f"Skipping {video_name}: could not open video")
            continue

        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Without a positive FPS the second -> frame mapping is meaningless
        # (every second would map to frame 0); "not fps > 0" also rejects NaN.
        if not fps > 0:
            print(f"Skipping {video_name}: invalid FPS ({fps})")
            continue

        # Clamp the start at 0 so a negative annotation never produces a
        # negative frame index.
        first_sec = max(0, math.floor(start_sec))
        for sec in range(first_sec, math.ceil(end_sec)):
            frame_idx = int(sec * fps)
            if frame_idx >= total_frames:
                break

            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            success, frame = cap.read()
            if not success:
                continue

            out_name = f"{Path(video_name).stem}_{sec:04d}s.jpg"
            out_path = accident_dir / out_name
            # cv2.imwrite signals failure through its return value rather
            # than by raising, so check it explicitly.
            if not cv2.imwrite(str(out_path), frame):
                print(f"Failed to write {out_path}")
    finally:
        # Release the capture even when reading or writing raises, and when
        # the video is skipped via "continue".
        cap.release()
