In [None]:
# Script for reaction time models with within-trial baseline dynamics. Simulations, analytical solutions, parameter recovery, fitting methods
import numpy as np
import matplotlib.pyplot as plt
import math
from matplotlib.pyplot import cm
import warnings
from scipy.stats import entropy
from Scripts import behaviour
warnings.filterwarnings('ignore')
np.random.seed(0)
    
class eLATER:
    """
    Class for fitting eLATER model only (no baseline dynamics)
    """
    def __init__(self, name, time,
                 muR, sigR, muS, sigS, maxT):
        self.name = name
        self.time = time
        self.muR = muR # sensory integration mean rate
        self.sigR = sigR # sensory integration variance
        self.muS = muS
        self.sigS = sigS
        self.maxT = maxT # maximum time for each probability distribution

    def setDelayTimes(self, delays):
        """ Adds delay times for given trial set to the class (this is kinda stupid, remove function and do it in one line somewhere else.... well I guess this depends on if the delays are simulated or 
        provided from data. Just leave it for now and figure that out as it comes)

        Args:
            delays (ndarray): distribution of delay times to sample from
        """
        self.delayTimes = delays.astype(int)

    def extended_later(self, muR, muS, sR, sigma):
        """ Extended LATER model from Nakahara et al (2006) https://pubmed.ncbi.nlm.nih.gov/16971090/
        
        
        Takes class input for fixed time parameters, plus additional arguments for the mean and variance of the baseline and sensory integration

        Args:
            muR (float): mean rate of rise for sensory integration
            sR (float): variance of the rate of rise 

        Returns:
            pT (ndarray): array of 1xt where each point is the probability of a reaction time equalling that time point (sums to 1)
        """
        t = np.arange(1, self.maxT) #Need max time to normalize with other distributions
        a = muR / muS
        b = sR / muS
        c = sigma / sR
        
        e1 = (1/t - a) ** 2
        e2 = t ** 2 / (t ** 2 + c **2)
        e3 = -1 / (2 * b **2)
        pT = np.multiply(np.multiply(t + a * (c ** 2) / (t **2 + c ** 2) **(3/2), 1/((2 * math.pi)**(1/2)*b)), 
                        np.exp(e1 * e2 * e3))
        pT = pT / np.sum(pT)
        return pT
    
    def computeModel(self):
        pTrialOnset, pTargetOnset = [np.zeros((self.time.size, self.delayTimes.size)) for _ in range(2)] 
        if hasattr(self, 'delayTimes') == True:
            for i in range(self.delayTimes.size):
                pTargetOnset[0:self.maxT-1,i] = self.extended_later(self.muR, self.muS, self.sigR, self.sigS) #change self.sigma to self.ouVar[self.delayTimes[i]] for changing cov
                pTrialOnset[self.delayTimes[i]:self.delayTimes[i]+self.maxT,i] = pTargetOnset[0:self.maxT,i]
        else:  
            print("No delay distribution found. Please set via setDelayTimes first")   
        self.combined = pTrialOnset 

def fiteLATERModel(params, data):

    # Set delay time bins to fit data to
    minDelay = 750
    maxDelay = 1250
    delayBins = 10
    delays = np.linspace(minDelay, maxDelay, delayBins)

    # Set constants for model fitting
    shift = 0.5
    scale = 100
    trialBins = 100

    # Get Indices for delay bins
    delayData = dict()
    for i in range(delayBins):
        if i == delayBins-1:
            delayData[str(delays[i])] = data[(data['First Target Onset'] >= delays[i])] 
        else:
            delayData[str(delays[i])] = data[(data['First Target Onset'] >= delays[i]) & (data['First Target Onset'] < delays[i+1])]

    # Initialize model error
    model_error = 0 

    # Fit the model
    try:
        for delay in delays: # Loop model fit for same parameter set

            # Set up the model
            model = eLATER('Accumulator 1', time = np.arange(1,2000), muR = params[0], muS = params[1],
                           sigR = params[2], sigS = params[3], maxT = 700)
            model.setDelayTimes(np.linspace(delay, delay, 1)) # Set delay times equal to first delay bin
            model.computeModel() # Run Model

            # Align reaction times to delay bin
            data = delayData[str(delay)]['Reaction Time: First Target'] + delay
            data = data.to_numpy()
            data_pdf, _ = np.histogram(data, bins = trialBins, range =(750, 2000), density = True)
            
            # Sample data from model 
            model_sample = np.random.choice(1999, 10000, p = normalize_rt_pdfs(model.combined))
            model_pdf, _ = np.histogram(model_sample, bins = trialBins, range =(750, 2000), density = True)

            # Rescale model and data
            data_pdf = data_pdf + shift
            model_pdf = model_pdf + shift
            model_pdf = np.nan_to_num(model_pdf)
            data_pdf = np.nan_to_num(data_pdf)

            # Compute entropy between model and data, add to previous delay entropy
            model_error += entropy(model_pdf, data_pdf) * scale 
            
        return model_error #model error summed over all delay periods

    except:
        print('probabilities contain nan') #throw out bad models
        return np.inf

def normalize_rt_pdfs(pRT):
    pT = np.sum(pRT, 1) / np.shape(pRT)[1] 
    return pT / np.sum(pT) # Needs time dimension as axis [0] and probability density as axis [1]

