In [None]:
import numpy as np

# Load the pre-split point clouds and labels from the working directory.
# Point arrays are expected to be shaped [B, N, 3] (per the original note).
X_train, y_train = np.load("train_points.npy"), np.load("train_labels.npy")
X_test, y_test = np.load("test_points.npy"), np.load("test_labels.npy")

# Quick sanity check of the array shapes for both splits.
print("Train:", X_train.shape, y_train.shape)
print("Test:", X_test.shape, y_test.shape)


In [None]:
def augment_toilet(points, noise_std=0.01, rng=None):
    """Return a randomly rotated, jittered copy of a point cloud.

    The cloud is rotated about the z axis by an angle drawn uniformly from
    [0, 2*pi) and then perturbed with zero-mean Gaussian noise. The input
    array is never modified.

    Args:
        points: array-like of shape [N, 3] holding xyz coordinates.
        noise_std: standard deviation of the Gaussian jitter. The default
            (0.01) matches the previously hard-coded value; pass 0 to get a
            pure rotation.
        rng: optional ``numpy.random.Generator`` (or ``RandomState``) used
            for both draws. When omitted, the global ``np.random`` state is
            used, exactly as before.

    Returns:
        A new float array with the same shape as ``points``.
    """
    sampler = np.random if rng is None else rng

    # Random rotation about the z axis.
    theta = sampler.uniform(0, 2 * np.pi)
    cos_t, sin_t = np.cos(theta), np.sin(theta)
    rot_matrix = np.array([
        [cos_t, -sin_t, 0.0],
        [sin_t,  cos_t, 0.0],
        [0.0,    0.0,   1.0],
    ])
    rotated = np.asarray(points) @ rot_matrix.T

    # Random Gaussian jitter, added out-of-place so the caller's data is safe.
    return rotated + sampler.normal(0, noise_std, size=rotated.shape)


In [None]:
import os

# Tolerate more than one OpenMP runtime being loaded in the same process.
# The flag is set before torch (and torch_geometric, which imports torch) is
# imported in this cell, so it is already in the environment by the time
# those libraries load.
# NOTE(review): this silences the duplicate-runtime error rather than fixing
# the environment — confirm it is still needed.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

import torch
import torch_cluster  # not referenced directly in this notebook; presumably imported so a missing install fails here — confirm
from torch_geometric.data import Data

def to_graph_data(X, y, k=20):
    """Convert point clouds and labels into a list of PyG ``Data`` graphs.

    Args:
        X: indexable collection of point clouds; ``X[i]`` is turned into a
           float tensor and stored as ``pos``.
        y: indexable collection of labels; ``y[i]`` is cast to ``int`` and
           stored as a long tensor in ``y``.
        k: number of neighbours passed to ``knn_graph`` (called with
           ``loop=False``).

    Returns:
        A list with one ``Data(pos, edge_index, y)`` object per entry of ``X``.
    """
    from torch_geometric.nn import knn_graph

    def build_graph(index):
        # Read the cloud and its label first, then build tensors and edges.
        cloud = X[index]
        class_id = int(y[index])
        coords = torch.tensor(cloud, dtype=torch.float)
        neighbours = knn_graph(coords, k=k, loop=False)
        target = torch.tensor(class_id, dtype=torch.long)
        return Data(pos=coords, edge_index=neighbours, y=target)

    return [build_graph(index) for index in range(len(X))]



# Build the k-NN graphs (k=20) for both splits once, up front.
train_data_list = to_graph_data(X=X_train, y=y_train, k=20)
test_data_list = to_graph_data(X=X_test, y=y_test, k=20)


In [None]:
from torch_geometric.loader import DataLoader

# Mini-batches of 32 graphs; only the training split is shuffled.
train_loader = DataLoader(train_data_list, shuffle=True, batch_size=32)
test_loader = DataLoader(test_data_list, shuffle=False, batch_size=32)


In [None]:
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import EdgeConv, global_max_pool

