In [1]:
import scipy.io
import numpy as np

# Load the .mat file
file_path = "L23_neuron_20210228_Y54_Z320_test.mat"  # Update with the correct path
mat_data = scipy.io.loadmat(file_path)

# Convert MATLAB arrays to NumPy arrays
data_dict = {
    "Eigenface_0_trials_evoked": np.array(mat_data["Eigenface_0_trials_evoked"]),
    "Eigenface_0_trials_isi": np.array(mat_data["Eigenface_0_trials_isi"]),
    "dFF0_trials_evoked": np.array(mat_data["dFF0_trials_evoked"]),
    "dFF0_trials_isi": np.array(mat_data["dFF0_trials_isi"]),
}
eigenface_evoked = []
dff_evoked =[]
eigenface_isi =[]
dff_isi =[]

# Direct assignment from the data_dict
eigenface_evoked = data_dict["Eigenface_0_trials_evoked"]
eigenface_isi = data_dict["Eigenface_0_trials_isi"]
dff_evoked = data_dict["dFF0_trials_evoked"]
dff_isi = data_dict["dFF0_trials_isi"]


In [2]:
import torch
from torch.utils.data import Dataset

class NeuralDataset(Dataset):
    """
    A custom Dataset to handle both evoked (with stimulus) and ISI (no-stimulus) data.
    We create samples of shape (seq_len, input_dim) for face motion,
    along with a stimulus indicator and the corresponding neural targets.
    
    This code will:
      - Separate evoked data by condition.
      - Combine with ISI data as 'no-stimulus' condition.
      - Possibly chunk into smaller sequences for the Transformer.
    """
    def __init__(self,
                 eigenface_evoked,   # shape (500, 1000, 4)
                 dff_evoked,         # shape (229, 1000, 4)
                 eigenface_isi,      # shape (500, 1000)
                 dff_isi,            # shape (229, 1000)
                 seq_len=100,
                 device='cpu'):
        super().__init__()
        self.seq_len = seq_len
        self.device = device

        # Evoked data: separate each condition
        # We will end up with 4 separate sequences for face & neural
        # each shape: (500, 1000)
        self.face_evoked_list = []
        self.neural_evoked_list = []
        self.stimulus_id_list = []  # store integer or one-hot for each condition

        for cond_idx in range(4):
            face_data_cond = eigenface_evoked[:, :, cond_idx]  # shape (500, 1000)
            neural_data_cond = dff_evoked[:, :, cond_idx]       # shape (229, 1000)
            
            # We'll store them as Tensors
            face_data_cond = torch.tensor(face_data_cond, dtype=torch.float32)
            neural_data_cond = torch.tensor(neural_data_cond, dtype=torch.float32)
            
            self.face_evoked_list.append(face_data_cond)
            self.neural_evoked_list.append(neural_data_cond)
            
            # Stimulus one-hot or ID
            # e.g. cond_idx ∈ {0,1,2,3} => one-hot of length 4 
            stim_one_hot = torch.zeros(4)
            stim_one_hot[cond_idx] = 1.0
            self.stimulus_id_list.append(stim_one_hot)
        
        # ISI data (no stimulus)
        # shape (500, 1000) for face, (229, 1000) for neural
        self.face_isi = torch.tensor(eigenface_isi, dtype=torch.float32)
        self.neural_isi = torch.tensor(dff_isi, dtype=torch.float32)

        # Build a list of all sequences (face, neural, stim_vector)
        # for evoked:
        self.samples = []
        for i in range(4):
            face_seq = self.face_evoked_list[i]     # (500, 1000)
            neural_seq = self.neural_evoked_list[i] # (229, 1000)
            stim_vec = self.stimulus_id_list[i]     # (4,)
            
            # Chunk the 1000 frames into segments of seq_len
            # e.g. for start in [0, seq_len, 2*seq_len, ...]
            # Make sure we only keep full segments
            n_frames = face_seq.shape[1]  # 1000
            for start in range(0, n_frames, seq_len):
                end = start + seq_len
                if end > n_frames:
                    break
                face_chunk = face_seq[:, start:end]   # shape (500, seq_len)
                neural_chunk = neural_seq[:, start:end]  # shape (229, seq_len)
                self.samples.append((face_chunk, stim_vec, neural_chunk, True))  
                # last param = True indicates "stimulus present"
        
        # for ISI:
        n_frames_isi = self.face_isi.shape[1]  # 1000
        for start in range(0, n_frames_isi, seq_len):
            end = start + seq_len
            if end > n_frames_isi:
                break
            face_chunk = self.face_isi[:, start:end]   # shape (500, seq_len)
            neural_chunk = self.neural_isi[:, start:end]  # shape (229, seq_len)
            # Stim is zero vector => no stimulus
            stim_vec = torch.zeros(4)
            self.samples.append((face_chunk, stim_vec, neural_chunk, False))
        
        # We have a big list of (face_chunk, stim_one_hot, neural_chunk, has_stim)
        # We'll shuffle them in the DataLoader.  
        
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        face_chunk, stim_vec, neural_chunk, has_stim = self.samples[idx]

        # shape transformations if needed:
        # Now face_chunk: (500, seq_len). We want it as (seq_len, 500) for the Transformer
        face_chunk = face_chunk.permute(1, 0)  # shape (seq_len, 500)
        neural_chunk = neural_chunk.permute(1, 0)  # shape (seq_len, 229)
        
        return (face_chunk, stim_vec, neural_chunk, has_stim)


