In [2]:
from pointnet2_utils import PointNetSetAbstraction, PointNetSetAbstractionMsg, PointNetFeaturePropagation, Attention, GLUBlock, LightHead
import torch.nn as nn
import torch.nn.functional as F
import torch
from torch.utils.data import Dataset, DataLoader
import os
import numpy as np
import torch
import collections
from scipy.linalg import expm,norm








In [3]:
# Show which PyTorch build the kernel is using (the version string's suffix
# distinguishes CPU-only from CUDA builds).
print(torch.__version__)

2.7.1+cpu


In [4]:
# helper functions:
def farthest_point_sample(xyz, npoint):
    """Pick ``npoint`` indices per cloud with iterative farthest point sampling.

    Args:
        xyz: tensor unpacked as ``[batch, n, coords]``.
        npoint: number of indices to select for every batch element.

    Returns:
        Long tensor of shape ``[batch, npoint]`` holding indices into the
        point axis of ``xyz``. The first index of each row is drawn at random.
    """
    batch_size, n, _ = xyz.shape
    device = xyz.device

    selected = torch.zeros(batch_size, npoint, dtype=torch.long, device=device)
    # Running squared distance from every point to its closest selected point.
    min_sq_dist = torch.full((batch_size, n), 1e10, device=device)
    # Drawn on the default device and then moved, so the random stream is
    # the same regardless of where ``xyz`` lives.
    current = torch.randint(0, n, (batch_size,), dtype=torch.long).to(device)
    rows = torch.arange(batch_size, dtype=torch.long, device=device)

    for step in range(npoint):
        selected[:, step] = current
        anchor = xyz[rows, current, :].unsqueeze(1)  # [B, 1, coords]
        sq_dist = (xyz - anchor).pow(2).sum(dim=-1)  # [B, N]
        closer = sq_dist < min_sq_dist
        min_sq_dist[closer] = sq_dist[closer]
        # Next pick: the point that is farthest from everything chosen so far.
        current = min_sq_dist.max(dim=-1).indices

    return selected


def gather_points(xyz, idx):
    """Gather rows of ``xyz`` per batch element according to ``idx``.

    Args:
        xyz: tensor of shape ``[B, N, C]``.
        idx: integer tensor of per-batch indices into the ``N`` axis, either
            ``[B, npoint]`` or ``[B, npoint, nsample]``.

    Returns:
        ``[B, npoint, C]`` for 2-D ``idx`` or ``[B, npoint, nsample, C]`` for
        3-D ``idx``.

    Raises:
        ValueError: if ``idx`` is neither 2-D nor 3-D.
    """
    batch_size, n, channels = xyz.shape

    if idx.dim() not in (2, 3):
        raise ValueError(
            f"idx must be 2-D [B, npoint] or 3-D [B, npoint, nsample], got {idx.dim()}-D"
        )

    # Indices are local to each cloud, so shift them by b * N before indexing
    # the flattened [B * N, C] view. Without this shift every batch element
    # would read its points from batch 0.
    offset_shape = (batch_size,) + (1,) * (idx.dim() - 1)
    batch_offset = torch.arange(batch_size, device=xyz.device).view(offset_shape) * n
    flat_idx = (idx + batch_offset).reshape(-1)

    gathered_xyz = xyz.reshape(batch_size * n, channels)[flat_idx, :]
    return gathered_xyz.reshape(batch_size, *idx.shape[1:], channels)


def query_and_group(xyz, new_xyz, points, radius, nsample):
    """Group the ``nsample`` nearest neighbours of each centroid.

    Neighbours are chosen purely by sorted squared distance; ``radius`` is
    accepted but not used by this implementation.

    Args:
        xyz: source coordinates, unpacked as ``[B, N, coords]``.
        new_xyz: centroid coordinates, unpacked as ``[B, npoint, coords]``.
        points: optional features laid out ``[B, C, N]`` (transposed before
            gathering), or ``None`` to group coordinates only.
        radius: unused.
        nsample: number of neighbours kept per centroid; ``None`` keeps all.

    Returns:
        ``[B, coords (+ C), nsample, npoint]`` tensor of centroid-relative
        coordinates, concatenated with gathered features when given.
    """
    # Unpacking doubles as a rank check on both inputs.
    _batch, _n_source, _ = xyz.shape
    _, _n_centroids, _ = new_xyz.shape

    pairwise_sq = square_distance(new_xyz, xyz)  # [B, npoint, N]
    neighbor_idx = pairwise_sq.argsort(dim=-1)[:, :, :nsample]  # [B, npoint, nsample]

    # Express neighbour coordinates relative to their centroid.
    local_xyz = gather_points(xyz, neighbor_idx) - new_xyz.unsqueeze(2)
    local_xyz = local_xyz.permute(0, 3, 2, 1)  # [B, coords, nsample, npoint]

    if points is None:
        return local_xyz

    neighbor_feats = gather_points(points.transpose(1, 2), neighbor_idx)
    neighbor_feats = neighbor_feats.permute(0, 3, 2, 1)  # [B, C, nsample, npoint]
    return torch.cat([local_xyz, neighbor_feats], dim=1)


