In [44]:
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import open3d as o3d
import os
import torch
from torch.utils.data import Dataset, DataLoader
from dataclasses import dataclass

if torch.cuda.is_available():
    device = "cuda"

elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
    device = "mps"

print(f'Using: {device}')

Using: mps


In [61]:
import os
import torch
from torch.utils.data import Dataset
import numpy as np
import open3d as o3d

class PointCloudDataset(Dataset):
    def __init__(self,base_dir, point_cloud_size = 1000, split = 'train'):

        self.point_cloud_size = point_cloud_size
        self.point_clouds = None
        self.split = split
        self.base_dir = base_dir

        self.files = self.get_file_paths(self.split, self.base_dir)
        self.point_clouds = self.get_uniform_point_clouds(self.split)


    def __len__(self):
        return len(self.point_clouds)
    
    def __getitem__(self, idx):
        return {
            "points": self.point_clouds[idx],
            "filename" : self.files[idx]
        }
    
    def get_file_paths(self, split, base_dir):
        '''
        Return list of all filepaths in ModelNet40 that are part of split (train or test)
        '''
        file_paths = []
        for root, _, files in os.walk(base_dir):
            for file in files:
                if file.endswith('.off'):
                    full_path = os.path.join(root, file)
                    if f'/{split}/' in full_path:
                        file_paths.append(full_path)
        return file_paths
    
    
    def get_uniform_point_clouds(self, split):
        '''
        Return a tensor that is all point clouds of fixed size
        '''

        point_clouds_list = []
        for file in self.files: 
            mesh = o3d.io.read_triangle_mesh(file)
            try: 
                sampled_point_cloud = mesh.sample_points_uniformly(number_of_points = self.point_cloud_size)
                point_clouds_list.append(torch.tensor(np.asanyarray(sampled_point_cloud.points),dtype = torch.float32))
            except RuntimeError: # Some .OFF files are damaged, run repair script
                print(f'Damaged file: {file}')

        return point_clouds_list
        
        
dataset = PointCloudDataset(base_dir = "../ModelNet40")
train_loader = DataLoader(dataset, batch_size=16, shuffle=False)

In [59]:
# class MLPEncoder(nn.Module):

#     def __init__(self, config):
#         super().__init__()

#         self.fc1 = nn.Linear(config.input_dim, config.hidden_dim1)
#         self.fc2 = nn.Linear(config.hidden_dim1, config.hidden_dim2)
#         self.fc3 = nn.Linear(config.hidden_dim2, config.latent_dim)

#     def forward(self, x):
#         x = F.gelu(self.fc1(x))
#         x = F.gelu(self.fc2(x))
#         x = self.fc3(x)

#         return x

# class MLPDecoder(nn.Module):

#     def __init__(self, config):
#         super().__init__()

#         self.fc1 = nn.Linear(config.latent_dim, config.hidden_dim2)
#         self.fc2 = nn.Linear(config.hidden_dim2, config.hidden_dim1)
#         self.fc3 = nn.Linear(config.hidden_dim1, config.input_dim)

#     def forward(self, x):
#         x = F.gelu(self.fc1(x))
#         x = F.gelu(self.fc2(x))
#         x = self.fc3(x)

#         return x


# class Autoencoder(nn.Module):

#     def __init__(self, config):
#         super().__init__()

#         self.encoder = MLPEncoder(config)
#         self.decoder = MLPDecoder(config)

#     def forward(self,x):
#         latent_rep = self.encoder(x)
#         out = self.decoder(latent_rep)
#         return out


class Autoencoder(nn.Module):

    def __init__(self, config):
        super().__init__()

        self.fc = nn.Linear(config.input_dim, config.input_dim)

    def forward(self,x):
        y = self.fc(x)
        return x + (.00001 * y)
    



In [63]:
point_cloud_size = 1000 


# @dataclass 
# class MLPAutoEncoderConfig:
#     input_dim = point_cloud_size * 3
#     hidden_dim1 = 3072
#     hidden_dim2 = 2048
#     latent_dim = 768

@dataclass 
class MLPAutoEncoderConfig:
    input_dim = point_cloud_size * 3
    hidden_dim1 = point_cloud_size * 3
    hidden_dim2 = point_cloud_size * 3
    latent_dim = point_cloud_size * 3


config = MLPAutoEncoderConfig()

model = Autoencoder(config).to(device)

optim = torch.optim.AdamW(model.parameters(), lr= 1e-4)

epochs = 100

report_rate = 600

for epoch in range(epochs):

    running_loss = 0 

    batch_count = 0 

    for i, data in enumerate(train_loader):

        x = data['points']
        x = x.view(x.shape[0], -1).to(device)
        
        optim.zero_grad()

        pred = model(x)

        loss = F.mse_loss(pred, x)

        loss.backward()

        optim.step()

        running_loss += loss.item()

        batch_count +=1

    print(f'Epoch {epoch:<3} Epoch Loss: {running_loss / batch_count}')

        # if i % report_rate == report_rate - 1:
        #     print(f'Batch {i:<3} Running Loss: {running_loss / report_rate}')
        #     running_loss = 0


    

Epoch 0   Epoch Loss: 1359899964.831061
Epoch 1   Epoch Loss: 4864357849.294123
Epoch 2   Epoch Loss: 8762562699.149101
Epoch 3   Epoch Loss: 7538794729.141841
Epoch 4   Epoch Loss: 6088163790.689809
Epoch 5   Epoch Loss: 5902926931.395541
Epoch 6   Epoch Loss: 6470063842.453177


KeyboardInterrupt: 

In [38]:
x.view(x.shape[0], -1).shape

torch.Size([16, 15000])