In [1]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import os
import torch.nn as nn

In [2]:
mot_dir = "/home/viplab/Research_DK/Mamba-MOT/GMTracker/data/MOT17/train"
print(os.listdir(mot_dir))

['MOT17-02-SDP', 'MOT17-11-SDP', 'MOT17-04-SDP', 'MOT17-05-SDP', 'MOT17-09-DPM', 'MOT17-11-FRCNN', 'MOT17-10-DPM', 'MOT17-10-FRCNN', 'MOT17-09-SDP', 'MOT17-10-SDP', 'MOT17-11-DPM', 'MOT17-13-SDP', 'MOT17-02-DPM', 'MOT17-13-FRCNN', 'MOT17-02-FRCNN', 'MOT17-13-DPM', 'MOT17-04-DPM', 'MOT17-05-FRCNN', 'MOT17-09-FRCNN', 'MOT17-04-FRCNN', 'MOT17-05-DPM']


In [3]:

def get_tracklet(gt_in, object_id):
    """
    Extract the tracklet for a given object ID.
    :param df: DataFrame containing the ground truth data.
    :param object_id: The ID of the object to extract the tracklet for.
    :return: DataFrame containing the tracklet.
    """
    tracklet  = []
    # tracklet.append(i) [for i in gt_in if gt_in[i] == object_id]
    for i in gt_in:
        # print(i)
        if i[1] == object_id:
            tracklet.append(i[2:6])
    return np.array(tracklet)

def tokenizer_frames(tracking, sliding_window):
    
    token  = [] 
    for i in range(sliding_window):
        current_bbox = tracking[i]
        # print("bounding box at {} frame : {}".format(i,current_bbox))
        # print("tracklet at Nth place is : ", tracking[sliding_window])
        delta = np.subtract(tracking[sliding_window], current_bbox)
        # print("delta is : ", delta)
        token.append(delta)
    
    return np.array(token)


for sequence in os.listdir(mot_dir):
    # seq = os.path.join(mot_dir, sequence)
    image_dir = os.path.join(mot_dir, sequence, "img1")
    gt_file = os.path.join(mot_dir, sequence, "gt", "gt.txt")
    # print(gt_file)

    gt_in = np.loadtxt(gt_file, delimiter = ",")

    # print(gt_in)

    ## Lets define 1 tracklet first
    object_id = 2 ## Tracklet with Object ID = 1
    tracklet = get_tracklet(gt_in, object_id)
    # print(" tracklet is : ", gt_in[tracklet])
    print(tracklet.shape)

    sliding_window = 50
    token = tokenizer_frames(tracklet, sliding_window)


    print("token shape is : ", token.shape)
    token = torch.tensor(token, dtype = torch.float32)

    input_dim = 4
    output_dim = 4
    embedding_layer = nn.Linear(input_dim, output_dim)

    embedded_tensor = embedding_layer(token)

    print(embedded_tensor)

    print(" shape of embedded tensot  : ", embedded_tensor.shape)
    
    break



