In [2]:
import numpy as np
import math
import scipy as sp
from scipy import optimize
from numpy import genfromtxt
import pandas as pd 
#import pymc3 as pm
#import arviz as az
import statistics as stats
import matplotlib.pyplot as plt

In [3]:
# We import a data set to 1) use the formatting, and 2) use the values of the choice options. I didn't want to make up choice options for fear of departing from what we are attempting to simulate.
data = genfromtxt("Full Square\FAKE RANDOMIZED Full Subjective Value Table Full Square.csv", delimiter=',', dtype=str)
#Column titles: Trial Number	Stimulus Time	Response Time	SS amount	LL amount	SS delay	LL delay	Response	SS SV	LL SV	k	beta	ID	Day	Date	Time	LL	AIC	BIC	r2	correct percent
IDsUncut = np.array(data[1:,12])
DaysUncut = np.array(data[1:,13])
DatesUncut = np.array(data[1:,14])
TimesUncut = np.array(data[1:,15])
IdentifiersUncut = np.array([i + j + k + l for i, j, k, l in zip(IDsUncut, DaysUncut, DatesUncut, TimesUncut)])

SSAmountUncut = np.array(data[1:,3], dtype=float)
SSDelayUncut = np.array(data[1:,5], dtype=float)
LLAmountUncut = np.array(data[1:,4], dtype=float)
LLDelayUncut = np.array(data[1:,6], dtype=float)

# Use these lines if using the original k and b later.
#OGChoices = np.array(data[1:,7], dtype=int)
#OGBetas = np.array(data[1:,11], dtype=float)
#OGKappas = np.array(data[1:,10], dtype=float)

boolselector = (SSAmountUncut<1000)*(LLDelayUncut>0)

IDs = IDsUncut[boolselector]
Days = DaysUncut[boolselector]
Dates = DatesUncut[boolselector]
Times = TimesUncut[boolselector]
Identifiers = IdentifiersUncut[boolselector]
SSAmount = SSAmountUncut[boolselector]
SSDelay = SSDelayUncut[boolselector]
LLAmount = LLAmountUncut[boolselector]
LLDelay = LLDelayUncut[boolselector]
#OGChoices = OGChoices[boolselector]
#OGBetas = OGBetas[boolselector]
#OGKappas = OGKappas[boolselector]

groups = np.full(len(IDs),100000)
alreadycoded = []
indextocount = 0

for identifier in Identifiers:
    if identifier not in alreadycoded:
        groups[Identifiers==identifier] = indextocount
        indextocount = indextocount + 1
        alreadycoded.append(identifier)

groupsUncut = np.full(len(IDsUncut),100000)
alreadycodedUncut = []
indextocountUncut = 0

for identifierUncut in IdentifiersUncut:
    if identifierUncut not in alreadycodedUncut:
        groupsUncut[IdentifiersUncut==identifierUncut] = indextocountUncut
        indextocountUncut = indextocountUncut + 1
        alreadycodedUncut.append(identifierUncut)

Make K and B Distributions

In [6]:
#  *******Determine k and b by normal distribution*******
# Note: I'm actually doing a truncated normal distribution here, but at the time I didn't know that that was a thing. This code would be more elegant if it just used the numpy truncated normal option. I'm not using it anyway, though.

k_min_buffer = 0.0000001
b_min_buffer = 0.00001

k_mu = 0.057357942 - k_min_buffer #10^-6 seemed about as low as it gets, so this is one more decimal place for range
k_sigma = 0.134691313
b_mu = 0.012280441 - b_min_buffer #0.0002 was the lowest in the set
b_sigma = 0.010641449

# Keep these commented out if you're not using this method.
#ks = np.absolute(np.random.normal(k_mu, k_sigma, (np.size(np.unique(groups)))))+k_min_buffer #no k will be below buffer
#bs = np.absolute(np.random.normal(b_mu, b_sigma, (np.size(np.unique(groups)))))+b_min_buffer # /\ for b

#  *******Determine k and b by beta distribution*******

k_alpha = 0.113586413
k_beta = 1.866721959
b_alpha = 1.30312618
b_beta = 104.810832
#ks = np.random.beta(k_alpha, k_beta, (np.size(np.unique(groups))))
#bs = np.random.beta(b_alpha, b_beta, (np.size(np.unique(groups))))

