# Semantic Segment Road and Sidewalk
Tony Wang July 04 2023

After semantic segmentation of road and sidewalk, we obtain the pixel level binary mask of them. Which can be used to detect human-road relationship using rule-based comparision. Since the SAM didn't provide necessary api, I write some utility func to realize it

> This notebook is used for tutuorial demo, because I believe, compared to the unstable .py file, jupyter notebook would provide a vivid description and data pipeline demonstration.



## Library & Model Loading

In [1]:
import os
import cv2
# filter some annoying debug info
# import warnings
# warnings.filterwarnings('ignore')

import torch
import torchvision
import supervision as sv

import numpy as np
from PIL import Image
from pathlib import Path
from collections import Counter

import matplotlib.pyplot as plt

from groundingdino.util.inference import Model
from segment_anything import sam_model_registry, SamPredictor
#TODO name!
from groundingdino.util.inference import load_model, load_image, predict, annotate

# import SAM_utility # 

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


# Paths to GroundingDINO and SAM checkpoints
GROUNDING_DINO_CONFIG_PATH = "../GroundingDINO/groundingdino/config/GroundingDINO_SwinT_OGC.py"
GROUNDING_DINO_CHECKPOINT_PATH = "./weights/groundingdino_swint_ogc.pth"
MODEL_TYPE = "vit_b"
SAM_CHECKPOINT_PATH = "./weights/sam_vit_b_01ec64.pth"

# Predict classes and hyper-param for GroundingDINO
BOX_TRESHOLD = 0.25
TEXT_TRESHOLD = 0.25
PED_TRESHOLD = 0.5

NMS_THRESHOLD = 0.85
IOU_THRESHOLD = 0.5

In [2]:
DEBUG = False
# DEBUG = True


In [None]:

from depth_util import predict_depth,get_distance_category

from mask_util import (
    show_mask, show_points, show_box, display_mask, nms_processing, 
    is_overlap, compute_overlap, get_location, get_surface_info
)
from file_io    import is_image_file
from angle_util import describe_angle,estimate_angle

In [3]:
from DPT_module.dpt.models import DPTDepthModel
from DPT_module.dpt.midas_net import MidasNet_large
from DPT_module.dpt.transforms import Resize, NormalizeImage, PrepareForNet
import DPT_module.util.io as DPT_io
from torchvision.transforms import Compose

The model loading is quite long
with some unremovable warning in gDINO, just ignore it

In [6]:
# Initialize GroundingDINO model
grounding_dino_model = Model(
    model_config_path=GROUNDING_DINO_CONFIG_PATH, 
    model_checkpoint_path=GROUNDING_DINO_CHECKPOINT_PATH, 
    device=DEVICE
)

# Initialize SAM model and predictor
sam = sam_model_registry[MODEL_TYPE](checkpoint=SAM_CHECKPOINT_PATH)
sam.to(device=DEVICE)
sam_predictor = SamPredictor(sam)

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


final text_encoder_type: bert-base-uncased


# Data structure
LocationInfo: pack form to help data-alignment

In [7]:
class LocationInfo:
    def __init__(self, object_type, id, box, mask,confidence):
        self.object_type = object_type  # ('sidewalk', 'road', or 'person')
        self.id = id  # Unique ID within the type
        self.box = box  # Bounding box in xyxy format
        self.mask = mask  # Binary mask indicating the precise location of the object
        self.confidence = confidence #confidence of bbox
        self.distance = None # str,{very close,close, median, far, very far}
        self.angle = None    # horizontal angle relative to camera
    def get_area(self):
        """
        int: The area of the object in pixels.
        """
        return np.sum(self.mask)


## Utility Function


## Architecture:
1. gDINO : grounding_dino_model.predict_with_classes

   CLASSES_prompt= ['road', 'sidewalk']

   Based on testing, this pair is most reliable (otherwise the sidewalk may messed up with road) 

   In this part, I use the box as Region of Interest(ROI) to further prompt SAM

2. Non-maximum suppression (NMS) :

   get rid of redundant and overlapping bounding boxes.

   the metric is Intersection over Union(IoU)

3. Prompting SAM with ROI, select mask with largest area, in this step, the road and sidewalk can be segmented with naming in pixel level accuracy.

4. save the result 

5. label the result with label and confidence

6. TODO: do image sequence experiment, analyze the behavior of person

7. TODO: split cases based on JAAD info

   - car is moving 
   - car is stopping
   - time
   - weather
   - more...

In GTX3090 environment, the algorithm runs relatively fast with GPU boosting.

(Not as bad as I guessed before, much faster than all of the online demo)


## Location utility function
- is_overlap: a mask-level comparitor func

