# LiDAR-LLM

Student Name: **Zhe HUANG** (from Master IASD)

In [None]:
import numpy as np

import torch.nn as nn
import torch.nn.functional as F
import torch

import transformers

## VoxelNet

In [None]:
# Fully connected block: Linear -> BatchNorm1d -> ReLU, applied to every point
class FCN(nn.Module):
    """Point-wise fully connected layer followed by batch norm and ReLU."""

    def __init__(self, cin, cout):
        """Create the layer.

        :param cin: Number of input channels per point.
        :param cout: Number of output channels per point.
        """
        super().__init__()
        self.cout = cout
        self.linear = nn.Linear(cin, cout)
        self.bn = nn.BatchNorm1d(cout)

    def forward(self, x):
        """Apply the layer to a 3-D tensor of shape (kk, t, cin).

        The first two axes are merged so that BatchNorm1d sees one row per
        point, then restored on the way out.

        :param x: Tensor with exactly three dimensions.
        :return: Tensor of shape (kk, t, cout).
        """
        # The leading axis is the voxel axis stacked across the batch.
        num_voxels, num_points, _ = x.shape
        flat_points = x.view(num_voxels * num_points, -1)
        projected = self.linear(flat_points)
        activated = F.relu(self.bn(projected))
        return activated.view(num_voxels, num_points, -1)

# Voxel Feature Encoding layer
class VFE(nn.Module):
    """Concatenate point-wise features with their per-voxel maximum.

    Half of the ``cout`` output channels come from the point-wise FCN and the
    other half from the element-wise maximum over the point axis.
    """

    def __init__(self, cin, cout):
        """Create the layer.

        :param cin: Number of input channels per point.
        :param cout: Number of output channels per point; must be even because
            it is split equally between the two concatenated halves.
        """
        super(VFE, self).__init__()
        assert cout % 2 == 0, "cout must be even"
        self.units = cout // 2
        self.fcn = FCN(cin, self.units)

    def forward(self, x, mask):
        """Encode the points of every voxel.

        :param x: Tensor of shape (kk, t, cin).
        :param mask: Tensor of shape (kk, t); entries that evaluate to zero
            blank out the corresponding point's features.
        :return: Tensor of shape (kk, t, cout).
        """
        # point-wise feature
        pwf = self.fcn(x)
        # locally aggregated feature: the maximum over the point axis, repeated
        # once per point. The count is read from the tensor itself rather than
        # from an external configuration object.
        num_points = pwf.size(1)
        laf = torch.max(pwf, 1)[0].unsqueeze(1).repeat(1, num_points, 1)
        # point-wise concat feature
        pwcf = torch.cat((pwf, laf), dim=2)
        # apply mask; broadcasting over the channel axis avoids materialising
        # a full-size copy of the mask
        pwcf = pwcf * mask.unsqueeze(2).float()

        return pwcf

# Stacked Voxel Feature Encoding: two VFE layers, one FCN, then a max over points
class SVFE(nn.Module):
    """Turn the points of each voxel into a single 128-channel feature."""

    def __init__(self):
        super().__init__()
        self.vfe_1 = VFE(7, 32)
        self.vfe_2 = VFE(32, 128)
        self.fcn = FCN(128, 128)

    def forward(self, x):
        """Encode voxels.

        :param x: Tensor whose axis 1 indexes points and axis 2 indexes the
            per-point channels.
        :return: Tensor with the point axis reduced away by a maximum.
        """
        # A point is kept only when the largest of its channels is non-zero.
        channel_max = x.max(dim=2)[0]
        mask = channel_max != 0

        features = self.vfe_1(x, mask)
        features = self.vfe_2(features, mask)
        features = self.fcn(features)

        # element-wise max pooling over the point axis
        return features.max(dim=1)[0]


class VoxelNet(nn.Module):
    """Wrapper module that exposes the stacked VFE feature-learning network."""

    def __init__(self):
        super().__init__()
        self.svfe = SVFE()

    def forward(self, point_clouds):
        """Run the feature learning network and return its voxel features.

        :param point_clouds: Input forwarded unchanged to the SVFE module.
        :return: Whatever the SVFE module produces for that input.
        """
        return self.svfe(point_clouds)

