In [2]:
import argparse
import glob
import random
import numpy as np
from sklearn.svm import SVC
import torch
from torch import nn
from torch.utils.data import DataLoader
import torch.nn.functional as F


parser = argparse.ArgumentParser(description='Point Cloud Recognition')
parser.add_argument('--num_points', type=int, default=1024,
                    help='num of points to use')
parser.add_argument('--emb_dims', type=int, default=1024, metavar='N',
                    help='Dimension of embeddings')
parser.add_argument('--k', type=int, default=15, metavar='N',
                        help='Num of nearest neighbors to use')
parser.add_argument('--dropout', type=float, default=0.5,
                        help='dropout rate')
args = parser.parse_args("")

  from .autonotebook import tqdm as notebook_tqdm


In [6]:
def knn(x, k):
    inner = -2 * torch.matmul(x.transpose(2, 1), x)
    xx = torch.sum(x ** 2, dim=1, keepdim=True)
    pairwise_distance = -xx - inner - xx.transpose(2, 1)

    idx = pairwise_distance.topk(k=k, dim=-1)[1] 
    return idx


def get_graph_feature(x,device, k=20,idx = None):

    batch_size = x.size(0)
    num_points = x.size(2)
    x = x.view(batch_size, -1, num_points)

    if idx is None:
        idx = knn(x, k=k)  


    idx_base = torch.arange(0, batch_size, device=device).view(-1, 1, 1) * num_points

    idx = idx + idx_base

    idx = idx.view(-1)

    _, num_dims, _ = x.size()

    x = x.transpose(2,
                    1).contiguous() 
    feature = x.view(batch_size * num_points, -1)[idx, :]
    feature = feature.view(batch_size, num_points, k, num_dims)
    x = x.view(batch_size, num_points, 1, num_dims).repeat(1, 1, k, 1)

    feature = torch.cat((feature - x, x), dim=3).permute(0, 3, 1, 2).contiguous()

    return feature


class DGCNN(nn.Module):
    def __init__(self, args, cls=-1):
        super(DGCNN, self).__init__()
        self.args = args
        self.k = args.k

        self.bn1 = nn.BatchNorm2d(64)
        self.bn2 = nn.BatchNorm2d(64)
        self.bn3 = nn.BatchNorm2d(128)
        self.bn4 = nn.BatchNorm2d(256)
        self.bn5 = nn.BatchNorm1d(args.emb_dims)

        self.conv1 = nn.Sequential(nn.Conv2d(6, 64, kernel_size=1, bias=False),
                                   self.bn1,
                                   nn.LeakyReLU(negative_slope=0.2))
        self.conv2 = nn.Sequential(nn.Conv2d(64 * 2, 64, kernel_size=1, bias=False),
                                   self.bn2,
                                   nn.LeakyReLU(negative_slope=0.2))
        self.conv3 = nn.Sequential(nn.Conv2d(64 * 2, 128, kernel_size=1, bias=False),
                                   self.bn3,
                                   nn.LeakyReLU(negative_slope=0.2))
        self.conv4 = nn.Sequential(nn.Conv2d(128 * 2, 256, kernel_size=1, bias=False),
                                   self.bn4,
                                   nn.LeakyReLU(negative_slope=0.2))
        self.conv5 = nn.Sequential(nn.Conv1d(512, args.emb_dims, kernel_size=1, bias=False),
                                   self.bn5,
                                   nn.LeakyReLU(negative_slope=0.2))

        if cls != -1:
            self.linear1 = nn.Linear(args.emb_dims * 2, 512, bias=False)
            self.bn6 = nn.BatchNorm1d(512)
            self.dp1 = nn.Dropout(p=args.dropout)
            self.linear2 = nn.Linear(512, 256)
            self.bn7 = nn.BatchNorm1d(256)
            self.dp2 = nn.Dropout(p=args.dropout)
            self.linear3 = nn.Linear(256, output_channels)
        
        self.cls = cls

        self.inv_head = nn.Sequential(
            nn.Linear(args.emb_dims * 2, args.emb_dims),
            nn.BatchNorm1d(args.emb_dims),
            nn.ReLU(inplace=True),
            nn.Linear(args.emb_dims, 512)
        )

        self.inv_head1 = nn.Sequential(
            nn.Linear(args.emb_dims * 2, args.emb_dims),
            nn.BatchNorm1d(args.emb_dims),
            nn.ReLU(inplace=True),
            nn.Linear(args.emb_dims, 128)
        )


    def forward(self, x):
        batch_size = x.size(0)
        device = x.device
        x = get_graph_feature(x,device,k=self.k)
        x = self.conv1(x)
        x1 = x.max(dim=-1, keepdim=False)[0]

        x = get_graph_feature(x1,device,k=self.k)
        x = self.conv2(x)
        x2 = x.max(dim=-1, keepdim=False)[0]

        x = get_graph_feature(x2,device,k=self.k)
        x = self.conv3(x)
        x3 = x.max(dim=-1, keepdim=False)[0]

        x = get_graph_feature(x3,device,k=self.k)
        x = self.conv4(x)
        x4 = x.max(dim=-1, keepdim=False)[0]

        x = torch.cat((x1, x2, x3, x4), dim=1)

        x = self.conv5(x)
        x1 = F.adaptive_max_pool1d(x, 1).view(batch_size, -1)
        x2 = F.adaptive_avg_pool1d(x, 1).view(batch_size, -1)
        x = torch.cat((x1, x2), 1)

        feat = x
        inv_feat = self.inv_head(feat) #512

        inv_feat1 = self.inv_head1(feat) #128

        return x, inv_feat, inv_feat1, feat

