# Introduction

### Code attributions

Pytorch geometric tutorials: https://pytorch-geometric.readthedocs.io/en/latest/notes/colabs.html

PyGCL: Although I have ended up not using their code, I still read their article here: https://sxkdz.github.io/research/GraphCL/.  

# Code

## Dataset

In [None]:
#@title Dependencies and utility functions
#See above for the source

import os
import itertools
import torch
import copy
from tqdm.notebook import tqdm
os.environ['TORCH'] = torch.__version__
print(torch.__version__)

!pip install -q torch-scatter -f https://data.pyg.org/whl/torch-${TORCH}.html
!pip install -q torch-sparse -f https://data.pyg.org/whl/torch-${TORCH}.html
!pip install -q torch-cluster -f https://data.pyg.org/whl/torch-${TORCH}.html
!pip install -q git+https://github.com/pyg-team/pytorch_geometric.git

%matplotlib inline
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
torch.manual_seed(12345)

#Visualization methods for 3d shapes and point clouds
def visualize_mesh(pos, face):
    fig = plt.figure()
    ax = fig.gca(projection='3d')
    ax.axes.xaxis.set_ticklabels([])
    ax.axes.yaxis.set_ticklabels([])
    ax.axes.zaxis.set_ticklabels([])
    ax.plot_trisurf(pos[:, 0], pos[:, 1], pos[:, 2], triangles=face.t(), antialiased=False)
    plt.show()


def visualize_points(pos, edge_index=None, index=None):
    fig = plt.figure(figsize=(4, 4))
    if edge_index is not None:
        for (src, dst) in edge_index.t().tolist():
             src = pos[src].tolist()
             dst = pos[dst].tolist()
             plt.plot([src[0], dst[0]], [src[1], dst[1]], linewidth=1, color='black')
    if index is None:
        plt.scatter(pos[:, 0], pos[:, 1], s=50, zorder=1000)
    else:
       mask = torch.zeros(pos.size(0), dtype=torch.bool)
       mask[index] = True
       plt.scatter(pos[~mask, 0], pos[~mask, 1], s=50, color='lightgray', zorder=1000)
       plt.scatter(pos[mask, 0], pos[mask, 1], s=50, zorder=1000)
    plt.axis('off')
    plt.show()
    

def get_optimizer(opt_name, model, lr):
  #There is no match in python 3.7, which is the version colab uses :/
    if opt_name == "Adam":
        return torch.optim.Adam(model.parameters(), lr)
    elif opt_name == "SGD":
        return torch.optim.SGD(model.parameters(), lr, momentum=1e-4,
                           dampening=1e-6)
    return None


def get_loss_by_name(loss_f_name):
    if loss_f_name == "CELoss":
        return torch.nn.CrossEntropyLoss()

In [None]:
#@title Remove existing data
!rm -rf training_data_1
!rm -rf test_data_1

In [None]:
#Extend dataset with augmented samples

from torch_geometric.data import Dataset
from torch_geometric.datasets import GeometricShapes, ModelNet
from torch_geometric.transforms import RandomRotate, Compose, SamplePoints, ToDevice
from torch_geometric.loader import DataLoader
from collections import defaultdict

def shrink_ModelNet(dataset, max):
    
    datalist = []
    num_classes = defaultdict(lambda: 0)
    for data in dataset:
        class_num = int(data[0].y)
        if num_classes[class_num] < max:
            datalist.append(data)
            num_classes[class_num] += 1
    
    return datalist
  

#Extend existing Class
class AugmentedDS(Dataset):

  """An augmented version of the GeometricShapes dataset"""

  def __init__(self, root: str, train:bool, augmentor, transformer, ds_name: str):
        
        if ds_name == "GeometricShapes":
            self.dataset = GeometricShapes(root = root, train = train)
        else:
            self.dataset = ModelNet(root=root, train=train, name='10')
        self.transformer = transformer
        self.augmentor_transformer = Compose([augmentor,transformer])

  def __len__(self):
    return len(self.dataset)

  def __getitem__(self, idx):

    original_shape = self.dataset[idx]
    augmented_shape1 = self.augmentor_transformer(original_shape.clone()) 
    augmented_shape2 = self.augmentor_transformer(original_shape.clone())
    augmented_shape3 = self.augmentor_transformer(original_shape.clone()) 
    original_shape = self.transformer(original_shape)

    return original_shape, augmented_shape1, augmented_shape2, augmented_shape3


