# Setup

Ensure all dependencies are installed.


In [1]:
# Install/upgrade (-U) everything listed in requirements.txt into the kernel's
# environment; restart the kernel afterwards if packages were updated.
%pip install -U -r requirements.txt

Looking in links: https://download.pytorch.org/whl/torch/, https://download.pytorch.org/whl/torchvision/
Collecting numpy (from -r requirements.txt (line 10))
  Using cached numpy-2.2.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (62 kB)
Note: you may need to restart the kernel to use updated packages.


Check if we have GPU support, and if not, warn the user.


In [2]:
import torch
import warnings

# Report whether PyTorch can see a CUDA device. A missing GPU only emits a
# warning instead of raising, so the rest of the notebook can still run.
if not torch.cuda.is_available():
    warnings.warn("CUDA is not available. Running on CPU.")
else:
    print("CUDA is available!")

CUDA is available!


# Settings

All our settings are here for convenience.


In [3]:
from pole_gen.models import UtilityPoleLabel
import secrets
import numpy as np
import random
import torch
import warnings
from utils.logging import warning_format
import os

# ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤
# ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ No need to modify anything above here! ◢◤ ◢◤ ◢◤ ◢◤ ◢◤
# ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤

# When True, the setup code below this settings block enables extra
# CUDA/autograd diagnostics.
debug: bool = True

# Point count handed to data generation and to the datasets as `n_points`.
n_points: int = 1000

# One class per item yielded by iterating UtilityPoleLabel, in iteration order.
classes: list = [label.name for label in UtilityPoleLabel]
n_classes: int = len(classes)

# A fresh 32-bit seed is drawn on every run; it is printed further down so a
# particular run can be identified.
seed: int = secrets.randbits(32)

torch.set_float32_matmul_precision("medium")

# ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤
# ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ No need to modify anything below here! ◢◤ ◢◤ ◢◤ ◢◤ ◢◤
# ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤ ◢◤

# Debug mode: turn on autograd anomaly detection and export
# CUDA_LAUNCH_BLOCKING=1 for this process.
if debug:
    torch.autograd.set_detect_anomaly(True)
    os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

# Seed every random number generator used by the notebook with the same value.
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
print(f"Seed: {seed}")

# Route warning formatting through the project's formatter from here on.
warnings.formatwarning = warning_format

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.
Seed: 4209307681


# Preparing our Data


## Training & Evaluation Data

This data is procedurally generated.


In [4]:
import os
from pole_gen.data import generate_data
from models.dataset import PointCloudDataset

out_path: str = os.path.join("data", "train")
train_size = 10000
eval_size = 1000

# Generate the synthetic point clouds only when nothing is cached on disk yet.
if not os.path.exists(out_path) or len(os.listdir(out_path)) == 0:
    print("Directory is empty or does not exist. New training data will be generated.")
    generate_data(
        n_samples=train_size + eval_size,
        n_points=n_points,
        out_dir=out_path,
        jitter=0.02,
    )
else:
    print("Data directory found. Using existing training data.")

# os.listdir returns entries in arbitrary order. Sorting keeps the
# index -> file mapping independent of the filesystem, so the random split
# below can be reproduced when the same seed is reused.
file_paths = [os.path.join(out_path, name) for name in sorted(os.listdir(out_path))]
generated_dataset = PointCloudDataset(
    file_paths=file_paths,
    n_points=n_points,
    n_classes=n_classes,
)

generated_dataset.validate()

# random_split requires the integer lengths to add up to the dataset size.
# A cached directory produced with different sizes would otherwise fail with a
# generic error, so report the mismatch with an actionable message instead.
n_available = len(generated_dataset)
if n_available != train_size + eval_size:
    raise ValueError(
        f"Expected {train_size + eval_size} samples in '{out_path}' "
        f"(train_size + eval_size) but found {n_available}. "
        "Adjust the sizes or empty the directory so the data is regenerated."
    )

# Split the dataset into training and validation sets
print("Splitting dataset into training and validation sets...")
train_dataset, eval_dataset = torch.utils.data.random_split(
    generated_dataset, [train_size, eval_size]
)

# Only the two subsets are needed from here on; drop the extra name.
del generated_dataset

print(f"Training dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(eval_dataset)}")

Directory is empty or does not exist. New testing data will be generated.


Generating training data:   0%|          | 0/11000 [00:00<?, ?it/s]

Checking dataset...:   0%|          | 0/11000 [00:00<?, ?it/s]

Splitting dataset into training and validation sets...
Training dataset size: 10000
Validation dataset size: 1000


## Testing Data

This data is manually labeled, real-world, laser-scanned data. It is downloaded automatically when the local `data/test` directory is missing or empty.


In [5]:
import os
from electrical_poles.data import download_data
from models.dataset import PointCloudDataset

