In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
from scipy.io import loadmat
from scipy.stats import norm, pearsonr
from scipy.integrate import quad
import pickle
from sklearn.metrics import confusion_matrix
from skimage.metrics import structural_similarity

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.colors import hsv_to_rgb
from matplotlib.patches import Circle

In [None]:
from bayesee.evaluation import *
from bayesee.perception import *

In [None]:
%load_ext autoreload
%autoreload 2
plt.style.use('bayesee.academic')

In [None]:
repo_path = Path.cwd().parents[0]
print(repo_path)

In [None]:
subject = "PA"
file_name = repo_path / f"data/search/covert/large-field/p3_data_{subject}.pickle"

with open(file_name, "rb") as f:
    stimulus, response = pickle.load(f)

metadata = stimulus["metadata"]
spot_centers = metadata["spot_centers"].astype(int)
monitor_width, monitor_height = metadata["monitor_size"]
stimulus_size = metadata["stimulus_size"]
n_location = metadata["n_location"]
spot_size = metadata["spot_size"]
stimulus_ppd = metadata["stimulus_ppd"]
target_amplitude = metadata["target_amplitude"]
target = metadata["target"]

file_name = (
    repo_path / f"data/search/covert/large-field/p2_spatial_statistics_{subject}.csv"
)
spatial_statistics_human = pd.read_csv(file_name)
local_dp = spatial_statistics_human["dp"].values
local_ecc = spatial_statistics_human['ecc'].values

In [None]:
human_target = stimulus["df"]["location"].values
human_response = response["df"]["response_location"].values
human_accuracy = human_target == human_response
local_ecc = spatial_statistics_human['ecc'].values
array_index_sort = np.insert(np.argsort(local_ecc)+1,0,0)
human_confusion = confusion_matrix(human_response, human_target, labels=array_index_sort)
human_cr_all = human_confusion[0,0] / sum(human_target==0)
human_hit = np.array([human_confusion[l,l] / sum(human_target == array_index_sort[l]) for l in range(1, n_location)])
human_fa = human_confusion[1:,0] / sum(human_target==0)
human_dp = norm.ppf(human_hit) - norm.ppf(1 - human_cr_all)

In [None]:
n_location_ring = np.array([1,1,6,12])

def confusion_matrix_ring(confusion_mat, n_location_ring):
    # confusion matrix has been sorted by ring
    n_ring = len(n_location_ring) - 1
    cumsum_ring = np.cumsum(n_location_ring)
    
    confusion_ring = np.zeros(4*n_ring+1)
    confusion_diagonal = confusion_mat.diagonal()
    confusion_ring[0] = confusion_diagonal[0]

    for index_ring in range(n_ring):
        array_index_ring = range(cumsum_ring[index_ring], cumsum_ring[index_ring+1])
        hit_ring = confusion_diagonal[array_index_ring].sum()
        miss_ring = confusion_mat[0,array_index_ring].sum()
        fa_ring = confusion_mat[array_index_ring,0].sum()
        fhf_ring = confusion_mat[array_index_ring, 1:].sum() - hit_ring
        confusion_ring[1+index_ring*4:5+index_ring*4] = hit_ring, miss_ring, fa_ring, fhf_ring
    
    return confusion_ring

human_confusion_ring = confusion_matrix_ring(human_confusion, n_location_ring)

In [None]:
cortical_ppd = 60
retinal_ppd = 60

file_name = repo_path / f"data/search/cortical/xij_{cortical_ppd}ppd.csv"
xij = np.genfromtxt(file_name, delimiter=",")

file_name = repo_path / f"data/search/cortical/yij_{cortical_ppd}ppd.csv"
yij = np.genfromtxt(file_name, delimiter=",")

file_name = repo_path / f"data/search/cortical/ixy_{retinal_ppd}ppd.csv"
ixy = np.genfromtxt(file_name, delimiter=",")

file_name = repo_path / f"data/search/cortical/jxy_{retinal_ppd}ppd.csv"
jxy = np.genfromtxt(file_name, delimiter=",")

cut_off = 8  # degree
xij = xij[:, : cut_off * cortical_ppd].copy()
yij = yij[:, : cut_off * cortical_ppd].copy()

In [None]:
xij_ppd = xij * retinal_ppd
yij_ppd = yij * retinal_ppd
ixy_ppd = ixy * cortical_ppd
jxy_ppd = jxy * cortical_ppd

mesh_y, mesh_x = np.meshgrid(
    np.arange(int(yij_ppd.min()), int(yij_ppd.max())),
    np.arange(int(xij_ppd.min()), int(xij_ppd.max())),
)