#  *******Determine k and b by importing previously determine k and b vals*******
# This would let you directly assess BHM performance on the range of B and K provided by MLE. This method might acquire reasonable k/b values but is very generous toward MLE. You also have the trouble of simulating data on b/k values that weren't legitimate - sometimes MLE returns ludicrous values when it fails, and those are included here.

#ks = OGKappas[np.unique(Identifiers,return_index=True)[1]]
#bs = OGBetas[np.unique(Identifiers,return_index=True)[1]]

#  *******Determine k and b by tiling*******
# This is the main method I'm using. You could change the max and min of the tile. I'm not doing more than 121 participant sessions because the BHM improves as the number of data sets increases (at least it's supposed to). You could do a million sessions, but this would probably be unfair to MLE (and would take a billion years to run)
min_k = 0.00000114
max_k = 0.768891643
min_b = 0.002337451 #min among non-exploded models
max_b = 0.113426498

n = round((np.size(np.unique(groups))**0.5)+0.49) # = 11

lilks = np.arange(min_k, max_k, (max_k-min_k)/n)
lilbs = np.arange(min_b, max_b, (max_b-min_b)/n)
ks = []
bs = []

# This is a simple way to pair up the k and b values. I think there's actually a numpy function that does this directly.
for i in range(n):
    for ii in range(n):
        ks.append(lilks[i])
        bs.append(lilbs[ii])

ks = np.array(ks) #[:116] # Use the index here if you want to restrict to 116 trials. If you don't use an edited Full SV Table (ie if you use one that only has 116 sessions), you'll need to put the index or code below will fail.
bs = np.array(bs) #[:116]


Generate Choices

In [6]:
#(v**risk)/(1+kappa*d)
#p = 1 / (1 + math.exp(beta[0]*(SV_1-SV_2)))

# More details in the explanation document
choices = np.random.binomial(1,(1/(1 + np.exp(bs[groups]*((SSAmount/(1+ks[groups]*SSDelay))-(LLAmount/(1+ks[groups]*LLDelay)))))))

Generate Fake Choice Data for Bayesian Model

In [8]:
#here I'm using the "Uncut" arrays because I need this to match to the length in the randomized full sv table (and they need to match correctly numerically too). However the Bayesian Simpler program will re-remove the catch trials, so I'm testing on the same set anyway.

choices_for_table = np.random.binomial(1,(1/(1 + np.exp(bs[groupsUncut]*((SSAmountUncut/(1+ks[groupsUncut]*SSDelayUncut))-(LLAmountUncut/(1+ks[groupsUncut]*LLDelayUncut)))))))
pd.DataFrame(np.reshape(choices_for_table,[-1,1])).to_csv("Full Square\Fake choices column generated by tiling smaller range full square.csv", index=False, header=False)

# IMPORTANT:
# The column generated here has to be manually copy/pasted into the FAKE RANDOMIZED SV table to feed into the Bayesian code. You could make this code put it in automatically. 

Restore Previously Randomized Choices (optional)

In [35]:
choices_from_previously_generated = genfromtxt("Full Square\Fake choices column generated by tiling smaller range full square.csv", delimiter=',', dtype=str)
choices = choices_from_previously_generated[boolselector].astype(int)

Optimize with MLE

In [38]:
MLEOutput = np.array(["ID", "Day", "NegLL", "beta", "k", "realK", "realB"])

# Loops through sessions, runs MLE on each, returns a nice array.

for pick in range(np.size(np.unique(groups))):
    boolpick = groups==(np.unique(groups)[pick])
    Is = IDs[boolpick]
    Ds = Days[boolpick]
    Dts = Dates[boolpick]
    Ts = Times[boolpick]
    Idents = Identifiers[boolpick]
    SSA = SSAmount[boolpick]
    SSD = SSDelay[boolpick]
    LLA = LLAmount[boolpick]
    LLD = LLDelay[boolpick]
    Cs = choices[boolpick]  #to test against control: OGChoices
    realK = ks[pick]
    realB = bs[pick]

    negLL, beta, k = optimizer(SSA, SSD, LLA, LLD, Cs) #, v1, d1, v2, d2, risk
    
    row = np.array([Is[0],Ds[0], negLL, beta, k, realK, realB])
    MLEOutput = np.vstack((MLEOutput, row))

#pd.DataFrame(MLEOutput).to_csv("MLEOutput on OG.csv")
pd.DataFrame(MLEOutput).to_csv("Full Square\MLEOutput on Generated Smaller Range Full Square Restored Test Again.csv")