class DGCNN(nn.Module):
    """DGCNN-style point-cloud classifier built from four stacked EdgeConv layers.

    All four EdgeConv layers reuse the ``edge_index`` that arrives with the
    batch, so the neighbourhood graph is fixed rather than recomputed per
    layer. The outputs of the four layers (64 + 64 + 64 + 128 = 320 channels)
    are concatenated per point, projected to ``emb_dims`` with a linear layer
    and batch norm, max-pooled per graph, and classified by a two-layer head.

    Args:
        k: stored on the instance as ``self.k``; nothing in this class reads
           it, because the graph comes from ``data.edge_index``.
        emb_dims: width of the per-point embedding that is pooled per graph.
        num_classes: number of output logits.
    """

    def __init__(self, k=20, emb_dims=1024, num_classes=10):
        super().__init__()
        self.k = k
        # Each EdgeConv MLP takes twice the incoming feature width: per the
        # PyG EdgeConv docs it is fed [x_i, x_j - x_i] (3-D positions -> 6).
        self.conv1 = EdgeConv(nn.Sequential(nn.Linear(6, 64), nn.ReLU(), nn.Linear(64, 64)))
        self.conv2 = EdgeConv(nn.Sequential(nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, 64)))
        self.conv3 = EdgeConv(nn.Sequential(nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, 64)))
        self.conv4 = EdgeConv(nn.Sequential(nn.Linear(128, 128), nn.ReLU(), nn.Linear(128, 128)))

        self.lin1 = nn.Linear(320, emb_dims)
        self.bn1 = nn.BatchNorm1d(emb_dims)
        self.dp1 = nn.Dropout(p=0.5)
        self.fc1 = nn.Linear(emb_dims, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def _embed_points(self, data):
        """Run the shared EdgeConv backbone; return per-point embeddings after batch norm."""
        x, edge_index = data.pos, data.edge_index
        x1 = self.conv1(x, edge_index)
        x2 = self.conv2(x1, edge_index)
        x3 = self.conv3(x2, edge_index)
        x4 = self.conv4(x3, edge_index)
        x_cat = torch.cat((x1, x2, x3, x4), dim=1)
        return self.bn1(self.lin1(x_cat))

    def forward(self, data):
        """Return class logits, one row per graph in the batch.

        Args:
            data: batch exposing ``pos``, ``edge_index`` and ``batch``.
        """
        x = F.relu(self._embed_points(data))
        x = global_max_pool(x, data.batch)
        x = self.dp1(F.relu(self.fc1(x)))
        return self.fc2(x)

    def extract_features(self, data):
        """Return the pooled ``emb_dims``-wide feature vector of each graph.

        Unlike ``forward``, no ReLU is applied before pooling, so the values
        returned here are the raw batch-norm outputs max-pooled per graph.
        """
        return global_max_pool(self._embed_points(data), data.batch)


In [None]:
# Helper: per-class correct/total counts (used during evaluation).
def compute_class_stats(model, loader, device, num_classes):
    """Count, per class, how many samples the model sees and gets right.

    The model is switched to eval mode and is left in that mode on return.

    Args:
        model: callable module mapping a batch to logits of shape [B, C].
        loader: iterable of batches exposing ``.to(device)`` and ``.y``.
        device: device each batch is moved to before the forward pass.
        num_classes: length of the returned count lists.

    Returns:
        ``(correct, total)``: two lists of ``num_classes`` Python ints, where
        ``total[c]`` is the number of samples labelled ``c`` and
        ``correct[c]`` how many of those were predicted as ``c``.
    """
    model.eval()
    correct = torch.zeros(num_classes, dtype=torch.long)
    total = torch.zeros(num_classes, dtype=torch.long)

    with torch.no_grad():
        for data in loader:
            data = data.to(device)
            # One host transfer per batch instead of one sync per sample.
            pred = model(data).argmax(dim=1).cpu()
            label = data.y.view(-1).cpu()
            total += torch.bincount(label, minlength=num_classes)
            correct += torch.bincount(label[pred == label], minlength=num_classes)

    return correct.tolist(), total.tolist()

# Dynamic class-weight computation.
def compute_dynamic_weights(correct, total, epsilon=1e-6, alpha=0.3):
    """Turn per-class accuracy into loss weights that favour weak classes.

    Each class gets a score inversely proportional to its accuracy; the
    scores are normalised to sum to one and then blended with a uniform
    distribution.

    Args:
        correct: per-class count of correct predictions.
        total: per-class sample count.
        epsilon: added to each total so the accuracy division never hits zero.
        alpha: blend factor; 0 gives uniform weights, 1 the pure
            inverse-accuracy weights.

    Returns:
        A float32 tensor with one weight per class.
    """
    inverse_scores = []
    for hits, seen in zip(correct, total):
        accuracy = hits / (seen + epsilon)
        # The floor of 0.01 caps the inverse score at 100.
        inverse_scores.append(1.0 / max(accuracy + 1e-3, 0.01))

    score_sum = sum(inverse_scores)

    # Smooth by interpolating with the uniform weight.
    uniform = 1.0 / len(inverse_scores)
    blended = []
    for score in inverse_scores:
        blended.append((1 - alpha) * uniform + alpha * (score / score_sum))
    return torch.tensor(blended, dtype=torch.float32)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class FocalLoss(nn.Module):
    """Multi-class focal loss computed from raw logits.

    Args:
        gamma: focusing exponent applied to ``(1 - p_target)``.
        weight: optional tensor of shape [num_classes]; each sample's loss is
            scaled by the entry for its target class.
        reduction: ``'mean'`` or ``'sum'``; any other value returns the
            unreduced per-sample loss.
    """

    def __init__(self, gamma=2.0, weight=None, reduction='mean'):
        super().__init__()
        self.gamma = gamma
        self.weight = weight
        self.reduction = reduction

    def forward(self, input, target):
        """Compute the loss.

        Args:
            input: logits of shape [B, C].
            target: int64 class indices of shape [B].

        Returns:
            A scalar for ``'mean'``/``'sum'`` reduction, otherwise a [B] tensor.
        """
        log_probs = F.log_softmax(input, dim=1)                        # [B, C]
        # Only the target-class entry of each row contributes to the loss.
        log_pt = log_probs.gather(1, target.unsqueeze(1)).squeeze(1)    # [B]
        pt = torch.exp(log_pt)                                          # [B]
        per_sample = -((1 - pt) ** self.gamma) * log_pt                 # [B]

        # Look up each sample's class weight by its target index.
        if self.weight is not None:
            per_sample = per_sample * self.weight.gather(0, target)

        if self.reduction == 'mean':
            return per_sample.mean()
        if self.reduction == 'sum':
            return per_sample.sum()
        return per_sample



In [None]:
# Initialise the device and a focal loss with uniform weights for 10 classes.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
weights = torch.ones(10)
loss_fn = FocalLoss(weight=weights.to(device), gamma=2.0)

def train(model, loader, optimizer, device, loss_fn):
    """Run one optimisation pass over ``loader``.

    Args:
        model: module mapping a batch to logits.
        loader: sized iterable of batches exposing ``.to(device)`` and ``.y``.
        optimizer: optimiser stepping the model's parameters.
        device: device each batch is moved to.
        loss_fn: callable ``(logits, targets) -> scalar loss tensor``.

    Returns:
        The sum of the per-batch losses divided by ``len(loader)``.
    """
    model.train()
    batch_losses = []
    for batch in loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        batch_loss = loss_fn(model(batch), batch.y)
        batch_loss.backward()
        optimizer.step()
        batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(loader)


def test(model, loader, device):
    model.eval()
    correct = 0
    for data in loader:
        data = data.to(device)
        out = model(data)
        pred = out.argmax(dim=1)
        correct += int((pred == data.y).sum())
    return correct / len(loader.dataset)


In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

def plot_tsne(model, loader, device):
    """Scatter-plot a 2-D t-SNE embedding of the model's pooled features.

    Args:
        model: module exposing ``extract_features(batch)``.
        loader: iterable of batches exposing ``.to(device)`` and ``.y``.
        device: device each batch is moved to.
    """
    model.eval()
    feature_chunks = []
    label_chunks = []
    with torch.no_grad():
        for batch in loader:
            batch = batch.to(device)
            feature_chunks.append(model.extract_features(batch).cpu())
            label_chunks.append(batch.y.cpu())
    features = torch.cat(feature_chunks).numpy()
    labels = torch.cat(label_chunks).numpy()

    embedding = TSNE(n_components=2, init='pca').fit_transform(features)

    # One point per sample, coloured by its label.
    fig, ax = plt.subplots(figsize=(8, 6))
    scatter = ax.scatter(embedding[:, 0], embedding[:, 1], c=labels, cmap='tab10', s=10)
    ax.set_title("t-SNE of DGCNN Global Features")
    fig.colorbar(scatter, ax=ax)
    plt.show()


In [None]:

# Derive the class count once so the model head and the per-class statistics
# always agree (the statistics call previously hard-coded 10).
num_classes = len(set(y_train))

model = DGCNN(k=20, num_classes=num_classes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(1, 31):
    # Re-weight the loss each epoch from the current per-class training accuracy.
    correct, total = compute_class_stats(model, train_loader, device, num_classes)
    weights = compute_dynamic_weights(correct, total).to(device)
    loss_fn = FocalLoss(gamma=2.0, weight=weights)

    train_loss = train(model, train_loader, optimizer, device, loss_fn)
    test_acc = test(model, test_loader, device)
    print(f"Epoch {epoch:02d} | Loss: {train_loss:.4f} | Test Acc: {test_acc:.2%} | Weights: {weights.cpu().numpy()}")


In [None]:
# Visualise the learned global features on the held-out split.
plot_tsne(model=model, loader=test_loader, device=device)


In [None]:
from mpl_toolkits.mplot3d import Axes3D

def plot_point_cloud(points, label=None, title="Point Cloud", color='b'):
    """Show one point cloud as a 3-D scatter plot.

    Args:
        points: array indexed as ``points[:, 0..2]`` for x, y and z.
        label: value appended to the figure title.
        title: leading part of the figure title.
        color: colour passed to the scatter call.
    """
    ax = plt.figure().add_subplot(111, projection='3d')
    xs, ys, zs = points[:, 0], points[:, 1], points[:, 2]
    ax.scatter(xs, ys, zs, c=color, s=2)
    ax.set_title(f"{title} - Label: {label}")
    plt.show()

# Plot test sample 1 together with its own label (the label was previously
# taken from sample 0).
plot_point_cloud(X_test[1], label=y_test[1], title="Ground Truth", color='g')


In [None]:
from sklearn.metrics import accuracy_score
model.eval()
all_preds = []
all_labels = []

# Collect predictions and labels for the whole test split without tracking gradients.
with torch.no_grad():
    for data in test_loader:
        data = data.to(device)
        out = model(data)
        preds = out.argmax(dim=1)
        all_preds.append(preds.cpu())
        all_labels.append(data.y.cpu())

# `y_test` is rebound here; converting to NumPy keeps it the same kind of
# object the loading cell produced, so cells that index it behave the same
# regardless of execution order.
y_pred = torch.cat(all_preds).numpy()
y_test = torch.cat(all_labels).numpy()

acc = accuracy_score(y_test, y_pred)
print(f"✅ Test Accuracy: {acc:.2%}")


In [None]:
import numpy as np

def per_class_accuracy(y_true, y_pred, class_names=None):
    """Print the accuracy of each class that occurs in ``y_true``.

    Inputs are coerced with ``np.asarray``, so lists, NumPy arrays and CPU
    tensors are all accepted.

    Args:
        y_true: ground-truth class indices.
        y_pred: predicted class indices, aligned with ``y_true``.
        class_names: optional sequence indexed by class id; when empty or
            omitted the numeric id is printed instead.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    for cls in np.unique(y_true):
        mask = y_true == cls
        # Every class comes from y_true, so mask.sum() is at least 1.
        acc = (y_pred[mask] == cls).sum() / mask.sum()
        label = class_names[cls] if class_names else str(cls)
        print(f"Class {label}: {acc:.2%}")
# Example class names.
# NOTE(review): placeholder ordering — confirm it matches the label encoding
# used in the .npy label files.
class_names = [
    "chair",
    "table",
    "sofa",
    "bed",
    "desk",
    "dresser",
    "monitor",
    "night_stand",
    "toilet",
    "bathtub",
]

per_class_accuracy(y_test, y_pred, class_names=class_names)