# Import Necessary Libraries and Configuration Parameters

This section imports the libraries the project uses, grouped by functionality: asynchronous operations, deep learning, computer vision, API handling, and utilities such as JSON and file management. The `asyncio` and `fastapi` entries are listed for completeness; their imports are commented out in the import cell of this notebook.

- `asyncio`: For managing asynchronous operations in Python.

- `os`: To interact with the operating system (e.g., file and directory management).

- `fastapi.FastAPI`: For building APIs using FastAPI.
- `fastapi.Path`: For handling API path parameters.
- `fastapi.WebSocket`: For managing real-time WebSocket connections.
- `fastapi.responses.HTMLResponse`: For sending HTML responses.

- `json`: For working with JSON data (parsing and serializing).
- `pickle`: For serializing and deserializing Python objects.
- `pandas`: For data manipulation and analysis, particularly with tabular data.

- `PIL.Image`: For handling image operations such as loading, saving, and manipulating images.
- `cv2`: For computer vision tasks using OpenCV.
- `torchvision.transforms`: For image transformations like resizing, cropping, and normalization.
- `matplotlib.pyplot`: For visualizing data, including images and charts.

- `torch`: For creating and working with deep learning models.
- `torch.nn`: For defining neural networks and model components.
- `transformers.DetrForObjectDetection`: For object detection using the DETR model from Hugging Face Transformers.
- `transformers.GLPNForDepthEstimation`: For depth estimation using the GLPN model.
- `transformers.GLPNFeatureExtractor`: For preprocessing images for the GLPN model.

- `numpy`: For numerical operations, especially array and matrix manipulations.
- `scipy.stats`: For statistical analysis and hypothesis testing.



In [5]:
!pip install -r requirements.txt

Defaulting to user installation because normal site-packages is not writeable
Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com


In [2]:
# Asynchronous operations
'''import asyncio'''
import os

# Deep learning and computer vision
import torch
import torch.nn as nn
from torchvision import transforms
from transformers import DetrForObjectDetection, GLPNForDepthEstimation, GLPNFeatureExtractor
from PIL import Image
import cv2
import numpy as np
from matplotlib import pyplot as plt

# API handling
'''
from fastapi import FastAPI, Path, WebSocket
from fastapi.responses import HTMLResponse
'''
# Data handling and statistical analysis
import pandas as pd
from scipy import stats
import json
import pickle

RuntimeError: Failed to import transformers.models.detr.modeling_detr because of the following error (look up to see its traceback):
module 'torch' has no attribute 'compiler'
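
The `RuntimeError` above is raised while `transformers` imports its DETR module, and it reports that the installed `torch` has no `compiler` attribute. This usually suggests that the installed `torch` release is older than the installed `transformers` release expects. A minimal sketch for checking which versions are present, using only the standard library, might look like this (the helper name `installed_versions` is hypothetical and not part of the project):

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {package name: version string, or None if it is not installed}."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


# Report the two packages involved in the import error.
for name, ver in installed_versions(["torch", "transformers"]).items():
    print(f"{name}: {ver or 'not installed'}")
```

If the two versions turn out to be mismatched, upgrading `torch` or pinning an older `transformers` in `requirements.txt` is the usual remedy; which combination is appropriate depends on the environment.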

The configuration below defines the model paths, device setup, and related resources. The device is selected automatically, and all model loading paths are kept in one place.

In [None]:
CONFIG = {
    'device': torch.device('cuda' if torch.cuda.is_available() else 'cpu'),
    'detr_model_path': 'facebook/detr-resnet-101',
    'glpn_model_path': 'vinvino02/glpn-kitti',
    'lstm_model_path': 'pretrained_lstm.pth',
    'lstm_scaler_path': 'lstm_scaler.pkl',
}

# Set Device for Model

In this section, the device (CPU or GPU) used to run the deep learning models is read from the `CONFIG` dictionary, which selects CUDA when it is available and falls back to the CPU otherwise.

In [None]:
device = CONFIG['device']

# Define the LSTM-based Z-location Estimator Model

This section defines the architecture of the LSTM-based model that predicts the Z-location (distance) of objects detected in the input images.
`Zloc_Estimator` is a neural network that estimates Z-location (depth) from a sequence of input features. It consists of an LSTM layer for sequential input, followed by fully connected layers for regression.

### Parameters:
- **input_dim** : *int*  
    The dimensionality of the input features. Each time step of the input sequence has `input_dim` features.
  
- **hidden_dim** : *int*  
    The number of hidden units in the LSTM layer. This determines the size of the LSTM's hidden state.
  
- **layer_dim** : *int*  
    The number of LSTM layers stacked in the model. The model can have multiple layers of LSTM for increased capacity.

### Model Architecture:
1. **LSTM Layer**:  
    - The model starts with an LSTM (Long Short-Term Memory) layer to process sequential input data.
    - **Parameters**:
        - `input_dim`: The input dimension of the LSTM layer.
        - `hidden_dim`: The number of units in the LSTM's hidden state.
        - `layer_dim`: The number of stacked LSTM layers.
        - `batch_first=True`: Ensures that the input and output tensors are shaped as `(batch_size, sequence_length, feature_dim)`.
        - `bidirectional=False`: The LSTM processes the sequence in only one direction.

2. **Fully Connected Layers**:  
    - After the LSTM, the model uses a sequence of fully connected (dense) layers to refine the output and produce the final prediction.
    - The hidden dimension of the LSTM layer is progressively reduced in the fully connected layers.
    - **Layer sizes**: [306, 154, 76].
    - Each fully connected layer is followed by a **ReLU activation function** to introduce non-linearity.
    - The final output layer has a **single neuron** (without activation) for regression.
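
To make the progressive reduction concrete, the following sketch lists the `(in_features, out_features)` pair of each linear layer in the fully connected head. It is plain Python rather than PyTorch, the helper name `fc_layer_shapes` is hypothetical, and `hidden_dim=612` is the value used later in this notebook:

```python
def fc_layer_shapes(hidden_dim, layer_sizes=(306, 154, 76)):
    """List (in_features, out_features) for each linear layer of the head."""
    shapes = []
    n_in = hidden_dim
    for n_out in layer_sizes:
        shapes.append((n_in, n_out))  # hidden layer, followed by ReLU in the model
        n_in = n_out
    shapes.append((n_in, 1))  # final regression layer, no activation
    return shapes


print(fc_layer_shapes(612))
# [(612, 306), (306, 154), (154, 76), (76, 1)]
```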

### Methods:
- **`forward(x)`**  
    Defines the forward pass of the model.
    
    - **Parameters**:
        - **x** : *torch.Tensor*  
            Input tensor of shape `(batch_size, sequence_length, input_dim)` representing the sequence of features for each batch.
    
    - **Returns**:
        - **torch.Tensor**  
            The final output of the model, representing the estimated Z-location for each input sequence.  
            The output is a tensor of shape `(batch_size, 1)`, where each element is the predicted depth.

### Forward Pass Process:
1. **LSTM Output**:  
    - The input `x` (a batch of sequences) is passed through the LSTM layer.
    - The output of the LSTM is a tensor of shape `(batch_size, sequence_length, hidden_dim)`.

2. **Fully Connected Layers**:  
    - The model takes the output from the **last time step** of the LSTM (`out[:, -1]`), which captures the summary of the entire sequence.
    - This output is passed through the fully connected layers to generate the final prediction.
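
The last-time-step selection can be illustrated without a trained model. The sketch below uses a NumPy array as a stand-in for the LSTM output (an assumption made purely for illustration; the sizes are arbitrary) and shows that `out[:, -1]` drops the sequence axis:

```python
import numpy as np

batch_size, sequence_length, hidden_dim = 4, 5, 612  # illustrative sizes

# Stand-in for the LSTM output: one hidden vector per time step.
out = np.arange(batch_size * sequence_length * hidden_dim, dtype=np.float32)
out = out.reshape(batch_size, sequence_length, hidden_dim)

# Keep only the final time step of every sequence.
last_step = out[:, -1]

print(last_step.shape)  # (4, 612)
```

The same indexing applies to a `torch.Tensor`, so the fully connected layers always receive an input of shape `(batch_size, hidden_dim)` regardless of the sequence length.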

### Summary:
- `__init__`: Initializes the model with the necessary input dimensions, hidden state dimensions, and the number of LSTM layers.
- `forward`: Defines the forward pass logic, where the input features are processed through the LSTM layers and the output is the estimated Z-location.

In [None]:
class Zloc_Estimator(nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim):
        super(Zloc_Estimator, self).__init__()

        # LSTM layer
        self.rnn = nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True, bidirectional=False)

        # Fully connected layers
        layersize = [306, 154, 76]
        layerlist = []
        n_in = hidden_dim
        for i in layersize:
            layerlist.append(nn.Linear(n_in, i))
            layerlist.append(nn.ReLU())
            n_in = i
        layerlist.append(nn.Linear(layersize[-1], 1))  # Final output layer

        self.fc = nn.Sequential(*layerlist)

    def forward(self, x):
        out, hn = self.rnn(x)
        output = self.fc(out[:, -1])  # Get the last output for prediction
        return output

# Deployment-ready Class for Handling the Model

This section defines `LSTM_Model`, a deployment-ready wrapper around the LSTM-based Z-location estimator. It builds the model with predefined architecture parameters and loads pre-trained weights.

### Attributes:
- **input_dim** : *int*  
    The number of input features expected by the model. In this case, it's set to 15.
- **hidden_dim** : *int*  
    The number of units in the hidden layers of the LSTM model. Here, it's set to 612.
- **layer_dim** : *int*  
    The number of layers in the LSTM network. The model is designed with 3 layers.
- **model** : *Zloc_Estimator*  
    The LSTM model for estimating the Z-location (depth) based on the input features.  
    It is initialized with the defined input, hidden dimensions, and number of layers.

### Methods:
- **`__init__(self)`**  
    Constructor for the `LSTM_Model` class. It initializes the LSTM model with the given architecture,  
    loads the model's pre-trained weights from a state dictionary, and moves the model to the correct  
    device (CPU or GPU). Additionally, it sets the model to evaluation mode, making it ready for inference.
  
- **`predict(self, data)`**  
    This method performs inference on the input data and returns the predicted Z-location (depth).
    
    - **Parameters:**
        - **data** : *torch.Tensor*  
            Input tensor of shape `(batch_size, input_dim)` representing the feature set for each batch.
  
    - **Returns:**
        - **torch.Tensor**  
            The predicted Z-location for each input sequence, returned as a tensor.

### Summary:
- **`__init__`**: Loads the pre-trained model and prepares it for deployment by setting the model to evaluation mode.
- **`predict`**: Takes pre-processed (already scaled) input data, reshapes it to `(batch_size, 1, input_dim)`, and returns the predicted Z-location. `predict` does not normalize the input itself; scaling must be applied by the caller.
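
`predict` treats every sample as a sequence of length one by reshaping its input with `reshape(-1, 1, input_dim)`. The sketch below reproduces that reshape with NumPy, whose `-1` convention matches `torch.Tensor.reshape`; the helper name `to_sequence_batch` is hypothetical:

```python
import numpy as np

INPUT_DIM = 15  # number of features per sample, as in LSTM_Model


def to_sequence_batch(data, input_dim=INPUT_DIM):
    """Reshape an array whose last axis holds the features to (batch, 1, input_dim)."""
    return np.asarray(data, dtype=np.float32).reshape(-1, 1, input_dim)


flat = np.zeros((8, INPUT_DIM))          # (batch_size, input_dim)
nested = np.zeros((1, 1, 1, INPUT_DIM))  # extra singleton axes, e.g. after two unsqueeze calls

print(to_sequence_batch(flat).shape)    # (8, 1, 15)
print(to_sequence_batch(nested).shape)  # (1, 1, 15)
```

Because the leading axes are flattened into the batch axis, input that carries extra singleton dimensions still ends up in the shape the LSTM expects.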


In [None]:
class LSTM_Model:
    def __init__(self):

        self.input_dim = 15
        self.hidden_dim = 612
        self.layer_dim = 3

        # Initialize the Z-location estimator model
        self.model = Zloc_Estimator(self.input_dim, self.hidden_dim, self.layer_dim)

        # Load the state dictionary from the file, using map_location in torch.load()
        state_dict = torch.load(CONFIG['lstm_model_path'], map_location=device)

        # Load the model with the state dictionary
        self.model.load_state_dict(state_dict, strict=False)
        self.model.to(device)  # This line ensures the model is moved to the right device
        self.model.eval()  # Set the model to evaluation mode


    def predict(self, data):
        """
        Predicts the z-location based on input data.

        :param data: Input tensor of shape (batch_size, input_dim)
        :return: Predicted z-location as a tensor
        """
        with torch.no_grad():  # Disable gradient computation for deployment
            data = data.to(device)  # Move data to the appropriate device
            data = data.reshape(-1, 1, self.input_dim)  # Reshape data to (batch_size, sequence_length, input_dim)
            zloc = self.model(data)
        return zloc.cpu()  # Return the output in CPU memory for further processing

# DETR (Detection Transformer) Class for Object Detection

The `DETR` (Detection Transformer) class is designed for object detection tasks. It loads a pre-trained DETR model for detecting objects in an image, applies necessary transformations to the input, rescales bounding boxes, and provides a visualization method for plotting detected objects on the image.

### Attributes:
- **CLASSES** : *list of str*  
    A list of object classes the model can detect. Each index corresponds to a specific class.
  
- **COLORS** : *list of list of float*  
    A list of RGB color triplets used for visualizing detected bounding boxes. The colors are cycled for multiple detections.

- **transform** : *torchvision.transforms.Compose*  
    A composition of image transformations applied to the input image before feeding it into the model.  
    This includes converting the image to a tensor and normalizing it.

- **model** : *DetrForObjectDetection*  
    The pre-trained DETR model loaded using Hugging Face's `from_pretrained` method, used for detecting objects in the input image.

### Methods:
- **`__init__()`**  
    Initializes the DETR class by setting up object classes, colors for visualization, image transformations, and loading the pre-trained DETR model.  
    The model is moved to the appropriate device (CPU/GPU) and set to evaluation mode.
  
- **`box_cxcywh_to_xyxy(x)`**  
    Converts bounding boxes from the center (cx, cy) and width/height (cxcywh) format to top-left (x1, y1) and bottom-right (x2, y2) corner format (xyxy).
    
    - **Parameters**:
        - **x** : *torch.Tensor*  
            Bounding boxes in (center_x, center_y, width, height) format.
    
    - **Returns**:
        - **torch.Tensor**  
            Bounding boxes in (x_min, y_min, x_max, y_max) format.

- **`rescale_bboxes(out_bbox, size)`**  
    Rescales predicted bounding boxes to match the original size of the input image.
    
    - **Parameters**:
        - **out_bbox** : *torch.Tensor*  
            Bounding boxes predicted by the model, in relative format (0 to 1).
        - **size** : *tuple*  
            Original width and height of the image.
    
    - **Returns**:
        - **torch.Tensor**  
            Rescaled bounding boxes in absolute pixel coordinates.

- **`detect(im)`**  
    Performs object detection on the input image, returning class probabilities and bounding boxes for the detections whose highest class probability exceeds 0.7.
    
    - **Parameters**:
        - **im** : *PIL.Image*  
            Input image for object detection.
    
    - **Returns**:
        - **Tuple[torch.Tensor, torch.Tensor]**  
            - `probas`: Class probabilities for detected objects.
            - `bboxes_scaled`: Rescaled bounding boxes for detected objects.

- **`visualize(im, probas, bboxes)`**  
    Visualizes detected bounding boxes and class probabilities on the input image, using `matplotlib` to draw boxes and labels.
    
    - **Parameters**:
        - **im** : *PIL.Image*  
            Original input image.
        - **probas** : *torch.Tensor*  
            Predicted class probabilities for detected objects.
        - **bboxes** : *torch.Tensor*  
            Bounding boxes for detected objects, scaled to the image size.
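
The confidence filtering inside `detect` can be sketched with NumPy: apply a softmax over the class axis, drop the trailing "no object" class, and keep the queries whose best remaining probability exceeds the threshold. The helper name `filter_detections` and the toy logits are assumptions for illustration only:

```python
import numpy as np


def filter_detections(logits, threshold=0.7):
    """Return (kept class probabilities, boolean keep mask) for one image."""
    logits = np.asarray(logits, dtype=np.float64)
    # Numerically stable softmax over the class axis.
    shifted = logits - logits.max(axis=-1, keepdims=True)
    probas = np.exp(shifted)
    probas /= probas.sum(axis=-1, keepdims=True)
    # Drop the trailing "no object" class.
    probas = probas[:, :-1]
    keep = probas.max(axis=-1) > threshold
    return probas[keep], keep


logits = [[5.0, 0.0, 0.0],  # confident in class 0
          [0.0, 0.0, 5.0],  # confident in "no object"
          [1.0, 1.0, 1.0]]  # undecided
kept, mask = filter_detections(logits)
print(mask.tolist())  # [True, False, False]
print(kept.shape)     # (1, 2)
```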

### Summary:
- `__init__`: Initializes the DETR model by loading the pre-trained weights.
- `detect`: Takes an input image and performs object detection, returning the bounding boxes and class predictions for detected objects.
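
As a worked example of the box handling, the sketch below converts a relative `(cx, cy, w, h)` box to `(xmin, ymin, xmax, ymax)` and scales it to pixel coordinates. It uses NumPy instead of `torch` so it runs without the model; the function names mirror the class methods, but this is an illustrative re-implementation under that assumption, not the class code itself:

```python
import numpy as np


def box_cxcywh_to_xyxy(boxes):
    """Convert (cx, cy, w, h) boxes to (xmin, ymin, xmax, ymax)."""
    boxes = np.asarray(boxes, dtype=np.float64)
    cx, cy, w, h = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    return np.stack([cx - 0.5 * w, cy - 0.5 * h,
                     cx + 0.5 * w, cy + 0.5 * h], axis=1)


def rescale_bboxes(boxes, size):
    """Scale relative boxes (0 to 1) to pixels; size is (width, height)."""
    img_w, img_h = size
    scale = np.array([img_w, img_h, img_w, img_h], dtype=np.float64)
    return box_cxcywh_to_xyxy(boxes) * scale


# A box centered in a 640x480 image, 20% wide and 40% tall.
print(rescale_bboxes([[0.5, 0.5, 0.2, 0.4]], (640, 480)))
# approximately [[256. 144. 384. 336.]]
```

Note that `size` follows the `PIL.Image.size` convention of `(width, height)`, which is the reverse of the `(height, width)` order used for the depth map.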

In [None]:
class DETR:
    def __init__(self):

        self.CLASSES = [
            'N/A', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
            'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A',
            'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse',
            'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack',
            'umbrella', 'N/A', 'N/A', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis',
            'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
            'skateboard', 'surfboard', 'tennis racket', 'bottle', 'N/A', 'wine glass',
            'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich',
            'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake',
            'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table', 'N/A',
            'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard',
            'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A',
            'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
            'toothbrush'
        ]

        self.COLORS = [[0.000, 0.447, 0.741], [0.850, 0.325, 0.098],
                       [0.929, 0.694, 0.125], [0, 0, 1], [0.466, 0.674, 0.188],
                       [0.301, 0.745, 0.933]]

        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        self.model = DetrForObjectDetection.from_pretrained(CONFIG['detr_model_path'], revision="no_timm")
        self.model.to(CONFIG['device'])
        self.model.eval()

    def box_cxcywh_to_xyxy(self, x):
        x_c, y_c, w, h = x.unbind(1)
        b = [(x_c - 0.5 * w), (y_c - 0.5 * h),
             (x_c + 0.5 * w), (y_c + 0.5 * h)]
        return torch.stack(b, dim=1).to(CONFIG['device'])

    def rescale_bboxes(self, out_bbox, size):
        img_w, img_h = size
        b = self.box_cxcywh_to_xyxy(out_bbox)
        b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32).to(CONFIG['device'])
        return b


    def detect(self, im):
        img = self.transform(im).unsqueeze(0).to(CONFIG['device'])
        assert img.shape[-2] <= 1600 and img.shape[-1] <= 1600, 'Image too large'
        outputs = self.model(img)
        probas = outputs['logits'].softmax(-1)[0, :, :-1]
        keep = probas.max(-1).values > 0.7
        bboxes_scaled = self.rescale_bboxes(outputs['pred_boxes'][0, keep], im.size)
        return probas[keep], bboxes_scaled

    def visualize(self, im, probas, bboxes):
        """
        Visualizes the detected bounding boxes and class probabilities on the image.

        Parameters:
            im (PIL.Image): The original input image.
            probas (Tensor): Class probabilities for detected objects.
            bboxes (Tensor): Bounding boxes for detected objects.
        """
        # Create the figure and display the input image
        plt.figure(figsize=(10, 6))
        plt.imshow(im)
        ax = plt.gca()

        # Iterate over detections and draw bounding boxes and labels
        for p, (xmin, ymin, xmax, ymax), color in zip(probas, bboxes, self.COLORS * 100):
            # Detach tensors and convert to float
            xmin, ymin, xmax, ymax = map(lambda x: x.detach().cpu().numpy().item(), (xmin, ymin, xmax, ymax))

            ax.add_patch(plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                                        fill=False, color=color, linewidth=3))
            cl = p.argmax()
            text = f'{self.CLASSES[cl]}: {p[cl].detach().cpu().numpy():0.2f}'  # Detach probability as well
            ax.text(xmin, ymin, text, fontsize=15, bbox=dict(facecolor='yellow', alpha=0.5))

        plt.axis('off')
        plt.show()

# GLPDepth Class for Monocular Depth Estimation

The `GLPDepth` class is designed to perform monocular depth estimation from a single input image. It leverages a pre-trained GLPDepth model to extract features and predict the depth map of the image. The model runs inference on the appropriate hardware (CPU or GPU) and operates in evaluation mode.

### Attributes:
- **`feature_extractor`** : *GLPNFeatureExtractor*  
    A pre-trained feature extractor that processes the input image, converting it into a tensor format suitable for the depth estimation model.
  
- **`model`** : *GLPNForDepthEstimation*  
    A pre-trained GLPDepth model used to predict the depth map from the input image. The model is moved to the appropriate device (CPU or GPU) and set to evaluation mode for inference.

### Methods:
- **`__init__()`**  
    Initializes the `GLPDepth` class by loading the pre-trained feature extractor and depth estimation model.  
    The model is automatically moved to the correct device and configured for evaluation.

- **`predict(img, img_shape)`**  
    Predicts the depth map for the input image using the GLPDepth model.
    
    - **Parameters**:
        - **img** : *PIL.Image*  
            The input image for which the depth map will be estimated.
        
        - **img_shape** : *tuple*  
            The original dimensions of the input image, represented as (height, width).
    
    - **Returns**:
        - **np.ndarray**  
            The predicted depth map as a NumPy array, resized to match the original image dimensions.

### Summary:
- `__init__`: Initializes the GLPN depth estimation model by loading the pre-trained weights.
- `predict`: Takes an input image and returns the predicted depth map, which can be used to estimate the distance of objects in the scene.

In [None]:
"""
Created on Sat Apr  9 04:08:02 2022
@author: Admin_with ODD Team

Edited by our team : Sat Oct 5 10:00 2024

references: https://github.com/vinvino02/GLPDepth
"""
import torch
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from transformers import GLPNFeatureExtractor, GLPNForDepthEstimation

# GLPDepth Model Class
class GLPDepth:
    def __init__(self):
        # Load feature extractor and model from pretrained path
        self.feature_extractor = GLPNFeatureExtractor.from_pretrained(CONFIG['glpn_model_path'])
        self.model = GLPNForDepthEstimation.from_pretrained(CONFIG['glpn_model_path'])

        # Move model to the right device (GPU or CPU)
        self.model.to(CONFIG['device'])
        self.model.eval()

    def predict(self, img: Image.Image, img_shape: tuple):
        """Predict the depth map of the input image.

        Args:
            img (PIL.Image): Input image for depth estimation.
            img_shape (tuple): Original image size (height, width).

        Returns:
            np.ndarray: The predicted depth map in numpy array format.
        """
        with torch.no_grad():
            # Preprocess image and move to the appropriate device
            pixel_values = self.feature_extractor(img, return_tensors="pt").pixel_values.to(CONFIG['device'])

            # Get model output
            outputs = self.model(pixel_values)
            predicted_depth = outputs.predicted_depth

            # Resize depth prediction to original image size
            prediction = torch.nn.functional.interpolate(
                predicted_depth.unsqueeze(1),
                size=img_shape[:2],  # Interpolate to original image size (H, W)
                mode="bicubic",
                align_corners=False,
            )
            prediction = prediction.squeeze().cpu().numpy()  # Convert to numpy array (shape: (H, W))

        return prediction

    def plot_depth_map(self, depth_map: np.ndarray, cmap='plasma'):
        """Plot the predicted depth map using matplotlib.

        Args:
            depth_map (np.ndarray): The predicted depth map (H, W).
            cmap (str): The colormap for visualization.
        """
        plt.figure(figsize=(8, 8))
        plt.imshow(depth_map, cmap=cmap)
        cbar = plt.colorbar(label='Depth Value', orientation='horizontal')
        cbar.ax.tick_params(labelsize=12)  # Optional: set the size of the ticks
        plt.axis('off')  # Turn off the axis

        # Adjust layout to avoid overlap
        plt.subplots_adjust(bottom=0.15)  # Adjust the bottom to fit the colorbar

        plt.show()


# Utility Function for Predicting Z-location for a Single Row

This section defines a utility function that processes a single row of data, containing bounding box coordinates, depth information, and object class type. The function predicts the Z-location of the object using the LSTM model.

### Parameters:
- `row`: A single row of data that contains information about the detected object, including bounding box and depth information.
- `ZlocE`: The pre-loaded LSTM model used to predict the Z-location.
- `scaler`: The scaler used for normalizing the input data before making predictions.

### Returns:
- The predicted Z-location of the object.
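
The model input is a 15-element vector: nine bounding box and depth values followed by a six-element one-hot class encoding. The sketch below assembles that vector with NumPy from a plain dictionary. The names `CLASS_ORDER`, `one_hot_class`, and `build_features` are hypothetical, the sample values are made up, and scaling is left out:

```python
import numpy as np

# Slot 0 is the fallback for any class that is not listed explicitly.
CLASS_ORDER = ["other", "bicycle", "car", "person", "train", "truck"]
NUMERIC_FIELDS = ["xmin", "ymin", "xmax", "ymax", "width", "height",
                  "depth_mean", "depth_median", "depth_mean_trim"]


def one_hot_class(class_type):
    """Return a 6-element one-hot vector; unknown classes map to slot 0."""
    index = CLASS_ORDER.index(class_type) if class_type in CLASS_ORDER[1:] else 0
    vec = np.zeros(len(CLASS_ORDER), dtype=np.float32)
    vec[index] = 1.0
    return vec


def build_features(row):
    """Concatenate the numeric fields and the class encoding (unscaled)."""
    numeric = np.array([row[name] for name in NUMERIC_FIELDS], dtype=np.float32)
    return np.concatenate([numeric, one_hot_class(row["class"])])


row = {"xmin": 10, "ymin": 20, "xmax": 110, "ymax": 220, "width": 100,
       "height": 200, "depth_mean": 12.5, "depth_median": 12.0,
       "depth_mean_trim": 12.2, "class": "car"}
features = build_features(row)
print(features.shape)         # (15,)
print(features[9:].tolist())  # [0.0, 0.0, 1.0, 0.0, 0.0, 0.0]
```

A lookup table like this keeps the class ordering in one place, which matters because the ordering has to match the one used when the LSTM and its scaler were fitted.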

In [None]:

def predict_z_location_single_row(row, ZlocE, scaler):

    # One-hot encoding of class type
    class_type = row['class']

    if class_type == 'bicycle':
        class_tensor = torch.tensor([[0, 1, 0, 0, 0, 0]], dtype=torch.float32)
    elif class_type == 'car':
        class_tensor = torch.tensor([[0, 0, 1, 0, 0, 0]], dtype=torch.float32)
    elif class_type == 'person':
        class_tensor = torch.tensor([[0, 0, 0, 1, 0, 0]], dtype=torch.float32)
    elif class_type == 'train':
        class_tensor = torch.tensor([[0, 0, 0, 0, 1, 0]], dtype=torch.float32)
    elif class_type == 'truck':
        class_tensor = torch.tensor([[0, 0, 0, 0, 0, 1]], dtype=torch.float32)
    else:
        class_tensor = torch.tensor([[1, 0, 0, 0, 0, 0]], dtype=torch.float32)

    # Prepare input data (bounding box + depth info)
    input_data = np.array([row[['xmin', 'ymin', 'xmax', 'ymax', 'width', 'height', 'depth_mean', 'depth_median', 'depth_mean_trim']].values], dtype=np.float32)
    input_data = torch.from_numpy(input_data)

    # Concatenate class information
    input_data = torch.cat([input_data, class_tensor], dim=-1)

    # Scale the input data
    scaled_input = torch.tensor(scaler.transform(input_data), dtype=torch.float32).unsqueeze(0).unsqueeze(0)

    # Use the LSTM model to predict the Z-location
    z_loc_prediction = ZlocE.predict(scaled_input).detach().numpy()[0]

    return z_loc_prediction


# Class for Processing Object Detections and Handling Overlaps

This section defines a class for processing object detection results, managing overlapping bounding boxes, and computing depth statistics for detected objects. This is useful for refining the results and eliminating unnecessary or redundant bounding boxes.

### Methods:

- **`process_detections(scores, boxes, depth_map, detr)`**  
    Processes the detected bounding boxes and computes depth statistics for each detection. It also assigns object classes and handles class filtering, RGB color assignment, and depth calculations.
    
    - **Args**:
        - `scores` (*list*): Class prediction scores from the object detection model.
        - `boxes` (*numpy.ndarray*): Bounding boxes in the format \[`xmin`, `ymin`, `xmax`, `ymax`\].
        - `depth_map` (*numpy.ndarray*): Depth map corresponding to the image.
        - `detr` (*object*): Pretrained object detection model (e.g., DETR), used for extracting object class information.
    
    - **Returns**:
        - `pandas.DataFrame`: A DataFrame containing processed detection information for each bounding box, including:
            - Bounding box coordinates (`xmin`, `ymin`, `xmax`, `ymax`) and dimensions (`width`, `height`)
            - Depth statistics (`depth_mean`, `depth_mean_trim`, `depth_median`)
            - Object class
            - RGB color values associated with the class.

- **`handle_overlaps(depth_map)`**  
    Detects and processes overlapping bounding boxes. If the intersection covers at least 70% of either box, the smaller box (treated as the farther object) is removed. For smaller overlaps, the depth values in the overlapping region are masked and the mean depth is recalculated from the remaining pixels.
    
    - **Args**:
        - `depth_map` (*numpy.ndarray*): Depth map corresponding to the image, used for recalculating depth in overlapping regions.
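
The overlap test reduces to comparing the intersection area with the area of each box. The sketch below computes those fractions arithmetically; it is a simplified stand-in for the pixel-range approach used in the class, and the names `overlap_fractions` and `is_redundant_pair` are hypothetical:

```python
def overlap_fractions(box_a, box_b):
    """Return the intersection area as a fraction of each box's own area."""
    ax1, ay1, ax2, ay2 = box_a
    bx1, by1, bx2, by2 = box_b
    # Width and height of the intersection, clamped at zero when disjoint.
    inter_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = inter_w * inter_h
    area_a = (ax2 - ax1) * (ay2 - ay1)
    area_b = (bx2 - bx1) * (by2 - by1)
    # Guard against degenerate boxes with zero area.
    frac_a = inter / area_a if area_a > 0 else 0.0
    frac_b = inter / area_b if area_b > 0 else 0.0
    return frac_a, frac_b


def is_redundant_pair(box_a, box_b, threshold=0.70):
    """True when the intersection covers at least `threshold` of either box."""
    frac_a, frac_b = overlap_fractions(box_a, box_b)
    return frac_a >= threshold or frac_b >= threshold


outer = (0, 0, 10, 10)
inner = (2, 2, 8, 8)   # fully inside `outer`
side = (8, 0, 18, 10)  # overlaps `outer` by a 2-pixel-wide strip

print(overlap_fractions(outer, inner))  # (0.36, 1.0)
print(is_redundant_pair(outer, inner))  # True
print(is_redundant_pair(outer, side))   # False
```

In the first pair the inner box is entirely covered, so one of the two detections would be dropped; in the second pair only the partial-overlap branch would apply.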

### Summary:
- `__init__`: Currently a no-op; the class keeps no configuration, and the 70% overlap threshold is hard-coded in `handle_overlaps`.
- `process_detections`: Processes the detected objects by computing depth-related statistics and handling overlaps between bounding boxes. The final output consists of refined object detections.
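
The depth statistics for one detection can be sketched as follows. To keep the example dependent on NumPy only, `trimmed_mean` is a hand-rolled stand-in for `scipy.stats.trim_mean` (it sorts the values and discards the given proportion from each end); the helper names and the synthetic depth map are assumptions for illustration:

```python
import numpy as np


def trimmed_mean(values, proportion):
    """Mean after discarding `proportion` of the sorted values from each end."""
    values = np.sort(np.asarray(values, dtype=np.float64).ravel())
    cut = int(proportion * values.size)
    return values[cut:values.size - cut].mean()


def bbox_depth_stats(depth_map, box, trim=0.2):
    """Depth statistics inside box = (xmin, ymin, xmax, ymax), in pixels."""
    xmin, ymin, xmax, ymax = (int(v) for v in box)
    patch = depth_map[ymin:ymax, xmin:xmax]  # rows are y, columns are x
    return {
        "depth_mean": float(patch.mean()),
        "depth_median": float(np.median(patch)),
        "depth_mean_trim": float(trimmed_mean(patch, trim)),
    }


# Synthetic depth map: uniform depth of 10 with a single outlier pixel.
depth_map = np.full((10, 10), 10.0)
depth_map[4, 4] = 1000.0

print(bbox_depth_stats(depth_map, (2, 3, 6, 7)))
# {'depth_mean': 71.875, 'depth_median': 10.0, 'depth_mean_trim': 10.0}
```

The single outlier pulls the plain mean far from the typical depth, while the median and trimmed mean are unaffected, which is one reason to carry all three statistics forward as features.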

In [None]:

class PROCESSING:
    def __init__(self):
        pass

    def process_detections(self, scores, boxes, depth_map, detr):
        """
        Processes object detections, computes depth statistics, and handles overlapping bounding boxes.

        Args:
            scores (list): List of class prediction scores from the object detection model.
            boxes (numpy.ndarray): Bounding boxes in the format [xmin, ymin, xmax, ymax].
            depth_map (numpy.ndarray): Depth map corresponding to the image.
            detr (object): Pretrained object detection model (e.g., detr) containing class information.

        Returns:
            pandas.DataFrame: Processed dataset containing bounding box coordinates,
                            depth statistics, and object class.
        """
        # Initialize a DataFrame for storing results
        self.data = pd.DataFrame(columns=['xmin','ymin','xmax','ymax','width', 'height','depth_mean_trim','depth_mean','depth_median', 'class', 'rgb'])

        # Iterate over detected bounding boxes and their corresponding scores
        for p, (xmin, ymin, xmax, ymax) in zip(scores, boxes.tolist()):
            # Identify the class with the highest score
            detected_class = p.argmax()
            class_label = detr.CLASSES[detected_class]

            # Filter for relevant object classes
            if class_label == 'motorcycle':
                class_label = 'bicycle'
            elif class_label == 'bus':
                class_label = 'train'
            elif class_label not in ['person', 'truck', 'car', 'bicycle', 'train']:
                class_label = 'Misc'

            if class_label in ['Misc', 'person', 'truck', 'car', 'bicycle', 'train']:
                # Assign RGB color for the detected class
                class_index = ['Misc', 'person', 'truck', 'car', 'bicycle', 'train'].index(class_label)
                r, g, b = detr.COLORS[class_index]
                rgb = (r * 255, g * 255, b * 255)

                # Calculate bounding box dimensions
                width, height = xmax - xmin, ymax - ymin
                xmin, ymin = max(0, int(xmin)), max(0, int(ymin))

                # Compute depth statistics within the bounding box
                bbox_depth = depth_map[int(ymin):int(ymax), int(xmin):int(xmax)]
                depth_mean = bbox_depth.mean()
                depth_median = np.median(bbox_depth)
                depth_trimmed_mean = stats.trim_mean(bbox_depth.flatten(), 0.2)
                #depth_max = bbox_depth.max()

                # Store the calculated data in the DataFrame
                new_row = pd.DataFrame([[xmin, ymin, xmax, ymax, width, height, depth_trimmed_mean ,depth_mean, depth_median, class_label, rgb]],
                                    columns=self.data.columns)
                self.data = pd.concat([self.data, new_row], ignore_index=True)

        # Handle overlapping bounding boxes
        self.handle_overlaps(depth_map)

        return self.data


    def handle_overlaps(self, depth_map):
        """
        Handles overlapping bounding boxes by removing the farther object
        or recalculating depth statistics for the overlapping region.

        Args:
            depth_map (numpy.ndarray): Depth map corresponding to the image.
        """
        # Reset the index for easy iteration
        self.data.reset_index(drop=True, inplace=True)

        # Lists to track the bounding box coordinates
        xmin_list, ymin_list, xmax_list, ymax_list = [], [], [], []

        # Loop through each bounding box in the dataset
        for index, (xmin, ymin, xmax, ymax) in self.data[['xmin', 'ymin', 'xmax', 'ymax']].iterrows():
            xmin_list.insert(0, xmin)
            ymin_list.insert(0, ymin)
            xmax_list.insert(0, xmax)
            ymax_list.insert(0, ymax)

            # Compare the current bounding box with all previous ones
            for i in range(len(xmin_list) - 1):
                # Check Y-axis overlap
                y_range1 = np.arange(int(ymin_list[0]), int(ymax_list[0]) + 1)
                y_range2 = np.arange(int(ymin_list[i + 1]), int(ymax_list[i + 1]) + 1)
                y_intersection = np.intersect1d(y_range1, y_range2)

                if len(y_intersection) >= 1:
                    # Check X-axis overlap
                    x_range1 = np.arange(int(xmin_list[0]), int(xmax_list[0]) + 1)
                    x_range2 = np.arange(int(xmin_list[i + 1]), int(xmax_list[i + 1]) + 1)
                    x_intersection = np.intersect1d(x_range1, x_range2)

                    if len(x_intersection) >= 1:
                        # Calculate the areas of the bounding boxes and their intersection
                        area1 = (y_range1.max() - y_range1.min()) * (x_range1.max() - x_range1.min())
                        area2 = (y_range2.max() - y_range2.min()) * (x_range2.max() - x_range2.min())
                        area_intersection = (y_intersection.max() - y_intersection.min()) * (x_intersection.max() - x_intersection.min())

                        # If the intersection covers at least 70% of either box, drop the smaller box (assumed farther)
                        if area_intersection / area1 >= 0.70 or area_intersection / area2 >= 0.70:
                            if area1 < area2:
                                self.data.drop(index=index, inplace=True)
                            else:
                                self.data.drop(index=index - (i + 1), inplace=True)

                        # If partial overlap, recalculate depth for the overlapping region
                        elif area_intersection / area1 > 0 or area_intersection / area2 > 0:
                            # Convert to integers for indexing
                            y_min_idx = int(y_intersection.min())
                            y_max_idx = int(y_intersection.max())
                            x_min_idx = int(x_intersection.min())
                            x_max_idx = int(x_intersection.max())

                            # Update the smaller box (treated as the farther object):
                            # position 0 is the current row, position i + 1 an earlier row
                            if area1 < area2:
                                pos, target = 0, index
                            else:
                                pos, target = i + 1, index - (i + 1)

                            # Check bounds before slicing
                            in_bounds = (
                                0 <= y_min_idx < depth_map.shape[0] and 0 <= y_max_idx < depth_map.shape[0]
                                and 0 <= x_min_idx < depth_map.shape[1] and 0 <= x_max_idx < depth_map.shape[1]
                            )
                            if not in_bounds:
                                print("Index out of bounds for depth map:", y_min_idx, y_max_idx, x_min_idx, x_max_idx)
                            elif target in self.data.index:  # skip rows dropped in an earlier comparison
                                # Mask the shared region, then average what remains of the box
                                depth_map[y_min_idx:y_max_idx, x_min_idx:x_max_idx] = np.nan
                                bbox_depth = depth_map[int(ymin_list[pos]):int(ymax_list[pos]), int(xmin_list[pos]):int(xmax_list[pos])]
                                self.data.at[target, 'depth_mean'] = np.nanmean(bbox_depth)

        # Reset index after removing rows
        self.data.reset_index(drop=True, inplace=True)
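
The overlap test in the method above builds pixel ranges with `np.arange` and intersects them. For integer coordinates the same areas can be computed with plain min/max arithmetic. The sketch below is an illustration only, not part of the original class: the helper names `overlap_fractions` and `classify_overlap` are hypothetical, and the zero-area guard is an added assumption.

```python
def overlap_fractions(box_a, box_b):
    """Return (intersection / area_a, intersection / area_b).

    Boxes are (xmin, ymin, xmax, ymax) tuples in pixel coordinates.
    """
    ax0, ay0, ax1, ay1 = (int(v) for v in box_a)
    bx0, by0, bx1, by1 = (int(v) for v in box_b)

    # Width and height of the shared region; negative means no overlap
    inter_w = min(ax1, bx1) - max(ax0, bx0)
    inter_h = min(ay1, by1) - max(ay0, by0)
    if inter_w < 0 or inter_h < 0:
        return 0.0, 0.0

    area_a = (ax1 - ax0) * (ay1 - ay0)
    area_b = (bx1 - bx0) * (by1 - by0)
    if area_a == 0 or area_b == 0:
        return 0.0, 0.0  # degenerate box: avoid dividing by zero (assumption)

    inter = inter_w * inter_h
    return inter / area_a, inter / area_b


def classify_overlap(box_a, box_b, threshold=0.70):
    """Mirror the two branches of the method: drop a box or mask the overlap."""
    frac_a, frac_b = overlap_fractions(box_a, box_b)
    if frac_a >= threshold or frac_b >= threshold:
        return "drop_smaller"
    if frac_a > 0 or frac_b > 0:
        return "mask_overlap"
    return "none"


nested = classify_overlap((0, 0, 10, 10), (2, 2, 8, 8))
partial = classify_overlap((0, 0, 10, 10), (5, 5, 15, 15))
apart = classify_overlap((0, 0, 10, 10), (20, 20, 30, 30))
```

Here the nested pair falls in the drop branch because the inner box is fully covered, while the second pair shares only a quarter of each box and would have its overlap masked in the depth map.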


# Function for Generating JSON Output

This function takes the processed data, predicts the Z-location for each detected object, and returns the results in JSON format. The JSON structure will include the bounding box, object class, and the predicted Z-location for each object.

### Parameters:
- `data`: The data containing the bounding box coordinates, depth information, and object class type.
- `ZlocE`: The pre-loaded LSTM model for Z-location prediction.
- `scaler`: The scaler used for normalizing the input data.

### Returns:
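- A dictionary with a single `objects` key that holds one entry per detected object: its class, the estimated distance, and the features used for the prediction. The dictionary contains only plain Python values, so it can be serialized with `json.dumps`.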

In [None]:
def generate_output_json(data, ZlocE, scaler):

    output_json = []

    # Iterate over each row in the data
    for i, row in data.iterrows():
        # Predict distance for each object using the single-row prediction function
        distance = predict_z_location_single_row(row, ZlocE, scaler)

        # Create object info dictionary
        object_info = {
            "class": row["class"],  # Object class (e.g., 'car', 'truck')
            "distance_estimated": float(distance),  # Convert distance to float (if necessary)
            "features": {
                "xmin": float(row["xmin"]),  # Bounding box xmin
                "ymin": float(row["ymin"]),  # Bounding box ymin
                "xmax": float(row["xmax"]),  # Bounding box xmax
                "ymax": float(row["ymax"]),  # Bounding box ymax
                "mean_depth": float(row["depth_mean"]),  # Depth mean
                "depth_mean_trim": float(row["depth_mean_trim"]),  # Depth mean trim
                "depth_median": float(row["depth_median"]),  # Depth median
                "width": float(row["width"]),  # Object width
                "height": float(row["height"])  # Object height
            }
        }

        # Append each object info to the output JSON list
        output_json.append(object_info)

    # Return the final JSON output structure
    return {"objects": output_json}
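
The `float(...)` casts in the function above matter for serialization: values coming out of a model or a DataFrame are often NumPy scalars, and `json.dumps` rejects types such as `np.float32`. The sketch below illustrates this with made-up values (the class name and distance are placeholders, not real predictions).

```python
import json

import numpy as np

raw_distance = np.float32(12.5)  # placeholder for a model output

# A NumPy float32 is not a Python float, so json.dumps raises TypeError
try:
    json.dumps({"distance_estimated": raw_distance})
    numpy_value_serializable = True
except TypeError:
    numpy_value_serializable = False

# Casting to a built-in float first makes the payload serializable
payload = json.dumps(
    {"objects": [{"class": "car", "distance_estimated": float(raw_distance)}]}
)
decoded = json.loads(payload)
```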

# Initialize the FastAPI Application

This section initializes the FastAPI application, which will serve as the main entry point for interacting with the models. The API will handle WebSocket requests for uploading images and receiving object detection and depth estimation results.

In [None]:
'''app = FastAPI(title="WebSocket Image Upload API", description="API for uploading images via WebSocket and receiving object detection and depth estimation results.")'''

# Load Models and Scaler Outside the WebSocket Route for Efficiency

To improve the performance of the WebSocket route, the object detection (DETR), depth estimation (GLPN), and Z-location prediction (LSTM) models, as well as the scaler, are loaded outside the WebSocket route. This ensures that the models do not need to be reloaded with each incoming request, reducing latency.

In [None]:
detr = DETR()
glpn = GLPDepth()
zlocE = LSTM_Model()
# Use a context manager so the scaler file is closed after loading
with open(CONFIG['lstm_scaler_path'], 'rb') as f:
    scaler = pickle.load(f)
processing = PROCESSING()
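
The load-once idea can also be expressed lazily, so that nothing is loaded until the first request needs it. The sketch below is one possible variant under stated assumptions, not the notebook's approach: `get_model` and `handle_request` are hypothetical names, and the dictionary stands in for a real model object.

```python
from functools import lru_cache

load_count = 0  # counts how often the expensive load actually runs


@lru_cache(maxsize=None)
def get_model(name):
    """Load a model on first use and return the cached object afterwards."""
    global load_count
    load_count += 1
    return {"name": name}  # stand-in for a loaded model


def handle_request(image_id):
    """Simulate a request handler that needs the model."""
    model = get_model("detr")
    return f"{model['name']} processed image {image_id}"


results = [handle_request(i) for i in range(3)]
```

Three simulated requests trigger a single load. Loading eagerly at module level, as in the cell above, has the same effect and additionally surfaces loading errors at startup.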

# Define Route for Serving HTML Documentation

This route serves HTML documentation for the API. It responds to GET requests at the root (`"/"`) and returns an HTML response that describes how to interact with the FastAPI service. This can be useful for providing a user-friendly interface or API documentation.

In [None]:
'''
# Serve the HTML documentation
@app.get("/", response_class=HTMLResponse)
async def get_docs():

    # Get the directory of the current script
    html_path = os.path.join(os.path.dirname(__file__), "docs.html")

    if not os.path.exists(html_path):
        return HTMLResponse(content="docs.html file not found", status_code=404)

    with open(html_path, "r") as f:
        return HTMLResponse(f.read())
'''

# Define Asynchronous Functions for Running DETR Detection and GLPN Depth Estimation

This section defines two asynchronous functions: one runs object detection with the DETR model and the other runs depth estimation with the GLPN model. Each function hands its blocking model call to a separate thread with `asyncio.to_thread`, so the event loop stays free to serve other connections while inference runs.

In [None]:
'''
# Run DETR detection in a separate thread
async def run_detr(pil_image):
    return await asyncio.to_thread(detr.detect, pil_image)

# Run GLPN depth estimation in a separate thread
async def run_glpn(pil_image, img_shape):
    return await asyncio.to_thread(glpn.predict, pil_image, img_shape)
'''
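
As a self-contained illustration of this pattern, the sketch below runs two blocking functions in worker threads and awaits both with `asyncio.gather`. The functions `blocking_detect` and `blocking_depth` are stand-ins for the model calls, and `time.sleep` simulates inference time; none of these names come from the notebook.

```python
import asyncio
import threading
import time


def blocking_detect(image):
    """Stand-in for a blocking call such as object detection."""
    time.sleep(0.05)
    return "detections", threading.get_ident()


def blocking_depth(image):
    """Stand-in for a blocking call such as depth estimation."""
    time.sleep(0.05)
    return "depth_map", threading.get_ident()


async def process(image):
    # Each call runs in a worker thread; gather returns results in call order
    return await asyncio.gather(
        asyncio.to_thread(blocking_detect, image),
        asyncio.to_thread(blocking_depth, image),
    )


main_thread = threading.get_ident()
(det, det_thread), (depth, depth_thread) = asyncio.run(process("frame"))
```

`asyncio.to_thread` requires Python 3.9 or later. Inside a notebook, where an event loop is already running, `await process("frame")` replaces the `asyncio.run` call.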

# Define WebSocket Endpoint for Handling Image Uploads and Processing Requests

This WebSocket endpoint handles image uploads from clients. When an image is uploaded, it runs the necessary object detection and depth estimation models, and then returns the results to the client.

### Parameters:
- `websocket`: The WebSocket connection object, which is used to receive images from the client and send the results back.

In [None]:
'''
from fastapi import WebSocketDisconnect

@app.websocket("/ws/predict")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        try:
            # Receive raw bytes (image data)
            image_bytes = await websocket.receive_bytes()
            # Convert bytes to a NumPy array
            nparr = np.frombuffer(image_bytes, np.uint8)
            # Decode the image (cv2.imdecode returns None for invalid data)
            frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            if frame is None:
                await websocket.send_text("Error processing image: could not decode image data")
                continue

            # Process the frame
            # frame = cv2.resize(frame, (1280, 640))
            color_converted = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(color_converted)
            img_shape = color_converted.shape[0:2]  # (height, width)

            # Sequential alternative, kept for reference:
            # scores, boxes = detr.detect(pil_image)
            # depth_map = glpn.predict(pil_image, img_shape)

            # Run DETR and GLPN in parallel
            detr_result, depth_map = await asyncio.gather(
                run_detr(pil_image),
                run_glpn(pil_image, img_shape)
            )

            # Unpack the DETR detection results
            scores, boxes = detr_result

            # Process bounding boxes and overlap them with the depth map
            pdata = processing.process_detections(scores, boxes, depth_map, detr)

            # Generate the output JSON
            output_json = generate_output_json(pdata, zlocE, scaler)

            # Send the output back to the client
            await websocket.send_text(json.dumps(output_json))

        except WebSocketDisconnect:
            # The client closed the connection: stop the receive loop
            break
        except Exception as e:
            print(f"Error: {e}")
            await websocket.send_text(f"Error processing image: {str(e)}")
    '''

# Testing

In [0]:
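# Load a sample image (cv2.imread returns None if the file cannot be read)
sample_image = cv2.imread("sample_image.png")
if sample_image is None:
    raise FileNotFoundError("sample_image.png could not be read")

# Resize, convert BGR to RGB, and wrap as a PIL image for the models
sample_image = cv2.resize(sample_image, (1280, 640))
color_converted = cv2.cvtColor(sample_image, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(color_converted)
img_shape = color_converted.shape[0:2]  # (height, width)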


In [None]:
# DETR Object Detection
scores, boxes = detr.detect(pil_image)

In [None]:
# Show Object Detection
detr.visualize(pil_image, scores, boxes)

In [None]:
# GLPN Depth Estimation
depth_map = glpn.predict(pil_image, img_shape)

In [None]:
# Show Depth Map
glpn.plot_depth_map(depth_map)

In [None]:
# Process bounding boxes and overlap them with the depth map
pdata = processing.process_detections(scores, boxes, depth_map, detr)
pdata

In [None]:
# Generate the output JSON
output_json = generate_output_json(pdata, zlocE, scaler)
output_json