# Hillfort detection with LiDAR data
## Data management

## Table of contents

[Code](#code)

1. [**Initializing and training the model**](#initializing-and-training-the-model)
2. [**Evaluating the model**](#evaluating-the-model)
3. [**Hyperparameter tuning**](#hyperparameter-tuning)
4. [**Results**](#results)

[End](#end)

## Code

### Defined functions

In [1]:
!pip install pandas scikit-learn numpy matplotlib laspy tqdm geopandas

Collecting scikit-learn
  Downloading scikit_learn-1.3.2-cp38-cp38-macosx_10_9_x86_64.whl.metadata (11 kB)
Collecting laspy
  Downloading laspy-2.5.4-py3-none-any.whl.metadata (3.5 kB)
Collecting tqdm
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting geopandas
  Downloading geopandas-0.13.2-py3-none-any.whl.metadata (1.5 kB)
Collecting joblib>=1.1.1 (from scikit-learn)
  Downloading joblib-1.4.2-py3-none-any.whl.metadata (5.4 kB)
Collecting threadpoolctl>=2.0.0 (from scikit-learn)
  Downloading threadpoolctl-3.5.0-py3-none-any.whl.metadata (13 kB)
Collecting fiona>=1.8.19 (from geopandas)
  Downloading fiona-1.10.1-cp38-cp38-macosx_10_15_x86_64.whl.metadata (56 kB)
Collecting pyproj>=3.0.1 (from geopandas)
  Downloading pyproj-3.5.0-cp38-cp38-macosx_10_9_x86_64.whl.metadata (28 kB)
Collecting shapely>=1.7.1 (from geopandas)
  Downloading shapely-2.0.6-cp38-cp38-macosx_10_9_x86_64.whl.metadata (7.0 kB)
Collecting click~=8.0 (from fiona>=1.8.19->geopandas)
  Download

In [None]:
# If you have GPU
!pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124
# Otherwise
!pip3 install torch torchvision torchaudio

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader


ModuleNotFoundError: No module named 'torch'

## Utility Functions

In [None]:
def knn_search(query_pts, support_pts, k):
    """
    Naive CPU-based k-NN search returning indices of nearest neighbors.
    For large-scale data, replace with GPU ops or specialized libs (Faiss, Open3D, etc.).
    
    Args:
        query_pts: (B, N, 3)
        support_pts: (B, M, 3)
        k: int, number of neighbors
    Returns:
        neighbor_idx: (B, N, k) - indices in support_pts of the k nearest neighbors for each point
    """
    # This naive approach does pairwise distance on CPU. Not efficient for large N, M.
    # For demonstration only.
    B, N, _ = query_pts.shape
    _, M, _ = support_pts.shape
    
    # Expand dims to compute pairwise distances
    query_expand = query_pts.unsqueeze(2)  # (B, N, 1, 3)
    support_expand = support_pts.unsqueeze(1)  # (B, 1, M, 3)
    dist = torch.sum((query_expand - support_expand) ** 2, dim=-1)  # (B, N, M)
    
    # topk with largest negative distances = smallest distances
    _, neighbor_idx = torch.topk(dist, k, dim=-1, largest=False)  # (B, N, k)
    return neighbor_idx

def gather_neighborhood_features(features, neighbor_idx):
    """
    Gather neighbor features given neighbor indices.
    Args:
        features: (B, M, C) - feature vectors for M points
        neighbor_idx: (B, N, K) - neighbor indices in [0..M-1]
    Returns:
        neighbor_features: (B, N, K, C)
    """
    B, M, C = features.shape
    B, N, K = neighbor_idx.shape
    
    # Use torch.gather to index the features. We first reshape for convenience.
    # neighbor_idx shape -> (B, N, K)
    # We'll gather along dimension=1 (the second dimension of features).
    # But we need neighbor_idx to broadcast across the channel dimension.
    neighbor_idx_expand = neighbor_idx.unsqueeze(-1).expand(-1, -1, -1, C)  # (B, N, K, C)
    
    # Expand features to (B, M, 1, C) so we can gather
    features_expand = features.unsqueeze(1).expand(-1, N, -1, -1)  # (B, N, M, C)
    
    neighbor_features = torch.gather(features_expand, 2, neighbor_idx_expand)  # (B, N, K, C)
    return neighbor_features

## RandLA-Net Building Blocks

In [None]:
class MLP(nn.Module):
    """A simple multi-layer perceptron block with [Conv1d -> BN -> ReLU], repeated."""
    def __init__(self, in_channels, out_channels_list):
        super().__init__()
        layers = []
        c_prev = in_channels
        for c_out in out_channels_list:
            layers.append(nn.Conv1d(c_prev, c_out, kernel_size=1, bias=False))
            layers.append(nn.BatchNorm1d(c_out))
            layers.append(nn.ReLU(inplace=True))
            c_prev = c_out
        self.mlp = nn.Sequential(*layers)
    
    def forward(self, x):
        # x shape: (B, in_channels, N)
        return self.mlp(x)

class LocalFeatureAggregation(nn.Module):
    """
    The key RandLA-Net module. 
    1. Neighborhood search
    2. MLP to encode local geometry
    3. Attentive Pooling
    4. Another MLP
    """
    def __init__(self, in_channels, out_channels):
        """
        Args:
            in_channels: number of input channels
            out_channels: number of output channels (e.g. 64, 128, etc.)
        """
        super().__init__()
        self.mlp1 = MLP(in_channels + 3, [out_channels//2, out_channels])  # incorporate relative positions
        self.attention_mlp = MLP(out_channels, [out_channels//4, out_channels//4, out_channels//4, 1])  # to get attention score
        self.mlp2 = MLP(out_channels, [out_channels, out_channels])
        
    def forward(self, pts, features, neighbor_idx):
        """
        Args:
            pts: (B, N, 3) coordinates of the "query" set
            features: (B, N, C) features for each point
            neighbor_idx: (B, N, K) the neighbor indices
        Returns:
            updated_features: (B, N, out_channels)
        """
        B, N, C_in = features.shape
        K = neighbor_idx.shape[-1]
        
        # Gather neighbor points and features
        neighbor_xyz = gather_neighborhood_features(pts, neighbor_idx)  # (B, N, K, 3)
        neighbor_feats = gather_neighborhood_features(features, neighbor_idx)  # (B, N, K, C_in)
        
        # Compute relative positions: [neighbor_xyz - center_xyz]
        center_xyz_expand = pts.unsqueeze(2).expand(-1, -1, K, -1)  # (B, N, K, 3)
        relative_xyz = neighbor_xyz - center_xyz_expand  # (B, N, K, 3)
        
        # Concatenate neighbor_feats with relative_xyz
        concat_feats = torch.cat([neighbor_feats, relative_xyz], dim=-1)  # (B, N, K, C_in + 3)

        # Reshape for MLP1: (B, (C_in+3), N*K)
        concat_feats = concat_feats.permute(0,1,3,2).contiguous()  # -> (B, N, C_in+3, K)
        concat_feats = concat_feats.view(B, N, -1)                  # Flatten last two dims
        concat_feats = concat_feats.permute(0,2,1)                  # -> (B, C_in+3, N*K)? Actually let's do this differently:

        # Actually, let's do an MLP over the neighbor dimension, so let's keep shape as:
        #    (B, C_in+3, N*K). But that means we need to reshape carefully.
        concat_feats = concat_feats.view(B, -1, N*K)  # (B, C_in+3, N*K)

        # MLP1
        local_feats = self.mlp1(concat_feats)  # (B, out_channels, N*K)
        
        # Reshape back to group by neighbors
        local_feats = local_feats.view(B, -1, N, K)  # (B, out_channels, N, K)

        # Attentive pooling
        # attention scores = MLP(local_feats) -> shape (B, 1, N, K)
        att_scores = self.attention_mlp(local_feats)  # (B, 1, N, K)
        att_scores = F.softmax(att_scores, dim=-1)    # softmax across K neighbors

        # Weighted sum
        aggregated_feats = torch.sum(local_feats * att_scores, dim=-1)  # (B, out_channels, N)

        # MLP2
        updated_feats = self.mlp2(aggregated_feats)  # (B, out_channels, N)
        updated_feats = updated_feats.permute(0,2,1) # (B, N, out_channels)

        return updated_feats


class LocalSamplingLayer(nn.Module):
    """
    Random sampling layer. 
    For RandLA-Net, you typically downsample the points by a factor (e.g., 4).
    """
    def __init__(self, ratio=0.25):
        super().__init__()
        self.ratio = ratio

    def forward(self, pts, features):
        """
        Randomly sample a subset of points (and corresponding features).
        Args:
            pts: (B, N, 3)
            features: (B, N, C)
        Returns:
            sub_pts: (B, N_sub, 3)
            sub_features: (B, N_sub, C)
        """
        B, N, _ = pts.shape
        N_sub = int(N * self.ratio)

        # Random sample indices
        idx = torch.randperm(N)[:N_sub].to(pts.device)  # shape: (N_sub,)
        # But to keep it consistent across the batch, we do this differently for each batch sample 
        # (though for demonstration, this might suffice for B=1).
        
        # More correct approach is to do random sampling per batch:
        sub_pts_list = []
        sub_features_list = []
        for b in range(B):
            perm = torch.randperm(N, device=pts.device)
            chosen_idx = perm[:N_sub]
            sub_pts_list.append(pts[b, chosen_idx, :].unsqueeze(0))
            sub_features_list.append(features[b, chosen_idx, :].unsqueeze(0))
        sub_pts = torch.cat(sub_pts_list, dim=0)
        sub_features = torch.cat(sub_features_list, dim=0)
        
        return sub_pts, sub_features


## RandLANet

In [None]:

class RandLANet(nn.Module):
    """
    A simplified RandLA-Net architecture:
      - 4 hierarchical stages of downsampling + LocalFeatureAggregation
      - Then upsampling with skip connections
      - Output segmentation head
    """
    def __init__(self, num_classes=10, num_neighbors=16):
        super().__init__()
        self.num_neighbors = num_neighbors

        # Downsample ratios
        self.samplings = nn.ModuleList([
            LocalSamplingLayer(ratio=0.25),  # DS1: 1/4
            LocalSamplingLayer(ratio=0.25),  # DS2: 1/4
            LocalSamplingLayer(ratio=0.25),  # DS3: 1/4
            LocalSamplingLayer(ratio=0.25),  # DS4: 1/4
        ])
        
        # Each stage: LocalFeatureAggregation
        # The paper uses channels progression: 16->64->128->256->512, etc. 
        self.encoders = nn.ModuleList([
            LocalFeatureAggregation(in_channels=3,   out_channels=32),   # Stage1
            LocalFeatureAggregation(in_channels=32,  out_channels=64),   # Stage2
            LocalFeatureAggregation(in_channels=64,  out_channels=128),  # Stage3
            LocalFeatureAggregation(in_channels=128, out_channels=256),  # Stage4
        ])

        # Decoding / Upsampling path
        # A typical approach is nearest-neighbor interpolation to upsample the sub_points
        # and a LocalFeatureAggregation or MLP to fuse skip features.
        self.decoders = nn.ModuleList([
            MLP(256+128, [256, 128]),
            MLP(128+64, [128, 64]),
            MLP(64+32, [64, 32]),
            MLP(32+32, [32, 32]),
        ])

        # Final segmentation head
        self.classifier = nn.Sequential(
            nn.Conv1d(32, 32, 1),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.Conv1d(32, num_classes, 1)
        )

    def _nearest_interpolate(self, sub_pts, pts, sub_feats):
        """
        Nearest neighbor interpolation from sub_pts to pts.
        sub_pts: (B, N_sub, 3)
        pts: (B, N, 3)
        sub_feats: (B, N_sub, C)
        returns: up_feats (B, N, C)
        """
        # For each point in pts, find its nearest neighbor in sub_pts
        # Then gather the features from sub_feats
        neighbor_idx = knn_search(pts, sub_pts, k=1)  # (B, N, 1)
        # gather sub_feats
        B, N, _ = neighbor_idx.shape
        C = sub_feats.shape[-1]
        neighbor_idx_expand = neighbor_idx.expand(-1, -1, C)  # (B, N, C)
        up_feats = torch.gather(sub_feats, 1, neighbor_idx_expand)  # (B, N, C)
        return up_feats

    def forward(self, pts):
        """
        Args:
            pts: shape (B, N, 3) - input point coordinates. 
                 For RandLA-Net, we might also have features like color or intensity, but let's keep it simple.
        Returns:
            logits: (B, num_classes, N)
        """
        B, N, _ = pts.shape
        
        # Initially, features = coordinates
        feats = pts.clone()   # (B, N, 3)

        # Lists to store intermediate results for skip connections
        pts_list    = [pts]
        feats_list  = [feats]

        # ---------------- Encoder ----------------
        # Stage 1
        neighbor_idx = knn_search(pts_list[-1], pts_list[-1], self.num_neighbors)
        feats_down = self.encoders[0](pts_list[-1], feats_list[-1], neighbor_idx)
        sub_pts, sub_feats = self.samplings[0](pts_list[-1], feats_down)
        pts_list.append(sub_pts)
        feats_list.append(sub_feats)

        # Stage 2
        neighbor_idx = knn_search(pts_list[-1], pts_list[-1], self.num_neighbors)
        feats_down = self.encoders[1](pts_list[-1], feats_list[-1], neighbor_idx)
        sub_pts, sub_feats = self.samplings[1](pts_list[-1], feats_down)
        pts_list.append(sub_pts)
        feats_list.append(sub_feats)

        # Stage 3
        neighbor_idx = knn_search(pts_list[-1], pts_list[-1], self.num_neighbors)
        feats_down = self.encoders[2](pts_list[-1], feats_list[-1], neighbor_idx)
        sub_pts, sub_feats = self.samplings[2](pts_list[-1], feats_down)
        pts_list.append(sub_pts)
        feats_list.append(sub_feats)

        # Stage 4
        neighbor_idx = knn_search(pts_list[-1], pts_list[-1], self.num_neighbors)
        feats_down = self.encoders[3](pts_list[-1], feats_list[-1], neighbor_idx)
        sub_pts, sub_feats = self.samplings[3](pts_list[-1], feats_down)
        pts_list.append(sub_pts)
        feats_list.append(sub_feats)

        # ---------------- Decoder ----------------
        # We upsample from stage 4 -> 3, fuse features
        for stage_idx in range(4):
            i = 4 - stage_idx - 1  # if stage_idx=0 -> i=3, if stage_idx=1-> i=2, ...
            sub_pts = pts_list[i+1]
            sub_feats = feats_list[i+1]
            up_pts = pts_list[i]
            up_feats_pre = feats_list[i]  # skip connection

            # nearest interpolation from sub_pts to up_pts
            up_feats_post = self._nearest_interpolate(sub_pts, up_pts, sub_feats)
            # fuse
            fused_feats = torch.cat([up_feats_pre, up_feats_post], dim=-1).transpose(1,2)  # (B, C', N)
            fused_feats = self.decoders[stage_idx](fused_feats)  # (B, X, N)
            feats_list[i] = fused_feats.transpose(1,2)  # (B, N, X)

        # After final decoder, feats_list[0] has shape (B, N, 32)
        final_feats = feats_list[0].transpose(1,2)  # (B, 32, N)

        # Classification head
        logits = self.classifier(final_feats)  # (B, num_classes, N)

        return logits
    

In [None]:
laz_file_dir = '../data/downsampled_class_lazFiles/'

# Step 1: Get the list of LiDAR files
all_files = [os.path.join(laz_file_dir, f) for f in os.listdir(laz_file_dir) if f.endswith('.laz')]

# Step 2: Split the files into train, validation, and test sets
train_files, test_files = sk.model_selection.train_test_split(all_files, test_size=0.2, random_state=42)  # 20% for testing
train_files, val_files = sk.model_selection.train_test_split(train_files, test_size=0.1, random_state=42)  # 10% of train for validation

print(f"Number of files - Train: {len(train_files)}, Validation: {len(val_files)}, Test: {len(test_files)}")

# Step 3: Load and group data based on the splits
def load_grouped_data(file_list):
    X, y = [], []
    for file in file_list:
        las = laspy.read(file)
        xyz = las.xyz
        labels = (las.points.array['classification'] == 12).astype(int)  # Hillfort class
        X.append(xyz)
        y.append(labels)
    return X, y

# Load data for each split
X_train, y_train = load_grouped_data(train_files)
X_val, y_val = load_grouped_data(val_files)
X_test, y_test = load_grouped_data(test_files)

# Optionally, combine all points into single arrays for each split
X_train_combined = np.vstack(X_train)
y_train_combined = np.concatenate(y_train)

X_val_combined = np.vstack(X_val)
y_val_combined = np.concatenate(y_val)

X_test_combined = np.vstack(X_test)
y_test_combined = np.concatenate(y_test)

# Print final shapes
print("Train data shape:", X_train_combined.shape, y_train_combined.shape)
print("Validation data shape:", X_val_combined.shape, y_val_combined.shape)
print("Test data shape:", X_test_combined.shape, y_test_combined.shape)


Number of files - Train: 93, Validation: 11, Test: 27
Train data shape: (7141378, 3) (7141378,)
Validation data shape: (731422, 3) (731422,)
Test data shape: (1946003, 3) (1946003,)


In [None]:
print(np.unique(y_train_combined))
print(np.unique(y_val_combined))
print(np.unique(y_test_combined))

[0 1]
[0 1]
[0 1]


In [28]:
class_weights = sk.utils.class_weight.compute_class_weight(class_weight='balanced', classes=np.unique(y_train_combined), y=y_train_combined)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float32).cuda() if device.type == "cuda" else torch.tensor(class_weights, dtype=torch.float32)
criterion = torch.nn.NLLLoss(weight=class_weights_tensor)
print(class_weights)

[ 0.5103787  24.58779661]


In [29]:
train_X = torch.from_numpy(X_train_combined)
train_y = torch.from_numpy(y_train_combined).long()
val_X = torch.from_numpy(X_val_combined)
val_y = torch.from_numpy(y_val_combined).long()
test_X = torch.from_numpy(X_test_combined)
test_y = torch.from_numpy(y_test_combined).long()

In [None]:
class PointCloudSegDataset(Dataset):
    def __init__(self, xyz, labels, num_points=4096):
        """
        xyz: (total_points, 3)
        labels: (total_points,)
        num_points: number of points in each chunk or random sample
        """
        self.xyz = xyz
        self.labels = labels
        self.num_points = num_points

    def __len__(self):
        # Instead of each point being an item, let's define how many items we want:
        # For demonstration, treat each call as a random sample from the entire set.
        # We'll just artificially define a length, e.g. 1000 "epochs" worth of random samples.
        return 1000  

    def __getitem__(self, idx):
        # Randomly sample 'num_points' indices
        total_points = self.xyz.shape[0]
        rand_indices = torch.randperm(total_points)[:self.num_points]

        pts = self.xyz[rand_indices, :]         # shape (num_points, 3)
        seg_labels = self.labels[rand_indices]  # shape (num_points,)

        # Return as (B=1, N, 3) inside the batch, but DataLoader will batch multiple samples
        # Typically we just return the raw arrays here; DataLoader collates them into a batch
        return {
            'points': pts,          # (num_points, 3)
            'labels': seg_labels    # (num_points,)
        }

In [None]:
num_points = 4096  # RandLA-Net uses bigger chunks typically (e.g. 4k..8k)
train_dataset = PointCloudSegDataset(train_X, train_y, num_points=num_points)
val_dataset   = PointCloudSegDataset(val_X,   val_y,   num_points=num_points)
test_dataset  = PointCloudSegDataset(test_X,  test_y,  num_points=num_points)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, drop_last=True)
val_loader   = DataLoader(val_dataset,   batch_size=8, shuffle=False)
test_loader  = DataLoader(test_dataset,  batch_size=8, shuffle=False)


### Initializing and training the model

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model = RandLANet(num_classes=2, num_neighbors=16).to(device)

### Loss and optimizer

In [None]:
import torch.optim as optim

# For segmentation, each point is classified among 'num_classes'.
# If you're weighting classes, you can pass a weight tensor here too:
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor).to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-3, betas=(0.9, 0.99))


### Training loop

In [None]:
num_epochs = 5

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    for batch_idx, batch_data in enumerate(train_loader):
        pts  = batch_data['points'].float().to(device)   # (B, N, 3)
        seg  = batch_data['labels'].long().to(device)    # (B, N)

        optimizer.zero_grad()
        logits = model(pts)  # shape (B, num_classes, N)

        # Reshape to compute CE loss: 
        # CrossEntropy expects (B*N, num_classes) and (B*N) for labels
        logits = logits.permute(0, 2, 1).contiguous()   # (B, N, num_classes)
        loss = criterion(logits.view(-1, 2), seg.view(-1))

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        if (batch_idx+1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(train_loader)}], Loss: {total_loss/10:.4f}")
            total_loss = 0.0

    # Validation step (optional)
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch_data in val_loader:
            pts  = batch_data['points'].float().to(device)
            seg  = batch_data['labels'].long().to(device)
            logits = model(pts)
            logits = logits.permute(0, 2, 1)
            loss = criterion(logits.view(-1, 2), seg.view(-1))
            val_loss += loss.item()
    val_loss /= len(val_loader) if len(val_loader) > 0 else 1
    print(f"Validation Loss after epoch {epoch+1}: {val_loss:.4f}")


### Evaluating the model

In [None]:
model.eval()
test_loss = 0.0
correct = 0
total_points = 0
with torch.no_grad():
    for batch_data in test_loader:
        pts = batch_data['points'].float().to(device)
        seg = batch_data['labels'].long().to(device)
        logits = model(pts)
        logits = logits.permute(0,2,1)  # (B,N,2)
        loss = criterion(logits.view(-1, 2), seg.view(-1))
        test_loss += loss.item()

        # Predictions
        preds = logits.argmax(dim=-1)  # (B, N)
        correct += (preds == seg).sum().item()
        total_points += seg.numel()

test_loss /= len(test_loader)
accuracy = 100.0 * correct / total_points
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {accuracy:.2f}%")


### Hyperparameter tuning

### Results

Notes & Caveats
Naive k-NN:
The included knn_search is CPU-based and scales poorly for large 
𝑁
N. For production or large LiDAR scans, you’ll want a GPU-accelerated neighbor search library (e.g., Faiss, Open3D, or a custom CUDA kernel).
Random sampling:
Our LocalSamplingLayer and the dataset approach both do random sampling. This might be redundant or suboptimal. RandLA-Net typically does a structured approach (like farthest point sampling) for each hierarchical stage. The code here is just a demonstration.
Memory:
If you have millions of points, you may need advanced sampling strategies or chunking the point cloud into smaller tiles.
Hyperparameters:
Adjust num_points, batch size, and learning rate for your dataset. RandLA-Net typically uses big mini-batches of 4k–8k points each.
With the dataset + training loop above, you should now be able to initialize and train your simplified RandLA-Net model on LiDAR data to perform binary segmentation (hillfort vs. non-hillfort) at the point level.

## End