In [None]:
import cv2
import numpy as np
import math
import torch
import pandas as pd
import subprocess
from moviepy.editor import VideoFileClip
from sklearn.metrics import precision_score, recall_score, f1_score

# Load YOLOv5 model
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')

def reencode_video(input_file, output_file):
    """Re-encode the video file to ensure compatibility."""
    cmd = f"ffmpeg -i {input_file} -c:v copy -c:a copy -movflags faststart {output_file}"
    subprocess.call(cmd, shell=True)

# Re-encode the video file
input_video_path = "/content/WhatsApp Video 2024-07-05 at 17.25.04_ac24697e.mp4"
fixed_video_path = "/content/sample_data/video.mp4"
reencode_video(input_video_path, fixed_video_path)

def region_of_interest(img, vertices):
    """Apply a mask to the image to focus on a specific region."""
    mask = np.zeros_like(img)
    match_mask_color = 255
    cv2.fillPoly(mask, vertices, match_mask_color)
    masked_image = cv2.bitwise_and(img, mask)
    return masked_image

def draw_lines(img, lines, color=[255, 0, 0], thickness=3):
    """Draw lines on the image."""
    line_img = np.zeros(
        (
            img.shape[0],
            img.shape[1],
            3
        ),
        dtype=np.uint8
    )
    img = np.copy(img)
    if lines is None:
        return img
    for line in lines:
        for x1, y1, x2, y2 in line:
            cv2.line(line_img, (x1, y1), (x2, y2), color, thickness)
    img = cv2.addWeighted(img, 0.8, line_img, 1.0, 0.0)
    return img

