**Encoder (Using Sinusoidal Positional Encoding)**

In [2]:
import pandas as pd
import torch

# Read the semi-dense point table, coerce every column to a number, and replace
# whatever could not be parsed (it becomes NaN under errors='coerce') with 0.
df = (
    pd.read_csv('/home/mseleem/Desktop/3d_SceneScript/0/semidense_points.csv.gz')
    .apply(pd.to_numeric, errors='coerce')
    .fillna(0)
)

# Same values as a NumPy array, then as a float32 torch tensor.
numpy_array = df.values
tensor = torch.tensor(numpy_array, dtype=torch.float32)

print(tensor.shape)

torch.Size([433426, 7])


In [1]:
import torch
from point_cloud_processor import PointCloudTransformerLayer

# Instantiate the model and move its parameters to the GPU (needs a CUDA device)
model = PointCloudTransformerLayer().cuda()

# Path to the gzip-compressed CSV that holds the point cloud
filepath = "/home/mseleem/Desktop/3d_model_pt/0/semidense_points.csv.gz"

# Calling the module runs its forward method; note that it is handed the file
# path itself rather than a tensor.
# NOTE(review): PointCloudTransformerLayer lives in point_cloud_processor, which
# is not visible here - presumably it loads and encodes the file internally; confirm.
vit_encoded_features = model(filepath)

# Print the shape of the encoded features
print(f"ViT encoded features shape: {vit_encoded_features.shape}")


Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.
ViT encoded features shape: torch.Size([1212, 512])


In [4]:
import torch
import open3d as o3d
import pandas as pd
import numpy as np
import os
from torch import nn
import torchsparse
import torchsparse.nn as spnn
from torchsparse import SparseTensor

def read_points_file(filepath):
    """Load a gzip-compressed point-cloud CSV.

    Returns a pair of NumPy arrays: the ``px_world`` / ``py_world`` /
    ``pz_world`` columns (one row per point) and the ``dist_std`` column.
    An ``AssertionError`` is raised when ``filepath`` does not exist.
    """
    assert os.path.exists(filepath), f"Could not find point cloud file: {filepath}"
    table = pd.read_csv(filepath, compression="gzip")
    xyz = table[["px_world", "py_world", "pz_world"]].to_numpy()
    spread = table["dist_std"].to_numpy()
    print(f"Loaded point cloud with {len(xyz)} points.")
    return xyz, spread

def generate_sinusoidal_positional_encoding(coordinates, d_model, device=None):
    """
    Generates a sinusoidal positional encoding matrix.

    The ``d_model`` channels are interleaved between the ``n_dims`` coordinate
    columns: inside every group of ``2 * n_dims`` consecutive channels, column
    ``i`` owns channel ``2 * i`` (sine) and channel ``2 * i + 1`` (cosine).
    Successive groups use geometrically decreasing frequencies.

    Args:
        coordinates: 2-D tensor of shape (n_positions, n_dims).
        d_model: number of output channels; must be a multiple of
            ``2 * n_dims`` so every column receives the same number of
            sine/cosine pairs.
        device: where to build the encoding. ``None`` (default) keeps the
            historical behaviour of using CUDA when it is available and
            otherwise falls back to the device ``coordinates`` lives on, so
            the function also runs on CPU-only machines.

    Returns:
        Float tensor of shape (n_positions, d_model) on ``device``.

    Raises:
        ValueError: if ``coordinates`` is not 2-D or ``d_model`` is not a
            multiple of ``2 * n_dims``.
    """
    if coordinates.dim() != 2:
        raise ValueError(
            f"coordinates must be 2-D (n_positions, n_dims), got shape {tuple(coordinates.shape)}"
        )
    n_positions, n_dims = coordinates.shape
    if n_dims == 0 or d_model % (2 * n_dims) != 0:
        # Without this check the strided assignments below fail with an
        # opaque shape-mismatch error.
        raise ValueError(
            f"d_model ({d_model}) must be a multiple of 2 * n_dims ({2 * n_dims})"
        )

    if device is None:
        device = torch.device("cuda") if torch.cuda.is_available() else coordinates.device

    pe = torch.zeros(n_positions, d_model, device=device)
    position = coordinates.float().to(device)
    channels_per_dim = d_model // n_dims
    div_term = torch.exp(
        torch.arange(0, channels_per_dim, 2).float() * -(np.log(10000.0) / channels_per_dim)
    ).to(device)

    for i in range(n_dims):
        # (n_positions, 1) * (n_freqs,) -> (n_positions, n_freqs)
        angles = position[:, i].unsqueeze(1) * div_term
        pe[:, 2 * i:d_model:2 * n_dims] = torch.sin(angles)
        pe[:, 2 * i + 1:d_model:2 * n_dims] = torch.cos(angles)

    return pe

