Preprocess Camera

In [None]:
import os
import json
import cv2
import numpy as np
import torch
from torchvision import transforms
from PIL import Image

# Define paths
DATA_DIR = r'D:/ADAS_assignment'
CAMERA_DIR = r"D:\ADAS_assignment\samples"
SAMPLE_DATA_FILE = r"D:\ADAS_assignment\v1.0-mini\sample_data.json"
CALIBRATION_FILE = r"D:\ADAS_assignment\v1.0-mini\calibrated_sensor.json"
SENSOR_FILE = r"D:\ADAS_assignment\v1.0-mini\sensor.json"
EGO_POSE_FILE = r"D:\ADAS_assignment\v1.0-mini\ego_pose.json"
OBJECT_VECTOR_FILE = 'camera_object_vectors.json'

# Load JSON data
def load_json(file):
    with open(file, 'r') as f:
        return json.load(f)

# Load Data
sample_data = load_json(SAMPLE_DATA_FILE)
calibration = load_json(CALIBRATION_FILE)
sensor_map = load_json(SENSOR_FILE)
ego_pose = load_json(EGO_POSE_FILE)

# Load YOLO model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
from ultralytics import YOLO
model = YOLO('yolov8n.pt').to(device)
model.conf = 0.25  # Lower confidence threshold
model.iou = 0.45

# Image transformation
transform = transforms.Compose([
    transforms.Resize((640, 640)),
    transforms.ToTensor()
])

# Process camera data
object_vectors = []

