In [None]:
import numpy as np
import random
import math
import os
import time
import torch
import scipy.spatial.distance
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torch.nn as nn
import torch.nn.functional as F
import sys
import numpy as np

### **Ply**

In [None]:
# PLY scalar type names (as the raw bytes found in a header) mapped to the
# NumPy dtype code used to decode them. Each type is listed exactly once:
# 'uchar' is an unsigned 8-bit integer ('u1').
ply_dtypes = {
    b'int8': 'i1',
    b'char': 'i1',
    b'uint8': 'u1',
    b'uchar': 'u1',
    b'int16': 'i2',
    b'short': 'i2',
    b'uint16': 'u2',
    b'ushort': 'u2',
    b'int32': 'i4',
    b'int': 'i4',
    b'uint32': 'u4',
    b'uint': 'u4',
    b'float32': 'f4',
    b'float': 'f4',
    b'float64': 'f8',
    b'double': 'f8',
}

# PLY format keyword mapped to the byte-order prefix that is prepended to a
# NumPy dtype code ('' for ascii, '>' big endian, '<' little endian).
valid_formats = {
    'ascii': '',
    'binary_big_endian': '>',
    'binary_little_endian': '<',
}

def parse_header(plyfile, ext):
    """Parse the remainder of a PLY header and describe its first element.

    The file must be positioned just after the ``format`` line. Lines are
    consumed up to and including ``end_header`` (or the end of the file).

    Only the first ``element`` declaration (the vertices, which come first in
    the files read by this notebook) is described. Later elements such as
    faces are skipped, so their count does not overwrite the vertex count and
    their ``property list ...`` lines are not looked up in ``ply_dtypes``.
    Header lines are matched on their leading keyword, so a ``comment`` line
    that happens to contain the word "element" or "property" is ignored.

    Args:
        plyfile: binary file object positioned inside the header.
        ext: byte-order prefix ('<', '>' or '') prepended to each dtype code.

    Returns:
        (num_points, properties): the number of records of the first element
        (None if no element line was found) and a list of
        ``(property_name, numpy_dtype_string)`` tuples.

    Raises:
        KeyError: if a property of the first element uses a type name that is
            not in ``ply_dtypes``.
    """
    properties = []
    num_points = None
    past_first_element = False

    while True:
        line = plyfile.readline()
        if line == b'':
            # End of file reached without an 'end_header' line.
            break

        tokens = line.split()
        if not tokens:
            continue

        keyword = tokens[0]
        if keyword == b'end_header':
            break

        if keyword == b'element':
            if num_points is None:
                num_points = int(tokens[2])
            else:
                # A second element starts: stop collecting properties.
                past_first_element = True

        elif keyword == b'property' and not past_first_element:
            properties.append((tokens[2].decode(), ext + ply_dtypes[tokens[1]]))

    return num_points, properties

def read_ply(filename):
    """Read the first element (the points) of a binary PLY file.

    Args:
        filename: path of the ``.ply`` file to read.

    Returns:
        A NumPy structured array with one record per point and one named
        field per property declared in the header (e.g. ``data['x']``).

    Raises:
        ValueError: if the file does not start with ``ply``, has a malformed
            format line, is ascii-encoded, uses an unknown format, or does
            not declare any element/property in its header.
    """
    with open(filename, 'rb') as plyfile:
        if b'ply' not in plyfile.readline():
            raise ValueError('The file does not start with the word ply')

        format_tokens = plyfile.readline().split()
        if len(format_tokens) < 2:
            raise ValueError('The file has no valid format line')

        fmt = format_tokens[1].decode()
        if fmt == "ascii":
            raise ValueError('The file is not binary')
        if fmt not in valid_formats:
            raise ValueError('Unsupported PLY format: %s' % fmt)

        ext = valid_formats[fmt]
        num_points, properties = parse_header(plyfile, ext)
        if num_points is None or not properties:
            raise ValueError('The header does not describe any element')

        # The file is now positioned on the first byte of the binary payload.
        data = np.fromfile(plyfile, dtype=properties, count=num_points)
    return data