class TransformerEncoderBlock(nn.Module):
    """Pre-norm transformer encoder block: self-attention then an MLP, each with a residual.

    Args:
        dim: token feature size.
        heads: number of attention heads.
        mlp_dim: hidden size of the feed-forward network.
        dropout: dropout probability used in attention and in the MLP.
        batch_first: layout of the input. ``True`` (default) means
            (batch, tokens, dim), which is what ``VisionTransformer.forward``
            in this notebook produces. ``nn.MultiheadAttention`` defaults to
            (tokens, batch, dim); feeding it batch-first data makes it attend
            across the batch axis instead of across tokens. Pass ``False``
            only for sequence-first inputs.
    """

    def __init__(self, dim, heads, mlp_dim, dropout=0.1, batch_first=True):
        super(TransformerEncoderBlock, self).__init__()
        self.norm1 = nn.LayerNorm(dim)
        self.attn = nn.MultiheadAttention(dim, heads, dropout=dropout, batch_first=batch_first)
        self.norm2 = nn.LayerNorm(dim)
        self.ff = nn.Sequential(
            nn.Linear(dim, mlp_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(mlp_dim, dim),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        """Return a tensor with the same shape as ``x``."""
        # Normalise once and reuse it as query, key and value.
        normed = self.norm1(x)
        # Only the attention output is used, so skip computing the weights.
        x = x + self.attn(normed, normed, normed, need_weights=False)[0]
        x = x + self.ff(self.norm2(x))
        return x

class ViTEncoder(nn.Module):
    """A stack of ``depth`` TransformerEncoderBlock modules applied one after another."""

    def __init__(self, dim, depth, heads, mlp_dim, dropout=0.1):
        super().__init__()
        blocks = []
        for _ in range(depth):
            blocks.append(TransformerEncoderBlock(dim, heads, mlp_dim, dropout))
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        """Feed ``x`` through every block in order and return the final result."""
        out = x
        for block in self.layers:
            out = block(out)
        return out

class VisionTransformer(nn.Module):
    """Transformer encoder with a learnable context (class) token prepended.

    Args:
        dim: token feature size.
        depth: number of encoder blocks.
        heads: attention heads per block.
        mlp_dim: hidden size of each block's feed-forward network.
        dropout: dropout probability inside the encoder blocks.
        emb_dropout: dropout probability applied to the embedded tokens.
    """

    def __init__(self, dim, depth, heads, mlp_dim, dropout=0.1, emb_dropout=0.1):
        super(VisionTransformer, self).__init__()
        self.context_embedding = nn.Parameter(torch.randn(1, 1, dim))
        self.dropout = nn.Dropout(emb_dropout)
        self.encoder = ViTEncoder(dim, depth, heads, mlp_dim, dropout)
        # self.norm = nn.LayerNorm(dim)

    def forward(self, x, pos_encoding):
        """Encode ``x`` after prepending the context token and adding ``pos_encoding``.

        Args:
            x: tokens of shape (batch, n_tokens, dim).
            pos_encoding: positional encoding of shape (batch or 1, n_tokens, dim).
                The context token receives an all-zero positional encoding.

        Returns:
            Tensor of shape (batch, n_tokens + 1, dim); index 0 along the
            token axis is the context token.
        """
        batch_size = x.shape[0]
        cls_tokens = self.context_embedding.expand(batch_size, -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)
        # Build the context token's zero encoding on pos_encoding's own
        # device/dtype (instead of forcing CUDA) and with its batch size, so
        # a single shared encoding broadcasts over the batch in the addition.
        cls_pos = torch.zeros(
            pos_encoding.size(0), 1, pos_encoding.size(-1),
            dtype=pos_encoding.dtype, device=pos_encoding.device,
        )
        pos_encoding = torch.cat((cls_pos, pos_encoding), dim=1)
        x = x + pos_encoding
        x = self.dropout(x)
        x = self.encoder(x)
        # x = self.norm(x)
        return x

class SparseResNetEncoder(nn.Module):
    """Five stacked sparse 3-D convolutions (kernel 3, stride 2).

    Channel widths go 1 -> 16 -> 32 -> 64 -> 128 -> 512. Despite the class
    name, the layers are applied back to back with no skip connections,
    normalisation or activation in between.
    """

    def __init__(self):
        super().__init__()
        widths = (1, 16, 32, 64, 128, 512)
        # Registers the layers as conv1 ... conv5, in that order.
        for stage, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:]), start=1):
            setattr(self, f"conv{stage}", spnn.Conv3d(c_in, c_out, kernel_size=3, stride=2))

    def forward(self, x):
        """Apply conv1 through conv5 in sequence and return the result."""
        for stage in range(1, 6):
            x = getattr(self, f"conv{stage}")(x)
        return x