In [None]:
class LiDAR3DFeatureExtractor:
    """Run a pretrained VoxelNet on LiDAR points and derive BEV features."""

    def __init__(self, model_path, point_cloud_range=(-54.0, 54.0, -54.0, 54.0, -5.0, 3.0), bev_grid_size=(0.6, 0.6)):
        """Load the pretrained weights and store the geometry settings.

        :param model_path: Path to a checkpoint holding a VoxelNet state dict.
        :param point_cloud_range: Bounds in the order
            (min_x, max_x, min_y, max_y, min_z, max_z).
        :param bev_grid_size: BEV cell size along x and y, in the same units
            as ``point_cloud_range``. A pair of ``int`` values is instead used
            directly as the output size in cells.
        :raises ValueError: If any axis has ``max <= min``.
        """
        min_x, max_x, min_y, max_y, min_z, max_z = point_cloud_range
        if not (max_x > min_x and max_y > min_y and max_z > min_z):
            raise ValueError(
                "point_cloud_range must be (min_x, max_x, min_y, max_y, min_z, max_z) "
                "with max > min on every axis"
            )

        self.model = VoxelNet()
        # NOTE(review): torch.load unpickles the file, so only load checkpoints
        # from a trusted source. map_location="cpu" lets a checkpoint saved on
        # a GPU be restored on a machine without one.
        self.model.load_state_dict(torch.load(model_path, map_location="cpu"))  # Load the pretrained model weights
        self.model.eval()  # Set the model to evaluation mode

        self.point_cloud_range = point_cloud_range
        self.bev_grid_size = bev_grid_size

    def extract_features(self, point_cloud):
        """
        Extracts voxel features from a LiDAR point cloud.

        The input array is not modified.

        :param point_cloud: A numpy array of shape (n, 3) representing the LiDAR point cloud.
        :return: The tensor returned by the VoxelNet model.
        """
        # Normalize point cloud within specified range (works on a copy)
        point_cloud_normalized = self.normalize_point_cloud(point_cloud)

        # NOTE(review): the points carry 3 columns here, while the first VFE
        # layer of SVFE is built with 7 input channels -- confirm the expected
        # per-point feature layout before relying on this path.
        point_cloud_tensor = torch.tensor(point_cloud_normalized, dtype=torch.float32)
        voxel_feature = self.model(point_cloud_tensor.unsqueeze(0))  # Adding a batch dimension

        return voxel_feature

    def normalize_point_cloud(self, point_cloud):
        """
        Normalize the LiDAR point cloud within the specified range.

        Each of the first three columns is mapped with (v - min) / (max - min).
        The result is a new array; the argument is left untouched. Inputs
        with a non-floating dtype are converted to float64 so the scaled
        values are not truncated.

        :param point_cloud: A numpy array of shape (n, 3) representing the LiDAR point cloud.
        :return: Normalized copy of the point cloud.
        """
        # point_cloud_range is in format (min_x, max_x, min_y, max_y, min_z, max_z)
        min_x, max_x, min_y, max_y, min_z, max_z = self.point_cloud_range

        normalized = np.array(point_cloud, copy=True)
        if not np.issubdtype(normalized.dtype, np.floating):
            normalized = normalized.astype(np.float64)

        # Normalize x, y, z coordinates
        normalized[:, 0] = (normalized[:, 0] - min_x) / (max_x - min_x)
        normalized[:, 1] = (normalized[:, 1] - min_y) / (max_y - min_y)
        normalized[:, 2] = (normalized[:, 2] - min_z) / (max_z - min_z)

        return normalized

    def _bev_output_size(self):
        """Return the BEV output size in cells as a pair of ints."""
        cell_x, cell_y = self.bev_grid_size
        if isinstance(cell_x, int) and isinstance(cell_y, int):
            # Integer entries are already cell counts.
            return (cell_x, cell_y)
        min_x, max_x, min_y, max_y, _, _ = self.point_cloud_range
        cells_x = max(1, int(round((max_x - min_x) / cell_x)))
        cells_y = max(1, int(round((max_y - min_y) / cell_y)))
        return (cells_x, cells_y)

    def flatten_feature_to_bev(self, voxel_feature):
        """
        Flattens the 3D voxel feature along its last axis to generate a BEV feature.

        :param voxel_feature: A 3-D tensor; its last axis is reduced with a maximum.
        :return: BEV feature tensor of shape (1, cells_x, cells_y).
        """
        # Perform max pooling along the last (z) axis
        bev_feature = voxel_feature.max(dim=-1)[0]

        # F.interpolate needs an integer output size, so the cell size is
        # converted to a cell count first. Nearest-neighbor interpolation then
        # resamples the map onto the BEV grid.
        bev_feature = F.interpolate(
            bev_feature.unsqueeze(0).unsqueeze(0),
            size=self._bev_output_size(),
            mode='nearest',
        )

        return bev_feature.squeeze(0)


# View-Aware Transformer (VAT)

