In [4]:
import cv2
import torch
import pytesseract
from ultralytics import YOLO
import numpy as np
from collections import deque
import os
import json
import matplotlib.pyplot as plt

In [5]:
## Estimate Direction of the vehicle
def estimate_direction(trail):
    if len(trail) < 2:
        return None

    # Calculate the direction vector
    dx = trail[-1][0] - trail[0][0]
    dy = trail[-1][1] - trail[0][1]

    # Normalize the direction vector
    length = (dx**2 + dy**2)**0.5
    if length == 0:
        return None

    dx /= length
    dy /= length

    # Determine the direction based on the angle
    angle = np.arctan2(dy, dx) * 180 / np.pi

    if -45 <= angle < 45:
        return "Right"
    elif 45 <= angle < 135:
        return "Up"
    elif 135 <= angle < 225:
        return "Left"
    elif 225 <= angle < 315:
        return "Down"
    else:
        return None

In [6]:
# Applying OCR
def apply_ocr(image,box):
    x1,y1,x2,y2 = map(int,box)
    roi = image[y1:y2, x1:x2]
    text = pytesseract.image_to_string(roi, config='--psm 6').strip().lower()
    text = text.replace("\m", " ").replace("\x0c", "")
    return text

In [7]:
def extract_speed_from_json(json_path):
    with open(json_path,'r') as f:
        data = json.load(f)
        samples = data['samples']
        speed_array = []
        for i in samples:
            speed_array.append(samples[i]["eld"]["vehicle"]["road_speed_smooth_kph"])
    return speed_array
        

In [8]:
def plot_speed_graph(speed,output_path):
    plt.figure(figsize=(10,4))
    plt.bar(range(len(speed)),speed,color="skyblue")
    plt.xlabel("Frame Index")
    plt.ylabel("Speed KPH")
    plt.title("Speed Variation Over Time")
    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()

In [None]:
# def process_video(video_path,model_path='/home/shoaibkhan/Desktop/YOLO_SSV_Model/Complete_Project/weights/best.pt'):
#     # Load YOLOv8 model
#     model = YOLO(model_path)
    
    
#     # Initialize video capture
#     cap = cv2.VideoCapture(video_path)
#     fps = cap.get(cv2.CAP_PROP_FPS)
#     widht, height = int(cap.get(3)), int(cap.get(4))

#     filename = os.path.basename(video_path).replace('.mp4', '_processed.mp4')
#     output_path = os.path.join("results", filename)
#     fourcc = cv2.VideoWriter_fourcc(*'mp4v')
#     out = cv2.VideoWriter(output_path, fourcc, fps, (widht, height))

#     vehicle_trail = deque(maxlen=int(fps * 3))
#     ocr_type = None
#     violation = False
#     stop_frame_index = -1
#     frame_index = 0
#     while cap.isOpened():
#         ret, frame = cap.read()
#         if not ret:
#             break

#         results = model(frame)[0]
#         detections = results.boxes.data.cpu().numpy() if results.boxes is not None else []

#         speed  = speed_array[frame_index] if frame_index < len(speed_array) else 0
#         cv2.putText(frame, f"Speed: {speed:.1f} kph", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

#         for det in detections:
#             x1, y1, x2, y2, conf, cls = det[:6]
#             cls_id = int(cls_id)
#             label = model.names[cls_id]
#             color = (0,255,0) if 'stop' in label else (255,0,0)
#             cv2.rectangle(frame,(int(x1), int(y1)), (int(x2), int(y2)), color, 2)
#             cv2.putText(frame, label, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
#             if label == 'stop_sign':
#                 stop_frame_index = frame_index

#             if label in ["right_turn_only","except_right_turn","arrow_right_turn_only"]:
#                 text = apply_ocr(frame,(x1,y1,x2,y2))
#                 ocr_type = label
#                 cv2.putText(frame,f"OCR: {text}", (int(x1),int(y2)+20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,0,0), 2)
#                 if label == "vehicle":
#                     cx , cy = int((x1 + x2) // 2), int((y1 + y2) // 2)
#                     vehicle_trail.append((cx, cy))
#                     cv2.circle(frame, (cx, cy), 5, (0, 255, 0), -1)
#                 direction = estimate_direction(vehicle_trail)
#                 if direction:
#                     cv2.putText(frame,f"Dir:{direction}",(10,70),cv2.FONT_HERSHEY_SIMPLEX, 1,(0,255,0),2)
#                 if stop_frame_index != -1 and frame_index == stop_frame_index + int(fps*3):
#                     speeds_before = speed_array[max(0,stop_frame_index - int (fps)*3):stop_frame_index]
#                     min_speed = min(speeds_before) if speeds_before else 999
#                     violation = False
#                     if ocr_type is None:
#                         if min_speed > 8:
#                             violation = True
#                     elif ocr_type in ["right_turn_only","arrow_right_turn_only"]:
#                         if min_speed > 8 and direction == "Right":
#                             violation = True
#                     elif ocr_type == "except_right_turn":
#                         if min_speed > 8 and direction != "Right":
#                             violation = True
                