if __name__ == "__main__":
    voxel_size = 0.03
    embed_dim = 512  # output width of SparseResNetEncoder and token size of the ViT

    points, dist_std = read_points_file("/home/mseleem/Desktop/3d_model_pt/0/semidense_points.csv.gz")
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    voxel_grid = o3d.geometry.VoxelGrid.create_from_point_cloud(pcd, voxel_size)

    # Group point indices by the voxel each point falls into.
    voxel_map = {}
    for i, point in enumerate(points):
        voxel_index = tuple(voxel_grid.get_voxel(point))
        voxel_map.setdefault(voxel_index, []).append(i)

    # One scalar feature per occupied voxel: the mean dist_std of its points.
    filtered_voxel_indices = list(voxel_map.keys())
    aggregated_features = [np.mean(dist_std[point_indices]) for point_indices in voxel_map.values()]

    voxel_indices_tensor = torch.tensor(filtered_voxel_indices, dtype=torch.int32).cuda()
    features_tensor = torch.tensor(aggregated_features, dtype=torch.float32).view(-1, 1).cuda()

    # Prepend an all-zero batch-index column to the voxel coordinates.
    batch_indices = torch.zeros((voxel_indices_tensor.shape[0], 1), dtype=torch.int32).cuda()
    voxel_indices_tensor_with_batch = torch.cat([batch_indices, voxel_indices_tensor], dim=1)

    sparse_tensor = SparseTensor(features_tensor, voxel_indices_tensor_with_batch)

    encoder = SparseResNetEncoder().cuda()
    encoded_features = encoder(sparse_tensor.cuda())

    print(f"Encoded features (F) from point cloud: {encoded_features.F.shape}")
    print(f"Encoded features (C) from point cloud: {encoded_features.C.shape}")

    # NOTE(review): encoded_features.C has 4 columns, so the batch-index column
    # is encoded alongside the three spatial ones - confirm this is intended.
    positional_encoding = generate_sinusoidal_positional_encoding(encoded_features.C, embed_dim)

    vit = VisionTransformer(dim=embed_dim, depth=6, heads=8, mlp_dim=2048).cuda()
    # VisionTransformer.forward adds the positional encoding itself, so the raw
    # features go in here. Adding it beforehand as well applied it twice.
    vit_encoded_features = vit(encoded_features.F.unsqueeze(0), positional_encoding.unsqueeze(0))

    vit_encoded_features = vit_encoded_features.squeeze(0)
    print(f"ViT encoded features shape: {vit_encoded_features.shape}")

    # preprocessed_context_embedding = vit_encoded_features[0]  
    # print(f"Preprocessed context embedding shape: {preprocessed_context_embedding.shape}")


Loaded point cloud with 433426 points.
Encoded features (F) from point cloud: torch.Size([1211, 512])
Encoded features (C) from point cloud: torch.Size([1211, 4])
ViT encoded features shape: torch.Size([1212, 512])


**Ground-Truth (GT) Script**

In [None]:
import pandas as pd
import re

