# Imports

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import os
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


# Defining Model Classes

## Compact Bilinear Pooling

In [2]:
class CompactBilinearPooling(nn.Module):
    """Compact bilinear pooling of two feature tensors via count sketches.

    Each input is projected with a fixed random count-sketch matrix, and the
    two sketches are circularly convolved (computed as an element-wise product
    in the frequency domain). The result approximates the outer product of the
    two inputs in ``output_dim`` dimensions.

    Args:
        input_dim1: Size of the last axis of the first input.
        input_dim2: Size of the last axis of the second input.
        output_dim: Size of the last axis of the pooled output.
        sum_pool: If True, sum the result over every axis that lies between
            the batch axis (first) and the feature axis (last). Inputs of
            shape ``[batch, dim]`` have no such axes, so the output keeps the
            shape ``[batch, output_dim]``.
    """

    def __init__(self, input_dim1, input_dim2, output_dim, sum_pool=True):
        super().__init__()
        self.input_dim1 = input_dim1
        self.input_dim2 = input_dim2
        self.output_dim = output_dim
        self.sum_pool = sum_pool

        # Fixed (non-trainable) random projections. They stay parameters so
        # they are stored in the state dict and move with the module.
        self.sketch1 = nn.Parameter(self._generate_sketch_matrix(input_dim1, output_dim), requires_grad=False)
        self.sketch2 = nn.Parameter(self._generate_sketch_matrix(input_dim2, output_dim), requires_grad=False)

    def _generate_sketch_matrix(self, input_dim, output_dim):
        """Build an ``[input_dim, output_dim]`` count-sketch matrix.

        Every row holds exactly one non-zero entry: a random sign (+1 or -1)
        placed in a randomly chosen output column.
        """
        buckets = torch.from_numpy(np.random.randint(output_dim, size=input_dim)).long()
        signs = torch.from_numpy(np.random.choice([-1, 1], size=input_dim)).float()
        sketch = torch.zeros(input_dim, output_dim)
        sketch[torch.arange(input_dim), buckets] = signs
        return sketch

    def forward(self, x1, x2):
        """Fuse ``x1`` and ``x2``.

        Args:
            x1: Tensor of shape ``[..., input_dim1]``.
            x2: Tensor of shape ``[..., input_dim2]`` whose leading axes
                broadcast against those of ``x1``.

        Returns:
            Tensor of shape ``[..., output_dim]``; when ``sum_pool`` is set
            and the result has more than two axes, the axes between the first
            and the last are summed out, giving ``[batch, output_dim]``.
        """
        # Count sketches of both inputs; matmul accepts any leading axes.
        sketch1 = torch.matmul(x1, self.sketch1)
        sketch2 = torch.matmul(x2, self.sketch2)

        # Element-wise product of the spectra == circular convolution of the
        # sketches, computed along the feature axis.
        fft1 = torch.fft.rfft(sketch1, dim=-1)
        fft2 = torch.fft.rfft(sketch2, dim=-1)
        cbp = torch.fft.irfft(fft1 * fft2, n=self.output_dim, dim=-1)

        # Pool over locations only, never over the feature axis itself.
        if self.sum_pool and cbp.dim() > 2:
            cbp = cbp.sum(dim=tuple(range(1, cbp.dim() - 1)))

        return cbp

This layer is used to combine two different types of feature embeddings (video and text) into a single representation. Compact Bilinear Pooling is efficient in combining these features while capturing rich interactions between them.

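Sketch matrices are fixed random count-sketch projections: each input dimension is mapped to one randomly chosen output index with a random sign (+1 or -1). They project both inputs into a common `output_dim`-dimensional space in which the interactions between the video and text features can be approximated without forming the full outer product.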

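The Fast Fourier Transform is applied to the count sketches of both inputs. Multiplying the two spectra element-wise and transforming back is equivalent to circularly convolving the sketches, which yields the count sketch of the outer product of the two feature vectors without computing that outer product explicitly.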

## Multihead Attention

Multi-head attention allows the model to focus on different parts of the input sequences. By using multiple heads, the model can learn to attend to various aspects of the input data simultaneously.

Linear layers are used to project the input into different spaces for computing query, key, and value.

Attention Weights are computed using the dot product of the query and key matrices, followed by a softmax function.

In [3]:
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention split across several heads.

    Args:
        embed_dim: Size of the last axis of query, key, value and the output.
        num_heads: Number of attention heads; must divide ``embed_dim``.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, embed_dim, num_heads, dropout=0.1):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        assert self.head_dim * num_heads == embed_dim, "embed_dim must be divisible by num_heads"

        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, key_padding_mask=None, attn_mask=None):
        """Attend from ``query`` positions to ``key``/``value`` positions.

        Args:
            query: Tensor of shape ``[batch, tgt_len, embed_dim]``.
            key: Tensor of shape ``[batch, src_len, embed_dim]``.
            value: Tensor of shape ``[batch, src_len, embed_dim]``.
            key_padding_mask: Optional ``[batch, src_len]`` mask; non-zero /
                True entries mark key positions that must be ignored.
            attn_mask: Optional mask broadcastable to
                ``[batch, num_heads, tgt_len, src_len]``; entries equal to 0
                are blocked.

        Returns:
            Tensor of shape ``[batch, tgt_len, embed_dim]``.
        """
        batch_size, tgt_len, _ = query.size()
        # Key/value may have a different length than the query, so their
        # reshape must use their own sequence length.
        src_len = key.size(1)

        q = self.q_proj(query).view(batch_size, tgt_len, self.num_heads, self.head_dim).transpose(1, 2)
        k = self.k_proj(key).view(batch_size, src_len, self.num_heads, self.head_dim).transpose(1, 2)
        v = self.v_proj(value).view(batch_size, src_len, self.num_heads, self.head_dim).transpose(1, 2)

        # [batch, heads, tgt_len, src_len]
        scores = torch.matmul(q, k.transpose(-2, -1)) / (self.head_dim ** 0.5)

        if attn_mask is not None:
            scores = scores.masked_fill(attn_mask == 0, float('-inf'))

        if key_padding_mask is not None:
            # masked_fill needs a boolean mask; broadcast it over heads and
            # query positions.
            padding = key_padding_mask.to(torch.bool).unsqueeze(1).unsqueeze(2)
            scores = scores.masked_fill(padding, float('-inf'))

        attn_weights = F.softmax(scores, dim=-1)
        attn_weights = self.dropout(attn_weights)

        attn_output = torch.matmul(attn_weights, v)
        # Merge the heads back into a single embedding axis.
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, tgt_len, self.embed_dim)

        return self.out_proj(attn_output)

## Positional Encoding

Since the transformer model doesn’t inherently capture the order of sequences, positional encoding is added to the input embeddings to provide some notion of order or position.

Sine and Cosine functions are used to create the positional encodings, which are then added to the input embeddings.

In [4]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence of embeddings.

    Even feature columns hold ``sin(position * frequency)`` and odd columns
    hold ``cos(position * frequency)``. The table is precomputed once for
    ``max_len`` positions and stored as a non-trainable buffer.

    Args:
        d_model: Size of the embedding axis (even or odd).
        max_len: Number of positions to precompute; inputs longer than this
            cannot be encoded.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        self.d_model = d_model
        self.max_len = max_len

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer odd column than even column,
        # so drop the surplus frequency before filling the cosine half.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(0)  # [1, max_len, d_model] so it broadcasts over the batch
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encoding of its first ``x.size(1)`` positions.

        Args:
            x: Tensor whose axis 1 is the sequence axis and whose last axis
                has size ``d_model``.
        """
        seq_len = x.size(1)
        return x + self.pe[:, :seq_len]

## Transformer Encoding

The Transformer Encoder processes the input embeddings in parallel using self-attention and feedforward layers, allowing the model to capture complex dependencies in the data.

In [5]:
class TransformerEncoderLayer(nn.Module):
    """One post-norm encoder layer: self-attention, then a feed-forward net.

    Each sub-layer's output goes through dropout, is added back to its input
    (residual connection) and is then layer-normalised.

    Args:
        d_model: Size of the embedding axis.
        nhead: Number of attention heads.
        dim_feedforward: Hidden width of the feed-forward network.
        dropout: Dropout probability used throughout the layer.
    """

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
        super().__init__()
        # Attention sub-layer.
        self.self_attn = MultiHeadAttention(d_model, nhead, dropout=dropout)

        # Feed-forward sub-layer: d_model -> dim_feedforward -> d_model.
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        # One norm and one residual dropout per sub-layer.
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def _feed_forward(self, hidden):
        """Apply linear -> ReLU -> dropout -> linear to ``hidden``."""
        expanded = F.relu(self.linear1(hidden))
        return self.linear2(self.dropout(expanded))

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        """Encode ``src``; the masks are forwarded unchanged to the attention."""
        attended = self.self_attn(
            src,
            src,
            src,
            attn_mask=src_mask,
            key_padding_mask=src_key_padding_mask,
        )
        hidden = self.norm1(src + self.dropout1(attended))

        transformed = self._feed_forward(hidden)
        return self.norm2(hidden + self.dropout2(transformed))

In [6]:
class TransformerEncoder(nn.Module):
    """A stack of ``num_layers`` independent copies of ``encoder_layer``.

    Args:
        encoder_layer: Template layer; it is called as
            ``layer(x, src_mask=..., src_key_padding_mask=...)``.
        num_layers: Number of layers in the stack.
    """

    def __init__(self, encoder_layer, num_layers):
        import copy  # local import: only needed to clone the template layer

        super().__init__()
        # Deep-copy the template so every layer owns its own weights. Reusing
        # the same object would make all layers share one set of parameters.
        self.layers = nn.ModuleList([copy.deepcopy(encoder_layer) for _ in range(num_layers)])

    def forward(self, src, mask=None, src_key_padding_mask=None):
        """Pass ``src`` through every layer in order, forwarding both masks."""
        output = src
        for layer in self.layers:
            output = layer(output, src_mask=mask, src_key_padding_mask=src_key_padding_mask)
        return output

## Fusion Model

In [7]:
class MultimodalFusion(nn.Module):
    """Fuse one video embedding and one text embedding into a single vector.

    Both inputs are linearly projected to ``fusion_dim``, combined with
    compact bilinear pooling, passed as a length-1 sequence through a
    transformer encoder, and finally projected once more.

    Args:
        video_dim: Size of the video feature vector.
        text_dim: Size of the text feature vector.
        fusion_dim: Size of the fused representation.
        num_heads: Attention heads per encoder layer; must divide
            ``fusion_dim``.
        num_encoder_layers: Number of encoder layers.
    """

    def __init__(self, video_dim, text_dim, fusion_dim, num_heads=8, num_encoder_layers=6):
        super().__init__()
        self.video_projection = nn.Linear(video_dim, fusion_dim)
        self.text_projection = nn.Linear(text_dim, fusion_dim)
        # sum_pool must be off here: the inputs are [batch, fusion_dim] with
        # no spatial axis, and pooling over dim 1 would collapse the whole
        # fused vector into a single scalar per sample.
        self.compact_bilinear = CompactBilinearPooling(fusion_dim, fusion_dim, fusion_dim, sum_pool=False)

        self.pos_encoder = PositionalEncoding(fusion_dim)
        encoder_layer = TransformerEncoderLayer(fusion_dim, num_heads)
        self.transformer_encoder = TransformerEncoder(encoder_layer, num_encoder_layers)

        self.final_projection = nn.Linear(fusion_dim, fusion_dim)

    def forward(self, video_features, text_features):
        """Return the fused representation.

        Args:
            video_features: Tensor of shape ``[batch_size, video_dim]``.
            text_features: Tensor of shape ``[batch_size, text_dim]``.

        Returns:
            Tensor of shape ``[batch_size, fusion_dim]``.
        """
        video_proj = self.video_projection(video_features)  # [batch_size, fusion_dim]
        text_proj = self.text_projection(text_features)     # [batch_size, fusion_dim]

        fused_features = self.compact_bilinear(video_proj, text_proj)  # [batch_size, fusion_dim]
        # The encoder expects a sequence axis, so treat the fused vector as a
        # sequence of length 1.
        fused_features = fused_features.unsqueeze(1)  # [batch_size, 1, fusion_dim]

        fused_features = self.pos_encoder(fused_features)  # adds the position-0 encoding

        encoded_features = self.transformer_encoder(fused_features)  # [batch_size, 1, fusion_dim]
        encoded_features = encoded_features.squeeze(1)  # [batch_size, fusion_dim]

        output = self.final_projection(encoded_features)  # [batch_size, fusion_dim]

        return output

# File Loading

In [8]:
def get_base_name_from_text_filename(text_filename):
    """Return the part of a text filename that precedes '_embedding.npy'.

    If the suffix is absent, the whole filename is returned.
    """
    return text_filename.split('_embedding.npy')[0]


def find_matching_text_filename(video_filename, text_filenames):
    """Find the text filename whose base name best matches a video filename.

    A text file matches when its base name (see
    ``get_base_name_from_text_filename``) occurs inside the video's base name,
    i.e. the video filename up to '_embeddings.npy'. When several text files
    match, the one with the longest base name wins, so that e.g. 'Normal_10'
    is preferred over 'Normal_1' for a 'Normal_10_...' video. Ties keep the
    earliest candidate. Empty base names never match.

    Args:
        video_filename: Name of the video embedding file.
        text_filenames: Iterable of candidate text embedding filenames.

    Returns:
        The best matching text filename, or None if nothing matches.
    """
    video_base_name = video_filename.split('_embeddings.npy')[0]
    best_match = None
    best_length = 0
    for text_filename in text_filenames:
        text_base_name = get_base_name_from_text_filename(text_filename)
        # Strictly longer only: skips empty names and keeps the first of
        # equally long candidates.
        if len(text_base_name) > best_length and text_base_name in video_base_name:
            best_match = text_filename
            best_length = len(text_base_name)
    return best_match

def find_anomaly_type(filename):
    """Classify a filename by the first known anomaly keyword it contains.

    Keywords are case-sensitive and checked in a fixed order; 'unknown' is
    returned when none of them appears in ``filename``.
    """
    keyword_labels = (
        ('AbandonedObject', 'abandonedobject'),
        ('Vandalism', 'vandalism'),
        ('Violence', 'violence'),
        ('Normal', 'normal'),
    )
    for keyword, label in keyword_labels:
        if keyword in filename:
            return label
    return 'unknown'

In [9]:
# Paths to feature directories on the mounted Google Drive.
# Each directory is scanned for '.npy' embedding files when features are loaded.
video_features_dir = '/content/drive/MyDrive/video_embeddings'
text_features_dir = '/content/drive/MyDrive/rowtext_embeddings'

In [10]:
def _load_npy_features(directory):
    """Load every '.npy' file in ``directory`` in sorted filename order.

    Returns:
        A ``(filenames, arrays)`` pair of equally long lists, where
        ``arrays[i]`` was loaded from ``filenames[i]``.

    Raises:
        ValueError: If the directory contains no '.npy' files.
    """
    names = [name for name in sorted(os.listdir(directory)) if name.endswith('.npy')]
    if not names:
        raise ValueError(f"No .npy feature files found in {directory!r}")
    arrays = [np.load(os.path.join(directory, name)) for name in names]
    return names, arrays


# Load video features
video_filenames, video_feature_list = _load_npy_features(video_features_dir)

# Load text features
text_filenames, text_feature_list = _load_npy_features(text_features_dir)

# Stack the per-file arrays (they must all share one shape) and make sure the
# video and text features are of the same data type.
video_features = torch.tensor(np.stack(video_feature_list), dtype=torch.float32)
text_features = torch.tensor(np.stack(text_feature_list), dtype=torch.float32)

# Configuring and Running Model

In [11]:
# Feature width: axis 2 for 3-D feature tensors, axis 1 otherwise.
video_dim = video_features.size(2) if video_features.dim() == 3 else video_features.size(1)
text_dim = text_features.size(2) if text_features.dim() == 3 else text_features.size(1)
fusion_dim = 512

# Build the fusion network with its default head and layer counts.
fusion_model = MultimodalFusion(video_dim=video_dim, text_dim=text_dim, fusion_dim=fusion_dim)

# Saving Fused Features

In [12]:
output_fused_dir = '/content/drive/MyDrive/cbp_fused'
os.makedirs(output_fused_dir, exist_ok=True)

# Inference only: switch off dropout so the saved features are deterministic
# for a given set of weights.
fusion_model.eval()

# Look up each text file's row in text_features once instead of searching the
# list on every iteration (directory listings contain unique names).
text_index_by_name = {name: idx for idx, name in enumerate(text_filenames)}
skipped_videos = []

# No gradients are needed for feature extraction.
with torch.no_grad():
    for i, video_filename in enumerate(video_filenames):
        matching_text_filename = find_matching_text_filename(video_filename, text_filenames)
        if not matching_text_filename:
            # Remember videos without a text embedding so they can be reported.
            skipped_videos.append(video_filename)
            continue

        # Add a batch axis of size 1 to both inputs.
        video_feat = video_features[i].unsqueeze(0).float()
        text_feat = text_features[text_index_by_name[matching_text_filename]].unsqueeze(0).float()

        fused_feat = fusion_model(video_feat, text_feat)
        fused_feat = fused_feat.squeeze(0).cpu().numpy()

        output_filename = f"{video_filename.split('_embeddings.npy')[0]}_fused.npy"
        output_path = os.path.join(output_fused_dir, output_filename)
        np.save(output_path, fused_feat)

if skipped_videos:
    print(f"Warning: no matching text embedding for {len(skipped_videos)} video file(s): {skipped_videos}")

print("Fused features saved successfully.")

Fused features saved successfully.
