In [2]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class FeatureEncoder(nn.Module):
    """
    Block 1: Feature Encoding
    -------------------------
    Encodes every frame independently with one 3x3 convolution + ReLU and
    then collapses the spatial grid with a global (adaptive) max pooling,
    producing one feature vector per frame: (B, T, out_channels).

    NOTE: In the original paper, the Video Swin Transformer was used;
    this simple per-frame encoder is used instead due to hardware limitations.

    Args:
      in_channels: Number of channels of each input frame (3 for RGB).
      out_channels: Dimension of the per-frame feature vector.
    """
    def __init__(self, in_channels=3, out_channels=512):
        super().__init__()
        # padding=1 with a 3x3 kernel keeps the spatial size (H, W) unchanged
        self.conv = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=3,
            padding=1
        )
        # Global max pooling: reduces any (H, W) to (1, 1)
        self.pool = nn.AdaptiveMaxPool2d((1, 1))

    def forward(self, x):
        """
        Args:
          x: tensor of shape (B, T, C, H, W)
             B: Batch size
             T: Number of frames
             C: Number of channels (3 for RGB)
             H,W: Frame height and width

        Returns:
          Tensor of shape (B, T, out_channels).

        Raises:
          ValueError: if `x` is not 5-dimensional.
        """
        if x.dim() != 5:
            raise ValueError(
                f"FeatureEncoder expects a 5-D input (B, T, C, H, W), got shape {tuple(x.shape)}"
            )
        B, T, C, H, W = x.shape

        # Merge B and T so that every frame is processed as an independent image.
        # `reshape` (rather than `view`) also accepts non-contiguous inputs, e.g.
        # a time-first tensor that was transposed to batch-first.
        frames = x.reshape(B * T, C, H, W)      # (B*T, C, H, W)

        # Convolution + non-linearity
        frames = F.relu(self.conv(frames))      # (B*T, out_channels, H, W)

        # Global spatial pooling
        frames = self.pool(frames)              # (B*T, out_channels, 1, 1)

        # Restore (B, T) as separate dimensions and drop the 1x1 spatial grid
        return frames.reshape(B, T, -1)         # (B, T, out_channels)