# Function to parse the script and extract values
def parse_script(file_path):
    """Parse a scene-language script into rows of ``[command, value, ...]``.

    Only lines starting with ``make_wall``, ``make_door`` or ``make_window``
    are considered. For each such line the expected ``key=value`` fields are
    extracted in a fixed order and converted to ``float``:

    * ``make_wall``: a_x, a_y, a_z, b_x, b_y, b_z, height
    * ``make_door`` / ``make_window``: wall0_id, wall1_id, position_x,
      position_y, position_z, width, height

    Numeric values may be written with or without a fractional part
    (``3.2``, ``.5`` or ``3``), so integer-formatted values are kept too.
    Lines whose fields do not match the expected layout are skipped.

    Args:
        file_path: path of the text file to read.

    Returns:
        A list with one list per recognised line: the command name followed
        by its seven values as floats.
    """
    number = r'(-?(?:\d*\.\d+|\d+))'
    integer = r'(-?\d+)'

    def build(fields):
        # "name=<group>, name=<group>, ..." in the given order.
        return re.compile(', '.join(f'{name}={kind}' for name, kind in fields))

    wall_pattern = build(
        [(name, number) for name in ("a_x", "a_y", "a_z", "b_x", "b_y", "b_z", "height")]
    )
    # Doors and windows share exactly the same field layout.
    opening_pattern = build(
        [("wall0_id", integer), ("wall1_id", integer)]
        + [(name, number) for name in ("position_x", "position_y", "position_z", "width", "height")]
    )
    patterns = {
        "make_wall": wall_pattern,
        "make_door": opening_pattern,
        "make_window": opening_pattern,
    }

    data = []
    with open(file_path, 'r') as file:
        for line in file:
            line = line.strip()
            for command, pattern in patterns.items():
                if line.startswith(command):
                    match = pattern.search(line)
                    if match:
                        data.append([command] + [float(value) for value in match.groups()])
                    break

    return data

# File path
file_path = '/home/mseleem/Desktop/3d_SceneScript/0/ase_scene_language.txt'

# Parse the script
data = parse_script(file_path)

# The widest row decides the column count. default=1 keeps an empty parse
# result (no recognised commands) from crashing max(); the table then has
# only the "command" column and no rows.
max_columns = max((len(row) for row in data), default=1)

# Pad shorter rows in place with None so every row has the same width
for row in data:
    row.extend([None] * (max_columns - len(row)))

# Create a unified DataFrame
columns = ["command"] + [f"parameter_{i}" for i in range(1, max_columns)]
df_all = pd.DataFrame(data, columns=columns)

# Each DataFrame row as a plain Python list: [command, parameter_1, ...]
vector_embeddings = df_all.values.tolist()

# Display the rows
print("Script Embeddings:")
for vector in vector_embeddings:
    print(vector)


In [None]:
from enum import Enum
import torch.nn as nn


class Commands(Enum):
    """Commands of the scene language, numbered from 1."""

    START = 1
    STOP = 2
    MAKE_WALL = 3
    MAKE_WINDOW = 4
    MAKE_DOOR = 5

    @classmethod
    def get_name_for(cls, value: int):
        """Return the member whose value equals ``value``, or ``None`` if there is none."""
        for member in cls:
            if value == member.value:
                return member
        return None

class TransformerOutputLayer(nn.Module):
    """Linear prediction heads applied to every position of the transformer output.

    Args:
        transformer_dim: feature size of the transformer output. Either an
            ``int`` or a shape-like sequence whose LAST entry is the feature
            size (e.g. ``(12, d_model)`` or ``(d_model,)``). The sequence used
            to be unpacked straight into ``nn.Linear``, so a two-element tuple
            became ``nn.Linear(in, out, bias)`` and the intended output width
            (5, 2, 6, 4) was silently taken as the ``bias`` flag.
    """

    def __init__(self, transformer_dim):
        super(TransformerOutputLayer, self).__init__()
        if isinstance(transformer_dim, int):
            in_features = transformer_dim
        else:
            in_features = int(tuple(transformer_dim)[-1])

        self.command_layer = nn.Linear(in_features, 5)  # 5 command logits (START included)
        self.object_id_and_height_layer = nn.Linear(in_features, 2)  # id and height of the object
        self.door_or_window_parameters_layer = nn.Linear(in_features, 6)  # wall_id_0, wall_id_1, x, y, z, w of the door/window
        self.wall_corners_layer = nn.Linear(in_features, 4)  # x1, y1, x2, y2 of the wall

    def forward(self, x):
        """Apply every head to ``x``.

        Args:
            x: transformer output whose last dimension is the feature size.

        Returns:
            Tuple ``(command_probs, object_id_and_height,
            door_or_window_parameters, wall_corners)`` with last dimensions
            5, 2, 6 and 4; leading dimensions are those of ``x``.
            ``command_probs`` is a softmax over the 5 command logits.
        """
        command_logits = self.command_layer(x)  # (..., 5)
        # Tensor method instead of F.softmax: this cell does not import
        # torch.nn.functional.
        command_probs = command_logits.softmax(dim=-1)  # (..., 5)

        object_id_and_height = self.object_id_and_height_layer(x)  # (..., 2)

        door_or_window_parameters = self.door_or_window_parameters_layer(x)  # (..., 6)

        wall_corners = self.wall_corners_layer(x)  # (..., 4)

        return command_probs, object_id_and_height, door_or_window_parameters, wall_corners