#                 if frame_index == stop_frame_index + int(fps*3)+1:
#                     if violation:
#                         cv2.putText(frame, "Violation Detected!", (10, 110), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
#                     else:
#                         cv2.putText(frame, "No Violation", (10, 110), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
#                 out.write(frame)
#                 frame_index += 1
#     cap.release()
#     out.release()
#     return "Violation Detected!" if violation else "No Violation",output_path

In [12]:
import streamlit as st

In [9]:
import os
import cv2
import json
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
from ultralytics import YOLO

def apply_ocr(image, bbox):
    # Dummy OCR function — replace with actual OCR logic
    return "Sample OCR Text"

def estimate_direction(trail):
    if len(trail) < 2:
        return None
    dx = trail[-1][0] - trail[0][0]
    if abs(dx) < 5:
        return "Straight"
    return "Right" if dx > 0 else "Left"

def extract_speeds_from_json(json_path):
    with open(json_path, 'r') as f:
        data = json.load(f)
    return data.get("speeds", [])

def plot_speed_graph(speeds, output_path):
    plt.figure(figsize=(10, 4))
    plt.bar(range(len(speeds)), speeds, color='skyblue')
    plt.xlabel('Frame Index')
    plt.ylabel('Speed (kph)')
    plt.title('Speed Variation Over Time')
    plt.tight_layout()
    plt.savefig(output_path)
    plt.close()

def process_video(video_path, json_path, model_path='/home/shoaibkhan/Desktop/YOLO_SSV_Model/Complete_Project/weights/best.pt'):
    model = YOLO(model_path)
    speeds = extract_speeds_from_json(json_path)

    # Plot the speed bar graph
    speed_graph_path = os.path.join("results", "speed_graph.png")
    plot_speed_graph(speeds, speed_graph_path)

    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width, height = int(cap.get(3)), int(cap.get(4))

    filename = os.path.basename(video_path).replace('.mp4', '_processed.mp4')
    output_path = os.path.join("results", filename)
    os.makedirs("results", exist_ok=True)

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    vehicle_trail = deque(maxlen=int(fps * 3))
    ocr_type = None
    violation = False
    stop_frame_index = -1
    direction = None
    frame_index = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        results = model(frame)[0]
        detections = results.boxes.data.cpu().numpy() if results.boxes is not None else []

        speed = speeds[frame_index] if frame_index < len(speeds) else 0
        cv2.putText(frame, f"Speed: {speed:.1f} kph", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        for det in detections:
            x1, y1, x2, y2, conf, cls_id = det[:6]
            cls_id = int(cls_id)
            label = model.names[cls_id]
            color = (0, 255, 0) if 'stop' in label else (255, 0, 0)

            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
            cv2.putText(frame, label, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            if label == 'stop_sign':
                stop_frame_index = frame_index

            if label in ["right_turn_only", "except_right_turn", "arrow_right_turn_only"]:
                text = apply_ocr(frame, (x1, y1, x2, y2))
                ocr_type = label
                cv2.putText(frame, f"OCR: {text}", (int(x1), int(y2) + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

            if label == "vehicle":
                cx, cy = int((x1 + x2) // 2), int((y1 + y2) // 2)
                vehicle_trail.append((cx, cy))
                cv2.circle(frame, (cx, cy), 5, (0, 255, 0), -1)

        # Estimate direction
        direction = estimate_direction(vehicle_trail)
        if direction:
            cv2.putText(frame, f"Dir: {direction}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)

        # Check for violation
        if stop_frame_index != -1 and frame_index == stop_frame_index + int(fps * 3):
            speeds_before = speeds[max(0, stop_frame_index - int(fps) * 3):stop_frame_index]
            min_speed = min(speeds_before) if speeds_before else 999

            if ocr_type is None:
                violation = min_speed > 8
            elif ocr_type in ["right_turn_only", "arrow_right_turn_only"]:
                violation = min_speed > 8 and direction == "Right"
            elif ocr_type == "except_right_turn":
                violation = min_speed > 8 and direction != "Right"

        if frame_index == stop_frame_index + int(fps * 3) + 1:
            status_text = "Violation Detected!" if violation else "No Violation"
            color = (0, 0, 255) if violation else (0, 255, 0)
            cv2.putText(frame, status_text, (10, 110), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

        out.write(frame)
        frame_index += 1

    cap.release()
    out.release()

    return ("Violation Detected!" if violation else "No Violation"), output_path, speed_graph_path
