In [1]:
import cv2
import os
import numpy as np
import geocoder
from ultralytics import YOLO
from twilio.rest import Client
from keras.models import load_model
from keras.preprocessing.image import img_to_array
from sklearn.metrics import accuracy_score, precision_score, f1_score, confusion_matrix
from tkinter import Tk, filedialog, Button, Label
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import datetime

# Load the CNN frame classifier. It is only used for inference, so the
# training configuration is not restored (compile=False).
cnn_model = load_model("accident_model.h5", compile=False)
# The argmax of the CNN output is used as an index into this list.
# NOTE(review): the order must match the class order used at training time — confirm.
CLASS_LABELS = ["accident", "no_accident"]

# Load the YOLO detector used for counting vehicles.
yolo_model = YOLO("yolov8n.pt")
# These names are compared against yolo_model.names. The COCO-pretrained
# YOLOv8 weights name the class "motorcycle"; "motorbike" is kept so that
# models trained with the older label set keep working.
VEHICLE_CLASSES = ["car", "motorbike", "motorcycle", "bus", "truck"]

# Twilio credentials
# Secrets must not be committed to source: they are read from the
# TWILIO_ACCOUNT_SID / TWILIO_AUTH_TOKEN environment variables instead.
# The values that used to be hard-coded here have been exposed and should be
# rotated in the Twilio console.
account_sid = os.environ.get("TWILIO_ACCOUNT_SID")
auth_token = os.environ.get("TWILIO_AUTH_TOKEN")
if account_sid and auth_token:
    twilio_client = Client(account_sid, auth_token)
else:
    # Keep the module importable without credentials; sending an alert will
    # then fail inside send_alert_message's error handler instead of here.
    twilio_client = None
    print("Twilio credentials not configured "
          "(set TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN); SMS alerts are disabled.")

# Event log (CSV). When the file does not exist yet, seed it with a header
# row only, so that rows appended later (written without a header) line up
# with these columns.
LOG_FILE = "accident_log.csv"
if not os.path.exists(LOG_FILE):
    _log_header = pd.DataFrame(
        columns=[
            "Time",
            "Vehicle Count",
            "Motion Intensity",
            "CNN Result",
            "Latitude",
            "Longitude",
        ]
    )
    _log_header.to_csv(LOG_FILE, index=False)


def detect_vehicles(frame):
    """Count the vehicles YOLO finds in a frame.

    A detection row is counted when the class name looked up from its
    column 5 is listed in VEHICLE_CLASSES and its column 4 value
    (presumably the confidence score) is greater than 0.5.

    Args:
        frame: Image passed straight to ``yolo_model``.

    Returns:
        int: Number of qualifying detections (0 when there are none).
    """
    first_result = yolo_model(frame)[0]
    if first_result.boxes.data is None:
        return 0

    vehicle_total = 0
    for row in first_result.boxes.data.cpu().numpy():
        class_name = yolo_model.names[int(row[5])]
        if class_name in VEHICLE_CLASSES and row[4] > 0.5:
            vehicle_total += 1
    return vehicle_total


def calculate_optical_flow(prev_frame, next_frame):
    """Return the summed dense optical-flow magnitude between two frames.

    Both frames are converted from BGR to grayscale, Farneback dense flow is
    computed between them, and the per-pixel flow magnitudes are added up.
    Because the result is a sum over all pixels, it grows with frame size.

    Args:
        prev_frame: Earlier BGR frame.
        next_frame: Later BGR frame.

    Returns:
        The sum of the flow magnitudes over the whole frame.
    """
    gray_before, gray_after = (
        cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) for image in (prev_frame, next_frame)
    )
    flow_field = cv2.calcOpticalFlowFarneback(
        gray_before,
        gray_after,
        None,  # no initial flow estimate
        0.5,   # pyr_scale
        3,     # levels
        15,    # winsize
        3,     # iterations
        5,     # poly_n
        1.2,   # poly_sigma
        0,     # flags
    )
    horizontal, vertical = flow_field[..., 0], flow_field[..., 1]
    magnitudes = cv2.cartToPolar(horizontal, vertical)[0]
    return np.sum(magnitudes)


