In [2]:
import pyddm.plot
from pyddm import Model, set_N_cpus
from pyddm.models import DriftConstant, DriftLinear, NoiseConstant,Bound,Drift, BoundConstant, OverlayNonDecision, ICPointSourceCenter,BoundCollapsingLinear
from pyddm.functions import fit_adjust_model, display_model

from pyddm import Fittable, Fitted, Sample
from pyddm.models import LossRobustBIC, LossBIC, LossLikelihood, LossRobustLikelihood,LossSquaredError
from pyddm.functions import fit_adjust_model

import copy

%matplotlib inline

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.rc('font',family='Arial')
import os
import seaborn as sns
import scipy
from statannot import add_stat_annotation
import glob

from sbi import utils
import scipy.stats as stats

In [None]:
class DriftConstantSpeedAcc(Drift):
    name = "DriftConstantSpeedAcc"
    required_parameters = ["Vspeed","Vacc"]
    required_conditions = ['Cond']
    def get_drift(self, x, t, conditions, **kwargs):
        if conditions['Cond'] == 'Speed':
            return self.Vspeed
        elif conditions['Cond'] == 'Accuracy':
            return self.Vacc
        else: 
            print("Non-implemented SAT conditions")

class DriftBothSpeedAcc(Drift):
    name = "DriftBothSpeedAcc"
    required_parameters = ["Vspeed","Vacc", "Kspeed", "Kacc"]
    required_conditions = ['Cond']
        
    def get_drift(self, x, t, conditions, **kwargs):
        if conditions['Cond'] == 'Speed':
            return self.Vspeed + x*self.Kspeed 
        elif conditions['Cond'] == 'Accuracy':
            return self.Vacc + x*self.Kacc
        else: 
            print("Non-implemented SAT conditions")

class DriftMixSpeedAcc(Drift):
    name = "DriftMixSpeedAcc"
    required_parameters = ["Vspeed","Vacc", "K"]
    required_conditions = ['Cond']
        
    def get_drift(self, x, t, conditions, **kwargs):
        if conditions['Cond'] == 'Speed':
            return self.Vspeed + x*self.K 
        elif conditions['Cond'] == 'Accuracy':
            return self.Vacc + x*self.K
        else: 
            print("Non-implemented SAT conditions")
            
class DriftKSpeedAcc(Drift):
    name = "DriftMixSpeedAcc"
    required_parameters = ["V", "Kspeed","Kacc"]
    required_conditions = ['Cond']
        
    def get_drift(self, x, t, conditions, **kwargs):
        if conditions['Cond'] == 'Speed':
            return self.V + x*self.Kspeed
        elif conditions['Cond'] == 'Accuracy':
            return self.V + x*self.Kacc
        else: 
            print("Non-implemented SAT conditions")

In [None]:
class BoundSpeedAcc(Bound):
    name = "BoundSpeedAcc"
    required_parameters = ["Bspeed", "Bacc"]
    required_conditions = ['Cond']
    def get_bound(self, conditions, *args, **kwargs):
        assert self.Bacc > 0
        assert self.Bspeed > 0
        if conditions['Cond'] == 'Speed':
            return self.Bspeed
        elif conditions['Cond'] == 'Accuracy':
            return self.Bacc
        else:
            print("Non-implemented SAT conditions")
            
class BoundCollapsingWeibull(Bound):
    name = "Weibull CDF collapsing bounds"
    required_parameters = ["aSpeed", "aprimeSpeed", "lamSpeed", "kSpeed", "aAcc", "aprimeAcc", "lamAcc", "kAcc"]
    required_conditions = ['Cond']
    def get_bound(self, t,conditions, **kwargs):
        if conditions['Cond'] == 'Speed':
            l = self.lamSpeed
            a = self.aSpeed
            aprime = self.aprimeSpeed
            k = self.kSpeed
            
            return a - (1 - np.exp(-(t/l)**k)) * (a - aprime)
        elif conditions['Cond'] == 'Accuracy':
            l = self.lamAcc
            a = self.aAcc
            aprime = self.aprimeAcc
            k = self.kAcc    
            return a - (1 - np.exp(-(t/l)**k)) * (a - aprime)
        else: 
            print("Non-implemented SAT conditions")
            
class BoundCollapsingLinearSAT(BoundCollapsingLinear):
    name = "BoundSpeedAcc"
    required_parameters = ["BSpeed", "BAcc", "cSpeed", 'cAcc']
    required_conditions = ['Cond']
        
    def get_bound(self, t, conditions, **kwargs):
        if conditions['Cond'] == 'Speed':
            return max(self.BSpeed - self.cSpeed*t, 0.)
        elif conditions['Cond'] == 'Accuracy':
            return max(self.BAcc - self.cAcc*t, 0.)
        else: 
            print("Non-implemented SAT conditions")

