In [None]:
%config Completer.use_jedi = False

In [None]:
# Add project src to path.
import set_path

# Import modules.
import numpy as np
from numba import jit
import time
from sklearn.cluster import DBSCAN
from scipy.stats import binned_statistic_2d

%matplotlib widget
import matplotlib.pyplot as plt
import matplotlib.patches as patches

import src.fusion as fusion
import src.utils.clip_utils as clip_utils
import src.utils.las_utils as las_utils
import src.utils.ahn_utils as ahn_utils
from src.utils.labels import Labels
from src.region_growing import LabelConnectedComp
from src.utils.interpolation import FastGridInterpolator

## Set-up

In [None]:
# Data folder for the BGT fuser.
bgt_data_folder = '../datasets/bgt/'
#bgt_data_file = '../datasets/bgt/bgt_points.csv'
bgt_data_file = '../datasets/bgt/custom_points_demo.csv'
tile_code = '2386_9702'
#tile_code = '2397_9705'

# Building fuser using BGT building footprint data.
bgt_point_fuser = fusion.BGTPointFuser(Labels.TREE, bgt_file=bgt_data_file)

In [None]:
# Extract pole objects from tile.
bgt_points = bgt_point_fuser._filter_tile(tile_code)
trees = [(x, y) for (t, x, y) in bgt_points if t == 'boom']
lights = [(x, y) for (t, x, y) in bgt_points if t == 'lichtmast']
signs = [(x, y) for (t, x, y) in bgt_points if t == 'verkeersbord']

In [None]:
# Load elevation data.
ahn_data_file = '../datasets/ahn/ahn_' + tile_code + '.npz'
ahn_tile = ahn_utils.load_ahn_tile(ahn_data_file)

grid_z = ahn_tile['ground_surface']
avg_ground_height = np.nanmean(grid_z)
#grid_z[grid_z == np.nan] = avg_ground_height

# Create interpolator.
fast_z = FastGridInterpolator(ahn_tile['x'], ahn_tile['y'], grid_z)

In [None]:
# Load labeled and grown LAS file.
las_file = '../datasets/pointcloud/grown_' + tile_code + '.laz'
las = las_utils.read_las(las_file)

In [None]:
# Remove ground and building points.
PRE_MASK = (las.label != Labels.GROUND) & (las.label != Labels.BUILDING)
PRE_MASK_IDS = np.where(PRE_MASK)[0]
points = np.vstack((las.x[PRE_MASK], las.y[PRE_MASK], las.z[PRE_MASK])).T

## Voxel approach

In [None]:
def find_point_cluster(points, point, plane_height, plane_buffer=0.1, search_radius=1, max_dist=0.1, min_points=1):
    search_ids = np.where(clip_utils.cylinder_clip(points, point, search_radius, bottom=plane_height-plane_buffer, top=plane_height+plane_buffer))[0]
    if len(search_ids) < min_points:
        return np.empty((0, 3))
    # Cluster the potential seed points.
    clustering = DBSCAN(eps=0.05, min_samples=5, p=2).fit(points[search_ids])
    # Remove noise points.
    noise_mask = clustering.labels_ != -1
    # Get cluster labels and sizes.
    cc_labels, counts = np.unique(clustering.labels_, return_counts=True)
    if min_points > 1:
        # Only keep clusters with size at least min_points.
        cc_labels = cc_labels[counts >= min_points]
        noise_mask = noise_mask & [l in set(cc_labels) for l in clustering.labels_]
    # Create a list of cluster centers (x,y) and radius r.
    c_xyr_list = []
    for cl in set(cc_labels).difference((-1,)):
        c_mask = clustering.labels_ == cl
        (cx, cy) = np.mean(points[search_ids[c_mask], 0:2], axis=0)
        cr = np.max(np.max(points[search_ids[c_mask], 0:2], axis=0) - np.min(points[search_ids[c_mask], 0:2], axis=0)) / 2
        if (point[0] - cx)**2 + (point[1] - cy)**2 < (cr + max_dist)**2:
            c_xyr_list.append([cx, cy, cr])
    return np.array(c_xyr_list)