def select_parameters(command_probs, object_id_and_height, door_or_window_parameters, wall_corners):
    """Pick the most likely command for ONE position and assemble its parameter vector.

    Args:
        command_probs: 1-D tensor of command scores for a single position.
            Index ``k`` is mapped to the ``Commands`` member with value
            ``k + 2``, i.e. index 0 is STOP and START is never selected.
        object_id_and_height: 1-D tensor with the object id and height.
        door_or_window_parameters: 1-D tensor with the door/window parameters.
        wall_corners: 1-D tensor with the wall corner coordinates.

    Returns:
        ``(command, parameters)`` where ``command`` is a ``Commands`` member
        and ``parameters`` is a 1-D tensor: 12 zeros for STOP, id/height
        followed by the wall corners for MAKE_WALL, or id/height followed by
        the door/window parameters for MAKE_DOOR and MAKE_WINDOW.

    Raises:
        ValueError: if the arg-max is not a single value (batched input) or
            the resulting index has no matching command.

    NOTE(review): the ``+ 2`` offset matches a 4-way head (STOP, wall, window,
    door). TransformerOutputLayer emits 5 logits "START included", for which
    the last index maps to 6 and is rejected here - confirm which convention
    is intended.
    """
    best = command_probs.argmax(dim=-1)
    if best.numel() != 1:
        raise ValueError(
            f"select_parameters handles one position at a time, got arg-max of shape {tuple(best.shape)}"
        )
    # int() works for CPU and CUDA tensors and for tensors that require grad,
    # unlike .numpy().
    command_indx = int(best) + 2

    if command_indx == Commands.STOP.value:
        parameters = torch.zeros(12)
    elif command_indx == Commands.MAKE_WALL.value:
        parameters = torch.cat((object_id_and_height, wall_corners))
    elif command_indx in (Commands.MAKE_DOOR.value, Commands.MAKE_WINDOW.value):
        parameters = torch.cat((object_id_and_height, door_or_window_parameters))
    else:
        # Previously fell through and failed on the unbound `parameters`.
        raise ValueError(f"No command is associated with index {command_indx}")

    return Commands.get_name_for(command_indx), parameters

In [None]:
# Try select_parameters on a hand-written score vector, then right-pad the
# returned parameters with zeros up to length 8 before printing.
command, parameters = select_parameters(
    torch.tensor([0, 1, 0, 0 ]),
    torch.zeros(2),
    torch.zeros(6),
    torch.zeros(4)+1,
)
missing = 8 - parameters.size(0)
if missing > 0:
    parameters = torch.cat((parameters, torch.zeros(missing)))
print(command, parameters)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from enum import Enum

# Number of parameter slots that accompany every command token.
PARAM_SIZE = 8
vocab = ["START", "STOP", "make_wall", "make_window", "make_door"]
COMMANDS = vocab


vocab_index = [1, 0, 2, 3, 4]
# for command in ["make_wall", "make_window", "make_door"]:
#     # for i in range(PARAM_SIZE):
#     vocab.append(command)

# Two-way lookup between command tokens and their position in `vocab`.
token_to_index = {token: idx for idx, token in enumerate(vocab)}
index_to_token = {idx: token for token, idx in token_to_index.items()}


# One slot per command token plus the parameter slots.
VOCAB_SIZE = len(vocab) + PARAM_SIZE
# Lower-case alias: the print below and later code in this notebook refer to
# `vocab_size`, which was never defined (only VOCAB_SIZE was).
vocab_size = VOCAB_SIZE
print(f"Size of the vocabulary: {vocab_size}")

