In [1]:
!pip install opencv-python opencv-python-headless torch torchvision

Collecting torchvision
  Using cached torchvision-0.19.1-cp312-cp312-win_amd64.whl.metadata (6.1 kB)
Using cached torchvision-0.19.1-cp312-cp312-win_amd64.whl (1.3 MB)
Installing collected packages: torchvision
Successfully installed torchvision-0.19.1



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [31]:
import torch
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F
import cv2
import numpy as np
import os
from tqdm import tqdm
import random

In [32]:
model = fasterrcnn_resnet50_fpn(pretrained=True)
model.eval()

FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=0.0)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=0.0)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=0.0)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=0.0)
          (relu): ReLU(

In [33]:
def transform_image(image):
    image = F.to_tensor(image)
    return image.unsqueeze(0)

In [34]:
def detect_obstacles(image, model, threshold=0.5):
    transformed_image = transform_image(image)
    with torch.no_grad():
        predictions = model(transformed_image)
    boxes = predictions[0]['boxes'].cpu().numpy()
    scores = predictions[0]['scores'].cpu().numpy()
    high_conf_indices = np.where(scores > threshold)[0]
    boxes = boxes[high_conf_indices]
    return boxes

In [35]:
def decide_action(distance, speed):
    safe_stopping_distance = speed * 0.44704 * 1.5
    
    if distance < safe_stopping_distance:
        return "Decelerate"
    elif distance > safe_stopping_distance + 5 and speed < 60:
        return "Accelerate"
    else:
        return "Maintain"

In [53]:
def predict_future_states(speed, action, start_position, robot_size_half, boxes):
    future_speeds = []
    future_positions = []
    position = start_position.astype(np.float64) 
    
    for i in range(100):  
        if action == "Accelerate":
            speed = min(speed + 1, 10)  
        elif action == "Decelerate":
            speed = max(speed - 1, 0)
        
        displacement = np.array([speed * 0.1, speed * 0.1], dtype=np.float64) 
        new_position = position + displacement
        
        collision_detected = False
        for box in boxes:
            startX, startY, endX, endY = box.astype(int)
            if (new_position[0] + robot_size_half[0] > startX and new_position[0] - robot_size_half[0] < endX and 
                new_position[1] + robot_size_half[1] > startY and new_position[1] - robot_size_half[1] < endY):
                collision_detected = True
                break
        
        if collision_detected:
            break
        
        position = new_position
        
        future_speeds.append(speed)
        future_positions.append(position.copy())
    
    return future_speeds, future_positions

In [57]:
rgb_folder_path = './data/RGB'
depth_folder_path = './data/Depth/Depth'
image_paths = [os.path.join(rgb_folder_path, img) for img in os.listdir(rgb_folder_path) if img.endswith('.png')]

sampled_image_paths = random.sample(image_paths, min(5, len(image_paths)))

current_speed = 5  

for image_path in tqdm(sampled_image_paths, desc="Processing Images", unit="image"):
    image = cv2.imread(image_path)
    
    depth_image_path = os.path.join(depth_folder_path, os.path.basename(image_path))
    depth_image = cv2.imread(depth_image_path, cv2.IMREAD_UNCHANGED)
    
    if image is None or depth_image is None:
        continue
    
    height, width, _ = image.shape
    
    boxes = detect_obstacles(image, model)

    nearest_obstacle_distance = float('inf')
    
    for box in boxes:
        startX, startY, endX, endY = box.astype(int)
        cv2.rectangle(image, (startX, startY), (endX, endY), (255, 0, 0), 2)

        obstacle_depth_region = depth_image[startY:endY, startX:endX]
        avg_depth = np.mean(obstacle_depth_region)

        if avg_depth < nearest_obstacle_distance:
            nearest_obstacle_distance = avg_depth

    safe_start_found = False
    for _ in range(100): 
        start_position_candidate = np.array([random.uniform(0 + width / 10, width - width / 10),
                                             random.uniform(0 + height / 10, height - height / 10)], dtype=np.float64)
        
        robot_size_half_candidate = np.array([15.0 / width * image.shape[1], 
                                              15.0 / height * image.shape[0]])
        
        _, future_positions_candidate = predict_future_states(current_speed,
                                                              "Maintain",
                                                              start_position_candidate,
                                                              robot_size_half_candidate,
                                                              boxes)
        
        if len(future_positions_candidate) == 100:  
            safe_start_found = True
            start_position = start_position_candidate
            break
    
    if not safe_start_found:
        print("Could not find a safe starting position.")
        continue

    action = decide_action(nearest_obstacle_distance, current_speed)
    future_speeds, future_positions = predict_future_states(current_speed,
                                                            action,
                                                            start_position,
                                                            robot_size_half_candidate,
                                                            boxes)

    label_text_action = f"Action: {action}"
    
    cv2.putText(image,
                label_text_action,
                (10, height - 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (255, 255, 255),
                thickness=1)


    for i in range(len(future_positions) - 1):
        start_pos = tuple(future_positions[i].astype(int))
        end_pos = tuple(future_positions[i + 1].astype(int))
        
        cv2.line(image,
                 start_pos,
                 end_pos,
                 (0, 255, 0),
                 thickness=2)

        if i % 10 == 0:
            top_left_robot_box = (int(start_pos[0] - robot_size_half_candidate[0]), int(start_pos[1] - robot_size_half_candidate[1]))
            bottom_right_robot_box = (int(start_pos[0] + robot_size_half_candidate[0]), int(start_pos[1] + robot_size_half_candidate[1]))
            
            cv2.rectangle(image,
                          top_left_robot_box,
                          bottom_right_robot_box,
                          (255, 255, 0),
                          thickness=2)

            cv2.putText(image,
                        f"{future_speeds[i]} mph",
                        (bottom_right_robot_box[0] + int(robot_size_half_candidate[0]), bottom_right_robot_box[1]),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.4,
                        (0, 0, 255),   
                        thickness=1)

    cv2.imshow('Obstacle Detection with Distance Estimation', image)

    if cv2.waitKey(0) & 0xFF == ord('q'):
        break

cv2.destroyAllWindows()

Processing Images: 100%|██████████| 5/5 [01:35<00:00, 19.12s/image]