spot_centers_cortical = np.zeros_like(spot_centers)
for i in range(spot_centers.shape[0]):
    x, y = spot_centers[i] - spot_centers[0]
    if y >= 0:
        index_loc = (mesh_x == x) & (mesh_y == y)
    else:
        index_loc = (mesh_x == x) & (mesh_y == -y)
        
    spot_centers_cortical[i, :] = ixy_ppd[index_loc][0], jxy_ppd[index_loc][0]

cortical_gain = np.zeros_like(xij)

In [None]:
prior = np.array((0.5, *((0.5 / (n_location - 1),) * (n_location - 1))))
assert np.allclose(prior.sum(), 1.0)
e_prior = prior.copy()
assert np.allclose(e_prior.sum(), 1.0)
log_prior_ratio = np.log(e_prior / e_prior[0])
log_likelihood_ratio = np.zeros_like(prior)

model_response = np.zeros_like(human_target)
n_bootstrap = 1

In [None]:
def cost_function(x):
    # x[0]: fovea_gain
    # x[1]: periphery_gain
    # x[2]: Weibull scale parameter a
    # x[3]: Weibull shape parameter b
    # x[4]: d' peak
    # x[5]: d' fall-off

    rng = np.random.default_rng(rng_seed)
    cost = 0
    for index_i in np.arange(xij.shape[1]):
        cortical_gain[:, index_i] = x[0]+ (x[1] - x[0]) * (1 - np.exp(-((index_i / (cortical_ppd * x[2])) ** x[3])))
        
    local_gain = np.array(
        [
            cortical_gain[*spot_centers_cortical[n_location]]
            for n_location in range(n_location - 1)
        ]
    )

    local_gdp = local_dp * local_gain
    e_local_dp = x[4] * x[5] / (local_ecc + x[5])
    
    for index_bootstrap in range(n_bootstrap):
        for index_trial in range(len(human_target)):

            array_standard_normal = rng.normal(size=(n_location - 1,))
            
            log_likelihood_ratio[1:] = e_local_dp * (
                array_standard_normal - e_local_dp / 2
            )

            if human_target[index_trial] > 0:
                log_likelihood_ratio[human_target[index_trial]] += (
                    e_local_dp[human_target[index_trial] - 1]
                    * local_gdp[human_target[index_trial] - 1]
                )

            log_posterior_ratio = log_prior_ratio + log_likelihood_ratio
            model_response[index_trial] = np.argmax(log_posterior_ratio)
            
        model_confusion = confusion_matrix(model_response, human_target, labels=array_index_sort) 
        model_confusion_probability = (model_confusion + 1) # Laplace smoothing
        model_confusion_probability = model_confusion_probability / model_confusion_probability.sum()
        
        model_confusion_ring = confusion_matrix_ring(model_confusion, n_location_ring)
        model_confusion_ring_probability = (model_confusion_ring + 1) # Laplace smoothing
        model_confusion_ring_probability = model_confusion_ring_probability / model_confusion_ring_probability.sum()
        
        model_cr_all = model_confusion[0,0] / sum(human_target==0)
        model_hit = np.array([model_confusion[l,l] / sum(human_target == array_index_sort[l]) for l in range(1, n_location)])
        model_fa = model_confusion[1:,0] / sum(human_target==0)
        model_dp = norm.ppf(model_hit) - norm.ppf(1 - model_cr_all)
        
        # cost += rmse(human_confusion, model_confusion)
        # cost += mae(human_confusion, model_confusion)
        # cost += -(human_confusion * np.log(model_confusion_probability)).sum()
        # cost += - pearson(human_confusion, model_confusion)
        # cost += - ssim(human_confusion, model_confusion)
        # cost += - cosine_similarity(human_confusion, model_confusion)
        
        # cost += rmse(human_confusion_ring, model_confusion_ring)
        # cost += mae(human_confusion_ring, model_confusion_ring)
        cost += -(human_confusion_ring * np.log(model_confusion_ring_probability)).sum()
        # cost += - pearson(human_confusion_ring, model_confusion_ring)
        # cost += - ssim(human_confusion_ring, model_confusion_ring)
        # cost += - cosine_similarity(human_confusion_ring, model_confusion_ring)
        
        # cost += rmse(human_confusion.diagonal(), model_confusion.diagonal())
        # cost += mae(human_confusion.diagonal(), model_confusion.diagonal())
        # cost += -(human_confusion.diagonal() * np.log(model_confusion_probability.diagonal())).sum()
        # cost += - pearson(human_confusion.diagonal(), model_confusion.diagonal())
        # cost += - ssim(human_confusion.diagonal(), model_confusion.diagonal())
        # cost += - cosine_similarity(human_confusion.diagonal(), model_confusion.diagonal())
        
        # cost += rmse(human_dp, model_dp)
        # cost += mae(human_dp, model_dp)
        # cost += - pearson(human_dp, model_dp)
        # cost += - ssim(human_dp, model_dp)
        # cost += - cosine_similarity(human_dp, model_dp)
        
        # cost += rmse(human_hit, model_hit)
        # cost += mae(human_hit, model_hit)
        # cost += - pearson(human_hit, model_hit)
        # cost += - ssim(human_hit, model_hit)
        # cost += - cosine_similarity(human_hit, model_hit)
    
        # cost += rmse(human_fa, model_fa)
        # cost += mae(human_fa, model_fa)
        # cost += - pearson(human_fa, model_fa)
        # cost += - ssim(human_fa, model_fa)
        # cost += - cosine_similarity(human_fa, model_fa)
    
    return cost / n_bootstrap