Surface Utility Function

In [15]:
def obj_print(obj_dict,image):
    for name, person in obj_dict.items():
        print(name)
        print(person.box)
        print(person.distance)
        person.angle = estimate_angle(image,person)

        print(f"angle is {person.angle},it is {describe_angle(person.angle)}")
        print("\n\n")



In [16]:
def write_to_txt(image_path, output_path, p_surface_overlaps, counts, labels, p_labels,obj_dict):
    '''
    str image_path: the relative path to input image
    str output_path: "DINOmasked/video_0018/man.png"
    
    
    '''
    output_dir = Path(output_path).parent
    img_name = image_path[-8:-4]
    txt_name = "Info_Video_"+ str(output_dir)[-4:] +".txt"
    txt_path = os.path.join(output_dir, txt_name) 

    if DEBUG:
        print("output_dir: ", output_dir)
        print("image_name: ", img_name)
        print("txt_path: ", txt_path)
    # Check if file already exists
    if DEBUG:
        if os.path.exists(txt_path):
            # Read in existing data
            with open(txt_path, 'r') as f:
                existing_data = f.read()

            # If the info of the current image has already been recorded, return without appending
            if f"INFO of {img_name}:\n" in existing_data:
                print(f"ERROR: the info of{img_name} has been generated")
                return
    with open(txt_path, 'a') as f: # 'a' option is for appending to the file if it exists
        f.write(f"INFO of {img_name}:\n")

        get_surface_info(obj_dict,f)
        
        for person, surfaces in p_surface_overlaps:
            if surfaces:
                surface_str = ', '.join([f"{surface.object_type} {surface.id}" for surface in surfaces])
                f.write(f"Person {person.id} is on the {surface_str}\n")
            else:
                f.write(f"Person {person.id} is not on any detected surface\n")
                
        f.write(f"number of Surface mask, Road&sidewalk, People 's mask, actural people: {counts}\n")
        f.write(f"Labels: [{', '.join(labels)}]\n")
        f.write(f"Person Labels: [{', '.join(p_labels)}]\n\n")





## Key Function

In [17]:
# Prompting SAM with Region of Interest
def segment_ROI(sam_predictor: SamPredictor, image: np.ndarray, boxes: np.ndarray):

    sam_predictor.set_image(image)
    result_masks = []
    for box in boxes:
        masks_np, iou_predictions, _ = sam_predictor.predict(
        point_coords=None,
        point_labels=None,
        box=box,
        multimask_output=True,
        )
        #TODO Remove the following line to get all the person masks
        # index = np.argmax(scores_np) 
        # Add all masks to the result, not just the one with the highest score
        # Filter out masks with IoU scores below the threshold
        for mask, score in zip(masks_np, iou_predictions):
            if score >= IOU_THRESHOLD:
                result_masks.append(mask)

    return np.array(result_masks)



In [18]:

def detect_road(image_path,output_path):
    try:
        image = cv2.imread(image_path)
        if image is None:
            print(f"Image at path {image_path} could not be loaded. Skipping.")
            return None
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    except Exception as e:
        print(f"Failed to process image at {image_path}. Error: {e}")
        return None
    
    ROAD_SIDEWALK = ['road', 'sidewalk'] 
    P_CLASS     = ['person'] #,'bike']
    # the person label lower gDINO's performance
    # so I split them

    # detect road and sidewalk
    detections = grounding_dino_model.predict_with_classes(
        image=image,
        classes = ROAD_SIDEWALK,
        box_threshold= BOX_TRESHOLD,
        text_threshold=TEXT_TRESHOLD
    )
    detections = nms_processing(detections)
    # detect person 
    p_detections = grounding_dino_model.predict_with_classes(
        image = image,
        classes = P_CLASS , 
        box_threshold= BOX_TRESHOLD,
        text_threshold=PED_TRESHOLD - 0.3
    )
    p_detections = nms_processing(p_detections)

    box_annotator = sv.BoxAnnotator()
    person_annotator = sv.BoxAnnotator()

    labels = [
        f"{ROAD_SIDEWALK[class_id]} {i} {confidence:0.2f}" 
        for i, (_, _, confidence, class_id, _) in enumerate(detections)]

    P_labels = [
        f"{P_CLASS[class_id]} {i} {confidence:0.2f}" 
        for i, (_, _, confidence, class_id, _) in enumerate(p_detections)]

    DINO_boxes = np.array(detections.xyxy)
    P_boxes    = np.array(p_detections.xyxy)
    
    annotated_frame = box_annotator.annotate(scene=image.copy(), detections=detections ,labels=labels)
    if DEBUG:
        sv.plot_image(annotated_frame, (16, 16))
    person_annotation = person_annotator.annotate(scene=annotated_frame,detections= p_detections,labels= P_labels)
    if DEBUG:
        sv.plot_image(person_annotation, (16, 16))
    # cv2.imwrite("annotated_image.jpg", annotated_frame)
    
    SAM_masks = segment_ROI(sam_predictor,image,DINO_boxes)
    P_masks = segment_ROI(sam_predictor,image,DINO_boxes)

    # Create a list of LocationInfo objects for each detected object
    obj_dict = Counter()
    
    for i, (box, label, mask) in enumerate(zip(DINO_boxes, labels, SAM_masks)):
        object_type, id, confidence   = label.split(' ')
        index = object_type +id
        obj_dict[index] =  (LocationInfo(object_type, int(id), box, mask,confidence)) 

    for i, (box, label, mask) in enumerate(zip(P_boxes, P_labels, P_masks)):
        object_type, id, confidence = label.split(' ')
        index = object_type+id
        obj_dict[index] = (LocationInfo(object_type, int(id), box, mask,confidence)) 

    depth_map = predict_depth(image_path,output_path)
    
    # Analyze where each person is standing
    p_surface_overlaps = []
    
    for name, person in obj_dict.items():
        if person.object_type != "person":
            continue # We only want to analyze persons
        person.distance = get_distance_category(depth_map,person.mask)
        person.angle   = estimate_angle(image,person)
        
        overlaps = []
        for name, surface in obj_dict.items():
            # We only want to analyze surfaces (road or sidewalk)
            if surface.object_type not in ROAD_SIDEWALK: 
                continue

            # Check if the person and the surface overlap
            overlap, _ = is_overlap(person.mask, surface.mask)
            if overlap:
                overlaps.append(surface)

        p_surface_overlaps.append((person, overlaps))


    if DEBUG:
        # Print out the analysis results
        for person, surfaces in p_surface_overlaps:
            if surfaces:
                surface_str = ', '.join([f"{surface.object_type} {surface.id}" for surface in surfaces])
                print(f"Person {person.id} is on the {surface_str}")
            else:
                print(f"Person {person.id} is not on any detected surface")

    (i, j, k, d) = display_mask(SAM_masks,P_masks,P_boxes,DINO_boxes,person_annotation,output_path)
    

    write_to_txt(image_path, output_path, p_surface_overlaps, (i, j, k, d), labels, P_labels,obj_dict)

    plt.close()
    
    # return DINO_boxes,labels,P_labels,SAM_masks,P_masks
    return obj_dict,image

obj_dict,image = detect_road("input/scene_2.png",output_path="DINO_masked/scene_2.png")
# DINO_boxes,labels,P_labels,SAM_masks,P_masks = detect_road("input/video_0031/image_0005.png",output_path="DINOmasked/man.png")
# DINO_boxes,labels,P_labels,SAM_masks,P_masks = detect_road("JAAD_seg_by_sec/video_0268/image_0001.png",output_path="DINOmasked/video_0268/image_0001.png")
# DINO_boxes,labels,P_labels,SAM_masks,P_masks = detect_road("JAAD_seg_by_sec/video_0268/image_0003.png",output_path="DINOmasked/video_0268/image_0003.png")
# obj_dict,labels,p_labels =  detect_road("JAAD_seg_by_sec/video_0060/image_0005.png",output_path="SSS.png" )# "DINOmasked/video_0060/image_0005.png")
# obj_dict =  detect_road("input/S0710/image_0005.png",output_path="SSS.png" )# "DINOmasked/video_0060/image_0005.png")

obj_print(obj_dict,image)




road0
[  1.7296143 187.68036   893.4764    430.3484   ]
None
angle is None,it is unknown



sidewalk1
[351.81097 195.26332 893.76764 429.85553]
None
angle is None,it is unknown



person0
[467.99738 217.8452  518.62616 317.82587]
very far
angle is 89.45291580105491,it is to the right



person1
[592.1711  215.20636 615.8034  294.1244 ]
far
angle is 89.87186884497642,it is to the right



person2
[558.53107 212.25148 585.63153 301.5714 ]
very far
angle is 89.79881769489278,it is to the right



person3
[565.5572 209.2723 587.733  298.9891]
very far
angle is 89.80592710219433,it is to the right



person4
[468.3664  219.26927 497.02032 317.64752]
very far
angle is 89.28786664070377,it is to the right





## File IO logic
the following code demonstrate how is the IO logic organized

For the sake of fast file inquiry, I used library: Path() and os

Feel free to modify this part if you need, just in case the content is too big, which may crash the kernel

