In [1]:
import numpy as np
import pandas
import time
import math
import random
import os
import torch
import scipy.spatial.distance
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils

import plotly.graph_objects as go
import plotly.express as px

import matplotlib.pyplot as plt
random.seed = 42

In [2]:
torch.cuda.empty_cache()
torch.cuda.get_device_name(0)

'NVIDIA GeForce RTX 4060 Laptop GPU'

In [3]:
from path import Path
path = Path("./ModelNet10")

In [4]:
folders = [dir for dir in sorted(os.listdir(path)) if os.path.isdir(path/dir)]
classes = {folder: i for i, folder in enumerate(folders)};
classes

{'bathtub': 0,
 'bed': 1,
 'chair': 2,
 'desk': 3,
 'dresser': 4,
 'monitor': 5,
 'night_stand': 6,
 'sofa': 7,
 'table': 8,
 'toilet': 9}

In [5]:
def read_off(file):
    # 读取文件头并检查格式
    off_header = file.readline().strip()
    if off_header != 'OFF':
        raise ValueError('Not a valid OFF header')

    # 读取顶点和面数量
    n_verts, n_faces, _ = map(int, file.readline().strip().split())

    # 读取顶点坐标
    verts = [list(map(float, file.readline().strip().split())) for _ in range(n_verts)]

    # 读取面信息
    faces = [list(map(int, file.readline().strip().split()))[1:] for _ in range(n_faces)]

    return verts, faces

In [6]:
class PointSampler(object):
    def __init__(self, output_size):
        assert isinstance(output_size, int)
        self.output_size = output_size
    
    def triangle_area(self, pt1, pt2, pt3):
        side_a = np.linalg.norm(pt1 - pt2)
        side_b = np.linalg.norm(pt2 - pt3)
        side_c = np.linalg.norm(pt3 - pt1)
        s = 0.5 * ( side_a + side_b + side_c)
        return max(s * (s - side_a) * (s - side_b) * (s - side_c), 0)**0.5

    def sample_point(self, pt1, pt2, pt3):
        # barycentric coordinates on a triangle
        # https://mathworld.wolfram.com/BarycentricCoordinates.html
        s, t = sorted([random.random(), random.random()])
        f = lambda i: s * pt1[i] + (t-s)*pt2[i] + (1-t)*pt3[i]
        return (f(0), f(1), f(2))
        
    
    def __call__(self, mesh):
        verts, faces = mesh
        verts = np.array(verts)
        areas = np.zeros((len(faces)))

        for i in range(len(areas)):
            areas[i] = (self.triangle_area(verts[faces[i][0]],
                                           verts[faces[i][1]],
                                           verts[faces[i][2]]))
            
        sampled_faces = (random.choices(faces, 
                                      weights=areas,
                                      cum_weights=None,
                                      k=self.output_size))
        
        sampled_points = np.zeros((self.output_size, 3))

        for i in range(len(sampled_faces)):
            sampled_points[i] = (self.sample_point(verts[sampled_faces[i][0]],
                                                   verts[sampled_faces[i][1]],
                                                   verts[sampled_faces[i][2]]))
        
        return sampled_points


