In [20]:
import numpy as np
import open3d as o3d
import torch
import torch.utils.data as Data
import torch.nn as nn


In [21]:
from torch.autograd import Function
from torch.utils.cpp_extension import load
# Build/load the Chamfer-distance C++/CUDA extension from its two source files.
chamfer_3D = load(
    name="chamfer_3D",
    sources=[
        rf"/home/ananthakrishnak/pe/pcn-our/chamfer_cuda.cpp",
        rf"/home/ananthakrishnak/pe/pcn-our/chamfer3D.cu",
    ],
)
class chamfer_3DFunction(Function):
    """Autograd wrapper around the compiled ``chamfer_3D`` extension.

    ``forward`` fills per-point distance and index buffers for two batched
    point sets; ``backward`` asks the extension for the gradients with respect
    to both point sets.
    """

    @staticmethod
    def forward(ctx,xyz1,xyz2):
        """Run the extension's forward kernel.

        Args:
            xyz1: tensor whose size unpacks as (batchsize, n, _).
            xyz2: tensor whose size unpacks as (_, m, _).

        Returns:
            ``(dist1, dist2, idx1, idx2)`` where ``dist1``/``idx1`` have shape
            (batchsize, n) and ``dist2``/``idx2`` have shape (batchsize, m).
            The distance buffers are float, the index buffers int32.
        """
        # backward() already hands the kernel contiguous gradients; do the same
        # for the inputs (presumably the kernel reads raw memory -- TODO confirm).
        # This is a no-op for tensors that are already contiguous.
        xyz1=xyz1.contiguous()
        xyz2=xyz2.contiguous()
        batchsize,n,_=xyz1.size()
        _,m,_=xyz2.size()
        device=xyz1.device
        # Allocate the output buffers directly on the input's device instead of
        # creating them on the CPU and copying them over on every call.
        dist1=torch.zeros(batchsize,n,device=device)
        dist2=torch.zeros(batchsize,m,device=device)
        idx1=torch.zeros(batchsize,n,dtype=torch.int32,device=device)
        idx2=torch.zeros(batchsize,m,dtype=torch.int32,device=device)
        torch.cuda.set_device(device)
        chamfer_3D.forward(xyz1,xyz2,dist1,dist2,idx1,idx2)
        ctx.save_for_backward(xyz1,xyz2,idx1,idx2)
        return dist1,dist2,idx1,idx2
    
    @staticmethod
    def backward(ctx,graddist1,graddist2,gradidx1,gradidx2):
        """Return gradients for ``xyz1`` and ``xyz2``.

        ``gradidx1``/``gradidx2`` are accepted because ``forward`` returns four
        values, but they are not used.
        """
        xyz1,xyz2,idx1,idx2=ctx.saved_tensors
        graddist1=graddist1.contiguous()
        graddist2=graddist2.contiguous()
        device=graddist1.device
        # Gradient buffers are created on the device of the incoming gradient.
        gradxyz1=torch.zeros(xyz1.size(),device=device)
        gradxyz2=torch.zeros(xyz2.size(),device=device)
        chamfer_3D.backward(xyz1,xyz2,gradxyz1,gradxyz2,graddist1,graddist2,idx1,idx2)
        return gradxyz1,gradxyz2
    
class ChamferDistance(nn.Module):
    """``nn.Module`` front-end for ``chamfer_3DFunction`` that drops the index outputs."""

    def __init__(self):
        super().__init__()

    def forward(self, input1, input2):
        """Return ``(dist1, dist2)``, the first two outputs of ``chamfer_3DFunction``."""
        outputs = chamfer_3DFunction.apply(input1, input2)
        return outputs[0], outputs[1]
    
# Shared module instance used by the loss/metric helpers below.
CD = ChamferDistance()
def cd_loss_L1(pcs1, pcs2):
    """Scalar Chamfer loss: mean of the square-rooted distances in both directions, halved.

    Each direction is averaged over every element of its distance tensor
    (batch and points together) before the two means are combined.
    """
    fwd, bwd = CD(pcs1, pcs2)
    return (fwd.sqrt().mean() + bwd.sqrt().mean()) / 2.0

