In [2]:
import matplotlib
#matplotlib.use('Agg')

%load_ext autoreload
%autoreload 2

%matplotlib tk
%autosave 180


import matplotlib.pyplot as plt
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

import numpy as np
import os


Autosaving every 180 seconds


  from IPython.core.display import display, HTML


############### PREDICT NEXT NEURAL TIME SERIES POINT ################ 

Here we are just trying to get the transformer to learn the dynamics of raw (i.e. PCA denoised) neural time series.

#### Step 1: One brain area

- input shape [time_points] = [40000]    # this is just a single time series from the visualizion notebook
- label: [time_points[1:]]                # here we predict the time series but shifted by 1

This is exactly what transformers are developed to do, so we shouldn't have to do too much work to adapt them. We can also smooth or bin the neural data as it's abit noisy. 

##### Major challenges:

1. Figure out how to feed continous time series into the transformer.

There are some methods already out there

https://huggingface.co/blog/time-series-transformers

https://huggingface.co/docs/transformers/model_doc/time_series_transformer


#### Step 2: Multiple brain areas

- input shape [time_points, n_areas] = [40000, 30]     #  
- label: [time_points[1:], 30]                         # 

##### Major challenges:

1. So we would need to extend the above to work with multiple cortical areas...


In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import math
from torch.utils.data import DataLoader, TensorDataset
import glob
from sklearn.preprocessing import StandardScaler

In [4]:
#Loading The Data
root_dir = 'data'
mouse_id = 'IA1'
session_id = 'Feb_16'

# load the raw data from each neural area
animal_dir = os.path.join(root_dir, mouse_id, session_id)

# find the file using glob that has "wholestack.npz" in it
fname = glob.glob(os.path.join(animal_dir, '*wholestack.npz'))[0]

In [5]:
# How much are we working with - Reducing initial computational time
data_slice = 40000

# Load data and verify shape
data_stack = np.load(fname, allow_pickle=True)
data = data_stack['whole_stack'].T  # Shape: (40000, 16)
neural_data = torch.tensor(data[:data_slice], dtype=torch.float32)  # Slice to (1000, 16) - Cut down on the computational complexity


# After loading data and slicing to (1000, 16)
# Compute mean/std on training data only
training_slice = int(data_slice*0.7)
train_data = neural_data[:training_slice]  # 70% of 1000 = 700 samples
mean = train_data.mean(dim=0)
std = train_data.std(dim=0)
neural_data_normalized = (neural_data - mean) / (std + 1e-8)

window_size = 20  # Adjust with some testing

# Regenerate inputs/targets with normalized data
inputs, targets = [], []
for i in range(window_size, len(neural_data_normalized)):
    inputs.append(neural_data_normalized[i-window_size:i])
    targets.append(neural_data_normalized[i])


# Convert to tensors
inputs = torch.stack(inputs)    # Shape: (N, window_size, 16)
targets = torch.stack(targets)  # Shape: (N, 16)

# Split into train/val/test (70/15/15)
train_size = int(0.7 * len(inputs))
val_size = int(0.15 * len(inputs))
test_size = len(inputs) - train_size - val_size

train_dataset = TensorDataset(inputs[:train_size], targets[:train_size])
val_dataset = TensorDataset(inputs[train_size:train_size+val_size], targets[train_size:train_size+val_size])
test_dataset = TensorDataset(inputs[-test_size:], targets[-test_size:])

# Create DataLoaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [6]:
class TEAModule(nn.Module):
    def __init__(self, input_dims, core_dims):
        super().__init__()
        # Time compression: (batch, time, features) => (batch, core_time, features)
        self.U_time = nn.Linear(input_dims[0], core_dims[0], bias=False)
        
        # Feature compression: (batch, core_time, features) => (batch, core_time, core_feat)
        self.U_feat = nn.Linear(input_dims[1], core_dims[1], bias=False)

    def forward(self, x):
        # x shape: (batch, time_steps, features)
        
        # 1. Compress time dimension
        x = x.transpose(1, 2)  # (batch, features, time)
        x = self.U_time(x)     # (batch, features, core_time)
        x = x.transpose(1, 2)  # (batch, core_time, features)
        
        # 2. Compress feature dimension
        x = self.U_feat(x)     # (batch, core_time, core_feat)
        return x

class TEAReconstruction(nn.Module):
    def __init__(self, core_dims, output_dims):
        super().__init__()
        self.V_feat = nn.Linear(core_dims[1], output_dims[1], bias=False)
        self.V_time = nn.Linear(core_dims[0], output_dims[0], bias=False)

    def forward(self, x):
        # 1. Reconstruct features
        x = self.V_feat(x)     # (batch, core_time, features)
        
        # 2. Reconstruct time dimension
        x = x.transpose(1, 2)  # (batch, features, core_time)
        x = self.V_time(x)     # (batch, features, time)
        x = x.transpose(1, 2)  # (batch, time, features)
        return x

In [7]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(1), :]
        return self.dropout(x)

class NeuralTransformer(nn.Module):
    def __init__(self, input_dim=16, d_model=64, core_dims=(10, 32), 
                 nhead=4, num_layers=3, window_size=20):
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Linear(input_dim, d_model)
        self.norm = nn.LayerNorm(d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        
        # TEA Compression
        self.tea_compress = TEAModule(
            input_dims=(window_size, d_model), 
            core_dims=core_dims
        )
        
        # Transformer Encoder (operates on compressed core tensor)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=core_dims[1],  # Core feature dimension
            nhead=nhead,
            batch_first=True,
            dropout=0.1
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        
        # TEA Reconstruction
        self.tea_reconstruct = TEAReconstruction(
            core_dims=core_dims,
            output_dims=(window_size, d_model)
        )
        
        self.decoder = nn.Linear(d_model, input_dim)
        
    def forward(self, x):
        # Original embedding
        x = self.embedding(x) * math.sqrt(self.d_model)
        x = self.norm(x)
        x = self.pos_encoder(x)  # (batch, window_size, d_model)
        
        # TEA Compression
        core = self.tea_compress(x)  # (batch, core_time, core_feat)
        
        # Transformer processing
        core = self.transformer(core)  # (batch, core_time, core_feat)
        
        # TEA Reconstruction
        x_recon = self.tea_reconstruct(core)  # (batch, window_size, d_model)
        
        # Decoder
        x_out = x_recon.mean(dim=1)  # Aggregate time steps
        return self.decoder(x_out)

In [8]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = NeuralTransformer().to(device)
criterion = nn.HuberLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4) # Dicuss choice in Adam

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    for batch_inputs, batch_targets in train_loader:
        batch_inputs, batch_targets = batch_inputs.to(device), batch_targets.to(device)
        optimizer.zero_grad()
        outputs = model(batch_inputs)
        loss = criterion(outputs, batch_targets)
        
        if torch.isnan(loss):
            print("NaN detected. Stopping training.")
            break
            
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # Gradient clipping
        optimizer.step()
    
    print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")

RuntimeError: mat1 and mat2 shapes cannot be multiplied (640x64 and 20x10)