dataset = "ModelNet"
batch_size = 80 if dataset == "ModelNet" else 40
nr_points = 1024 if dataset=="ModelNet" else 256
training_ds_root = "training_data_1" if dataset == "ModelNet" else "training_data"
test_ds_root = "test_data_1" if dataset == "ModelNet" else "test_data"
print("Sampling : {} points".format(nr_points))

#Augmentors and transformers for the data
augmentor = Compose([
    RandomRotate(degrees=90, axis=0),
    RandomRotate(degrees=90, axis=1),
    RandomRotate(degrees=90, axis=2)
])

transformer=Compose([
    SamplePoints(num=nr_points, include_normals=True),
    ToDevice(device)
])


train_ds = AugmentedDS(root = training_ds_root, augmentor=augmentor,
                       transformer=transformer, train=True, ds_name = dataset)
test_ds = AugmentedDS(root = test_ds_root, augmentor=augmentor,
                       transformer=transformer, train=False, ds_name = dataset)

#Shrink dataset if it's modelnet
if dataset == "ModelNet":
    datalist_train = shrink_ModelNet(train_ds, 81)
    datalist_train2 = shrink_ModelNet(train_ds, 10)
    datalist_test = shrink_ModelNet(test_ds, 31)

    train_dl = DataLoader(datalist_train, batch_size = batch_size, shuffle=True)
    #use 2nd datalist to train with less labels
    #train_dl2 = DataLoader(datalist_train2, batch_size = batch_size, shuffle=True)
    test_dl = DataLoader(datalist_test, batch_size = batch_size, shuffle=True)
else:
    train_dl = DataLoader(train_ds, batch_size = batch_size, shuffle=True)
    test_dl = DataLoader(test_ds, batch_size = batch_size, shuffle=True) 

In [None]:
#@title Print informations about the dataset
if dataset == "ModelNet":
    print("Dataset length: ", len(datalist_train), len(datalist_test), " \nDataset keys: ", train_ds[0][0].keys)
else:
    print("Dataset length: ", len(train_ds), len(test_ds), " \nDataset keys: ", train_ds[0][0].keys)

## Contrastive learning

In [None]:
from torch_geometric.nn import PPFConv, global_max_pool
from torch.nn import Sequential, Linear, ReLU
from torch_cluster import fps, knn_graph

class PPFNet(torch.nn.Module):
    def __init__(self, feat_dim, hidden_layer_dim):
        super().__init__()

        #Dimension of hidden layers
        self.hidden_layer_dim = hidden_layer_dim
        #Dimension of feature, 3 with 3D fd and 4 with 4D
        self.feat_dim = feat_dim

        
        mlp1 = Sequential(Linear(self.feat_dim, self.hidden_layer_dim),
                              ReLU(),
                              Linear(self.hidden_layer_dim, self.hidden_layer_dim*2))
        self.conv1 = PPFConv(local_nn = mlp1)  

        
        mlp2 = Sequential(Linear(self.hidden_layer_dim*2 + self.feat_dim, self.hidden_layer_dim*2 + self.feat_dim),
                              ReLU(),
                              Linear(self.hidden_layer_dim*2 + self.feat_dim, self.hidden_layer_dim*2 + self.feat_dim))  
        self.conv2 = PPFConv(local_nn = mlp2)  
        
        self.prj_head = Sequential(
            Linear(self.hidden_layer_dim*2 + self.feat_dim, 100),
            ReLU(),
            Linear(100,100)
        )
        
    def forward(self, pos, normal, batch):

        #Create edges in the point cloud
        edge_index = knn_graph(pos, k=16, batch=batch, loop=False)
        
        #There are no features in first convolution
        #Other datasets different from GS or MN may have them
        x = self.conv1(x=None, pos=pos, normal=normal, edge_index=edge_index)
        x = x.relu()
        
        #farthest point sampling
        index = fps(pos, batch, ratio=0.5)
        x = x[index]
        pos = pos[index]
        normal = normal[index]
        batch = batch[index]
        edge_index = knn_graph(pos, k=16, batch=batch, loop=False)
        
        x = self.conv2(x=x, pos=pos, normal=normal, edge_index=edge_index)
        x = x.relu()
        
        x = global_max_pool(x, batch)  # [num_examples, hidden_channels]
        return self.prj_head(x)