def header_properties(field_list, field_names):
    """Build the 'element vertex' section of a PLY header.

    Args:
        field_list: list of 2-D arrays of shape (n_points, n_columns); every
            column becomes one PLY property.
        field_names: one name per column, in the order the columns appear
            when walking ``field_list``.

    Returns:
        List of header lines: the element declaration followed by one
        ``property <dtype name> <field name>`` line per column.
    """
    n_points = field_list[0].shape[0]
    columns = [column for block in field_list for column in block.T]

    header_lines = ['element vertex %d' % n_points]
    header_lines.extend(
        'property %s %s' % (column.dtype.name, field_names[index])
        for index, column in enumerate(columns)
    )
    return header_lines

def write_ply(filename, field_list, field_names):
    """Write per-point fields to a binary PLY file.

    The header and the payload are written in a single binary pass, so the
    header bytes are the same on every platform (no newline translation or
    locale-dependent encoding) and the file is never reopened in append mode.

    Args:
        filename: output path; '.ply' is appended when missing.
        field_list: a single array or a list/tuple of arrays. 1-D arrays are
            treated as one column; 2-D arrays contribute one property per
            column. All arrays must have the same number of rows.
        field_names: one name per column, in order.

    Returns:
        True on success. False (after printing a message) when the inputs are
        inconsistent: no field, a None field, more than 2 dimensions,
        mismatching row counts, or a wrong number of names.
    """
    field_list = list(field_list) if isinstance(field_list, (list, tuple)) else [field_list]
    if not field_list:
        print('WRITE_PLY ERROR: no field to write')
        return False

    for i, field in enumerate(field_list):
        if field is None:
            print('WRITE_PLY ERROR: a field is None')
            return False
        elif field.ndim > 2:
            print('WRITE_PLY ERROR: a field have more than 2 dimensions')
            return False
        elif field.ndim < 2:
            # Promote 1-D fields to a single column.
            field_list[i] = field.reshape(-1, 1)

    n_points = [field.shape[0] for field in field_list]
    if any(n != n_points[0] for n in n_points):
        print('wrong field dimensions')
        return False

    n_fields = sum(field.shape[1] for field in field_list)
    if n_fields != len(field_names):
        print('wrong number of field names')
        return False

    if not filename.endswith('.ply'):
        filename += '.ply'

    header = ['ply', 'format binary_' + sys.byteorder + '_endian 1.0']
    header.extend(header_properties(field_list, field_names))
    header.append('end_header')

    # Pack every column into one structured array so that the records are
    # interleaved point by point, as the PLY layout requires. This is done
    # before opening the file so a failure here leaves no partial file behind.
    columns = [column for fields in field_list for column in fields.T]
    type_list = [(name, column.dtype.str) for name, column in zip(field_names, columns)]
    data = np.empty(n_points[0], dtype=type_list)
    for name, column in zip(field_names, columns):
        data[name] = column

    with open(filename, 'wb') as plyfile:
        plyfile.write(('\n'.join(header) + '\n').encode('utf-8'))
        plyfile.write(data.tobytes())

    return True

def describe_element(name, df):
    """Build the PLY header lines describing one element stored in a DataFrame.

    Args:
        name: element name (e.g. 'vertex' or 'face').
        df: pandas DataFrame with one row per record of the element.

    Returns:
        List of header lines: the ``element <name> <count>`` line followed by
        the property lines. A 'face' element is always declared as a single
        ``property list uchar int points_indices``; for any other element one
        property is emitted per column, typed from the first letter of the
        column dtype ('f' -> float, 'u' -> uchar, 'i' -> int).

    Raises:
        KeyError: if a column dtype does not start with 'f', 'u' or 'i'.
    """
    property_formats = {'f': 'float', 'u': 'uchar', 'i': 'int'}
    element = ['element ' + name + ' ' + str(len(df))]

    if name == 'face':
        element.append("property list uchar int points_indices")

    else:
        # Iterate columns and dtypes together instead of indexing
        # ``df.dtypes[i]``: integer keys on a label-indexed Series are
        # label lookups in recent pandas, not positions.
        for column, dtype in zip(df.columns, df.dtypes):
            f = property_formats[str(dtype)[0]]
            element.append('property ' + f + ' ' + str(column))

    return element