In [None]:
class NondecisionSpeedAcc(OverlayNonDecision):
    name = "NondecisionSpeedAcc"
    required_parameters = ["tSpeed", 'tAcc']
    required_conditions = ['Cond']
        
    def get_nondecision_time(self, conditions, **kwargs):
        if conditions['Cond'] == 'Speed':
            return self.tSpeed 
        elif conditions['Cond'] == 'Accuracy':
            return self.tAcc
        else: 
            print("Non-implemented SAT conditions")

In [None]:
def get_DDM_result_sub_2v2a2ter(samp, v = [0,6], a = [0.5,6], ter=[0.1,1], dx=.01, dt=.01, T_dur=5):
    
    model_fit = Model(name='Simple model (fitted)',
                      drift=DriftConstantSpeedAcc(Vspeed = Fittable(minval=v[0], maxval=v[1]),
                                                  Vacc = Fittable(minval=v[0], maxval=v[1])), 
                      noise=NoiseConstant(noise=1),
                      bound=BoundSpeedAcc(
                          Bspeed=Fittable(minval=a[0], maxval=a[1]),
                          Bacc=Fittable(minval=a[0], maxval=a[1])),
                      overlay=NondecisionSpeedAcc(
                          tSpeed=Fittable(minval=ter[0], maxval=ter[1]),
                          tAcc=Fittable(minval=ter[0], maxval=ter[1]),
                      ),dx=.01, dt=dt, T_dur=T_dur)
    fit_adjust_model(samp, model_fit,fitting_method="differential_evolution",lossfunction=LossRobustBIC, verbose=False)
    df = pd.DataFrame(data = np.array(model_fit.get_model_parameters()).reshape(1,6),
                      columns = model_fit.get_model_parameter_names())
    
    df['BIC'] = model_fit.get_fit_result().value()
    df = df.rename(columns={'Vspeed': 'Drift Rate Speed',
                            'Vacc': 'Drift Rate Accuracy',
                            'Bspeed': 'Boundary Separation Speed',
                            'Bacc': 'Boundary Separation Accuracy',
                            'tSpeed': 'Non-decision Time Speed',
                            'tAcc': 'Non-decision Time Accuracy'})
    return df

In [None]:
def get_DDM_result_sub_2v2a2c2ter(samp, v = [0,6], a = [0.5,6], c = [0,4], ter=[0.1,1], dx=.01, dt=.01, T_dur=5):
    
    model_fit = Model(name='Simple model (fitted)',
                      drift=DriftConstantSpeedAcc(Vspeed = Fittable(minval=v[0], maxval=v[1]),
                                                  Vacc = Fittable(minval=v[0], maxval=v[1])), 
                      noise=NoiseConstant(noise=1),
                      bound=BoundCollapsingLinearSAT(BSpeed=  Fittable(minval=a[0], maxval=a[1]),
                                                     BAcc =   Fittable(minval=a[0], maxval=a[1]),
                                                     cSpeed = Fittable(minval=c[0], maxval=c[1]),
                                                     cAcc =   Fittable(minval=c[0], maxval=c[1])),
                      overlay=NondecisionSpeedAcc(
                          tSpeed=Fittable(minval=ter[0], maxval=ter[1]),
                          tAcc=Fittable(minval=ter[0], maxval=ter[1]),
                      ),dx=.01, dt=dt, T_dur=T_dur)
    fit_adjust_model(samp, model_fit,fitting_method="differential_evolution",lossfunction=LossRobustBIC, verbose=False)
    df = pd.DataFrame(data = np.array(model_fit.get_model_parameters()).reshape(1,8),
                      columns = model_fit.get_model_parameter_names())
    
    df['BIC'] = model_fit.get_fit_result().value()
    df = df.rename(columns={'Vspeed': 'Drift Rate Speed',
                            'Vacc': 'Drift Rate Accuracy',
                            'Bspeed': 'Boundary Separation Speed',
                            'Bacc': 'Boundary Separation Accuracy',
                            'cSpeed': 'Collapse Speed',
                            'cAcc':   'Collapse Accuracy',
                            'tSpeed': 'Non-decision Time Speed',
                            'tAcc': 'Non-decision Time Accuracy'})
    return df