def classify_frame_with_cnn(frame):
    """Classify one frame with the CNN and return its label.

    The frame is resized to 64x64, scaled by 1/255, given a leading batch
    axis and passed to ``cnn_model.predict``. The position of the largest
    output value selects the entry of CLASS_LABELS that is returned.

    Args:
        frame: Image array as read by OpenCV.

    Returns:
        str: One of the entries of CLASS_LABELS.
    """
    thumbnail = cv2.resize(frame, (64, 64))
    scaled = img_to_array(thumbnail) / 255.0
    batch = scaled[np.newaxis, ...]  # add the batch axis in front
    scores = cnn_model.predict(batch, verbose=0)
    best_index = np.argmax(scores)
    return CLASS_LABELS[best_index]


def send_alert_message(lat, lng, to_number='+919381623527', from_number='+18788677836'):
    """Send an SMS accident alert through Twilio.

    Args:
        lat: Latitude of the incident, or a placeholder (e.g. "Unknown")
            when no location could be determined.
        lng: Longitude of the incident, or a placeholder.
        to_number: Recipient phone number. Defaults to the recipient that
            was previously hard-coded.
        from_number: Twilio sender number. Defaults to the sender that was
            previously hard-coded.

    Returns:
        None. Failures are printed rather than raised, so a messaging
        problem never interrupts video processing.
    """
    # Only build a map link when both coordinates are actually numeric;
    # otherwise the link would point at "Unknown,Unknown".
    try:
        float(lat)
        float(lng)
    except (TypeError, ValueError):
        location_line = "Location: unavailable"
    else:
        location_line = f"Location: https://www.google.com/maps?q={lat},{lng}"
    message_body = f"Met an Accident! Help me.\n{location_line}"

    try:
        message = twilio_client.messages.create(
            from_=from_number,
            body=message_body,
            to=to_number,
        )
        print("Alert sent. Message SID:", message.sid)
    except Exception as e:  # deliberate best-effort: report and carry on
        print("Error sending alert:", e)


def log_event(vehicle_count, motion_score, cnn_result, lat, lng):
    """Append one event row to LOG_FILE, stamped with the current local time.

    Args:
        vehicle_count: Number of vehicles detected in the frame.
        motion_score: Motion intensity computed for the frame.
        cnn_result: Label reported for the frame.
        lat: Latitude to record.
        lng: Longitude to record.
    """
    column_names = ["Time", "Vehicle Count", "Motion Intensity", "CNN Result", "Latitude", "Longitude"]
    stamp = f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}"
    record = [stamp, vehicle_count, motion_score, cnn_result, lat, lng]
    # Append without a header: the header row is written when the log is created.
    pd.DataFrame([record], columns=column_names).to_csv(
        LOG_FILE, mode='a', header=False, index=False
    )


def extract_frames_to_dataset(video_path, output_dir='test_dataset', label='accident'):
    """Write every frame of a video as a JPEG into ``output_dir/label``.

    Files are named ``frame_<index>.jpg`` by frame position, so frames from
    an earlier video with the same index are overwritten.

    Args:
        video_path: Path of the video to read.
        output_dir: Root directory of the dataset.
        label: Sub-directory (class label) the frames are stored under.

    Returns:
        int: Number of frames actually written to disk.
    """
    target_dir = os.path.join(output_dir, label)
    os.makedirs(target_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_path)
    frame_index = 0
    saved_count = 0
    try:
        if not cap.isOpened():
            print(f"Could not open video: {video_path}")
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = os.path.join(target_dir, f"frame_{frame_index}.jpg")
            # imwrite reports failure through its return value; only count
            # frames that really reached the disk.
            if cv2.imwrite(frame_path, frame):
                saved_count += 1
            else:
                print(f"Failed to write {frame_path}")
            frame_index += 1
    finally:
        # Release the capture even if reading or writing raised.
        cap.release()

    print(f"Extracted {saved_count} frames to {target_dir}")
    return saved_count