## Classes pour l'augmentation de données

In [None]:
class RandomRotation_z(object):
    """Rotate a point cloud by a random angle around the z axis."""

    def __call__(self, pointcloud):
        # pointcloud is used as an (n_points, 3) array: it is transposed,
        # multiplied by a 3x3 matrix and transposed back.
        theta = random.random() * 2. * math.pi
        cos_t, sin_t = math.cos(theta), math.sin(theta)
        rot_matrix = np.array([
            [cos_t, -sin_t, 0],
            [sin_t, cos_t, 0],
            [0, 0, 1],
        ])
        return rot_matrix.dot(pointcloud.T).T

class RandomScale(object):
    """Multiply all coordinates by one random factor.

    The range of the factor is itself drawn once, when the transform is
    built: the lower bound lies in [0, 1) and the upper bound in [1, 2).
    Each call then draws a factor uniformly between those two bounds.
    """

    def __init__(self):
        lower = random.random()
        upper = 1 + random.random()
        self.bias = lower
        self.scale = upper - lower

    def __call__(self, coords):
        factor = self.bias + self.scale * np.random.rand(1)
        return factor * coords

class RandomTranslation(object):
    """Shift every point by the same Gaussian offset (std 0.05 per axis)."""

    def __call__(self, coords):
        # A single (1, 3) offset is broadcast over all the rows of coords.
        offset = np.random.randn(1, 3) * 0.05
        return offset + coords

class RandomNoise(object):
    """Add independent Gaussian noise (mean 0, std 0.02) to every coordinate."""

    def __call__(self, pointcloud):
        jitter = np.random.normal(loc=0, scale=0.02, size=pointcloud.shape)
        return pointcloud + jitter

class ShufflePoints(object):
    """Randomly permute the order of the points.

    The rows are shuffled in place: the array passed in is modified and the
    same object is returned.
    """

    def __call__(self, pointcloud):
        np.random.shuffle(pointcloud)  # permutes along the first axis only
        return pointcloud

class RandomFlip(object):
    """Mirror the cloud along the x and/or y axis, each with probability 0.5.

    The flips are applied in place: the array passed in is modified and the
    same object is returned.
    """

    def __call__(self, pointcloud):
        # One independent draw per axis, x (column 0) first, then y (column 1).
        for axis in (0, 1):
            if np.random.rand() > 0.5:
                pointcloud[:, axis] = -pointcloud[:, axis]
        return pointcloud
'''
class RandomDropout(object):
    def __call__(self, pointcloud, dropout_rate=0.2):
        num_points = pointcloud.shape[0]
        mask = np.random.rand(num_points) > dropout_rate
        return pointcloud[mask]

class RandomShear(object):
    def __call__(self, pointcloud):
        shear_matrix = np.array([[1, np.random.uniform(-0.2, 0.2), 0],
                                 [np.random.uniform(-0.2, 0.2), 1, 0],
                                 [0, 0, 1]])
        return pointcloud @ shear_matrix.T
'''


class ToTensor(object):
    """Convert a NumPy point cloud into a torch tensor."""

    def __call__(self, pointcloud):
        # from_numpy keeps the dtype of the array and shares its memory.
        tensor = torch.from_numpy(pointcloud)
        return tensor


def default_transforms():
    """Return the default augmentation pipeline.

    The steps are applied in this order: random rotation around z, Gaussian
    noise, point shuffling, random scaling, random translation, random x/y
    flips, and finally conversion to a torch tensor.
    """
    pipeline = [
        RandomRotation_z(),
        RandomNoise(),
        ShufflePoints(),
        RandomScale(),
        RandomTranslation(),
        RandomFlip(),
        ToTensor(),
    ]
    return transforms.Compose(pipeline)


