In [None]:
import os
import torch
import random
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch.nn.parallel
import torch.utils.data
import torch.nn.functional as F
from torchvision import transforms
import torch.nn as nn
from path import Path
from tqdm import tqdm

### Попередня обробка даних

In [None]:
class Select_Points(object):
    def __init__(self, out_size_points):
        # вибір кількості точок фігури 
        self.out_size_points = out_size_points

    def __call__(self, mesh):
        # вершини та грані
        vertices, faces = mesh
        vertices = np.array(vertices)
        areas = np.zeros(len(faces))
        
        # площа кожної грані(трикутника)
        for i, face in enumerate(faces):
            v0, v1, v2 = vertices[face]
            a = np.linalg.norm(v1 - v0)
            b = np.linalg.norm(v2 - v1)
            c = np.linalg.norm(v0 - v2)
            s = 0.5 * (a + b + c)
            areas[i] = max(s * (s - a) * (s - b) * (s - c), 0) ** 0.5

        sampled_faces = random.choices(faces, weights=areas, k=self.out_size_points)
        sampled_points = np.zeros((self.out_size_points, 3))

        # Вибір точок випадковим чином на кожній грані
        for i, face in enumerate(sampled_faces):
            v0, v1, v2 = vertices[face]
            s, t = sorted([random.random(), random.random()])
            point = (s * v0 + (t - s) * v1 + (1 - t) * v2)
            sampled_points[i] = point

        return sampled_points
    
class Normalize(object):
    # нормалізація вхідних точок на основі хмар точок
    def __call__(self, point_cloud):
        normalized_point_cloud = point_cloud - np.mean(point_cloud, axis=0) 
        normalized_point_cloud /= np.max(np.linalg.norm(normalized_point_cloud, axis=1))
        return  normalized_point_cloud
    
class ToTensor(object):
    # конвертація масиву хмар точок в PyTorch tensor,
    # для побудови архітектури нейронної мережі
    def __call__(self, point_cloud):
        array = torch.from_numpy(point_cloud)
        return array.type(torch.DoubleTensor)

### Набір даних

In [None]:
class ModelRead(Dataset):
    # завантажує дані про об'єкти (клас і шлях) з датасетів для тренування та тестування
    def __init__(self, root_dir, subfolder=None, transform=None):
        self.transform = transform
        folders = [folder for folder in sorted(os.listdir(root_dir)) if os.path.isdir(os.path.join(root_dir, folder))]
        self.classes = {folder: i for i, folder in enumerate(folders)}
        self.files = []
        
        for class_name in self.classes.keys():
            new_dir = os.path.join(root_dir, class_name, subfolder)
            for file_name in os.listdir(new_dir):
                if file_name.endswith('.off'):
                    sample = {}
                    sample['File_Path'] = os.path.join(new_dir, file_name)
                    sample['Class'] = class_name
                    self.files.append(sample)

    # повертає кількість елементів датасету
    def __len__(self):
        return len(self.files)

    # завантажує конкретний файл
    # і застосовує до нього певне 
    # перетворення даних (якщо є)
    def __getitem__(self, idx):            
        file_path = self.files[idx]['File_Path']
        class_name = self.files[idx]['Class']
        datapoint = None
        
        with open(file_path, 'r') as file:
            off_header = file.readline().strip()
            if 'OFF' == off_header:
                num_vertices, num_faces, _ = tuple([int(s) for s in file.readline().strip().split(' ')])
            else:
                num_vertices, num_faces, _ = tuple([int(s) for s in off_header[3:].split(' ')])
            
            vertices = [[float(s) for s in file.readline().strip().split(' ')] for _ in range(num_vertices)]
            faces = [[int(s) for s in file.readline().strip().split(' ')][1:] for _ in range(num_faces)]
            
            if self.transform:
                datapoint = self.transform((vertices, faces))
        
        return {
            'Data': datapoint, 
            'Class': self.classes[class_name]
        }

### Допоміжні функції

In [None]:
# Обчислює квадрат відстані між двома наборами точок
def square_distance(src, dst):
    batch_size, num_src, _ = src.shape
    _, num_dst, _ = dst.shape
    dist = -2 * torch.matmul(src, dst.permute(0, 2, 1))
    dist += torch.sum(src ** 2, -1).view(batch_size, num_src, 1)
    dist += torch.sum(dst ** 2, -1).view(batch_size, 1, num_dst)
    return dist

