In [12]:
import os
import laspy
import numpy as np

def load_and_combine_laz_files(file_paths, subsample_ratio=0.1):
    combined_data = {
        "points": [],
        "intensity": [],
        "classification": [],
        "return_number": [],
        "number_of_returns": [],
    }

    for file_path in file_paths:
        las = laspy.read(file_path)
        num_points = len(las.x)
        sample_size = int(num_points * subsample_ratio)
        indices = np.random.choice(num_points, sample_size, replace=False)

        combined_data["points"].append(
            np.vstack((las.x[indices], las.y[indices], las.z[indices])).T
        )
        combined_data["intensity"].append(las.intensity[indices])
        combined_data["classification"].append(las.classification[indices])
        combined_data["return_number"].append(las.return_number[indices])
        combined_data["number_of_returns"].append(las.number_of_returns[indices])

    for key in combined_data:
        combined_data[key] = np.concatenate(combined_data[key])

    return combined_data


In [13]:
from sklearn.decomposition import PCA
from sklearn.preprocessing import MinMaxScaler

def normalize_and_pca(data, n_components=3):
    # Ensure points are n x 3
    if data["points"].shape[0] == 3:
        data["points"] = data["points"].T

    # Normalize intensity and height
    scaler = MinMaxScaler()
    normalized_intensity = scaler.fit_transform(data["intensity"].reshape(-1, 1))
    normalized_height = scaler.fit_transform(data["points"][:, 2].reshape(-1, 1))

    # Combine features: x, y, z, intensity, height
    features = np.hstack((data["points"], normalized_intensity, normalized_height))
    
    # Apply PCA
    pca = PCA(n_components=n_components)
    reduced_features = pca.fit_transform(features)

    return reduced_features, pca


In [14]:
from scipy.interpolate import griddata

def rasterize_points(data, grid_size=(512, 512), method='nearest'):
    x, y, z = data["points"].T
    intensity = data["intensity"]
    classification = data["classification"]

    grid_x, grid_y = np.linspace(x.min(), x.max(), grid_size[0]), np.linspace(y.min(), y.max(), grid_size[1])
    grid_x, grid_y = np.meshgrid(grid_x, grid_y)

    intensity_map = griddata((x, y), intensity, (grid_x, grid_y), method=method)
    classification_map = griddata((x, y), classification, (grid_x, grid_y), method=method)

    return intensity_map, classification_map


In [15]:
def spatial_train_test_split(data, test_ratio=0.3):
    # Ensure data['points'] is a NumPy array
    if not isinstance(data["points"], np.ndarray):
        data["points"] = np.array(data["points"])

    # Unpack x, y, z coordinates
    x, y, z = data["points"].T

    # Split into spatial regions
    mask = np.random.rand(x.shape[0]) < test_ratio

    # Base train and test data
    train_data = {
        "points": data["points"][~mask],
        "intensity": data["intensity"][~mask],
        "classification": data["classification"][~mask],
    }
    test_data = {
        "points": data["points"][mask],
        "intensity": data["intensity"][mask],
        "classification": data["classification"][mask],
    }

    # Include PCA features if present
    if "pca_features" in data:
        train_data["pca_features"] = data["pca_features"][~mask]
        test_data["pca_features"] = data["pca_features"][mask]

    return train_data, test_data


In [16]:
def extract_patches(data, patch_size=(128, 128)):
    """
    Extract patches from 2D rasterized data.
    
    Parameters:
        data (ndarray): 2D rasterized data.
        patch_size (tuple): Height and width of each patch.
    
    Returns:
        ndarray: Array of patches.
    """
    patches = []
    h, w = data.shape
    patch_h, patch_w = patch_size
    for i in range(0, h - patch_h + 1, patch_h):
        for j in range(0, w - patch_w + 1, patch_w):
            patches.append(data[i:i+patch_h, j:j+patch_w])
    return np.array(patches)


In [33]:
import os
import numpy as np
from keras_unet_collection.models import unet_2d
from sklearn.cluster import KMeans
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError

# Step 1: Generate pseudo-labels using clustering
def create_pseudo_labels(data, n_clusters=3):
    """
    Generate pseudo-labels using KMeans clustering on PCA features.
    """
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    labels = kmeans.fit_predict(data["pca_features"])
    return labels

# Step 2: Prepare the U-Net model
def build_unet(input_shape, n_clusters):
    """
    Build a U-Net model for unsupervised learning.
    """
    model = unet_2d(
        input_shape,
        filter_num=[64, 128, 256, 512],
        n_labels=n_clusters,
        stack_num_down=2,
        stack_num_up=2,
        activation="ReLU",
        output_activation="Softmax",
        batch_norm=True,
    )
    model.compile(optimizer=Adam(learning_rate=0.001), loss=MeanSquaredError())
    return model