In [None]:
def find_seeds_for_point_objects(points, point_objects, fast_z, search_pad=1.5, voxel_res=0.1, seed_height=1.5, min_height=2, min_points=500, z_min=0, z_max=2.5):
    seeds = []
    for ind, obj in enumerate(point_objects):
        # Assume obj = [x, y].
        ground_z = fast_z(np.array([obj]))
        search_box = (obj[0]-search_pad, obj[1]-search_pad, obj[0]+search_pad, obj[1]+search_pad)
        box_ids = np.where(clip_utils.box_clip(points, search_box, bottom=ground_z+z_min, top=ground_z+z_max))[0]
        if len(box_ids) == 0:
            print(f'Empty search box for object {ind}.')
            continue
        x_edge = np.arange(search_box[0], search_box[2] + 0.01, voxel_res)
        y_edge = np.arange(search_box[1], search_box[3] + 0.01, voxel_res)
        min_z_bin = binned_statistic_2d(points[box_ids, 0], points[box_ids, 1], points[box_ids, 2], bins=[x_edge, y_edge], statistic='min')
        max_z_bin = binned_statistic_2d(points[box_ids, 0], points[box_ids, 1], points[box_ids, 2], bins=[x_edge, y_edge], statistic='max')
        #med_z_bin = binned_statistic_2d(points[box_ids, 0], points[box_ids, 1], points[box_ids, 2], bins=[x_edge, y_edge], statistic='median')
        count_z_bin = binned_statistic_2d(points[box_ids, 0], points[box_ids, 1], points[box_ids, 2], bins=[x_edge, y_edge], statistic='count')

        midpoint = ground_z + (z_min + z_max) / 2
        #med_mid = np.abs(med_z_bin.statistic - midpoint) < 0.25
        diff = max_z_bin.statistic - min_z_bin.statistic
        #x_loc, y_loc = np.where((diff > min_height) & (count_z_bin.statistic > min_points) & med_mid)
        x_loc, y_loc = np.where((diff > min_height) & (count_z_bin.statistic > min_points))
        if len(x_loc) == 0:
            print(f'No candidates found for object {ind}.')
            continue
        candidates = np.stack((x_edge[x_loc] + voxel_res/2, y_edge[y_loc] + voxel_res/2)).T
        dist = [np.linalg.norm(np.array(obj) - np.array([c])) for c in candidates]
        c_prime = candidates[np.argmin(dist), :]
        clusters = find_point_cluster(points, c_prime, seed_height)
        if len(clusters) > 0:
            seeds.append(clusters[0])
        else:
            print(f'No cluster found for object {ind}, using default.')
            seeds.append([c_prime[0], c_prime[1], voxel_res])
    return seeds

## Match point objects with clusters

In [None]:
# Optional: re-load labeled and grown LAS file.
# E.g. to try out different settings.
las_file = '../datasets/pointcloud/grown_' + tile_code + '.laz'
las = las_utils.read_las(las_file)

In [None]:
# Cluster radius tends to be too small (although sometimes it isn't) so we multiply.
r_mult = 1.5

label_mask = np.ones((len(points),), dtype=bool)

tree_mask = np.zeros((len(points),), dtype=bool)
light_mask = np.zeros((len(points),), dtype=bool)
sign_mask = np.zeros((len(points),), dtype=bool)

# Match and label trees.
tree_points = find_seeds_for_point_objects(points[label_mask], trees, fast_z, voxel_res=0.2, seed_height=1.75, min_points=500)
for tree in tree_points:
    clip_mask = clip_utils.cylinder_clip(points, tree[0:2], r_mult*tree[2], top=fast_z(np.array([tree[0:2]]))+4)
    tree_mask = tree_mask | clip_mask

label_mask[tree_mask] = False