def l1_cd(pcs1, pcs2):
    """Batch-summed L1 Chamfer distance.

    For each batch item the square-rooted distances are averaged over dim 1 in
    both directions; the two directions are added, summed over the batch and
    divided by 2.
    """
    fwd, bwd = CD(pcs1, pcs2)
    per_item = fwd.sqrt().mean(1) + bwd.sqrt().mean(1)
    return per_item.sum() / 2

def l2_cd(pcs1, pcs2):
    """Batch-summed L2 Chamfer distance.

    The raw distances returned by ``CD`` are averaged over dim 1 in both
    directions, added together, and summed over the batch (no halving).
    """
    fwd, bwd = CD(pcs1, pcs2)
    per_item = fwd.mean(dim=1) + bwd.mean(dim=1)
    return per_item.sum()

def f_score(pred,gt,th=0.01):
    """F-score between a predicted and a ground-truth point set.

    Args:
        pred: sequence/array of 3-D points (anything ``Vector3dVector`` accepts).
        gt: sequence/array of 3-D points.
        th: distance threshold; a point counts as matched when its distance
            to the other cloud is strictly below ``th``.

    Returns:
        ``2 * recall * precision / (recall + precision)``, or ``0.0`` when that
        sum is zero or when either point set is empty.
    """
    # An empty cloud would otherwise lead to a division by len(...) == 0 below.
    if len(pred) == 0 or len(gt) == 0:
        return 0.0
    pred=o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pred))
    gt=o3d.geometry.PointCloud(o3d.utility.Vector3dVector(gt))
    # Convert once to numpy so the threshold test is vectorised instead of a
    # Python-level loop over every point.
    dist1=np.asarray(pred.compute_point_cloud_distance(gt))
    dist2=np.asarray(gt.compute_point_cloud_distance(pred))
    recall=float(np.count_nonzero(dist2<th))/float(len(dist2))
    precision=float(np.count_nonzero(dist1<th))/float(len(dist1))
    total=recall+precision
    return 2*recall*precision/total if total else 0.0


In [22]:
import torch
import torch.nn as nn