class PointCloudData(Dataset):
    """Dataset of PLY point clouds laid out as ``root_dir/<class>/<split>/*.ply``.

    Args:
        root_dir: directory that contains one sub-directory per class.
        folder: name of the split sub-directory to read in every class
            directory (e.g. "train" or "test").
        transform: callable applied to the (n_points, 3) NumPy array of each
            sample. When None, a fresh ``default_transforms()`` pipeline is
            built for this dataset. (A default evaluated in the signature
            would be created once and shared by every dataset instance.)

    Attributes:
        classes: dict mapping class name to integer label (alphabetical order).
        files: list of ``{'ply_path': ..., 'category': ...}`` dicts, sorted by
            class then by file name so that indices are reproducible.
    """

    def __init__(self, root_dir, folder="train", transform=None):
        self.root_dir = root_dir
        class_names = [
            name for name in sorted(os.listdir(root_dir))
            if os.path.isdir(os.path.join(root_dir, name))
        ]
        self.classes = {name: index for index, name in enumerate(class_names)}
        self.transforms = default_transforms() if transform is None else transform
        self.files = []
        for category in self.classes:
            split_dir = os.path.join(root_dir, category, folder)
            # os.listdir returns entries in arbitrary order: sort them.
            for file_name in sorted(os.listdir(split_dir)):
                if file_name.endswith('.ply'):
                    self.files.append({
                        'ply_path': os.path.join(split_dir, file_name),
                        'category': category,
                    })

    def __len__(self):
        """Return the number of point clouds in the selected split."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            dict with 'pointcloud' (the transformed (n_points, 3) cloud built
            from the 'x', 'y' and 'z' fields of the file) and 'category'
            (the integer class label).
        """
        sample = self.files[idx]
        data = read_ply(sample['ply_path'])
        points = np.vstack((data['x'], data['y'], data['z'])).T
        pointcloud = self.transforms(points)
        return {'pointcloud': pointcloud, 'category': self.classes[sample['category']]}


## **PointMLP**

In [None]:
class PointMLP(nn.Module):
    """Three-layer MLP baseline working on a flattened point cloud.

    The input is reshaped to rows of 3072 values (1024 points x 3
    coordinates) and goes through 3072 -> 512 -> 256 -> num_classes fully
    connected layers. The output is a tensor of log-probabilities.
    """

    def __init__(self, num_classes=40):
        super().__init__()
        self.fc1 = nn.Linear(3072, 512)
        self.bn1 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, 256)
        self.dropout = nn.Dropout(0.3)
        self.bn2 = nn.BatchNorm1d(256)
        self.fc3 = nn.Linear(256, num_classes)
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        # Flatten every sample to a 3072-long vector.
        flat = x.reshape(-1, 3072)

        hidden = F.relu(self.bn1(self.fc1(flat)))
        # Dropout is applied to the fc2 output, before batch normalization.
        hidden = F.relu(self.bn2(self.dropout(self.fc2(hidden))))

        return self.log_softmax(self.fc3(hidden))


## **PointNet Basic**