Compare (don't need to run this)

In [None]:
np.corrcoef(MLEOutput[1:,4].astype(float),ks)

Functions from MLE

In [37]:
# This is all mostly copied from the MLE code.

#naming conventions that I've kept for consistency with the original code - "v1" refers to SSAmount, and "v2" refers to LLAmount.
def optimizer(immediate_vals, immediate_times, delay_vals, delay_times, choices):
    guesses = [0.005, 0.01]
    bkbounds = ((0,8),(0.00000001,6.4))
    risk = 1
    v1 = immediate_vals.tolist()  
    d1 = immediate_times.tolist() 
    v2 = delay_vals.tolist() 
    d2 = delay_times.tolist()

    inputs = [choices,v1,d1,v2,d2,risk]

    results = sp.optimize.minimize(optimize_me,guesses,inputs, bounds = bkbounds, tol=None, callback = None, options={'maxiter':10000, 'disp': False})
    negLL = results.fun
    beta = results.x[0]
    k = results.x[1]
    return negLL, beta, k #, v1, d1, v2, d2, risk

def analysis(choices,v1,d1,v2,d2,risk,given_k,beta):
    # Changed from other program: added LLfromGiven right here
    given_beta = [beta,given_k]  #FYI: name conventions are wonky here. The pairing of b,k is referred to as "beta". We also call that stochasticity factor "beta". I think the latter is not quite proper, but I don't know.
    negLL = local_negLL(given_beta,choices,v1,d1,v2,d2,risk)
    
    # Unrestricted log-likelihood
    LL = -negLL

    # Restricted log-likelihood
    LL0 = np.sum((choices==1)*math.log(0.5) + (1-(choices==1))*math.log(0.5))

    # Akaike Information Criterion
    AIC = -2*LL + 2*2  #CHANGE TO len(results.x) IF USING A DIFFERENT MODEL (parameters != 2)

    # Bayesian information criterion
    BIC = -2*LL + 2*math.log(len(v1))  #len(results.x)

    #R squared
    r2 = 1 - LL/LL0

    #Percent accuracy
    k_for_accuracy = given_k
    beta_for_accuracy = [beta,k_for_accuracy]
    parray = np.array(choice_prob(v1,d1,v2,d2,beta_for_accuracy,risk))
    correct =sum((parray>=0.5)==choices)/len(v1)

    #print("k=",given_k)
    #print("LL",LL,"AIC",AIC,"BIC",BIC,"R2",r2,"correct",correct)
    return(LL,LL0,AIC,BIC,r2,correct)

def LLfromGiven(given_k, given_b, choices, v1, d1, v2, d2, risk):   #not currently in use
    given_beta = [given_b,given_k]
    negLL = local_negLL(given_beta,choices,v1,d1,v2,d2,risk)
    return negLL

def local_negLL(beta,choices_list,v1,d1,v2,d2,risk):
    
    #print("beta", beta)
    #print("risk", risk)

    ps = np.array(choice_prob(v1,d1,v2,d2,beta,risk))
    choices = np.array(choices_list)

    # Trap log(0)
    ps[ps==0] = 0.0001
    ps[ps==1] = 0.9999
    
    # Log-likelihood

    err = (choices==1)*np.log(ps) + ((choices==0))*np.log(1-ps)
    # Sum of -log-likelihood
    sumerr = -sum(err)
    return sumerr

def choice_prob(v1,d1,v2,d2,beta,risk):
    ps = []

    # NOTETHIS:THIS IS COMING IN AS A LIST (v1[n])
    #print(len(v1)) #list has no shape
    #print(v1)

    for n in range(len(v1)):
        #print(v1[n])
        SV_1 = discount(v1[n],d1[n],beta[1],risk)
        SV_2 = discount(v2[n],d2[n],beta[1],risk)
        try: 
            p = 1 / (1 + math.exp(beta[0]*(SV_1-SV_2)))
        except OverflowError:
            #print("beta:",beta[0],"k:",beta[1],"SV_1:",SV_1,"SV_2",SV_2,"imm val",v1[n],"imm delay",d1[n], "del val",v2[n],"del del",d2[n])
            p = 0
            #raise SystemExit(0)
            #break
        ps.append(p)
        
    return ps

def discount(v,d,kappa,risk):
    SV = (v**risk)/(1+kappa*d)
    return SV

def optimize_me(to_optimize, inputs):
    choices_list,v1,d1,v2,d2,risk = inputs
    return local_negLL(to_optimize,choices_list,v1,d1,v2,d2,risk)