camera_subdirs = ['CAM_BACK', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT', 'CAM_FRONT', 'CAM_FRONT_LEFT', 'CAM_FRONT_RIGHT']
for subdir in camera_subdirs:
    for entry in sample_data:
        if entry['fileformat'] == 'jpg' and subdir in entry['filename']:
            file_path = os.path.join(DATA_DIR, 'samples', subdir, os.path.basename(entry['filename']))
            if os.path.exists(file_path):
                print(f'Processing: {file_path}')
                img = Image.open(file_path).convert('RGB')
                input_tensor = transform(img).unsqueeze(0).to(device)

                # Run YOLO detection
                with torch.no_grad():
                    results = model(input_tensor)
                    detections = results[0].boxes.data.cpu().numpy()

                # Process detection results
                for det in detections:
                    x1, y1, x2, y2, conf, cls = det

                    object_vectors.append({
                        'sample_token': entry['sample_token'],
                        'bbox': [float(x1), float(y1), float(x2), float(y2)],
                        'confidence': conf.item(),
                        'class': int(cls.item())
                    })

# Save object vectors
with open(OBJECT_VECTOR_FILE, 'w') as f:
    json.dump(object_vectors, f, indent=4)

print(f"✅ Object vectors saved to '{OBJECT_VECTOR_FILE}'")

Processing: D:/ADAS_assignment\samples\CAM_BACK\n015-2018-07-24-11-22-45+0800__CAM_BACK__1532402931687525.jpg

0: 640x640 1 truck, 84.9ms
Speed: 1.0ms preprocess, 84.9ms inference, 292.2ms postprocess per image at shape (1, 3, 640, 640)
Processing: D:/ADAS_assignment\samples\CAM_BACK\n015-2018-07-24-11-22-45+0800__CAM_BACK__1532402928687525.jpg

0: 640x640 (no detections), 81.0ms
Speed: 0.0ms preprocess, 81.0ms inference, 8.5ms postprocess per image at shape (1, 3, 640, 640)
Processing: D:/ADAS_assignment\samples\CAM_BACK\n015-2018-07-24-11-22-45+0800__CAM_BACK__1532402930137525.jpg

0: 640x640 1 person, 1 truck, 79.7ms
Speed: 0.0ms preprocess, 79.7ms inference, 11.2ms postprocess per image at shape (1, 3, 640, 640)
Processing: D:/ADAS_assignment\samples\CAM_BACK\n015-2018-07-24-11-22-45+0800__CAM_BACK__1532402927637525.jpg

0: 640x640 3 persons, 1 car, 58.8ms
Speed: 0.0ms preprocess, 58.8ms inference, 7.8ms postprocess per image at shape (1, 3, 640, 640)
Processing: D:/ADAS_assignment

Preprocess Radar Object

In [None]:
import os
import json
import numpy as np
import open3d as o3d

# Define paths
DATA_DIR = r"D:\ADAS_assignment"
RADAR_DIR = os.path.join(DATA_DIR, 'samples')
SAMPLE_DATA_FILE = os.path.join(DATA_DIR, 'v1.0-mini', 'sample_data.json')
CALIBRATION_FILE = os.path.join(DATA_DIR, 'v1.0-mini', 'calibrated_sensor.json')
EGO_POSE_FILE = os.path.join(DATA_DIR, 'v1.0-mini', 'ego_pose.json')
RADAR_OUTPUT_FILE = 'radar_object_vectors.json'

# Load JSON data
def load_json(file):
    with open(file, 'r') as f:
        return json.load(f)

# Load Data
sample_data = load_json(SAMPLE_DATA_FILE)
calibration = load_json(CALIBRATION_FILE)
ego_pose = load_json(EGO_POSE_FILE)

# Radar subdirectories
radar_subdirs = ['RADAR_BACK_RIGHT', 'RADAR_BACK_LEFT', 'RADAR_FRONT', 'RADAR_FRONT_LEFT', 'RADAR_FRONT_RIGHT']

# Process radar data
radar_objects = []

for subdir in radar_subdirs:
    for entry in sample_data:
        if entry['fileformat'] == 'pcd' and subdir in entry['filename']:
            file_path = os.path.join(RADAR_DIR, subdir, os.path.basename(entry['filename']))
            if os.path.exists(file_path):
                print(f'Processing: {file_path}')

                # Load radar data using Open3D
                pcd = o3d.io.read_point_cloud(file_path)
                points = np.asarray(pcd.points)

                # Compute relative speed and distance
                for point in points:
                    x, y, z = point[:3]
                    distance = np.sqrt(x**2 + y**2 + z**2)
                    relative_speed = np.random.uniform(-5, 5)  # Placeholder for actual speed computation

                    radar_objects.append({
                        'sample_token': entry['sample_token'],
                        'position': [float(x), float(y), float(z)],
                        'distance': float(distance),
                        'relative_speed': float(relative_speed)
                    })

# Save radar objects
with open(RADAR_OUTPUT_FILE, 'w') as f:
    json.dump(radar_objects, f, indent=4)

print(f"✅ Radar object vectors saved to '{RADAR_OUTPUT_FILE}'")


Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.
Processing: D:\ADAS_assignment\samples\RADAR_BACK_RIGHT\n015-2018-07-24-11-22-45+0800__RADAR_BACK_RIGHT__1532402928146224.pcd
Processing: D:\ADAS_assignment\samples\RADAR_BACK_RIGHT\n015-2018-07-24-11-22-45+0800__RADAR_BACK_RIGHT__1532402928730857.pcd
Processing: D:\ADAS_assignment\samples\RADAR_BACK_RIGHT\n015-2018-07-24-11-22-45+0800__RADAR_BACK_RIGHT__1532402932171718.pcd
Processing: D:\ADAS_assignment\samples\RADAR_BACK_RIGHT\n015-2018-07-24-11-22-45+0800__RADAR_BACK_RIGHT__1532402929171003.pcd
Processing: D:\ADAS_assignment\samples\RADAR_BACK_RIGHT\n015-2018-07-24-11-22-45+0800__RADAR_BACK_RIGHT__1532402929685672.pcd
Processing: D:\ADAS_assignment\samples\RADAR_BACK_RIGHT\n015-2018-07-24-11-22-45+0800__RADAR_BACK_RIGHT__1532402927635538.pcd
Processing: D:\ADAS_assignment\samples\RADAR_BACK_RIGHT\n015-2018-07-24-11-

Prepare LiDAR Objects

In [None]:
import os
import json
import numpy as np

# Define paths
DATA_DIR = r"D:\ADAS_assignment"
LIDAR_DIR = os.path.join(DATA_DIR, "samples", "LIDAR_TOP")
SAMPLE_DATA_FILE = os.path.join(DATA_DIR, "v1.0-mini", "sample_data.json")
LIDAR_OBJECT_VECTOR_FILE = os.path.join(DATA_DIR, "lidar_object_vectors.json")

# Load sample data to map file to sample_token
with open(SAMPLE_DATA_FILE, 'r') as f:
    sample_data = json.load(f)

file_to_sample_token = {
    entry['filename']: entry['sample_token'] for entry in sample_data
}

# Load LIDAR files
lidar_files = [f for f in os.listdir(LIDAR_DIR) if f.endswith('.pcd.bin')]

lidar_objects = []

for file in lidar_files:
    # Map the filename to sample_token
    sample_token = file_to_sample_token.get(f"samples/LIDAR_TOP/{file}")

    if sample_token:
        lidar_path = os.path.join(LIDAR_DIR, file)
        lidar_data = np.fromfile(lidar_path, dtype=np.float32).reshape(-1, 4)  # (x, y, z, intensity)

        for point in lidar_data:
            x, y, z, intensity = point

            # Save LiDAR object with sample_token
            lidar_objects.append({
                'sample_token': sample_token,
                'x': float(x),
                'y': float(y),
                'z': float(z),
                'intensity': float(intensity)
            })

# ✅ Save LiDAR object vectors
with open(LIDAR_OBJECT_VECTOR_FILE, 'w') as f:
    json.dump(lidar_objects, f, indent=4)

print(f"✅ LiDAR object vectors saved to '{LIDAR_OBJECT_VECTOR_FILE}'")







✅ LiDAR object vectors saved to 'D:\ADAS_assignment\lidar_object_vectors.json'


Correlating LiDAR and Camera Objects

In [None]:
import json
import os

# Define paths
DATA_DIR = r"D:\ADAS_assignment"
LIDAR_OBJECT_VECTOR_FILE = os.path.join(DATA_DIR, "lidar_object_vectors.json")
CAMERA_OBJECT_VECTOR_FILE = os.path.join(DATA_DIR, "camera_object_vectors.json")
MATCHING_SAMPLE_TOKENS_FILE = os.path.join(DATA_DIR, "matching_sample_tokens.json")

# Function to load JSON data
def load_json(file_path):
    """Load JSON file into a dictionary or list."""
    with open(file_path, 'r') as f:
        return json.load(f)

# Function to load and filter data
def load_and_filter_data():
    """Load camera, LiDAR objects and filter based on matching sample tokens."""
    print("📥 Loading JSON files...")
    lidar_objects = load_json(LIDAR_OBJECT_VECTOR_FILE)
    camera_objects = load_json(CAMERA_OBJECT_VECTOR_FILE)
    matching_sample_tokens = set(load_json(MATCHING_SAMPLE_TOKENS_FILE))  # Convert list to set for fast lookup

    # Convert to NumPy arrays for GPU acceleration
    lidar_data = [(obj['sample_token'], obj['x'], obj['y'], obj['z'], obj['intensity']) for obj in lidar_objects if obj['sample_token'] in matching_sample_tokens]
    camera_data = [(obj['sample_token'], obj['class'], obj['confidence'], obj['bbox']) for obj in camera_objects if obj['sample_token'] in matching_sample_tokens]

    print(f"✅ Loaded {len(camera_data)} camera objects and {len(lidar_data)} LiDAR objects.")
    return lidar_data, camera_data


In [None]:
import torch
import numpy as np

# Function to correlate objects using PyTorch
def correlate_objects(lidar_data, camera_data):
    """Correlate camera and LiDAR objects using GPU."""
    print("🔄 Correlating objects using GPU...")

    # Move data to GPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # **Separate sample tokens & numerical data**
    lidar_tokens = [obj[0] for obj in lidar_data]  # Extract sample tokens
    lidar_numerical = np.array([[obj[1], obj[2], obj[3], obj[4]] for obj in lidar_data], dtype=np.float32)

    # Convert to PyTorch tensors
    lidar_tensor = torch.tensor(lidar_numerical, dtype=torch.float32, device=device)  # (N, 4) -> x, y, z, intensity
    camera_tensor = torch.tensor([obj[3] for obj in camera_data], dtype=torch.float32, device=device)  # Extract bbox (N, 4)

    correlated_objects = []

    for cam_idx, (sample_token, cam_class, confidence, bbox) in enumerate(camera_data):
        # Find matching LiDAR points by comparing sample tokens
        matching_indices = [i for i, token in enumerate(lidar_tokens) if token == sample_token]
        lidar_points = lidar_tensor[matching_indices] if matching_indices else None

        if lidar_points is not None and lidar_points.size(0) > 0:
            distances = torch.sqrt(torch.sum(lidar_points[:, :3] ** 2, dim=1))  # Compute Euclidean distance (x, y, z)

            correlated_objects.append({
                'sample_token': sample_token,
                'camera_class': cam_class,
                'camera_confidence': round(float(confidence), 3),
                'lidar_x': round(float(lidar_points[0, 0]), 3),
                'lidar_y': round(float(lidar_points[0, 1]), 3),
                'lidar_z': round(float(lidar_points[0, 2]), 3),
                'lidar_intensity': round(float(lidar_points[0, 3]), 3),
                'distance': round(float(distances[0]), 3),
                'camera_bbox': [round(coord, 3) for coord in bbox]
            })

    print(f"✅ Correlated {len(correlated_objects)} objects.")
    return correlated_objects



In [None]:
import json
import os

# Define output path
DATA_DIR = r"D:\ADAS_assignment"
CORRELATED_OUTPUT_FILE = os.path.join(DATA_DIR, "correlated_objects.json")

# Function to save results
def save_results(correlated_objects):
    """Save correlated objects into a JSON file."""
    with open(CORRELATED_OUTPUT_FILE, 'w') as f:
        json.dump(correlated_objects, f, indent=4)
    print(f"✅ Correlated objects saved to '{CORRELATED_OUTPUT_FILE}'")



In [None]:


def main():
    """Execute the modular correlation process."""
    lidar_data, camera_data = load_and_filter_data()
    correlated_objects = correlate_objects(lidar_data, camera_data)
    save_results(correlated_objects)

if __name__ == "__main__":
    main()


📥 Loading JSON files...
✅ Loaded 8952 camera objects and 17532760 LiDAR objects.
🔄 Correlating objects using GPU...
✅ Correlated 8952 objects.
✅ Correlated objects saved to 'D:\ADAS_assignment\correlated_objects.json'


In [None]:
import os
import re
import json
import cv2
import numpy as np

# Paths
DATA_DIR = r"D:\ADAS_assignment"
CORRELATED_FILE = os.path.join(DATA_DIR, "correlated_objects.json")
SAMPLE_DATA_FILE = os.path.join(DATA_DIR, "v1.0-mini", "sample_data.json")
CAMERA_DIR = os.path.join(DATA_DIR, "samples")  # Main folder with images

# Load correlated objects
with open(CORRELATED_FILE, "r") as f:
    correlated_objects = json.load(f)

# Load sample data to map sample_token → filename
with open(SAMPLE_DATA_FILE, "r") as f:
    sample_data = json.load(f)

# Create a mapping from sample_token → image filename
sample_token_to_filename = {}
for entry in sample_data:
    if entry["fileformat"] == "jpg":
        filename = entry["filename"]

        # Use regex to check if the filename starts with "samples/"
        if re.match(r"^samples/", filename):
            sample_token_to_filename[entry["sample_token"]] = filename

# Process each object
for obj in correlated_objects:
    sample_token = obj["sample_token"]

    # Fetch the correct filename using sample_token
    filename = sample_token_to_filename.get(sample_token, None)
    if not filename:
        continue  # Skip if no matching filename found

    # Construct the full image path
    image_path = os.path.join(DATA_DIR, filename)

    if not os.path.exists(image_path):
        continue

    # Read image
    img = cv2.imread(image_path)
    img_h, img_w, _ = img.shape

    cls_name = obj["camera_class"]
    bbox = obj["camera_bbox"]
    lidar_x, lidar_y, lidar_z = obj["lidar_x"], obj["lidar_y"], obj["lidar_z"]

    # Ensure valid bounding box
    if len(bbox) != 4 or min(bbox) < 0 or bbox[2] > img_w or bbox[3] > img_h:
        continue  # Skip invalid bbox

    # Convert 3D LiDAR point to 2D overlay (simple projection)
    x2D = int((lidar_x * 20) + (img_w / 2))  # Adjust scaling factor (20)
    y2D = int((lidar_y * 20) + (img_h / 2))  # Adjust scaling factor (20)

    # Ensure projected point is within image
    if 0 <= x2D < img_w and 0 <= y2D < img_h:
        cv2.circle(img, (x2D, y2D), 5, (0, 255, 0), -1)  # Green circle

    # Draw bounding box
    cv2.rectangle(img, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (255, 0, 0), 2)
    cv2.putText(img, f"{cls_name} ({lidar_z:.1f}m)", (int(bbox[0]), int(bbox[1]) - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)

    cv2.imshow("Sensor Fusion", img)
    cv2.waitKey(0)

cv2.destroyAllWindows()


KeyboardInterrupt: 

Contour Analysis of the Images

In [None]:
import cv2
import numpy as np
import os
import json
import re

# Define paths
DATA_DIR = r"D:\ADAS_assignment"
CAMERA_DIR = os.path.join(DATA_DIR, "samples")
SAMPLE_DATA_FILE = os.path.join(DATA_DIR, "v1.0-mini", "sample_data.json")
CORRELATED_OBJECTS_FILE = os.path.join(DATA_DIR, "correlated_objects.json")
OUTPUT_DIR = os.path.join(DATA_DIR, "contour_analysis")

# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Load correlated object data
with open(CORRELATED_OBJECTS_FILE, "r") as f:
    correlated_objects = json.load(f)

# Load sample data (to map sample_token to filenames)
with open(SAMPLE_DATA_FILE, "r") as f:
    sample_data = json.load(f)

# Create a mapping of sample_token → image filename
sample_token_to_filename = {}
for entry in sample_data:
    if entry["fileformat"] == "jpg":
        filename = entry["filename"]

        # Use regex to check if the filename starts with "samples/"
        if re.match(r"^samples/", filename):
            sample_token_to_filename[entry["sample_token"]] = filename

def analyze_contours(image_path, objects):
    """Perform contour analysis on an image."""
    image = cv2.imread(image_path)

    if image is None:
        print(f"⚠️ Warning: Could not load {image_path}")
        return

    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blurred, 50, 150)

    # Find contours
    contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cv2.drawContours(image, contours, -1, (0, 255, 0), 2)  # Green contours

    # Draw bounding boxes for detected objects
    for obj in objects:
        x1, y1, x2, y2 = map(int, obj["camera_bbox"])
        cv2.rectangle(image, (x1, y1), (x2, y2), (255, 0, 0), 2)  # Blue bbox

    output_path = os.path.join(OUTPUT_DIR, os.path.basename(image_path))
    cv2.imwrite(output_path, image)
    print(f"✅ Processed: {output_path}")

# Process each correlated object
for obj in correlated_objects:
    sample_token = obj["sample_token"]
    image_filename = sample_token_to_filename.get(sample_token)

    if image_filename:
        # Construct the full image path
        image_path = os.path.join(DATA_DIR, image_filename)

        if os.path.exists(image_path):
            analyze_contours(image_path, [obj])
        else:
            print(f"⚠️ Image file not found: {image_path}")
    else:
        print(f"⚠️ No matching image for sample_token: {sample_token}")

print("✅ Contour analysis completed! Results saved in:", OUTPUT_DIR)




✅ Processed: D:\ADAS_assignment\contour_analysis\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402931654844.jpg
✅ Processed: D:\ADAS_assignment\contour_analysis\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402930104844.jpg
✅ Processed: D:\ADAS_assignment\contour_analysis\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402930104844.jpg
✅ Processed: D:\ADAS_assignment\contour_analysis\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402927604844.jpg
✅ Processed: D:\ADAS_assignment\contour_analysis\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402927604844.jpg
✅ Processed: D:\ADAS_assignment\contour_analysis\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402927604844.jpg
✅ Processed: D:\ADAS_assignment\contour_analysis\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402927604844.jpg
✅ Processed: D:\ADAS_assignment\contour_analysis\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402932154844.jpg
✅ Processed: D:\ADAS_assignment\contour_analysis\n015-2018-07-24

Decision Making

In [None]:
import os
import json
import torch
import numpy as np
import cv2
from ultralytics import YOLO
from sklearn.cluster import DBSCAN
from scipy.spatial.transform import Rotation

# Define Paths
DATA_DIR = r"D:\ADAS_assignment"
SAMPLE_DATA_FILE = os.path.join(DATA_DIR, "v1.0-mini", "sample_data.json")
CORRELATED_FILE = os.path.join(DATA_DIR, "correlated_objects.json")
CAMERA_DIR = os.path.join(DATA_DIR, "samples")
LIDAR_OBJECT_VECTOR_FILE = os.path.join(DATA_DIR, "lidar_object_vectors.json")
CAMERA_OBJECT_VECTOR_FILE = os.path.join(DATA_DIR, "camera_object_vectors.json")
MODEL_PATH = "yolov8n.pt"

# Load YOLOv8 model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = YOLO(MODEL_PATH).to(device)

# Load JSON data
def load_json(file):
    with open(file, 'r') as f:
        return json.load(f)

# Load data
sample_data = load_json(SAMPLE_DATA_FILE)
correlated_objects = load_json(CORRELATED_FILE)
camera_objects = load_json(CAMERA_OBJECT_VECTOR_FILE)
lidar_objects = load_json(LIDAR_OBJECT_VECTOR_FILE)

# Create mapping for sample tokens
sample_token_to_filename = {entry['sample_token']: entry['filename'] for entry in sample_data if entry['fileformat'] == 'jpg'}

# Define a distance threshold for obstructions
OBSTRUCTION_DISTANCE = 15  # Any object within 15m is considered an obstruction

# Process each object
for obj in correlated_objects:
    sample_token = obj["sample_token"]
    if sample_token not in sample_token_to_filename:
        continue

    image_path = os.path.join(DATA_DIR, sample_token_to_filename[sample_token])
    if not os.path.exists(image_path):
        continue

    # Read image
    img = cv2.imread(image_path)

    # Run YOLOv8 on the image
    results = model(image_path)[0]

    for det in results.boxes.data:
        x1, y1, x2, y2, conf, cls = det.cpu().numpy()
        cls_name = model.names[int(cls)]
        distance = obj["distance"]

        # Define color: RED if obstruction, GREEN otherwise
        color = (0, 0, 255) if distance < OBSTRUCTION_DISTANCE else (0, 255, 0)

        # Draw bounding box
        cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
        cv2.putText(img, f"{cls_name} ({distance:.1f}m)", (int(x1), int(y1) - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    # Show result
    cv2.imshow("Obstruction Detection", img)
    cv2.waitKey(0)

cv2.destroyAllWindows()



image 1/1 D:\ADAS_assignment\sweeps\CAM_FRONT_LEFT\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402931254844.jpg: 384x640 (no detections), 47.9ms
Speed: 17.7ms preprocess, 47.9ms inference, 15.5ms postprocess per image at shape (1, 3, 384, 640)

image 1/1 D:\ADAS_assignment\sweeps\CAM_FRONT_LEFT\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402929754844.jpg: 384x640 1 truck, 44.4ms
Speed: 4.8ms preprocess, 44.4ms inference, 22.2ms postprocess per image at shape (1, 3, 384, 640)

image 1/1 D:\ADAS_assignment\sweeps\CAM_FRONT_LEFT\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402929754844.jpg: 384x640 1 truck, 44.4ms
Speed: 4.2ms preprocess, 44.4ms inference, 2.7ms postprocess per image at shape (1, 3, 384, 640)

image 1/1 D:\ADAS_assignment\samples\CAM_FRONT_LEFT\n015-2018-07-24-11-22-45+0800__CAM_FRONT_LEFT__1532402927604844.jpg: 384x640 1 person, 44.2ms
Speed: 4.3ms preprocess, 44.2ms inference, 2.5ms postprocess per image at shape (1, 3, 384, 640)

image 1/1 D:\ADAS_

KeyboardInterrupt: 

In [None]:
import os
import json
import numpy as np
import cv2
import pygame  # For audio alerts

# Paths
DATA_DIR = r"D:\ADAS_assignment"
CORRELATED_FILE = os.path.join(DATA_DIR, "correlated_objects.json")

# Load correlated objects
with open(CORRELATED_FILE, "r") as f:
    correlated_objects = json.load(f)

# 🚨 Initialize Pygame for audio alerts
pygame.mixer.init()
ALERT_SOUND = "alert_sound.mp3"

# ⚙ Thresholds for decision-making
SAFE_DISTANCE = 15  # Above this distance, no action needed
WARNING_DISTANCE = 10  # Below this, alert is triggered
CRITICAL_DISTANCE = 5  # Immediate braking/lane change action

# 🚘 Decision-making function
def decision_making(obj):
    """Decide actions based on the detected object's distance."""
    distance = obj["distance"]

    if distance > SAFE_DISTANCE:
        return "✅ Safe: No action needed."

    elif WARNING_DISTANCE <= distance <= SAFE_DISTANCE:
        # Trigger warning (visual & sound)
        print(f"⚠ WARNING: {obj['camera_class']} detected at {distance:.1f}m.")
        pygame.mixer.music.load(ALERT_SOUND)
        pygame.mixer.music.play()
        return "⚠ Warning: Alert triggered!"

    else:
        # 🚨 Collision imminent → Apply corrective action
        print(f"🚨 CRITICAL: {obj['camera_class']} detected at {distance:.1f}m. Immediate action required!")

        # Choose corrective mechanism
        action = "🛑 Emergency Braking" if distance < CRITICAL_DISTANCE else "➡ Lane Change"
        return f"🚨 Critical: {action} activated!"

# 🎯 Process each object for decision-making
for obj in correlated_objects:
    action_taken = decision_making(obj)
    print(f"🔍 Object: {obj['camera_class']}, Distance: {obj['distance']:.1f}m → {action_taken}")

print("✅ Decision-making process completed.")


🚨 CRITICAL: 7 detected at 3.7m. Immediate action required!
🔍 Object: 7, Distance: 3.7m → 🚨 Critical: 🛑 Emergency Braking activated!
🚨 CRITICAL: 0 detected at 3.6m. Immediate action required!
🔍 Object: 0, Distance: 3.6m → 🚨 Critical: 🛑 Emergency Braking activated!
🚨 CRITICAL: 7 detected at 3.6m. Immediate action required!
🔍 Object: 7, Distance: 3.6m → 🚨 Critical: 🛑 Emergency Braking activated!
🚨 CRITICAL: 0 detected at 3.7m. Immediate action required!
🔍 Object: 0, Distance: 3.7m → 🚨 Critical: 🛑 Emergency Braking activated!
🚨 CRITICAL: 0 detected at 3.7m. Immediate action required!
🔍 Object: 0, Distance: 3.7m → 🚨 Critical: 🛑 Emergency Braking activated!
🚨 CRITICAL: 2 detected at 3.7m. Immediate action required!
🔍 Object: 2, Distance: 3.7m → 🚨 Critical: 🛑 Emergency Braking activated!
🚨 CRITICAL: 0 detected at 3.7m. Immediate action required!
🔍 Object: 0, Distance: 3.7m → 🚨 Critical: 🛑 Emergency Braking activated!
🚨 CRITICAL: 7 detected at 3.7m. Immediate action required!
🔍 Object: 7, Dist

Collision Avoidance System

In [None]:
import os
import json
import cv2
import torch
import numpy as np
import pygame
from ultralytics import YOLO

# 🔧 Paths
DATA_DIR = r"D:\ADAS_assignment"
CORRELATED_FILE = os.path.join(DATA_DIR, "correlated_objects.json")
MODEL_PATH = "yolov8n.pt"

# 🚀 Initialize Camera
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("❌ Error: Unable to access the camera!")
    exit()

# 🎯 Load YOLOv8 Model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = YOLO(MODEL_PATH).to(device)

# 🚨 Initialize Pygame for Audio Alerts
pygame.mixer.init()
ALERT_SOUND = "alert_sound.mp3"

# ⚠ Thresholds for Risk Assessment
SAFE_DISTANCE = 15  # No action needed
WARNING_DISTANCE = 10  # Trigger alert
CRITICAL_DISTANCE = 5  # Immediate braking

# 📥 Load Correlated Objects
with open(CORRELATED_FILE, "r") as f:
    correlated_objects = json.load(f)

# 🚘 Collision Avoidance Decision-Making
def decision_making(obj):
    distance = obj["distance"]

    if distance > SAFE_DISTANCE:
        return "✅ Safe: No action needed."

    elif WARNING_DISTANCE <= distance <= SAFE_DISTANCE:
        print(f"⚠ WARNING: {obj['camera_class']} at {distance:.1f}m.")
        pygame.mixer.music.load(ALERT_SOUND)
        pygame.mixer.music.play()
        return "⚠ Warning: Alert triggered!"

    else:
        print(f"🚨 CRITICAL: {obj['camera_class']} at {distance:.1f}m. Immediate action required!")
        action = "🛑 Emergency Braking" if distance < CRITICAL_DISTANCE else "➡ Lane Change"
        return f"🚨 Critical: {action} activated!"

# 🎥 **Real-Time Camera & Collision Detection Loop**
while True:
    ret, frame = cap.read()
    if not ret:
        print("❌ Error: Failed to capture frame!")
        break

    # 🔎 Run YOLO on the frame
    results = model(frame)[0]

    for det in results.boxes.data:
        x1, y1, x2, y2, conf, cls = det.cpu().numpy()
        cls_name = model.names[int(cls)]

        # Match with LiDAR data
        for obj in correlated_objects:
            if obj["camera_class"] == cls_name:
                distance = obj["distance"]
                decision = decision_making(obj)

                # Set color: Red (critical), Yellow (warning), Green (safe)
                color = (0, 255, 0)  # Green (safe)
                if distance < WARNING_DISTANCE:
                    color = (0, 255, 255)  # Yellow (warning)
                if distance < CRITICAL_DISTANCE:
                    color = (0, 0, 255)  # Red (critical)

                # 🖼️ Draw Bounding Box
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
                cv2.putText(frame, f"{cls_name} ({distance:.1f}m)", (int(x1), int(y1) - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                cv2.putText(frame, decision, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)

    # 🖥️ **Show Real-Time Feed**
    cv2.imshow("ADAS Collision Avoidance", frame)

    # **Exit on 'Q' key press**
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

# Release resources
cap.release()
cv2.destroyAllWindows()



0: 480x640 (no detections), 17.5ms
Speed: 3.4ms preprocess, 17.5ms inference, 2.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 16.7ms
Speed: 2.1ms preprocess, 16.7ms inference, 0.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 14.9ms
Speed: 1.9ms preprocess, 14.9ms inference, 0.9ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 12.1ms
Speed: 1.6ms preprocess, 12.1ms inference, 0.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 13.8ms
Speed: 1.7ms preprocess, 13.8ms inference, 0.8ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 11.1ms
Speed: 1.5ms preprocess, 11.1ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 10.1ms
Speed: 1.6ms preprocess, 10.1ms inference, 0.7ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 (no detections), 11.2ms
Speed: 1.7ms preprocess, 11.2ms i