class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a sequence-first tensor.

    The table is stored as a non-trainable buffer ``pe`` of shape
    (max_len, 1, d_model): even channels hold sines, odd channels cosines.

    Args:
        d_model: feature size of the inputs (odd sizes are supported).
        max_len: longest sequence the table covers.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one cosine channel fewer than sine
        # channels, so drop the surplus frequency instead of failing on shape.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encoding of its first ``x.size(0)`` positions.

        Args:
            x: tensor whose first dimension is the sequence position and whose
                last dimension is ``d_model``.

        Raises:
            ValueError: if the sequence is longer than ``max_len``.
        """
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"Sequence length {seq_len} exceeds the positional table size {self.pe.size(0)}"
            )
        x = x + self.pe[:seq_len, :]
        return x

class CustomTransformerDecoder(nn.Module):
    """Thin wrapper around ``nn.TransformerDecoder`` built from one decoder-layer template.

    The template layer is kept as ``decoder_layer`` and the stack as
    ``transformer_decoder``; ``d_model`` is stored for reference.
    """

    def __init__(self, d_model, nhead, num_decoder_layers, dim_feedforward, dropout=0.1):
        super().__init__()
        self.decoder_layer = nn.TransformerDecoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.transformer_decoder = nn.TransformerDecoder(
            self.decoder_layer, num_layers=num_decoder_layers
        )
        self.d_model = d_model

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None, tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Decode ``tgt`` against ``memory``, forwarding all optional masks unchanged.

        ``memory`` must be supplied by the caller (normally an encoder's output).
        """
        mask_kwargs = {
            "tgt_mask": tgt_mask,
            "memory_mask": memory_mask,
            "tgt_key_padding_mask": tgt_key_padding_mask,
            "memory_key_padding_mask": memory_key_padding_mask,
        }
        return self.transformer_decoder(tgt, memory, **mask_kwargs)

# # Example usage
# d_model = 512
# nhead = 8
# num_decoder_layers = 6
# dim_feedforward = 2048
# dropout = 0.1

# decoder = CustomTransformerDecoder(d_model, nhead, num_decoder_layers, dim_feedforward, dropout)

# # Example input tensors
# tgt = torch.rand((10, 32, d_model))  # (sequence_length, batch_size, d_model)
# memory = torch.rand((10, 32, d_model))  # Memory should come from somewhere (e.g., encoder output)

# output = decoder(tgt, memory)
# print(output.shape)


def construct_embedding_vector_from_vocab(command: Commands, parameters: torch.Tensor):
    """Concatenate a one-hot encoding of ``command`` with its parameter vector.

    The one-hot part has ``len(Commands)`` entries with position
    ``command.value - 1`` set. ``parameters`` is right-padded with zeros up
    to 8 entries when shorter; longer vectors are passed through unchanged.
    """
    class_index = torch.tensor(command.value - 1)
    one_hot_tensor = F.one_hot(class_index, num_classes=len(Commands))

    shortfall = 8 - parameters.size(0)
    if shortfall > 0:
        parameters = torch.cat((parameters, torch.zeros(shortfall)))

    return torch.cat((one_hot_tensor, parameters))