In [7]:
class Normalize(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2
        
        norm_pointcloud = pointcloud - np.mean(pointcloud, axis=0) 
        norm_pointcloud /= np.max(np.linalg.norm(norm_pointcloud, axis=1))

        return  norm_pointcloud

In [8]:
class RandRotation_z(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2

        theta = random.random() * 2. * math.pi
        rot_matrix = np.array([[ math.cos(theta), -math.sin(theta),    0],
                               [ math.sin(theta),  math.cos(theta),    0],
                               [0,                             0,      1]])
        
        rot_pointcloud = rot_matrix.dot(pointcloud.T).T
        return  rot_pointcloud
    
class RandomNoise(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2

        noise = np.random.normal(0, 0.02, (pointcloud.shape))
    
        noisy_pointcloud = pointcloud + noise
        return  noisy_pointcloud

In [9]:
class ToTensor(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape)==2

        return torch.from_numpy(pointcloud)

In [10]:
def default_transforms():
    return transforms.Compose([
                                PointSampler(1024),
                                Normalize(),
                                ToTensor()
                              ])

In [11]:
class PointCloudData(Dataset):
    def __init__(self, root_dir, test=False, folder="train", transform=default_transforms()):
        self.root_dir = root_dir
        folders = [dir for dir in sorted(os.listdir(root_dir)) if os.path.isdir(root_dir/dir)]
        self.classes = {folder: i for i, folder in enumerate(folders)}
        self.transforms = transform if not test else default_transforms()
        self.test = test
        self.files = []
        for category in self.classes.keys():
            new_dir = root_dir/Path(category)/folder
            for file in os.listdir(new_dir):
                if file.endswith('.off'):
                    sample = {}
                    sample['pcd_path'] = new_dir/file
                    sample['category'] = category
                    self.files.append(sample)

    def __len__(self):
        return len(self.files)

    def __preproc__(self, file):
        verts, faces = read_off(file)
        if self.transforms:
            pointcloud = self.transforms((verts, faces))
        return pointcloud

    def __getitem__(self, idx):
        pcd_path = self.files[idx]['pcd_path']
        category = self.files[idx]['category']
        with open(pcd_path, 'r') as f:
            pointcloud = self.__preproc__(f)
        return {'pointcloud': pointcloud, 
                'category': self.classes[category]}

In [58]:
train_transforms = transforms.Compose([
                    PointSampler(1024),
                    Normalize(),
                    RandRotation_z(),
                    RandomNoise(),
                    ToTensor()
                    ])

In [59]:
train_ds = PointCloudData(path, transform=train_transforms)
test_ds = PointCloudData(path, test=True, folder='test')

In [14]:
print('Train dataset size: ', len(train_ds))
print('Test dataset size: ', len(test_ds))

Train dataset size:  3991
Test dataset size:  908


In [15]:
#按9比1的分割训练数据集：训练集：9； 验证集：1
from torch.utils.data import random_split
def train_val_split(train_ds):
    train_size = int(len(train_ds) * 0.90)
    val_size = len(train_ds) - train_size
     
    train_ds, val_ds = random_split(train_ds, [train_size, val_size])
    return train_ds, val_ds

In [16]:
train_ds, valid_ds = train_val_split(train_ds)

In [17]:
print('Train dataset size: ', len(train_ds))
print('Valid dataset size: ', len(valid_ds))

Train dataset size:  3591
Valid dataset size:  400


In [18]:
train_loader = DataLoader(dataset=train_ds, batch_size=32, shuffle=True)
valid_loader = DataLoader(dataset=valid_ds, batch_size=64)

In [60]:
test_loader = DataLoader(dataset=test_ds, batch_size=64)

In [20]:
len(test_loader)

15

In [21]:
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F

class Tnet(nn.Module):
   def __init__(self, k=3):
      super().__init__()
      self.k=k
      self.conv1 = nn.Conv1d(k,64,1)
      self.conv2 = nn.Conv1d(64,128,1)
      self.conv3 = nn.Conv1d(128,1024,1)
      self.fc1 = nn.Linear(1024,512)
      self.fc2 = nn.Linear(512,256)
      self.fc3 = nn.Linear(256,k*k)

      self.bn1 = nn.BatchNorm1d(64)
      self.bn2 = nn.BatchNorm1d(128)
      self.bn3 = nn.BatchNorm1d(1024)
      self.bn4 = nn.BatchNorm1d(512)
      self.bn5 = nn.BatchNorm1d(256)
       

   def forward(self, input):
      # input.shape == (bs,n,3)
      bs = input.size(0)
      xb = F.relu(self.bn1(self.conv1(input)))
      xb = F.relu(self.bn2(self.conv2(xb)))
      xb = F.relu(self.bn3(self.conv3(xb)))
      pool = nn.MaxPool1d(xb.size(-1))(xb)
      flat = nn.Flatten(1)(pool)
      xb = F.relu(self.bn4(self.fc1(flat)))
      xb = F.relu(self.bn5(self.fc2(xb)))
      
      #initialize as identity
      init = torch.eye(self.k, requires_grad=True).repeat(bs,1,1)
      if xb.is_cuda:
        init=init.cuda()
      matrix = self.fc3(xb).view(-1,self.k,self.k) + init
      return matrix


class Transform(nn.Module):
   def __init__(self):
        super().__init__()
        self.input_transform = Tnet(k=3)
        self.feature_transform = Tnet(k=64)
        self.conv1 = nn.Conv1d(3,64,1)

        self.conv2 = nn.Conv1d(64,128,1)
        self.conv3 = nn.Conv1d(128,1024,1)
       

        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(128)
        self.bn3 = nn.BatchNorm1d(1024)
       
   def forward(self, input):
        matrix3x3 = self.input_transform(input)
        # batch matrix multiplication
        xb = torch.bmm(torch.transpose(input,1,2), matrix3x3).transpose(1,2)

        xb = F.relu(self.bn1(self.conv1(xb)))

        matrix64x64 = self.feature_transform(xb)
        xb = torch.bmm(torch.transpose(xb,1,2), matrix64x64).transpose(1,2)

        xb = F.relu(self.bn2(self.conv2(xb)))
        xb = self.bn3(self.conv3(xb))
        xb = nn.MaxPool1d(xb.size(-1))(xb)
        output = nn.Flatten(1)(xb)
        return output, matrix3x3, matrix64x64

class PointNet(nn.Module):
    def __init__(self, classes = 10):
        super().__init__()
        self.transform = Transform()
        self.fc1 = nn.Linear(1024, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, classes)
        

        self.bn1 = nn.BatchNorm1d(512)
        self.bn2 = nn.BatchNorm1d(256)
        self.dropout = nn.Dropout(p=0.3)
        self.logsoftmax = nn.LogSoftmax(dim=1)

    def forward(self, input):
        xb, matrix3x3, matrix64x64 = self.transform(input)
        xb = F.relu(self.bn1(self.fc1(xb)))
        xb = F.relu(self.bn2(self.dropout(self.fc2(xb))))
        output = self.fc3(xb)
        return self.logsoftmax(output), matrix3x3, matrix64x64

In [22]:
def pointnetloss(outputs, labels, m3x3, m64x64, alpha=0.0001):
    criterion = torch.nn.NLLLoss()
    
    bs = outputs.size(0)
    id3x3 = torch.eye(3, device=outputs.device).repeat(bs, 1, 1)
    id64x64 = torch.eye(64, device=outputs.device).repeat(bs, 1, 1)

    diff3x3 = id3x3 - torch.bmm(m3x3, m3x3.transpose(1, 2))
    diff64x64 = id64x64 - torch.bmm(m64x64, m64x64.transpose(1, 2))

    # 计算损失
    loss = criterion(outputs, labels) + alpha * (torch.norm(diff3x3) + torch.norm(diff64x64)) / float(bs)
    return loss

In [23]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [24]:
pointnet = PointNet()
pointnet.to(device);pointnet = PointNet()
pointnet.to(device);

In [30]:
save_path = './pointnet_cls_train/ModelNet10'

# 优化器
optimizer = torch.optim.Adam(pointnet.parameters(), lr=0.001, weight_decay=1e-4)

def train(model, train_loader, val_loader=None, epochs=15, save=False, save_path=save_path):

    # 记录训练损失和准确率
    train_losses = []
    train_accuracies = []
    val_accuracies = []
    best_val_acc = 0.0  # 初始化最佳验证准确率
    scaler = torch.cuda.amp.GradScaler()  # 用于半精度训练

    total_start_time = time.time()  # 记录总训练开始时间

    for epoch in range(epochs): 
        model.train()
        running_loss = 0.0
        epoch_start_time = time.time()  # 记录每个 epoch 的开始时间
        correct = total = 0  # 用于计算训练准确率

        for i, data in enumerate(train_loader, 0):
            inputs, labels = data['pointcloud'].to(device).float(), data['category'].to(device)

            optimizer.zero_grad()
            
            # 使用自动混合精度
            with torch.cuda.amp.autocast():
                outputs, m3x3, m64x64 = model(inputs.transpose(1, 2))
                loss = pointnetloss(outputs, labels, m3x3, m64x64)

            # 反向传播
            scaler.scale(loss).backward()  # 半精度反向传播
            scaler.step(optimizer)  # 更新优化器
            scaler.update()  # 更新缩放器

            # 记录损失
            running_loss += loss.item()

            # 计算训练准确率
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            if i % 10 == 9:    # 每10个小批次打印一次
                print('[Epoch: %d, Batch: %4d / %4d], loss: %.3f' %
                      (epoch + 1, i + 1, len(train_loader), running_loss / 10))
                running_loss = 0.0

        # 记录每个 epoch 的训练损失和准确率
        train_losses.append(running_loss)
        train_accuracy = 100. * correct / total
        train_accuracies.append(train_accuracy)

        # 计算并打印每个 epoch 的训练时间
        epoch_time = time.time() - epoch_start_time
        print('Epoch %d finished in %.2f seconds. Training Accuracy: %.2f %%' % (epoch + 1, epoch_time, train_accuracy))

        # 评估模型
        model.eval()
        correct = total = 0

        # 验证过程
        if val_loader:
            with torch.no_grad():
                for data in val_loader:
                    inputs, labels = data['pointcloud'].to(device).float(), data['category'].to(device)
                    with torch.cuda.amp.autocast():  # 在验证时也使用半精度
                        outputs, __, __ = model(inputs.transpose(1, 2))
                        _, predicted = torch.max(outputs.data, 1)
                        total += labels.size(0)
                        correct += (predicted == labels).sum().item()

            val_acc = 100. * correct / total
            val_accuracies.append(val_acc)
            print('Valid accuracy: %.2f %%' % val_acc)

            # 保存最优模型
            if val_acc > best_val_acc:  # 如果当前验证准确率更高
                best_val_acc = val_acc
                if save:
                    torch.save(model.state_dict(), f"{save_path}/best_pointnet10_cls.pth")
                    print('Best model saved with accuracy: %.2f %%' % best_val_acc)

    # 计算并打印总训练时间
    total_time = time.time() - total_start_time
    print('Total training time: %.2f seconds.' % total_time)

    # 绘制损失曲线
    plt.figure()
    plt.plot(train_losses, label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss Curve')
    plt.legend()
    plt.savefig(f"{save_path}/training_loss_curve.png")
    plt.close()

    # 绘制准确率曲线
    plt.figure()
    plt.plot(train_accuracies, label='Training Accuracy')
    if val_loader:
        plt.plot(val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Training and Validation Accuracy Curve')
    plt.legend()
    plt.savefig(f"{save_path}/training_validation_accuracy_curve.png")
    plt.close()

In [31]:
train(pointnet, train_loader, valid_loader, save=True, save_path=save_path)

[Epoch: 1, Batch:   10 /  113], loss: 0.961
[Epoch: 1, Batch:   20 /  113], loss: 1.078
[Epoch: 1, Batch:   30 /  113], loss: 0.993
[Epoch: 1, Batch:   40 /  113], loss: 0.815
[Epoch: 1, Batch:   50 /  113], loss: 0.845
[Epoch: 1, Batch:   60 /  113], loss: 0.797
[Epoch: 1, Batch:   70 /  113], loss: 0.776
[Epoch: 1, Batch:   80 /  113], loss: 0.733
[Epoch: 1, Batch:   90 /  113], loss: 0.756
[Epoch: 1, Batch:  100 /  113], loss: 0.724
[Epoch: 1, Batch:  110 /  113], loss: 0.761
Epoch 1 finished in 470.04 seconds. Training Accuracy: 72.01 %
Valid accuracy: 72.50 %
Best model saved with accuracy: 72.50 %
[Epoch: 2, Batch:   10 /  113], loss: 0.687
[Epoch: 2, Batch:   20 /  113], loss: 0.771
[Epoch: 2, Batch:   30 /  113], loss: 0.672
[Epoch: 2, Batch:   40 /  113], loss: 0.603
[Epoch: 2, Batch:   50 /  113], loss: 0.640
[Epoch: 2, Batch:   60 /  113], loss: 0.731
[Epoch: 2, Batch:   70 /  113], loss: 0.717
[Epoch: 2, Batch:   80 /  113], loss: 0.701
[Epoch: 2, Batch:   90 /  113], loss:

In [57]:
torch.cuda.empty_cache()
torch.cuda.get_device_name(0)

'NVIDIA GeForce RTX 4060 Laptop GPU'

## 优化部分

In [33]:
pre_train_path = './pointnet_cls_train/ModelNet10/best_pointnet10_cls.pth'
pointnet = PointNet()
pointnet.load_state_dict(torch.load(pre_train_path))
pointnet.eval();

In [34]:
# 类别数量
num_classes = 10 

# 用于存储每类的预测和真实标签
class_correct = [0] * num_classes
class_total = [0] * num_classes

# 全局统计
total_correct = 0
total_samples = 0

all_preds = []
all_labels = []

with torch.no_grad():
    for i, data in enumerate(test_loader):
        print('Batch [%4d / %4d]' % (i + 1, len(test_loader)))

        inputs, labels = data['pointcloud'].float(), data['category']
        outputs, __, __ = pointnet(inputs.transpose(1, 2))
        
        # 获取预测结果
        _, preds = torch.max(outputs.data, 1)
        
        # 更新所有预测和标签
        all_preds += list(preds.numpy())
        all_labels += list(labels.numpy())

        # 统计每个类别的正确预测数和总数
        for j in range(labels.size(0)):
            label = labels[j].item()  # 获取当前样本的真实标签
            class_total[label] += 1  # 该类样本总数加 1
            class_correct[label] += (preds[j] == label).item()  # 如果预测正确，相应类别的正确预测数加 1
            
            # 更新全局统计
            total_samples += 1
            total_correct += (preds[j] == label).item()

# 计算每类的准确率
class_accuracy = [0] * num_classes
for i in range(num_classes):
    if class_total[i] > 0:
        class_accuracy[i] = class_correct[i] / class_total[i]
    else:
        class_accuracy[i] = 0.0  # 如果该类没有样本，准确率为 0

# 计算总体分类准确率
overall_accuracy = total_correct / total_samples if total_samples > 0 else 0.0

# 打印每类的测试数量和准确率
for i in range(num_classes):
    print(f'Class {i}: Total = {class_total[i]}, Correct = {class_correct[i]}, Accuracy = {class_accuracy[i]:.4f}')

# 打印总体分类准确率
print(f'Overall Accuracy: {overall_accuracy:.4f}')

Batch [   1 /   15]
Batch [   2 /   15]
Batch [   3 /   15]
Batch [   4 /   15]
Batch [   5 /   15]
Batch [   6 /   15]
Batch [   7 /   15]
Batch [   8 /   15]
Batch [   9 /   15]
Batch [  10 /   15]
Batch [  11 /   15]
Batch [  12 /   15]
Batch [  13 /   15]
Batch [  14 /   15]
Batch [  15 /   15]
Class 0: Total = 50, Correct = 41, Accuracy = 0.8200
Class 1: Total = 100, Correct = 84, Accuracy = 0.8400
Class 2: Total = 100, Correct = 93, Accuracy = 0.9300
Class 3: Total = 86, Correct = 58, Accuracy = 0.6744
Class 4: Total = 86, Correct = 66, Accuracy = 0.7674
Class 5: Total = 100, Correct = 94, Accuracy = 0.9400
Class 6: Total = 86, Correct = 60, Accuracy = 0.6977
Class 7: Total = 100, Correct = 94, Accuracy = 0.9400
Class 8: Total = 100, Correct = 89, Accuracy = 0.8900
Class 9: Total = 100, Correct = 88, Accuracy = 0.8800
Overall Accuracy: 0.8447


In [35]:
class_total

[50, 100, 100, 86, 86, 100, 86, 100, 100, 100]

In [36]:
# 计算类别权重
inverse_accuracy = 1 - np.array(class_accuracy)  # 反向准确率
weights = inverse_accuracy * class_total  # 结合样本数量

# 归一化权重
weights /= np.sum(weights)  # 使总和为1

# 转换为PyTorch张量
class_weights = torch.tensor(weights, dtype=torch.float)

print("Calculated class weights:", class_weights)

Calculated class weights: tensor([0.0638, 0.1135, 0.0496, 0.1986, 0.1418, 0.0426, 0.1844, 0.0426, 0.0780,
        0.0851])


In [39]:
### 引入新的数据增强方法

def generate_sphere(center, radius, num_points):
    """生成一个球体的点云"""
    u = np.random.uniform(0, 1, num_points)
    v = np.random.uniform(0, 1, num_points)
    
    theta = 2 * np.pi * u
    phi = np.arccos(2 * v - 1)

    # 根据球坐标系转换为三维笛卡尔坐标系
    x = center[0] + radius * np.sin(phi) * np.cos(theta)
    y = center[1] + radius * np.sin(phi) * np.sin(theta)
    z = center[2] + radius * np.cos(phi)

    return np.column_stack((x, y, z))

def apply_occlusion(pointcloud, sphere_center, sphere_radius):
    """将生成的球体和原本的点云数据合成，将球体内的点云数据删除"""
    distances = np.linalg.norm(pointcloud - sphere_center, axis=1)
    occluded_pointcloud = pointcloud[distances > sphere_radius]
    return occluded_pointcloud

def pad_or_crop(pointcloud, target_size):
    """填充或裁剪点云到目标大小"""
    current_size = pointcloud.shape[0]
    if current_size < target_size:
        padding = np.zeros((target_size - current_size, pointcloud.shape[1]), dtype=pointcloud.dtype)
        return np.vstack((pointcloud, padding))  # 在第一个维度上堆叠
    elif current_size > target_size:
        return pointcloud[:target_size]  # 裁剪到目标大小
    return pointcloud

class Sphere_Occlusion(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape) == 2
        random_index = np.random.randint(0, pointcloud.shape[0])
        sphere_center = pointcloud[random_index]
        
        sphere_radius = 0.2
        num_sphere_points = 400

        sphere_pointcloud = generate_sphere(sphere_center, sphere_radius, num_sphere_points)
        
        occluded_pointcloud = apply_occlusion(pointcloud, sphere_center, sphere_radius)

        # 确保输出的点云大小为 [1024, 3]
        return pad_or_crop(occluded_pointcloud, 1024)

class RandomShift(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape) == 2
        shift_range = 0.1

        shifts = np.random.uniform(-shift_range, shift_range, pointcloud.shape)
        shifted_pointcloud = pointcloud + shifts
        
        # 确保输出的点云大小为 [1024, 3]
        return pad_or_crop(shifted_pointcloud, 1024)

class RandomScale(object):
    def __call__(self, pointcloud):
        assert len(pointcloud.shape) == 2
        scale_low = 0.8
        scale_high = 1.25
        scales = np.random.uniform(scale_low, scale_high, (pointcloud.shape[0], 1))
        scaled_pointcloud = pointcloud * scales
        
        # 确保输出的点云大小为 [1024, 3]
        return pad_or_crop(scaled_pointcloud, 1024)

In [40]:
train_transforms = transforms.Compose([
                    PointSampler(1024),
                    Normalize(),
                    Sphere_Occlusion(),
                    RandomShift(),
                    RandomScale(),
                    ToTensor()
                    ])

In [41]:
train_ds = PointCloudData(path, transform=train_transforms)
test_ds = PointCloudData(path, test=True, folder='test')

In [82]:
def check_tensor_shapes(dataset):
    # 获取第一个样本的形状作为基准
    first_shape = None

    for i, data in enumerate(dataset):
        # 假设每个样本是一个字典，包含 'pointcloud' 和 'category'
        tensor = data['pointcloud']  # 替换为您的张量键

        # 检查张量的形状
        if first_shape is None:
            first_shape = tensor.shape
        else:
            if tensor.shape != first_shape:
                print(f"Mismatch found in sample {i}: {tensor.shape} vs {first_shape}")
                return False

    print("All tensors have the same shape:", first_shape)
    return True

# 使用示例
check_tensor_shapes(train_ds)

All tensors have the same shape: torch.Size([1024, 3])


True

In [48]:
print('Train dataset size: ', len(train_ds))
print('Test dataset size: ', len(test_ds))

Train dataset size:  3591
Test dataset size:  908


In [43]:
#按9比1的分割训练数据集：训练集：9； 验证集：1
from torch.utils.data import random_split
def train_val_split(train_ds):
    train_size = int(len(train_ds) * 0.90)
    val_size = len(train_ds) - train_size
     
    train_ds, val_ds = random_split(train_ds, [train_size, val_size])
    return train_ds, val_ds

In [44]:
train_ds, valid_ds = train_val_split(train_ds)

In [46]:
train_loader = DataLoader(dataset=train_ds, batch_size=32, shuffle=True)
valid_loader = DataLoader(dataset=valid_ds, batch_size=64)

In [47]:
test_loader = DataLoader(dataset=test_ds, batch_size=64)

In [49]:
def pointnetloss(outputs, labels, m3x3, m64x64, class_weights=None, alpha=0.0001):
    # Convert outputs to log probabilities if they are logits
    log_probs = torch.nn.functional.log_softmax(outputs, dim=1)

    if class_weights is not None:
        class_weights = class_weights.to(outputs.device)

    # Use weighted negative log likelihood loss
    criterion = torch.nn.NLLLoss(weight=class_weights) if class_weights is not None else torch.nn.NLLLoss()
    
    bs = outputs.size(0)  # Get batch size

    # Create identity matrices
    id3x3 = torch.eye(3, device=outputs.device).repeat(bs, 1, 1)
    id64x64 = torch.eye(64, device=outputs.device).repeat(bs, 1, 1)

    # Calculate differences
    diff3x3 = id3x3 - torch.bmm(m3x3, m3x3.transpose(1, 2))
    diff64x64 = id64x64 - torch.bmm(m64x64, m64x64.transpose(1, 2))

    # Calculate total loss
    loss = criterion(log_probs, labels) + alpha * (torch.norm(diff3x3) + torch.norm(diff64x64)) / float(bs)
    
    return loss

In [50]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [51]:
pointnet.load_state_dict(torch.load(pre_train_path))
pointnet = PointNet()
pointnet.to(device);pointnet = PointNet()
pointnet.to(device);

In [52]:
optimizer = torch.optim.Adam(pointnet.parameters(), lr=0.001, weight_decay=1e-4)

In [53]:
save_path = './pointnet_cls_train/ModelNet10'

def train(model, train_loader, val_loader=None, epochs=15, class_weights=None, save=False, save_path=save_path):
    
    # 记录训练损失和准确率
    train_losses = []
    train_accuracies = []
    val_accuracies = []
    best_val_acc = 0.0  # 初始化最佳验证准确率
    scaler = torch.cuda.amp.GradScaler()  # 用于半精度训练

    total_start_time = time.time()  # 记录总训练开始时间

    for epoch in range(epochs): 
        model.train()
        running_loss = 0.0
        epoch_start_time = time.time()  # 记录每个 epoch 的开始时间
        correct = total = 0  # 用于计算训练准确率

        for i, data in enumerate(train_loader, 0):
            inputs, labels = data['pointcloud'].to(device).float(), data['category'].to(device)

            optimizer.zero_grad()
            
            # 使用自动混合精度
            with torch.cuda.amp.autocast():
                outputs, m3x3, m64x64 = model(inputs.transpose(1, 2))
                loss = pointnetloss(outputs, labels, m3x3, m64x64, class_weights)

            # 反向传播
            scaler.scale(loss).backward()  # 半精度反向传播
            scaler.step(optimizer)  # 更新优化器
            scaler.update()  # 更新缩放器

            # 记录损失
            running_loss += loss.item()

            # 计算训练准确率
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            if i % 10 == 9:    # 每10个小批次打印一次
                print('[Epoch: %d, Batch: %4d / %4d], loss: %.3f' %
                      (epoch + 1, i + 1, len(train_loader), running_loss / 10))
                running_loss = 0.0

        # 记录每个 epoch 的训练损失和准确率
        train_losses.append(running_loss)
        train_accuracy = 100. * correct / total
        train_accuracies.append(train_accuracy)

        # 计算并打印每个 epoch 的训练时间
        epoch_time = time.time() - epoch_start_time
        print('Epoch %d finished in %.2f seconds. Training Accuracy: %.2f %%' % (epoch + 1, epoch_time, train_accuracy))

        # 评估模型
        model.eval()
        correct = total = 0

        # 验证过程
        if val_loader:
            with torch.no_grad():
                for data in val_loader:
                    inputs, labels = data['pointcloud'].to(device).float(), data['category'].to(device)
                    with torch.cuda.amp.autocast():  # 在验证时也使用半精度
                        outputs, __, __ = model(inputs.transpose(1, 2))
                        _, predicted = torch.max(outputs.data, 1)
                        total += labels.size(0)
                        correct += (predicted == labels).sum().item()

            val_acc = 100. * correct / total
            val_accuracies.append(val_acc)
            print('Valid accuracy: %.2f %%' % val_acc)

            # 保存最优模型
            if val_acc > best_val_acc:  # 如果当前验证准确率更高
                best_val_acc = val_acc
                if save:
                    torch.save(model.state_dict(), f"{save_path}/best_pointnet10_cls_opt1.pth")
                    print('Best model saved with accuracy: %.2f %%' % best_val_acc)

    # 计算并打印总训练时间
    total_time = time.time() - total_start_time
    print('Total training time: %.2f seconds.' % total_time)

    # 绘制损失曲线
    plt.figure()
    plt.plot(train_losses, label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss Curve')
    plt.legend()
    plt.savefig(f"{save_path}/training_loss_curve_opt1.png")
    plt.close()

    # 绘制准确率曲线
    plt.figure()
    plt.plot(train_accuracies, label='Training Accuracy')
    if val_loader:
        plt.plot(val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.title('Training and Validation Accuracy Curve')
    plt.legend()
    plt.savefig(f"{save_path}/training_validation_accuracy_curve_opt1.png")
    plt.close()

In [54]:
train(pointnet, train_loader, valid_loader, class_weights=class_weights, save=True, save_path=save_path)

[Epoch: 1, Batch:   10 /  113], loss: 2.161
[Epoch: 1, Batch:   20 /  113], loss: 1.725
[Epoch: 1, Batch:   30 /  113], loss: 1.311
[Epoch: 1, Batch:   40 /  113], loss: 1.248
[Epoch: 1, Batch:   50 /  113], loss: 1.279
[Epoch: 1, Batch:   60 /  113], loss: 1.101
[Epoch: 1, Batch:   70 /  113], loss: 0.967
[Epoch: 1, Batch:   80 /  113], loss: 0.963
[Epoch: 1, Batch:   90 /  113], loss: 0.780
[Epoch: 1, Batch:  100 /  113], loss: 0.884
[Epoch: 1, Batch:  110 /  113], loss: 0.776
Epoch 1 finished in 409.39 seconds. Training Accuracy: 62.38 %
Valid accuracy: 64.00 %
Best model saved with accuracy: 64.00 %
[Epoch: 2, Batch:   10 /  113], loss: 0.791
[Epoch: 2, Batch:   20 /  113], loss: 0.688
[Epoch: 2, Batch:   30 /  113], loss: 0.746
[Epoch: 2, Batch:   40 /  113], loss: 0.851
[Epoch: 2, Batch:   50 /  113], loss: 0.747
[Epoch: 2, Batch:   60 /  113], loss: 0.780
[Epoch: 2, Batch:   70 /  113], loss: 0.840
[Epoch: 2, Batch:   80 /  113], loss: 0.695
[Epoch: 2, Batch:   90 /  113], loss:

In [55]:
pre_train_path_opt = './pointnet_cls_train/ModelNet10/best_pointnet10_cls_opt1.pth'
pointnet = PointNet()
pointnet.load_state_dict(torch.load(pre_train_path_opt))
pointnet.eval();

In [56]:
# 是对经过新增数据增强方法的test_loader进行测试的

# 类别数量
num_classes = 10 

# 用于存储每类的预测和真实标签
class_correct = [0] * num_classes
class_total = [0] * num_classes

# 全局统计
total_correct = 0
total_samples = 0

all_preds = []
all_labels = []

with torch.no_grad():
    for i, data in enumerate(test_loader):
        print('Batch [%4d / %4d]' % (i + 1, len(test_loader)))

        inputs, labels = data['pointcloud'].float(), data['category']
        outputs, __, __ = pointnet(inputs.transpose(1, 2))
        
        # 获取预测结果
        _, preds = torch.max(outputs.data, 1)
        
        # 更新所有预测和标签
        all_preds += list(preds.numpy())
        all_labels += list(labels.numpy())

        # 统计每个类别的正确预测数和总数
        for j in range(labels.size(0)):
            label = labels[j].item()  # 获取当前样本的真实标签
            class_total[label] += 1  # 该类样本总数加 1
            class_correct[label] += (preds[j] == label).item()  # 如果预测正确，相应类别的正确预测数加 1
            
            # 更新全局统计
            total_samples += 1
            total_correct += (preds[j] == label).item()

# 计算每类的准确率
class_accuracy = [0] * num_classes
for i in range(num_classes):
    if class_total[i] > 0:
        class_accuracy[i] = class_correct[i] / class_total[i]
    else:
        class_accuracy[i] = 0.0  # 如果该类没有样本，准确率为 0

# 计算总体分类准确率
overall_accuracy = total_correct / total_samples if total_samples > 0 else 0.0

# 打印每类的测试数量和准确率
for i in range(num_classes):
    print(f'Class {i}: Total = {class_total[i]}, Correct = {class_correct[i]}, Accuracy = {class_accuracy[i]:.4f}')

# 打印总体分类准确率
print(f'Overall Accuracy: {overall_accuracy:.4f}')

Batch [   1 /   15]
Batch [   2 /   15]
Batch [   3 /   15]
Batch [   4 /   15]
Batch [   5 /   15]
Batch [   6 /   15]
Batch [   7 /   15]
Batch [   8 /   15]
Batch [   9 /   15]
Batch [  10 /   15]
Batch [  11 /   15]
Batch [  12 /   15]
Batch [  13 /   15]
Batch [  14 /   15]
Batch [  15 /   15]
Class 0: Total = 50, Correct = 34, Accuracy = 0.6800
Class 1: Total = 100, Correct = 100, Accuracy = 1.0000
Class 2: Total = 100, Correct = 97, Accuracy = 0.9700
Class 3: Total = 86, Correct = 74, Accuracy = 0.8605
Class 4: Total = 86, Correct = 64, Accuracy = 0.7442
Class 5: Total = 100, Correct = 90, Accuracy = 0.9000
Class 6: Total = 86, Correct = 75, Accuracy = 0.8721
Class 7: Total = 100, Correct = 96, Accuracy = 0.9600
Class 8: Total = 100, Correct = 96, Accuracy = 0.9600
Class 9: Total = 100, Correct = 85, Accuracy = 0.8500
Overall Accuracy: 0.8932


In [61]:
#对初始数据增强方法处理的test_loader进行测试

# 类别数量
num_classes = 10 

# 用于存储每类的预测和真实标签
class_correct = [0] * num_classes
class_total = [0] * num_classes

# 全局统计
total_correct = 0
total_samples = 0

all_preds = []
all_labels = []

with torch.no_grad():
    for i, data in enumerate(test_loader):
        print('Batch [%4d / %4d]' % (i + 1, len(test_loader)))

        inputs, labels = data['pointcloud'].float(), data['category']
        outputs, __, __ = pointnet(inputs.transpose(1, 2))
        
        # 获取预测结果
        _, preds = torch.max(outputs.data, 1)
        
        # 更新所有预测和标签
        all_preds += list(preds.numpy())
        all_labels += list(labels.numpy())

        # 统计每个类别的正确预测数和总数
        for j in range(labels.size(0)):
            label = labels[j].item()  # 获取当前样本的真实标签
            class_total[label] += 1  # 该类样本总数加 1
            class_correct[label] += (preds[j] == label).item()  # 如果预测正确，相应类别的正确预测数加 1
            
            # 更新全局统计
            total_samples += 1
            total_correct += (preds[j] == label).item()

# 计算每类的准确率
class_accuracy = [0] * num_classes
for i in range(num_classes):
    if class_total[i] > 0:
        class_accuracy[i] = class_correct[i] / class_total[i]
    else:
        class_accuracy[i] = 0.0  # 如果该类没有样本，准确率为 0

# 计算总体分类准确率
overall_accuracy = total_correct / total_samples if total_samples > 0 else 0.0

# 打印每类的测试数量和准确率
for i in range(num_classes):
    print(f'Class {i}: Total = {class_total[i]}, Correct = {class_correct[i]}, Accuracy = {class_accuracy[i]:.4f}')

# 打印总体分类准确率
print(f'Overall Accuracy: {overall_accuracy:.4f}')

Batch [   1 /   15]
Batch [   2 /   15]
Batch [   3 /   15]
Batch [   4 /   15]
Batch [   5 /   15]
Batch [   6 /   15]
Batch [   7 /   15]
Batch [   8 /   15]
Batch [   9 /   15]
Batch [  10 /   15]
Batch [  11 /   15]
Batch [  12 /   15]
Batch [  13 /   15]
Batch [  14 /   15]
Batch [  15 /   15]
Class 0: Total = 50, Correct = 35, Accuracy = 0.7000
Class 1: Total = 100, Correct = 100, Accuracy = 1.0000
Class 2: Total = 100, Correct = 98, Accuracy = 0.9800
Class 3: Total = 86, Correct = 72, Accuracy = 0.8372
Class 4: Total = 86, Correct = 67, Accuracy = 0.7791
Class 5: Total = 100, Correct = 89, Accuracy = 0.8900
Class 6: Total = 86, Correct = 72, Accuracy = 0.8372
Class 7: Total = 100, Correct = 96, Accuracy = 0.9600
Class 8: Total = 100, Correct = 97, Accuracy = 0.9700
Class 9: Total = 100, Correct = 85, Accuracy = 0.8500
Overall Accuracy: 0.8932