In [3]:
class TemporalRelationModule(nn.Module):
    """
    Block 2: Hybrid Temporal Relation Modeling
    ------------------------------------------
    Builds two T x T self-similarity maps from per-frame features:

    * a "global" map obtained from multi-head query/key similarities, as the
      sum of a scaled row-softmax attention map and a dual-softmax map
      (row-softmax * column-softmax of the unscaled scores), passed through
      dropout and averaged over heads;
    * a "local" map obtained as the Gram matrix of features that went through
      a depthwise-separable 1D convolution along time.

    Args:
      feature_dim: Dimension of the per-frame features (D).
      num_heads: Number of attention heads; must divide `feature_dim`.
      dropout_prob: Dropout probability applied to the per-head similarity maps.

    Raises:
      ValueError: if `feature_dim` is not divisible by `num_heads`.
    """
    def __init__(self, feature_dim=512, num_heads=4, dropout_prob=0.3):
        super().__init__()
        # Fail early: otherwise the head split in forward() breaks with an
        # opaque "shape is invalid for input of size" error.
        if feature_dim % num_heads != 0:
            raise ValueError(
                f"feature_dim ({feature_dim}) must be divisible by num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        self.feature_dim = feature_dim
        self.head_dim = feature_dim // num_heads
        # Standard attention temperature sqrt(head_dim)
        self.scale = self.head_dim ** 0.5

        # Linear layers to get queries and keys
        self.query_linear = nn.Linear(feature_dim, feature_dim)
        self.key_linear = nn.Linear(feature_dim, feature_dim)

        # Dropout on the similarity maps (active only in training mode)
        self.dropout = nn.Dropout(dropout_prob)

        # Local Temporal Context Modeling (1D separable convolution):
        # depthwise conv mixes along time per channel (kernel 5, same length),
        # pointwise conv then mixes across channels.
        self.depthwise_conv = nn.Conv1d(
            in_channels=feature_dim,
            out_channels=feature_dim,
            kernel_size=5,
            padding=2,
            groups=feature_dim  # depthwise
        )
        self.pointwise_conv = nn.Conv1d(
            in_channels=feature_dim,
            out_channels=feature_dim,
            kernel_size=1
        )

    def forward(self, x):
        """
        Args:
          x: tensor of shape (B, T, feature_dim)
             B: batch size
             T: number of frames
             feature_dim: dimension of per-frame feature

        Returns:
          Tensor of shape (B, 2, T, T); channel 0 is the global (attention)
          similarity, channel 1 is the local (convolutional) similarity.
        """
        B, T, D = x.shape

        # ---- (1) Bi-modal Self-Attention Similarity ----
        # Project and split into heads: (B, T, D) -> (B, num_heads, T, head_dim)
        Q = self.query_linear(x).view(B, T, self.num_heads, self.head_dim).transpose(1, 2)
        K = self.key_linear(x).view(B, T, self.num_heads, self.head_dim).transpose(1, 2)

        # Query/key dot products, computed once and shared by both modes
        raw_scores = torch.matmul(Q, K.transpose(-2, -1))  # (B, num_heads, T, T)

        # Mode 1: scaled scores with a row-wise softmax
        attn_matrix1 = torch.softmax(raw_scores / self.scale, dim=-1)

        # Mode 2: dual-softmax on the unscaled scores (row-softmax * column-softmax)
        softmax_row = torch.softmax(raw_scores, dim=-1)
        softmax_col = torch.softmax(raw_scores, dim=-2)
        attn_matrix2 = softmax_row * softmax_col

        # Combine and drop
        attn_matrix = self.dropout(attn_matrix1 + attn_matrix2)  # (B, num_heads, T, T)

        # Average across heads
        attn_matrix_avg = attn_matrix.mean(dim=1, keepdim=True)  # (B, 1, T, T)

        # ---- (2) Local Temporal Context Modeling ----
        # Conv1d expects channels first, so move time to the last axis
        local_context = x.transpose(1, 2)                        # (B, D, T)
        local_context = self.depthwise_conv(local_context)
        local_context = self.pointwise_conv(local_context)
        local_context = local_context.transpose(1, 2)            # (B, T, D)

        # Local similarity = Gram matrix of the context features (unnormalized)
        local_sim = torch.matmul(local_context, local_context.transpose(-2, -1))  # (B, T, T)
        local_sim = local_sim.unsqueeze(1)                       # (B, 1, T, T)

        # ---- (3) Concatenate global and local similarity
        return torch.cat([attn_matrix_avg, local_sim], dim=1)    # (B, 2, T, T)

In [4]:
class MultiScaleFusion(nn.Module):
    """
    Block 3: Multi-scale Self-Similarity Fusion
    ------------------------------------------
    Takes one self-similarity tensor per temporal scale, lays all of their
    channels side by side and mixes them with a 1x1x1 3D convolution.
    """
    def __init__(self, in_channels, out_channels):
        """
        Args:
          in_channels: channel count seen by the convolution, i.e.
                       (number of scales) * (channels per scale)
          out_channels: channel count of the fused output
        """
        super().__init__()
        self.conv3d = nn.Conv3d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=1
        )

    def forward(self, x_list):
        """
        Args:
          x_list: list of N tensors (one per scale), each of shape (B, C, T, T).

        Returns:
          fused: tensor of shape (B, out_channels, T, T)
        """
        # (B, N, C, T, T): scales become a new axis right after the batch axis
        stacked = torch.stack(x_list, dim=1)

        # (B, N*C, T, T): fold the scale axis into the channel axis
        merged = stacked.flatten(1, 2)

        # (B, N*C, 1, T, T): Conv3d needs a depth axis, so insert a singleton one
        volume = merged[:, :, None]

        # (B, out_channels, 1, T, T) -> (B, out_channels, T, T)
        return self.conv3d(volume).squeeze(2)


In [5]:
class DensityMapRegressor(nn.Module):
    """
    Block 4: Density Map Regression
    -------------------------------
    Uses a transformer encoder (1 layer) followed by
    an MLP to produce a 1D density map over frames.
    """
    def __init__(self, in_features, hidden_dim, output_dim=1, num_frames=64):
        """
        Args:
          in_features: Number of channels in the fused feature (from block 3).
                       Used as the transformer's d_model, so it must be
                       divisible by the number of heads (4).
          hidden_dim: Hidden dimension in the MLP.
          output_dim: Size of final output per frame (1 for a density value).
          num_frames: Number of frames (T). The MLP is sized for exactly this
                      many frames.
        """
        super().__init__()
        # batch_first=True is essential: forward() feeds (B, T, C). With the
        # default (batch_first=False) the layer would read that as
        # (seq, batch, feature) and attend across the *batch* axis, mixing
        # different samples instead of different frames.
        self.transformer = nn.TransformerEncoderLayer(
            d_model=in_features,
            nhead=4,
            batch_first=True
        )
        self.mlp = nn.Sequential(
            nn.Linear(num_frames * in_features, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_frames * output_dim)
        )
        self.num_frames = num_frames
        self.output_dim = output_dim

    def forward(self, x):
        """
        Args:
          x: tensor of shape (B, C, T, T)  # from block 3
             B: Batch size
             C: # of channels (in_features)
             T: # of frames (must equal `num_frames`)

        Returns:
          A density map of shape (B, T) when output_dim == 1,
          otherwise a tensor of shape (B, T, output_dim).

        Raises:
          ValueError: if T differs from the `num_frames` the MLP was built for.
        """
        B, C, T, _ = x.shape
        if T != self.num_frames:
            raise ValueError(
                f"DensityMapRegressor was built for num_frames={self.num_frames}, "
                f"but received a similarity map with T={T}"
            )

        # 1) Average across one T dimension => (B, C, T)
        #    This is a simple approach to get "per-frame" features from a T x T matrix
        x_avg = x.mean(dim=-1)

        # 2) Switch to (B, T, C): frames are the sequence, channels the features
        x_avg = x_avg.transpose(1, 2)

        # 3) Self-attention over frames within each sample => (B, T, C)
        x_trans = self.transformer(x_avg)

        # 4) Flatten => (B, T*C)
        x_flat = x_trans.reshape(B, -1)

        # 5) MLP => (B, T * output_dim)
        density = self.mlp(x_flat)

        # 6) Reshape => (B, T, output_dim); drop the last axis for scalar outputs
        density = density.view(B, T, self.output_dim)
        if self.output_dim == 1:
            density = density.squeeze(-1)   # (B, T)

        return density

