In [None]:
# Load all requirements, packages
import scipy.io
import seaborn as sns
import numpy as np
import ddm.plot # needs to be imported before matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import os, sys 
import ddm.models
from itertools import product
from ddm import Sample, Model, Fittable, Solution, Fitted, InitialCondition, Drift
from ddm.functions import fit_adjust_model, display_model, fit_model, dependence_hit_boundary
from ddm.models import DriftConstant, NoiseConstant,  BoundCollapsingExponential, LossRobustBIC, OverlayNonDecision,  ICRange
from scipy import stats
from scipy.stats import zscore
from scipy.stats import ttest_ind
from ddm.fitresult import FitResult, FitResultEmpty

In [None]:
# load behavioral data as pandas dataframe
df = pd.read_csv("path/to/your/datafile") # format should be the same as for hddm
# Clean from Invalid Reaction Times 
data=df.loc[~df['rt'].isnull()]
# remove trials with RTs faster than 0.2 seconds
data = data[~(data['rt'] < 0.2)]  
#data['rt'] = data[data["rt"] > 1.65] # Remove trials greater than 1650ms
data['z_rt'] = stats.zscore(data['rt'])
# keep only trials within +3 to -3 standard deviations
data = data[np.abs(data['z_rt']) < 3]
#data = data[np.abs(data['rt']-data['rt'].mean()) <= (3*data['rt'].std())]

# create a lst with unique sub ids
SubjectList = np.unique(data['subj_idx'])
data

In [None]:
#check to make sure no nan values are left
data.isnull().sum()

In [None]:
# compute your own Drift Bias class
class DriftRespBias(Drift):
    name = "Drift bias"
    required_parameters = ["drift", "respbias"] # <-- Parameters we want to include in the model
    required_conditions = ["resp"] # <-- Task parameters ("conditions"). Should be the same name as in the sample.
    
    # We must always define the get_drift function, which is used to compute the instantaneous value of drift.
    def get_drift(self, conditions, **kwargs):
        # respbias is the fittable object (-1 to 1) multiplied with 1 or -1 dependent on "yes" or "no"
        resp_bias = self.respbias * (1 if conditions['resp'] == 1 else -1)
        #the fitted drift rate with the added resp_bias
        return self.drift + resp_bias


When fitting both the initial condition and the bound height, it can be preferable to express the initial condition as a proportion of the total distance between the bounds. This ensures that the initial condition will always stay within the bounds, preventing errors in fitting.


In [None]:
# implement your own starting point bias
class ICPointRespRatio(InitialCondition):
    name = "A response-biased starting point expressed as a proportion of the distance between the bounds."
    required_parameters = ["x0"]
    required_conditions = ["resp"]
    def get_IC(self, x, dx, conditions):
        x0 = self.x0/2 + .5 #rescale to between 0 and 1
        # resp bias > .5 for yes, resp bias < .5 for no response. 
        # positive bias for "yes" response conditions, negative for "no" 
        if not conditions['resp'] == 1:
            x0 = 1-x0
       
        shift_i = int((len(x)-1)*x0)
        assert shift_i >= 0 and shift_i < len(x), "Invalid initial conditions"
        pdf = np.zeros(len(x))
        pdf[shift_i] = 1. # Initial condition at x=x0*2*B.
        return pdf 

Analytic solutions are only possible in a select number of special cases; in particular, it works for simple DDM and for linearly collapsing bounds and arbitrary single-point initial conditions. (See Anderson (1960) for implementation details.) For most reasonably complex models, the method will fail. Check whether a solution is possible with has_analytic_solution()


In [None]:
def setup_model():
    """
    Function to setup model. 
    """
    model = Model(name='IC Bias model',
            #drift=DriftConstant(drift=1),#Fittable(minval=0, maxval=4)
            drift=DriftRespBias(drift=Fittable(minval=0, maxval=4),respbias=Fittable(minval=-1, maxval=1)),
            noise=NoiseConstant(noise=1),
            bound = BoundCollapsingExponential(B=Fittable(minval=0, maxval=6),tau=Fittable(minval=0.001,maxval=5)),
            #bound=BoundConstant(B=Fittable(minval=0, maxval=2)),
            IC=ICPointRespRatio(x0=Fittable(minval=-1, maxval=1)),
            #IC = ICPointResp(x0=0)
            overlay=OverlayNonDecision(nondectime=Fittable(minval=0, maxval=2)),
            dx=.01, dt=.01, T_dur=2.5)      
      
    return model

