In [1]:
import csv
import torch
import numpy as np

from functions_simulator import *
import torch
from sbi.utils import BoxUniform
from sbi.inference import SNPE_A

Retrieve answer times of every participant

In [None]:
nb_obj_max3 = 200
nb_obj_max2 = 150
nb_obj_max = 100

part_middle_school = ['416','439','443','448','455','458','459','461','463','465','478','483','301', '302', '303', '304','307', '308', '309', '310', '311','312','314','316','315','722','737','738','739','740','741','749','753','759','765','769','772','773','774','777','780','783','784','787','790']
part_university = ['339','355','405','406','363', '364', '365', '366', '367', '368', '369', '372', '375', '376', '377', '380', '381','385', '386', '388', '390', '392', '396', '397', '400', '404','408', '695','668','666','659','655','653','652','643','640','550','551','554','555','561','563','564','565','577','580','583','586','587','588', '597','601','605','608']
participants = part_middle_school + part_university #Participants for which we run the SBI method

#Retrive the data of the participants

nb_obj_part = []

nb_part = len(participants)
times_participants = torch.zeros((nb_part,nb_obj_max3))

for i in range(nb_part):
    times_str = []
    part = participants[i]
    with open(f'{part}.csv', 'r') as file:
        reader = csv.reader(file, delimiter=',')
        for row in reader:
            if row[4] == 'timeout':
                times_str.append('5')
            else:
                times_str.append(row[6])

    nb = len(times_str)
    times_list = [float(times_str[i]) for i in range(1,nb)]
    n = len(times_list)
    if n <= nb_obj_max:
        nb_obj_part.append(nb_obj_max)
    elif n <= nb_obj_max2:
        nb_obj_part.append(nb_obj_max2)
    elif n <= nb_obj_max3:
        nb_obj_part.append(nb_obj_max3)
    else : print('The number of answers is too large')

    times_0 = torch.zeros(nb_obj_max3)
    times_0[:n] = torch.tensor(times_list)
    times_0[n:] += np.sum(np.array(times_0[n-18:n]))/18
    times_participants[i,:] = times_0

Define the simulator function used by the SBI method

In [21]:
def simulator_sbi(theta, nb_obj):
    num_sims = theta.shape[0] #number of simulations
    res = []
    if num_sims == 2:
        cond = True #becomes false if the learning task is achieved
        count = 0
        while cond:
            eta = theta.numpy()[0]
            s = theta.numpy()[1]
            nb_iter, times, accuracy = simulator_vectorized(eta, s, nb_obj_max=nb_obj)
            #print(nb_iter)
            if nb_iter < nb_obj_max + 1 :
                cond = False
            if count > 100:
                print('count too big')
                cond = False
            count +=1
        res.append(times)
        #print(count)
    else:
        for n in range(num_sims):
            cond = True #becomes false if the learning task is achieved
            count = 0
            while cond:
                eta = theta.numpy()[n,0]
                s = theta.numpy()[n,1]
                nb_iter, times, accuracy = simulator_vectorized(eta, s, nb_obj_max=nb_obj)
                if nb_iter < nb_obj_max + 1 :
                    cond = False
                if count > 100:
                    print('count too big')
                    cond = False
                count +=1
            res.append(times)
    res = np.array(res)
    return torch.FloatTensor(res)

Run the SBI method

In [None]:
num_dims = 2 #s and eta
num_sims = 1000

#Define a prior on the parameters
eta_inf = 0.09
eta_sup = 2
s_inf = 0.04
s_sup = 0.2

prior = BoxUniform(low=torch.tensor([eta_inf,s_inf]), high=torch.tensor([eta_sup,s_sup]))

num_rounds = 2
for i in range(nb_part):
    part = participants[i]
    nb_obj = nb_obj_part[i]
    print(part)
    x_0 = times_participants[i,:nb_obj]
    inference = SNPE_A(prior)
    proposal = prior
    for _ in range(num_rounds):
        theta = proposal.sample((num_sims,))
        x = simulator_sbi(theta, nb_obj)
        _ = inference.append_simulations(theta, x, proposal=proposal).train()
        posterior = inference.build_posterior().set_default_x(x_0)
        proposal = posterior

    torch.save(posterior, f'/Users/sophiejaffard/Desktop/Expé/old code discrete time/saves_sbi/posterior_new_{eta_inf}_{eta_sup}_{s_inf}_{s_sup}_{part}.pt')
    sample = posterior.sample((5000,))
    torch.save(sample, f'/Users/sophiejaffard/Desktop/Expé/old code discrete time/saves_sbi/sample_new_{eta_inf}_{eta_sup}_{s_inf}_{s_sup}_{part}_{part}.pt')