# Manhole Detection - Inference

[Tutorial](https://www.youtube.com/watch?v=QtsI0TnwDZs&ab_channel=Ultralytics)    &    [Source](https://docs.ultralytics.com/modes/predict/#working-with-results)

In [1]:
# Import Libraries
import os
import numpy as np
import cv2

import pandas as pd
from tqdm.auto import tqdm
from osgeo import gdal,osr,ogr
import matplotlib.pyplot as plt
import matplotlib.patches as patches

import PIL.Image as PILImage
import subprocess
import shutil

import pandas as pd
from scipy.spatial.distance import cdist
from sklearn.cluster import DBSCAN
import numpy as np

import geopandas as gpd
from shapely.geometry import Point

import ultralytics
from ultralytics import YOLO
ultralytics.checks()

Ultralytics YOLOv8.1.28 🚀 Python-3.12.2 torch-2.2.1+cu121 CUDA:0 (NVIDIA RTX A4500, 20178MiB)
Setup complete ✅ (32 CPUs, 93.5 GB RAM, 21.7/182.3 GB disk)


In [2]:
def ResampleGeotiff(infname, outfname, minx, miny, maxx, maxy, dx, dy):
    """This function resamples the GeoTIFF image in UInt16 format."""
    
    box_str = "%f %f %f %f" % (minx, miny, maxx, maxy)
    pixsp_str = "%f %f" % (dx, dy)
    command = 'gdalwarp -overwrite -te %s -tr %s -r max -ot byte -of Gtiff %s %s' % (box_str, pixsp_str, infname, outfname)
    
    with open(os.devnull, 'w') as devnull:      # open a handle to os.devnull to suppress the output of the subprocess
        subprocess.run(command, shell=True, stdout=devnull, stderr=subprocess.STDOUT)


def convert_tiff_to_jpg(tiff_path, jpg_path):
    """This function converts images format from .tif to .jpg."""
    with PILImage.open(tiff_path) as img:
        img.convert('RGB').save(jpg_path, 'JPEG')


def distance_between_points(x1, y1, x2, y2):
    """Calculate the distance between two points."""
    return np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)


def get_tiff_corners(gt, xsize, ysize):
    """Calculate corner coordinates based on geotransform and image size."""
    top_left = (gt[0], gt[3])
    top_right = (gt[0] + gt[1] * xsize, gt[3] + gt[2] * xsize)
    bottom_left = (gt[0] + gt[4] * ysize, gt[3] + gt[5] * ysize)
    bottom_right = (gt[0] + gt[1] * xsize + gt[4] * ysize, gt[3] + gt[2] * xsize + gt[5] * ysize)
    return top_left, top_right, bottom_left, bottom_right


def resample_and_convert_to_jpg(infname, outfname_tiff, outfname_jpg, minx, miny, maxx, maxy, dx, dy):
    """Resample the GeoTIFF image and convert it to JPEG, returning the image object."""

    box_str = "%f %f %f %f" % (minx, miny, maxx, maxy)
    pixsp_str = "%f %f" % (dx, dy)
    command = 'gdalwarp -overwrite -te %s -tr %s -r max -ot byte -of Gtiff %s %s' % (box_str, pixsp_str, infname, outfname_tiff)
    
    with open(os.devnull, 'w') as devnull:      # open a handle to os.devnull to suppress the output of the subprocess
        subprocess.run(command, shell=True, stdout=devnull, stderr=subprocess.STDOUT)

    # Open the resampled TIFF, convert to RGB, and save as JPEG
    img = PILImage.open(outfname_tiff)
    img_rgb = img.convert('RGB')
    img_rgb.save(outfname_jpg, 'JPEG')

    # After saving the JPEG, delete the TIFF file to free up space
    os.remove(outfname_tiff)

    return img_rgb