def normalizeData(data):
    return (data - np.min(data)) / (np.max(data) - np.min(data))

def kl_divergence(p, q):
    p = p + .5
    q = q + .5
    return np.sum(np.where(p != 0, p * np.log(p / q), 0))

def combine_anticipatory_sensory(pTrialOnset, pAntic, k):
    pAntic = pAntic * k
    t = np.stack((normalize_rt_pdfs(pTrialOnset), pAntic), axis = 1)
    return normalize_rt_pdfs(t)



In [3]:
path = r'C:\Users\Brandon\Desktop\PhD\Baseline Dynamics\Baseline-Dynamics\Behavioural_Data'
subj_path = r'E:\Free Choice\Data\tDCS\tDCS\Final Data'
subj_keys = behaviour.get_immediate_subdirectories(subj_path)
polarity_keys = ['AN', 'CA']
exp_keys = ['PR','ST', 'PO']


In [8]:
## Fit Model to all Data
from scipy.optimize import differential_evolution
from skopt import dump

for subj in subj_keys:
    for polarity in polarity_keys:
        for exp in exp_keys:
            data = behaviour.combineBehaviour(path = path, all_key = False, subj_key= subj,
                                              polarity_key = polarity, exp_key = exp)
            bnds = ((1e-8, 1), (1e-5, 2), (1e-6, 1e-1), (0.0001, 1))
            modelFit = differential_evolution(fiteLATERModel, bounds = bnds, args=([data]))
            dump(modelFit, fr'C:\Users\Brandon\Desktop\PhD\Baseline Dynamics\Baseline-Dynamics\Model_Fits\eLATERfit_{subj}_{polarity}_{exp}')


probabilities contain nan


In [5]:
def plotBDIFit(modelFit, data, numBins, minDelay, maxDelay):
    """ Plotting function for returning pdf and cdf of model vs data. 
    Arguments follow previous conventions. 
    """

    # Compute recovered model
    recoveredModel = eLATER('Accumulator 1', time = np.arange(1,2000), muR = modelFit.x[0], muS = modelFit.x[1],
                           sigR = modelFit.x[2], sigS = modelFit.x[3], maxT = 700)
    
    # Return binned delay data for plotting
    delayidx = behaviour.binDelaydata(data, minDelay = minDelay, maxDelay = maxDelay, numBins = numBins)            
    delays= np.linspace(minDelay, maxDelay, numBins)
    
    # Set plotting colours
    _ , axs = plt.subplots(2, 2, figsize=(12,4))
    color = iter(cm.BuPu(np.linspace(0.1, 1, 10)))

    # Loop over delay bins and plot data overlayed with model
    for delay in delays[:-1]:
        data = delayidx[str(delay)]['Reaction Time: First Target'] + delay #Align to delay time 
        data = data.to_numpy()

        # Run model on single delay 
        recoveredModel.setDelayTimes(np.linspace(delay, delay, 1))
        recoveredModel.computeModel()

        # Plot results
        c = next(color)
        axs[0,0].plot(recoveredModel.combined, c = c)
        axs[0,0].hist(data, bins = 500, density = True, color = c)
        axs[0,0].set_title('PDF Model vs Data')
        axs[0,0].set_xlim([750, 2000])

        axs[1,0].plot(np.cumsum(recoveredModel.combined), c =  c)
        axs[1,0].set_title('CDF Model')
        axs[1,0].set_xlim([750, 2000])
    plt.savefig(rf"C:\Users\Brandon\Desktop\PhD\Baseline Dynamics\Baseline-Dynamics\Figures\example model", format='svg')

    plt.show()

In [6]:
plotBDIFit(modelFit, data, numBins = 10, minDelay = 750, maxDelay = 1250)

print(modelFit.fun)

NameError: name 'modelFit' is not defined

In [4]:
from skopt import load
import matplotlib.pyplot as plt
counter = 1
color = iter(cm.twilight(np.linspace(0.1, 1, 7)))
for subj in subj_keys:
    c = next(color)
    for polarity in polarity_keys:
        for exp in exp_keys:
            eLATER_Model = load(fr'C:\Users\Brandon\Desktop\PhD\Baseline Dynamics\Baseline-Dynamics\Model_Fits\eLATERfit_{subj}_{polarity}_{exp}')
            baseline_Model = load(fr'C:\Users\Brandon\Desktop\PhD\Baseline Dynamics\Baseline-Dynamics\Model_Fits\modelfit_{subj}_{polarity}_{exp}')
            plt.bar(counter, eLATER_Model.fun - baseline_Model.fun, color = c)
            counter = counter + 1


#plt.savefig(rf"C:\Users\Brandon\Desktop\PhD\Baseline Dynamics\Baseline-Dynamics\Figures\eLATER vs Baseline Model", format='svg')


ModuleNotFoundError: No module named 'scipy.optimize._optimize'

In [None]:
from skopt import load
eLATER_Model = load(fr'C:\Users\Brandon\Desktop\PhD\Baseline Dynamics\Baseline-Dynamics\Model_Fits\eLATERfit_{subj}_{polarity}_{exp}')
baseline_Model = load(fr'C:\Users\Brandon\Desktop\PhD\Baseline Dynamics\Baseline-Dynamics\Model_Fits\modelfit_{subj}_{polarity}_{exp}')


ModuleNotFoundError: No module named 'scipy.optimize._optimize'