In [19]:
image_path = "input/video_0268/image_0001.png"
output_path = "DINOmasked/video_0018/man.png"

output_dir = Path(output_path).parent

print(output_dir)
img_name = image_path[-8:-4]
txt_name = "Info_Video_"+ str(output_dir)[-4:] +".txt"
txt_path = os.path.join(output_dir, txt_name) 
print(txt_path)



DINOmasked/video_0018
DINOmasked/video_0018/Info_Video_0018.txt


In [20]:
# STOP RUNNING THE MAIN PROGRAM

## Main Function

In [21]:
input_dir = Path("input") # contain many folders  JAAD_seg_by_sec
output_dir = Path('DINOmasked')
output_dir.mkdir(parents=True, exist_ok=True)

print("===== Start =====")
i = 1
# Use rglob to recursively find all image files
for image_path in input_dir.rglob('*'):
    if is_image_file(str(image_path)):
        relative_path = image_path.relative_to(input_dir)

        output_path = output_dir / relative_path
        output_path.parent.mkdir(parents=True,exist_ok=True)

        if output_path.exists():
            print(f"Already scanned {output_path}, next one")
            continue
        else:
            print("Processing: ", i)
            i += 1
            print(f"Image path: {os.path.basename(str(image_path))}")

            result = detect_road(str(image_path),str(output_path))

            if result is not None:
                print(f"Detected: {image_path}") 
            else: 
                print( "failed to detect result")

print("===== END =====")
                


===== Start =====
Already scanned DINOmasked/man.png, next one
Already scanned DINOmasked/man_black.png, next one
Already scanned DINOmasked/scene_2.png, next one
Already scanned DINOmasked/scene_2_black.png, next one
Already scanned DINOmasked/S0710/image_0003.png, next one
Already scanned DINOmasked/S0710/image_0001.png, next one
Already scanned DINOmasked/S0710/image_0002.png, next one
Already scanned DINOmasked/S0710/image_0006.png, next one
Already scanned DINOmasked/S0710/image_0004.png, next one
Already scanned DINOmasked/S0710/image_0005.png, next one
Already scanned DINOmasked/S0710/image_0008.png, next one
Already scanned DINOmasked/S0710/image_0007.png, next one
Already scanned DINOmasked/S0710/image_0009.png, next one
Already scanned DINOmasked/S0710/image_0011.png, next one
Already scanned DINOmasked/S0710/image_0012.png, next one
Already scanned DINOmasked/S0710/image_0010.png, next one
Already scanned DINOmasked/S0710/image_0014.png, next one
Already scanned DINOmasked/S

Detected: input/video_0045/image_0001.png
Processing:  2
Image path: image_0002.png
Detected: input/video_0045/image_0002.png
Processing:  3
Image path: image_0003.png
Invalid distance description: medium
Invalid distance description: medium
Detected: input/video_0045/image_0003.png
Processing:  4
Image path: image_0004.png
Invalid distance description: medium
Invalid distance description: medium
Invalid distance description: medium
Detected: input/video_0045/image_0004.png
Processing:  5
Image path: image_0005.png
Detected: input/video_0045/image_0005.png
===== END =====


In [22]:
input_dir  = Path("input")
i = 0
for image_path in input_dir.rglob('*'):
    if i > 30: break
    
    if is_image_file(str(image_path)):
        i += 1

        print("image path is" ,image_path)
        relative_path = image_path.relative_to(input_dir)

        output_filename = 'D_' + relative_path.name
        
        output_path = Path(os.path.join( output_dir , output_filename))
        print("output_path path is" ,output_path)
        output_path.parent.mkdir(parents=True,exist_ok=True)


image path is input/man.png
output_path path is DINOmasked/D_man.png
image path is input/man_black.png
output_path path is DINOmasked/D_man_black.png
image path is input/scene_2.png
output_path path is DINOmasked/D_scene_2.png
image path is input/scene_2_black.png
output_path path is DINOmasked/D_scene_2_black.png
image path is input/S0710/image_0003.png
output_path path is DINOmasked/D_image_0003.png
image path is input/S0710/image_0001.png
output_path path is DINOmasked/D_image_0001.png
image path is input/S0710/image_0002.png
output_path path is DINOmasked/D_image_0002.png
image path is input/S0710/image_0006.png
output_path path is DINOmasked/D_image_0006.png
image path is input/S0710/image_0004.png
output_path path is DINOmasked/D_image_0004.png
image path is input/S0710/image_0005.png
output_path path is DINOmasked/D_image_0005.png
image path is input/S0710/image_0008.png
output_path path is DINOmasked/D_image_0008.png
image path is input/S0710/image_0007.png
output_path path is 