In [None]:
class PointNetBasic(nn.Module):
    """PointNet classifier without the input transform network.

    A shared per-point MLP (1x1 convolutions 3 -> 64 -> 64 -> 64 -> 128 ->
    1024) is followed by a global max pooling over the points and a
    1024 -> 512 -> 256 -> num_classes classification head.

    The pooling is global, so the network accepts any number of points per
    cloud (a fixed ``MaxPool1d(1024)`` only works with exactly 1024 points
    and silently mixes samples for multiples of 1024).

    Input: tensor of shape (batch, 3, n_points).
    Output: log-probabilities of shape (batch, num_classes).
    """

    def __init__(self, num_classes=40):
        super(PointNetBasic, self).__init__()
        # Shared per-point MLP implemented with kernel-size-1 convolutions.
        self.conv1 = nn.Conv1d(3, 64, 1)
        self.bn1 = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(64, 64, 1)
        self.bn2 = nn.BatchNorm1d(64)
        self.conv3 = nn.Conv1d(64, 64, 1)
        self.bn3 = nn.BatchNorm1d(64)
        self.conv4 = nn.Conv1d(64, 128, 1)
        self.bn4 = nn.BatchNorm1d(128)
        self.conv5 = nn.Conv1d(128, 1024, 1)
        self.bn5 = nn.BatchNorm1d(1024)

        # Global max pooling over the point dimension, whatever its length.
        self.maxpool = nn.AdaptiveMaxPool1d(1)

        # Classification head.
        self.fc1 = nn.Linear(1024, 512)
        self.bn6 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, 256)
        self.dropout = nn.Dropout(0.3)
        self.bn7 = nn.BatchNorm1d(256)
        self.fc3 = nn.Linear(256, num_classes)
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        # Per-point features: convolution, batch normalization, ReLU.
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = F.relu(self.bn5(self.conv5(x)))

        # (batch, 1024, n_points) -> (batch, 1024, 1) -> (batch, 1024)
        x = self.maxpool(x).flatten(start_dim=1)

        # Fully connected head; dropout is applied to the fc2 output, before
        # its batch normalization.
        x = F.relu(self.bn6(self.fc1(x)))
        x = F.relu(self.bn7(self.dropout(self.fc2(x))))
        x = self.fc3(x)

        return self.log_softmax(x)


## **Tnet**

In [None]:
class Tnet(nn.Module):
    """Transform network predicting one k x k matrix per point cloud.

    Args:
        k: number of input channels, and size of the predicted square matrix.
            The first convolution takes ``k`` input channels, so the module
            can be used on 3-D coordinates (k=3) as well as on k-dimensional
            per-point features.

    Input: tensor of shape (batch, k, n_points), any number of points.
    Output: tensor of shape (batch, k, k), equal to the identity plus the
    matrix regressed by the network.
    """

    def __init__(self, k=3):
        super(Tnet, self).__init__()
        # Shared per-point MLP (k -> 64 -> 128 -> 1024).
        self.conv1 = nn.Conv1d(k, 64, 1)
        self.bn1 = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(64, 128, 1)
        self.bn2 = nn.BatchNorm1d(128)
        self.conv3 = nn.Conv1d(128, 1024, 1)
        self.bn3 = nn.BatchNorm1d(1024)

        # Global max pooling over the point dimension, whatever its length.
        self.maxpool = nn.AdaptiveMaxPool1d(1)

        # Regression head (1024 -> 512 -> 256 -> k*k).
        self.fc1 = nn.Linear(1024, 512)
        self.bn4 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, 256)
        self.bn5 = nn.BatchNorm1d(256)
        self.fc3 = nn.Linear(256, k * k)

        self.k = k

    def forward(self, x):
        # Per-point features: convolution, batch normalization, ReLU.
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))

        # (batch, 1024, n_points) -> (batch, 1024)
        x = self.maxpool(x).flatten(start_dim=1)

        x = F.relu(self.bn4(self.fc1(x)))
        x = F.relu(self.bn5(self.fc2(x)))
        x = self.fc3(x).view(-1, self.k, self.k)

        # Add the identity so that zero regression output means "no
        # transform". It is a constant created on the same device and with
        # the same dtype as the activations, and broadcast over the batch.
        identity = torch.eye(self.k, dtype=x.dtype, device=x.device)
        return x + identity


## **PointNet Full**

