## Junction Detection Techniques

In MLS datasets, road structures are typically less complex, so explicit junction detection is often unnecessary.

However, ALS datasets tend to contain more complex road topology. To simplify downstream mesh generation, especially when color or trajectory information is unavailable, a reliable topology analysis becomes important. Junction detection plays a key role in this process by enabling roads to be segmented into simpler point sets, which improves reconstruction stability and reduces geometric complexity.

### Color based filtering - Radial search + Thresholding + Highlighting

In [None]:
#---------------------------------------------------------------------------------------------<DensityBasedAnalysis>---------------------------||
import numpy as np
import open3d as o3d


"""
Density based analysis. The major intuition is junctions will be having higher point density when compared to roads.
"""
def highlightDenseCenters(pointCloud,searchRadius,minNeighborCount=2000):
    if not pointCloud.has_colors():
        numPoints = np.asarray(pointCloud.points).shape[0]
        pointCloud.colors = o3d.utility.Vector3dVector(
            np.full((numPoints, 3), 0.7)
        )

    colors = np.asarray(pointCloud.colors)
    kdTree = o3d.geometry.KDTreeFlann(pointCloud)
    for pointIndex, point in enumerate(pointCloud.points):
        [_, neighborIndices, _] = kdTree.search_radius_vector_3d(
            point,
            searchRadius
        )

        if len(neighborIndices) > minNeighborCount:
            colors[pointIndex] = [1.0, 0.0, 0.0]  #Red

    pointCloud.colors = o3d.utility.Vector3dVector(colors)

    return pointCloud

In [None]:
inputPath  = "28AN1_16_Road-SmallSnippet.ply"
outputPath = "28AN1_16_Road-SmallSnippet_Centers.ply"

searchRadius      = 10.0 # 12.0
minNeighborCount  = 1400 # 1200 # 2000

pointCloud = o3d.io.read_point_cloud(inputPath)

pointCloudHighlighted = highlightDenseCenters(pointCloud=pointCloud,searchRadius=searchRadius,minNeighborCount=minNeighborCount)

o3d.io.write_point_cloud(outputPath,pointCloudHighlighted,write_ascii=False,compressed=True)

print(f"Saved highlighted point cloud to: {outputPath}")

Saved highlighted point cloud to: 28AN1_16_Road-SmallSnippet_Centers.ply


### Scalar field based filtering - Radial search + Thresholding + Scalar Field (Same script as above, but added it as a scalar field)

In [None]:
#---------------------------------------------------------------------------------------------<DensityBasedAnalysis>---------------------------||

import numpy as np
import open3d as o3d
from datetime import datetime

def computeDensityScalar(pointCloud,searchRadius,densityThreshold=2000):
    points = np.asarray(pointCloud.points)
    kdTree = o3d.geometry.KDTreeFlann(pointCloud)

    densityScalar = np.zeros(points.shape[0], dtype=np.float32)

    for pointIndex, point in enumerate(points):

        [_, neighborIndices, _] = kdTree.search_radius_vector_3d(
            point,
            searchRadius
        )

        neighborCount = len(neighborIndices)
        densityScalar[pointIndex] = min(
            neighborCount / densityThreshold,
            1.0
        )

    return densityScalar

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [None]:
def writeAsciiPLYWithScalar(outputPath,points,scalar,scalarName="scalar_Density"):
    numPoints = points.shape[0]
    timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

    with open(outputPath, "w") as plyFile:

        plyFile.write("ply\n")
        plyFile.write("format ascii 1.0\n")
        plyFile.write("comment Created by Custom Python Script\n")
        plyFile.write(f"comment Created {timestamp}\n")
        plyFile.write("obj_info Generated for CloudCompare compatibility\n")
        plyFile.write(f"element vertex {numPoints}\n")
        plyFile.write("property double x\n")
        plyFile.write("property double y\n")
        plyFile.write("property double z\n")
        plyFile.write(f"property float {scalarName}\n")
        plyFile.write("end_header\n")

        for i in range(numPoints):
            plyFile.write(
                f"{points[i,0]:.6f} "
                f"{points[i,1]:.6f} "
                f"{points[i,2]:.6f} "
                f"{scalar[i]:.6f}\n"
            )