def is_central_pixel_bright_enough(image, threshold=30):
    """Check if the brightness of the central pixel exceeds a threshold."""
    central_pixel = image.getpixel((image.width // 2, image.height // 2))
    brightness = sum(central_pixel) / 3
    return brightness > threshold


def draw_boxes_on_image(image_path, detections):
    """
    Draw bounding boxes on the image based on detections.
    detections: List of detections with each detection as [x_center, y_center, width, height] in normalized coordinates.
    """
    image = cv2.imread(image_path)
    img_height, img_width = image.shape[:2]

    for det in detections:
        x_center, y_center, width, height = det
        # Convert normalized coordinates back to pixel coordinates
        x_center, y_center, width, height = (
            x_center * img_width, 
            y_center * img_height, 
            width * img_width, 
            height * img_height
        )
        # Calculate the bounding box's top left and bottom right corners
        xmin = int(x_center - width / 2)
        ymin = int(y_center - height / 2)
        xmax = int(x_center + width / 2)
        ymax = int(y_center + height / 2)
        
        # Draw the bounding box
        cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (255, 0, 0), 2)
    
    # Save the image with bounding boxes drawn
    cv2.imwrite(image_path, image)


def inference(name_image, outpath, imsize_m, stride, model_path='training_results/mh5m-default/weights/best.pt', debug=False):

    # Paths for positives and negatives
    positives_path = os.path.join(outpath, 'positives')
    negatives_path = os.path.join(outpath, 'negatives')
    invalid_path = os.path.join(outpath, 'invalid')
    os.makedirs(positives_path, exist_ok=True)
    os.makedirs(negatives_path, exist_ok=True)
    os.makedirs(invalid_path, exist_ok=True)

    # Open the large image
    Image = gdal.Open(name_image)
    if Image is None:
        raise Exception("Unable to open ", name_image)

    gt = Image.GetGeoTransform()
    img_xsize = Image.RasterXSize
    img_ysize = Image.RasterYSize

    # Calculate the number of steps to take across and down the image
    num_steps_x = int(np.floor((img_xsize * abs(gt[1])) / stride))
    num_steps_y = int(np.floor((img_ysize * abs(gt[5])) / stride))
    total_steps = num_steps_x * num_steps_y

    x0, y0 = get_tiff_corners(gt, img_xsize, img_ysize)[0]

    # Initialize DataFrame for storing manhole locations
    manhole_locations_df = pd.DataFrame(columns=['filename', 'xGeo', 'yGeo', 'conf', 'class_id'])

    # Load the Pretrained YOLO Model
    model = YOLO(model_path)

    # Initialize the progress bar
    pbar = tqdm(total=total_steps, desc='Processing', unit='step')

    for y_step in range(num_steps_y):
        for x_step in range(num_steps_x):

            # Calculate the upper left corner of the current window
            xW = x0 + x_step * stride
            yW = y0 - y_step * stride

            # Update progress bar description with current step
            pbar.set_description(f'Processing x_step={x_step}, y_step={y_step}')

            # Define the boundaries of the window
            x_lb = xW
            y_lb = yW - imsize_m
            x_ub = xW + imsize_m
            y_ub = yW

            # Resample the .tif Image and convert to .jpg
            outfname_tiff = os.path.join(outpath, f"Window_{x_step}_{y_step}.tif")
            outfname_jpg = os.path.join(outpath, f"Window_{x_step}_{y_step}.jpg")
            img = resample_and_convert_to_jpg(name_image, outfname_tiff, outfname_jpg, x_lb, y_lb, x_ub, y_ub, abs(gt[1]), abs(gt[5]))

            if is_central_pixel_bright_enough(img):

                # Run YOLO inference on the image
                results = model(outfname_jpg, verbose=False)

                if results[0].boxes.data.shape[0] > 0:

                    detections = []

                    for result in results:

                        coords_norm = result.boxes.xywhn[0].tolist()
                        conf = result.boxes.conf[0].tolist()
                        class_id = int(result.boxes.cls[0].tolist())

                        xM = x_lb + coords_norm[0] * imsize_m
                        yM = y_lb - coords_norm[1] * imsize_m

                        # Append to the DataFrame
                        manhole_locations_df = manhole_locations_df._append({
                            'filename': f"Window_{x_step}_{y_step}.jpg", 
                            'xGeo': xM, 
                            'yGeo': yM, 
                            'conf': conf, 
                            'class_id': class_id
                        }, ignore_index=True)

                        # print(coords_norm, conf, class_id)

                        detections.append(coords_norm[:4])  # [x_center, y_center, width, height]

                    # Draw the bounding boxes on the image
                    draw_boxes_on_image(outfname_jpg, detections)

                    shutil.move(outfname_jpg, os.path.join(positives_path, os.path.basename(outfname_jpg)))

                    if debug:

                        plt.imshow(img)
                        plt.axis('off')  # Hide axes ticks
                        plt.show()
                        # print(results[0].boxes)

                else:

                    shutil.move(outfname_jpg, os.path.join(negatives_path, os.path.basename(outfname_jpg)))

            else:

                    shutil.move(outfname_jpg, os.path.join(invalid_path, os.path.basename(outfname_jpg)))

            # Update the progress bar after each step
            pbar.update()

    pbar.close()  # Close the progress bar after completion

    # Save dataframe to CSV
    print(manhole_locations_df)
    manhole_locations_df.to_csv(os.path.join(outpath, 'detected_manholes.csv'))

In [3]:
# --- Detection Parameters --- #
name_image = "ImagesManholeDetection/Gigean/Gigean_RGB/filtered_gigean.tif"
# name_image = "ImagesManholeDetection/Gigean/Gigean_RGB/140610_gigean_4cm_l93_vis.tif"
imsize_m = 5            # meters
step_size = 2.5         # meters
model_path = 'training_results/mh5m-n-improved/weights/best.pt'
EPSG = 0

outpath = os.path.join('inference', 'gigean-filtered-improved')
os.makedirs(outpath, exist_ok=True)


inference(name_image, outpath, imsize_m, step_size, model_path)


In [4]:
# Load the detections CSV
df = pd.read_csv(os.path.join(outpath, 'detected_manholes.csv'))

# Convert coordinates to a NumPy array for distance calculation
coordinates = df[['xGeo', 'yGeo']].values

# Use DBSCAN for clustering based on a distance threshold (40cm, or 0.4 meters in this case)
# eps is the maximum distance between two samples for them to be considered in the same neighborhood
# min_samples is the minimum number of samples in a neighborhood for a data point to qualify as a core point
eps = 0.4
dbscan = DBSCAN(eps=eps, min_samples=1, metric='euclidean')
clusters = dbscan.fit_predict(coordinates)

# Add cluster labels to the DataFrame
df['cluster'] = clusters

# Function to calculate weighted average
def weighted_average(group):
    x = np.average(group['xGeo'], weights=group['conf'])
    y = np.average(group['yGeo'], weights=group['conf'])
    max_conf = group['conf'].max()
    return pd.Series({'xGeo': x, 'yGeo': y, 'conf': max_conf})

# Group by cluster and calculate the weighted average for each group
unique_manholes = df.groupby('cluster').apply(weighted_average).reset_index()

# Save the unique manholes to a new CSV file
unique_manholes.to_csv(os.path.join(outpath, 'unique_manholes.csv'), index=False)

print("Unique manholes have been identified and saved.")


Unique manholes have been identified and saved.


  unique_manholes = df.groupby('cluster').apply(weighted_average).reset_index()


In [5]:
# Load the CSV file
csv_file = os.path.join(outpath, 'unique_manholes.csv')
data = pd.read_csv(csv_file)

# Convert the DataFrame to a GeoDataFrame
gdf = gpd.GeoDataFrame(
    data, geometry=data.apply(lambda row: Point(row['xGeo'], row['yGeo']), axis=1)
)

# Set the CRS (Coordinate Reference System) from the original Shapefile
original_shp_path = 'ImagesManholeDetection/Gigean/ManholesOrthoAJ_Carole.shp'
original_gdf = gpd.read_file(original_shp_path)
gdf.crs = original_gdf.crs

# Save to Shapefile
manhole_file = os.path.join(outpath, 'unique_manholes.shp')
gdf.to_file(manhole_file)

print(f"Shapefile saved to {manhole_file}")


Shapefile saved to inference/gigean-filtered-improved/unique_manholes.shp
