In [None]:
import os
import cv2
import torch
import numpy as np
import json
import time


device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")


model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
model.to(device)
model.eval()  # set the model to evaluation mode


input_dir = "/kaggle/input/kitti-dataset/data_object_image_2/training/image_2"      # Directory containing your images
output_dir = "/kaggle/working/object_vector"     # Directory to save processed images
if not os.path.exists(output_dir):
    os.makedirs(output_dir)


image_files = sorted([f for f in os.listdir(input_dir) if f.lower().endswith(('.jpg', '.png'))])[:1000]


object_vectors = {}


start_time = time.time()

for idx, img_file in enumerate(image_files):
    img_path = os.path.join(input_dir, img_file)
    img = cv2.imread(img_path)
    if img is None:
        print(f"Error loading image {img_path}")
        continue
    
    
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    
    
    results = model(img_rgb)
    
    
    detections = results.xyxy[0].detach().cpu().numpy()
    
    
    object_vectors[img_file] = detections.tolist()
    
   
    for det in detections:
        x1, y1, x2, y2, conf, cls = det
        label = results.names[int(cls)]
        cv2.rectangle(img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
        cv2.putText(img, f"{label} {conf:.2f}", (int(x1), int(y1)-10), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
    
   
    output_img_path = os.path.join(output_dir, img_file)
    cv2.imwrite(output_img_path, img)
    
    
    if (idx + 1) % 50 == 0:
        elapsed = time.time() - start_time
        print(f"Processed {idx+1} images in {elapsed:.2f} seconds")


with open(os.path.join(output_dir, "object_vectors.json"), "w") as f:
    json.dump(object_vectors, f)

print("Processing complete.")


In [None]:
import numpy as np
import cv2
import os
import matplotlib.pyplot as plt
import pandas as pd

class CameraOnlyDetection:
    def __init__(self, calib_file_path):
      
        self.calib_data = self.load_calibration(calib_file_path)
        self.image_objects = {}  # Dictionary to store detected objects from camera
        
    def load_calibration(self, calib_file_path):
        
        calib_data = {}
        try:
            with open(calib_file_path, 'r') as f:
                for line in f.readlines():
                    if ':' in line:
                        key, value = line.split(':', 1)
                        calib_data[key] = np.array([float(x) for x in value.split()])
                    else:
                        # Handle alternative format if needed
                        parts = line.split()
                        if len(parts) > 1:
                            key = parts[0]
                            values = [float(x) for x in parts[1:]]
                            calib_data[key] = np.array(values)
                    
           
            if 'P2' in calib_data:  
                calib_data['P2'] = calib_data['P2'].reshape(3, 4)
            elif 'P_rect_02' in calib_data:  # Alternative naming in some KITTI formats
                calib_data['P2'] = calib_data['P_rect_02'].reshape(3, 4)
                
            
            if 'P2' not in calib_data:
                print("Warning: P2 matrix not found in calibration file. Using default.")
                calib_data['P2'] = np.array([[718.856, 0, 607.1928, 0], 
                                            [0, 718.856, 185.2157, 0],
                                            [0, 0, 1, 0]])
                
           
            if 'P2' in calib_data:
                calib_data['fx'] = calib_data['P2'][0, 0]  # Focal length x
                calib_data['fy'] = calib_data['P2'][1, 1]  # Focal length y
                calib_data['cx'] = calib_data['P2'][0, 2]  # Principal point x
                calib_data['cy'] = calib_data['P2'][1, 2]  # Principal point y
                
        except FileNotFoundError:
            print(f"Warning: Calibration file not found: {calib_file_path}. Using default values.")
            # Default calibration values
            calib_data['P2'] = np.array([[718.856, 0, 607.1928, 0], 
                                         [0, 718.856, 185.2157, 0],
                                         [0, 0, 1, 0]])
            calib_data['fx'] = 718.856
            calib_data['fy'] = 718.856
            calib_data['cx'] = 607.1928
            calib_data['cy'] = 185.2157
            
        return calib_data
    
    def load_image_objects(self, objects_file_path, image_id):
        
        objects = []
        try:
            with open(objects_file_path, 'r') as f:
                for line in f.readlines():
                    values = line.strip().split(' ')
                    # Skip empty lines
                    if len(values) < 2:
                        continue
                        
                    # Try to parse standard KITTI format
                    try:
                        obj = {
                            'type': values[0],
                            'truncated': float(values[1]),
                            'occluded': int(values[2]),
                            'alpha': float(values[3]),
                            'bbox': [float(values[4]), float(values[5]), float(values[6]), float(values[7])],
                            'dimensions': [float(values[8]), float(values[9]), float(values[10])],
                            'location': [float(values[11]), float(values[12]), float(values[13])],
                            'rotation_y': float(values[14])
                        }
                        if len(values) > 15:  # If confidence score is available
                            obj['score'] = float(values[15])
                        objects.append(obj)
                    except (ValueError, IndexError) as e:
                        print(f"Warning: Could not parse line: {line}. Error: {e}")
                        # Try alternative format if available
        except FileNotFoundError:
            print(f"Warning: Object file not found: {objects_file_path}")
        
        
        print(f"Loaded {len(objects)} objects for image {image_id}")
        
        self.image_objects[image_id] = objects
        return objects
    
    def estimate_distance(self, obj):
        
       
        if 'location' in obj and obj['location'][2] > 0:
            # The Z coordinate is depth in camera coordinates
            return obj['location'][2]
        
        
        if 'dimensions' in obj and 'bbox' in obj:
            
            obj_height_3d = obj['dimensions'][1]  # Height is usually the second dimension in KITTI
            
            
            bbox = obj['bbox']
            obj_height_px = bbox[3] - bbox[1]
            
           
            if obj_height_px > 0:
                distance = obj_height_3d * self.calib_data['fy'] / obj_height_px
                return distance
        
        
        return 10.0
    
    def calculate_collision_risk(self, image_id, velocity=None, time_threshold=3.0):
       
        if image_id not in self.image_objects:
            print(f"No objects found for image {image_id} when calculating risks")
            return {}
            
        
        if velocity is None:
            velocity = 13.8  # m/s
            
        collision_risks = {}
        
        for obj_idx, obj in enumerate(self.image_objects[image_id]):
            
            if obj['type'].lower() in ['misc', 'dontcare']:
                continue
                
            
            distance = self.estimate_distance(obj)
            
            
            if 'location' in obj:
                
                lateral_position = obj['location'][0]
            else:
                
                bbox = obj['bbox']
                bbox_center_x = (bbox[0] + bbox[2]) / 2
                
                lateral_position = (bbox_center_x - self.calib_data['cx']) * distance / self.calib_data['fx']
            
            
            in_path = abs(lateral_position) < 2.0
            
            
            forward_distance = distance
            
            
            relative_velocity = velocity
                
            
            if forward_distance > 0 and relative_velocity > 0:  
                ttc = forward_distance / relative_velocity
            else:
                ttc = float('inf')  
                
            
            if ttc < time_threshold and ttc > 0:
                base_risk = 1.0 - (ttc / time_threshold)
                # Increase risk for objects in our path, decrease for those outside
                risk_score = base_risk * (1.0 if in_path else 0.3)
            else:
                risk_score = 0.0
                
            collision_risks[obj_idx] = {
                'object': obj,
                'distance': distance,
                'lateral_position': lateral_position,
                'in_path': in_path,
                'forward_distance': forward_distance,
                'ttc': ttc,
                'risk_score': risk_score
            }
            
        
        print(f"Calculated {len(collision_risks)} collision risks for image {image_id}")
        
        if collision_risks:
            max_risk = max([r['risk_score'] for r in collision_risks.values()])
            print(f"Highest risk score for image {image_id}: {max_risk}")
            
        return collision_risks
    
    def generate_warnings(self, collision_risks, risk_threshold=0.3):
        
        warnings = []
        
        for obj_id, risk_data in collision_risks.items():
            if risk_data['risk_score'] > risk_threshold:
                obj_type = risk_data['object']['type']
                distance = risk_data['distance']
                ttc = risk_data['ttc']
                in_path = risk_data['in_path']
                
                warning_level = 'HIGH' if risk_data['risk_score'] > 0.5 else 'MEDIUM'
                
                # Add more context about position
                position_context = "in path" if in_path else "outside path"
                
                warning = {
                    'level': warning_level,
                    'message': f"Collision warning: {obj_type} at {distance:.2f}m ({position_context}), TTC: {ttc:.2f}s",
                    'object_id': obj_id,
                    'action': 'BRAKE' if risk_data['risk_score'] > 0.5 and in_path else 'ALERT'
                }
                
                warnings.append(warning)
                
        
        print(f"Generated {len(warnings)} warnings from {len(collision_risks)} collision risks")
                
        return warnings
    
    def visualize_detections(self, image_path, image_id, save_path=None):
        
        if image_id not in self.image_objects:
            print(f"No objects found for image {image_id}")
            return None
            
        
        img = cv2.imread(image_path)
        if img is None:
            # Try alternative extensions
            for ext in ['.jpg', '.png', '.jpeg']:
                alt_path = os.path.splitext(image_path)[0] + ext
                if os.path.exists(alt_path):
                    img = cv2.imread(alt_path)
                    if img is not None:
                        print(f"Found alternative image: {alt_path}")
                        break
            
            if img is None:
                print(f"Error: Could not load image {image_path}")
                return None
        
        # Make a copy of the image to ensure we don't modify the original
        vis_img = img.copy()
        
       
        collision_risks = self.calculate_collision_risk(image_id)
        
        
        for obj_idx, obj in enumerate(self.image_objects[image_id]):
            bbox = [int(x) for x in obj['bbox']]
            
            
            risk_data = collision_risks.get(obj_idx, None)
            
            if risk_data:
                
                risk_score = risk_data['risk_score']
                in_path = risk_data['in_path']
                
                
                if risk_score > 0.5:
                    color = (0, 0, 255)  # Red for high risk
                elif risk_score > 0.3:
                    color = (0, 165, 255)  # Orange for medium risk
                else:
                    color = (0, 255, 0)  # Green for low risk
                
                
                thickness = 3 if in_path else 2
                
                
                distance = risk_data['distance']
                ttc = risk_data['ttc']
                
                
                x1, y1, x2, y2 = max(0, bbox[0]), max(0, bbox[1]), min(vis_img.shape[1], bbox[2]), min(vis_img.shape[0], bbox[3])
                
                
                cv2.rectangle(vis_img, (x1, y1), (x2, y2), color, thickness)
                
                
                info_text = f"{obj['type']}: {distance:.1f}m, TTC: {ttc:.1f}s"
                cv2.putText(vis_img, info_text, (x1, max(15, y1 - 5)), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                
                
                risk_text = f"Risk: {risk_score:.2f}"
                cv2.putText(vis_img, risk_text, (x1, max(35, y1 - 25)), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                
                
                img_h, img_w = vis_img.shape[:2]
                bev_h, bev_w = 150, 100  # Bird's eye view size
                bev_orig_x, bev_orig_y = img_w // 2, img_h - 30  # Origin position
                
               
                cv2.rectangle(vis_img, (img_w - bev_w - 10, img_h - bev_h - 10), 
                             (img_w - 10, img_h - 10), (255, 255, 255), -1)
                
                
                cv2.rectangle(vis_img, (img_w - bev_w//2 - 15, img_h - 25), 
                             (img_w - bev_w//2 + 15, img_h - 15), (0, 0, 0), -1)
                
                
                scaled_dist = min(distance * 5, bev_h - 20)
                scaled_lat = risk_data['lateral_position'] * 10
                
                obj_bev_x = int(img_w - bev_w//2 - scaled_lat)
                obj_bev_y = int(img_h - 20 - scaled_dist)
                
                
                obj_bev_x = max(img_w - bev_w - 5, min(obj_bev_x, img_w - 15))
                obj_bev_y = max(img_h - bev_h - 5, min(obj_bev_y, img_h - 15))
                
                cv2.circle(vis_img, (obj_bev_x, obj_bev_y), 5, color, -1)
            else:
                
                x1, y1, x2, y2 = max(0, bbox[0]), max(0, bbox[1]), min(vis_img.shape[1], bbox[2]), min(vis_img.shape[0], bbox[3])
                cv2.rectangle(vis_img, (x1, y1), (x2, y2), (255, 0, 0), 2)
                cv2.putText(vis_img, obj['type'], (x1, max(15, y1 - 5)), 
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)
        
        
        cv2.putText(vis_img, f"Frame: {image_id}", (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
                   
          
        warnings = self.generate_warnings(collision_risks)
        if warnings:
            warning_text = f"WARNING: {len(warnings)} potential collisions detected!"
            cv2.putText(vis_img, warning_text, (10, 60), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
        
        if save_path:
            
            os.makedirs(os.path.dirname(save_path), exist_ok=True)
            
            cv2.imwrite(save_path, vis_img)
            print(f"Saved visualization to {save_path}")
            
        return vis_img
    
    def process_frame(self, image_path, objects_path, image_id, velocity=13.8):
        
        
        self.load_image_objects(objects_path, image_id)
        
        
        collision_risks = self.calculate_collision_risk(image_id, velocity=velocity)
        
        
        warnings = self.generate_warnings(collision_risks)
        
        return collision_risks, warnings
    
    def process_dataset(self, dataset_config, output_path, visualize=True, velocity=13.8):
       
        
        if not os.path.exists(output_path):
            os.makedirs(output_path)
            
        
        results = {}
        for idx, frame_id in enumerate(dataset_config['frame_ids']):
            print(f"\nProcessing frame {frame_id} ({idx+1}/{len(dataset_config['frame_ids'])})")
            
            
            image_path = dataset_config['image_paths'][idx]
            label_path = dataset_config['label_paths'][idx]
            
           
            collision_risks, warnings = self.process_frame(image_path, label_path, frame_id, velocity)
            
            
            if visualize:
                result_img = self.visualize_detections(
                    image_path, 
                    frame_id,
                    save_path=os.path.join(output_path, f'{frame_id}_detection.jpg')
                )
            
            
            with open(os.path.join(output_path, f'{frame_id}_warnings.txt'), 'w') as f:
                if warnings:
                    for warning in warnings:
                        f.write(f"{warning['level']}: {warning['message']} - Action: {warning['action']}\n")
                else:
                    f.write("No collision warnings for this frame.\n")
                    
            
            results[frame_id] = {
                'objects': self.image_objects.get(frame_id, []),
                'collision_risks': collision_risks,
                'warnings': warnings
            }
                    
        print("Processing complete!")
        return results


def main():
    """
    Main function to run the camera-only detection system
    """
    # Define your paths here
    image_dir = "/kaggle/working/object_vector"  # Adjust to your actual image path
    label_dir = "/kaggle/input/kitti-dataset/data_object_label_2/training/label_2"
    calib_dir = "/kaggle/input/kitti-dataset/data_object_calib/training/calib"    
    output_dir = "/kaggle/working/output_collision_detection_1"
    
    print("Starting camera-only collision detection...")
    
    
    for dir_path, dir_name in [
        (image_dir, "Image directory"), 
        (label_dir, "Label directory"), 
        (calib_dir, "Calibration directory")
    ]:
        if not os.path.exists(dir_path):
            print(f"Warning: {dir_name} not found: {dir_path}")
    
    
    image_ext = '.png'  # Default
    test_file = os.listdir(image_dir)[0] if os.path.exists(image_dir) and os.listdir(image_dir) else None
    if test_file:
        if test_file.endswith('.jpg') or test_file.endswith('.jpeg'):
            image_ext = '.jpg'
        print(f"Using image extension: {image_ext}")
    
    # Get list of frame IDs (look for both png and jpg)
    if os.path.exists(image_dir):
        frame_ids = []
        for f in os.listdir(image_dir):
            if f.endswith(image_ext):
                frame_id = os.path.splitext(f)[0]
                # Check if label file exists
                if os.path.exists(os.path.join(label_dir, f"{frame_id}.txt")):
                    frame_ids.append(frame_id)
        
        frame_ids = sorted(frame_ids)
        
        if not frame_ids:
            print(f"Warning: No valid image files found in {image_dir} with corresponding label files")
            
            image_files = [f for f in os.listdir(image_dir) if f.endswith(image_ext)]
            print(f"Found {len(image_files)} images, but no matching label files")
    else:
        frame_ids = []
        print(f"Warning: Image directory not found: {image_dir}")
    
    print(f"Found {len(frame_ids)} valid frames with labels")
    
    if not frame_ids:
        print("Error: No valid frames found. Cannot proceed.")
        return
    
    
    dataset_config = {
        'frame_ids': frame_ids,
        'image_paths': [os.path.join(image_dir, f"{frame_id}{image_ext}") for frame_id in frame_ids],
        'label_paths': [os.path.join(label_dir, f"{frame_id}.txt") for frame_id in frame_ids],
        'calib_paths': [os.path.join(calib_dir, f"{frame_id}.txt") for frame_id in frame_ids],
    }
    
   
    detector = CameraOnlyDetection(dataset_config['calib_paths'][0])
    
    
    results = detector.process_dataset(dataset_config, output_dir, velocity=13.8) 

if __name__ == "__main__":
    main()