In [None]:
from PacTimeOrig.data import DataHandling as dh
from PacTimeOrig.data import DataProcessing as dp
from PacTimeOrig.Attractor import base as attractor
from PacTimeOrig.utils import processing as proc
from ChangeOfMind.functions import processing as procdata
from PacTimeOrig.models import io,NNtorch,training,utils,lossfn
import scipy
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import gaussian_filter1d



def probabilities_to_logits_sigmoid(probabilities):
    probabilities = np.clip(probabilities, 1e-15, 1 - 1e-15)  # Avoid log(0) or division by zero
    logits = np.log(probabilities / (1 - probabilities))
    return logits



def replace_bad_logits(logits_list, clip_min=-1e3, clip_max=1e3):
    for i, logit in enumerate(logits_list):
        if not torch.isfinite(logit).all():  # Check for NaN or Inf
            print(f"Logit entry {i}: contains NaN or Inf values, replacing with clipped values")
            # Replace NaN or Inf with 0 as a fallback
            logits_list[i] = torch.full_like(logit, 0)
        else:
            # Replace extreme values with clipped values
            min_mask = logit < clip_min
            max_mask = logit > clip_max

            if min_mask.any() or max_mask.any():
                print(f"Logit entry {i}: min={logit.min()}, max={logit.max()} (extreme values detected)")
                
                # Clip the logits
                logit[min_mask] = clip_min
                logit[max_mask] = clip_max
                
                # Use the sign of the last valid value, if available
                for j in torch.where(min_mask | max_mask)[0]:
                    if j > 0:  # Ensure there is a previous value
                        logit[j] = torch.sign(logit[j - 1]) * torch.clamp(logit[j], clip_min, clip_max)
                    else:
                        logit[j] = clip_min if min_mask[j] else clip_max

            logits_list[i] = logit

    return logits_list



def soft_range_normalize(tensor_list):
    # Concatenate all tensors along a new dimension
    all_data = torch.cat(tensor_list, dim=0)  # Shape: (total_samples, num_neurons)

    # Compute mean and range for each neuron (across the column dimension)
    neuron_means = all_data.mean(dim=0)  # Shape: (num_neurons,)
    neuron_ranges = all_data.max(dim=0).values - all_data.min(dim=0).values  # Shape: (num_neurons,)
    neuron_ranges_sqrt = torch.sqrt(neuron_ranges + 1e-8)  # Add epsilon for numerical stability

    # Normalize each tensor in the list
    normalized_tensors = []
    for tensor in tensor_list:
        # Subtract mean and divide by sqrt of range for each neuron
        normalized_tensor = (tensor - neuron_means) / neuron_ranges_sqrt
        normalized_tensors.append(normalized_tensor)

    return normalized_tensors

def compute_soft_range_params(tensor_list):
    """
    Compute the mean and range parameters for soft range normalization.
    """
    # Concatenate all tensors in the train set
    all_data = torch.cat(tensor_list, dim=0)  # Shape: (total_samples, num_neurons)

    # Compute mean and range for each neuron
    neuron_means = all_data.mean(dim=0)  # Shape: (num_neurons,)
    neuron_ranges = all_data.max(dim=0).values - all_data.min(dim=0).values  # Shape: (num_neurons,)
    neuron_ranges_sqrt = torch.sqrt(neuron_ranges + 1e-8)  # Add epsilon for numerical stability

    return neuron_means, neuron_ranges_sqrt


def compute_zscore_params(tensor_list):
    """
    Compute the mean and range parameters for soft range normalization.
    """
    # Concatenate all tensors in the train set
    all_data = torch.cat(tensor_list, dim=0)  # Shape: (total_samples, num_neurons)

    # Compute mean and range for each neuron
    neuron_means = all_data.mean(dim=0)  # Shape: (num_neurons,)
    neuron_std = all_data.std(dim=0)   # Shape: (num_neurons,)

    return neuron_means, neuron_std



def apply_soft_range_normalization(tensor_list, neuron_means, neuron_ranges_sqrt):
    """
    Apply soft range normalization using precomputed parameters.
    """
    normalized_tensors = []
    for tensor in tensor_list:
        # Subtract mean and divide by sqrt of range for each neuron
        normalized_tensor = (tensor - neuron_means) / neuron_ranges_sqrt
        normalized_tensors.append(normalized_tensor)
    return normalized_tensors



def apply_zscore_normalization(tensor_list, neuron_means, neuron_std):
    """
    Apply soft range normalization using precomputed parameters.
    """
    normalized_tensors = []
    for tensor in tensor_list:
        # Subtract mean and divide by sqrt of range for each neuron
        normalized_tensor = (tensor - neuron_means) / neuron_std
        normalized_tensors.append(normalized_tensor)
    return normalized_tensors


