In [1]:
### Setup ###
import copy
import gc
import itertools
import json
import math
import os
import random
import shutil
import subprocess
import time
import zipfile

import requests
import yaml
import numpy as np
import pandas as pd
import geopandas as gpd
import folium
import psutil
import mapclassify
import scipy
import seaborn as sns
import torch

import matplotlib.pyplot as plt
import matplotlib.path as mpltPath
import matplotlib.colors as mcolors
import matplotlib.cm as cm

from PIL import Image
from tqdm import tqdm
from IPython.display import display
from ultralytics import YOLO, settings
from branca.element import Template, MacroElement


### Directories ###
WORK_DIR = "."
#API_KEY="" # for GSV API
# Weights of the trained sign-detection model (loaded in the detection cell).
CHECKPOINT_PATH = "./model_sign_detection.pt"


### Import ###
# Both layers are read from the step-1 preprocessing output folder.
input_dir = "./../../outputs/step1_preprocessing"

# Line layer.
lines_path = os.path.join(input_dir, "LINE_EPSG4326.geojson")
lines = gpd.read_file(lines_path, driver="GeoJSON")
print(len(lines))
#lines.iloc[0:5]

# Point layer; later cells read its link_id / side / point_id / pano_* columns.
points_path = os.path.join(input_dir, "POINT_EPSG4326.geojson")
points = gpd.read_file(points_path, driver="GeoJSON")
print(len(points))
points.iloc[0:5]

In [7]:
### Adjust heading based on road bearing vs pano_heading ###
def get_side_heading(row, YOUR_ANGLE):
    """Return the camera heading that looks toward the roadside for one point.

    The panorama heading is first aligned with the road bearing: if the two
    differ by more than 45 degrees (shortest angular distance) the panorama
    heading is flipped by 180 degrees. ``YOUR_ANGLE`` is then added.

    Parameters
    ----------
    row : mapping or pandas.Series
        Must provide ``'bearing'`` and ``'pano_heading'`` in degrees.
    YOUR_ANGLE : float
        Offset in degrees added to the aligned heading.

    Returns
    -------
    pandas.Series
        One entry, ``"pano_heading_side"``.

    Notes
    -----
    * The same offset is applied for every ``median`` / ``side`` combination:
      the original four branches were identical because the bearing is already
      switched to the other side upstream, so they are collapsed here.
    * The result is intentionally NOT wrapped back into [0, 360): the value is
      embedded in the image file names written by ``get_image``, so wrapping
      would change the naming of already-downloaded images.
    * A NaN heading or bearing propagates to a NaN result.
    """
    bearing = row['bearing']
    pano_heading = row['pano_heading']

    # Shortest angular distance in [0, 180]. Taking the modulo first keeps the
    # result correct even when the raw difference exceeds 360 degrees.
    diff = abs(pano_heading - bearing) % 360
    diff = min(diff, 360 - diff)

    # After the normalisation above diff can never reach 315, so a single
    # threshold is sufficient.
    if diff <= 45:
        pano_heading_adjusted = pano_heading
    else:
        pano_heading_adjusted = pano_heading + 180
    pano_heading_adjusted = pano_heading_adjusted % 360   # keep in [0, 360)

    pano_heading_side = pano_heading_adjusted + YOUR_ANGLE
    return pd.Series({"pano_heading_side": pano_heading_side})

# Offset (degrees) added to the road-aligned heading so the camera looks
# toward the roadside.
SIDE_VIEW_ANGLE = 40

side_headings = points.apply(lambda row: get_side_heading(row, SIDE_VIEW_ANGLE), axis=1)

# Drop any column left over from a previous run of this cell; otherwise the
# concat below would append a second "pano_heading_side" column and the
# NaN check further down would operate on a DataFrame instead of a Series.
points = points.drop(columns=["pano_heading_side"], errors="ignore")
points = pd.concat([points, side_headings], axis=1)

points = points.sort_values(by=["link_id", "side", "point_id"])
print(len(points))
# Number of points without a usable panorama heading / side heading.
print(points["pano_heading"].astype(float).isna().sum())
print(points["pano_heading_side"].astype(float).isna().sum())
points.iloc[0:5]

