In [8]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.stats as stats
from itertools import combinations
from math import prod, log
from sklearn.linear_model import LinearRegression, Lasso
import pymc as pm
import pymc.math as math
import pytensor.tensor as pt
import arviz as az
import graphviz
np.set_printoptions(suppress=True)

In [9]:
def to_block_prob(input_string):
    remove_elements = ['\n', '[', ']']
    for element in remove_elements:
        input_string =  input_string.replace(element, '')
    input_string = input_string.split()
    output_arr =  [float(string) for string in input_string]
    return np.array(output_arr)

def to_para_set(input_string):
    if type(input_string) == str:
        remove_elements = ['[', ']']
        for element in remove_elements:
            input_string =  input_string.replace(element, '')
        input_string = input_string.split()
        para_set =  np.array([float(string) for string in input_string])
    else:
        para_set = input_string
    feature_arr = []
    n_combine = [1,2,3,4,5,6]
    index = [0,1,2,3,4,5]

    for n in n_combine:
        combine = list(combinations(index, n))
        for current_combine in combine:
            feature_arr.append(prod(para_set[list(current_combine)]))
    return np.array(feature_arr)

def compute_curve(X_train, coef_matrix):
    ones_column = np.ones((X_train.shape[0], 1))
    X_train = np.append(X_train, 1)
    return (X_train @ coef_matrix.T).reshape((6,32))

In [10]:
search_grid = pd.read_csv('search grid 6_4.csv', header = None).values
X_train, y_train = [], []
for i in range(len(search_grid)):
    X_train.append(to_para_set(search_grid[i][1]))
    y_train.append(to_block_prob(search_grid[i][2]))
X_train = np.array(X_train)
y_train = np.array(y_train)

In [11]:
coef_matrix = np.array(())
for block in range(y_train.shape[1]):
    y_subtrain = y_train[:,block]
    model = LinearRegression()
    model.fit(X_train, y_subtrain)
    w = np.append(model.coef_,model.intercept_)
    coef_matrix = np.append(coef_matrix, w)
coef_matrix = coef_matrix.reshape(y_train.shape[1],w.shape[0])
coef_tensor_matrix = pt.as_tensor_variable(coef_matrix)

In [12]:
nSubject = 13
nCondition = 6
nTrial = 9600
nBlock = 32
nPara = 6
nTest = 1
trial_per_block = 50

learning_curve = np.array([])
df = pd.read_csv('Data_withfeedback.csv')
for iSubject in range(nSubject):
    sub_df = df[iSubject * nTrial : (iSubject + 1) * nTrial]
    correct = sub_df['Correct'].values
    contrast = sub_df['Contrast'].values
    congruent = sub_df['Congruent'].values
    li = np.intersect1d(np.where(contrast==0.10),np.where(congruent==0))
    mi = np.intersect1d(np.where(contrast==0.15),np.where(congruent==0))
    hi = np.intersect1d(np.where(contrast==0.23),np.where(congruent==0))
    lc = np.intersect1d(np.where(contrast==0.10),np.where(congruent==1))
    mc = np.intersect1d(np.where(contrast==0.15),np.where(congruent==1))
    hc = np.intersect1d(np.where(contrast==0.23),np.where(congruent==1))
    sub_learning_curve = correct[np.stack([li, mi, hi, lc, mc, hc])].reshape(nCondition,nBlock,trial_per_block).sum(axis=2)
    learning_curve = np.append(learning_curve, sub_learning_curve)
learning_curve = learning_curve.reshape(nSubject,nCondition,nBlock)

combine_arr = []
n_combine = [2,3,4,5,6]
index = [0,1,2,3,4,5]
for n in n_combine:
    combine = list(combinations(index, n))
    for current_combine in combine:
            combine_arr.append(list(current_combine))

In [28]:
HBM = pm.Model()
low = 1e-4
lower=[log(low), log(low), log(low),log(low), log(low), log(low)]
upper=[log(0.005), log(5), log(3), log(0.3), log(0.3), log(0.3)]

'''
lr_range = [0.0010,0.0015,0.0020,0.0025]
bias_range = [1.5,2.0,2.5,3.0]
gamma_range = [0.4,0.8,1.2,1.6]
noise_d_range = [0.12,0.20,0.28,0.36]
noise_r_range = [0.05,0.10,0.15,0.20]
w_init_range = [0.15,0.20,0.25,0.30]
'''
with HBM:
    '''
    population=pm.Uniform('Population', lower=lower,upper=upper)

    sd_subject = pm.Exponential.dist(1.0, size=nPara)
    chol_subject, _ , _ = pm.LKJCholeskyCov('Chol_subject i', eta=2, n=nPara, sd_dist=sd_subject, compute_corr=True)
    subject = pm.MvNormal('Subject i', mu=pt.tile(population,(nSubject,1)), chol=chol_subject)
    
    sd_test = pm.Exponential.dist(1.0, size=nPara)
    chol_test, _ , _ = pm.LKJCholeskyCov('Chol_test ij', eta=2, n=nPara, sd_dist=sd_test, compute_corr=True)
    test = pm.MvNormal('Test ij', mu=pt.tile(subject,(nTest,1)), chol=chol_test)   
    '''
    test = pm.Uniform('Test ij', lower=pt.tile(lower, (nSubject,1)), upper=pt.tile(upper, (nSubject,1)))
    
    exp_test = math.exp(test)
    for i_combine in range(len(combine_arr)):
        exp_test = math.concatenate([exp_test, math.prod(exp_test[:,combine_arr[i_combine]], axis=1).reshape((-1,1))], axis= 1)
    para = math.concatenate([exp_test, pt.ones((nSubject,1))], axis =1) 
    curve = pt.clip((para @ coef_tensor_matrix.T).reshape((nSubject, nCondition, nBlock)),0.001,0.999)
    n_block_correct = pm.Binomial('nCorrect ijk', p = curve, n = trial_per_block, observed = learning_curve)

