# Recurrent Neural Network with Long Short-Term Memory layers

# Load data

In [4]:
import os
import pandas as pd
import numpy as np
import torch

# Base directory
base_dir = '/home/vincent/AAA_projects/MVCS/Neuroscience/'

# Define the file paths
file_paths = {
    'eeg_data': os.path.join(base_dir, 'eeg_data_with_channels.npy'),
    'hurst_exponents': os.path.join(base_dir, 'HurstExponents/hurst_exponents.npy'),
    'mfdfa_combined': os.path.join(base_dir, 'RNN_data/rnn_X_data_combined.npy'),
    'kuramoto_phases': os.path.join(base_dir, 'Modelling/Kuramoto/kuramoto_phases_extended.npy'),
    'dipole': os.path.join(base_dir, 'Features/RNN/rnn_dipole.npy'),
    'arnold_tongues_rotations': os.path.join(base_dir, 'Features/RNN/rnn_arnold_tongues_rotations.npy'),
    'transfer_entropy_hemispheric_avg_input': os.path.join(base_dir, 'Features/RNN/rnn_transfer_entropy_hemispheric_avg_input.npy'),
    'dspm': os.path.join(base_dir, 'Features/RNN/rnn_dspm.npy'),
    'fractal': os.path.join(base_dir, 'Features/RNN/rnn_fractal.npy'),
    'mfdfa': os.path.join(base_dir, 'Features/RNN/rnn_mfdfa.npy'),
    'phase_syncronization': os.path.join(base_dir, 'Analysis/Phase Syncronization/plv_matrix.npy'),
    'quantum_matrix_elements': os.path.join(base_dir, 'Analysis/Quantum_Analysis/matrix_elements.npy'),
    'quantum_eigendata': os.path.join(base_dir, 'Analysis/Quantum_Analysis/eigen_data.npy'),
    'quantum_expectation_values': os.path.join(base_dir, 'Analysis/Quantum_Analysis/expectation_values.npy'),
    'hermitian_matrices': os.path.join(base_dir, 'Analysis/Quantum_Analysis/hermitian_matrices.npy'),
    'band_powers': os.path.join(base_dir, 'Analysis/Spectral Analysis/BandPowers_x.npy'),
    'combined_fft_psd': os.path.join(base_dir, 'Analysis/Spectral Analysis/combined_fft_psd_x.npy'),
    'peak_frequencies': os.path.join(base_dir, 'Analysis/Spectral Analysis/PeakFrequencies_x.npy'),
    'psd': os.path.join(base_dir, 'Analysis/Spectral Analysis/psd_x.npy'),
    'spectral_centroids': os.path.join(base_dir, 'Analysis/Spectral Analysis/SpectralCentroids_x.npy'),
    'spectral_edge_densities': os.path.join(base_dir, 'Analysis/Spectral Analysis/SpectralEdgeDensities_x.npy'),
    'spectral_entropy': os.path.join(base_dir, 'Analysis/Spectral Analysis/SpectralEntropy_x.npy'),
    'stft': os.path.join(base_dir, 'Analysis/Spectral Analysis/STFT_x.npy'),
    'welchs': os.path.join(base_dir, 'Analysis/Spectral Analysis/welchs_x.npy'),
    'false_nearest_neighbors': os.path.join(base_dir, 'false_nearest_neighbors.npy')
}

# Create tensors

In [2]:
tensor_data = {}

def handle_complex(array):
    if np.iscomplexobj(array):
        real_part = np.real(array)
        imag_part = np.imag(array)
        magnitude = np.abs(array)
        return real_part, imag_part, magnitude
    else:
        return array, None, None

for key, path in file_paths.items():
    if path.endswith('.csv'):
        df = pd.read_csv(path)
        tensor_data[key] = torch.tensor(df.values, dtype=torch.float32)
    elif path.endswith('.npy'):
        array = np.load(path, allow_pickle=True)  # Allow pickled object loading
        
        # Check if the array is 0-dimensional
        if array.ndim == 0:
            value = array.item()
            if isinstance(value, dict):
                print(f"Dictionary detected for {key}. Contents:\n{value}")
            else:
                tensor_data[key] = torch.tensor(value, dtype=torch.float32)  # Convert scalar to tensor
            continue

        
        if array.dtype == np.object_:  # Handle object dtype
            # If you expect nested arrays, you can flatten them, otherwise, print for inspection
            print(f"Object dtype detected for {key}. Inspecting first element: {array[0]}")
            continue  # Skip this iteration and handle the array later, based on its content
        
        real_part, imag_part, magnitude = handle_complex(array)  # Handle complex numbers
        
        tensor_data[key] = torch.tensor(real_part, dtype=torch.float32)
        
        # If the array was complex, add the imaginary and magnitude parts as separate tensors
        if imag_part is not None:
            tensor_data[key + "_imag"] = torch.tensor(imag_part, dtype=torch.float32)
            tensor_data[key + "_magnitude"] = torch.tensor(magnitude, dtype=torch.float32)
    else:
        print(f"Unrecognized file format for: {key}")