model = PPFNet(feat_dim=4, hidden_layer_dim = 32)
model.to(device)
print(model)

In [None]:
#@title Contrastive training

from torch_geometric.utils import to_dense_batch, to_dense_adj
from torch.nn import CosineSimilarity
from torch.optim.lr_scheduler import LinearLR 
from random import randint

cos = CosineSimilarity(dim=0)
cos2 = CosineSimilarity(dim=1)

def InfoNCELossSN(anchors, augmented, temperature=0.05):

  loss = 0
  batch_len = anchors.shape[0]

  for index in range(batch_len):
    anchor = anchors[index]
    pos_sample = augmented[index]
    pos_sim = torch.exp(cos(anchor, pos_sample) / temperature)
    neg_sim = 0
    
    if index == 0:
        negatives = augmented[index+1:, :]
    elif index == batch_len-1:
        negatives = augmented[:index, :]
    else:
        negatives = torch.cat((augmented[:index, :], augmented[index+1:, :]), dim=0)
    
    neg_sim = torch.einsum("i-> ", torch.exp(cos2(anchor, negatives) / temperature))
        
    loss += -torch.log(pos_sim / (pos_sim + neg_sim))
    
  return loss 

def InfoNCELoss(anchors, augmented1, augmented2, temperature=0.05):

  """InfoNCE with support to multiple positives"""
  loss = 0
  batch_len = anchors.shape[0]

  for index in range(batch_len):
    anchor = anchors[index]
    positives = (torch.cat((augmented1[index], augmented2[index]), dim=0)).reshape(2, -1)
    pos_sim = torch.einsum("i->",torch.exp(cos2(anchor, positives) / temperature))
    
    neg_sim = 0
    #Handle first and last row
    if index == 0:
        negatives = torch.cat((augmented1[index+1:, :], augmented2[index+1:, :]), dim=0)
    elif index == batch_len-1:
        negatives = torch.cat((augmented1[:index, :],augmented2[:index, :]),dim=0)
    else:
        n1 = torch.cat((augmented1[:index, :], augmented1[index+1:, :]), dim=0)
        n2 = torch.cat((augmented2[:index, :], augmented2[index+1:, :]), dim=0)
        negatives = torch.cat((n1, n2), dim=0)
         
    neg_sim = torch.einsum("i-> ", torch.exp(cos2(anchor, negatives) / temperature))
    
    loss += -torch.log(pos_sim / (pos_sim + neg_sim))

  return loss 

def train(model, optim, loader, temperature, scheduler=None):
    model.train()

    total_loss = 0
    for data in loader:
        optim.zero_grad()  # Clear gradients.
        
        #random_n = randint(1, 3)
        #choose one of the n augmentations
        original, augmented1, augmented2 = data[0], data[1], data[2]
        
        z1 = model(original.pos, original.normal, original.batch)
        z2 = model(augmented1.pos, augmented1.normal, augmented1.batch)
        z3 = model(augmented2.pos, augmented2.normal, augmented2.batch)
        #print(z1.shape, z2.shape)

        loss = InfoNCELoss(anchors=z1, augmented1=z2, augmented2=z3, 
                            temperature=temperature)
        #loss = InfoNCELossSN(anchors=z1, augmented=z2, temperature=temperature)
                            
        loss.backward()
        optim.step()
        total_loss += loss

    scheduler.step() if scheduler is not None else None
    return total_loss 

def train_contrastive(model, hyperparams_dict, debug=True):

  """Expects a dictionary containing the parameters to test."""
  for params in tqdm(hyperparams_dict):
    
    lr, nr_epochs, optim_name, temperature = params[0], params[1], params[2], params[3]
    optim = get_optimizer(opt_name = optim_name, model = model, lr=lr)
    
    best_acc = 10000
    for epoch in range(1, nr_epochs):
        loss = train(model, optim, train_dl,
                     temperature=temperature)
        
        #break
        
        if debug and (epoch % 10 == 0):
              print("Epoch", epoch, "loss", loss)
        best_acc = loss if loss < best_acc else best_acc

    result = "Best loss of {} achieved with the following hyperparams: lr {} \t #epochs {} \t optim {} \t temperature {}".format(best_acc, lr, nr_epochs, 
                                                                                                                                     optim_name, temperature)
    #break
    print(result)