(56, 4)
token shape is :  (50, 4)
tensor([[ 110.3233, -100.0952,  -56.5693,   42.5965],
        [ 109.8223,  -99.1943,  -55.6777,   42.0264],
        [ 108.6947,  -97.0546,  -55.0401,   41.2286],
        [ 107.3791,  -95.3689,  -54.3434,   40.5082],
        [ 106.2515,  -93.2291,  -53.7057,   39.7104],
        [ 104.9358,  -91.5435,  -53.0090,   38.9899],
        [  99.2570,  -93.3553,  -55.7706,   42.6067],
        [  92.5880,  -94.7836,  -59.0507,   46.4234],
        [  86.9092,  -96.5954,  -61.8122,   50.0402],
        [  80.2402,  -98.0238,  -65.0923,   53.8569],
        [  79.7711,  -96.2077,  -62.6003,   52.1179],
        [  77.9544,  -93.6211,  -61.0119,   50.8274],
        [  76.1378,  -91.0346,  -59.4235,   49.5369],
        [  74.3211,  -88.4481,  -57.8352,   48.2463],
        [  78.2724,  -79.1531,  -50.1997,   39.8168],
        [  82.7566,  -69.8438,  -41.8555,   30.7886],
        [  86.8835,  -60.1475,  -33.8964,   22.0088],
        [  90.5655,  -50.0006,  -26.1299,   13.1

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from mamba_ssm import Mamba

class BiMambaEncoder(nn.Module):
    def __init__(self, d_model, n_state):
        super(BiMambaEncoder, self).__init__()
        self.d_model = d_model
        
        self.mamba = Mamba(d_model, n_state)

        # Norm and feed-forward network layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_model * 4),
            nn.GELU(),
            nn.Linear(d_model * 4, d_model)
        )

    def forward(self, x):
        # Residual connection of the original input
        residual = x
        
        # Forward Mamba
        x_norm = self.norm1(x)
        mamba_out_forward = self.mamba(x_norm)

        # Backward Mamba
        x_flip = torch.flip(x_norm, dims=[1])  # Flip Sequence
        mamba_out_backward = self.mamba(x_flip)
        mamba_out_backward = torch.flip(mamba_out_backward, dims=[1])  # Flip back

        # Combining forward and backward
        mamba_out = mamba_out_forward + mamba_out_backward
        
        mamba_out = self.norm2(mamba_out)
        ff_out = self.feed_forward(mamba_out)

        output = ff_out + residual
        return output

# Initialize and test the model
d_model = 4
n_state = 64
model = BiMambaEncoder(d_model, n_state).cuda()
x = torch.rand(1, 50, d_model).cuda()  # Analog input data: (batch_size, seq_len, feature_dim)
output = model(x)
print(output.shape)  # Mamba Out: (32, 100, 512)

  from .autonotebook import tqdm as notebook_tqdm


torch.Size([1, 50, 4])


## Initialize Tracklets

In [5]:
def initialize_tracklets(data):
    first_frame = int(np.min(data[:, 0]))
    first_frame_data = data[data[:, 0] == first_frame]
    # print(" first frame data : ", first_frame_data)

    tracklets = {}
    for row in first_frame_data:
        # print(" row is :", row)
        object_id = int(row[1])
        if object_id not in tracklets:
            tracklets[object_id] = [row[2:6]]
    return tracklets

### Update tracklets

In [6]:
def update_tracklets_for_frame(data, tracklets, frame):
    # unique_frames = np.unique(data[:, 0])


    # for frame in unique_frames:
    #     if frame == np.min(data[:, 0]):
    #         continue
    


    frame_data = data[data[:, 0] == frame]

    for row in frame_data:
        object_id = int(row[1])
        if object_id in tracklets:
            tracklets[object_id].append(row[2:6])
        else:
            ## Initialize a new tracklet if the object ID does not exist
            tracklets[object_id] = [row[2:6]]

    return tracklets
### Convert tracklet to numpy arrays

def convert_tracklets_to_numpy(tracklets):
    for object_id , tracklet_data in tracklets.items():
        tracklets[object_id] = np.array(tracklet_data)
    
    return tracklets



def load_data(filename):
    data = np.loadtxt(filename, delimiter=',', dtype=float)
    return data

In [7]:
filename = "/home/viplab/Research_DK/Mamba-MOT/GMTracker/data/MOT17/train/MOT17-02-SDP/gt/gt.txt"
data = load_data(filename)
tracklets = initialize_tracklets(data)

unique_frames = np.unique(data[:, 0])
for frame in unique_frames:
    if frame == 1:
        continue
    else:
        tracklets = update_tracklets_for_frame(data, tracklets, frame)
    # trackletsupdate_tracklets(data, tracklets) ## Updates all the tracklets at once. Can be changed to sending data per frame and updating the tracklets in a better way
tracklets = convert_tracklets_to_numpy(tracklets)


In [8]:
tracklets.get(1).shape

(600, 4)

## Tracklet Delta - Creating a Tokenizer function


In [9]:
def compute_deltas(tracklet_data):
    # Compute deltas as described
    deltas = []
    for i in range(len(tracklet_data) - 1):
        delta = tracklet_data[i+1] - tracklet_data[i]
        deltas.append(delta)
    return np.array(deltas)


# Example tracklet data: shape (num_frames, 4) with columns [cx, cy, w, h]
tracklet_data = np.array([
    [100, 150, 50, 60],
    [105, 155, 50, 60],
    [110, 160, 50, 60]
])

# Create delta values for input sequence
deltas = compute_deltas(tracklet_data)
print(deltas)

[[5 5 0 0]
 [5 5 0 0]]


In [11]:
import torch.nn as nn

class TemporalTokenEmbedding(nn.Module):
    def __init__(self, input_dim, embedding_dim):
        super(TemporalTokenEmbedding, self).__init__()
        self.embedding = nn.Linear(input_dim, embedding_dim)
    
    def forward(self, x):
        return self.embedding(x)

# Parameters
input_dim = deltas.shape[1]  # Number of features in deltas (e.g., 4)
embedding_dim = 128  # Example embedding dimension
print(input_dim)
# Create and apply embedding layer
# embedding_layer = TemporalTokenEmbedding(input_dim, embedding_dim)
deltas_tensor = torch.tensor(deltas, dtype=torch.float32)
# embedded_tokens = embedding_layer(deltas_tensor)
# print(embedded_tokens.shape)

4


## Steps to implement Bi-Mamba Encdoding Layer

#### Define Forward and Backward Mamba Modules: 
Implement the forward and backward Mamba modules.
#### Create a Bi-Mamba Block: 
Use the forward and backward modules to process input and apply normalization and MLP.
#### Assemble the Bi-Mamba Encoding Layer: 
Stack multiple Bi-Mamba blocks.

In [12]:
import torch
import torch.nn as nn
from mamba_ssm import Mamba



In [37]:

class BiMambaBlock(nn.Module):
    def __init__(self, d_model, n_state):
        super(BiMambaBlock, self).__init__()
        self.d_model = d_model
        
        self.mamba = Mamba(d_model, n_state)

        # Norm and feed-forward network layer
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_model * 4),
            nn.GELU(),
            nn.Linear(d_model * 4, d_model)
        )

    def forward(self, x):
        # Residual connection of the original input
        residual = x
        
        # Forward Mamba
        x_norm = self.norm1(x)
        mamba_out_forward = self.mamba(x_norm)

        # Backward Mamba
        x_flip = torch.flip(x_norm, dims=[1])  # Flip Sequence
        mamba_out_backward = self.mamba(x_flip)
        mamba_out_backward = torch.flip(mamba_out_backward, dims=[1])  # Flip back

        # Combining forward and backward
        mamba_out = mamba_out_forward + mamba_out_backward
        mamba_out1  = self.norm2(mamba_out)

    
        mamba_out2 = self.feed_forward(mamba_out)

        ff_out  = mamba_out1 + mamba_out2
        # output = ff_out + residual
        return ff_out