In [None]:
inputPath  = "28AN1_16_Road-SmallSnippet.ply"
outputPath = "28AN1_16_Road-SmallSnippet_Centers.ply"

searchRadius      = 10.0
densityThreshold  = 1400 #1400

pointCloud = o3d.io.read_point_cloud(inputPath)
points = np.asarray(pointCloud.points)

densityScalar = computeDensityScalar(pointCloud=pointCloud,searchRadius=searchRadius,densityThreshold=densityThreshold)

writeAsciiPLYWithScalar(outputPath=outputPath,points=points,scalar=densityScalar,scalarName="scalar_Density")

print(f"Saved ASCII PLY with scalar field to: {outputPath}")

Saved ASCII PLY with scalar field to: 28AN1_16_Road-SmallSnippet_Centers.ply


### Local PCA based thresholding -> Best method

In [None]:
#---------------------------------------------------------------------------------------------<Imports>----------------------------------------||
import numpy as np
import open3d as o3d
from datetime import datetime

#---------------------------------------------------------------------------------------------<PCA based Junction Scalar>----------------------||
def computeJunctionScalarPCA(pointCloud,searchRadius,minNeighborCount=2000):
    # scalar_JunctionScore = lambda_2 / lambda_1 -----> Junction will be having higher value

    points = np.asarray(pointCloud.points)
    kdTree = o3d.geometry.KDTreeFlann(pointCloud)

    junctionScalar = np.zeros(points.shape[0], dtype=np.float32)

    for pointIndex, point in enumerate(points):

        [_, neighborIndices, _] = kdTree.search_radius_vector_3d(
            point,
            searchRadius
        )

        if len(neighborIndices) < minNeighborCount:
            continue

        neighbors = points[neighborIndices]
        centroid = neighbors.mean(axis=0)

        centered = neighbors - centroid
        covariance = np.cov(centered.T)

        eigenValues, _ = np.linalg.eigh(covariance)
        eigenValues = np.sort(eigenValues)[::-1]

        if eigenValues[0] > 0:
            junctionScalar[pointIndex] = eigenValues[1] / eigenValues[0]
        else:
            junctionScalar[pointIndex] = 0.0

    return junctionScalar

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [None]:
#---------------------------------------------------------------------------------------------<Write PLY>--------------------------------------||
def writeAsciiPLYWithScalar(outputPath,points,scalar,scalarName):
    numPoints = points.shape[0]
    timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

    with open(outputPath, "w") as plyFile:

        plyFile.write("ply\n")
        plyFile.write("format ascii 1.0\n")
        plyFile.write("comment Created by Custom PCA Junction Detector\n")
        plyFile.write(f"comment Created {timestamp}\n")
        plyFile.write("obj_info Generated for CloudCompare compatibility\n")
        plyFile.write(f"element vertex {numPoints}\n")
        plyFile.write("property double x\n")
        plyFile.write("property double y\n")
        plyFile.write("property double z\n")
        plyFile.write(f"property float {scalarName}\n")
        plyFile.write("end_header\n")

        for i in range(numPoints):
            plyFile.write(
                f"{points[i,0]:.6f} "
                f"{points[i,1]:.6f} "
                f"{points[i,2]:.6f} "
                f"{scalar[i]:.6f}\n"
            )

In [None]:
%%time

#inputPath  = "28AN1_16_Road-SmallSnippet.ply"
#outputPath = "28AN1_16_Road-SmallSnippet_PCACenters.ply"
inputPath  = "28AN1_16_Road.ply"
outputPath = "28AN1_16_Road-PCACenters.ply"

searchRadius      = 15.0
densityThreshold  = 1400 #1400 for full road #1800 For road snippts

pointCloud = o3d.io.read_point_cloud(inputPath)
points = np.asarray(pointCloud.points)

junctionScalar = computeJunctionScalarPCA(pointCloud=pointCloud,searchRadius=searchRadius,minNeighborCount=densityThreshold)

writeAsciiPLYWithScalar(outputPath=outputPath,points=points,scalar=junctionScalar,scalarName="scalar_JunctionScore")

print(f"Saved PCA-based junction scalar to: {outputPath}")

Saved PCA-based junction scalar to: 28AN1_16_Road-SmallSnippet_PCACenters.ply
CPU times: user 17.2 s, sys: 55.2 ms, total: 17.3 s
Wall time: 17.5 s