test_data_path = os.path.join("data", "test")

# Download the labeled scans only when no local copy exists yet.
if not os.path.exists(test_data_path) or len(os.listdir(test_data_path)) == 0:
    print(
        "Testing data directory is empty or does not exist. New testing data will be downloaded."
    )
    download_data(out_dir=test_data_path)
else:
    print("Testing data directory found. Using existing testing data.")

# os.listdir returns entries in arbitrary order. Sorting makes the sample order
# (and any index-based pick from this dataset) independent of the filesystem.
file_paths = [
    os.path.join(test_data_path, name) for name in sorted(os.listdir(test_data_path))
]
test_dataset = PointCloudDataset(
    file_paths=file_paths,
    n_points=n_points,
    n_classes=n_classes,
)

test_dataset.validate()

print(f"Testing dataset size: {len(test_dataset)}")

Testing data directory found. Using existing testing data.


Checking dataset...:   0%|          | 0/91 [00:00<?, ?it/s]

Testing dataset size: 91


# Segmentation

Now we can obtain our segmenter: if a saved checkpoint already exists it is loaded; otherwise a new model is trained on our generated data.


## Training


In [33]:
from ai.pointnet_seg.train import train
from ai.pointnet_seg.model import PointNetSeg
from torch.utils.data import DataLoader
import torch

segmenter_path = "data/pointnet/checkpoints/pointnetmodel.pth"

segmenter = PointNetSeg(classes=n_classes)

# Reuse a saved checkpoint when one exists; otherwise train from scratch.
if os.path.exists(segmenter_path):
    print("Loading existing model...")
    # map_location="cpu" lets a checkpoint that was saved from a GPU load on a
    # CPU-only machine as well; load_state_dict then copies the values into the
    # model's own parameters.
    segmenter.load_state_dict(torch.load(segmenter_path, map_location="cpu"))
    segmenter.eval()
    print("Model loaded.")
else:
    print("Training new model...")
    train(
        pointnet=segmenter,
        optimizer=torch.optim.Adam(segmenter.parameters(), lr=0.001),
        train_data=DataLoader(
            train_dataset, batch_size=32, shuffle=True, num_workers=4
        ),
        eval_data=DataLoader(eval_dataset, batch_size=32, shuffle=False, num_workers=4),
        # Derived from segmenter_path so the two locations cannot drift apart.
        # NOTE(review): assumes train() writes the checkpoint file name used
        # above into this directory - confirm against ai.pointnet_seg.train.
        out_dir=os.path.dirname(segmenter_path),
    )

Loading existing model...
Model loaded.


## Testing

Test our model on our real-world dataset.


In [34]:
from ai.pointnet_seg.test import test
from torch.utils.data import DataLoader

# Evaluate the segmenter on the real-world scans: one cloud per batch, in
# dataset order.
test(
    test_data=DataLoader(
        dataset=test_dataset,
        batch_size=1,
        num_workers=4,
        shuffle=False,
    ),
    model=segmenter,
)

Testing:   0%|          | 0/91 [00:00<?, ?it/s]

Test accuracy: [38;2;255;240;0m78.84%[0m


# Demo

A little section here to show the segmenter's labeling in action on a randomly chosen real-world scan!


In [54]:
import numpy as np
import open3d as o3d
from utils.plot import plot_cloud
from utils.string import format_accuracy

segmenter.eval()
with torch.no_grad():
    # Pick one random sample from the test set (drawn from NumPy's global RNG).
    sample_index = np.random.randint(0, len(test_dataset))
    inputs, labels = test_dataset[sample_index]
    inputs = inputs.float()
    # Add a batch dimension and swap the last two axes before the forward pass.
    # The segmenter returns three values; only the first one is needed here.
    # Indexing avoids assigning to `_` / `__`, which IPython uses for its
    # output history.
    outputs = segmenter(inputs.unsqueeze(0).transpose(1, 2))[0]
    # Predicted class = index of the largest score along dim 1
    # (presumably the class axis - TODO confirm against PointNetSeg).
    predicted = torch.max(outputs, dim=1).indices

pc = o3d.t.geometry.PointCloud()
pc.point.positions = o3d.core.Tensor(
    inputs.squeeze(0).numpy(), dtype=o3d.core.Dtype.Float32
)
pc.point.labels = predicted.squeeze(0).numpy()

# Divide by the number of predictions actually made rather than the configured
# n_points, so the ratio stays correct if a sample has a different size.
accuracy = (predicted == labels).sum().item() / predicted.numel()
print(f"Accuracy: {format_accuracy(accuracy)}")
plot_cloud(pc, xaxis=[-0.5, 0.5], yaxis=[-0.5, 0.5], zaxis=[-0.5, 0.5])

Accuracy: [38;2;71;255;0m94.40%[0m