Prepare behaviora and neural data

In [None]:
cfgparams={}
cfgparams['event']='zero' # 'zero
cfgparams['keepamount']=40
cfgparams['timewarp']={}
cfgparams['prewin']=14
cfgparams['behavewin']=15
cfgparams['timewarp']['dowarp']=True
cfgparams['timewarp']['warpN']=cfgparams['prewin']+cfgparams['behavewin']+1
cfgparams['timewarp']['originalTimes']=np.arange(1,cfgparams['timewarp']['warpN']+1)
cfgparams['percent_train']=0.9
cfgparams['smoothing']=120

vars_sess_H_pmd, psth_sess_H_pmd, Xd_sess_H_pmd, outputs_sess_H_pmd,cfg_H_pmd=procdata.get_all_vars_nhp(subj='H', area='dACC', sessions=np.arange(1,6),prewin=cfgparams['prewin'],behavewin=cfgparams['behavewin'])


Do by session

In [None]:
sess=2
#Get Wt and remove is bad
cfg={}
cfg['subj']='H'
cfg['session']=sess
cfg['wtype']='bma'
cfg['folder']='/Users/user/PycharmProjects/PacManMain/ChangeOfMind/results/NHP/'

#Get wt
wt=procdata.get_wt(cfg)

# Only save trials in which Wt was good
indices_to_keep = outputs_sess_H_pmd[sess]['is_good_index']
wt = [item for idx, item in enumerate(wt) if idx in indices_to_keep]

#Set inputs for RNN
N_neurons=psth_sess_H_pmd[sess][0].shape[1]
X_train=psth_sess_H_pmd[sess]
X_train = [df.astype(float) for df in X_train]

#making firing rates smoother
sigma = procdata._gauss_smooth_parameters(80)

X_train = [df.apply(lambda col: gaussian_filter1d(col, sigma=sigma), axis=0) for df in X_train]

#Convert to arrays
X_train = [np.array(df) for df in X_train]


# wt = [probabilities_to_logits_sigmoid(arr) for arr in wt]
wt = [torch.tensor(arr) for arr in wt]
X_train = [torch.tensor(arr) for arr in X_train]
# wt=replace_bad_logits(wt)



X_train, X_test, Y_train, Y_test=procdata.split_train_test(X_train, wt, test_size=0.2, seed=42)

neuron_means, neuron_ranges_sqrt=compute_soft_range_params(X_train)


X_train=apply_soft_range_normalization(X_train, neuron_means, neuron_ranges_sqrt)

X_test=apply_soft_range_normalization(X_test, neuron_means, neuron_ranges_sqrt)


mu,std = compute_zscore_params(Y_train)

# Y_train = apply_zscore_normalization(Y_train,mu,std)
# Y_test = apply_zscore_normalization(Y_test,mu,std)


Try LSTM/GRU, higher hidden, logit or softmac

In [None]:
dataset = io.VariableLengthDataset(X_train, Y_train)
dataloader = io.torchloader(dataset,batch_size=32)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
criterion = nn.BCELoss()
model=NNtorch.RNN_decoder(input_size=N_neurons,hidden_size=64,output_size=1,use_softmax=True,rnn_type="simpleRNN").to(device)

optimizer = optim.Adam(model.parameters(), lr=0.15)
training.train_masked_rnn(model,dataloader,criterion,optimizer,device,num_epochs=10,outcard=1)


In [None]:
from sklearn.metrics import r2_score

val_loss=[]
r2=[]
for i in range(len(X_test)):
    X_data=X_test[i]
    X_trial = torch.tensor(X_data)  # Your input data
    X_trial = X_trial.float()  # Converts tensor to torch.float32
    seq_length = torch.tensor([X_trial.shape[0]])
    
    X_trial = X_trial.unsqueeze(0)  # Shape: (1, seq_len, input_size)
    # Move data to the appropriate device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    X_trial = X_trial.to(device)
    seq_length = seq_length.to(device)
    # Set the model to evaluation mode
    model.eval()
    
    
    with torch.no_grad():
    # Run the model
        outputs = model(X_trial, seq_length)
        # Remove batch dimension if needed
        outputs = outputs.squeeze(0) 
    plt.plot(outputs)
    r2.append(r2_score(Y_test[i].numpy().flatten(), outputs.numpy().flatten()))
    val_loss.append(np.array(criterion(outputs,Y_test[i])))