In [None]:
#Hypeparams
lr_dict = {0.01}
epochs = {150}
#use 0.3 for modelnet, 0.02 for geom shapes
temperature = {0.03}
optim_dict = {"Adam"} 

hyperparams_dict = itertools.product(lr_dict, epochs, optim_dict, temperature)
train_contrastive(model, hyperparams_dict) 

In [None]:
model = torch.load("Model_updated/Model1024").to(device)
print(model)
#torch.save(model, "Model_updated/Model256")

## Testing results

In [None]:
#@title Utily functions to test the result of contrastive training

class SimpleClassifier(torch.nn.Module):

  def __init__(self):
    super().__init__()

    self.mlp = torch.nn.Linear(in_features=100, out_features=40)
    self.activation = torch.nn.ReLU()

  def forward(self, x):
    return self.activation(self.mlp(x))


def train(model, classifier, optim, loader,
          loss_f, scheduler=None):
    classifier.train()
    
    total_loss = 0
    for data in loader:

        optim.zero_grad()  # Clear gradients.
        original = data[0]
        z1 = model(original.pos, original.normal, original.batch)
        
        logits = classifier(z1)
        loss = loss_f(logits, data[0].y)
        loss.backward()  # Backward pass.
        
        optim.step()  # Update model parameters.
        total_loss += loss

    scheduler.step() if scheduler is not None else None
    return total_loss / len(train_dl.dataset)


@torch.no_grad()
def test(model, classifier, loader):
    classifier.eval()

    total_correct = 0
    for data in loader:

        original = data[0]
        z1 = model(original.pos, original.normal, original.batch)
        logits = classifier(z1)
        preds = torch.argmax(logits, dim=-1)
        total_correct += int((preds == data[0].y).sum())

    return total_correct/(len(loader.dataset))

def test_hyperparams(model, hyperparams_dict, debug=True):

  """This is nothing more than a GridSearch. 
  It expects a dictionary containing the parameters to test."""
  for params in tqdm(hyperparams_dict):
    
    #Clone the classifier to avoid having to create a new one each loop
    classifier_test = SimpleClassifier().to(device)
    lr, nr_epochs, optim_name, loss_f_name = params[0], params[1], params[2], params[3]
    optim = get_optimizer(opt_name = optim_name, model = classifier_test,
                        lr=lr)
    loss_f = get_loss_by_name(loss_f_name)  

    best_acc = 0
    for epoch in range(1, nr_epochs):
        loss = train(model, classifier_test, optim, train_dl, loss_f)
        test_acc = test(model, classifier_test, test_dl)

        if debug and (epoch % 50 == 0):
          print("Epoch", epoch, "loss", loss, "test_acc", test_acc)

        best_acc = test_acc if test_acc > best_acc else best_acc

    result = "Best accuracy of {} achieved with the following hyperparams: lr {} \t #epochs {} \t optim {} \t loss_f {}".format(best_acc, lr, nr_epochs,
                                                                                                                       optim_name, loss_f_name)
    print(result)


In [None]:
#the model is the previously trained contrastive model
#Uncomment the following line to remove projection head
#Also switch in_features from 100 to 68
#model.prj_head = torch.nn.Identity()
loss_f_dict = {"CELoss"}
lr_dict = {0.01, 0.02}
epochs = {100, 250, 500, 1000}
optim_dict = {"Adam"} 

hyperparams_dict = itertools.product(lr_dict, epochs, optim_dict, loss_f_dict)
results = test_hyperparams(model, hyperparams_dict) 


# Standard pointnet

In [None]:
from torch_geometric.nn import PPFConv, global_max_pool
from torch.nn import Sequential, Linear, ReLU
from torch_cluster import fps, knn_graph

