In [1]:
import google.generativeai as genai
import cv2
import os
from PIL import Image
import io
import cv2
import numpy as np

In [3]:
def preprocess_video(input_path, output_path, target_size=(224, 224)):
    """Resize every frame of a video and write the result with the 'mp4v' codec.

    Args:
        input_path: Path of the video to read.
        output_path: Path of the video file to create.
        target_size: Size passed to ``cv2.resize`` and to the writer, as
            ``(width, height)``.

    Raises:
        IOError: If the input video cannot be opened or the output writer
            cannot be created.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        raise IOError(f"Could not open input video: {input_path}")

    out = None
    try:
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes
        # the playback speed and duration of the written video.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Fall back to a usable rate when the container reports none.
            fps = 30.0

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, target_size)
        if not out.isOpened():
            raise IOError(f"Could not open output video for writing: {output_path}")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # The frames are written back as 8-bit images, so they are resized
            # and written directly. Scaling to [0, 1] and straight back to
            # [0, 255] added nothing and could lose precision when the float
            # result was truncated to uint8.
            resized_frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)
            out.write(resized_frame)
    finally:
        # Always finalise the files, even if reading or writing failed midway.
        cap.release()
        if out is not None:
            out.release()

# Example usage
# The input location can be overridden with the INPUT_VIDEO environment variable
# so the notebook is not tied to one machine's directory layout; the original
# path is kept as the default.
input_video = os.environ.get(
    "INPUT_VIDEO",
    r"C:\Users\AHMED DAWOD\Downloads\graduation project\RoadAccidents002_x264.mp4",
)
output_video = "output_video_preprocessing.mp4"
preprocess_video(input_video, output_video, target_size=(224, 224))

In [4]:
import cv2
import numpy as np
from ultralytics import YOLO

# Set paths
video_path = "output_video_preprocessing.mp4"
output_video_raw = "keyframes_only_output.mp4"
output_video_annotated = "keyframes_annotated_output.mp4"
output_video_significant = "significant_keyframes_output.mp4"  # New output video

# Load YOLOv12 model
model = YOLO("yolo12n.pt")

# Open video and read the first frame; it seeds the frame-to-frame comparisons
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    cap.release()
    raise RuntimeError(f"Error opening video file: {video_path}")
ret, prev_frame = cap.read()
if not ret:
    cap.release()
    # Raise instead of calling exit(): inside a notebook exit() asks the kernel
    # to shut down, which throws away every variable from earlier cells.
    raise RuntimeError("Error reading video file.")

# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
print(f"Input Video FPS: {fps}")
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')

# Initialize video writers (all three share the input's frame rate and size)
out_raw = cv2.VideoWriter(output_video_raw, fourcc, fps, (frame_width, frame_height))
out_annotated = cv2.VideoWriter(output_video_annotated, fourcc, fps, (frame_width, frame_height))
out_significant = cv2.VideoWriter(output_video_significant, fourcc, fps, (frame_width, frame_height))  # New writer

# Initialize variables
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
frame_count = 0
event_active = False
event_start_frame = None
no_motion_threshold = 30  # frames; compared against the distance from the event start
motion_history = []  # recent changed-pixel counts used for the adaptive threshold
saved_frames = 0
saved_significant_frames = 0  # Counter for significant frames
events = []
event_motion_peak = 0
peak_frame = None  # (raw frame, annotated frame) with the highest motion score so far
frames_per_keyframe = 10  # For original outputs
significant_frames_per_keyframe = 1  # For new significant output

def frame_to_time(frame_num, fps):
    """Return the timestamp, in seconds, of frame ``frame_num`` at ``fps`` frames per second."""
    seconds = frame_num / fps
    return seconds

def check_tracking_event(results, require_overlap=False):
    """Decide whether a tracker result counts as an event.

    Args:
        results: Sequence of tracker results; only ``results[0].boxes`` is read,
            and its ``xyxy`` field must support ``.cpu().numpy()``.
        require_overlap: When False (the default, which matches the previous
            behaviour of this function) any detected box counts as an event.
            When True an event is reported only if at least two boxes intersect.

    Returns:
        bool: True if the frame counts as an event, False otherwise (including
        when there are no results or no boxes).
    """
    if not results:
        return False
    detections = getattr(results[0], 'boxes', None)
    if detections is None:
        return False

    boxes = detections.xyxy.cpu().numpy()
    if len(boxes) == 0:
        return False

    # Previously a pairwise overlap scan ran here, but every path with at least
    # one box returned True regardless of its outcome. That result is kept as
    # the default; the overlap test is now opt-in.
    if not require_overlap:
        return True

    for i in range(len(boxes)):
        x1, y1, x2, y2 = boxes[i]
        for j in range(i + 1, len(boxes)):
            x3, y3, x4, y4 = boxes[j]
            # Axis-aligned rectangles intersect when they overlap on both axes
            if x1 < x4 and x2 > x3 and y1 < y4 and y2 > y3:
                return True
    return False

def _write_keyframe(peak):
    """Write one event's peak frame to the raw, annotated and significant outputs.

    ``peak`` is the ``(raw_frame, annotated_frame)`` pair stored in ``peak_frame``.
    """
    raw_frame, drawn_frame = peak
    # Repeat the frame so one keyframe spans several frames of the output video
    for _ in range(frames_per_keyframe):
        out_raw.write(raw_frame)
        out_annotated.write(drawn_frame)
    for _ in range(significant_frames_per_keyframe):
        out_significant.write(raw_frame)


try:
    while cap.isOpened():
        ret, current_frame = cap.read()
        if not ret:
            # End of video: close an event that is still open and flush its keyframe
            if event_active:
                end_time = frame_to_time(frame_count, fps)
                print(f"Event ended at frame {frame_count} ({end_time:.2f}s) - Video ended")
                events[-1]["end_frame"] = frame_count
                events[-1]["end_time"] = end_time
                if peak_frame is not None:
                    _write_keyframe(peak_frame)
                    saved_frames += 1
                    saved_significant_frames += 1
                event_active = False
            break

        frame_count += 1
        current_gray = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)

        # Motion Detection: count pixels whose grey level changed by more than 30
        diff_gray = cv2.absdiff(prev_gray, current_gray)
        _, thresh_gray = cv2.threshold(diff_gray, 30, 255, cv2.THRESH_BINARY)
        non_zero_count = cv2.countNonZero(thresh_gray)
        motion_history.append(non_zero_count)
        if len(motion_history) > 50:
            motion_history.pop(0)
        # Threshold adapts to twice the mean of the last 50 counts, never below 100
        adaptive_threshold = max(100, np.mean(motion_history) * 2)

        # Same test per colour channel, keeping the largest change at each pixel
        diff_b = cv2.absdiff(prev_frame[:, :, 0], current_frame[:, :, 0])
        diff_g = cv2.absdiff(prev_frame[:, :, 1], current_frame[:, :, 1])
        diff_r = cv2.absdiff(prev_frame[:, :, 2], current_frame[:, :, 2])
        color_diff = cv2.max(cv2.max(diff_b, diff_g), diff_r)
        _, thresh_color = cv2.threshold(color_diff, 30, 255, cv2.THRESH_BINARY)
        color_change = cv2.countNonZero(thresh_color)

        # Dense optical flow; the mean flow magnitude is reused for the motion score
        flow = cv2.calcOpticalFlowFarneback(prev_gray, current_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        magnitude = np.sqrt(flow[..., 0]**2 + flow[..., 1]**2)
        mean_flow = np.mean(magnitude)
        flow_motion = mean_flow > 2

        motion_detected = (non_zero_count > adaptive_threshold) or \
                          (color_change > adaptive_threshold) or \
                          flow_motion

        # YOLOv12 + BoT-SORT Tracking (only on every 5th frame)
        tracking_event = False
        annotated_frame = current_frame.copy()
        if frame_count % 5 == 0:
            results = model.track(
                source=current_frame,
                persist=True,
                tracker="botsort.yaml",
                conf=0.3,
                iou=0.5,
                verbose=False
            )
            tracking_event = check_tracking_event(results)
            if results and results[0].boxes:
                annotated_frame = results[0].plot()

        # Combine Motion and Tracking
        significant_event = motion_detected or tracking_event

        # Handle Events
        if significant_event:
            if not event_active:
                event_active = True
                event_start_frame = frame_count
                start_time = frame_to_time(frame_count, fps)
                print(f"Event started at frame {frame_count} ({start_time:.2f}s)")
                events.append({"start_frame": frame_count, "start_time": start_time})
                event_motion_peak = 0
                peak_frame = None

            # Remember the frame with the highest combined motion score of this event
            motion_score = non_zero_count + color_change + mean_flow
            if motion_score > event_motion_peak:
                event_motion_peak = motion_score
                peak_frame = (current_frame, annotated_frame)

        # NOTE(review): an event is closed on the first quiet frame that is more
        # than no_motion_threshold frames after the event *start*, not after the
        # last significant frame. Confirm this is the intended meaning of
        # no_motion_threshold before changing it; the recorded events depend on it.
        elif event_active and (frame_count - event_start_frame) > no_motion_threshold:
            event_active = False
            end_time = frame_to_time(frame_count, fps)
            print(f"Event ended at frame {frame_count} ({end_time:.2f}s)")
            events[-1]["end_frame"] = frame_count
            events[-1]["end_time"] = end_time
            if peak_frame is not None:
                _write_keyframe(peak_frame)
                saved_frames += 1
                saved_significant_frames += 1

        prev_gray = current_gray
        prev_frame = current_frame.copy()
finally:
    # Release resources even if detection or tracking raised, so the output
    # files are closed rather than left open by the kernel.
    cap.release()
    out_raw.release()
    out_annotated.release()
    out_significant.release()

# Report where each output video went
print(f"\nRaw keyframes video saved as: {output_video_raw}")
print(f"Annotated keyframes video saved as: {output_video_annotated}")
print(f"Significant keyframes video saved as: {output_video_significant}")

# Keyframe counters, with the number of video frames each one expands to
print(f"Total frames written to raw/annotated: {saved_frames} (displayed frames: {saved_frames * frames_per_keyframe})")
print(f"Total frames written to significant: {saved_significant_frames} (displayed frames: {saved_significant_frames * significant_frames_per_keyframe})")

# Durations derived from the written frame counts and the input frame rate
print(f"Output video duration (raw/annotated): {saved_frames * frames_per_keyframe / fps:.2f} seconds at {fps} FPS")
print(f"Output video duration (significant): {saved_significant_frames * significant_frames_per_keyframe / fps:.2f} seconds at {fps} FPS")

# One line per detected event, numbered from 1
print("\nEvent Summary:")
for i, event in enumerate(events, start=1):
    print(f"Event {i}: Start {event['start_time']:.2f}s (Frame {event['start_frame']}), "
          f"End {event['end_time']:.2f}s (Frame {event['end_frame']})")

Creating new Ultralytics Settings v0.0.6 file  
View Ultralytics Settings with 'yolo settings' or at 'C:\Users\AHMED DAWOD\AppData\Roaming\Ultralytics\settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo12n.pt to 'yolo12n.pt'...


100%|██████████| 5.34M/5.34M [00:01<00:00, 3.86MB/s]


Input Video FPS: 30.0
[31m[1mrequirements:[0m Ultralytics requirement ['lap>=0.5.12'] not found, attempting AutoUpdate...
Collecting lap>=0.5.12
  Downloading lap-0.5.12-cp39-cp39-win_amd64.whl.metadata (6.3 kB)
Downloading lap-0.5.12-cp39-cp39-win_amd64.whl (1.5 MB)
   ---------------------------------------- 1.5/1.5 MB 5.2 MB/s eta 0:00:00
Installing collected packages: lap
Successfully installed lap-0.5.12

[31m[1mrequirements:[0m AutoUpdate success  5.8s, installed 1 package: ['lap>=0.5.12']
[31m[1mrequirements:[0m  [1mRestart runtime or rerun command for updates to take effect[0m

Event started at frame 5 (0.17s)
Event ended at frame 36 (1.20s)
Event started at frame 40 (1.33s)
Event ended at frame 71 (2.37s)
Event started at frame 75 (2.50s)
Event ended at frame 106 (3.53s)
Event started at frame 110 (3.67s)
Event ended at frame 141 (4.70s)
Event started at frame 145 (4.83s)
Event ended at frame 176 (5.87s)
Event started at frame 180 (6.00s)
Event ended at frame 216 (7

In [9]:
import torch
import cv2
import numpy as np
import torch.nn as nn
from huggingface_hub import hf_hub_download

def load_i3d_ucf_finetuned(repo_id="Ahmeddawood0001/i3d_ucf_finetuned", filename="i3d_ucf_finetuned.pth", device="cpu"):
    """Build the 8-class I3D classifier and load fine-tuned weights from the Hugging Face Hub.

    Args:
        repo_id: Hub repository that hosts the checkpoint.
        filename: Name of the checkpoint file inside the repository.
        device: Device (string or ``torch.device``) on which the model is placed
            and to which the checkpoint tensors are mapped. Defaults to the CPU,
            as before.

    Returns:
        The model in evaluation mode, on ``device``.
    """
    class I3DClassifier(nn.Module):
        """pytorchvideo ``i3d_r50`` backbone whose final projection outputs ``num_classes`` values."""

        def __init__(self, num_classes):
            super().__init__()
            self.i3d = torch.hub.load('facebookresearch/pytorchvideo', 'i3d_r50', pretrained=True)
            self.dropout = nn.Dropout(0.3)
            # Replace the classification head with one sized for this task
            self.i3d.blocks[6].proj = nn.Linear(2048, num_classes)

        def forward(self, x):
            x = self.i3d(x)
            x = self.dropout(x)
            return x

    device = torch.device(device)
    model = I3DClassifier(num_classes=8).to(device)
    weights_path = hf_hub_download(repo_id=repo_id, filename=filename)
    # SECURITY NOTE: torch.load unpickles the downloaded file, so only load
    # checkpoints from a repository you trust (consider weights_only=True where
    # the installed torch version supports it).
    state_dict = torch.load(weights_path, map_location=device)
    model.load_state_dict(state_dict)
    model.eval()
    return model

# Define frame extraction function
def extract_frames(video_path, max_frames=32, frame_size=(224, 224)):
    """Read up to ``max_frames`` frames from a video as a normalised clip tensor.

    Frames are converted from BGR to RGB and resized to ``frame_size``. If the
    video has fewer than ``max_frames`` frames, the last frame is repeated to
    pad the clip.

    Args:
        video_path: Path of the video to read.
        max_frames: Number of frames in the returned clip.
        frame_size: Size passed to ``cv2.resize`` as ``(width, height)``.

    Returns:
        A float tensor laid out as (channels, frames, height, width) with values
        divided by 255.

    Raises:
        ValueError: If no frame could be read from ``video_path``. Previously
            this surfaced as an IndexError while padding.
    """
    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while len(frames) < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(cv2.resize(frame, frame_size))
    finally:
        # Release the capture on every path, including decode errors
        cap.release()

    if not frames:
        raise ValueError(f"No frames could be read from video: {video_path}")

    # Pad short videos by repeating the last frame
    frames.extend([frames[-1]] * (max_frames - len(frames)))

    clip = torch.from_numpy(np.stack(frames))  # (frames, height, width, channels)
    # Single permute to (channels, frames, height, width); equivalent to the
    # two successive permutes used before.
    return clip.permute(3, 0, 1, 2).float() / 255.0

# Define classification function
def classify_video(video_path, model, labels):
    """Classify a video with ``model`` and return the top label and its probability.

    Args:
        video_path: Path of the video; frames are read with ``extract_frames``.
        model: Callable model that returns one row of class scores for the clip.
        labels: Sequence mapping class index to label name.

    Returns:
        Tuple ``(predicted_label, confidence)`` where ``confidence`` is the
        softmax probability of the predicted class.
    """
    frames = extract_frames(video_path)

    # Send the input to wherever the model's weights live instead of relying on
    # a module-level `device` variable, which may not match the model.
    try:
        target_device = next(model.parameters()).device
    except (AttributeError, StopIteration):
        target_device = torch.device("cpu")
    batch = frames.unsqueeze(0).to(target_device)  # add the batch dimension

    with torch.no_grad():
        outputs = model(batch)
        probabilities = torch.softmax(outputs, dim=1)
        predicted_idx = torch.argmax(probabilities, dim=1).item()
        predicted_label = labels[predicted_idx]
        confidence = probabilities[0, predicted_idx].item()
    return predicted_label, confidence

# Example usage
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
labels = ["arrest", "Explosion", "Fight", "normal", "roadaccidents", "shooting", "Stealing", "vandalism"]
# The loader places the model on the CPU by default; move it to the device
# selected above so the model and its input are never on different devices.
model = load_i3d_ucf_finetuned().to(device)
video_path = "significant_keyframes_output.mp4"  # Replace with your video path
predicted_label, confidence = classify_video(video_path, model, labels)
print(f"Video: {video_path}")
print(f"Predicted Label: {predicted_label}")

print(f"Confidence: {confidence:.4f}")


Using cache found in C:\Users\AHMED DAWOD/.cache\torch\hub\facebookresearch_pytorchvideo_main


Video: significant_keyframes_output.mp4
Predicted Label: roadaccidents
Confidence: 0.6165


In [8]:
# Set up the Gemini API key.
# The key is read from the environment so that it is never stored in the
# notebook. Any key that was ever saved in this file should be treated as
# leaked and rotated.
gemini_api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
if not gemini_api_key:
    raise RuntimeError(
        "Gemini API key not found: set the GEMINI_API_KEY (or GOOGLE_API_KEY) "
        "environment variable before running this cell."
    )
genai.configure(api_key=gemini_api_key)

# Load the model
model = genai.GenerativeModel("gemini-1.5-flash")
print("Model Loaded Successfully")


# Define video path & output directory
video_path = "significant_keyframes_output.mp4"
output_dir = "frames"
os.makedirs(output_dir, exist_ok=True)

# Capture video
cap = cv2.VideoCapture(video_path)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# Extract at most 15 evenly distributed frames.
# `frame_rate` is the number of frames wanted (not a rate). The step is rounded
# up: with floor division a 31-frame video would give step 2 and 16 frames.
frame_rate = 15
step = max(1, (total_frames + frame_rate - 1) // frame_rate)

frames = []
frame_idx = 0

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_idx % step == 0:
            frame_path = os.path.join(output_dir, f"frame_{frame_idx}.jpg")
            # Only keep paths that were actually written, so later cells never
            # try to open a missing file.
            if cv2.imwrite(frame_path, frame):
                frames.append(frame_path)
            else:
                print(f"Warning: could not write {frame_path}")

        frame_idx += 1
finally:
    cap.release()
print(f"Extracted {len(frames)} frames")



# Use the classifier's label as context for every frame prompt
video_prediction = predicted_label

print(f"Video Prediction: {video_prediction}")

# Maps each frame path to the one-sentence description returned for it
descriptions = {}

for frame_path in frames:
    prompt = (
        f"This frame is from a video classified as '{video_prediction}'. "
        "Describe the event happening in the image in one sentence."
    )

    # Read the bytes inside the `with` block and hand PIL an in-memory buffer
    with open(frame_path, "rb") as img_file:
        image_data = Image.open(io.BytesIO(img_file.read()))

    # Skip the request when no model object is available
    if not model:
        print("Error: Model is not defined.")
        continue

    response = model.generate_content([prompt, image_data])
    descriptions[frame_path] = response.text

print("Descriptions Generated Successfully")


# Show every description next to the frame it belongs to
for frame, desc in descriptions.items():
    print(f"{frame}: {desc}")


# Build the summary prompt: an introduction line, one description per line,
# then the instruction asking for an overall summary
summary_prompt = "".join([
    "Here are multiple descriptions of frames from a surveillance video:\n",
    "\n".join(descriptions.values()),
    "\nBased on these descriptions, provide a concise summary of the overall event.",
])

# Ask the model for the overall summary
summary_response = model.generate_content(summary_prompt)

# Print the final summary
print("\n**Final Summary:**\n")
print(summary_response.text)


Model Loaded Successfully
Extracted 10 frames
Video Prediction: roadaccidents
Descriptions Generated Successfully
frames\frame_0.jpg: Here's a one-sentence description of the image:

A police car is stopped on a city street near pedestrians, possibly responding to or investigating a road accident.

frames\frame_1.jpg: Here's a one-sentence description of the image:

A car has mounted the curb and appears to have collided with a pedestrian, causing people nearby to react.

frames\frame_2.jpg: That's a still image from a video showing a pedestrian walking in a marked crosswalk as cars pass by, with no apparent immediate accident.

frames\frame_3.jpg: Here's a one-sentence description of the image:

A pedestrian is in danger of being hit by a vehicle on a busy city street.

frames\frame_4.jpg: That's a still image showing a pedestrian seemingly running into the street after almost being struck by a vehicle, possibly causing a near-miss accident.

frames\frame_5.jpg: A dark-colored car is 