def temporal_downsample(x, factor):
    """
    Subsample a sequence along its second axis.

    Args:
      x: tensor indexed as (batch, time, feature).
      factor: keep every `factor`-th time step, starting with step 0.

    Returns:
      The strided selection of `x` along the time axis.
    """
    every_kth = slice(None, None, factor)
    return x[:, every_kth, :]


In [6]:
class HTRMNet(nn.Module):
    """
    End-to-end network chaining the four blocks:
    per-frame feature encoding -> temporal self-similarity at several temporal
    scales -> multi-scale fusion -> per-frame density regression.

    Args:
      in_channels: Channels of each input frame (3 for RGB).
      feature_dim: Dimension of the per-frame features.
      hidden_dim: Hidden size of the regressor's MLP.
      out_channels_fusion: Channels of the fused similarity map.
      num_frames: Number of frames (T) every input clip must have.
    """
    # Temporal strides at which the similarity maps are computed
    SCALE_FACTORS = (1, 2, 4)
    # Channels returned per scale by the relation module (global + local map)
    SIM_CHANNELS = 2

    def __init__(self, in_channels=3, feature_dim=512, hidden_dim=128, out_channels_fusion=8, num_frames=64):
        super().__init__()
        self.num_frames = num_frames

        self.feature_encoder = FeatureEncoder(in_channels, feature_dim)
        # One relation module shared by all scales
        self.relation_module = TemporalRelationModule(feature_dim, num_heads=4, dropout_prob=0.3)
        # The fusion sees every scale's channels stacked together (3 * 2 = 6)
        self.fusion = MultiScaleFusion(
            in_channels=len(self.SCALE_FACTORS) * self.SIM_CHANNELS,
            out_channels=out_channels_fusion
        )
        self.regressor = DensityMapRegressor(in_features=out_channels_fusion, hidden_dim=hidden_dim, num_frames=num_frames)

    def forward(self, x):
        """
        Args:
          x: video tensor of shape (B, T, C, H, W) with T == num_frames.

        Returns:
          Density map of shape (B, T).

        Raises:
          ValueError: if the clip length differs from `num_frames`.
        """
        features = self.feature_encoder(x)  # (B, T, D)

        # The similarity maps are resized to num_frames and the regressor is
        # sized for num_frames, so any other clip length cannot work; report
        # it here instead of failing later with an opaque shape error.
        clip_len = features.shape[1]
        if clip_len != self.num_frames:
            raise ValueError(
                f"HTRMNet was built for num_frames={self.num_frames}, "
                f"but the input clip has {clip_len} frames"
            )
        target_size = (self.num_frames, self.num_frames)

        # multi-scale similarity maps, all brought to (B, 2, T, T)
        sims = []
        for factor in self.SCALE_FACTORS:
            scaled = features if factor == 1 else temporal_downsample(features, factor)
            sim = self.relation_module(scaled)  # (B, 2, T/factor, T/factor)
            if factor != 1:
                # upsample coarser maps back to the full T x T resolution
                sim = F.interpolate(sim, size=target_size, mode='bilinear', align_corners=False)
            sims.append(sim)

        fused = self.fusion(sims)               # (B, out_channels_fusion, T, T)
        density_map = self.regressor(fused)     # (B, T)
        return density_map

In [7]:

#######################################
# Simple Training Script on Dummy Data
#######################################
if __name__ == "__main__":
    # Hyperparameters
    BATCH_SIZE = 2
    T = 64
    H, W = 128, 256   # reduced frame size for speed
    EPOCHS = 15
    STEPS_PER_EPOCH = 5
    SEED = 0

    # Make the dummy run reproducible
    torch.manual_seed(SEED)

    # Use the GPU when one is available, otherwise fall back to the CPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Instantiate the model
    model = HTRMNet(
        in_channels=3,
        feature_dim=128,      # smaller for speed
        hidden_dim=64,
        out_channels_fusion=4,  # smaller for speed
        num_frames=T
    ).to(device)
    model.train()  # make sure dropout is active during training

    # Define a simple MSE loss for density maps
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-4)

    # Dummy training loop
    for epoch in range(EPOCHS):
        print(f"\n=== Epoch {epoch+1}/{EPOCHS} ===")

        # In a real scenario, you'd iterate over a DataLoader
        # Here, we just create random data for a few "batches"
        for step in range(STEPS_PER_EPOCH):
            # 1) Create random video (B, T, C, H, W) and random target density (B, T),
            #    allocated directly on the target device
            dummy_video = torch.randn(BATCH_SIZE, T, 3, H, W, device=device)
            # random target density map in [0,1)
            dummy_target = torch.rand(BATCH_SIZE, T, device=device)

            # 2) Forward pass
            pred_density = model(dummy_video)

            # 3) Compute loss
            loss = criterion(pred_density, dummy_target)

            # 4) Backprop and update
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            print(f"Step [{step+1}/{STEPS_PER_EPOCH}], Loss: {loss.item():.4f}")



=== Epoch 1/15 ===
Step [1/5], Loss: 0.4747
Step [2/5], Loss: 0.4206
Step [3/5], Loss: 0.4128
Step [4/5], Loss: 0.4319
Step [5/5], Loss: 0.4089

=== Epoch 2/15 ===
Step [1/5], Loss: 0.4570
Step [2/5], Loss: 0.4031
Step [3/5], Loss: 0.4212
Step [4/5], Loss: 0.4063
Step [5/5], Loss: 0.3724

=== Epoch 3/15 ===
Step [1/5], Loss: 0.3947
Step [2/5], Loss: 0.3732
Step [3/5], Loss: 0.3743
Step [4/5], Loss: 0.3577
Step [5/5], Loss: 0.3450

=== Epoch 4/15 ===
Step [1/5], Loss: 0.3562
Step [2/5], Loss: 0.3585
Step [3/5], Loss: 0.3605
Step [4/5], Loss: 0.3542
Step [5/5], Loss: 0.2981

=== Epoch 5/15 ===
Step [1/5], Loss: 0.3055
Step [2/5], Loss: 0.3194
Step [3/5], Loss: 0.3050
Step [4/5], Loss: 0.3039
Step [5/5], Loss: 0.3115

=== Epoch 6/15 ===
Step [1/5], Loss: 0.3461
Step [2/5], Loss: 0.2962
Step [3/5], Loss: 0.3052
Step [4/5], Loss: 0.3061
Step [5/5], Loss: 0.3038

=== Epoch 7/15 ===
Step [1/5], Loss: 0.2798
Step [2/5], Loss: 0.2772
Step [3/5], Loss: 0.2782
Step [4/5], Loss: 0.2619
Step [5/5]

In [None]:
import torch
import matplotlib.pyplot as plt

# 1) Define model hyperparams
T = 64
H, W = 128, 128  # Frame height & width
BATCH_SIZE = 1   # We'll visualize only 1 sample

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 2) Create a dummy video: shape (B, T, C=3, H, W)
video = torch.randn(BATCH_SIZE, T, 3, H, W, device=device)

# 3) Instantiate HTRMNet (its definition must already be in scope).
#    NOTE: this builds a fresh, randomly initialised network and rebinds
#    `model`, so the plot below shows the predictions of an untrained model.
model = HTRMNet(
    in_channels=3,
    feature_dim=128,      # smaller for speed
    hidden_dim=64,
    out_channels_fusion=4,
    num_frames=T
).to(device)
# Inference mode: disables dropout, which torch.no_grad() alone does not do
model.eval()

# 4) Forward pass to get density map (B, T)
with torch.no_grad():
    density_map = model(video)  # shape: (1, 64)

# 5) Convert the first sample's density map to CPU NumPy for plotting
density_map_np = density_map[0].cpu().numpy()  # shape: (64,)

# 6) Plot
plt.figure(figsize=(8, 4))
plt.plot(density_map_np, marker='o', label='Predicted Density')
plt.title("Predicted Density over Frames")
plt.xlabel("Frame Index")
plt.ylabel("Density")
plt.legend()
plt.grid(True)
plt.show()