# Виконує індексацію вздовж пакету та розмірів точок, дозволяючи ефективно вибирати точки з пакету хмар точок
def index_points(points, indices):
    device = points.device
    batch_size = points.shape[0]
    view_shape = list(indices.shape)
    view_shape[1:] = [1] * (len(view_shape) - 1)
    repeat_shape = list(indices.shape)
    repeat_shape[0] = 1
    batch_indices = torch.arange(batch_size, dtype=torch.long).to(device).view(view_shape).repeat(repeat_shape)
    new_points = points[batch_indices, indices, :]
    return new_points

# Виконує вибірку найдальших точок, щоб вибрати фіксовану кількість репрезентативних точок із певної хмари точок
def farthest_point_sample(xyz, num_points):
    device = xyz.device
    batch_size, num_points_src, _ = xyz.shape
    centroids = torch.zeros(batch_size, num_points, dtype=torch.long).to(device)
    distance = torch.ones(batch_size, num_points_src).to(device) * 1e10
    farthest = torch.randint(0, num_points_src, (batch_size,), dtype=torch.long).to(device)
    batch_indices = torch.arange(batch_size, dtype=torch.long).to(device)
    for i in range(num_points):
        centroids[:, i] = farthest
        centroid = xyz[batch_indices, farthest, :].view(batch_size, 1, 3)
        dist = torch.sum((xyz - centroid) ** 2, -1)
        mask = dist < distance
        distance[mask] = dist[mask]
        farthest = torch.max(distance, -1)[1]
    return centroids

# Запитує індекси точок, які лежать у заданому радіусі кожної точки
def query_ball_point(radius, num_samples, xyz, new_xyz):
    device = xyz.device
    batch_size, num_points, _ = xyz.shape
    _, num_new_points, _ = new_xyz.shape
    group_indices = torch.arange(num_points, dtype=torch.long).to(device).view(1, 1, num_points).repeat([batch_size, num_new_points, 1])
    sqrdists = square_distance(new_xyz, xyz)
    group_indices[sqrdists > radius ** 2] = num_points
    group_indices = group_indices.sort(dim=-1)[0][:, :, :num_samples]
    group_first = group_indices[:, :, 0].view(batch_size, num_new_points, 1).repeat([1, 1, num_samples])
    mask = group_indices == num_points
    group_indices[mask] = group_first[mask]
    return group_indices

# Поєднує наведені вище методи для вибірки фіксованої кількості точок із певної хмари точок
def sample_and_group(num_points, radius, num_samples, xyz, points, returnfps=False):
    batch_size, num_input_points, _ = xyz.shape
    num_output_points = num_points
    fps_indices = farthest_point_sample(xyz, num_points)
    new_xyz = index_points(xyz, fps_indices)
    indices = query_ball_point(radius, num_samples, xyz, new_xyz)
    grouped_xyz = index_points(xyz, indices)
    grouped_xyz_norm = grouped_xyz - new_xyz.view(batch_size, num_output_points, 1, -1)
    if points is not None:
        grouped_points = index_points(points, indices)
        new_points = torch.cat([grouped_xyz_norm, grouped_points], dim=-1)
    else:
        new_points = grouped_xyz_norm
    if returnfps:
        return new_xyz, new_points, grouped_xyz, fps_indices
    else:
        return new_xyz, new_points

# Вибирає всі точки з даної хмари точок і групує їх разом
def sample_and_group_all(xyz, points):
    device = xyz.device
    batch_size, num_points, num_dims = xyz.shape
    new_xyz = torch.zeros(batch_size, 1, num_dims).to(device)
    grouped_xyz = xyz.view(batch_size, 1, num_points, num_dims)
    if points is not None:
        new_points = torch.cat([grouped_xyz, points.view(batch_size, 1, num_points, -1)], dim=-1)
    else:
        new_points = grouped_xyz
    return new_xyz, new_points

### Архітектури допоміжних нейронних мереж