In [None]:
opt_results = pd.DataFrame(columns=["gf", "gp", "a", "b", "dp0", "ed", "cost", "nfunc", "niter"])
bounds = [(0.5, 1.0), (0.5, 1.5), (1,4), (1,3), (2,7), (2, 16)]
n_init = 5
rng_seed = 2

rng = np.random.default_rng(rng_seed)

In [None]:
for index_init in range(n_init):
    init = [(bound[1] - bound[0]) * rng.random() + bound[0] for bound in bounds]
    
    results = minimize(
        cost_function,
        init,
        bounds=bounds,
        method="Powell",
    )
    
    row_result = [ *results.x, results.fun, results.nfev, results.nit]    
    
    if index_init == 0:
        opt_results.loc[0] = row_result
    elif results.fun < opt_results.iloc[-1]["cost"]:
        opt_results.loc[len(opt_results)] = row_result
        
    print(f"{index_init+1}/{n_init}")

row_result = opt_results.iloc[-1].values
print(f"| {row_result[-3]:.5f} | {row_result[0]:.3f} | {row_result[1]:.3f} | {row_result[2]:.3f} | {row_result[3]:.3f} | {row_result[4]:.3f} | {row_result[5]:.3f} |")

In [None]:
n_trial = 10000
rng = np.random.default_rng(rng_seed)
model_target = rng.choice(range(n_location), size=n_trial, p=prior)
model_response = np.zeros_like(model_target)

stimulus_df = pd.DataFrame()
response_df = pd.DataFrame()
model = 'gainer37'
metadata['gf'] = gf = 0.780
metadata['gp'] = gp = 1.348
metadata['a'] = a = 3.781
metadata['b'] = b = 2.580
metadata['dp0'] = dp0 = 3.547
metadata['ed'] = ed = 7.054

for index_i in np.arange(xij.shape[1]):
    cortical_gain[:, index_i] = gf + (gp - gf) * (1 - np.exp(-((index_i / (cortical_ppd * a)) ** b)))
    
local_gain = np.array(
    [
        cortical_gain[*spot_centers_cortical[n_location]]
        for n_location in range(n_location - 1)
    ]
)
    
local_gdp = local_dp * local_gain
e_local_dp = dp0 * ed / (local_ecc + ed)

for index_bootstrap in range(n_bootstrap):
    for index_trial in range(len(model_target)):

        array_standard_normal = rng.normal(size=(n_location - 1,))
        
        log_likelihood_ratio[1:] = e_local_dp * (
            array_standard_normal - e_local_dp / 2
        )

        if model_target[index_trial] > 0:
            log_likelihood_ratio[model_target[index_trial]] += (
                e_local_dp[model_target[index_trial] - 1]
                * local_gdp[model_target[index_trial] - 1]
            )

        log_posterior_ratio = log_prior_ratio + log_likelihood_ratio
        model_response[index_trial] = np.argmax(log_posterior_ratio)
    
stimulus_df['location'] = model_target.round()
response_df['response_location'] = model_response.round()
    
file_name = repo_path / f"data/search/covert/large-field/p3_data_model_{model}_{subject}.pickle"
stimulus = {"df": stimulus_df, "metadata": metadata}
response = {"df": response_df}

with open(file_name, "wb") as f:
    pickle.dump((stimulus, response), f)

In [None]:
file_name