In [1]:
# Model

import torch
import torch.nn as nn
import torch.nn.functional as F
from functools import partial
import torch.optim as optim
from tqdm import tqdm
import torch.fft
import math
import numpy as np
from mamba_ssm import Mamba
from einops import rearrange, repeat, einsum
import pandas as pd
from datetime import datetime, timedelta
import os
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from simba import Block_mamba, ClassBlock
from timm.models.layers import DropPath, to_2tuple, trunc_normal_
import arff

'''
Simple SiMBA model
x -> embedding layer -> SiMBA blocks -> output layer -> output
'''
class SiMBASimple(nn.Module):
    def __init__(self,
        num_classes, 
        input_dim, 
        embed_dim=64, 
        mlp_ratio=4, 
        num_blocks=4,
        scale=1,
    ):
        super().__init__()
        self.scale = scale
        self.embedding = nn.Linear(input_dim, embed_dim)
        self.blocks = nn.ModuleList([
            Block_mamba(dim=embed_dim, mlp_ratio=mlp_ratio, cm_type="EinFFT")
            for _ in range(num_blocks)
        ])
        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, num_classes)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    def forward(self, x): # x : B x L x D
        x = self.embedding(x)
        B, L, D = x.shape
        for block in self.blocks:
            x = block(x, L, D)
        x = self.norm(x.mean(dim=1))
        x = self.head(x) * self.scale
        return x

'''
Model based on SiMBA image classification model architecture 
https://github.com/badripatro/simba
'''
class SiMBA(nn.Module):
    def __init__(self,
        input_dim,
        num_classes,
        embed_dims=[64, 128, 320, 448],
        mlp_ratios=[8, 8, 4, 4], 
        drop_path_rate=0., 
        norm_layer=nn.LayerNorm,
        depths=[3, 4, 6, 3], 
        sr_ratios=[4, 2, 1, 1], 
        num_stages=4,
        scale=1,
        **kwargs
    ):
        super().__init__()
        self.scale = scale
        self.num_classes = num_classes
        self.depths = depths
        self.num_stages = num_stages

        dpr = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]
        cur = 0
        alpha=5
        for i in range(num_stages):
            if i == 0:
                patch_embed = nn.Linear(input_dim, embed_dims[i])
            else:
                patch_embed = nn.Linear(embed_dims[i - 1], embed_dims[i])

            block = nn.ModuleList([Block_mamba(
                dim=embed_dims[i], 
                mlp_ratio = mlp_ratios[i], 
                drop_path=dpr[cur + j], 
                norm_layer=norm_layer,
                sr_ratio = sr_ratios[i],
                cm_type='EinFFT')
            for j in range(depths[i])])

            norm = norm_layer(embed_dims[i])
            cur += depths[i]

            setattr(self, f"patch_embed{i + 1}", patch_embed)
            setattr(self, f"block{i + 1}", block)
            setattr(self, f"norm{i + 1}", norm)

        post_layers = ['ca']
        self.post_network = nn.ModuleList([
            ClassBlock(
                dim = embed_dims[-1], 
                mlp_ratio = mlp_ratios[-1],
                norm_layer=norm_layer,
                cm_type='EinFFT')
            for _ in range(len(post_layers))
        ])
        self.head = nn.Linear(embed_dims[-1], num_classes)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    def forward_cls(self, x, H, W):
        B, N, C = x.shape
        cls_tokens = x.mean(dim=1, keepdim=True)
        x = torch.cat((cls_tokens, x), dim=1)
        for block in self.post_network:
            x = block(x, H, W)
        return x

    def forward(self, x):
        B, L, D = x.shape
        for i in range(self.num_stages):
            patch_embed = getattr(self, f"patch_embed{i + 1}")
            block = getattr(self, f"block{i + 1}")
            x = patch_embed(x)
            for blk in block:
                x = blk(x, L, D)
            
            if i != self.num_stages - 1:
                norm = getattr(self, f"norm{i + 1}")
                x = norm(x)
                #x = x.reshape(B, H, W, -1).permute(0, 3, 1, 2).contiguous()

        x = self.forward_cls(x, L, D)[:, 0]
        norm = getattr(self, f"norm{self.num_stages}")
        x = norm(x)
        x = self.head(x) * self.scale
        return x

# train code

# 1. train codes for regression model
def train_one_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    for inputs, targets in tqdm(train_loader, desc="Training", leave=False):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        outputs = outputs.squeeze()
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)
    epoch_loss = running_loss / len(train_loader.dataset)
    return epoch_loss