def square_distance(src, dst):
    """Pairwise squared Euclidean distance between two batched point sets.

    Uses the expansion ``|s - d|^2 = -2 s.d + |s|^2 + |d|^2``.

    Args:
        src: tensor of shape ``[B, N, C]``.
        dst: tensor of shape ``[B, M, C]``.

    Returns:
        Tensor of shape ``[B, N, M]``.
    """
    batch, n_src, _ = src.shape
    _, n_dst, _ = dst.shape

    cross = torch.matmul(src, dst.transpose(1, 2))  # [B, N, M]
    src_sq = src.pow(2).sum(dim=-1).view(batch, n_src, 1)
    dst_sq = dst.pow(2).sum(dim=-1).view(batch, 1, n_dst)

    dist = -2 * cross
    dist = dist + src_sq
    dist = dist + dst_sq
    return dist


class SetAbstraction(nn.Module):
    """Sample centroids, group their neighbours, and apply a shared point-wise MLP.

    Args:
        npoint: number of centroids to sample; ``None`` keeps every input point.
        radius: forwarded to ``query_and_group``.
        nsample: neighbours grouped per centroid.
        in_channel: feature channels of the incoming ``points`` (3 coordinate
            channels are added on top).
        mlp: output widths of the successive 1x1 convolutions.
    """

    def __init__(self, npoint, radius, nsample, in_channel, mlp):
        super().__init__()
        self.npoint, self.radius, self.nsample = npoint, radius, nsample

        self.mlp_convs = nn.ModuleList()
        self.mlp_bns = nn.ModuleList()
        # Grouped inputs carry the relative coordinates, hence the +3.
        widths = [in_channel + 3, *mlp]
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            self.mlp_convs.append(nn.Conv2d(c_in, c_out, 1))  # input: [B, C_in, nsample, npoint]
            self.mlp_bns.append(nn.BatchNorm2d(c_out))

    def forward(self, xyz, points):
        """Return ``(new_xyz, new_points)``.

        ``new_xyz`` holds the centroid coordinates and ``new_points`` the
        max-pooled features shaped ``[B, mlp[-1], npoint]``.
        """
        if self.npoint is None:
            new_xyz = xyz  # every point acts as its own centroid
        else:
            centroid_idx = farthest_point_sample(xyz, self.npoint)  # [B, npoint]
            new_xyz = gather_points(xyz, centroid_idx)  # [B, npoint, 3]

        features = query_and_group(xyz, new_xyz, points, self.radius, self.nsample)
        for conv, bn in zip(self.mlp_convs, self.mlp_bns):
            features = F.relu(bn(conv(features)))  # [B, out_channel, nsample, npoint]

        # Collapse the neighbour axis with a max.
        pooled = features.max(dim=2).values
        return new_xyz, pooled
    
    