In [1]:
### GSV download ###
MAX_LENGTH = len(points)
def generate_ranges(start, stop, interval, num_ranges, max_length=MAX_LENGTH):
    """Build ``num_ranges`` consecutive index ranges of width ``interval``.

    Range ``k`` begins at ``start + k * interval`` and ends (exclusive) at
    ``start + (k + 1) * interval``, with the end clipped to ``max_length``.
    ``stop`` is accepted but not used by the computation.

    NOTE: a later cell of this notebook defines another function with the same
    name and a different signature; whichever cell ran last wins.
    """
    return [
        range(start + k * interval, min(start + (k + 1) * interval, max_length))
        for k in range(num_ranges)
    ]
# Derive the number of download batches from the data instead of hard-coding
# it, so no point is silently skipped when the table grows and no trailing
# empty ranges are produced when it is small.
DOWNLOAD_CHUNK = 1000
num_download_ranges = max(1, math.ceil(MAX_LENGTH / DOWNLOAD_CHUNK))
ranges = generate_ranges(0, MAX_LENGTH, DOWNLOAD_CHUNK, num_download_ranges, MAX_LENGTH)

# Preview the first and last two ranges; slicing avoids an IndexError when
# fewer than two ranges exist.
for preview in ranges[:2]:
    print(preview)
print("...")
for preview in ranges[-2:]:
    print(preview)
print()


# Folder that receives the downloaded street-view images.
output_dir = os.path.join(".", "outputs") 
os.makedirs(output_dir, exist_ok=True)
print(output_dir)


def get_image(row, timeout=30):
    """Download the street-view thumbnail for one point unless it already exists.

    Parameters
    ----------
    row : mapping or pandas.Series
        Must provide ``link_id``, ``point_id``, ``pano_lat``, ``pano_lon``,
        ``side``, ``pano_heading_side``, ``pano_date`` and ``pano_id``.
    timeout : float, optional
        Seconds to wait for the server before giving up on this image, so one
        stalled connection cannot block the whole download loop.

    The image is written into the module-level ``output_dir``. Failures (HTTP
    status other than 200, or a network error) are printed and the function
    returns without writing a file, so the surrounding loop keeps going and
    the image is retried on the next run.
    """
    linkid = row['link_id']
    pointid = row['point_id']
    location = f"{round(row['pano_lat'], 9)},{round(row['pano_lon'], 9)}"
    side = row['side']
    heading = round(row['pano_heading_side'], 3)
    date = row['pano_date']
    panoid = row['pano_id']
    width = 640
    height = 640
    fov = 100
    endpoint = "https://streetviewpixels-pa.googleapis.com/v1/thumbnail?cb_client=maps_sv.tactile&"
    furl = f"{endpoint}w={width}&h={height}&pitch={0}&panoid={panoid}&yaw={heading}&thumbfov={fov}"
    fname = f"gsv__{linkid}__{side}__{pointid}__{date}__{heading}__{location}.jpg"  # Don't change this naming
    output_path = os.path.join(output_dir, fname)

    # Already downloaded: nothing to do.
    if os.path.exists(output_path):
        return

    try:
        # The context manager releases the connection even on non-200 replies.
        with requests.get(furl, timeout=timeout) as response:
            status_code = response.status_code
            content = response.content if status_code == 200 else None
    except requests.RequestException as e:
        print(f"Error: Failed to fetch {output_path}: {e}")
        return

    if content is None:
        print(f"Error: Failed to fetch {output_path}, Status Code: {status_code}")
        return

    # The full body is in memory before the file is opened, so a failed
    # request never leaves a partial image behind.
    with open(output_path, 'wb') as file:
        file.write(content)
# Walk through the index ranges one batch at a time and fetch every image.
for r in ranges:
    print(r)
    points_download = points.iloc[r]
    print(points_download.shape)
    for row_label, row in points_download.iterrows():
        get_image(row)

NameError: name 'point_whole_side_assigned_azi' is not defined

In [None]:
### Settings for Sign Detection (GPU needed) ###
import torch  

# Report whether CUDA is usable and, if so, the memory state of every GPU.
print(torch.cuda.is_available())  
if torch.cuda.is_available():
    for i in range(torch.cuda.device_count()):
        print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
        print(f"  Memory Allocated: {torch.cuda.memory_allocated(i)/1024**2:.2f} MB")
        print(f"  Memory Cached:    {torch.cuda.memory_reserved(i)/1024**2:.2f} MB")
