In [1]:
import torch

from utils.objaverse_dataset import ObjaversePointCloudDataset

annotations_file = '/Users/kostyalbalint/Documents/Egyetem/7.Felev/Szakdolgozat/objaverse_labeling/concatenated_annotations.npy'
pc_dir = '/Users/kostyalbalint/Documents/Egyetem/7.Felev/Szakdolgozat/pointClouds3000'

dataset = ObjaversePointCloudDataset(annotations_file=annotations_file, pc_dir=pc_dir, file_ext='.npz', scale_mode='shape_bbox', load_to_mem=False, transform=None)

In [2]:
dataset.__len__()
dataset.__getitem__(1)['latent_text'].size() #dict_keys(['pointcloud', 'latent_text', 'id', 'shift', 'scale'])

torch.Size([128])

## Load back from Checkpoint

In [3]:
from utils.misc import CheckpointManager

log_dir = './logs_gen/GEN_2023_11_04__16_12_14'

ckpt_mgr = CheckpointManager(log_dir)

state = ckpt_mgr.load_best()

## Add text conditioning to the model

In [25]:
from models.common import reparameterize_gaussian, gaussian_entropy, standard_normal_logprob, truncated_normal_
from models.diffusion import DiffusionPoint, PointwiseNet, VarianceSchedule
from models.flow import build_latent_flow
from models.encoders import PointNetEncoder
import torch
from torch.nn import Module


class FlowVAE(Module):

    def __init__(self, args):
        super().__init__()
        self.args = args
        self.encoder = PointNetEncoder(args.latent_dim)
        self.flow = build_latent_flow(args, args.latent_text_dim)
        self.diffusion = DiffusionPoint(
            net = PointwiseNet(point_dim=3, context_dim=args.latent_dim, residual=args.residual),
            var_sched = VarianceSchedule(
                num_steps=args.num_steps,
                beta_1=args.beta_1,
                beta_T=args.beta_T,
                mode=args.sched_mode
            )
        )

    def get_loss(self, x, encoded_text, kl_weight, writer=None, it=None):
        """
        Args:
            x:  Input point clouds, (B, N, d).
        """
        batch_size, _, _ = x.size()
        # print(x.size())
        z_mu, z_sigma = self.encoder(x)
        #print(z_mu.size()) #torch.Size([64, 256]) [batch_size, latent_dim]
        #print(z_sigma.size()) #torch.Size([64, 256]) [batch_size, latent_dim]
        
        z = reparameterize_gaussian(mean=z_mu, logvar=z_sigma)  # (B, F)
        print(z.size()) # [batch_size * latent_dim]

        # H[Q(z|X)]
        entropy = gaussian_entropy(logvar=z_sigma)      # (B, )
        
        # Condition the latent z vector by custom encoded_text
        conditioned_z = torch.cat((z , encoded_text), 1)

        # P(z), Prior probability, parameterized by the flow: z -> w.
        w, delta_log_pw = self.flow(conditioned_z, torch.zeros([batch_size, 1]).to(z), reverse=False)
        log_pw = standard_normal_logprob(w).view(batch_size, -1).sum(dim=1, keepdim=True)   # (B, 1)
        log_pz = log_pw - delta_log_pw.view(batch_size, 1)  # (B, 1)

        # Negative ELBO of P(X|z)
        neg_elbo = self.diffusion.get_loss(x, z)

        # Loss
        loss_entropy = -entropy.mean()
        loss_prior = -log_pz.mean()
        loss_recons = neg_elbo
        loss = kl_weight*(loss_entropy + loss_prior) + neg_elbo

        if writer is not None:
            writer.add_scalar('train/loss_entropy', loss_entropy, it)
            writer.add_scalar('train/loss_prior', loss_prior, it)
            writer.add_scalar('train/loss_recons', loss_recons, it)
            writer.add_scalar('train/z_mean', z_mu.mean(), it)
            writer.add_scalar('train/z_mag', z_mu.abs().max(), it)
            writer.add_scalar('train/z_var', (0.5*z_sigma).exp().mean(), it)

        return loss

    def sample(self, w, num_points, flexibility, truncate_std=None):
        batch_size, _ = w.size()
        if truncate_std is not None:
            w = truncated_normal_(w, mean=0, std=1, trunc_std=truncate_std)
        # Reverse: z <- w.
        z = self.flow(w, reverse=True).view(batch_size, -1)
        samples = self.diffusion.sample(num_points, context=z, flexibility=flexibility)
        return samples, z


In [26]:
from torch.utils.data import DataLoader
from utils.data import get_data_iterator

device = 'cpu'
args = state['args']
args.latent_text_dim = 128
args.train_batch_size = 69

train_iter = get_data_iterator(DataLoader(
    dataset,
    batch_size=args.train_batch_size,
    num_workers=0,
))

model = FlowVAE(args).to(device)

In [28]:
from utils.misc import BlackHole

batch = next(train_iter)
writer = BlackHole()

#batch.keys() # dict_keys(['pointcloud', 'latent_text', 'id', 'shift', 'scale'])

x = batch['pointcloud'].to(device)
encoded_text = batch['latent_text'].to(device)
it = 0

loss = model.get_loss(x, encoded_text, kl_weight=args.kl_weight, writer=writer, it=it)

torch.Size([69, 256])


tensor(136.3206, grad_fn=<AddBackward0>)

In [29]:
print(loss)

tensor(136.3206, grad_fn=<AddBackward0>)


In [37]:
dataset.__getitem__(0)['latent_text'].shape[0]

128

In [None]:
import torch

with torch.no_grad():
    z = torch.randn([1, args.latent_dim]).to(device)
    x = model.sample(z, args.sample_num_points, flexibility=args.flexibility)

In [None]:
x.shape

## Text to latent

In [40]:
from transformers import BertTokenizerFast, BertModel


def tokenize_sentences(sentence):
    tokenizer = BertTokenizerFast.from_pretrained("setu4993/LEALLA-small")
    tokenizer_model = BertModel.from_pretrained("setu4993/LEALLA-small").to('mps')
    tokenizer_model = tokenizer_model.eval()
    english_inputs = tokenizer([sentence], return_tensors="pt", padding=True, max_length=512, truncation=True).to('mps')
    with torch.no_grad():
        english_outputs = tokenizer_model(**english_inputs).pooler_output

    return english_outputs.cpu().numpy()[0]



In [57]:
import numpy

t = tokenize_sentences("Table")

In [71]:
import numpy as np

print(t.shape)

np.resize(t, (5, t.shape[0]))

(3,)


array([[0, 1, 2],
       [0, 1, 2],
       [0, 1, 2],
       [0, 1, 2],
       [0, 1, 2]])