def validate_one_epoch(model, val_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for inputs, targets in tqdm(val_loader, desc="Validation", leave=False):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            outputs = outputs.squeeze()
            #print(outputs, targets)
            loss = criterion(outputs, targets)
            running_loss += loss.item() * inputs.size(0)
    epoch_loss = running_loss / len(val_loader.dataset)
    return epoch_loss

def train_model(model, train_loader, val_loader, num_epochs, learning_rate, device='cuda'):
    model.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    best_val_loss = float('inf')
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
        val_loss = validate_one_epoch(model, val_loader, criterion, device)
        print(f'Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            print('Model saved!')
    print('Training complete.')

# 2. train codes for classification model

def train_one_epoch_c(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for inputs, targets in tqdm(train_loader, desc="Training", leave=False):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * inputs.size(0)
        
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == targets).sum().item()
        total += targets.size(0)
        
    epoch_loss = running_loss / len(train_loader.dataset)
    accuracy = correct / total
    return epoch_loss, accuracy

def validate_one_epoch_c(model, val_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in tqdm(val_loader, desc="Validation", leave=False):
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            running_loss += loss.item() * inputs.size(0)
            
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == targets).sum().item()
            total += targets.size(0)
            
    epoch_loss = running_loss / len(val_loader.dataset)
    accuracy = correct / total
    return epoch_loss, accuracy

def train_model_c(model, train_loader, val_loader, num_epochs, learning_rate, device='cuda'):
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    best_val_loss = float('inf')
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        train_loss, train_acc = train_one_epoch_c(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = validate_one_epoch_c(model, val_loader, criterion, device)
        print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            print('Model saved!')
    print('Training complete.')

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# classification - handwriting dataset
# https://www.timeseriesclassification.com/description.php?Dataset=Handwriting

def load_arff_data(file_path):
    with open(file_path, 'r') as f:
        dataset = arff.load(f)
    data = np.array(dataset['data'])
    X = data[:, :-1].astype(np.float32)
    y = data[:, -1].astype(np.float32)
    y = y.astype(np.int64) - 1
    return X, y

class HandwritingDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)
    
    def __len__(self):
        return len(self.y)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

def load_handwriting(data_dir='./dataset/handwriting', batch=32):
    X1_test, y_test = load_arff_data(f'{data_dir}/HandwritingDimension1_TRAIN.arff')
    X2_test, _ = load_arff_data(f'{data_dir}/HandwritingDimension2_TRAIN.arff')
    X3_test, _ = load_arff_data(f'{data_dir}/HandwritingDimension3_TRAIN.arff')
    X1_train, y_train = load_arff_data(f'{data_dir}/HandwritingDimension1_TEST.arff')
    X2_train, _ = load_arff_data(f'{data_dir}/HandwritingDimension2_TEST.arff')
    X3_train, _ = load_arff_data(f'{data_dir}/HandwritingDimension3_TEST.arff')
    X_train = np.stack([X1_train, X2_train, X3_train], axis=-1)
    X_test = np.stack([X1_test, X2_test, X3_test], axis=-1)

    train_dataset = HandwritingDataset(X_train, y_train)
    test_dataset = HandwritingDataset(X_test, y_test)
    train_loader = DataLoader(train_dataset, batch_size=batch, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch, shuffle=False)
    return train_loader, test_loader

def run_handwriting(batch=32, epochs=12, lr=3e-4, simple=False):
    train_loader, test_loader = load_handwriting(batch=batch)
    model = SiMBASimple(input_dim=3, num_classes=26) if simple else SiMBA(input_dim=3, num_classes=26)
    train_model_c(model, train_loader, test_loader, num_epochs=epochs, learning_rate=lr)

In [3]:
# regressoin - TAC

class TACDataset(Dataset):
    def __init__(self, data_dir, sequence_length=1000):
        self.data_dir = data_dir
        self.sequence_length = sequence_length
        self.data = self._load_data()

    def _load_data(self):
        data_list = []
        pids = [f.split(' ')[0] for f in os.listdir(self.data_dir) if 'CAM Results.csv' in f]
        
        for pid in pids:
            cam_file = os.path.join(self.data_dir, f'{pid} CAM Results.csv')
            cam_data = pd.read_csv(cam_file)
            cam_data['Time'] = pd.to_datetime(cam_data['Time'])
            acc_file = os.path.join(self.data_dir, f'{pid}_acc_data.csv')
            acc_data = pd.read_csv(acc_file)
            acc_data['time'] = pd.to_datetime(acc_data['time'])
            acc_data = acc_data.sort_values('time').reset_index(drop=True)
            
            for idx, row in cam_data.iterrows():
                tac_time = row['Time']
                tac_value = row['TAC Level']
                
                acc_subset = acc_data[acc_data['time'] < tac_time].tail(self.sequence_length)
                if len(acc_subset) == self.sequence_length:
                    x = acc_subset[['x', 'y', 'z']].values
                    y = tac_value
                    data_list.append((x, y))
        return data_list

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        x, y = self.data[idx]
        x = torch.tensor(x, dtype=torch.float32)
        y = torch.tensor(y, dtype=torch.float32)
        return x, y

def load_tac(data_dir='./dataset/tac', batch=32, seq_len=1000):
    dataset = TACDataset(data_dir=data_dir, sequence_length=seq_len)
    train_idx, val_idx = train_test_split(np.arange(len(dataset)), test_size=0.2, random_state=42)
    train_sampler = torch.utils.data.SubsetRandomSampler(train_idx)
    val_sampler = torch.utils.data.SubsetRandomSampler(val_idx)

    train_loader = DataLoader(dataset, batch_size=batch, sampler=train_sampler)
    val_loader = DataLoader(dataset, batch_size=batch, sampler=val_sampler)
    return train_loader, val_loader

def run_tac(seq_len=1000, batch=32, epochs=12, lr=3e-3, simple=False):
    train_loader, val_loader = load_tac(batch=batch, seq_len=seq_len)
    model = SiMBASimple(input_dim=3, num_classes=1) if simple else SiMBA(input_dim=3, num_classes=1)
    train_model(model, train_loader, val_loader, num_epochs=epochs, learning_rate=lr)

In [7]:
# regression - Lamp

class LAMPDataset(Dataset):
    def __init__(self, data_dir, y_path, seq_len, target, is_regression):
        self.data_dir = data_dir
        self.seq_len = seq_len
        self.y_data = pd.read_csv(y_path)
        self.uuids = self.y_data['uuid'].values
        self.scores = self.y_data[target].values
        self.scaler = StandardScaler()
        self.is_regression = is_regression

    def __len__(self):
        return len(self.uuids)

    def __getitem__(self, idx):
        uuid = self.uuids[idx]
        score = self.scores[idx]
        file_path = os.path.join(self.data_dir, f'{uuid}.csv')
        data = pd.read_csv(file_path)
        data = data.sort_values(by='timestamp').reset_index(drop=True)
        time_series_data = data.drop(columns=['timestamp']).values
        time_series_data = self.scaler.fit_transform(time_series_data)
        if time_series_data.shape[0] < self.seq_len:
            padding = np.zeros((self.seq_len - time_series_data.shape[0], time_series_data.shape[1]))
            time_series_data = np.vstack((time_series_data, padding))
        elif time_series_data.shape[0] > self.seq_len:
            time_series_data = time_series_data[:self.seq_len, :]
        if self.is_regression:
            return torch.tensor(time_series_data, dtype=torch.float32), torch.tensor(score, dtype=torch.float32)
        else:
            return torch.tensor(time_series_data, dtype=torch.float32), torch.tensor(score, dtype=torch.int64)

def load_lamp(target, batch_size, seq_len, is_regression, data_dir='./dataset/lamp', y_file='./dataset/lamp/student_info.csv'):
    dataset = LAMPDataset(data_dir=data_dir, y_path=y_file, seq_len=seq_len, target=target, is_regression=is_regression)
    train_idx, val_idx = train_test_split(np.arange(len(dataset)), test_size=0.2, random_state=42)
    train_sampler = torch.utils.data.SubsetRandomSampler(train_idx)
    val_sampler = torch.utils.data.SubsetRandomSampler(val_idx)
    train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
    val_loader = DataLoader(dataset, batch_size=batch_size, sampler=val_sampler)
    return train_loader, val_loader

def run_regression(target, batch=8, seq_len=6000, epochs=12, lr=3e-3, simple=False):
    train_loader, val_loader = load_lamp(target=target, batch_size=batch, seq_len=seq_len, is_regression=True)
    model = SiMBASimple(input_dim=15, num_classes=1, scale=80) if simple else SiMBA(input_dim=15, num_classes=1, scale=1)
    train_model(model, train_loader, val_loader, num_epochs=epochs, learning_rate=lr)

def run_classification(target, batch=8, seq_len=6000, epochs=12, lr=3e-3, simple=False):
    train_loader, val_loader = load_lamp(target=target, batch_size=batch, seq_len=seq_len, is_regression=False)
    model = SiMBASimple(input_dim=15, num_classes=2, scale=1) if simple else SiMBA(input_dim=15, num_classes=2, scale=1)
    train_model_c(model, train_loader, val_loader, num_epochs=epochs, learning_rate=lr)

In [None]:
run_handwriting()
run_tac()

In [9]:
run_regression(target='STAI-X-1', simple=True)
run_classification(target="is_STAI-X-1", simple=True)

Epoch 1/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 600.5672, Val Loss: 37.8623
Model saved!
Epoch 2/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 124.6956, Val Loss: 45.5317
Epoch 3/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 118.2643, Val Loss: 34.6985
Model saved!
Epoch 4/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 103.2348, Val Loss: 35.7588
Epoch 5/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 125.4829, Val Loss: 37.9688
Epoch 6/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 109.0614, Val Loss: 35.3274
Epoch 7/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 116.5639, Val Loss: 38.6360
Epoch 8/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 96.2602, Val Loss: 38.8472
Epoch 9/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 92.9007, Val Loss: 35.8911
Epoch 10/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 93.5012, Val Loss: 38.2805
Epoch 11/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 99.4943, Val Loss: 36.1101
Epoch 12/12


  return F.mse_loss(input, target, reduction=self.reduction)
                                                         

Train Loss: 84.6787, Val Loss: 39.6269
Training complete.
Epoch 1/12


                                                         

Train Loss: 0.6348, Train Acc: 0.5088, Val Loss: 0.1664, Val Acc: 0.3333
Model saved!
Epoch 2/12


                                                         

Train Loss: 0.5567, Train Acc: 0.4737, Val Loss: 0.1308, Val Acc: 0.6667
Model saved!
Epoch 3/12


                                                         

Train Loss: 0.5148, Train Acc: 0.6316, Val Loss: 0.1344, Val Acc: 0.6667
Epoch 4/12


                                                         

Train Loss: 0.5189, Train Acc: 0.6316, Val Loss: 0.1299, Val Acc: 0.6667
Model saved!
Epoch 5/12


                                                         

Train Loss: 0.5182, Train Acc: 0.6316, Val Loss: 0.1313, Val Acc: 0.6667
Epoch 6/12


                                                         

Train Loss: 0.5294, Train Acc: 0.5789, Val Loss: 0.1410, Val Acc: 0.6667
Epoch 7/12


                                                         

Train Loss: 0.5031, Train Acc: 0.6491, Val Loss: 0.1288, Val Acc: 0.6667
Model saved!
Epoch 8/12


                                                         

Train Loss: 0.5237, Train Acc: 0.6316, Val Loss: 0.1382, Val Acc: 0.6667
Epoch 9/12


                                                         

Train Loss: 0.5353, Train Acc: 0.5614, Val Loss: 0.1294, Val Acc: 0.6667
Epoch 10/12


                                                         

Train Loss: 0.5323, Train Acc: 0.6316, Val Loss: 0.1361, Val Acc: 0.6667
Epoch 11/12


                                                         

Train Loss: 0.5546, Train Acc: 0.5614, Val Loss: 0.1384, Val Acc: 0.6667
Epoch 12/12


                                                         

Train Loss: 0.5228, Train Acc: 0.6316, Val Loss: 0.1330, Val Acc: 0.6667
Training complete.




In [5]:
# simple test code

def load_single_test_data(file_path, seq_len=1440):
    data = pd.read_csv(file_path)
    data = data.sort_values(by='timestamp').reset_index(drop=True)
    time_series_data = data.drop(columns=['timestamp']).values
    scaler = StandardScaler()
    time_series_data = scaler.fit_transform(time_series_data)

    if time_series_data.shape[0] < seq_len:
        padding = np.zeros((seq_len - time_series_data.shape[0], time_series_data.shape[1]))
        time_series_data = np.vstack((time_series_data, padding))
    elif time_series_data.shape[0] > seq_len:
        time_series_data = time_series_data[:seq_len, :]

    return torch.tensor(time_series_data, dtype=torch.float32).unsqueeze(0)

def run_inference(model, test_data, device='cuda'):
    model.to(device)
    model.eval()
    with torch.no_grad():
        test_data = test_data.to(device)
        output = model(test_data)
    
    return output.item()

def test(uuid):
    test_file_path = f'./dataset/lamp/{uuid}.csv'
    model_path = 'best_model.pth'

    model = SiMBA(input_dim=15, num_classes=1, scale=1)
    model.load_state_dict(torch.load(model_path))

    test_data = load_single_test_data(test_file_path, seq_len = 6000)

    prediction = run_inference(model, test_data)
    print(f"Predicted score: {prediction}")