In [None]:
def get_DDM_result_sub_2a2ter(samp, v = [0,6], a = [0.5,6], ter=[0.1,1], dx=.01, dt=.01, T_dur=5):
    
    model_fit = Model(name='Simple model (fitted)',
                      drift=DriftConstant(drift = Fittable(minval=v[0], maxval=v[1])), 
                      noise=NoiseConstant(noise=1),
                      bound=BoundSpeedAcc(
                          Bspeed=Fittable(minval=a[0], maxval=a[1]),
                          Bacc=Fittable(minval=a[0], maxval=a[1])),
                      overlay=NondecisionSpeedAcc(
                          tSpeed=Fittable(minval=ter[0], maxval=ter[1]),
                          tAcc=Fittable(minval=ter[0], maxval=ter[1]),
                      ),dx=.01, dt=dt, T_dur=T_dur)
    fit_adjust_model(samp, model_fit,fitting_method="differential_evolution",lossfunction=LossRobustBIC, verbose=False)
    df = pd.DataFrame(data = np.array(model_fit.get_model_parameters()).reshape(1,5),
                      columns = model_fit.get_model_parameter_names())
    
    df['BIC'] = model_fit.get_fit_result().value()
    df = df.rename(columns={'drift': 'Drift Rate',
                            'Bspeed': 'Boundary Separation Speed',
                            'Bacc': 'Boundary Separation Accuracy',
                            'tSpeed': 'Non-decision Time Speed',
                            'tAcc': 'Non-decision Time Accuracy'})
    return df

In [None]:
def get_OUM_result_sub_2v2k2ter(samp, v = [0,6], a = [0.5,6], k = [-15,15], ter=[0.1,1], T_dur=5, dt=.01):
    
    model_fit = Model(name='Simple model (fitted)',
                      drift=DriftBothSpeedAcc(Vspeed = Fittable(minval=v[0], maxval=v[1]),
                                              Vacc = Fittable(minval=v[0], maxval=v[1]),
                                              Kspeed=Fittable(minval=k[0], maxval=k[1]),
                                              Kacc=Fittable(minval=k[0], maxval=k[1])), 
                      noise=NoiseConstant(noise=1),
                      bound=BoundConstant(B=Fittable(minval=a[0], maxval=a[1])),                      
                      overlay=NondecisionSpeedAcc(
                          tSpeed=Fittable(minval=ter[0], maxval=ter[1]),
                          tAcc=Fittable(minval=ter[0], maxval=ter[1]),

                      ),
                      dx=.01, dt=dt, T_dur=T_dur)
    #set_N_cpus(4)
    fit_adjust_model(samp, model_fit,
                     fitting_method="differential_evolution",
                     lossfunction=LossRobustBIC, verbose=False)
    #print(model_fit.get_model_parameters())
    df = pd.DataFrame(data = np.array(model_fit.get_model_parameters()).reshape(1,7),
                      columns = model_fit.get_model_parameter_names())
    
    #df['Participant']=i+1
    #df['SAT'] = SAT
    df['BIC'] = model_fit.get_fit_result().value()
    df = df.rename(columns={'Vspeed': 'Drift Rate Speed',
                            'Vacc': 'Drift Rate Accuracy',
                            'B': 'Boundary Separation',
                            'Kspeed': "Self-excitation Speed",
                            'Kacc': "Self-excitation Accuracy",
                            'tSpeed': 'Non-decision Time Speed',
                            'tAcc': 'Non-decision Time Accuracy'})
    return df

In [1]:
def get_OUM_result_sub_2a2ter(samp, v = [0,6], a = [0.5,6], k = [-15,15], ter=[0.1,1], T_dur=5, dt=.01):
    
    model_fit = Model(name='Simple model (fitted)',
                      drift=DriftLinear(drift = Fittable(minval=v[0], maxval=v[1]),
                                             x=Fittable(minval=k[0], maxval=k[1]),
                                             t=0), 

                      noise=NoiseConstant(noise=1),
                      bound=BoundSpeedAcc(Bspeed=Fittable(minval=a[0], maxval=a[1]),
                                          Bacc=Fittable(minval=a[0], maxval=a[1])),  
                      overlay=NondecisionSpeedAcc(
                          tSpeed=Fittable(minval=ter[0], maxval=ter[1]),
                          tAcc=Fittable(minval=ter[0], maxval=ter[1]),

                      ),
                      dx=.01, dt=dt, T_dur=T_dur)
    #set_N_cpus(4)
    fit_adjust_model(samp, model_fit,
                     fitting_method="differential_evolution",
                     lossfunction=LossRobustBIC, verbose=False)
    #print(model_fit.get_model_parameters())
    df = pd.DataFrame(data = np.array(model_fit.get_model_parameters()).reshape(1,6),
                      columns = model_fit.get_model_parameter_names())
    
    #df['Participant']=i+1
    #df['SAT'] = SAT
    df['BIC'] = model_fit.get_fit_result().value()
    df = df.rename(columns={'drift': 'Drift Rate',
                            'Bspeed': 'Boundary Separation Speed',
                            'Bacc': 'Boundary Separation Accuracy',
                            'x': "Self-excitation",
                            'tSpeed': 'Non-decision Time Speed',
                            'tAcc': 'Non-decision Time Accuracy'})
    return df