class Attention(nn.Module):
    """Multi-head self-attention over the points of a point-cloud feature map.

    Intended to be applied after a Set Abstraction stage.

    Input:  ``[B, C, N]`` (batch, channels, num_points)
    Output: ``[B, C, N]``

    Args:
        in_channels: number of feature channels ``C``; must be divisible by
            ``heads``.
        heads: number of attention heads.

    NOTE(review): this definition shadows the ``Attention`` imported from
    ``pointnet2_utils`` at the top of the notebook.
    """

    def __init__(self, in_channels, heads=4):
        super(Attention, self).__init__()
        assert in_channels % heads == 0, "in_channels phải chia hết cho số heads"
        self.in_channels = in_channels
        self.heads = heads
        self.dk = in_channels // heads  # channels per head
        self.query = nn.Conv1d(in_channels, in_channels, 1)
        self.key = nn.Conv1d(in_channels, in_channels, 1)
        self.value = nn.Conv1d(in_channels, in_channels, 1)
        self.proj = nn.Conv1d(in_channels, in_channels, 1)

    def forward(self, x):
        """Apply scaled dot-product self-attention across points, plus a residual.

        Args:
            x: tensor of shape ``[B, C, N]``.

        Returns:
            Tensor of shape ``[B, C, N]``.
        """
        B, C, N = x.shape
        Q = self.query(x).view(B, self.heads, self.dk, N)  # [B, heads, dk, N]
        K = self.key(x).view(B, self.heads, self.dk, N)
        V = self.value(x).view(B, self.heads, self.dk, N)

        # scores[b, h, n, m] = <Q[b, h, :, n], K[b, h, :, m]> / sqrt(dk)
        attn = torch.einsum('bhdn,bhdm->bhnm', Q, K) / (self.dk ** 0.5)  # [B, heads, N, N]
        attn = torch.softmax(attn, dim=-1)  # normalise over key points m

        # out[b, h, d, n] = sum_m attn[b, h, n, m] * V[b, h, d, m]
        out = torch.einsum('bhnm,bhdm->bhdn', attn, V)  # [B, heads, dk, N]
        out = out.reshape(B, C, N)
        out = self.proj(out)
        return out + x  # residual
    

class GLUBlock(nn.Module):
    """Gated linear unit: a linear projection scaled by a sigmoid gate.

    Args:
        in_dim: size of the last input dimension.
        out_dim: size of the last output dimension.
    """

    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.linear_main = nn.Linear(in_dim, out_dim)
        self.linear_gate = nn.Linear(in_dim, out_dim)

    def forward(self, x):
        """Return ``linear_main(x)`` multiplied element-wise by ``sigmoid(linear_gate(x))``."""
        value = self.linear_main(x)
        gate = torch.sigmoid(self.linear_gate(x))
        return value * gate