In [38]:
class BiMambaEncodingLayer(nn.Module):
    def __init__(self, embedding_dim, num_blocks):
        super(BiMambaEncodingLayer, self).__init__()
        self.embedding_dim = embedding_dim
        self.num_blocks = num_blocks
        # self.blocks = nn.ModuleList([BiMambaBlock(input_dim, embedding_dim) for _ in range(num_blocks)])
    
        self.mamba_block = BiMambaBlock(embedding_dim, embedding_dim)
    def forward(self, x):
        # print(" embedding dimension is : ", self.embedding_dim)
        # for block in self.blocks:
        x = self.mamba_block(x)
        # x = self.mamba_block(x)
        # x = self.mamba_block(x)
        
            # x = block(x)
            # print(" x shape in mamba block is : ", x.shape)
        
        return x

In [39]:
class FullModel(nn.Module):
    def __init__(self, input_dim, embedding_dim, num_blocks, prediction_dim):
        super(FullModel, self).__init__()
        self.temporal_token_embedding = TemporalTokenEmbedding(input_dim, embedding_dim)
        self.bi_mamba_encoding_layer = BiMambaEncodingLayer(embedding_dim, num_blocks)
        self.prediction_head = nn.Linear(embedding_dim, prediction_dim)
    
    def forward(self, x):
        x = self.temporal_token_embedding(x)
        print(" x shape is : ", x.shape)
        print(' type of x is : ', type(x))
        x  =  x.unsqueeze(0)
        print(" x after reshaping it is : ", x.shape)
        x = self.bi_mamba_encoding_layer(x)
        print(" x shape after  bimamba encoding layer is : ", x.shape)
        x = self.prediction_head(x)
        return x

# Example usage
input_dim = deltas_tensor.shape[1]  # Number of features in deltas (e.g., 4)
embedding_dim = 4 # Example embedding dimension
num_blocks = 4  # Number of Bi-Mamba blocks
prediction_dim = 4  # Number of predicted offsets

# Create and apply model
model = FullModel(input_dim, embedding_dim, num_blocks, prediction_dim)
predictions = model(deltas_tensor)

 x shape is :  torch.Size([2, 4])
 x after reshaping it is :  torch.Size([1, 2, 4])


RuntimeError: Expected x.is_cuda() to be true, but got false.  (Could this error message be improved?  If so, please report an enhancement request to PyTorch.)