In [None]:
class PointNetFull(nn.Module):
    """PointNet classifier with an input transform network (T-Net).

    The input cloud is first multiplied by the 3x3 matrix predicted by
    ``Tnet``; the rest is a shared per-point MLP (3 -> 64 -> 64 -> 64 -> 128
    -> 1024), a global max pooling over the points and a
    1024 -> 512 -> 256 -> num_classes classification head.

    The pooling of this trunk is global, so it does not depend on the number
    of points per cloud (the T-Net sub-module must accept that number too).

    Input: tensor of shape (batch, 3, n_points).
    Output: tuple ``(log_probabilities, transform)`` with shapes
    (batch, num_classes) and (batch, 3, 3).
    """

    def __init__(self, num_classes=40):
        super(PointNetFull, self).__init__()
        self.tnet = Tnet(k=3)

        # Shared per-point MLP implemented with kernel-size-1 convolutions.
        self.conv1 = nn.Conv1d(3, 64, 1)
        self.bn1 = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(64, 64, 1)
        self.bn2 = nn.BatchNorm1d(64)
        self.conv3 = nn.Conv1d(64, 64, 1)
        self.bn3 = nn.BatchNorm1d(64)
        self.conv4 = nn.Conv1d(64, 128, 1)
        self.bn4 = nn.BatchNorm1d(128)
        self.conv5 = nn.Conv1d(128, 1024, 1)
        self.bn5 = nn.BatchNorm1d(1024)

        # Global max pooling over the point dimension, whatever its length.
        self.maxpool = nn.AdaptiveMaxPool1d(1)

        # Classification head.
        self.fc1 = nn.Linear(1024, 512)
        self.bn6 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, 256)
        self.dropout = nn.Dropout(0.3)
        self.bn7 = nn.BatchNorm1d(256)
        self.fc3 = nn.Linear(256, num_classes)

        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        # Predict the input transform and apply it to every point:
        # (batch, 3, 3) @ (batch, 3, n_points) -> (batch, 3, n_points)
        tnet_output = self.tnet(x)
        x = torch.matmul(tnet_output, x)

        # Per-point features: convolution, batch normalization, ReLU.
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = F.relu(self.bn5(self.conv5(x)))

        # (batch, 1024, n_points) -> (batch, 1024)
        x = self.maxpool(x).flatten(start_dim=1)

        # Fully connected head; dropout is applied to the fc2 output, before
        # its batch normalization.
        x = F.relu(self.bn6(self.fc1(x)))
        x = F.relu(self.bn7(self.dropout(self.fc2(x))))
        x = self.fc3(x)

        # The transform is returned as well so that the loss can regularize it.
        return self.log_softmax(x), tnet_output


## **Fonctions**

In [None]:
def basic_loss(outputs, labels):
    """Negative log-likelihood loss for models that output log-probabilities.

    Args:
        outputs: log-probabilities of shape (batch, num_classes).
        labels: integer class labels of shape (batch,).

    Returns:
        Scalar tensor: the NLL loss averaged over the batch.
    """
    criterion = torch.nn.NLLLoss()
    return criterion(outputs, labels)

def pointnet_full_loss(outputs, labels, m3x3, alpha = 0.001):
    """NLL loss plus an orthogonality penalty on the predicted transforms.

    The penalty is ``alpha * ||I - M M^T|| / batch_size`` where the norm is
    taken over the whole batch tensor.

    Args:
        outputs: log-probabilities of shape (batch, num_classes).
        labels: integer class labels of shape (batch,).
        m3x3: predicted transforms of shape (batch, k, k); k is read from the
            tensor, so it is not restricted to 3.
        alpha: weight of the orthogonality penalty.

    Returns:
        Scalar tensor: classification loss plus weighted penalty.
    """
    criterion = torch.nn.NLLLoss()
    bs = outputs.size(0)
    # Constant identity created directly on the device/dtype of the
    # transforms and broadcast over the batch.
    identity = torch.eye(m3x3.size(-1), dtype=m3x3.dtype, device=m3x3.device)
    diff = identity - torch.bmm(m3x3, m3x3.transpose(1, 2))
    return criterion(outputs, labels) + alpha * torch.norm(diff) / float(bs)