class CommandTransformer(nn.Module):
    """Decoder that turns point-cloud features and a command prefix into per-position predictions.

    Args:
        vocab_size: accepted for interface compatibility; currently unused
            because the token embedding layer is still commented out.
        d_model: feature size used throughout the decoder.
        nhead: attention heads per decoder layer.
        num_layers: number of decoder layers.
        point_cloud_encoder: optional module mapping ``src`` to the decoder
            memory. When ``None`` (default, the encoder is still a TODO)
            ``src`` is used as the memory directly.
    """

    def __init__(self, vocab_size, d_model=512, nhead=8, num_layers=6, point_cloud_encoder=None):
        super(CommandTransformer, self).__init__()
        # self.embedding = nn.Embedding(vocab_size, d_model)
        self.point_cloud_encoder = point_cloud_encoder  # TODO: Add the encoder
        self.pos_encoder = PositionalEncoding(d_model)
        self.transformer = CustomTransformerDecoder(d_model, nhead, num_layers, 2048)
        # The heads consume the decoder output, whose feature size is d_model.
        self.output_layer = TransformerOutputLayer((d_model,))

    def forward(self, src: torch.Tensor, tgt: torch.Tensor):
        """Run the decoder over ``tgt`` while attending to the encoded ``src``.

        Args:
            src: input to ``point_cloud_encoder``, or the ready-made decoder
                memory when no encoder is configured.
                NOTE(review): presumably (src_len, batch, d_model) in that
                case - confirm.
            tgt: target-side embeddings.
                NOTE(review): assumed to already be (tgt_len, batch, d_model);
                the embedding step that would produce them is commented out.

        Returns:
            Whatever ``TransformerOutputLayer`` returns for the decoder output.
        """
        if self.point_cloud_encoder is None:
            src_emb = src
        else:
            src_emb = self.point_cloud_encoder(src)
        # tgt_emb = self.construct_embedding_vector_from_vocab(command, parameters)  # (seq_len, batch_size, d_model)
        # `tgt_emb` was read before assignment; the positional encoding is
        # applied to the target input itself.
        tgt_emb = self.pos_encoder(tgt)
        # CustomTransformerDecoder.forward takes (tgt, memory) in that order.
        transformer_output = self.transformer(tgt_emb, src_emb)  # (tgt_seq_len, batch_size, d_model)

        # command_output = self.fc_command(transformer_output)  # (tgt_seq_len, batch_size, 3)
        outputs = self.output_layer(transformer_output)

        return outputs


# VOCAB_SIZE is the constant defined above; the lower-case `vocab_size` used
# here before was never assigned.
model = CommandTransformer(vocab_size=VOCAB_SIZE)
input_emb = construct_embedding_vector_from_vocab(Commands.START, torch.zeros(12)).unsqueeze(-1)
point_cloud_tensor = torch.zeros((433426,6))

# Upper bound on decoding steps so a model that never emits STOP cannot spin forever.
MAX_GENERATION_STEPS = 256

# NOTE(review): the shapes fed to the model here (a raw point table and a
# column of concatenated command vectors) do not obviously match what
# CommandTransformer.forward and select_parameters expect - confirm before
# relying on this loop.
with torch.no_grad():  # inference only; also lets outputs be converted to NumPy
    for step in range(MAX_GENERATION_STEPS):
        pred = model(point_cloud_tensor, input_emb)
        command, parameters = select_parameters(*pred)
        output_emb = construct_embedding_vector_from_vocab(command, parameters)
        # torch.cat takes ONE sequence of tensors; passing the second tensor
        # positionally made it the `dim` argument.
        input_emb = torch.cat((input_emb, output_emb.unsqueeze(-1)))
        if command == Commands.STOP:
            break

print(input_emb)

# def generate_sequence(model):
#     input_seq = torch.tensor([token_to_index["START"]]).unsqueeze(1)  # Start with the START token
#     print(input_seq.shape)
#     tgt_seq = torch.tensor([token_to_index["START"]]).unsqueeze(1)
#     print("START")
#     sequence = []

#     while True:
#         with torch.no_grad():
#             command_output, params_output = model(input_seq, tgt_seq)

#         # Apply softmax to command_output to get probabilities
#         command_probs = F.softmax(command_output[-1, 0], dim=-1)
#         command_index = torch.argmax(command_probs).item()
#         command_token = ["make_wall", "make_window", "make_door", "STOP"][command_index]

#         if command_token == "STOP":
#             print("STOP")
#             sequence.append(command_token)
#             break

#         # Print the predicted command and parameters
#         print(command_token)

#         params = params_output[-1, 0].tolist()
#         print(params)

#         sequence.append((command_token, params))

#         # Update input sequence with the new command
#         new_input = torch.tensor([token_to_index[command_token]]).unsqueeze(1)
#         tgt_seq = torch.cat((tgt_seq, new_input), dim=0)
#         input_seq = torch.cat((input_seq, new_input), dim=0)

#     return sequence

# def parse_predictions(predictions):
#     sequence = ["START"]
#     for command_token, params in predictions:
#         sequence.append(command_token)
#         if command_token != "STOP":
#             sequence.extend(params)
#     return sequence

# # Generate and parse a sequence using the model
# predictions = generate_sequence(model)
# parsed_sequence = parse_predictions(predictions)

# print("Generated Sequence:")
# print(parsed_sequence)