else:
    print("No CUDA-compatible GPU detected.")

# The original line was missing the comma between the two imported names,
# which made the whole cell a SyntaxError.
from ultralytics import YOLO, settings
#ultralytics.checks()

def install_opencv_headless():
    """Install ``opencv-python-headless`` into the environment of this kernel.

    pip is invoked as ``<kernel python> -m pip`` rather than as a bare ``pip``
    command, so the package lands in the interpreter that is running the
    notebook even when a different ``pip`` comes first on PATH.
    A failing installation is reported with a printed message, not raised.
    """
    import sys

    try:
        subprocess.run(
            [sys.executable, "-m", "pip", "install", "opencv-python-headless"],
            check=True,
        )
        print("opencv-python-headless installed successfully.")
    except subprocess.CalledProcessError as e:
        print(f"An error occurred: {e}")
#install_opencv_headless()

In [6]:
### Prepare for Sign Detection ###
# Collect the full paths of all downloaded .jpg images in name order.
folder_dir = "./outputs"
filenames = sorted(
    os.path.join(folder_dir, name)
    for name in os.listdir(folder_dir)
    if name.endswith(".jpg")
)
print("Total files:", len(filenames))


# Number of images handed to the detector per range (e.g., 1000 per range).
BATCH_SIZE = 1000
MAX_LENGTH = len(filenames)
def generate_ranges(batch_size, max_length=None):
    """Split ``0 .. max_length`` into consecutive ranges of ``batch_size`` items.

    Parameters
    ----------
    batch_size : int
        Number of indices per range; must be positive.
    max_length : int, optional
        Total number of indices. When omitted, the module-level ``MAX_LENGTH``
        is looked up at call time, so the result always reflects its current
        value and not the one seen when this function was defined.

    Returns
    -------
    list of range
        Ranges covering every index exactly once; the last one may be shorter.
        Empty when ``max_length`` is 0.

    Raises
    ------
    ValueError
        If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    if max_length is None:
        max_length = MAX_LENGTH

    # `ceil` was never imported on its own; use the already-imported math module.
    num_batches = math.ceil(max_length / batch_size)
    return [
        range(i * batch_size, min((i + 1) * batch_size, max_length))
        for i in range(num_batches)
    ]
ranges = generate_ranges(BATCH_SIZE)
print("Number of ranges:", len(ranges))


# Folder that receives the images on which a sign was detected.
output_dir = os.path.join(".", "sign_detected_imgs") 
os.makedirs(output_dir, exist_ok=True)
print(output_dir)


# Lookup table from the detector's numeric class id to its class name;
# the row position equals the class id.
class_names = [
    "prohibAnytime",
    "prohibSometimes",
    "prohibForBusStop",
    "prohibForCleaning",
    "permit",
]
classes = pd.DataFrame({
    "class_id": list(range(len(class_names))),
    "class_nm": class_names,
})

FileNotFoundError: [Errno 2] No such file or directory: './outputs'

In [None]:
### Sign Detection ###
# ---------- SETTINGS ----------
THRES_CONFID = 0.3   # minimum detection confidence to keep a box
THRES_AREA = 200     # minimum box area (pixels) to keep a box
BATCH_SIZE = 8       # images per inference batch (rebinds the name used for range size above)
GPU_INDEX = 3        # index of the GPU that runs the model
DEVICE = f"cuda:{GPU_INDEX}"
command = ["nvidia-smi", "--query-gpu=memory.free", "--format=csv,nounits"]

# ---------- LOAD MODEL ON GPU 3 ----------
# The weights come from CHECKPOINT_PATH defined in the setup cell; the name
# `input_path` used previously is not defined anywhere in this notebook.
torch.cuda.set_device(GPU_INDEX)  
model = YOLO(CHECKPOINT_PATH).to(DEVICE)

# os.environ["CUDA_VISIBLE_DEVICES"] = "3"       # make GPU 3 the only visible one
# torch.cuda.set_device(0)    
# model = YOLO(CHECKPOINT_PATH).to("cuda:0")


log_detected = []

# ---------- PROCESS RANGES SEQUENTIALLY ----------
for i, batch_range in enumerate(ranges):
    print(f"{i}th range starts: {batch_range}")

    # Slice with the range's own bounds. `stop` is exclusive, so this keeps the
    # last file of the batch (slicing up to batch_range[-1] dropped it) and
    # also works for an empty range.
    files = filenames[batch_range.start:batch_range.stop]
    if not files:
        print(f"Batch {i}: no files, skipped")
        continue

    # Bind these before the try block so the cleanup in `finally` never hits an
    # undefined name when predict() itself fails.
    results = None
    batch_log = []
    try:
        results = model.predict(
            files,
            conf=THRES_CONFID,
            #device="cuda:0",    # refers to GPU 3
            save=False,
            verbose=False,
            batch=BATCH_SIZE,
            half=True
        )

        for k, result in enumerate(results):
            boxes = result.boxes.data
            fname = os.path.basename(files[k])
            kept_any = False

            for j, box in enumerate(boxes):
                xmin, ymin, xmax, ymax, confidence, class_id = box.tolist()
                # Threshold on the raw score: comparing the rounded value would
                # drop detections between 0.300 and 0.305.
                passes_confidence = confidence > THRES_CONFID
                xmin, ymin, xmax, ymax, confidence = int(xmin), int(ymin), int(xmax), int(ymax), round(confidence, 2)
                area = (xmax - xmin) * (ymax - ymin)
                class_id = int(class_id)
                class_nm = classes.iloc[class_id]["class_nm"]

                if passes_confidence and area > THRES_AREA:
                    kept_any = True
                    batch_log.append({
                        "file_id": fname,
                        "detection_index": j,
                        "class_id": class_id,
                        "class_nm": class_nm,
                        "xmin": xmin,
                        "ymin": ymin,
                        "xmax": xmax,
                        "ymax": ymax,
                        "confidence": confidence,
                        "area": area
                    })

            # Write the annotated image once per file, not once per kept box.
            if kept_any:
                result.save(os.path.join(output_dir, fname))

        log_detected.extend(batch_log)
        print(f"Batch {i}: {len(batch_log)} detections")

    except Exception as e:
        # Best effort: report the failing batch and move on to the next one.
        # Detections collected for this batch before the error are discarded.
        print(f"Error processing batch {i}: {e}")

    finally:
        # free GPU memory between batches
        result = boxes = box = None   # loop variables still reference the last tensors
        del results, files, batch_log
        gc.collect()
        torch.cuda.empty_cache()

    shell_op = subprocess.run(command, stdout=subprocess.PIPE, text=True)
    #print(f"GPU memory after batch {i}:\n{shell_op.stdout}")

print("Done")

In [None]:
# Columns written by the detection loop. Passing them to the constructor keeps
# the frame well-formed even when no sign was detected (an empty list would
# otherwise yield a frame without columns and the selections below would fail).
DETECTION_COLUMNS = ['file_id', 'class_nm', 'confidence', 'area',
                     'detection_index', 'class_id',
                     'xmin', 'ymin', 'xmax', 'ymax']
log_detected_df = pd.DataFrame(log_detected, columns=DETECTION_COLUMNS)

# File names follow gsv__{link_id}__{side}__{point_id}__..., so the ids can be
# recovered from the "__"-separated parts. link_id and side stay strings.
name_parts = log_detected_df["file_id"].str.split("__")
log_detected_df["link_id"] = name_parts.str[1]
log_detected_df["side"] = name_parts.str[2]
log_detected_df["point_id"] = name_parts.str[3].astype(int)

log_detected_df = log_detected_df[['link_id', 'side', 'point_id', 
                                   'class_nm', 'confidence', 'area',
                                   'detection_index', 'class_id',
                                   'xmin', 'ymin', 'xmax', 'ymax', 'file_id']]
log_detected_df = log_detected_df.sort_values(by=['link_id', 'side', 'point_id'])
print(len(log_detected_df))
print(log_detected_df.groupby("class_nm")["class_nm"].count())
# A bare expression in the middle of a cell shows nothing; display it explicitly.
display(log_detected_df.iloc[0:3])

output_path = os.path.join(WORK_DIR, "sign_inferred.csv")
log_detected_df.to_csv(output_path, index=False)

In [None]:
points_side1 = points[points["side"] == "side1"][["link_id", "side", "point_id", "pano_id", "pano_date", "geometry"]]
print(len(points_side1))

# Classes shown on the map, in legend order.
class_order = [    
    "prohibAnytime",
    "prohibSometimes",
    "prohibForBusStop",
    #"prohibForCleaning",
    "permit",
]

detections_side1 = log_detected_df[log_detected_df["side"] == "side1"].copy()
# link_id in the detection log is parsed from file names and is therefore a
# string; cast the merge keys to the dtypes used by the points table so the
# merge does not fail or miss rows on a dtype mismatch.
# NOTE(review): assumes every key value converts cleanly - confirm against the
# dtypes of points["link_id"] / points["point_id"].
for key in ("link_id", "point_id"):
    detections_side1[key] = detections_side1[key].astype(points_side1[key].dtype)

# Inner join: keep only points that have at least one detection (equivalent to
# the earlier left join followed by dropping rows without a confidence).
points_side1_detected = points_side1.merge(detections_side1,
                                 on=["link_id", "side", "point_id"],
                                 how="inner")

# Keep only the classes listed in class_order. Without this, classes left out
# of the list turn into NaN in the Categorical below and then into the literal
# string "nan", which showed up as its own legend entry.
# NOTE(review): presumably the commented-out class is meant to be hidden - confirm.
points_side1_detected = points_side1_detected[
    points_side1_detected["class_nm"].isin(class_order)
].copy()

# Sort by the fixed class order, then go back to plain strings for mapping.
points_side1_detected["class_nm"] = pd.Categorical(
    points_side1_detected["class_nm"],
    categories=class_order,
    ordered=True
)
points_side1_detected = points_side1_detected.sort_values(by="class_nm")
points_side1_detected["class_nm"] = points_side1_detected["class_nm"].astype(str)

print(len(points_side1_detected))
print(points_side1_detected.iloc[0:2])
print()

# One colour per class that is actually present, spread over the RdYlBu map.
sign_types = points_side1_detected["class_nm"].unique()
print(sign_types)
colors = plt.get_cmap("RdYlBu")(np.linspace(0, 1, len(sign_types)))
sign_types_color_map = dict(zip(sign_types, [mcolors.to_hex(c) for c in colors]))
points_side1_detected["color"] = points_side1_detected["class_nm"].map(sign_types_color_map)

# Optional background layer. `atl` is not created anywhere in this notebook,
# so use it only when an earlier session/cell has defined it; otherwise the
# map is started from the panorama points.
base_layer = globals().get("atl")
m = None
if base_layer is not None:
    m = base_layer.explore(
        color="white",
        alpha=1,
        name="atl_metro",
        tooltip=False,
        control_scale=True
    )

# All side1 panorama locations in grey.
m = points_side1.explore(
    m=m,
    color="grey",
    name="Pano IDs",
    control_scale=True,
    marker_kwds={"radius": 1.0, "fillOpacity": 0.4})

# Detected signs coloured by class; skipped when there is nothing to draw.
if len(points_side1_detected) > 0:
    m = points_side1_detected.explore(
        m=m,
        color=points_side1_detected["color"],
        name="Detected signs by category",
        tooltip="class_nm",
        marker_kwds={"radius": 2.0, "fillOpacity": 0.8}
    )

folium.LayerControl(collapsed=False).add_to(m)

# One legend line per class: a coloured square followed by the class name.
legend_entries = "\n".join([
    f'<span style="color:{color};">&#9632;</span> {rtype}<br>'
    for rtype, color in sign_types_color_map.items()
])

# Doubled braces keep the Jinja macro tags literal inside the f-string.
legend_html = f"""
{{% macro html(this, kwargs) %}}
<div style="
    position: fixed; 
    top: 100px; left: 5px; width: 190px; height: 160px;
    background-color: white;
    border:2px solid grey;
    z-index:9999;
    font-size:14px;
    padding: 10px;
    box-shadow: 2px 2px 6px rgba(0,0,0,0.3);
    line-height: 1.5em;
    ">
    <b>Sign Category</b><br>
    {legend_entries}
</div>
{{% endmacro %}}
"""

legend = MacroElement()
legend._template = Template(legend_html)
m.get_root().add_child(legend)

#m.save("./map.html")
m