In [None]:
class PNetSetAbstraction(nn.Module):
    def __init__(self, npoint, radius, nsample, in_channel, mlp):
        super(PNetSetAbstraction, self).__init__()
        self.npoint = npoint
        self.radius = radius
        self.nsample = nsample
        self.mlp_convs = nn.ModuleList()
        self.mlp_bns = nn.ModuleList()
        last_channel = in_channel
        for out_channel in mlp:
            self.mlp_convs.append(nn.Conv2d(last_channel, out_channel, 1))
            self.mlp_bns.append(nn.BatchNorm2d(out_channel))
            last_channel = out_channel

    def forward(self, xyz, points):
        xyz = xyz.permute(0, 2, 1)
        if points is not None:
            points = points.permute(0, 2, 1)

        new_xyz = index_points(xyz, farthest_point_sample(xyz, self.npoint))
        group_idx = query_ball_point(self.radius, self.nsample, xyz, new_xyz)
        grouped_xyz = index_points(xyz, group_idx)
        grouped_xyz -= new_xyz.view(-1, self.npoint, 1, 3)
        if points is not None:
            grouped_points = index_points(points, group_idx)
            grouped_points = torch.cat([grouped_points, grouped_xyz], dim=-1)
        else:
            grouped_points = grouped_xyz

        grouped_points = grouped_points.permute(0, 3, 2, 1)
        for i, conv in enumerate(self.mlp_convs):
            bn = self.mlp_bns[i]
            grouped_points = F.relu(bn(conv(grouped_points)))

        new_points = torch.max(grouped_points, 2)[0]
        new_xyz = new_xyz.permute(0, 2, 1)
        return new_xyz, new_points


class PNetSetAbstractionMsg(nn.Module):
    def __init__(self, npoint, radius_list, nsample_list, in_channel, mlp_list):
        super(PNetSetAbstractionMsg, self).__init__()
        self.npoint = npoint
        self.radius_list = radius_list
        self.nsample_list = nsample_list
        self.conv_blocks = nn.ModuleList()
        self.bn_blocks = nn.ModuleList()
        last_channel = in_channel + 3
        for mlp in mlp_list:
            convs = nn.ModuleList()
            bns = nn.ModuleList()
            for out_channel in mlp:
                convs.append(nn.Conv2d(last_channel, out_channel, 1))
                bns.append(nn.BatchNorm2d(out_channel))
                last_channel = out_channel
            self.conv_blocks.append(convs)
            self.bn_blocks.append(bns)

    def forward(self, xyz, points):
        xyz = xyz.permute(0, 2, 1)
        if points is not None:
            points = points.permute(0, 2, 1)

        B, N, C = xyz.shape
        S = self.npoint
        new_xyz = index_points(xyz, farthest_point_sample(xyz, S))
        new_points_list = []
        for i, radius in enumerate(self.radius_list):
            K = self.nsample_list[i]
            group_idx = query_ball_point(radius, K, xyz, new_xyz)
            grouped_xyz = index_points(xyz, group_idx)
            grouped_xyz -= new_xyz.view(B, S, 1, C)
            if points is not None:
                grouped_points = index_points(points, group_idx)
                grouped_points = torch.cat([grouped_points, grouped_xyz], dim=-1)
            else:
                grouped_points = grouped_xyz

            grouped_points = grouped_points.permute(0, 3, 2, 1)
            for j, conv in enumerate(self.conv_blocks[i]):
                bn = self.bn_blocks[i][j]
                grouped_points = F.relu(bn(conv(grouped_points)))

            new_points = torch.max(grouped_points, 2)[0]
            new_points_list.append(new_points)

        new_xyz = new_xyz.permute(0, 2, 1)
        new_points_concat = torch.cat(new_points_list, dim=1)
        return new_xyz, new_points_concat

### Архітектура PointNet++

In [None]:
class PointNetPlusPlus(nn.Module):
    def __init__(self, number_of_classes):
        super(PointNetPlusPlus, self).__init__()
        self.sa1 = PNetSetAbstractionMsg(512, [0.1, 0.2, 0.4], [16, 32, 128], 0,
                                         [[32, 32, 64], [64, 64, 128], [64, 96, 128]])
        self.sa2 = PNetSetAbstractionMsg(128, [0.2, 0.4, 0.8], [32, 64, 128], 320,
                                         [[64, 64, 128], [128, 128, 256], [128, 128, 256]])
        self.sa3 = PNetSetAbstraction(1024, None, None, 640 + 3, [256, 512, 1024])
        self.fc1 = nn.Linear(1024, 512)
        self.bn1 = nn.BatchNorm1d(512)
        self.drop1 = nn.Dropout(0.4)
        self.fc2 = nn.Linear(512, 256)
        self.bn2 = nn.BatchNorm1d(256)
        self.drop2 = nn.Dropout(0.4)
        self.fc3 = nn.Linear(256, number_of_classes)

    def forward(self, xyz):
        B, _, _ = xyz.shape
        l1_xyz, l1_points = self.sa1(xyz, None)
        l2_xyz, l2_points = self.sa2(l1_xyz, l1_points)
        l3_xyz, l3_points = self.sa3(l2_xyz, l2_points)
        x = l3_points.view(B, 1024)
        x = self.drop1(F.relu(self.bn1(self.fc1(x))))
        x = self.drop2(F.relu(self.bn2(self.fc2(x))))
        x = self.fc3(x)
        x = F.log_softmax(x, dim=-1)
        return x, l3_points