class PCN(nn.Module):
    """Point-cloud completion network producing a coarse and a dense output.

    Args:
        num_dense: number of points in the dense output; must be divisible by
            ``grid_size ** 2``.
        latent_dim: channel width of the global feature vector.
        grid_size: side length of the 2-D folding grid; each coarse point is
            expanded into ``grid_size ** 2`` dense points.

    Attributes:
        num_coarse: ``num_dense // grid_size ** 2``.
        folding_seed: (1, 2, grid_size ** 2) grid of 2-D offsets in
            [-0.05, 0.05], stored as a non-persistent buffer.
    """

    def __init__(self, num_dense=16384, latent_dim=1024, grid_size=4):
        super().__init__()
        self.num_dense=num_dense
        self.latent_dim=latent_dim
        self.grid_size=grid_size
        assert self.num_dense % self.grid_size ** 2 == 0
        self.num_coarse=self.num_dense // (self.grid_size ** 2)
        self.first_conv=nn.Sequential(
            nn.Conv1d(3, 128, 1),
            nn.BatchNorm1d(128),
            nn.ReLU(inplace=True),
            nn.Conv1d(128, 256, 1),
        )
        # Input is 512 channels: the 256-d global max feature concatenated with
        # the 256-d per-point feature.
        self.second_conv=nn.Sequential(
            nn.Conv1d(512, 512, 1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, self.latent_dim, 1),
        )
        self.mlp=nn.Sequential(
            nn.Linear(self.latent_dim, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 3 * self.num_coarse),
        )
        # Input channels = global feature + 2-D seed + 3-D coarse point. Using
        # latent_dim (not a literal 1024) keeps non-default widths working.
        self.final_conv=nn.Sequential(
            nn.Conv1d(self.latent_dim + 3 + 2, 512, 1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, 512, 1),
            nn.BatchNorm1d(512),
            nn.ReLU(inplace=True),
            nn.Conv1d(512, 3, 1),
        )
        a=torch.linspace(-0.05, 0.05, steps=self.grid_size, dtype=torch.float).view(1, self.grid_size).expand(self.grid_size, self.grid_size).reshape(1, -1)
        b=torch.linspace(-0.05, 0.05, steps=self.grid_size, dtype=torch.float).view(self.grid_size, 1).expand(self.grid_size, self.grid_size).reshape(1, -1)
        folding_seed=torch.cat([a, b], dim=0).view(1, 2, self.grid_size ** 2)  # (1, 2, S)
        # Registered as a buffer so it follows .to()/.cuda() with the module
        # instead of being pinned to the GPU at construction time.
        # persistent=False keeps it out of state_dict, so existing checkpoints
        # still load with strict key matching.
        self.register_buffer("folding_seed", folding_seed, persistent=False)

    def forward(self, xyz):
        """Complete a partial point cloud.

        Args:
            xyz: (B, N, 3) input points.

        Returns:
            ``(coarse, fine)`` with shapes (B, num_coarse, 3) and
            (B, num_dense, 3), both contiguous.
        """
        B, N, _=xyz.shape
        # encoder
        feature=self.first_conv(xyz.transpose(2, 1))                            # (B, 256, N)
        feature_global=torch.max(feature, dim=2, keepdim=True)[0]               # (B, 256, 1)
        feature=torch.cat([feature_global.expand(-1, -1, N), feature], dim=1)   # (B, 512, N)
        feature=self.second_conv(feature)                                       # (B, latent_dim, N)
        feature_global=torch.max(feature,dim=2,keepdim=False)[0]                # (B, latent_dim)
        # decoder
        coarse=self.mlp(feature_global).reshape(-1, self.num_coarse, 3)         # (B, num_coarse, 3)
        # Repeat every coarse point grid_size**2 times to seed the dense cloud.
        point_feat=coarse.unsqueeze(2).expand(-1, -1, self.grid_size ** 2, -1)  # (B, num_coarse, S, 3)
        point_feat=point_feat.reshape(-1, self.num_dense, 3).transpose(2, 1)    # (B, 3, num_dense)
        seed=self.folding_seed.unsqueeze(2).expand(B, -1, self.num_coarse, -1)  # (B, 2, num_coarse, S)
        seed=seed.reshape(B, -1, self.num_dense)                                # (B, 2, num_dense)
        feature_global=feature_global.unsqueeze(2).expand(-1, -1, self.num_dense)
        feat=torch.cat([feature_global, seed, point_feat], dim=1)               # (B, latent_dim+2+3, num_dense)
        # The last conv stack predicts an offset that is added to the repeated coarse points.
        fine=self.final_conv(feat) + point_feat                                 # (B, 3, num_dense)
        return coarse.contiguous(), fine.transpose(1, 2).contiguous()


In [23]:
import random
import torch.utils.data as data
import os
class shapenet_data_loader(data.Dataset):
    """Dataset of (partial, complete) point-cloud pairs read from ``.ply`` files.

    Every entry found in ``y_path`` is treated as one sample; the matching
    partial cloud is looked up in ``x_path``.

    Args:
        x_path: directory holding the partial clouds.
        y_path: directory holding the complete clouds.
        split: ``'train'`` selects one of eight partial views
            (``<name>_0.ply`` .. ``<name>_7.ply``) at random per access; any
            other value uses the partial file with the same name as the
            complete one.
    """

    def __init__(self, x_path, y_path, split):
        self.x_path = x_path
        self.y_path = y_path
        self.split = split
        self.partial_paths, self.complete_paths = self._load_data()

    def _load_data(self):
        """Build the parallel lists of partial and complete file paths."""
        # os.listdir returns entries in arbitrary order; sorting makes the
        # index -> sample mapping reproducible across runs and machines.
        model_ids = sorted(os.listdir(self.y_path))
        partial_paths, complete_paths = [], []
        for model_id in model_ids:
            if self.split == 'train':
                # Leave a '{}' placeholder that __getitem__ fills with the view index.
                stem = model_id.split('.ply')[0]
                partial_paths.append(os.path.join(self.x_path, stem + '_{}.ply'))
            else:
                partial_paths.append(os.path.join(self.x_path, model_id))
            complete_paths.append(os.path.join(self.y_path, model_id))
        return partial_paths, complete_paths

    def read_point_cloud(self, path):
        """Read ``path`` with Open3D and return its points as a float32 array."""
        pc = o3d.io.read_point_cloud(path)
        return np.array(pc.points, np.float32)

    def random_sample(self, pc, n):
        """Return exactly ``n`` rows of ``pc``.

        Rows are drawn without replacement; when ``pc`` has fewer than ``n``
        rows, all of them are kept and the remainder is drawn with replacement.

        Raises:
            ValueError: if ``pc`` has no rows.
        """
        if pc.shape[0] == 0:
            raise ValueError("Point cloud is empty, cannot sample points.")
        idx = np.random.permutation(pc.shape[0])
        if idx.shape[0] < n:
            idx = np.concatenate([idx, np.random.randint(pc.shape[0], size=n-pc.shape[0])])
        return pc[idx[:n]]

    def __getitem__(self, index):
        """Return ``(partial, complete)`` tensors with 2048 and 16384 points."""
        if self.split == 'train':
            # random.randint is inclusive on both ends: views 0..7.
            partial_path = self.partial_paths[index].format(random.randint(0, 7))
        else:
            partial_path = self.partial_paths[index]
        complete_path = self.complete_paths[index]
        partial_pc = self.random_sample(self.read_point_cloud(partial_path), 2048)
        complete_pc = self.random_sample(self.read_point_cloud(complete_path), 16384)
        return torch.from_numpy(partial_pc), torch.from_numpy(complete_pc)

    def __len__(self):
        return len(self.complete_paths)

