# Get curb heights from point clouds

In [None]:
# Standard library and path imports
import set_path
import os
import warnings

# Third-party library imports
import upcp.utils.las_utils as las_utils
import upcp.utils.bgt_utils as bgt_utils
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from shapely.ops import unary_union
from shapely.wkt import dumps
import geopandas as gpd
import folium
import branca.colormap as cm
from tqdm.notebook import tqdm_notebook
tqdm_notebook.pandas()

# Local or project-specific imports
import curb_utils
import poly_utils
import plot_utils
import las_utils as las_utils_extra
import settings as st

if st.my_run == "azure":
    import config_azure as cf
elif st.my_run == "local":
    import config as cf

from shapely.errors import ShapelyDeprecationWarning
warnings.filterwarnings("ignore", category=ShapelyDeprecationWarning)

# 0. Prepare

In [None]:
if st.my_run == "azure":
    # Azure runs only: invoke blobfuse for the 'sidewalk' mount point.
    # The command is split over several literals purely for readability.
    sidewalk_mount_cmd = (
        'sudo blobfuse /home/azureuser/cloudfiles/code/blobfuse/sidewalk'
        ' --tmp-path=/mnt/resource/blobfusetmp'
        ' --config-file=/home/azureuser/cloudfiles/code/blobfuse/fuse_connection_sidewalk.cfg'
        ' -o attr_timeout=3600 -o entry_timeout=3600 -o negative_timeout=3600'
        ' -o allow_other -o nonempty'
    )
    os.system(sidewalk_mount_cmd)

In [None]:
# Read the pilot area(s) and merge them into a single geometry expressed in
# the project CRS (st.CRS)
df_areas = gpd.read_file(cf.output_pilot_area)
areas_in_project_crs = df_areas.to_crs(crs=st.CRS)
polygon = areas_in_project_crs.unary_union

## Select possible tiles

In [None]:
# Mount ovl container to access point clouds (Azure runs only).
# The command is split over several literals purely for readability.
if st.my_run == "azure":
    ovl_mount_cmd = (
        'sudo blobfuse /home/azureuser/cloudfiles/code/blobfuse/ovl'
        ' --tmp-path=/mnt/resource/blobfusetmp'
        ' --config-file=/home/azureuser/cloudfiles/code/blobfuse/fuse_connection_ovl.cfg'
        ' -o attr_timeout=3600 -o entry_timeout=3600 -o negative_timeout=3600'
        ' -o allow_other -o nonempty'
    )
    os.system(ovl_mount_cmd)

In [None]:
# Collect all tiles in polygon area
base_folder_bgt = cf.bgt_folder
in_folder_point_clouds = cf.out_folder_point_clouds

# Walk the point cloud folder recursively and keep the LAZ files. The file
# extension is matched explicitly: a substring test would also accept names
# such as 'tile.laz.tmp'.
all_pc_filenames = []
for path, _subdirs, files in os.walk(in_folder_point_clouds):
    for name in files:
        if name.endswith('.laz'):
            all_pc_filenames.append(os.path.join(path, name))
all_pc_filenames = np.array(all_pc_filenames)

# Derive tile code and bounding box of every point cloud from its filename
all_pc_tilecodes = np.array([las_utils.get_tilecode_from_filename(filename) for filename in all_pc_filenames])
all_pc_bboxes = np.array([las_utils.get_bbox_from_tile_code(tilecode) for tilecode in all_pc_tilecodes])

all_pc_polygons = [las_utils.get_polygon_from_tile_code(tilecode) for tilecode in all_pc_tilecodes]
gdf_pc_polygons = gpd.GeoDataFrame(geometry=all_pc_polygons)

# A tile is selected when its intersection with the area polygon is non-empty
gdf = gdf_pc_polygons.intersection(polygon)
pc_idxs_in_area_polygon = gdf[~gdf.is_empty].index.to_list()

# filenames, tilecodes and bounding boxes of point clouds in selected area
pc_filenames = all_pc_filenames[pc_idxs_in_area_polygon]
pc_tilecodes = all_pc_tilecodes[pc_idxs_in_area_polygon]
pc_bboxes = all_pc_bboxes[pc_idxs_in_area_polygon]

# Plot the selected tiles together with the area outline
sns.set()
ax = gdf_pc_polygons.iloc[pc_idxs_in_area_polygon].boundary.plot()
# Reproject the areas to st.CRS, the CRS in which `polygon` was intersected
# with the tiles, so both layers are drawn in the same coordinate system.
ax = df_areas.to_crs(crs=st.CRS).boundary.plot(ax=ax)
plt.title('Point clouds in selected area')
plt.show()

## Generate dictionary with point cloud tile information
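The dictionary is keyed by tilecode. Each entry contains:
filename, bounding box, point cloud tile polygon, sidewalk polygons and parkeervlak (parking space) polygons, plus placeholders for the lines that are added in the next step.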

In [None]:
# Build a lookup that holds, per point cloud tile, its file, its extent and
# the BGT polygons read for that tile.
tiles_bgt_dict = {}
for tilecode, filename, bbox in zip(pc_tilecodes, pc_filenames, pc_bboxes):
    # os.path.join gives the same path as plain concatenation when the folder
    # ends with a separator, and also works when it does not.
    bgt_data_file = os.path.join(base_folder_bgt, '{}.csv'.format(tilecode))

    # Create reader for the BGT polygons of this tile.
    bgt_sidewalk_reader = bgt_utils.BGTPolyReader(bgt_file=bgt_data_file)

    # Polygons returned by the reader's default filter are stored as the
    # sidewalk polygons of the tile (kept unmerged).
    sidewalk_polygons = bgt_sidewalk_reader.filter_tile(
                                tilecode,
                                padding=0, offset=0,
                                merge=False)

    # Same query, restricted to the BGT type 'parkeervlak' (parking space).
    parkeervlak_polygons = bgt_sidewalk_reader.filter_tile(
                                tilecode,
                                padding=0, offset=0,
                                bgt_types=['parkeervlak',],
                                merge=False)

    # The two line entries start as None and are filled in by a later step.
    tiles_bgt_dict[tilecode] = {'filename': filename,
                                'bbox': bbox,
                                'tile_polygon': poly_utils.bbox_to_polygon(bbox),
                                'sw_polygons': sidewalk_polygons,
                                'pkv_polygons': parkeervlak_polygons,
                                'lines_in_tile': None,
                                'potential_crossing_lines': None}

## Add all lines and potential crossing lines to point cloud information dictionary
We split the line between sidewalk and road into segments. Next, we create overlapping polygons around these line segments, using a given buffer along the line.

In [None]:
# For every tile, derive the sidewalk boundary lines and the subset of those
# lines that does not run along a parking space.
for tilecode, tile_info in tiles_bgt_dict.items():
    sidewalk_polygons = tile_info['sw_polygons']
    parkeervlak_polygons = tile_info['pkv_polygons']

    # Tiles without sidewalk polygons keep their None placeholders
    if not sidewalk_polygons:
        continue

    tile_polygon = tile_info['tile_polygon']

    # Merge the sidewalk parts and clip the result to the tile
    sidewalks_in_tile = unary_union(sidewalk_polygons).intersection(tile_polygon)

    # Keep only the outline that lies within the tile shrunk by 0.5
    shrunken_tile = tile_polygon.buffer(-0.5)
    lines_in_tile = sidewalks_in_tile.boundary.intersection(shrunken_tile)

    # Remove curbs next to parking, these are not accessible
    potential_crossing_lines = lines_in_tile
    for parkeer in parkeervlak_polygons:
        potential_crossing_lines = potential_crossing_lines.difference(parkeer)

    # Store both line sets on the tile entry
    tile_info['lines_in_tile'] = lines_in_tile
    tile_info['potential_crossing_lines'] = potential_crossing_lines

### Set parameters for curb detection

In [None]:
## Tunable parameters for the curb height calculation
## (the trailing commented numbers are presumably previously tried values)

# Spacing used when splitting a line into segments (segment length)
distance_delta = 1  # 2

# Minimum number of points required within a segment polygon to continue
min_nr_points = 200

# Buffer width, applied to both sides of a line.
# NOTE(review): the buffering code in this notebook uses a literal 0.5 and
# does not read this value -- confirm which one is intended.
buffer_w = 0.15  # 0.5

# Cut-off for the curb height categories
min_h = 0.04  # 0.07

### Calculate curb heights

In [None]:
# Calculate the curb height of every line segment of every tile and collect
# the results in `curb_height_df` (one row per segment).
#
# NOTE(review): the segment polygons below are buffered with a literal 0.5 on
# each side; the `buffer_w` parameter is not used here. Left unchanged because
# switching would alter the results -- confirm which value is intended.
curb_height_columns = ['line_segm', 'line_segm_polygon', 'overarching_line_segm', 'curb_height']

# Rows are gathered in a list and turned into a DataFrame once at the end;
# appending to a DataFrame row by row copies the frame on every insert.
curb_height_rows = []

for tilecode, tile_info in tqdm_notebook(tiles_bgt_dict.items()):
    potential_crossing_lines = tile_info['potential_crossing_lines']

    # Skip tiles for which no (non-empty) crossing lines were derived
    if potential_crossing_lines is None or potential_crossing_lines.is_empty:
        continue

    filename = tile_info['filename']
    try:
        points, labels = las_utils_extra.read_las(filename)

        # A single LineString is wrapped in a list; the parts of a multi-part
        # geometry are accessed through `.geoms`, since multi-part geometries
        # can no longer be iterated directly in Shapely 2.
        if potential_crossing_lines.geom_type == 'LineString':
            crossing_lines = [potential_crossing_lines]
        else:
            crossing_lines = list(potential_crossing_lines.geoms)

        for line in crossing_lines:
            # Cut the line into segments at regularly spaced points
            points_on_line = curb_utils.get_points_on_line(line, distance_delta)
            result = curb_utils.split_line_by_point(line, points_on_line)

            # The split result is used as-is when it is a plain sequence; if
            # it is a multi-part geometry its parts are taken from `.geoms`.
            segments = getattr(result, 'geoms', result)

            for line_segm in segments:
                # Ignore leftover segments shorter than half the target length
                if line_segm.length < (distance_delta/2):
                    continue

                # Polygon covering 0.5 on either side of the segment
                poly = line_segm.buffer(0.5, single_sided=True)
                poly2 = line_segm.buffer(-0.5, single_sided=True)
                poly_union = unary_union([poly, poly2])

                # calc height difference planes of sidewalk and road using point cloud
                if poly_union.geom_type == 'MultiPolygon':
                    # The two sides did not merge into one polygon; such
                    # segments are recorded with a height of 0.
                    curb_height = 0
                else:
                    curb_height, _available_points = curb_utils.calculate_curb_height(
                        points, labels, poly_union, min_nr_points)

                # add curb information to the result rows
                curb_height_rows.append([line_segm, poly_union.wkt, line.wkt, curb_height])
    except Exception as exc:
        # Best effort: a failing tile must not abort the whole run, but the
        # failure is reported instead of being dropped silently. Rows that
        # were already collected for this tile are kept.
        warnings.warn(
            'Skipping (remainder of) tile {} ({}): {!r}'.format(tilecode, filename, exc))
        continue

curb_height_df = pd.DataFrame(curb_height_rows, columns=curb_height_columns)

### Export curb heights as GeoPackage and create HTML map

In [None]:
# Expose the segment geometries under the 'geometry' column, which GeoPandas
# uses as the geometry column when none is specified.
segment_geometries = curb_height_df['line_segm']
curb_height_df['geometry'] = segment_geometries
curb_height_gdf = gpd.GeoDataFrame(data=curb_height_df, crs=st.CRS)

# Store the original segment column as text
# (presumably so the frame holds only one geometry column -- TODO confirm)
curb_height_gdf['line_segm'] = curb_height_gdf['line_segm'].astype(str)

In [None]:
# Display the GeoDataFrame as the cell output for a quick visual check
curb_height_gdf

In [None]:
# set True for satellite background, False for regular background
satellite = False

# Value that replaces missing entries before plotting; features that carry it
# are drawn as thin black lines instead of being colored by height.
no_data_value = 100

# Create Folium map
# NOTE(review): the names `map` and `json` shadow a builtin / a stdlib module
# name; they are kept so that code relying on these names keeps working.
map = folium.Map(
    location=[52.350547922223434, 4.7940192423718443],
    tiles=plot_utils.generate_map_params(satellite=satellite),
    min_zoom=10, max_zoom=25, zoom_start=17,
    zoom_control=True, control_scale=True, control=False
    )

# Add colormap (reversed RdYlGn, so low curbs are green and high curbs red)
cmp = cm.linear.RdYlGn_11.colors
cmp = list(reversed(cmp))
colormap = cm.LinearColormap(colors=cmp, vmin=0, vmax=0.2, caption='Curb height (m)')
colormap.add_to(map)

# Add curb height features
plot_gdf = curb_height_gdf.drop(columns=['line_segm', 'line_segm_polygon', 'overarching_line_segm'])

# Tile code per segment, built from the segment's own centroid coordinates
# divided by 50. The centroids must come from `plot_gdf` itself: using another
# frame would align unrelated geometries on the row index.
segment_centroids = plot_gdf.geometry.centroid
plot_gdf['tilecode'] = (
    np.floor(segment_centroids.x/50).astype(int).astype(str)
    + '_'
    + np.floor(segment_centroids.y/50).astype(int).astype(str)
)
plot_gdf.fillna(no_data_value, inplace=True)
json = plot_gdf.to_crs(crs=st.CRS_map).to_json()
feature_names = plot_gdf.columns.tolist()
feature_names.remove('geometry')
tooltip = plot_utils.gen_tooltip(feature_names, feature_names)


def _curb_style(feature):
    """Return the line style for one curb segment feature of the GeoJSON layer."""
    curb_height = feature['properties']['curb_height']
    if curb_height == no_data_value:
        return {"color": 'black', "weight": 2}
    return {"color": colormap(curb_height), "weight": 7}


folium.GeoJson(json, style_function=_curb_style, tooltip=tooltip).add_to(map)

map

In [None]:
# Azure runs only: invoke blobfuse for the 'sidewalk' mount point before the
# output is written. The command is split over several literals purely for
# readability.
if st.my_run == "azure":
    sidewalk_mount_cmd = (
        'sudo blobfuse /home/azureuser/cloudfiles/code/blobfuse/sidewalk'
        ' --tmp-path=/mnt/resource/blobfusetmp'
        ' --config-file=/home/azureuser/cloudfiles/code/blobfuse/fuse_connection_sidewalk.cfg'
        ' -o attr_timeout=3600 -o entry_timeout=3600 -o negative_timeout=3600'
        ' -o allow_other -o nonempty'
    )
    os.system(sidewalk_mount_cmd)

# Write the curb height GeoDataFrame to the configured output as a GeoPackage
curb_heights_output = cf.output_curb_heigts
curb_height_gdf.to_file(curb_heights_output, driver='GPKG')