In [None]:
# @title Part1
!pip install ultralytics

!git clone https://github.com/abewley/sort

%cd /content/sort


# Open the file in edit mode
with open('/content/sort/sort.py', 'r+') as f:
    content = f.read()
    # Make the changes (replace 'TkAgg' with 'Agg')
    content = content.replace("matplotlib.use('TkAgg')", "matplotlib.use('Agg')")
    # Overwrite the file with the modified content
    f.seek(0)
    f.write(content)
    f.truncate()

!pip install -r requirements.txt

!pip install filterpy

# @title Arrow
import cv2
import numpy as np
import os
import pickle
from ultralytics import YOLO  # Assuming YOLOv8 is installed and available


def load_histogram(file_path):
    """Load a histogram from a pickle file."""
    with open(file_path, 'rb') as file:
        histogram = pickle.load(file)
    return histogram


def calculate_histogram(image, bbox):
    """Calculate and normalize the histogram for a region in the image."""
    height, width, _ = image.shape
    x_center, y_center, box_width, box_height = bbox

    # Convert normalized coordinates to pixel coordinates
    x_min = int((x_center - box_width / 2) * width)
    x_max = int((x_center + box_width / 2) * width)
    y_min = int((y_center - box_height / 2) * height)
    y_max = int((y_center + box_height / 2) * height)

    # Crop the region of interest
    roi = image[y_min:y_max, x_min:x_max]

    # Calculate the histogram
    histogram = cv2.calcHist([roi], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
    histogram = cv2.normalize(histogram, histogram).flatten()
    return histogram


def compare_histograms(frame_histogram, histograms):
    """Compare the frame histogram with a list of histograms using correlation."""
    best_match = None
    best_score = -1

    for label, histogram in histograms.items():
        score = cv2.compareHist(frame_histogram, histogram, cv2.HISTCMP_CORREL)
        if score > best_score:
            best_score = score
            best_match = label

    return best_match, best_score


def draw_arrow(frame, x1, y1, x2, y2):
    """Draw a large, filled downward arrow above the detected object."""
    center_x = (x1 + x2) // 2
    top_y = y1 - 150  # Position above the bounding box

    # Define arrow dimensions
    arrow_width = 50
    arrow_height = 100
    shaft_width = 20

    # Coordinates for the arrowhead
    arrow_tip = (center_x, y1)
    left_corner = (center_x - arrow_width, top_y + arrow_height)
    right_corner = (center_x + arrow_width, top_y + arrow_height)

    # Coordinates for the shaft
    shaft_top_left = (center_x - shaft_width, top_y)
    shaft_top_right = (center_x + shaft_width, top_y)
    shaft_bottom_left = (center_x - shaft_width, top_y + arrow_height)
    shaft_bottom_right = (center_x + shaft_width, top_y + arrow_height)

    # Combine polygons to form the arrow
    arrow_head = np.array([arrow_tip, left_corner, right_corner], np.int32)
    arrow_shaft = np.array([shaft_top_left, shaft_bottom_left, shaft_bottom_right, shaft_top_right], np.int32)

    # Draw the filled polygons
    color = (0, 0, 255)  # Red color for the arrow
    cv2.fillPoly(frame, [arrow_head], color)
    cv2.fillPoly(frame, [arrow_shaft], color)


def detect_cars(frame, model, specified_path):
    """Detect cars only within the specified path region."""
    results = model(frame)  # Perform inference
    detections = []

    for box in results[0].boxes.data:  # Access bounding box data
        x_min, y_min, x_max, y_max = map(int, box[:4])  # Extract coordinates
        cls = int(box[5])  # Class ID
        if cls == 2:  # Class ID for 'car'
            # Check if the bounding box lies within the specified path
            car_center_x = (x_min + x_max) // 2
            car_center_y = (y_min + y_max) // 2
            if is_within_path(car_center_x, car_center_y, specified_path, frame.shape):
                detections.append((x_min, y_min, x_max, y_max))
                draw_arrow(frame, x_min, y_min, x_max, y_max)  # Draw arrow on the car
    return detections


def is_within_path(car_center_x, car_center_y, path_bbox, frame_shape):
    """Check if the car's center lies within the specified path."""
    height, width, _ = frame_shape
    x_center, y_center, box_width, box_height = path_bbox

    # Convert normalized path coordinates to pixel coordinates
    x_min = int((x_center - box_width / 2) * width)
    x_max = int((x_center + box_width / 2) * width)
    y_min = int((y_center - box_height / 2) * height)
    y_max = int((y_center + box_height / 2) * height)

    return x_min <= car_center_x <= x_max and y_min <= car_center_y <= y_max


def process_video(input_video_path, histogram_files, bbox, path_bbox, output_video_path, save_frames=False, temp_frame_folder=None):
    """
    Process video, match histograms, detect cars, and output processed video.
    """
    # Load the precomputed histograms
    histograms = {label: load_histogram(file) for label, file in histogram_files.items()}

    # Initialize YOLO model
    model = YOLO('yolov8n.pt')  # Use YOLOv8 pretrained model

    # Load the input video
    video_cap = cv2.VideoCapture(input_video_path)
    fps = int(video_cap.get(cv2.CAP_PROP_FPS))
    frame_width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    frame_count = 0

    while video_cap.isOpened():
        ret, frame = video_cap.read()
        if not ret:
            break

        # Calculate the histogram for the specified region
        frame_histogram = calculate_histogram(frame, bbox)

        # Compare histograms
        matched_label, score = compare_histograms(frame_histogram, histograms)

        # Draw the bounding box on the frame for the given coordinates
        height, width, _ = frame.shape
        x_center, y_center, box_width, box_height = bbox
        x_min = int((x_center - box_width / 2) * width)
        x_max = int((x_center + box_width / 2) * width)
        y_min = int((y_center - box_height / 2) * height)
        y_max = int((y_center + box_height / 2) * height)

        # Draw the original rectangle
        color_map = {
            "Green": (0, 255, 0),
            "None": (255, 255, 255),
            "Orange": (0, 165, 255),
            "Red": (0, 0, 255)
        }
        color = color_map.get(matched_label, (255, 255, 255))  # Default to white if label not found
        cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2)
        label = f"{matched_label} ({score:.2f})"
        cv2.putText(frame, label, (x_min, y_max + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Detect cars in the frame if the label is "Red"
        if matched_label == "Red":
            car_detections = detect_cars(frame, model, path_bbox)
            for (x1, y1, x2, y2) in car_detections:
                # Car detections already include the arrow drawing
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)  # Red for car detections

        # Save frames if needed
        if save_frames and temp_frame_folder:
            os.makedirs(temp_frame_folder, exist_ok=True)
            temp_frame_path = os.path.join(temp_frame_folder, f"frame_{frame_count:04d}.jpg")
            cv2.imwrite(temp_frame_path, frame)

        # Write the processed frame to the output video
        video_writer.write(frame)
        frame_count += 1

    video_cap.release()
    video_writer.release()
    print(f"Processed video saved at {output_video_path}")


# Inputs
input_video_path = '/content/drive/MyDrive/Input_Data/vid11_27_7_FaisalTown.mp4'  # Input video path
histogram_files = {
    "Green": "/content/drive/MyDrive/Histogram/averaged_hist_Green.pkl",
    "None": "/content/drive/MyDrive/Histogram/averaged_hist_None.pkl",
    "Orange": "/content/drive/MyDrive/Histogram/averaged_hist_Orange.pkl",
    "Red": "/content/drive/MyDrive/Histogram/averaged_hist_Red.pkl"
}
bbox = (0.485879, 0.577487, 0.016630, 0.051067)  # Normalized coordinates
path_bbox = (0.901061, 0.730515, 0.197877, 0.199434)  # New specified path coordinates
output_video_path = '/content/drive/MyDrive/Resulted_violation/output_video.mp4'  # Output video path

# Process video
process_video(input_video_path, histogram_files, bbox, path_bbox, output_video_path)


In [None]:
# @title part2
import csv

import logging

# Setup logging
logging.basicConfig(level=logging.INFO)

# Constants for signal status
SIGNAL_RED = "Red"
SIGNAL_GREEN = "Green"

# List to store violation records
violations = []

upper_line_start = (1389, 699)
upper_line_end = (1859, 666)
lower_line_start = (1912, 842)
lower_line_end = (1332, 1061)


def load_histograms(file_path):
    """Load a histogram from a pickle file."""
    if not os.path.exists(file_path):
        logging.error(f"Histogram file {file_path} not found!")
        return None
    try:
        with open(file_path, 'rb') as file:
            histogram = pickle.load(file)
        return histogram
    except Exception as e:
        logging.error(f"Error loading histogram from {file_path}: {e}")
        return None

def calculate_histogram(image, bbox):
    """Calculate and normalize the histogram for a region in the image."""
    height, width, _ = image.shape
    x_center, y_center, box_width, box_height = bbox

    # Convert normalized coordinates to pixel coordinates
    x_min = int((x_center - box_width / 2) * width)
    x_max = int((x_center + box_width / 2) * width)
    y_min = int((y_center - box_height / 2) * height)
    y_max = int((y_center + box_height / 2) * height)

    # Crop the region of interest
    roi = image[y_min:y_max, x_min:x_max]

    # Calculate the histogram
    histogram = cv2.calcHist([roi], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
    histogram = cv2.normalize(histogram, histogram).flatten()
    return histogram

def compare_histograms(frame_histogram, histograms):
    """Compare the frame histogram with a list of histograms using correlation."""
    best_match = None
    best_score = -1

    for label, histogram in histograms.items():
        score = cv2.compareHist(frame_histogram, histogram, cv2.HISTCMP_CORREL)
        if score > best_score:
            best_score = score
            best_match = label

    return best_match, best_score

def detect_cars(frame, model, specified_path):
    """Detect cars only within the specified path region."""
    results = model(frame)  # Perform inference
    detections = []

    for box in results[0].boxes.data:  # Access bounding box data
        x_min, y_min, x_max, y_max = map(int, box[:4])  # Extract coordinates
        cls = int(box[5])  # Class ID
        if cls == 2:  # Class ID for 'car'
            # Check if the bounding box lies within the specified path
            car_center_x = (x_min + x_max) // 2
            car_center_y = (y_min + y_max) // 2
            if is_within_path(car_center_x, car_center_y, specified_path, frame.shape):
                # Ensure detections are in the correct format for the tracker: [x1, y1, x2, y2, score]
                detections.append([x_min, y_min, x_max, y_max, box[4].item()])  # Add confidence score

    # Convert detections to a NumPy array if it's not empty
    if detections:
        detections = np.array(detections)
    else:
        # Return an empty NumPy array with the correct shape if no detections are found
        detections = np.empty((0, 5))
    return detections

def is_within_path(car_center_x, car_center_y, path_bbox, frame_shape):
    """Check if the car's center lies within the specified path."""
    height, width, _ = frame_shape
    x_center, y_center, box_width, box_height = path_bbox

    # Convert normalized path coordinates to pixel coordinates
    x_min = int((x_center - box_width / 2) * width)
    x_max = int((x_center + box_width / 2) * width)
    y_min = int((y_center - box_height / 2) * height)
    y_max = int((y_center + box_height / 2) * height)

    return x_min <= car_center_x <= x_max and y_min <= car_center_y <= y_max

# List to store cars that have crossed the lower line
crossed_lower_line = {}

# List to store violation records
violations = []

def detect_and_track_violation(frame, model, path_bbox, tracker, frame_number):
    """Detect cars and track violations based on line crossings."""
    car_detections = detect_cars(frame, model, path_bbox)

    if car_detections.size > 0:
        # Track cars
        tracked_objects = tracker.update(np.array(car_detections))

        # Process each tracked object
        for obj in tracked_objects:
            x1, y1, x2, y2, track_id = map(int, obj)

            # Get the car's center position
            car_center_x = (x1 + x2) // 2
            car_center_y = (y1 + y2) // 2

            print("crossed_lower_line: ", crossed_lower_line)
            # Check if car has crossed the lower line
            if track_id not in crossed_lower_line:
                if is_crossed_lower_line(car_center_y):
                    crossed_lower_line[track_id] = {'crossed': True, 'violation': False}

            # Check if car has already crossed the lower line and now crosses the upper line
            if track_id in crossed_lower_line and crossed_lower_line[track_id]['crossed']:
                if is_crossed_upper_line(car_center_y):
                    crossed_lower_line[track_id]['violation'] = True
                    violations.append({
                        'frame_no': frame_number,
                        'tracking_id': track_id,
                        'bbox': (x1, y1, x2, y2)
                    })

    return violations

def is_crossed_lower_line(car_center_y):
    """Check if car has crossed the lower line."""
    # Define the y-coordinate of the lower line
    lower_line_y = (lower_line_start[1] + lower_line_end[1]) // 2
    return car_center_y > lower_line_y

def is_crossed_upper_line(car_center_y):
    """Check if car has crossed the upper line."""
    # Define the y-coordinate of the upper line
    upper_line_y = (upper_line_start[1] + upper_line_end[1]) // 2
    return car_center_y < upper_line_y

def save_violations_to_csv(violations, csv_file_path):
    """Save violation records to a CSV file."""
    with open(csv_file_path, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['frame_no', 'tracking_id', 'bbox'])
        writer.writeheader()
        for violation in violations:
            writer.writerow(violation)

def process_and_generate_video_with_tracking(video_path, histogram_files, bbox, path_bbox, output_video_path, fps, csv_output_path):
    # Initialize variables
    violations = []
    crossed_lower_line.clear()  # Reset crossed lower line state at the start

    histograms = {}
    for label, file_path in histogram_files.items():
        histogram = load_histograms(file_path)  # Load individual histogram
        if histogram is not None:  # Check if histogram was loaded successfully
            histograms[label] = histogram

    if not histograms:
        logging.error("No histograms loaded, aborting video processing.")
        return

    model = YOLO('yolov8n.pt')  # Use YOLOv8 pretrained model
    if model is None:
        return

    tracker = Sort(max_age=15, min_hits=3)
    video_capture = cv2.VideoCapture(video_path)
    if not video_capture.isOpened():
        logging.error(f"Error: Unable to open video file {video_path}")
        return

    width, height = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)), int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

    frame_number = 0
    while True:
        ret, frame = video_capture.read()
        if not ret:
            break

        # Process frame and track violations
        violations = detect_and_track_violation(frame, model, path_bbox, tracker, frame_number)
        print("violation= ", violations)

        # Draw lines, calculate histograms, and process detections (as per original code)
        # Your existing drawing and bounding box logic goes here...

        video_writer.write(frame)
        frame_number += 1
        if frame_number % 100 == 0:
            logging.info(f"Processed {frame_number} frames...")

    video_capture.release()
    video_writer.release()

    # Save violation records to CSV

    save_violations_to_csv(violations, csv_output_path)
    logging.info(f"Processed video saved to {output_video_path} and violations saved to {csv_output_path}")