In [29]:
pm.model_to_graphviz(HBM).render(filename='results/model structure/model strucutre_BIP', format='jpg')

'results\\model structure\\model strucutre_BIP.jpg'

In [7]:
with HBM:
    #idata = pm.sample(draws = 3000, tune = 3000)
    #approx=pm.fit(n=150000,method='advi')
    #idata=approx.sample(10000)
    
    step = pm.Metropolis(thin=10)
    idata = pm.sample(step=step, draws=5000,tune=20000)

Multiprocess sampling (4 chains in 4 jobs)
Metropolis: [test]


ValueError: Not enough samples to build a trace.

In [None]:
summary = az.summary(idata)
summary.to_csv('BIP Solution_MH.csv')

In [None]:
mean = np.round(np.exp(pd.read_csv('HBM Solution_MH.csv')[ : nSubject * nPara]['mean'].values), 4).reshape(nSubject, nPara)
print(mean)


In [None]:
para = np.exp(idata.posterior['test'])
for i in range(para.shape[-1]):
    az.plot_trace(para[..., i])

In [None]:
iSubject = 2
block_prob_true = learning_curve[iSubject]/trial_per_block
block_prob_simu = compute_curve(to_para_set(np.exp(np.array([-6.694, 0.061, 0.046, -1.837, -4.699, -4.183]))), coef_matrix)

In [None]:
title={0:'Incongruent stimuli', 1: 'Congruent stimuli'}
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 10))
axes=[ax1,ax2]
for i in [0,1]:
    low_model, low_true = block_prob_simu[i*3],  block_prob_true[i*3]
    middle_model, middle_true = block_prob_simu[i*3+1],  block_prob_true[i*3+1]
    high_model, high_true = block_prob_simu[i*3+2],  block_prob_true[i*3+2]
    
    z_low_model,z_middle_model,z_high_model= stats.norm.ppf(low_model), stats.norm.ppf(middle_model), stats.norm.ppf(high_model)
    start=[1,9,17,25,31]
    for j in range(4):
        low_model_fit, normal_model_fit, high_model_fit=z_low_model[start[j]:start[j+1]],z_middle_model[start[j]:start[j+1]],z_high_model[start[j]:start[j+1]]
        axes[i].plot(np.arange(start[j],start[j+1]), low_model_fit, color='black', marker='s', markersize=5)
        axes[i].plot(np.arange(start[j],start[j+1]), normal_model_fit, color='black', marker='o', markersize=5)
        axes[i].plot(np.arange(start[j],start[j+1]), high_model_fit, color='black', marker='^', markersize=5)
       
    z_low_true,z_middle_true,z_high_true= stats.norm.ppf(low_true), stats.norm.ppf(middle_true), stats.norm.ppf(high_true)
    
    axes[i].plot(z_low_true, marker='s',color='black', linestyle='',markerfacecolor='none',markeredgewidth=1,markersize=5, label='Contrast 0.10')
    axes[i].plot(z_middle_true, marker='o',color='black', linestyle='',markerfacecolor='none',markeredgewidth=1,markersize=5, label='Contrast 0.15')
    axes[i].plot(z_high_true, marker='^', color='black',linestyle='',markerfacecolor='none',markeredgewidth=1,markersize=5, label='Contrast 0.23')
    
    axes[i].set_ylabel('z-probability correct')
    axes[i].set_xlabel('Block number (300 trials/block, 4 blocks/day)')
    axes[i].set_title(title.get(i))
    axes[i].set_ylim(-1,3)
    
legend_handles = [plt.Line2D([],[],marker='s',color='black', linestyle='',markerfacecolor='none',markeredgewidth=1,markersize=5, label='Contrast 0.10'),
                  plt.Line2D([],[],marker='o',color='black', linestyle='',markerfacecolor='none',markeredgewidth=1,markersize=5, label='Contrast 0.15'),
                  plt.Line2D([],[],marker='^',color='black', linestyle='',markerfacecolor='none',markeredgewidth=1,markersize=5, label='Contrast 0.23'),
                  plt.Line2D([], [], color='black', marker='o',markersize=3, linestyle='None', label='Model fits'),]
plt.legend(handles=legend_handles, loc='lower right',bbox_to_anchor=(1.2, 0))
# plt.savefig(f'results/congruent and incongruent/Subject_{iSubject}',bbox_inches='tight')
plt.show()