class PPFNet(torch.nn.Module):
    def __init__(self, feat_dim, hidden_layer_dim):
        super().__init__()

        #Dimension of hidden layers
        self.hidden_layer_dim = hidden_layer_dim
        #Dimension of feature, 3 with 3D fd and 4 with 4D
        self.feat_dim = feat_dim

        
        mlp1 = Sequential(Linear(self.feat_dim, self.hidden_layer_dim),
                              ReLU(),
                              Linear(self.hidden_layer_dim, self.hidden_layer_dim*2))
        self.conv1 = PPFConv(local_nn = mlp1)  
        
        mlp2 = Sequential(Linear(self.hidden_layer_dim*2 + self.feat_dim, self.hidden_layer_dim*2 + self.feat_dim),
                              ReLU(),
                              Linear(self.hidden_layer_dim*2 + self.feat_dim, self.hidden_layer_dim*2 + self.feat_dim))  
        self.conv2 = PPFConv(local_nn = mlp2)  
        self.classifier = Linear(68, 40)
        
        
    def forward(self, pos, normal, batch):

        #Create edges in the point cloud
        edge_index = knn_graph(pos, k=16, batch=batch, loop=False)
        
        #There are no features in first convolution except the 3D/4D descriptor, 
        #which is concatenated om the message passing
        x = self.conv1(x=None, pos=pos, normal=normal, edge_index=edge_index)
        x = x.relu()
        
        #farthest point sampling
        index = fps(pos, batch, ratio=0.5)
        x = x[index]
        pos = pos[index]
        normal = normal[index]
        batch = batch[index]
        edge_index = knn_graph(pos, k=16, batch=batch, loop=False)
        
        x = self.conv2(x=x, pos=pos, normal=normal, edge_index=edge_index)
        x = x.relu()

        x = global_max_pool(x, batch)  # [num_examples, hidden_channels]

        return self.classifier(x)

In [None]:
#@title Standard training

from torch_geometric.utils import to_dense_batch, to_dense_adj
from torch.nn import CosineSimilarity
from torch.optim.lr_scheduler import LinearLR 

def train(model, optim, loader, loss_f, scheduler=None):
    model.train()

    total_loss = 0
    for data in loader:
        optim.zero_grad()  # Clear gradients.
    
        original= data[0]
        logits = model(original.pos, original.normal, original.batch)
        loss = loss_f(logits, data[0].y)
        loss.backward()  # Backward pass.

        optim.step()
        total_loss += loss

    scheduler.step() if scheduler is not None else None
    return total_loss #/batch_size

@torch.no_grad()
def test(model, loader):
    model.eval()

    total_correct = 0
    for data in loader:

        original = data[0]
        logits = model(original.pos, original.normal, original.batch)
        preds = torch.argmax(logits, dim=-1)
        total_correct += int((preds == data[0].y).sum())

    return total_correct/(len(loader.dataset))

def train_ptnet(hyperparams_dict, debug=True):

  """This is nothing more than a GridSearch. 
  It expects a dictionary containing the parameters to test."""
  for params in tqdm(hyperparams_dict):
    
    model_st = (PPFNet(feat_dim=4, hidden_layer_dim = 32)).to(device)
    lr, nr_epochs, optim_name, loss_f_name = params[0], params[1], params[2], params[3]
    optim = get_optimizer(opt_name = optim_name, model = model_st,
                        lr=lr)
    loss_f = get_loss_by_name(loss_f_name)  

    best_acc = 0
    for epoch in range(1, nr_epochs):
        loss = train(model_st, optim, train_dl, loss_f)
        test_acc = test(model_st, test_dl)

        if debug and (epoch % 50 == 0):
          print("Epoch", epoch, "loss", loss, "test_acc", test_acc)

        best_acc = test_acc if test_acc > best_acc else best_acc

    result = "Best accuracy of {} achieved with the following hyperparams: lr {} \t #epochs {} \t optim {} \t loss_f {}".format(best_acc, lr, nr_epochs,
                                                                                                                       optim_name, loss_f_name)
    print(result)


In [None]:
loss_f_dict = {"CELoss"}
lr_dict = {0.01, 0.02}
epochs = {100, 250, 500}
optim_dict = {"Adam"} 

hyperparams_dict = itertools.product(lr_dict, epochs, optim_dict, loss_f_dict)
results = train_ptnet(hyperparams_dict) 
