In [None]:
# conda install pytorch torchvision torchaudio pytorch-cuda=11.8 -c pytorch -c nvidia

In [19]:
import torch
import numpy as np
from tqdm import tqdm
import torch.nn as nn
from torch.utils.data import DataLoader

The functions defined in the next cell comes from the [**Github repository of Yang You**](https://github.com/qq456cvb/Point-Transformers)

In [20]:
# https://github.com/qq456cvb/Point-Transformers

def square_distance(src, dst):
    """
    Calculate Euclid distance between each two points.
    src^T * dst = xn * xm + yn * ym + zn * zm；
    sum(src^2, dim=-1) = xn*xn + yn*yn + zn*zn;
    sum(dst^2, dim=-1) = xm*xm + ym*ym + zm*zm;
    dist = (xn - xm)^2 + (yn - ym)^2 + (zn - zm)^2
         = sum(src**2,dim=-1)+sum(dst**2,dim=-1)-2*src^T*dst
    Input:
        src: source points, [B, N, C]
        dst: target points, [B, M, C]
    Output:
        dist: per-point square distance, [B, N, M]
    """
    return torch.sum((src[:, :, None] - dst[:, None]) ** 2, dim=-1)

def index_points(points, idx):
    """
    Input:
        points: input points data, [B, N, C]
        idx: sample index data, [B, S, [K]]
    Return:
        new_points:, indexed points data, [B, S, [K], C]
    """
    raw_size = idx.size()
    idx = idx.reshape(raw_size[0], -1)
    res = torch.gather(points, 1, idx[..., None].expand(-1, -1, points.size(-1)))
    return res.reshape(*raw_size, -1)


def farthest_point_sample(xyz, npoint):
    """
    Input:
        xyz: pointcloud data, [B, N, 3]
        npoint: number of samples
    Return:
        centroids: sampled pointcloud index, [B, npoint]
    """
    device = xyz.device
    B, N, C = xyz.shape
    centroids = torch.zeros(B, npoint, dtype=torch.long).to(device)
    distance = torch.ones(B, N).to(device) * 1e10
    farthest = torch.randint(0, N, (B,), dtype=torch.long).to(device)
    batch_indices = torch.arange(B, dtype=torch.long).to(device)
    for i in range(npoint):
        centroids[:, i] = farthest
        centroid = xyz[batch_indices, farthest, :].view(B, 1, 3)
        dist = torch.sum((xyz - centroid) ** 2, -1)
        distance = torch.min(distance, dist)
        farthest = torch.max(distance, -1)[1]
    return centroids

def sample_and_group(npoint, nsample, xyz, points):
    B, N, C = xyz.shape
    S = npoint 
    
    fps_idx = farthest_point_sample(xyz, npoint) # [B, npoint]

    new_xyz = index_points(xyz, fps_idx) 
    new_points = index_points(points, fps_idx)

    dists = square_distance(new_xyz, xyz)  # B x npoint x N
    idx = dists.argsort()[:, :, :nsample]  # B x npoint x K

    grouped_points = index_points(points, idx)
    grouped_points_norm = grouped_points - new_points.view(B, S, 1, -1)
    new_points = torch.cat([grouped_points_norm, new_points.view(B, S, 1, -1).repeat(1, 1, nsample, 1)], dim=-1)
    return new_xyz, new_points


class Local_op(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=1, bias=False)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.bn2 = nn.BatchNorm1d(out_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        b, n, s, d = x.size()  # torch.Size([32, 512, 32, 6]) 
        x = x.permute(0, 1, 3, 2)
        x = x.reshape(-1, d, s)
        batch_size, _, N = x.size()
        x = self.relu(self.bn1(self.conv1(x))) # B, D, N
        x = self.relu(self.bn2(self.conv2(x))) # B, D, N
        x = torch.max(x, 2)[0]
        x = x.view(batch_size, -1)
        x = x.reshape(b, n, -1).permute(0, 2, 1)
        return x


class SA_Layer(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.q_conv = nn.Conv1d(channels, channels // 4, 1, bias=False)
        self.k_conv = nn.Conv1d(channels, channels // 4, 1, bias=False)
        self.q_conv.weight = self.k_conv.weight 
        self.v_conv = nn.Conv1d(channels, channels, 1)
        self.trans_conv = nn.Conv1d(channels, channels, 1)
        self.after_norm = nn.BatchNorm1d(channels)
        self.act = nn.ReLU()
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        x_q = self.q_conv(x).permute(0, 2, 1) # b, n, c 
        x_k = self.k_conv(x)# b, c, n        
        x_v = self.v_conv(x)
        energy = x_q @ x_k # b, n, n 
        attention = self.softmax(energy)
        attention = attention / (1e-9 + attention.sum(dim=1, keepdims=True))
        x_r = x_v @ attention # b, c, n 
        x_r = self.act(self.after_norm(self.trans_conv(x - x_r)))
        x = x + x_r
        return x
    

class StackedAttention(nn.Module):
    def __init__(self, channels=256):
        super().__init__()
        self.conv1 = nn.Conv1d(channels, channels, kernel_size=1, bias=False)
        self.conv2 = nn.Conv1d(channels, channels, kernel_size=1, bias=False)

        self.bn1 = nn.BatchNorm1d(channels)
        self.bn2 = nn.BatchNorm1d(channels)

        self.sa1 = SA_Layer(channels)
        self.sa2 = SA_Layer(channels)
        self.sa3 = SA_Layer(channels)
        self.sa4 = SA_Layer(channels)

        self.relu = nn.ReLU()
        
    def forward(self, x):
        # 
        # b, 3, npoint, nsample  
        # conv2d 3 -> 128 channels 1, 1
        # b * npoint, c, nsample 
        # permute reshape
        batch_size, _, N = x.size()

        x = self.relu(self.bn1(self.conv1(x))) # B, D, N
        x = self.relu(self.bn2(self.conv2(x)))

        x1 = self.sa1(x)
        x2 = self.sa2(x1)
        x3 = self.sa3(x2)
        x4 = self.sa4(x3)
        
        x = torch.cat((x1, x2, x3, x4), dim=1)

        return x


class PointTransformer(nn.Module):
    def __init__(self):
        super().__init__()
        output_channels = 2 # it's a binary classification
        d_points = 7 # we have 7 features for each point
        self.conv1 = nn.Conv1d(d_points, 64, kernel_size=1, bias=False)
        self.conv2 = nn.Conv1d(64, 64, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm1d(64)
        self.bn2 = nn.BatchNorm1d(64)
        self.gather_local_0 = Local_op(in_channels=128, out_channels=128)
        self.gather_local_1 = Local_op(in_channels=256, out_channels=256)
        self.pt_last = StackedAttention()

        self.relu = nn.ReLU()
        self.conv_fuse = nn.Sequential(nn.Conv1d(1280, 1024, kernel_size=1, bias=False),
                                   nn.BatchNorm1d(1024),
                                   nn.LeakyReLU(negative_slope=0.2))

        self.linear1 = nn.Linear(1024, 512, bias=False)
        self.bn6 = nn.BatchNorm1d(512)
        self.dp1 = nn.Dropout(p=0.5)
        self.linear2 = nn.Linear(512, 256)
        self.bn7 = nn.BatchNorm1d(256)
        self.dp2 = nn.Dropout(p=0.5)
        self.linear3 = nn.Linear(256, output_channels)

    def forward(self, x):
        xyz = x[..., :3]
        x = x.permute(0, 2, 1)
        batch_size, _, _ = x.size()
        x= x.double()
        x = self.relu(self.bn1(self.conv1(x))) # B, D, N
        x = self.relu(self.bn2(self.conv2(x))) # B, D, N
        x = x.permute(0, 2, 1)
        new_xyz, new_feature = sample_and_group(npoint=512, nsample=32, xyz=xyz, points=x)         
        feature_0 = self.gather_local_0(new_feature)
        feature = feature_0.permute(0, 2, 1)
        new_xyz, new_feature = sample_and_group(npoint=256, nsample=32, xyz=new_xyz, points=feature) 
        feature_1 = self.gather_local_1(new_feature)
        
        x = self.pt_last(feature_1)
        x = torch.cat([x, feature_1], dim=1)
        x = self.conv_fuse(x)
        x = torch.max(x, 2)[0]
        x = x.view(batch_size, -1)

        x = self.relu(self.bn6(self.linear1(x)))
        x = self.dp1(x)
        x = self.relu(self.bn7(self.linear2(x)))
        x = self.dp2(x)
        x = self.linear3(x)

        return x

In [21]:
def reshape_7(df):
    """ This function, modifies the shape of the tensor to fit the model 
    
    Input:
       Dataset of pointclouds, shape of each pointcloud : [3,1024,3]
    Return:
       Dataset of pointclouds, shape of each pointcloud : [1024,7]
    """
    for sublist in df:
        concatenated_array = np.concatenate(sublist[0], axis=1)
        reshaped_array = concatenated_array[:, :-2]  # delete the last 2 element
        sublist[0] = reshaped_array
    return df

    
def use_GPU():
    """" This function activates the gpu 
    """"
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print(torch.cuda.get_device_name(0), "is available and being used")
    else:
        device = torch.device("cpu")
        print("GPU is not available, using CPU instead")    

In [6]:
# data loading
train = torch.load("C:\\Users\\Alessandro\\Desktop\\Tesi\\classif_datasets\\train_dataset_REGbalanced_norm_area.pt")
val = torch.load("C:\\Users\\Alessandro\\Desktop\\Tesi\\classif_datasets\\val_dataset_REGbalanced_norm_area.pt")
test = torch.load("C:\\Users\\Alessandro\\Desktop\\Tesi\\classif_datasets\\test_dataset_REGbalanced_norm_area.pt")

In [7]:
# reshape the datasets
train = reshape_7(train)
val = reshape_7(val)
test = reshape_7(test)

In [8]:
train_loader = DataLoader(train, batch_size=32)
val_loader = DataLoader(val, batch_size=32)
test_loader = DataLoader(test, batch_size=32)

In [17]:
use_GPU()

NVIDIA GeForce RTX 4080 is available and being used


In [27]:
model = PointTransformer().to(device)
model.double()

# Sets the path where the model parameters will be stored.
model_path_ = r'C:\\Users\\Alessandro\\Desktop\\Tesi\\model_weights_val.pth'

In [8]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
num_epochs = 10
best_val_accuracy = 0.0 
for epoch in range(num_epochs):
    model.train() 

    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    
    progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}', leave=False)

    ###########
    ## Train ##
    ###########


    for batch_data in progress_bar:
        optimizer.zero_grad() 
        inputs, labels = batch_data

        inputs = inputs.double().to(device)
        labels_tensor = torch.tensor([item for sublist in labels for item in sublist]).to(device)

        outputs = model(inputs)
        loss = criterion(outputs, labels_tensor)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        
        _, predicted = torch.max(outputs.data, 1)
        total_samples += labels_tensor.size(0)
        correct_predictions += (predicted == labels_tensor).sum().item()

        
        progress_bar.set_postfix({'Loss': loss.item(), 'Accuracy': correct_predictions / total_samples})

    
    accuracy = correct_predictions / total_samples
    train_loss = total_loss/len(train_loader)

    
    ###############
    ## Inference ##
    ###############

    model.eval()  
    
    val_loss = 0.0
    val_correct_predictions = 0
    val_total_samples = 0
    
    with torch.no_grad():
        for val_batch in val_loader:
            val_inputs, val_labels = val_batch
            
            val_inputs = val_inputs.double().to(device)
            val_labels_tensor = torch.tensor([item for sublist in val_labels for item in sublist]).to(device)
            
            val_outputs = model(val_inputs)
            val_loss += criterion(val_outputs, val_labels_tensor).item()
            
            _, val_predicted = torch.max(val_outputs.data, 1)
            val_total_samples += val_labels_tensor.size(0)
            val_correct_predictions += (val_predicted == val_labels_tensor).sum().item()

    val_accuracy = val_correct_predictions / val_total_samples
    val_loss /= len(val_loader)
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {train_loss:.4f}, Training Accuracy: {accuracy:.4f}, '
          f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')


    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), model_path_)

Epoch 1/10:   0%|          | 0/354 [00:00<?, ?it/s]

                                                                                         

Epoch [1/10], Training Loss: 0.4456, Training Accuracy: 0.7912, Validation Loss: 0.3027, Validation Accuracy: 0.8769


                                                                                         

Epoch [2/10], Training Loss: 0.3148, Training Accuracy: 0.8634, Validation Loss: 0.2696, Validation Accuracy: 0.8905


                                                                                          

Epoch [3/10], Training Loss: 0.2603, Training Accuracy: 0.8925, Validation Loss: 0.2668, Validation Accuracy: 0.8943


                                                                                          

Epoch [4/10], Training Loss: 0.2076, Training Accuracy: 0.9153, Validation Loss: 0.2848, Validation Accuracy: 0.8889


                                                                                          

Epoch [5/10], Training Loss: 0.1733, Training Accuracy: 0.9299, Validation Loss: 0.2802, Validation Accuracy: 0.8951


                                                                                          

Epoch [6/10], Training Loss: 0.1424, Training Accuracy: 0.9425, Validation Loss: 0.3211, Validation Accuracy: 0.8827


                                                                                          

Epoch [7/10], Training Loss: 0.1260, Training Accuracy: 0.9494, Validation Loss: 0.3180, Validation Accuracy: 0.9021


                                                                                           

Epoch [8/10], Training Loss: 0.0958, Training Accuracy: 0.9626, Validation Loss: 0.3287, Validation Accuracy: 0.8893


                                                                                           

Epoch [9/10], Training Loss: 0.0927, Training Accuracy: 0.9642, Validation Loss: 0.3944, Validation Accuracy: 0.8781


                                                                                            

Epoch [10/10], Training Loss: 0.0765, Training Accuracy: 0.9727, Validation Loss: 0.3688, Validation Accuracy: 0.8905


In [25]:
# Use the stored parameters to evaluate the model in the test_set
W_stored = torch.load(model_path_)
model.load_state_dict(W_stored)

<All keys matched successfully>

In [26]:
model.eval()
correct_predictions = 0
total_samples = 0

with torch.no_grad():
    for batch_data in test_loader:
        inputs, labels = batch_data

        inputs = inputs.double().to(device)
        labels_tensor = torch.tensor([item for sublist in labels for item in sublist]).to(device)

        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)

        total_samples += labels_tensor.size(0)
        correct_predictions += (predicted == labels_tensor).sum().item()

accuracy = correct_predictions / total_samples
print(f'Test Accuracy: {accuracy:.4f}')

Test Accuracy: 0.8799