In [None]:
def get_OUM_result_sub_2k2ter(samp, v = [0,6], a = [0.5,6], k = [-15,15], ter=[0.1,1], T_dur=5, dt=.01):
    
    model_fit = Model(name='Simple model (fitted)',
                      drift=DriftKSpeedAcc(V = Fittable(minval=v[0], maxval=v[1]),
                                            Kspeed=Fittable(minval=k[0], maxval=k[1]),
                                            Kacc=Fittable(minval=k[0], maxval=k[1])), 
                      noise=NoiseConstant(noise=1),
                      bound=BoundConstant(B=Fittable(minval=a[0], maxval=a[1])),                      
                      overlay=NondecisionSpeedAcc(
                          tSpeed=Fittable(minval=ter[0], maxval=ter[1]),
                          tAcc=Fittable(minval=ter[0], maxval=ter[1]),

                      ),
                      dx=.01, dt=dt, T_dur=T_dur)
    #set_N_cpus(4)
    fit_adjust_model(samp, model_fit,
                     fitting_method="differential_evolution",
                     lossfunction=LossRobustBIC, verbose=False)
    #print(model_fit.get_model_parameters())
    df = pd.DataFrame(data = np.array(model_fit.get_model_parameters()).reshape(1,6),
                      columns = model_fit.get_model_parameter_names())
    
    #df['Participant']=i+1
    #df['SAT'] = SAT
    df['BIC'] = model_fit.get_fit_result().value()
    df = df.rename(columns={'V': 'Drift Rate',
                            'B': 'Boundary Separation',
                            'Kspeed': "Self-excitation Speed",
                            'Kacc': "Self-excitation Accuracy",
                            'tSpeed': 'Non-decision Time Speed',
                            'tAcc': 'Non-decision Time Accuracy'})
    return df

In [None]:
def get_OUM_result_sub_2v2a2ter(samp, v = [0,6], a = [0.5,6], k = [-15,15], ter=[0.1,1], T_dur=5, dt=.01):
    
    model_fit = Model(name='Simple model (fitted)',
                      drift=DriftMixSpeedAcc(Vspeed = Fittable(minval=v[0], maxval=v[1]),
                                             Vacc = Fittable(minval=v[0], maxval=v[1]),
                                             K = Fittable(minval=k[0], maxval=k[1])), 
                      noise=NoiseConstant(noise=1),
                      bound=BoundSpeedAcc(
                          Bspeed=Fittable(minval=a[0], maxval=a[1]),
                          Bacc=Fittable(minval=a[0], maxval=a[1])),
                      
                      overlay=NondecisionSpeedAcc(
                          tSpeed=Fittable(minval=ter[0], maxval=ter[1]),
                          tAcc=Fittable(minval=ter[0], maxval=ter[1])),
                      dx=.01, dt=dt, T_dur=T_dur)
    fit_adjust_model(samp, model_fit,
                     fitting_method="differential_evolution",
                     lossfunction=LossRobustBIC, verbose=False)
    df = pd.DataFrame(data = np.array(model_fit.get_model_parameters()).reshape(1,7),
                      columns = model_fit.get_model_parameter_names())
    

    df['BIC'] = model_fit.get_fit_result().value()
    df = df.rename(columns={'Vspeed': 'Drift Rate Speed',
                            'Vacc': 'Drift Rate Accuracy',
                            'Bspeed': 'Boundary Separation Speed',
                            'Bacc': 'Boundary Separation Accuracy',
                            'K': "Self-excitation",
                            'tSpeed': 'Non-decision Time Speed',
                            'tAcc': 'Non-decision Time Accuracy'})
    return df