In [None]:
mask_ids = np.where(label_mask)[0]
light_points = find_seeds_for_point_objects(points[label_mask], lights, fast_z, voxel_res=0.2, seed_height=2.25, min_points=400)
for light in light_points:
    clip_mask = clip_utils.cylinder_clip(points, light[0:2], r_mult*light[2], top=fast_z(np.array([light[0:2]]))+4)
    light_mask = light_mask | clip_mask

label_mask[light_mask] = False

In [None]:
mask_ids = np.where(label_mask)[0]
sign_points = find_seeds_for_point_objects(points[label_mask], signs, fast_z, voxel_res=0.2, seed_height=1.5, min_points=400)
for sign in sign_points:
    clip_mask = clip_utils.cylinder_clip(points, sign[0:2], r_mult*sign[2], top=fast_z(np.array([sign[0:2]]))+3)
    sign_mask = sign_mask | clip_mask

In [None]:
# Create labels.
labels = las.label
labels[PRE_MASK_IDS[tree_mask]] = Labels.TREE
labels[PRE_MASK_IDS[light_mask]] = Labels.STREET_LIGHT
labels[PRE_MASK_IDS[sign_mask]] = Labels.TRAFFIC_SIGN

In [None]:
# Save the result.
out_file = '../datasets/pointcloud/poles_custom_' + tile_code + '.laz'
las_utils.label_and_save_las(las, labels, out_file)

## Visualize results

In [None]:
# Visualize the resulting match.
((x_min, y_max), (x_max, y_min)) = las_utils.get_bbox_from_tile_code(tile_code)
viz_mask = clip_utils.box_clip(points, np.array([x_min, y_min, x_max, y_max]), bottom=avg_ground_height+1.7, top=avg_ground_height+1.8)

fig, ax = plt.subplots(1)
ax.scatter(points[viz_mask, 0], points[viz_mask, 1], c='lightgrey', marker='.')

for tree in tree_points:
    circle = patches.Circle((tree[0], tree[1]), tree[2], linewidth=2, linestyle='-', edgecolor='green', fill=False)
    ax.add_patch(circle)

for light in light_points:
    circle = patches.Circle((light[0], light[1]), light[2], linewidth=2, linestyle='-', edgecolor='purple', fill=False)
    ax.add_patch(circle)

for sign in sign_points:
    circle = patches.Circle((sign[0], sign[1]), sign[2], linewidth=2, linestyle='-', edgecolor='red', fill=False)
    ax.add_patch(circle)

box = patches.Rectangle((x_min, y_min), x_max-x_min, y_max-y_min, linewidth=1, linestyle='--', edgecolor='grey', fill=False)
ax.add_patch(box)

ax.set_xlabel('X')
ax.set_ylabel('Y')
plt.axis('equal')
plt.show()

## TODO Cluster Growing

In [None]:
plane_height = 1.25

height_mask_ids = np.where(points[:,2] > avg_ground_height + plane_height)[0]

In [None]:
# Cluster based region growing.
# Not for trees yet.
lcc_light = LabelConnectedComp(Labels.STREET_LIGHT, exclude_labels=(Labels.TREE, Labels.TRAFFIC_SIGN),
                               octree_level=9, min_component_size=100, threshold=0.1)
lcc_sign = LabelConnectedComp(Labels.TRAFFIC_SIGN, exclude_labels=(Labels.TREE, Labels.STREET_LIGHT),
                              octree_level=9, min_component_size=100, threshold=0.05)

lcc_light_mask = lcc_light.get_label_mask(points=points[height_mask_ids], las_labels=las.label[mask][height_mask_ids])
lcc_sign_mask = lcc_sign.get_label_mask(points=points[height_mask_ids], las_labels=las.label[mask][height_mask_ids])

# Update labels.
labels[mask_ids[height_mask_ids[lcc_light_mask]]] = lcc_light.get_label()
labels[mask_ids[height_mask_ids[lcc_sign_mask]]] = lcc_sign.get_label()