In [None]:
subjects = []
drift = []
driftbias = []
boundary = []
nondectime = []
ICbias = []
fit = []
loss = []
age = []
tau = []

    
for sub in SubjectList:
   
    sub_df = data[data['subj_idx'] == sub]
    if sub in [40, 45, 48, 56]:
        print(np.unique(sub_df['stim']))
        continue
    else:
        resp = np.unique(sub_df["response"][(sub_df["stim"] == 1) & (sub_df["accuracy"] == 1)])
        print(resp[0])
        sub_df['resp']=np.where(sub_df["response"] == resp[0],1,0)

    #Load Data
    sample = Sample.from_pandas_dataframe(sub_df, rt_column_name="rt", correct_column_name="accuracy")
    print("mean decision time", sample.mean_decision_time())
    print("condition names", sample.condition_names())
    print("probability of correct and error trials", sample.prob_correct(), sample.prob_error())
    print("probability of undecided trials", sample.prob_undecided())
    #print(sample.corr) # RT for correct trials
    #plt.plot(sample.corr)
    #plt.plot(sample.err)
    #plt.show()
   
    #print("correct + error component of the joint CDF")
    #plt.plot(sample.cdf_corr())
    #plt.plot(sample.cdf_err())
    #plt.show()
    
    
 
    #Set up and fit ddm model
    simple_model = setup_model()
    print(simple_model.has_analytical_solution()) #False if collapsing bounds, can't apply solve function to this model then
    print(simple_model.can_solve_cn()) # False if collapsing bounds, solving numerically with Crank-Nicolson won't work then
    
    
    fit_model = fit_adjust_model(sample=sample, model=simple_model, lossfunction=LossRobustBIC, verbose=False)
    print("Fitted", fit_model.get_fit_result())


    print(sub)
    #print(simple_model.get_model_parameters())
    #display_model(fit_model)

    
    age.append(np.unique(sub_df['age'])[0])
    subjects.append(np.unique(sub_df['subj_idx'])[0])
    drift.append(float(simple_model.get_model_parameters()[0]))
    driftbias.append(float(simple_model.get_model_parameters()[1])) 
    boundary.append(float(simple_model.get_model_parameters()[2]))
    tau.append(float(simple_model.get_model_parameters()[3])) 
    ICbias.append(float(simple_model.get_model_parameters()[4])) 
    nondectime.append(float(simple_model.get_model_parameters()[5]))
    fit.append(simple_model.fitresult.value())
    loss.append(simple_model.fitresult.loss)
   # s = fit_model.solve()
   # print("correct RTs prob", s.prob_correct()) 
   

    ddm.plot.plot_fit_diagnostics(model=fit_model, sample=sample)
    plt.title("Drift {}, driftbias {}, Bound {}, tau {}, \n startbias {}, nondectime {}, fit {}".format(float(simple_model.get_model_parameters()[0]),
                                                                                                 float(simple_model.get_model_parameters()[1]),
                                                                                                 float(simple_model.get_model_parameters()[2]),
                                                                                                 float(simple_model.get_model_parameters()[3]),
                                                                                                 float(simple_model.get_model_parameters()[4]),
                                                                                                 float(simple_model.get_model_parameters()[5]),
                                                                                                 simple_model.fitresult.value()), y=2)

    
       
    plt.savefig('path/where/you/want/to/save/figure/modelName_studyName_sub_{}.png'.format(sub), bbox_inches='tight')
    #plt.show()
    plt.close()
    #ddm.plot.model_gui_jupyter(model=fit_model, sample=sample)

In [None]:
# create dictonary from the lists of fitted model parameters
fit_dict= {'subjects': subjects, 'age': age, 'drift': drift, 'driftbias': driftbias, 'boundary': boundary, 'tau': tau, 'startbias': ICbias, 'nondec': nondectime, 'fit': fit, 'loss': loss}
# create dataframe from dictonary
fit_df = pd.DataFrame(fit_dict)


In [None]:
# save df 
fit_df.to_pickle('fitted_df_filename') 
fit_df

In [None]:
# read in saved df
df_10 = pd.read_pickle('fitted_df_filename')
print(df_10.describe())

In [None]:
def age_grouping(df):
    
    """
    function to divide one df into two based on age
    returns one df for OA and one for YA
    """
    ya_df = df.loc[df['age'] == 0]
    oa_df = df.loc[df['age'] == 1]
    print(ya_df.describe())
    print(oa_df.describe()) 
    #print(ya_df.head())
    #print(oa_df.head())
    return ya_df, oa_df

In [None]:
ya_df, oa_df = age_grouping(df_10)