def detect_vehicles(image):
    """Detect vehicles using YOLOv5 model and draw bounding boxes."""
    results = model(image)
    detections = results.pandas().xyxy[0]  # Extracting detections as a pandas DataFrame

    vehicles = []
    for _, row in detections.iterrows():
        if row['name'] in ["car", "bus", "truck", "motorcycle"]:
            x, y, w, h = int(row['xmin']), int(row['ymin']), int(row['xmax']) - int(row['xmin']), int(row['ymax']) - int(row['ymin'])
            vehicles.append((x, y, w, h))
            label = row['name']
            color = (0, 0, 255)
            cv2.rectangle(image, (x, y), (x + w, y + h), color, 2)
            cv2.putText(image, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return vehicles

def highlight_vehicle_area(image, left_line, right_line, vehicles):
    """Highlight the area between the lane lines and show a warning for cut-in vehicles."""
    overlay = image.copy()
    warning = False
    if left_line is not None and right_line is not None:
        height = image.shape[0]
        min_y = int(height * (3 / 5))
        max_y = int(height)

        left_x_start, left_y_start, left_x_end, left_y_end = left_line
        right_x_start, right_y_start, right_x_end, right_y_end = right_line

        pts = np.array([
            [left_x_start, left_y_start],
            [left_x_end, left_y_end],
            [right_x_end, right_y_end],
            [right_x_start, right_y_start]
        ], np.int32)

        cv2.fillPoly(overlay, [pts], (0, 255, 0))
        cv2.addWeighted(overlay, 0.3, image, 0.7, 0, image)

        for (x, y, w, h) in vehicles:
            vehicle_center = (float(x + w // 2), float(y + h // 2))
            if cv2.pointPolygonTest(pts, vehicle_center, False) >= 0:
                warning = True

    if warning:
        cv2.putText(image, "Caution!! Vehicle Cutting In!", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

    return image

def pipeline(image):
    """Process the image through region of interest, lane detection, and vehicle detection."""
    height = image.shape[0]
    width = image.shape[1]
    region_of_interest_vertices = [
        (0, height),
        (width / 2, height / 2),
        (width, height),
    ]
    gray_image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    cannyed_image = cv2.Canny(gray_image, 100, 200)

    cropped_image = region_of_interest(
        cannyed_image,
        np.array(
            [region_of_interest_vertices],
            np.int32
        ),
    )

    lines = cv2.HoughLinesP(
        cropped_image,
        rho=6,
        theta=np.pi / 60,
        threshold=160,
        lines=np.array([]),
        minLineLength=20,
        maxLineGap=300
    )

    left_line_x = []
    left_line_y = []
    right_line_x = []
    right_line_y = []

    if lines is not None:
        for line in lines:
            for x1, y1, x2, y2 in line:
                if x2 - x1 == 0:  # Prevent division by zero
                    continue
                slope = (y2 - y1) / (x2 - x1)
                if math.fabs(slope) < 0.5:
                    continue
                if slope <= 0:
                    left_line_x.extend([x1, x2])
                    left_line_y.extend([y1, y2])
                else:
                    right_line_x.extend([x1, x2])
                    right_line_y.extend([y1, y2])

    left_line = None
    right_line = None
    if left_line_x and left_line_y and right_line_x and right_line_y:
        min_y = int(image.shape[0] * (3 / 5))
        max_y = int(image.shape[0])

        if left_line_x and left_line_y:
            poly_left = np.poly1d(np.polyfit(
                left_line_y,
                left_line_x,
                deg=1
            ))
            left_line = (int(poly_left(max_y)), max_y, int(poly_left(min_y)), min_y)

        if right_line_x and right_line_y:
            poly_right = np.poly1d(np.polyfit(
                right_line_y,
                right_line_x,
                deg=1
            ))
            right_line = (int(poly_right(max_y)), max_y, int(poly_right(min_y)), min_y)

        # Draw the left and right lines on the image
        line_image = draw_lines(
            image,
            [[
                left_line,
                right_line,
            ]],
            thickness=5,
        )
    else:
        line_image = image  # If no lines are detected, return the original image

    # Detect vehicles and draw red rectangles around them
    vehicles = detect_vehicles(line_image)

    # Highlight the area between the lanes and show warning if necessary
    final_image = highlight_vehicle_area(line_image, left_line, right_line, vehicles)

    return final_image

def calculate_iou(boxA, boxB):
    """Calculate Intersection over Union (IoU) for two bounding boxes."""
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)
    boxAArea = (boxA[2] - boxA[0] + 1) * (boxA[3] - boxA[1] + 1)
    boxBArea = (boxB[2] - boxB[0] + 1) * (boxB[3] - boxB[1] + 1)

    iou = interArea / float(boxAArea + boxBArea - interArea)
    return iou

def evaluate_detection(detected_vehicles, ground_truth_vehicles, iou_threshold=0.5):
    """Evaluate vehicle detection against ground truth."""
    true_positives = 0
    false_positives = 0
    false_negatives = 0

    # Create a list to keep track of which ground truth boxes have been matched
    matched_ground_truths = []

    for gt_vehicle in ground_truth_vehicles:
        gt_box = (gt_vehicle[0], gt_vehicle[1], gt_vehicle[0] + gt_vehicle[2], gt_vehicle[1] + gt_vehicle[3])
        matched = False
        for det_vehicle in detected_vehicles:
            det_box = (det_vehicle[0], det_vehicle[1], det_vehicle[0] + det_vehicle[2], det_vehicle[1] + det_vehicle[3])
            if calculate_iou(gt_box, det_box) >= iou_threshold:
                matched = True
                break

        if matched:
            true_positives += 1
        else:
            false_negatives += 1

    false_positives = len(detected_vehicles) - true_positives

    return true_positives, false_positives, false_negatives

def read_ground_truth_from_csv(csv_file):
    """Read ground truth bounding boxes from a CSV file."""
    df = pd.read_csv(csv_file)

    # Print column names for debugging
    print("CSV Columns:", df.columns)

    # Adjust the column names based on your actual CSV file
    if 'labels' not in df.columns:
        raise ValueError("CSV file must contain a 'labels' column.")
    if 'x_min' not in df.columns or 'y_min' not in df.columns or 'x_max' not in df.columns or 'y_max' not in df.columns:
        raise ValueError("CSV file must contain 'x_min', 'y_min', 'x_max', and 'y_max' columns.")

    # Extract unique labels from the CSV
    labels = df['labels'].unique()

    # Initialize dictionary to hold ground truth data
    ground_truth_data = {label: [] for label in labels}

    for _, row in df.iterrows():
        label = row['labels']
        x_min = int(row['x_min'])
        y_min = int(row['y_min'])
        x_max = int(row['x_max'])
        y_max = int(row['y_max'])
        box = (x_min, y_min, x_max - x_min, y_max - y_min)
        ground_truth_data[label].append(box)

    return ground_truth_data

def run_evaluation(video_file, ground_truth_csv):
    """Run the evaluation process for vehicle detection and cut-in detection."""
    # Read ground truth data from CSV
    ground_truth_data = read_ground_truth_from_csv(ground_truth_csv)

    video_clip = VideoFileClip(video_file)
    frame_list = [frame for frame in video_clip.iter_frames()]

    all_detected_vehicles = []
    all_ground_truth_vehicles = []

    for frame_number, frame in enumerate(frame_list):
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        final_frame = pipeline(frame_rgb)

        # Extract ground truth data for the current frame
        ground_truth_vehicles = []
        for label in ground_truth_data:
            ground_truth_vehicles.extend(ground_truth_data[label])

        # Detect vehicles
        detected_vehicles = detect_vehicles(final_frame)
        all_detected_vehicles.extend(detected_vehicles)
        all_ground_truth_vehicles.extend(ground_truth_vehicles)

    # Evaluate
    tp, fp, fn = evaluate_detection(all_detected_vehicles, all_ground_truth_vehicles)
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    print(f"Precision: {precision:.2f}")
    print(f"Recall: {recall:.2f}")
    print(f"F1 Score: {f1:.2f}")

    # Optionally save or display results
    output_file = 'output_video_with_detections.mp4'
    final_clip = VideoFileClip(video_file).fl_image(lambda img: pipeline(cv2.cvtColor(img, cv2.COLOR_RGB2BGR)))
    final_clip.write_videofile(output_file, codec='libx264')

# Example usage:
if __name__ == "__main__":
    run_evaluation(fixed_video_path, '/content/daily_weather.csv')