# Example usage
video_path = '/content/drive/MyDrive/Input_Data/vid11_27_7_FaisalTown.mp4'
histogram_files = {
    "Green": "/content/drive/MyDrive/Histogram/averaged_hist_Green.pkl",
    "None": "/content/drive/MyDrive/Histogram/averaged_hist_None.pkl",
    "Orange": "/content/drive/MyDrive/Histogram/averaged_hist_Orange.pkl",
    "Red": "/content/drive/MyDrive/Histogram/averaged_hist_Red.pkl"
}
bbox = (0.485879, 0.577487, 0.016630, 0.051067)
path_bbox = (0.901061, 0.730515, 0.197877, 0.199434)
output_video_path = '/content/drive/MyDrive/Resulted_violation/finalvideo.mp4'
csv_output_path = '/content/drive/MyDrive/Resulted_violation/violations.csv'
fps = 26
signal_color = 'Red'

process_and_generate_video_with_tracking(video_path, histogram_files, bbox, path_bbox, output_video_path, fps, csv_output_path)


In [None]:
# @title part3
import json
import cv2
import time
import requests
import pandas as pd
from tqdm import tqdm
from collections import defaultdict

# Load tracking data
tracking_data = pd.read_csv('/content/drive/MyDrive/Resulted_violation/violations.csv')

