In [1]:
import torch
import sys
import glob
import matplotlib.pyplot as plt
import numpy as np

# # Importing the project from a subfolder
sys.path.append('./project')

from models.transformer import Aggregator
from models.tokenizer import Tokenizer
from models.vggt import VGGT, unflatten_tokens
from heads.camera_head import CameraHead

from dataloader.projection import *
from dataloader.dataset import *

In [2]:
test_model = Aggregator(embed_dim=64)

In [None]:
B, S, P, C = 5, 3, 20, 64 # B events, S images per event, P tokens per image, C elements per token

device = "cuda" if torch.cuda.is_available() else "cpu"

test_input = torch.tensor(np.random.randn(B, S, P, C)).float().to(device)
test_pos = torch.tensor(np.random.randint(1, 8, size=(B, S, P, 2))).to(device)
test_model = test_model.to(device)

In [None]:
test_output, test_idx = test_model.forward(test_input, test_pos)

In [None]:
test_output[0].shape

In [None]:
test_model = Tokenizer()

In [None]:
B, H = 5, 16 # B total patches, each image a HxH square

device = "cuda" if torch.cuda.is_available() else "cpu"

test_input = torch.tensor(np.random.randn(B, H, H)).float().to(device)
test_input = test_input.view(B, 1, H, H)
test_model = test_model.to(device)

In [None]:
test_output = test_model.forward(test_input)

In [None]:
test_output.shape

In [None]:
test_output.view(5, -1).shape

In [None]:
test_tokenizer = Tokenizer()
test_aggregator = Aggregator(embed_dim=256)

In [None]:
B, S, P, H = 1, 3, 5, 16 # B events, S images per event, P patches per image, HxH patches

device = "cuda" if torch.cuda.is_available() else "cpu"

test_input = torch.tensor(np.random.randn(B, S, P, H, H)).float().to(device)
test_pos = torch.tensor(np.random.randint(1, H+1, size=(B, S, P, 2))).to(device)
test_tokenizer, test_aggregator = test_tokenizer.to(device), test_aggregator.to(device)

In [None]:
# Simple flattening for this test. In the real case this would involve recording the sequence lengths
test_tokens = test_tokenizer.forward(test_input.view(B*S*P, H, H)).view(B, S, P, 256)
test_output, test_idx = test_aggregator.forward(test_tokens, test_pos)

In [None]:
test_output[0].shape

# Test the model

In [3]:
# Load the dataset as previously demonstrated, also get device
path = "/sdf/home/y/youngsam/data/dune/larnet/h5/DataAccessExamples/tutorial_example_v1.h5"

dataset = Dataset(path)

device = "cuda" if torch.cuda.is_available() else "cpu"

In [4]:
# Grab a sample
sample, _, rotations = dataset.choose_events(10, 3)
patch_counts, all_coords, all_patches = stack_patches(sample)
patch_counts = torch.Tensor(patch_counts).int().to(device)
all_coords = torch.Tensor(all_coords).int().to(device)
all_patches = torch.Tensor(all_patches).to(device)

In [5]:
test_model = VGGT()
test_model = test_model.to(device)

In [6]:
predictions, test_output, patch_start_idx = test_model(patch_counts, all_coords, all_patches)

In [7]:
len(test_output), test_output[-1].shape
# 24 blocks, results from every block; final result is NxSx(P+5)x(2*D)
# P+5 because 1 camera token and 4 register tokens added
# D*2 because ???

(24, torch.Size([10, 3, 55, 512]))

In [8]:
predictions["pose_enc"].shape

torch.Size([10, 3, 4])

In [14]:
quaternions = np.array([[r.as_quat() for r in row] for row in rotations])

In [15]:
quaternions.shape

(10, 3, 4)

In [21]:
quaternion_tensor = torch.tensor(quaternions).to(device)

In [24]:
torch.mean(torch.square(predictions["pose_enc"] - quaternion_tensor))

tensor(0.6238, device='cuda:0', dtype=torch.float64, grad_fn=<MeanBackward0>)