# Define PointNet++ model
class PointNetPlusPlus(nn.Module):
    """Three-stage set-abstraction classifier for point clouds.

    Args:
        num_classes: number of output classes, forwarded to ``LightHead``.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Hierarchical feature extractors.
        self.sa1 = SetAbstraction(npoint=512, radius=0.2, nsample=32, in_channel=0, mlp=[64, 64, 128])
        self.sa2 = SetAbstraction(npoint=128, radius=0.4, nsample=64, in_channel=128, mlp=[128, 128, 256])
        # npoint=None keeps every incoming point as a centroid; nsample=None
        # groups all points around each of them.
        self.sa3 = SetAbstraction(npoint=None, radius=None, nsample=None, in_channel=256, mlp=[256, 512, 1024])

        # Classification head (imported from pointnet2_utils).
        self.light_head = LightHead(in_dim=1024, num_classes=num_classes)

    def forward(self, xyz):
        """Classify a batch of clouds.

        Args:
            xyz: tensor unpacked as ``[B, N, coords]``.

        Returns:
            ``log_softmax`` over dim 1 of the head output.
        """
        _batch, _num_points, _coords = xyz.shape  # rank check only

        xyz_l1, feats_l1 = self.sa1(xyz, None)        # feats: [B, 128, 512]
        xyz_l2, feats_l2 = self.sa2(xyz_l1, feats_l1)  # feats: [B, 256, 128]
        _, feats_l3 = self.sa3(xyz_l2, feats_l2)       # feats: [B, 1024, npoint]

        # NOTE(review): feats_l3 still has a point axis here; presumably
        # LightHead reduces it — confirm against pointnet2_utils.
        head_out = self.light_head(feats_l3)
        return F.log_softmax(head_out, dim=1)






In [4]:
def extract_unique_labels(label_dir):
    """Collect the distinct class names found in a directory of label files.

    Every ``.txt`` file in ``label_dir`` is read line by line; the first
    whitespace-separated token of each non-blank line is taken as the class
    name.

    Args:
        label_dir: directory containing the label ``.txt`` files.

    Returns:
        Sorted list of unique class-name strings.
    """
    unique_labels = set()
    for label_file in os.listdir(label_dir):
        if not label_file.endswith('.txt'):
            continue
        with open(os.path.join(label_dir, label_file), 'r') as file:
            for line in file:
                parts = line.split()
                if not parts:
                    # Blank or whitespace-only line: nothing to record.
                    continue
                unique_labels.add(parts[0])
    return sorted(unique_labels)

In [6]:
class KittiPointCloudDataset(Dataset):
    """Labelled KITTI-style dataset of ``.bin`` point clouds with ``.txt`` labels.

    Each item is ``(points, label)`` where ``points`` is a float32 tensor of
    shape ``[num_points, 3]`` and ``label`` is a scalar long tensor.

    Args:
        velodyne_dir: directory containing the ``.bin`` point-cloud files.
        label_dir: directory containing the ``.txt`` label files.
        label_to_id: mapping from class-name string to integer id.
        num_points: number of points every returned cloud is resampled to.
    """

    def __init__(self, velodyne_dir, label_dir, label_to_id, num_points=1024):
        self.velodyne_dir = velodyne_dir
        self.label_dir = label_dir
        self.label_to_id = label_to_id
        self.num_points = num_points

        # Clouds and labels are paired by position in the sorted listings.
        self.velodyne_files = sorted([f for f in os.listdir(velodyne_dir) if f.endswith('.bin')])
        self.label_files = sorted([f for f in os.listdir(label_dir) if f.endswith('.txt')])

        # NOTE(review): only the counts are compared; file stems are not
        # checked against each other.
        assert len(self.velodyne_files) == len(self.label_files), "Mismatch between point clouds and labels"

    def __len__(self):
        """Number of point-cloud files."""
        return len(self.velodyne_files)

    def __getitem__(self, idx):
        """Load, resample and label the ``idx``-th cloud."""
        pc_file = os.path.join(self.velodyne_dir, self.velodyne_files[idx])
        point_cloud = self.load_point_cloud(pc_file)

        # Subsample without replacement when there are more points than
        # needed; otherwise draw with replacement to reach num_points.
        if len(point_cloud) > self.num_points:
            idxs = np.random.choice(len(point_cloud), self.num_points, replace=False)
        else:
            idxs = np.random.choice(len(point_cloud), self.num_points, replace=True)
        point_cloud = point_cloud[idxs]

        label_file = os.path.join(self.label_dir, self.label_files[idx])
        labels = self.parse_labels(label_file)

        return torch.tensor(point_cloud, dtype=torch.float32), torch.tensor(labels, dtype=torch.long)

    def load_point_cloud(self, file_path):
        """Load point cloud from .bin file and return its x, y, z columns."""
        point_cloud = np.fromfile(file_path, dtype=np.float32).reshape(-1, 4)  # x, y, z, intensity
        return point_cloud[:, :3]  # Use only x, y, z

    def parse_labels(self, label_file):
        """Return the id of the first known class listed in ``label_file``.

        Blank lines are skipped. If no line names a class present in
        ``label_to_id``, the id of ``'DontCare'`` is returned, or ``0`` when
        the mapping has no such key.
        """
        with open(label_file, 'r') as file:
            for line in file:
                parts = line.split()
                if not parts:
                    # Blank line: indexing parts[0] would raise IndexError.
                    continue
                obj_class = parts[0]
                if obj_class in self.label_to_id:
                    return self.label_to_id[obj_class]
        return self.label_to_id.get('DontCare', 0)

class KittiTestDataset(Dataset):
    """Unlabelled dataset of ``.bin`` point clouds for inference.

    Each item is a float32 tensor of shape ``[num_points, 3]``.

    Args:
        velodyne_dir: directory containing the ``.bin`` point-cloud files.
        num_points: number of points every returned cloud is resampled to.
    """

    def __init__(self, velodyne_dir, num_points=1024):
        self.velodyne_dir = velodyne_dir
        self.num_points = num_points

        # Sorted so that item order is stable across runs.
        bin_names = [name for name in os.listdir(velodyne_dir) if name.endswith('.bin')]
        self.velodyne_files = sorted(bin_names)

    def __len__(self):
        """Number of point-cloud files."""
        return len(self.velodyne_files)

    def __getitem__(self, idx):
        """Load the ``idx``-th cloud and resample it to ``num_points`` points."""
        cloud = self.load_point_cloud(os.path.join(self.velodyne_dir, self.velodyne_files[idx]))

        # Draw with replacement only when the cloud is too small to subsample.
        available = len(cloud)
        with_replacement = available <= self.num_points
        chosen = np.random.choice(available, self.num_points, replace=with_replacement)

        return torch.tensor(cloud[chosen], dtype=torch.float32)

    def load_point_cloud(self, file_path):
        """Read a ``.bin`` file as float32 rows of 4 values and keep the first 3 columns."""
        raw = np.fromfile(file_path, dtype=np.float32)
        return raw.reshape(-1, 4)[:, :3]  # drop the 4th column (intensity)

In [8]:
# Paths to KITTI directories
# NOTE(review): absolute, machine-specific paths; adjust these before running elsewhere.
velodyne_dir = r"E:\Storange\Python\Point_cloud\data\archive\training\velodyne_subset"
label_dir = r"E:\Storange\Python\Point_cloud\data\archive\training\label_2_subset"

# Build a class-name -> integer-id mapping; ids follow the sorted label order.
unique_labels = extract_unique_labels(label_dir)
label_to_id = {label: idx for idx, label in enumerate(unique_labels)}
print("Extracted label mapping:", label_to_id)

Extracted label mapping: {'Car': 0, 'Cyclist': 1, 'DontCare': 2, 'Misc': 3, 'Pedestrian': 4, 'Person_sitting': 5, 'Tram': 6, 'Truck': 7, 'Van': 8}


In [7]:
def collate_fn(batch):
    """Collate ``(points, label)`` samples into batched tensors.

    Args:
        batch: sequence of ``(points, label)`` pairs.

    Returns:
        Tuple ``(stacked_points, labels)``: the point tensors stacked along a
        new leading dimension, and the labels packed into one tensor.
    """
    clouds, targets = zip(*batch)
    stacked = torch.stack(clouds, dim=0)
    return stacked, torch.tensor(targets)

In [8]:
# Create train dataset and DataLoader
train_dataset = KittiPointCloudDataset(velodyne_dir, label_dir, label_to_id, num_points=1024)
# Pass collate_fn directly: a lambda wrapper adds nothing and cannot be
# pickled, which breaks multi-worker loading on spawn-based platforms.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)

In [None]:
# NOTE(review): repeats the PyTorch version check from an earlier cell.
print(torch.__version__)

In [9]:
# Sanity check: show the tensor shapes of the first batch only.
for points, labels in train_loader:
    print(points.shape)
    print(labels.shape)
    break

torch.Size([16, 1024, 3])
torch.Size([16])


In [10]:
# Training Parameters
from torch import optim

num_classes = len(label_to_id)
num_epochs = 10


# Initialize model, optimizer, and loss function
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PointNetPlusPlus(num_classes=num_classes).to(device)
# The model already returns log-probabilities. CrossEntropyLoss applies
# log_softmax again, which leaves log-probabilities unchanged, so the loss
# value is still the usual cross-entropy.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)

# Training Loop
# Runs for num_epochs so the progress message below matches the loop length.
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    correct = 0
    total = 0

    # Unpack each batch here; otherwise the loop would keep reusing whatever
    # `points`/`labels` were left in the kernel by an earlier cell.
    for points, labels in train_loader:
        points, labels = points.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(points)   # No need for reshaping

        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss/len(train_loader):.4f}, Accuracy: {correct/total:.4f}")

RuntimeError: CUDA error: device kernel image is invalid
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


<h2> Testing </h2>

In [None]:
# The test split has no labels, so use the unlabelled dataset class.
# KittiPointCloudDataset cannot be built here: it also requires label_dir
# and label_to_id.
test_path = r"E:\Storange\Python\Point_cloud\data\archive\testing\velodyne"
test_dataset = KittiTestDataset(test_path, num_points=1024)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

In [None]:
# Switch the model to evaluation mode before inference.
model.eval()

# Per-batch arrays of predicted class indices.
predictions = []

with torch.no_grad():  # no gradients are needed for prediction
    for points in test_loader:
        batch_on_device = points.to(device)
        scores = model(batch_on_device)
        # Index of the highest score along dim 1 for every sample.
        predicted_classes = scores.max(dim=1).indices
        predictions.append(predicted_classes.cpu().numpy())

# Join the per-batch arrays into one flat array.
predictions = np.concatenate(predictions, axis=0)