# Function to call the license plate recognition API with retry mechanism
def call_plate_api(image, api_key, retries=4, delay=6):
    for attempt in range(retries):
        try:
            with open(image, 'rb') as fp:
                response = requests.post(
                    'https://api.platerecognizer.com/v1/plate-reader/',
                    files={'upload': fp},
                    headers={'Authorization': f'Token {api_key}'}
                )
            response.raise_for_status()  # Raise an error for a bad response (4xx, 5xx)
            return response.json()
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            print(f"Error: {e}. Retrying {attempt + 1}/{retries}...")
            time.sleep(delay)
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}. Skipping this object.")
            return None
    print("Max retries reached. Skipping this object.")
    return None

# Function to process the video
def process_video(input_video, output_video, api_key, retry_interval=26, json_output='output.json'):
    cap = cv2.VideoCapture(input_video)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))

    # Cache for API results and retry tracking
    plate_cache = {}
    retry_tracker = defaultdict(lambda: -retry_interval)  # Last frame when API was called for each ID

    # Data to save for JSON output
    tracking_summary = {}

    # Add progress bar
    for frame_idx in tqdm(range(frame_count), desc="Processing frames"):
        ret, frame = cap.read()
        if not ret:
            break

        # Get data for the current frame from tracking data
        frame_data = tracking_data[tracking_data['Frame'] == frame_idx]
        for _, row in frame_data.iterrows():
            obj_id = int(row['ID'])  # Convert to native Python int
            x1, y1, x2, y2 = map(int, [row['x1'], row['y1'], row['x2'], row['y2']])  # Convert to Python int

            # Initialize tracking summary for the object if not already present
            if obj_id not in tracking_summary:
                tracking_summary[obj_id] = {"ID": obj_id, "start_frame": frame_idx, "end_frame": frame_idx, "plate": "N/A"}
            else:
                tracking_summary[obj_id]["end_frame"] = frame_idx

            # Check if plate number is already cached or if it's time to retry
            if obj_id not in plate_cache or (plate_cache[obj_id] == 'N/A' and frame_idx - retry_tracker[obj_id] >= retry_interval):
                # Crop the object region
                cropped = frame[y1:y2, x1:x2]
                cv2.imwrite('temp.jpg', cropped)

                # Call the API with retries
                api_response = call_plate_api('temp.jpg', api_key)
                if api_response and 'results' in api_response and api_response['results']:
                    plate_number = api_response['results'][0]['plate']
                else:
                    plate_number = 'N/A'

                # Update cache and retry tracker
                plate_cache[obj_id] = plate_number
                retry_tracker[obj_id] = frame_idx

                # Save plate number in tracking summary
                tracking_summary[obj_id]["plate"] = plate_number

            # Visualize the bounding box and plate number
            plate_number = plate_cache[obj_id]
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 2)
            cv2.putText(frame, plate_number, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        out.write(frame)

    cap.release()
    out.release()

    # Convert tracking summary values to serializable types
    tracking_summary_serializable = {
        k: {key: (int(value) if isinstance(value, (int, np.integer)) else value) for key, value in v.items()}
        for k, v in tracking_summary.items()
    }

    # Save the tracking summary to a JSON file
    with open(json_output, 'w') as json_file:
        json.dump(list(tracking_summary_serializable.values()), json_file, indent=4)

    print(f"Video processing completed! JSON saved to {json_output}")

# Input parameters
input_video = '/content/drive/MyDrive/Resulted_violation/finalvideo.mp4'
output_video = '/content/drive/MyDrive/Resulted_violation/violation.mp4'
api_key = 'df9ea40d34e80d6149f57d04d8b1328d61920916'
json_output = '/content/drive/MyDrive/Resulted_violation/tracking_summary.json'

# Process the video
process_video(input_video, output_video, api_key, json_output=json_output)