In [None]:
class VAT(nn.Module):
    """Cross-attend a set of learnable queries to view-tagged visual tokens."""

    def __init__(self, num_queries=576, input_dim=768, output_dim=768, num_view_positions=6):
        """Create the module.

        :param num_queries: Number of learnable query vectors.
        :param input_dim: Channel width of the visual tokens and of the queries.
        :param output_dim: Channel width of the returned vectors.
        :param num_view_positions: Number of distinct view position embeddings.
        """
        super(VAT, self).__init__()
        self.num_queries = num_queries
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.num_view_positions = num_view_positions

        # Learnable query embeddings
        self.query_embeddings = nn.Parameter(torch.randn(num_queries, input_dim))

        # View position embeddings, one column per view, zero-initialised
        self.view_position_embeddings = nn.Parameter(torch.zeros(input_dim, num_view_positions))

        # Cross-attention mechanism. Queries and tokens both have input_dim
        # channels, so that is the attention width.
        self.cross_attention = nn.MultiheadAttention(embed_dim=input_dim, num_heads=8)

        # MLP for processing visual vectors (maps input_dim -> output_dim)
        self.mlp = nn.Sequential(
            nn.Linear(input_dim, output_dim),
            nn.ReLU(),
            nn.Linear(output_dim, output_dim)
        )

    def forward(self, visual_feature, view_ids=None):
        """Attend the learnable queries to the visual tokens.

        :param visual_feature: Tensor of shape (batch, num_tokens, input_dim).
        :param view_ids: Optional integer tensor of shape (num_tokens,) or
            (batch, num_tokens) giving, for each token, the index of the view
            position embedding to add. When omitted, the token axis is split
            into ``num_view_positions`` contiguous, near-equal chunks and
            chunk ``k`` receives embedding ``k``.
            NOTE(review): this fallback is a placeholder; pass ``view_ids``
            when the real token-to-view layout is known.
        :return: Tensor of shape (batch, num_queries, output_dim).
        """
        batch_size, num_tokens = visual_feature.size(0), visual_feature.size(1)

        # Repeat query embeddings to match the batch size of visual features
        queries = self.query_embeddings.unsqueeze(0).repeat(batch_size, 1, 1)

        if view_ids is None:
            token_index = torch.arange(num_tokens, device=visual_feature.device)
            view_ids = token_index * self.num_view_positions // num_tokens
        else:
            view_ids = torch.as_tensor(view_ids, dtype=torch.long, device=visual_feature.device)

        # Look up one embedding per token: (num_views, C)[ids] -> (..., N, C),
        # which broadcasts against the (B, N, C) features.
        per_token_embedding = self.view_position_embeddings.t()[view_ids]
        visual_feature_with_view_position = visual_feature + per_token_embedding

        # nn.MultiheadAttention is sequence-first by default, so move the
        # batch axis to position 1 going in and back to position 0 coming out.
        keys = visual_feature_with_view_position.transpose(0, 1)
        attn_output, _ = self.cross_attention(queries.transpose(0, 1), keys, keys)

        # Apply MLP to process visual vectors
        processed_output = self.mlp(attn_output.transpose(0, 1))

        return processed_output


# LLaMA-7B

In [None]:
# Fetch the pretrained 'dfurman/llama-7b' checkpoint together with its tokenizer
model_name = "dfurman/llama-7b"
tokenizer = transformers.AutoTokenizer.from_pretrained(model_name)

# Weights are requested in bfloat16, and device placement is left to the
# loader through device_map="auto".
llama_load_options = {
    "torch_dtype": torch.bfloat16,
    "device_map": "auto",
}
llama_model = transformers.LlamaForCausalLM.from_pretrained(model_name, **llama_load_options)



class LiDAR_LLMA(nn.Module):
    """Language model followed by a two-layer MLP applied to its logits."""

    def __init__(self, llama_model, mlp_hidden_dim=None):
        """Create the wrapper.

        :param llama_model: Causal language model. It is called with
            ``input_ids`` and ``attention_mask`` keywords and must return an
            object exposing ``.logits``.
        :param mlp_hidden_dim: Width of the MLP. It has to equal the last
            dimension of the logits. When omitted it is read from
            ``llama_model.config.vocab_size``.
        :raises ValueError: If the width is omitted and cannot be inferred.
        """
        super(LiDAR_LLMA, self).__init__()
        self.llama_model = llama_model

        if mlp_hidden_dim is None:
            config = getattr(llama_model, "config", None)
            mlp_hidden_dim = getattr(config, "vocab_size", None)
            if mlp_hidden_dim is None:
                raise ValueError(
                    "mlp_hidden_dim was not given and llama_model.config.vocab_size is unavailable"
                )

        self.mlp = nn.Sequential(
            nn.Linear(mlp_hidden_dim, mlp_hidden_dim),
            nn.ReLU(),
            nn.Linear(mlp_hidden_dim, mlp_hidden_dim)
        )

    def forward(self, text_input_ids, text_attention_mask):
        """Run the language model and post-process its logits.

        :param text_input_ids: Token ids forwarded as ``input_ids``.
        :param text_attention_mask: Mask forwarded as ``attention_mask``.
        :return: MLP output with the same shape as the logits.
        """
        # Run LLAMA model
        output = self.llama_model(
            input_ids=text_input_ids,
            attention_mask=text_attention_mask
        )
        # NOTE(review): the original author assumed the logits carry the
        # visual vectors -- confirm that logits (not hidden states) are the
        # intended input.
        visual_vectors = output.logits

        # The language model may run in a different dtype or on a different
        # device than the MLP; align the logits with the MLP's parameters.
        first_weight = self.mlp[0].weight
        visual_vectors = visual_vectors.to(device=first_weight.device, dtype=first_weight.dtype)

        # Process output through MLP
        processed_vectors = self.mlp(visual_vectors)
        return processed_vectors


# Instantiate LiDAR-LLMA model.
# The MLP head is applied to the language model's logits, so its width is the
# vocabulary size reported by the model configuration.
lidar_llma_model = LiDAR_LLMA(llama_model, mlp_hidden_dim=llama_model.config.vocab_size)