In [None]:
def get_OUM_result_sub_2v2a2k2ter(samp, v = [0,6], a = [0.5,6], k = [-15,15], ter=[0.1,1], T_dur=5, dt=.01):
    
    model_fit = Model(name='Simple model (fitted)',
                      drift=DriftBothSpeedAcc(Vspeed = Fittable(minval=v[0], maxval=v[1]),
                                              Vacc = Fittable(minval=v[0], maxval=v[1]),
                                              Kspeed=Fittable(minval=k[0], maxval=k[1]),
                                              Kacc=Fittable(minval=k[0], maxval=k[1])), 
                      noise=NoiseConstant(noise=1),
                      bound=BoundSpeedAcc(
                          Bspeed=Fittable(minval=a[0], maxval=a[1]),
                          Bacc=Fittable(minval=a[0], maxval=a[1])),
                      overlay=NondecisionSpeedAcc(
                          tSpeed=Fittable(minval=ter[0], maxval=ter[1]),
                          tAcc=Fittable(minval=ter[0], maxval=ter[1]),

                      ),
                      dx=.01, dt=dt, T_dur=T_dur)
    fit_adjust_model(samp, model_fit,
                     fitting_method="differential_evolution",
                     lossfunction=LossRobustBIC, verbose=False)
    df = pd.DataFrame(data = np.array(model_fit.get_model_parameters()).reshape(1,8),
                      columns = model_fit.get_model_parameter_names())
    
    df['BIC'] = model_fit.get_fit_result().value()
    df = df.rename(columns={'Vspeed': 'Drift Rate Speed',
                            'Vacc': 'Drift Rate Accuracy',
                            'Bspeed': 'Boundary Separation Speed',
                            'Bacc': 'Boundary Separation Accuracy',
                            'Kspeed': "Self-excitation Speed",
                            'Kacc': "Self-excitation Accuracy",
                            'tSpeed': 'Non-decision Time Speed',
                            'tAcc': 'Non-decision Time Accuracy'})
    return df

In [None]:
def get_DDM_result_sub_2v2a2Weilbull2ter(samp, v = [0,6], a = [0.5,6], c = [0,4], ter=[0.1,1],dt=.01, T_dur=5):

    model_fit = Model(name='Simple model (fitted)',
                      drift=DriftConstantSpeedAcc(Vspeed = Fittable(minval=v[0], maxval=v[1]),
                                                  Vacc = Fittable(minval=v[0], maxval=v[1])), 
                      noise=NoiseConstant(noise=1),
                      bound=BoundCollapsingWeibull(aSpeed=Fittable(minval=1, maxval=4),
                                                   aprimeSpeed=Fittable(minval=0, maxval=2),
                                                   lamSpeed=Fittable(minval=0.01, maxval=2),
                                                   kSpeed=Fittable(minval=0, maxval=5),
                                                   aAcc=Fittable(minval=1, maxval=4),
                                                   aprimeAcc=Fittable(minval=0, maxval=2),
                                                   lamAcc=Fittable(minval=0.01, maxval=2),
                                                   kAcc=Fittable(minval=0, maxval=5)),
                      overlay=NondecisionSpeedAcc(tSpeed=Fittable(minval=ter[0], maxval=ter[1]),
                                                  tAcc=Fittable(minval=ter[0], maxval=ter[1])),
                      dx=0.01, dt=dt, T_dur=T_dur)
    fit_adjust_model(samp, model_fit,
                     fitting_method="differential_evolution",
                     lossfunction=LossRobustBIC, verbose=False)
    df = pd.DataFrame(data = np.array(model_fit.get_model_parameters()).reshape(1,12),
                      columns = model_fit.get_model_parameter_names())
    
    df['BIC'] = model_fit.get_fit_result().value()
    df = df.rename(columns={'Vspeed': 'Drift Rate Speed',
                            'Vacc': 'Drift Rate Accuracy',
                            'aSpeed': 'Boundary Separation Speed',
                            'aAcc':   'Boundary Separation Accuracy',
                            'tSpeed': 'Non-decision Time Speed',
                            'tAcc': 'Non-decision Time Accuracy'})
    return df

In [None]:
def sim_fit_plot_all(par, par_fitted, par_labels, par_fitted_labels, fontsize, s = 4, figsize=(10,10)):

    param_num = par.shape[1]
    param_fitted_num = par_fitted.shape[1]

    fig, axes = plt.subplots(param_fitted_num, param_num, figsize=figsize)

    for i in range(param_num):
        for j in range(param_fitted_num):
            axes[j,i].scatter(par[:,i], par_fitted[:,j], s = s, facecolors='none', edgecolors='k')
            #axes[j,i].locator_params(axis='both', nbins=4)
            if j!=(param_fitted_num-1):
                axes[j,i].set_xticks([])
            if i!=0:
                axes[j,i].set_yticks([])

    for m in range(param_fitted_num):
        axes[m,0].set_ylabel(par_fitted_labels[m],fontsize=fontsize)#, rotation=0)

    for n in range(param_num):
        axes[(param_fitted_num-1),n].set_xlabel(par_labels[n], fontsize=fontsize)

    fig.align_ylabels(axes[:, 0])
    return fig, axes