def evaluate_model(test_dir='test_dataset'):
    """Evaluate the CNN on labelled image folders and plot a confusion matrix.

    NOTE(review): a second ``evaluate_model`` is defined further down in this
    file and replaces this one when the module is loaded, so this version is
    currently never called. Only one of the two should be kept.

    ``test_dir`` is expected to contain one sub-directory per entry of
    CLASS_LABELS; the directory name is taken as the true label of every
    image inside it. Missing directories and unreadable images are skipped.

    Args:
        test_dir: Root directory of the labelled test images.

    Returns:
        float | None: The accuracy, or None when no image could be evaluated.
    """
    true_labels = []
    pred_labels = []

    for label in CLASS_LABELS:
        label_dir = os.path.join(test_dir, label)
        if not os.path.isdir(label_dir):
            continue

        for fname in os.listdir(label_dir):
            # Match extensions case-insensitively so files such as "IMG.JPG"
            # are not silently skipped.
            if not fname.lower().endswith(('.jpg', '.jpeg', '.png')):
                continue
            img = cv2.imread(os.path.join(label_dir, fname))
            if img is None:
                continue
            # Reuse the same preprocessing and prediction path as live frames.
            true_labels.append(label)
            pred_labels.append(classify_frame_with_cnn(img))

    if not true_labels:
        print("No valid predictions could be made. Please check your test dataset.")
        return None

    acc = accuracy_score(true_labels, pred_labels)
    prec = precision_score(true_labels, pred_labels, pos_label='accident', zero_division=0)
    f1 = f1_score(true_labels, pred_labels, pos_label='accident', zero_division=0)
    cm = confusion_matrix(true_labels, pred_labels, labels=CLASS_LABELS)

    print("\nModel Evaluation Results:")
    print(f"Accuracy: {acc:.4f}")
    print(f"Precision: {prec:.4f}")
    print(f"F1-score: {f1:.4f}")

    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=CLASS_LABELS, yticklabels=CLASS_LABELS)
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.title("Confusion Matrix")
    plt.show()

    return acc



def process_video(video_path, motion_threshold=5000):
    """Run accident detection over a video and evaluate the CNN afterwards.

    For every frame after the first, the optical-flow motion score against
    the previous frame, the vehicle count and the CNN label are computed.
    An accident is reported when the motion score exceeds
    ``motion_threshold`` and the CNN label is "accident". The first such
    frame triggers one SMS alert and one log entry; later ones do not.
    Press "q" in the preview window to stop early.

    Args:
        video_path: Path of the video to analyse.
        motion_threshold: Minimum summed optical-flow magnitude for a frame
            to count as an accident. The score is a sum over all pixels, so
            a suitable value depends on the frame resolution.
    """
    # NOTE(review): every frame is exported under the 'accident' label, so
    # the evaluation at the end treats all frames of the video as positive
    # examples — confirm this is intended.
    extract_frames_to_dataset(video_path, label='accident')

    location = geocoder.ip('me')
    # latlng can be missing even for an "ok" response; guard the unpacking.
    if location.ok and location.latlng:
        lat, lng = location.latlng
    else:
        lat, lng = "Unknown", "Unknown"

    cap = cv2.VideoCapture(video_path)
    prev_frame = None
    alert_sent = False

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            if prev_frame is not None:
                motion_score = calculate_optical_flow(prev_frame, frame)
                num_vehicles = detect_vehicles(frame)
                cnn_result = classify_frame_with_cnn(frame)

                accident_detected = motion_score > motion_threshold and cnn_result == "accident"

                # The reported label is the combined verdict: a CNN
                # "accident" without enough motion is shown as "no_accident".
                if not accident_detected:
                    cnn_result = "no_accident"

                display_text = f"Vehicles: {num_vehicles}, Motion: {motion_score:.2f}, CNN: {cnn_result}"
                print(display_text)
                status = "Accident Detected." if accident_detected else "No Accident Detected."
                print(f"{status} CNN: {cnn_result}")

                if accident_detected and not alert_sent:
                    send_alert_message(lat, lng)
                    log_event(num_vehicles, motion_score, cnn_result, lat, lng)
                    alert_sent = True

                cv2.putText(frame, display_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8,
                            (0, 0, 255) if accident_detected else (0, 255, 0), 2)
                cv2.imshow("Accident Detection", frame)

                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            prev_frame = frame
    finally:
        # Free the capture and close the preview window even on errors.
        cap.release()
        cv2.destroyAllWindows()

    evaluate_model()