def _split_outputs(outputs):
    """Return (log_probs, transform) from a model output; transform may be None."""
    if isinstance(outputs, (tuple, list)):
        return outputs[0], outputs[1]
    return outputs, None


def _training_loss(outputs, labels):
    """Pick the loss matching the model output (with or without a transform)."""
    log_probs, transform = _split_outputs(outputs)
    if transform is None:
        return basic_loss(log_probs, labels)
    return pointnet_full_loss(log_probs, labels, transform)


def train(model, device, train_loader, test_loader=None, epochs=250):
    """Train a point-cloud classifier and report its accuracy after each epoch.

    Works both with models returning only log-probabilities (trained with
    ``basic_loss``) and with models returning ``(log_probs, transform)``
    (trained with ``pointnet_full_loss``).

    Uses Adam (lr=0.001) with the learning rate halved every 20 epochs.

    Args:
        model: network taking a (batch, 3, n_points) tensor.
        device: torch device on which the batches are moved.
        train_loader: loader yielding dicts with 'pointcloud'
            (batch, n_points, 3) and 'category' (batch,).
        test_loader: optional loader with the same format, evaluated after
            every epoch.
        epochs: number of passes over ``train_loader``.

    Prints, for every epoch, the loss averaged over all training samples of
    the epoch (not just the last batch) and, when a test loader is given,
    the test accuracy.
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)

    for epoch in range(epochs):
        model.train()
        loss_sum = 0.0
        n_seen = 0
        for data in train_loader:
            inputs = data['pointcloud'].to(device).float()
            labels = data['category'].to(device)

            optimizer.zero_grad()
            # The models expect channels first: (batch, 3, n_points).
            loss = _training_loss(model(inputs.transpose(1, 2)), labels)
            loss.backward()
            optimizer.step()

            # Weight by the batch size so a smaller last batch is not over-counted.
            loss_sum += loss.item() * labels.size(0)
            n_seen += labels.size(0)
        epoch_loss = loss_sum / n_seen if n_seen else float('nan')

        model.eval()
        if test_loader is not None:
            correct = total = 0
            with torch.no_grad():
                for data in test_loader:
                    inputs = data['pointcloud'].to(device).float()
                    labels = data['category'].to(device)
                    log_probs, _ = _split_outputs(model(inputs.transpose(1, 2)))
                    predicted = log_probs.argmax(dim=1)
                    total += labels.size(0)
                    correct += (predicted == labels).sum().item()
            # Guard against an empty test loader.
            val_acc = 100. * correct / total if total else float('nan')
            print('Epoch: %d, Loss: %.3f, Test accuracy: %.1f %%' % (epoch + 1, epoch_loss, val_acc))
        else:
            print('Epoch: %d, Loss: %.3f' % (epoch + 1, epoch_loss))

        scheduler.step()


### **DATA**

In [None]:
import os
import zipfile

# Unpack the ModelNet40 PLY archive next to it, under /content/.
zip_path = "/content/ModelNet40_PLY.zip"

with zipfile.ZipFile(zip_path, mode='r') as zip_ref:
    zip_ref.extractall(path="/content/")


In [None]:
# Start of the timer reported at the end of training (it therefore also
# includes the time spent building the datasets).
t0 = time.time()

# Training clouds go through the default random augmentation pipeline.
train_ds = PointCloudData("/content/ModelNet40_PLY")
# Test clouds are only converted to tensors: applying the random training
# augmentations (rotation, noise, scale, translation, flips) at evaluation
# time would make the reported accuracy noisy and not reproducible.
test_ds = PointCloudData("/content/ModelNet40_PLY", folder='test',
                         transform=transforms.Compose([ToTensor()]))

inv_classes = {i: cat for cat, i in train_ds.classes.items()}
print("Classes: ", inv_classes)
print('Train dataset size: ', len(train_ds))
print('Test dataset size: ', len(test_ds))
print('Number of classes: ', len(train_ds.classes))
print('Sample pointcloud shape: ', train_ds[0]['pointcloud'].size())

train_loader = DataLoader(dataset=train_ds, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_ds, batch_size=32)

Classes:  {0: 'airplane', 1: 'bathtub', 2: 'bed', 3: 'bench', 4: 'bookshelf', 5: 'bottle', 6: 'bowl', 7: 'car', 8: 'chair', 9: 'cone', 10: 'cup', 11: 'curtain', 12: 'desk', 13: 'door', 14: 'dresser', 15: 'flower_pot', 16: 'glass_box', 17: 'guitar', 18: 'keyboard', 19: 'lamp', 20: 'laptop', 21: 'mantel', 22: 'monitor', 23: 'night_stand', 24: 'person', 25: 'piano', 26: 'plant', 27: 'radio', 28: 'range_hood', 29: 'sink', 30: 'sofa', 31: 'stairs', 32: 'stool', 33: 'table', 34: 'tent', 35: 'toilet', 36: 'tv_stand', 37: 'vase', 38: 'wardrobe', 39: 'xbox'}
Train dataset size:  9843
Test dataset size:  2468
Number of classes:  40
Sample pointcloud shape:  torch.Size([1024, 3])


### **Test PointNet Full avec 250 epochs**

In [None]:
# Build the full PointNet and report how many trainable parameters it has.
model_2 = PointNetFull()
model_parameters_2 = (p for p in model_2.parameters() if p.requires_grad)
print("Number of parameters in the Neural Networks: ",
      sum(np.prod(p.size()) for p in model_parameters_2))

# Train on the first GPU when one is available, otherwise on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Device: ", device)
model_2.to(device)

train(model_2, device, train_loader, test_loader, epochs=250)
# t0 was set when the datasets were created.
print("Total time for training : ", time.time() - t0)

Number of parameters in the Neural Networks:  1622705
Device:  cuda:0
Epoch: 1, Loss: 1.687, Test accuracy: 52.1 %
Epoch: 2, Loss: 1.471, Test accuracy: 57.4 %
Epoch: 3, Loss: 1.834, Test accuracy: 63.9 %
Epoch: 4, Loss: 1.030, Test accuracy: 60.9 %
Epoch: 5, Loss: 0.662, Test accuracy: 70.4 %
Epoch: 6, Loss: 0.885, Test accuracy: 73.8 %
Epoch: 7, Loss: 0.776, Test accuracy: 70.8 %
Epoch: 8, Loss: 0.978, Test accuracy: 71.1 %
Epoch: 9, Loss: 0.799, Test accuracy: 75.2 %
Epoch: 10, Loss: 0.660, Test accuracy: 75.4 %
Epoch: 11, Loss: 0.805, Test accuracy: 75.9 %
Epoch: 12, Loss: 0.777, Test accuracy: 76.1 %
Epoch: 13, Loss: 0.673, Test accuracy: 78.5 %
Epoch: 14, Loss: 0.725, Test accuracy: 77.4 %
Epoch: 15, Loss: 0.848, Test accuracy: 77.4 %
Epoch: 16, Loss: 0.689, Test accuracy: 76.8 %
Epoch: 17, Loss: 0.401, Test accuracy: 71.9 %
Epoch: 18, Loss: 0.697, Test accuracy: 77.6 %
Epoch: 19, Loss: 0.960, Test accuracy: 80.0 %
Epoch: 20, Loss: 0.598, Test accuracy: 79.6 %
Epoch: 21, Loss: 0.

## **Enregistrer Model**

In [None]:
# Save the trained weights of model_2 (its state_dict only, not the whole
# module object) to the Colab session storage.
save_path_f = '/content/model_PointNet_full_augm_2.pth'
torch.save(model_2.state_dict(), save_path_f)
print(f"Model saved to {save_path_f}")

# Download the saved file through the google.colab helper; this import is
# only available when the notebook runs inside Google Colab.
from google.colab import files
files.download(save_path_f)

Model saved to /content/model_PointNet_full_augm_2.pth


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>