In [7]:

device = torch.device("cuda") 

net_self = torch.load('./best_modelnet40.pth')

model_self = DGCNN(args).to(device)
model_self = torch.nn.DataParallel(model_self)
    
model_self.load_state_dict(net_self['state_dict'], strict=False)

<All keys matched successfully>

# feature extraction on ModelNet40

In [9]:
def load_modelnet_data(partition):
    BASE_DIR = ''
    DATA_DIR = os.path.join(BASE_DIR, 'data')
    all_data = []
    all_label = []
    for h5_name in glob.glob(os.path.join(DATA_DIR, 'modelnet40_ply_hdf5_2048', 'ply_data_%s*.h5' % partition)):
        f = h5py.File(h5_name)
        data = f['data'][:].astype('float32')
        label = f['label'][:].astype('int64')
        f.close()
        all_data.append(data)
        all_label.append(label)
    all_data = np.concatenate(all_data, axis=0)
    all_label = np.concatenate(all_label, axis=0)
    return all_data, all_label


In [15]:
from torch.utils.data import Dataset
import os
import sys
import h5py

class ModelNet40SVM(Dataset):
    def __init__(self, num_points, partition='train'):
        self.data, self.label = load_modelnet_data(partition)
        self.num_points = num_points
        self.partition = partition

    def __getitem__(self, item):
        pointcloud = self.data[item][:self.num_points]
        label = self.label[item]
        return pointcloud, label

    def __len__(self):
        return self.data.shape[0]


In [16]:
train_loader = DataLoader(ModelNet40SVM(partition='train', num_points=args.num_points),
                              batch_size=256, shuffle=True,num_workers=12)
test_loader = DataLoader(ModelNet40SVM(partition='test', num_points=args.num_points),
                              batch_size=256, shuffle=True,num_workers=12)
print('Done !!')

Done !!


# Load Train Feats

In [19]:
import random

feats_train = []
labels_train = []
model = model_self.eval()


for i, (data, label) in enumerate(train_loader):
    labels = list(map(lambda x: x[0],label.numpy().tolist()))
    data = data.permute(0, 2, 1).to(device)
    with torch.no_grad():
        feats = model(data)[3]
    feats = feats.detach().cpu().numpy()
    for feat in feats:
        feats_train.append(feat)
    labels_train += labels

feats_train = np.array(feats_train)
labels_train = np.array(labels_train)


# Load Test Feats

In [22]:
import random

feats_test = []
labels_test = []

for i, (data, label) in enumerate(test_loader):
    labels = list(map(lambda x: x[0],label.numpy().tolist()))
    data = data.permute(0, 2, 1).to(device)
    with torch.no_grad():
        feats = model(data)[3]
    feats = feats.detach().cpu().numpy()
    for feat in feats:
        feats_test.append(feat)
    labels_test += labels

feats_test = np.array(feats_test)
labels_test = np.array(labels_test)

## The Train Linear Model is used to verify the classification accuracy

In [31]:
c = 0.01
model_tl = SVC(C = c, kernel ='linear')
model_tl.fit(feats_train, labels_train)
print(f"C = {c} : {model_tl.score(feats_test, labels_test)}")

C = 0.01 : 0.9250405186385737