# Step 3: Prepare data for U-Net
def prepare_data_for_unet(data, grid_size=(128, 128), patch_size=(128, 128)):
    """
    Rasterize the point cloud and extract patches.
    """
    intensity_map, _ = rasterize_points(data, grid_size=grid_size)
    patches = extract_patches(intensity_map, patch_size=patch_size)
    patches = patches[..., np.newaxis]  # Add channel dimension
    return patches

base_dir = r"data/"

laz_data_path_list = [
    os.path.join(root, file)
    for root, _, files in os.walk(base_dir)
    for file in files
    if file.endswith(".laz")
]
print("Number of LAZ files:", len(laz_data_path_list))


# Combine and preprocess data
combined_data = load_and_combine_laz_files(laz_data_path_list, subsample_ratio=0.5)

# Normalize and apply PCA
reduced_features, pca_model = normalize_and_pca(combined_data)

# Add PCA features to the combined data
combined_data["pca_features"] = reduced_features

# Create pseudo-labels
pseudo_labels = create_pseudo_labels(combined_data)

# Add pseudo-labels to combined data
combined_data["pseudo_labels"] = pseudo_labels

# Rasterize and prepare patches
unet_input_patches = prepare_data_for_unet(combined_data)

# Build U-Net
input_shape = (128, 128, 1)  # Adjust channels if needed
n_clusters = len(set(pseudo_labels))
unet_model = build_unet(input_shape, n_clusters)

# Step 5: Train the U-Net
# Split patches into train/test if needed
train_ratio = 0.8
num_train = int(len(unet_input_patches) * train_ratio)
X_train = unet_input_patches[:num_train]
X_val = unet_input_patches[num_train:]

# Use patches themselves as both input and target for unsupervised learning
unet_model.fit(
    X_train,
    X_train,  # Target is the same as input for reconstruction
    validation_data=(X_val, X_val),
    epochs=50,
    batch_size=32,
)

# Step 6: Evaluate and save
unet_model.save("unet_unsupervised_model.h5")


Number of LAZ files: 3
Epoch 1/50


ValueError: Unexpected result of `train_function` (Empty logs). Please use `Model.compile(..., run_eagerly=True)`, or `tf.config.run_functions_eagerly(True)` for more information of where went wrong, or file a issue/bug to `tf.keras`.

In [34]:
unet_input_patches = prepare_data_for_unet(combined_data)
print("Number of patches extracted:", len(unet_input_patches))


Number of patches extracted: 1


In [35]:
print("Total patches:", len(unet_input_patches))
num_train = int(len(unet_input_patches) * train_ratio)
print("Number of training patches:", num_train)
print("Number of validation patches:", len(unet_input_patches) - num_train)


Total patches: 1
Number of training patches: 0
Number of validation patches: 1


In [36]:
print("Points shape:", combined_data["points"].shape)
print("Intensity shape:", combined_data["intensity"].shape)


Points shape: (30470117, 3)
Intensity shape: (30470117,)


In [None]:
import matplotlib.pyplot as plt

def visualize_unet_prediction(unet_model, patches, num_examples=5):
    """
    Visualize input patches and their corresponding U-Net predictions.
    
    Parameters:
        unet_model: Trained U-Net model.
        patches: Input patches for prediction.
        num_examples: Number of examples to visualize.
    """
    # Select a random subset of patches to visualize
    indices = np.random.choice(len(patches), num_examples, replace=False)
    selected_patches = patches[indices]
    
    # Predict using the U-Net model
    predictions = unet_model.predict(selected_patches)
    
    # Plot the results
    fig, axes = plt.subplots(num_examples, 2, figsize=(8, 4 * num_examples))
    for i, (input_patch, predicted_patch) in enumerate(zip(selected_patches, predictions)):
        if num_examples > 1:
            ax_input, ax_pred = axes[i]
        else:
            ax_input, ax_pred = axes
        
        # Input patch
        ax_input.imshow(input_patch.squeeze(), cmap="gray")
        ax_input.set_title("Input Patch")
        ax_input.axis("off")
        
        # Predicted patch
        ax_pred.imshow(predicted_patch.squeeze(), cmap="gray")
        ax_pred.set_title("Predicted Patch")
        ax_pred.axis("off")
    
    plt.tight_layout()
    plt.show()

# Example usage
visualize_unet_prediction(unet_model, unet_input_patches)
