In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.special import erf
from tqdm import tqdm

In [3]:
def reading_csv(file_path):
    df = pd.read_csv(file_path)
    mean_used = df['S2_val'].tolist()
    result    = df['Decision (S1>S2)'].tolist()
    data_tuple = (result, mean_used)
    return data_tuple

def data_var2(file_path):
    df = pd.read_csv(file_path)
    var2 = df['S2_std'].to_numpy()
    var2 = pd.unique(var2)     # valeurs uniques de S2_std
    return list(var2)

def psycometrique(data_tuple, var2, block_size=1500, big_blocs=11):
    """
    Reconstruit les moyennes par bloc comme dans ton code :
    - true_array_result[j][k] = P(1) pour le j-ième var2, k-ième S2_val
    - true_array_mean_used[j][k] = S2_val moyenne correspondante
    """
    result, mean_used = data_tuple
    true_array_mean_used = []
    true_array_result    = []

    size_one_block = block_size * big_blocs  # nb essais par courbe (11 x 200)

    for j in range(len(var2)):
        array_mean_result = []
        array_mean_used   = []
        for i in range(0, size_one_block, block_size):
            # bloc de result
            block_r = result[size_one_block*j + i : size_one_block*j + i + block_size]
            mean_r  = sum(block_r) / len(block_r)
            array_mean_result.append(mean_r)

            # bloc de mean_used
            block_m = mean_used[size_one_block*j + i : size_one_block*j + i + block_size]
            mean_m  = sum(block_m) / len(block_m)
            array_mean_used.append(mean_m)

        true_array_result.append(array_mean_result)
        true_array_mean_used.append(array_mean_used)

    return true_array_result, true_array_mean_used

def build_targets_from_csv(file_path, block_size=1500, big_blocs=11):
    """
    Construit :
      - target[(mu2, std2)] = P_empirique(réponse=1)
      - mus_test : liste ordonnée des S2_val utilisés
      - vars_test : liste ordonnée des S2_std utilisés
    à partir de ton CSV.
    """
    data_tuple = reading_csv(file_path)
    var2_list  = data_var2(file_path)           # S2_std uniques
    array_mean_result, array_mean_used = psycometrique(
        data_tuple, var2_list,
        block_size=block_size,
        big_blocs=big_blocs
    )

    target = {}
    # on suppose que les S2_val sont les mêmes pour chaque var2
    mus_test = array_mean_used[0]
    vars_test = var2_list

    for j, std2 in enumerate(var2_list):
        for k, mu2 in enumerate(array_mean_used[j]):
            p_emp = array_mean_result[j][k]
            target[(mu2, std2)] = p_emp

    return target, mus_test, vars_test

In [4]:
target, mus_test, vars_test = build_targets_from_csv('C:\\Users\\gabri\\Desktop\\bayesian\\experiment_results_1500.csv') # before experiment_results_test.csv 
print(target)