In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MultimodalTransformer(nn.Module):
    def __init__(self,
                 face_dim=500,       # input dimension of face PCA features
                 stim_dim=4,         # one-hot size for the 4 stimuli
                 d_model=128,        # transformer embedding dimension
                 nhead=4,            # number of attention heads
                 num_layers=2,       # number of transformer encoder layers
                 hidden_mlp=256,     # feedforward dimension in Transformer
                 output_dim=229,     # number of neurons to predict
                 dropout=0.1):
        super().__init__()
        
        self.face_dim = face_dim
        self.stim_dim = stim_dim
        self.d_model = d_model
        
        # 1) Linear projection for face input => d_model
        self.face_embedding = nn.Linear(face_dim, d_model)
        
        # 2) Embedding for stimulus => project 4-d one-hot to d_model
        self.stim_embedding = nn.Linear(stim_dim, d_model)
        
        # 3) Positional encoding (basic)
        self.positional_encoding = PositionalEncoding(d_model, dropout=dropout)
        
        # 4) Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model,
                                                   nhead=nhead,
                                                   dim_feedforward=hidden_mlp,
                                                   dropout=dropout,
                                                   activation='relu')
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer,
                                                         num_layers=num_layers)
        
        # 5) Output head: map from d_model -> output_dim (229)
        self.output_layer = nn.Linear(d_model, output_dim)
        
    def forward(self, face_seq, stim_vec, has_stim=None):
        """
        face_seq: (batch_size, seq_len, face_dim)
        stim_vec: (batch_size, stim_dim) - one-hot
        has_stim: (batch_size,) boolean or None
        """
        # Project face input to d_model
        B, T, _ = face_seq.shape
        
        face_embed = self.face_embedding(face_seq)  # (B, T, d_model)
        
        # Expand stimulus embedding across time:
        #  stim_vec => (B, 1, d_model), then tile to (B, T, d_model)
        stim_embed = self.stim_embedding(stim_vec)  # (B, d_model)
        stim_embed = stim_embed.unsqueeze(1).expand(-1, T, -1)  # (B, T, d_model)
        
        # If we have a "no stimulus" case, we can zero out or learn a separate embedding
        # For now, just keep stim_embed for both. Another approach:
        if has_stim is not None:
            # has_stim is a bool vector of length B
            # we can zero out stim_embed if has_stim=False
            mask = has_stim.view(-1, 1, 1).float()  # shape (B,1,1)
            stim_embed = stim_embed * mask  # zero if no stimulus
        # Alternatively, use something like a separate "no stim" embedding. 
        # This is flexible.
        
        # Combine face + stimulus by adding or concatenating.
        # A typical approach in transformers is to just add them (like we do with position encoding).
        # Or we can also concatenate them along dimension=2 and use a linear to get back to d_model.
        # Let's add them:
        x = face_embed + stim_embed  # shape (B, T, d_model)
        
        # Add positional encoding
        x = self.positional_encoding(x)  # (B, T, d_model)
        
        # Transformer expects shape (T, B, d_model), so transpose:
        x = x.permute(1, 0, 2)  # (T, B, d_model)
        
        # Pass through the transformer encoder
        encoded = self.transformer_encoder(x)  # (T, B, d_model)
        
        # Convert back to (B, T, d_model)
        encoded = encoded.permute(1, 0, 2)
        
        # Predict neural activity at each time step
        out = self.output_layer(encoded)  # (B, T, 229)
        
        return out


class PositionalEncoding(nn.Module):
    """
    Standard positional encoding for transformers.
    This version inserts sinusoidal positional embeddings.
    """
    def __init__(self, d_model, dropout=0.1, max_len=10000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * 
                             (-torch.log(torch.tensor(10000.0)) / d_model))
        
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # shape (1, max_len, d_model)
        
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x shape: (batch_size, seq_len, d_model)
        seq_len = x.size(1)
        # Add positional encoding
        x = x + self.pe[:, :seq_len, :]
        return self.dropout(x)


In [4]:
import torch
import torch.optim as optim
from torch.utils.data import DataLoader