In [24]:
# Evaluate the saved checkpoint on the "test" split: L1-CD, L2-CD and F-score.
model=PCN(16384, 1024, 4).to("cuda:0")
# weights_only=True restricts unpickling to tensors/containers, so loading the
# checkpoint cannot execute arbitrary code; map_location places the tensors on
# the evaluation device regardless of where the checkpoint was saved.
model.load_state_dict(torch.load(rf"/home/ananthakrishnak/pe/pcn-our/checkpoints/best_l1_cd.pth", map_location="cuda:0", weights_only=True))
model.eval()
test_dataset = shapenet_data_loader(rf"/home/ananthakrishnak/pe/pcn-our/data/PCN/test/partial/02958343",rf"/home/ananthakrishnak/pe/pcn-our/data/PCN/test/complete/02958343","test")
test_dataloader = Data.DataLoader(test_dataset, batch_size=32, shuffle=False)
# `index` and `input_pc` are not read in this cell; they are kept so any later
# cell that relies on them still works.
index = 1
total_l1_cd, total_l2_cd, total_f_score = 0.0, 0.0, 0.0
with torch.no_grad():
    for p, c in test_dataloader:
        p = p.to("cuda:0")
        c = c.to("cuda:0")
        _, c_ = model(p)
        # l1_cd / l2_cd return batch sums, so the totals are divided by the
        # dataset size after the loop.
        total_l1_cd += l1_cd(c_, c).item()
        total_l2_cd += l2_cd(c_, c).item()
        # f_score works on one cloud at a time, on CPU numpy arrays.
        for i in range(len(c)):
            input_pc = p[i].detach().cpu().numpy()
            output_pc = c_[i].detach().cpu().numpy()
            gt_pc = c[i].detach().cpu().numpy()
            total_f_score += f_score(output_pc, gt_pc)
            index += 1

avg_l1_cd = total_l1_cd / len(test_dataset)
avg_l2_cd = total_l2_cd / len(test_dataset)
avg_f_score = total_f_score / len(test_dataset)
# The reported L2 figure is the square root of the per-sample mean.
avg_l2_cd=np.sqrt(avg_l2_cd)
print(avg_l1_cd,avg_l2_cd,avg_f_score)


  model.load_state_dict(torch.load(rf"/home/ananthakrishnak/pe/pcn-our/checkpoints/best_l1_cd.pth"))


0.009130416413148244 0.016479345815503026 0.7058733123391993