# Print the shapes of all tensors
for key, tensor in tensor_data.items():
    print(f"Shape of {key}:", tensor.shape)

Dictionary detected for band_powers. Contents:
{'Fp1': {'delta_power': array([4.78590761e+08, 3.72771719e+08, 1.56330447e+08, ...,
       8.39795010e+07, 2.62797441e+08, 3.56701517e+08]), 'theta_power': array([2.75602827e+08, 9.63921238e+08, 8.14212146e+08, ...,
       5.30444548e+08, 6.00202215e+08, 1.54066539e+08]), 'alpha_power': array([7.53269008e+07, 4.92050954e+08, 1.03908801e+09, ...,
       7.77922050e+08, 4.31995333e+08, 8.20214470e+07]), 'beta_power': array([2.83392209e+07, 4.49856658e+08, 1.32748362e+09, ...,
       1.39200369e+09, 6.83904722e+08, 1.57695470e+08]), 'gamma_power': array([0., 0., 0., ..., 0., 0., 0.])}, 'Fpz': {'delta_power': array([4.26684285e+08, 3.32460615e+08, 1.39361559e+08, ...,
       7.73618360e+07, 2.42237090e+08, 3.28682974e+08]), 'theta_power': array([2.45754207e+08, 8.59787064e+08, 7.26403624e+08, ...,
       4.88230911e+08, 5.52074293e+08, 1.41561671e+08]), 'alpha_power': array([6.73075512e+07, 4.39527516e+08, 9.27974206e+08, ...,
       7.1738814

# Normalize the data

In [8]:
def normalize_tensor(tensor):
    """Normalize a tensor to have zero mean and unit variance."""
    mean = torch.mean(tensor)
    std = torch.std(tensor)
    return (tensor - mean) / (std + 1e-7)  # adding a small value to avoid division by zero

def channel_wise_normalize(tensor):
    """Normalize a tensor along its channel dimension."""
    mean = torch.mean(tensor, dim=0, keepdim=True)
    std = torch.std(tensor, dim=0, keepdim=True)
    return (tensor - mean) / (std + 1e-7)  # adding a small value to avoid division by zero

# Normalize eeg_data
tensor_data['eeg_data'] = channel_wise_normalize(tensor_data['eeg_data'])

# Normalize other tensors
for key, tensor in tensor_data.items():
    if key != 'eeg_data':  # we've already normalized eeg_data
        if tensor.dim() == 3 and tensor.size(1) == 32:  # Check for shape like [X, 32, Y]
            tensor_data[key] = channel_wise_normalize(tensor)
        else:
            tensor_data[key] = normalize_tensor(tensor)

# If there's any specific tensor that shouldn't be normalized or needs special attention, handle that separately.

# Split

In [12]:
def split_data(tensor, train_ratio=0.7, val_ratio=0.2):
    num_samples = tensor.shape[0]
    train_end = int(train_ratio * num_samples)
    val_end = int((train_ratio + val_ratio) * num_samples)
    
    return tensor[:train_end], tensor[train_end:val_end], tensor[val_end:]

# Split each tensor
train_tensors, val_tensors, test_tensors = {}, {}, {}
for key, tensor in tensor_data.items():
    train_tensors[key], val_tensors[key], test_tensors[key] = split_data(tensor)


# RNN



In [None]:
# Define the LSTM model (Ensure output_dim matches EEG data channel size)
class LSTMRNN(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(LSTMRNN, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_dim, output_dim)
    
    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.linear(out)
        return out

# Hyperparameters
input_dim = sum(tensor.shape[-1] for tensor in train_tensors.values())  # Total concatenated feature size
hidden_dim = 128
num_layers = 2
output_dim = train_tensors["eeg_data"].shape[1]  # Number of EEG channels

# Instantiate the model
model = LSTMRNN(input_dim, hidden_dim, num_layers, output_dim)

# Loss and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training settings
num_epochs = 10
batch_size = 64
n_samples = len(train_tensors["eeg_data"])
n_batches = n_samples // batch_size

# Training loop
for epoch in range(num_epochs):
    for batch_idx in range(n_batches):
        # Get batch data
        start_idx = batch_idx * batch_size
        end_idx = start_idx + batch_size
        inputs = torch.cat([tensor[start_idx:end_idx] for tensor in train_tensors.values()], dim=-1)
        
        # Align targets for the batch
        targets = train_tensors["eeg_data"][start_idx+1:end_idx+1]  # shifted by one timestep
        
        # Zero the parameter gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
    
    # Print epoch loss
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}')

# Evaluation

# Model Saving and Loading

consider using techniques like gradient clipping, learning rate scheduling, advanced regularization techniques,
 Bidirectional RNNs, attention mechanisms, transformer architectures

In [7]:
def create_sequences(tensor, seq_length):
    xs = []
    ys = []
    for i in range(len(tensor) - seq_length - 1):  # -1 to avoid index out of range for y
        x = tensor[i:i+seq_length]
        y = tensor[i+seq_length]  # the next one is the target
        xs.append(x)
        ys.append(y)
    return torch.stack(xs), torch.stack(ys)

seq_length = 50  # you can adjust this
X, y = create_sequences(eeg_tensor_normalized, seq_length)

In [16]:
# Calculate the index separating the training and test data
train_test_split_idx = int(len(rnn_mfdfa_X_tensor) * 0.8)

# Split rnn_X_tensor, eeg_tensor_normalized and hurst_exponents_tensor_normalized
train_X = rnn_mfdfa_X_tensor[:train_test_split_idx]
test_X = rnn_mfdfa_X_tensor[train_test_split_idx:]

train_X_hurst = hurst_exponents_tensor_normalized[:train_test_split_idx]
test_X_hurst = hurst_exponents_tensor_normalized[train_test_split_idx:]

train_y = eeg_tensor_normalized[:train_test_split_idx]
test_y = eeg_tensor_normalized[train_test_split_idx:]

print(f"Training set: {len(train_X)} samples")
print(f"Test set: {len(test_X)} samples")


Training set: 688 samples
Test set: 172 samples


In [25]:
print(train_X.shape)
print(train_X_hurst.shape)
print(train_y.shape)


torch.Size([688, 100, 51])
torch.Size([32, 1])
torch.Size([688, 33])


In [17]:
import torch.nn as nn

class EEGPredictor(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(EEGPredictor, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])
        return out

# Instantiate the model
input_size = train_X.shape[-1]  # input feature dimension
hidden_size = 64  # number of hidden units in LSTM
output_size = train_y.shape[-1]  # output feature dimension
model = EEGPredictor(input_size, hidden_size, output_size)

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import pytorch_lightning as pl
import optuna
from torch.utils.data import DataLoader

# Set the precision to 'high'
torch.set_float32_matmul_precision('high')

# Make sure your process_dataframes function is returning correctly shaped and combined tensors
class LSTMModel(pl.LightningModule):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, learning_rate):
        super(LSTMModel, self).__init__()
        self.save_hyperparameters()

        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim

        # Replace nn.RNN with nn.LSTM
        self.lstm = nn.LSTM(input_dim, hidden_dim, layer_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).requires_grad_().cuda()
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).requires_grad_().cuda()
        h0 = h0.detach()
        c0 = c0.detach()

        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_pred = self(x)
        loss = nn.MSELoss()(y_pred, y)
        self.log("train_loss", loss)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.learning_rate)

def train_model(trial):
    # Define model
    model = LSTMModel(input_dim=10,  # adjust this to fit your data
                      hidden_dim=trial.suggest_categorical('hidden_dim', [32, 64, 128]),
                      layer_dim=trial.suggest_categorical('layer_dim', [2, 3, 4]),
                      output_dim=1,  # adjust this to fit your data
                      learning_rate=trial.suggest_loguniform('learning_rate', 1e-5, 1e-1))

    # Create TensorDatasets for training and test sets
    train_dataset = TensorDataset(train_X, train_X_hurst, train_y)
    test_dataset = TensorDataset(test_X, test_X_hurst, test_y)
    
    # If you want to use DataLoader for batching and shuffling during training, you can do the following:
    batch_size = 32
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)


    # Create the Trainer
    trainer = pl.Trainer(
        devices=2,  # change this according to your setup
        accelerator="cuda",  # change this according to your setup
        max_epochs=100
    )


    # Fit the model
    trainer.fit(model, train_dl)

    # Evaluate the model
    model.eval()
    with torch.no_grad():
        mse = 0
        loss = nn.MSELoss()
        for x, y in train_dl:
            mse += loss(model(x), y)
        mse /= len(train_dl)

    return mse.item()


if __name__ == "__main__":
    study = optuna.create_study(direction="minimize")
    study.optimize(train_model, n_trials=100)

    print("Number of finished trials: ", len(study.trials))
    print("Best trial:")
    trial = study.best_trial

    print("  Value: ", trial.value)
    print("  Params: ")
    for key, value in trial.params.items():
        print("    {}: {}".format(key, value))


In [None]:
# Move tensors to GPU
test_X = test_X.cuda()
test_y = test_y.cuda()

# Set model to evaluation mode
model.eval()

# No gradient calculation needed
with torch.no_grad():
    # Forward pass
    predictions = model(test_X)

# Calculate the loss
loss = criterion(predictions, test_y)

print(f"Test Loss: {loss.item()}")