def train_model(eigenface_evoked, dff_evoked,
                eigenface_isi, dff_isi,
                epochs=10,
                batch_size=8,
                seq_len=100,
                lr=1e-3,
                device='cuda'):
    
    # 1) Build dataset & dataloader
    dataset = NeuralDataset(eigenface_evoked, dff_evoked,
                            eigenface_isi, dff_isi,
                            seq_len=seq_len,
                            device=device)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, drop_last=True)
    
    # 2) Instantiate the model
    model = MultimodalTransformer(
        face_dim=500,
        stim_dim=4,
        d_model=128,
        nhead=4,
        num_layers=2,
        hidden_mlp=256,
        output_dim=229,
        dropout=0.1
    ).to(device)
    
    # 3) Loss and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    # 4) Training loop
    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        
        for batch in dataloader:
            face_chunk, stim_vec, neural_chunk, has_stim = batch
            # face_chunk : (B, seq_len, 500)
            # stim_vec   : (B, 4)
            # neural_chunk : (B, seq_len, 229)
            # has_stim   : (B,) bool
            face_chunk = face_chunk.to(device)
            stim_vec   = stim_vec.to(device)
            neural_chunk = neural_chunk.to(device)
            has_stim   = has_stim.to(device)
            
            # ----- Modality Dropout (optional) -----
            # randomly zero out face or stim with some probability
            drop_prob = 0.1  # 10% chance to drop a modality
            if torch.rand(1).item() < drop_prob:
                # drop face
                face_chunk = torch.zeros_like(face_chunk)
            if torch.rand(1).item() < drop_prob:
                # drop stimulus
                stim_vec = torch.zeros_like(stim_vec)
                has_stim = torch.zeros_like(has_stim)  # effectively no stim

            optimizer.zero_grad()
            
            # forward
            pred = model(face_chunk, stim_vec, has_stim=has_stim)
            # pred shape: (B, seq_len, 229)
            
            # compute loss
            loss = criterion(pred, neural_chunk)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
        
        avg_loss = total_loss / len(dataloader)
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")
    
    return model



In [9]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = train_model(
     eigenface_evoked, dff_evoked,
     eigenface_isi, dff_isi,
     epochs=50, batch_size=8, seq_len=100, lr=1e-3, device=device
 )



Epoch [1/50], Loss: 0.2409
Epoch [2/50], Loss: 0.0530
Epoch [3/50], Loss: 0.0345
Epoch [4/50], Loss: 0.0263
Epoch [5/50], Loss: 0.0244
Epoch [6/50], Loss: 0.0226
Epoch [7/50], Loss: 0.0210
Epoch [8/50], Loss: 0.0193
Epoch [9/50], Loss: 0.0186
Epoch [10/50], Loss: 0.0181
Epoch [11/50], Loss: 0.0176
Epoch [12/50], Loss: 0.0169
Epoch [13/50], Loss: 0.0165
Epoch [14/50], Loss: 0.0160
Epoch [15/50], Loss: 0.0157
Epoch [16/50], Loss: 0.0154
Epoch [17/50], Loss: 0.0148
Epoch [18/50], Loss: 0.0145
Epoch [19/50], Loss: 0.0141
Epoch [20/50], Loss: 0.0142
Epoch [21/50], Loss: 0.0137
Epoch [22/50], Loss: 0.0133
Epoch [23/50], Loss: 0.0131
Epoch [24/50], Loss: 0.0128
Epoch [25/50], Loss: 0.0127
Epoch [26/50], Loss: 0.0124
Epoch [27/50], Loss: 0.0122
Epoch [28/50], Loss: 0.0121
Epoch [29/50], Loss: 0.0118
Epoch [30/50], Loss: 0.0116
Epoch [31/50], Loss: 0.0118
Epoch [32/50], Loss: 0.0117
Epoch [33/50], Loss: 0.0114
Epoch [34/50], Loss: 0.0110
Epoch [35/50], Loss: 0.0109
Epoch [36/50], Loss: 0.0107
E

In [15]:
# 1) Suppose you've already trained your model:
# model = train_model(...)  # your custom function
# 
# 2) Suppose you have a DataLoader to evaluate on:
eval_loader = create_eval_loader(...)
#
# 3) Get predictions:

all_preds, all_actual = compute_predictions(model, eval_loader, device=device)

# 4) Plot a time-series trace for a single neuron:

plot_single_neuron_trace(all_preds, all_actual, sample_idx=0, neuron_id=0)

# 5) Compute and plot R² for each neuron:

r2_vals = compute_r2_per_neuron(all_preds, all_actual)
plot_r2_distribution(r2_vals)

# 6) Scatter plot predicted vs actual for neuron 0, flattening across all batches:

plot_scatter_pred_vs_actual(all_preds, all_actual, neuron_id=0, sample_idx=None)

# 7) Plot PCA trajectories for one sample:

plot_pca_trajectory(all_preds, all_actual, sample_idx=0, n_components=2)


NameError: name 'create_eval_loader' is not defined