In [25]:
# NOTE(review): this cell repeats the preceding evaluation cell on the same
# "test" split and checkpoint; consider removing one of the two.
model=PCN(16384, 1024, 4).to("cuda:0")
# weights_only=True restricts unpickling to tensors/containers, so loading the
# checkpoint cannot execute arbitrary code; map_location places the tensors on
# the evaluation device regardless of where the checkpoint was saved.
model.load_state_dict(torch.load(rf"/home/ananthakrishnak/pe/pcn-our/checkpoints/best_l1_cd.pth", map_location="cuda:0", weights_only=True))
model.eval()
test_dataset = shapenet_data_loader(rf"/home/ananthakrishnak/pe/pcn-our/data/PCN/test/partial/02958343",rf"/home/ananthakrishnak/pe/pcn-our/data/PCN/test/complete/02958343","test")
test_dataloader = Data.DataLoader(test_dataset, batch_size=32, shuffle=False)
# `index` and `input_pc` are not read in this cell; they are kept so any later
# cell that relies on them still works.
index = 1
total_l1_cd, total_l2_cd, total_f_score = 0.0, 0.0, 0.0
with torch.no_grad():
    for p, c in test_dataloader:
        p = p.to("cuda:0")
        c = c.to("cuda:0")
        _, c_ = model(p)
        # l1_cd / l2_cd return batch sums, so the totals are divided by the
        # dataset size after the loop.
        total_l1_cd += l1_cd(c_, c).item()
        total_l2_cd += l2_cd(c_, c).item()
        # f_score works on one cloud at a time, on CPU numpy arrays.
        for i in range(len(c)):
            input_pc = p[i].detach().cpu().numpy()
            output_pc = c_[i].detach().cpu().numpy()
            gt_pc = c[i].detach().cpu().numpy()
            total_f_score += f_score(output_pc, gt_pc)
            index += 1

avg_l1_cd = total_l1_cd / len(test_dataset)
avg_l2_cd = total_l2_cd / len(test_dataset)
avg_f_score = total_f_score / len(test_dataset)
# The reported L2 figure is the square root of the per-sample mean.
avg_l2_cd=np.sqrt(avg_l2_cd)
print(avg_l1_cd,avg_l2_cd,avg_f_score)


  model.load_state_dict(torch.load(rf"/home/ananthakrishnak/pe/pcn-our/checkpoints/best_l1_cd.pth"))


0.009130416413148244 0.016479345815503026 0.7058733123391993


In [26]:
# Evaluate the saved checkpoint on the "test_novel" split: L1-CD, L2-CD and F-score.
model=PCN(16384, 1024, 4).to("cuda:0")
# weights_only=True restricts unpickling to tensors/containers, so loading the
# checkpoint cannot execute arbitrary code; map_location places the tensors on
# the evaluation device regardless of where the checkpoint was saved.
model.load_state_dict(torch.load(rf"/home/ananthakrishnak/pe/pcn-our/checkpoints/best_l1_cd.pth", map_location="cuda:0", weights_only=True))
model.eval()
test_dataset = shapenet_data_loader(rf"/home/ananthakrishnak/pe/pcn-our/data/PCN/test_novel/partial/02924116",rf"/home/ananthakrishnak/pe/pcn-our/data/PCN/test_novel/complete/02924116","test_novel")
test_dataloader = Data.DataLoader(test_dataset, batch_size=32, shuffle=False)
# `index` and `input_pc` are not read in this cell; they are kept so any later
# cell that relies on them still works.
index = 1
total_l1_cd, total_l2_cd, total_f_score = 0.0, 0.0, 0.0
with torch.no_grad():
    for p, c in test_dataloader:
        p = p.to("cuda:0")
        c = c.to("cuda:0")
        _, c_ = model(p)
        # l1_cd / l2_cd return batch sums, so the totals are divided by the
        # dataset size after the loop.
        total_l1_cd += l1_cd(c_, c).item()
        total_l2_cd += l2_cd(c_, c).item()
        # f_score works on one cloud at a time, on CPU numpy arrays.
        for i in range(len(c)):
            input_pc = p[i].detach().cpu().numpy()
            output_pc = c_[i].detach().cpu().numpy()
            gt_pc = c[i].detach().cpu().numpy()
            total_f_score += f_score(output_pc, gt_pc)
            index += 1

avg_l1_cd = total_l1_cd / len(test_dataset)
avg_l2_cd = total_l2_cd / len(test_dataset)
avg_f_score = total_f_score / len(test_dataset)
# The reported L2 figure is the square root of the per-sample mean.
avg_l2_cd=np.sqrt(avg_l2_cd)
print(avg_l1_cd,avg_l2_cd,avg_f_score)


  model.load_state_dict(torch.load(rf"/home/ananthakrishnak/pe/pcn-our/checkpoints/best_l1_cd.pth"))


0.01051094243923823 0.021130284559519683 0.6697825741354743