### Функції тестування та збереження результатів

In [None]:
def evaluate(model, loader):
    mean_accuracy = []
    for j, data in enumerate(loader, 0):
        points, target = data
        target = target[:, 0]
        points = points.transpose(2, 1)
        points, target = points.cuda(), target.cuda()
        model.eval()
        pred, _ = model(points)
        pred_choice = pred.data.max(1)[1]
        correct = pred_choice.eq(target.long().data).cpu().sum()
        mean_accuracy.append(correct.item() / float(points.size()[0]))
    return np.mean(mean_accuracy)

def save_checkpoint(epoch, train_accuracy, test_accuracy, model, optimizer, path, model_name='checkpoint'):
    save_path = path + '/%s-%f-%04d.pt' % (model_name, test_accuracy, epoch)
    state = {
        'epoch': epoch,
        'train_accuracy': train_accuracy,
        'test_accuracy': test_accuracy,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }
    torch.save(state, save_path)

### Визначення даних

In [None]:
# попередня обробка даних
transformation = transforms.Compose([
    Select_Points(1024),
    Normalize(),
    ToTensor()
])
DATASET_PATH = Path("../Diploma/ModelNet10")

TrainData = ModelRead(DATASET_PATH, "train", transformation)
TestData = ModelRead(DATASET_PATH, "test", transformation)

# Завантаження тренувальних та тестувальниз даних
# перемішаних та поділених на частини 
TrainLoader = DataLoader(TrainData, shuffle=True, batch_size=64)
TestLoader = DataLoader(TestData, shuffle=True, batch_size=64)

print('Train DataSet len:',len(TrainData))
print('Train DataSet Classes:',len(TrainData.classes))
print('Test DataSet len:',len(TestData))
print('Test DataSet Classes:',len(TestData.classes))

### Визначення гіперпараметрів

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
classifier = PointNetPlusPlus(len(TrainData.classes))
classifier = classifier.to(device=device)
criterion = nn.CrossEntropyLoss()
start_epoch = 0
optimizer = torch.optim.Adam(
        classifier.parameters(),
        lr= 1e-4,
        betas=(0.9, 0.999),
        eps=1e-08,
        weight_decay=1e-4
    )
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)
global_epoch = 0
global_step = 0
best_tst_accuracy = 0.0
blue = lambda x: '\033[94m' + x + '\033[0m'

### Тренування та тестування

In [None]:
for epoch in range(start_epoch, 10):
    print('Epoch %d (%d/%s):' % (global_epoch + 1, epoch + 1, 10))
    scheduler.step()
    loss_ep = 0
    for batch_id, data in tqdm(enumerate(TrainLoader, 0), total=len(TrainLoader), smoothing=0.9):
        points = data['Data'].to(device=device)
        target = data['Class'].to(device=device)
        points = points.transpose(2, 1)
        points = points.type(torch.FloatTensor)
        optimizer.zero_grad()
        scores = classifier(points)
        loss = criterion(scores, target)
        loss.backward()
        optimizer.step()
        global_step += 1
        loss_ep += loss.item()
    train_acc = evaluate(classifier.eval(), TrainLoader)
    acc = evaluate(classifier, TestLoader)


    print('\r Loss: %f' % loss.data)
    print('Train Accuracy: %f' % train_acc)
    print('\r Test %s: %f' % (blue('Accuracy'),acc))
    
    if (acc >= best_tst_accuracy) and epoch > 5:
        best_tst_accuracy = acc
        logger.info('Save model...')
        save_checkpoint(
            global_epoch + 1,
            train_acc,
            acc,
            classifier,
            optimizer,
            str(checkpoints_dir),
            'pointnet2')
        print('Saving model....')
    global_epoch += 1
print('Best Accuracy: %f'%best_tst_accuracy)