def open_file():
    """Ask the user for an MP4 file and process it; do nothing on cancel."""
    chosen_path = filedialog.askopenfilename(
        title="Select video",
        filetypes=[("MP4 files", "*.mp4")],
    )
    if not chosen_path:
        # Dialog was dismissed without a selection.
        return
    process_video(chosen_path)

from tkinter import messagebox  # Ensure this is imported at the top

def evaluate_model(test_dir='test_dataset'):
    """Evaluate the CNN on labelled image folders and show the metrics.

    This definition replaces the ``evaluate_model`` defined earlier in the
    file. ``test_dir`` is expected to contain one sub-directory per entry of
    CLASS_LABELS; the directory name is taken as the true label of every
    image inside it. Missing directories and unreadable images are skipped.
    Results are printed and shown in a message box.

    Args:
        test_dir: Root directory of the labelled test images.

    Returns:
        float | None: The accuracy, or None when no image could be
        evaluated (same contract as the earlier definition).
    """
    true_labels = []
    pred_labels = []

    for label in CLASS_LABELS:
        label_dir = os.path.join(test_dir, label)
        if not os.path.isdir(label_dir):
            continue

        for fname in os.listdir(label_dir):
            if not fname.lower().endswith(('.jpg', '.jpeg', '.png')):
                continue
            img = cv2.imread(os.path.join(label_dir, fname))
            if img is None:
                continue
            # Reuse the same preprocessing and prediction path as live frames.
            true_labels.append(label)
            pred_labels.append(classify_frame_with_cnn(img))

    if not true_labels:
        print("No valid predictions found.")
        messagebox.showwarning("Evaluation", "No valid predictions could be made.\nCheck your test dataset.")
        return None

    acc = accuracy_score(true_labels, pred_labels)
    prec = precision_score(true_labels, pred_labels, pos_label='accident', zero_division=0)
    f1 = f1_score(true_labels, pred_labels, pos_label='accident', zero_division=0)

    print("\nModel Evaluation Results:")
    print(f"Accuracy : {acc:.4f}")
    print(f"Precision: {prec:.4f}")
    print(f"F1-score : {f1:.4f}")

    # Display in GUI popup
    messagebox.showinfo("Model Accuracy", f"Accuracy: {acc:.4f}\nPrecision: {prec:.4f}\nF1-score: {f1:.4f}")
    return acc




def launch_gui():
    """Build the main window with a video-selection button and run the Tk event loop."""
    root = Tk()
    root.title("Accident Detection System")
    root.geometry("400x200")

    # Prompt text above the button.
    Label(root, text="Select a video to process", font=("Arial", 14)).pack(pady=20)

    # The button opens the file dialog and starts processing.
    Button(root, text="Select Video", font=("Arial", 12), command=open_file).pack()

    root.mainloop()


# Start the GUI only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    launch_gui()


Extracted 229 frames to test_dataset\accident

0: 384x640 (no detections), 118.7ms
Speed: 6.7ms preprocess, 118.7ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)
Vehicles: 0, Motion: 3678.63, CNN: no_accident
No Accident Detected. CNN: no_accident

0: 384x640 (no detections), 99.9ms
Speed: 0.9ms preprocess, 99.9ms inference, 4.1ms postprocess per image at shape (1, 3, 384, 640)
Vehicles: 0, Motion: 22924.62, CNN: accident
Accident Detected. accident
Alert sent. Message SID: SM0a4048b6c6304a7a97e8be883b59cc2b

0: 384x640 (no detections), 74.6ms
Speed: 3.0ms preprocess, 74.6ms inference, 3.1ms postprocess per image at shape (1, 3, 384, 640)
Vehicles: 0, Motion: 89673.80, CNN: accident
Accident Detected. accident

0: 384x640 (no detections), 70.8ms
Speed: 0.0ms preprocess, 70.8ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)
Vehicles: 0, Motion: 83867.59, CNN: accident
Accident Detected. accident

0: 384x640 (no detections), 65.5ms
Speed: 0.0ms prepro