{(-10.0, np.int64(0)): 1.0, (-8.0, np.int64(0)): 1.0, (-6.0, np.int64(0)): 1.0, (-4.0, np.int64(0)): 1.0, (-2.0, np.int64(0)): 0.9993333333333333, (0.0, np.int64(0)): 0.4746666666666667, (2.0, np.int64(0)): 0.005333333333333333, (4.0, np.int64(0)): 0.0, (6.0, np.int64(0)): 0.0, (8.0, np.int64(0)): 0.0, (10.0, np.int64(0)): 0.0, (-10.0, np.int64(2)): 1.0, (-8.0, np.int64(2)): 0.9986666666666667, (-6.0, np.int64(2)): 0.9953333333333333, (-4.0, np.int64(2)): 0.9566666666666667, (-2.0, np.int64(2)): 0.8313333333333334, (0.0, np.int64(2)): 0.5873333333333334, (2.0, np.int64(2)): 0.346, (4.0, np.int64(2)): 0.11333333333333333, (6.0, np.int64(2)): 0.04133333333333333, (8.0, np.int64(2)): 0.008, (10.0, np.int64(2)): 0.0, (-10.0, np.int64(4)): 0.9526666666666667, (-8.0, np.int64(4)): 0.9266666666666666, (-6.0, np.int64(4)): 0.8886666666666667, (-4.0, np.int64(4)): 0.8313333333333334, (-2.0, np.int64(4)): 0.73, (0.0, np.int64(4)): 0.642, (2.0, np.int64(4)): 0.536, (4.0, np.int64(4)): 0.454, (6.0

In [None]:
mus_test  = [-10,-8,-6,-4,-2,0,2,4,6,8,10]
vars_test = [0,2,4,6,8]
mu1,mu2 = 0 , 2
var1,var2 = 0.2 , 4
print(target[(mu2, var2)])  # exemple d'accès à une valeur cible

0.536


In [None]:
# level_01.py
# Level 01: single-cue Gaussian Bayes comparator with fixed prior.
# - Prior: S ~ N(m0, v0)
# - Cue (measurement) for each stimulus: M | S ~ N(S, v_like), where v_like = v_internal + v_stim
# - Given an observed cue value m (we treat CSV “Stimulus value” as the measured cue),
#   the posterior is Gaussian and we use its mean as the estimate Ŝ.
# - Decision: return 1 if Ŝ1 > Ŝ2 else 0.
from __future__ import annotations
import io, csv, math, random
from dataclasses import dataclass
from typing import Optional, Tuple, Dict, Any, List

def _to_var(std: float) -> float:
    std = float(std)
    return std * std

@dataclass
class Level01Agent:
    prior_mean: float          # m0
    prior_std: float           # s0
    internal_std: float        # s_int (agent's internal noise std)
    rng: random.Random          # randomness

    @property
    def prior_var(self) -> float:
        return _to_var(self.prior_std)

    @property
    def internal_var(self) -> float:
        return _to_var(self.internal_std)

    def estimate_posterior_mean_std(self, m: float, stim_std: float) -> float:
        """
        Posterior mean for conjugate Gaussian model:
          prior  S ~ N(m0, v0)
          like   M | S ~ N(S, v_like) with v_like = v_internal + v_stim
          obs    M = m (CSV value treated as the measured cue)
        posterior mean:
          mu_post = (m/v_like + m0/v0) / (1/v_like + 1/v0)
        """
        v0 = self.prior_var
        v_like = self.internal_var + _to_var(stim_std)
        # guard against degenerate zeros
        eps = 1e-12
        inv_sum = (1.0 / (v_like + eps)) + (1.0 / (v0 + eps))
        mu_post = ((m / (v_like + eps)) + (self.prior_mean / (v0 + eps))) / inv_sum
        std_post = math.sqrt(v0*v_like/(v0+v_like))
        return mu_post, std_post

    def compare_pair(self, s1_val: float, s1_std: float, s2_val: float, s2_std: float) -> int:
        mu1, std1 = self.estimate_posterior_mean_std(s1_val, s1_std)
        mu2, std2 = self.estimate_posterior_mean_std(s2_val, s2_std)
        m1 = self.rng.gauss(mu1, std1)  # sample from posterior S1
        m2 = self.rng.gauss(mu2, std2)  # sample from posterior S2
        return 1 if m1 > m2 else 0

# Standard app interface expected by the JS
APP = {
    "agent": None,
    "experiment_rows": [],   # [["Trial", ...], ...]
}

def init_app(params: Dict):
    """
    params keys (hidden):
      - seed (int, optional)          : for reproducibility if needed
      - prior_mean (float, required)  : prior mean
      - prior_std  (float, required)  : prior std
      - internal_std (float, required): agent internal noise (std)
    """
    seed = int(params.get("seed", 0)) & 0xFFFFFFFF
    if seed:
        random.seed(seed)

    pm  = float(params["prior_mean"])
    ps  = float(params["prior_std"])
    ist = float(params["internal_std"])
    rng = random.Random(seed or None)

    APP["agent"] = Level01Agent(prior_mean=pm, prior_std=ps, internal_std=ist, rng=rng)
    APP["experiment_rows"].clear()

def run_stimulus_pairs_csv(agent: Level01Agent, csv_text: str):
    """
    Input CSV header (case-insensitive on 'Trial' is fine):
      Trial, Stimulus 1 value, Stimulus 1 std, Stimulus 2 value, Stimulus 2 std

    Output rows:
      [trial, s1, sd1, s2, sd2, decision]
    where decision is 1 if Ŝ1>Ŝ2 else 0.
    """
    f = io.StringIO(csv_text)
    rdr = csv.reader(f)
    rows = [r for r in rdr if r and any(cell.strip() for cell in r)]

    # skip header if first cell looks like "trial"
    start = 1 if rows and rows[0] and rows[0][0].strip().lower().startswith("trial") else 0

    out = []
    for r in rows[start:]:
        try:
            trial = int(r[0])
            s1 = float(r[1]); sd1 = float(r[2])
            s2 = float(r[3]); sd2 = float(r[4])
            decision = agent.compare_pair(s1, sd1, s2, sd2)
            out.append([trial, s1, sd1, s2, sd2, decision])
        except Exception as e:
            out.append(["ERROR", str(e)])

    return out