In [1]:
import aesara
import nutpie
import pymc as pm
import numpy as np
import pandas as pd
import arviz as az
import matplotlib.pyplot as plt
import scipy.stats as st




In [4]:
from watermark import watermark
print(watermark())
print(watermark(packages="aesara,nutpie,pymc,arviz"))


Last updated: 2023-06-02T21:07:47.455087+02:00

Python implementation: CPython
Python version       : 3.10.8
IPython version      : 8.7.0

Compiler    : MSC v.1916 64 bit (AMD64)
OS          : Windows
Release     : 10
Machine     : AMD64
Processor   : Intel64 Family 6 Model 126 Stepping 5, GenuineIntel
CPU cores   : 8
Architecture: 64bit

aesara: 2.8.7
nutpie: 0.3.0
pymc  : 4.4.0
arviz : 0.14.0



## ABC functions for inferring Gaussian parameters in 1d and nD

In [10]:
def genData(N, ndim, mu_true, sigma2_true):
    #N = 100
    #sigma2_true = 1**2
    #mu_true = -1.8 #np.random.normal(0, 1)
    
    if N == 1:
        data = st.multivariate_normal(mean = np.ones(ndim) * mu_true, cov = np.identity(ndim) * sigma2_true).rvs(1)
    else: 
        data = np.random.normal(mu_true, np.sqrt(sigma2_true), N)
    return data

In [11]:
def normal_sim_Nd(rng, m, s2, ndim, size):
    data = st.multivariate_normal(mean = np.ones(ndim) * m, cov = np.identity(ndim) * s2).rvs(1)
    return data

In [None]:
def normal_sim_Nd_norm(rng, m, s2, ndim, size):
    data = st.multivariate_normal(mean = np.ones(ndim) * m, cov = np.identity(ndim) * s2).rvs(10)
    return data

In [12]:
def normal_sim_1d(rng, m, s2, size):
    data = np.random.normal(m, np.sqrt(s2), size)
    return data

In [13]:
def modelDef(target, ndim, fixedParamValue, priorArg1, priorArg2, x_sim, eps):    
    if target == "mu":
            
            if ndim == 1:
                with pm.Model() as model:
                    mu = pm.Normal("mu", priorArg1, np.sqrt(priorArg2)) #mean_true #pm.Normal("a", mu=0, sigma=5)
                    sigma2 = fixedParamValue

                    x = pm.Simulator(
                        "x", 
                        fn = normal_sim_1d, 
                        params = (mu, sigma2), 
                        sum_stat = "identity", 
                        distance = "gaussian",
                        epsilon = eps, 
                        observed = x_sim)
            
            if ndim > 1:
                with pm.Model() as model:
                    mu = pm.Normal("mu", priorArg1, np.sqrt(priorArg2)) #mean_true #pm.Normal("a", mu=0, sigma=5)
                    sigma2 = fixedParamValue

                    x = pm.Simulator(
                        "x", 
                        fn = normal_sim_Nd, 
                        params = (mu, sigma2, ndim), 
                        sum_stat = "identity", 
                        distance = "gaussian",
                        epsilon = eps, 
                        observed = x_sim)
                            
    if target == "sigma2":
            if ndim == 1:
                with pm.Model() as model:
                    mu = fixedParamValue #mean_true #pm.Normal("a", mu=0, sigma=5)
                    sigma2 = pm.InverseGamma("sigma2", priorArg1, priorArg2)

                    x = pm.Simulator(
                        "x", 
                        fn = normal_sim_1d, 
                        params = (mu, sigma2), 
                        sum_stat = "identity", 
                        distance = "gaussian",
                        epsilon = eps, 
                        observed = x_sim)
            
            if ndim > 1:
                with pm.Model() as model:
                    mu = fixedParamValue #mean_true #pm.Normal("a", mu=0, sigma=5)
                    sigma2 = pm.InverseGamma("sigma2", priorArg1, priorArg2)

                    x = pm.Simulator(
                        "x", 
                        fn = normal_sim_Nd_norm, 
                        params = (mu, sigma2, ndim), 
                        sum_stat = "identity", 
                        distance = "gaussian_norm",
                        epsilon = eps, 
                        observed = x_sim)
            
            
    return model


In [14]:
def genDatawModel(N, mu_true, sigma2_true, target, ndim, priorArg1, priorArg2, eps):
    data = genData(N, ndim, mu_true, sigma2_true)
    if target == "mu": 
        fixedParamValue = sigma2_true
    if target =="sigma2":
        fixedParamValue = mu_true
    model = modelDef(target, ndim, fixedParamValue, priorArg1, priorArg2, data, eps)

    return data, model

In [15]:
def testABC(N, target, ndim, mu_true, sigma2_true, priorArg1, priorArg2, eps, draws):
    data, model = genDatawModel(N = N, mu_true = mu_true, sigma2_true = sigma2_true, target = target, ndim = ndim, priorArg1 = priorArg1, priorArg2 = priorArg2, eps = eps)
    chain_num = 1

    %time 
    trace = pm.sample_smc(model = model, chains = chain_num, cores = 1, draws = draws, progressbar = False)
        
    print("\n \n ####  Inferring  "+str(target)+"  in a  "+str(ndim)+"  dimensional setup")
    print(" ####  N = "+str(N)+"  mu_true = "+str(mu_true)+"  $\sigma^2$_true = "+str(sigma2_true))
    print(" ####  Prior arguments: "+str(priorArg1)+", "+str(priorArg2))
    print(" ####  ABC epsilon = "+str(eps)+", draws = "+str(draws)+"\n")

    if target == "mu":
        plot_posterior_hist_mu(res = trace, data = data, mu_true = mu_true, norm_1 = priorArg1, norm_2 = priorArg2, sigma2_true = sigma2_true, N = N, ndim = ndim, chain_num=chain_num, eps=eps)
    if target == "sigma2":
        plot_posterior_hist_sigma2(res = trace, data = data, sigma2_true = sigma2_true, ig_1 = priorArg1, ig_2 = priorArg2, mu_true = mu_true, N = N, ndim = ndim, chain_num=chain_num, eps=eps)


In [7]:
def plot_posterior_hist_sigma2(res, data, sigma2_true, ig_1, ig_2, mu_true, N, ndim, chain_num, eps):
    plt.hist(res.posterior.sigma2[0,:], bins=np.logspace(np.log10(0.01),np.log10(50.0), 50), density=True, color='gray', alpha=0.8)
    plt.axvline(x = sigma2_true, label = '$\sigma^2$ used for data simulation')
    x = np.linspace (0.01, 50, 5000) 

    #calculate pdf of Gamma distribution for each x-value
    y = st.invgamma.pdf(x, a = ig_1, loc = 0, scale = ig_2)
    plt.plot(x, y, linestyle='dashed', label='$\sigma^2$ prior' )

    
    #sigmaPost_x, sigmaPost_value=forplot_sigma_posterior(res, inv_1, inv_2)
    #plt.plot(sigmaPost_x, sigmaPost_value, color='black', label='analytic posterior' )

    xmin, xmax = plt.xlim()
    #xx = np.linspace(xmin, xmax, 100)
    postig_1 = ig_1 + max(N, ndim)/2
    postig_2 = ig_2 + 0.5 * sum((data - mu_true) ** 2)
    plt.plot(x, st.invgamma.pdf(x, a = postig_1, loc = 0, scale = postig_2), alpha=1, label="analytic posterior")


    ax=plt.gca()
    ax.set_xscale('log')
    ax.set_xlim(0.01, 50)
    ax.set_ylim(0.0, 10)

    plt.legend(loc="upper right")
    plt.title('$\sigma^2$ posterior \n N:'+str(N) +'  ndim:'+str(ndim) + '\n chain num:' + str(chain_num)+'\n eps:'+str(eps))
    plt.show()

In [8]:
def plot_posterior_hist_mu(res, data, mu_true, norm_1, norm_2, sigma2_true, N, ndim, chain_num, eps):
    plt.hist(res.posterior.mu[0,:], bins=50, density=True, color='gray', alpha=0.8)
    plt.axvline(x = mu_true, label = '$\mu$ used for data simulation')
    x = np.linspace(-3, 3, 1000) 

    #calculate pdf of Gamma distribution for each x-value
    y = st.norm.pdf(x, norm_1, np.sqrt(norm_2))
    plt.plot(x, y, linestyle='dashed', label='$\mu$ prior' )

    muPost_m = 1/(1 + sigma2_true / max(ndim,N)) * np.mean(data)
    muPost_var = 1/(1 + max(ndim,N) / sigma2_true)
    muPost = st.norm.pdf(x, muPost_m, np.sqrt(muPost_var))
    #sigmaPost_x, sigmaPost_value=forplot_sigma_posterior(res, inv_1, inv_2)
    plt.plot(x, muPost, label='analytic posterior' )
  
    ax=plt.gca()
    #ax.set_xscale('log')
    ax.set_xlim(-3, 3)
    ax.set_ylim(0.0, 5)

    plt.legend(loc="upper right")
    plt.title('$\mu$ posterior \n N:'+str(N) +'  ndim:'+str(ndim) + '\n chain num:' + str(chain_num)+'\n eps:'+str(eps))
    plt.show()

## Analytical model functions

In [1]:
# Setting up the dataset
def generateData(N = 50, sigma2_true = 1.5 ** 2):
    x_sim = np.random.normal(0, np.sqrt(1), N)
    y_sim = st.multivariate_normal(x_sim, np.identity(N) * sigma2_true).rvs(1)
    z_sim = st.multivariate_normal((1 / 1 + sigma2_true) * y_sim, np.identity(N) * sigma2_true / (1 + sigma2_true)).rvs(1)
    
    return x_sim, y_sim, z_sim


In [None]:
def genModel_analytic(x_sim, z_sim, ig_1, ig_2):
  with pm.Model() as model: # model of the experimenter
    #x = pm.Normal("x", 0, 1, observed = stimulus_simulation) # defining the prior distribution for stimuli
    sigma2 = pm.InverseGamma("sigma2", ig_1, ig_2) 
    y = pm.Normal("y", x_sim, np.sqrt(sigma2)) 
      
    z = pm.Normal("z", mu = 1 / (1 + sigma2) * y, sigma = np.sqrt(sigma2 / (1 + sigma2)), observed = z_sim)
    return model
    

In [None]:
def genDatawModel_analytic(N, sigma2_true, ig_1, ig_2):
    x_sim, y_sim, z_sim = generateData(N, sigma2_true)
    model = genModel_analytic(x_sim, z_sim, ig_1, ig_2)
    return x_sim, y_sim, z_sim, model


### ABC for the CT model

In [None]:
def defineSharedVarsIdObs(y_sim):

    sigma2 = aesara.shared(np.array(1.0), shape=(), name="sigma2")
    y_shared = aesara.shared(y_sim, shape = y_sim.shape, name="y_shared")

    with pm.Model() as ideal_observer_model:
        x = pm.Normal("x", 0, 1, shape = y_shared.shape)
        pm.Normal("y", x, np.sqrt(sigma2), observed = y_shared)

    return ideal_observer_model

In [None]:
def compileIdObs(y):
    ideal_observer_model = defineSharedVarsIdObs(y)
    compiled_ideal_observer_model = nutpie.compile_pymc_model(ideal_observer_model)
    return compiled_ideal_observer_model

In [None]:
def experimenter(x_sim, z_sim, eps, ig_1, ig_2):
    
    compiled_ideal_observer_model = compileIdObs(x_sim) #for defining we need an input with the same shape
    
    def simulator_fn(rng, y, sigma2, size):
        i = rng.integers(2**32)
        
        # Add irreducible observational noise
        # x_tilde = rng.normal(x_real, sigma)
        ideal_observer_withData = compiled_ideal_observer_model.with_data(y_shared = y, sigma2 = sigma2)
        
        # Take one posterior draw of x from the ideal oberver model
        posterior_x = nutpie.sample(
            ideal_observer_withData, 
            chains = 1,
            draws = 1,
            progress_bar = False,
            save_warmup = False,
            seed = i
        ).posterior["x"].values


       

        # Simulate response
        response_simulation = posterior_x 

## save for debug
        #global Xpost_sim_4
        #global sigma2_sim_4
        #global y_sim_4

        #Xpost_sim_4=np.append(Xpost_sim_4, response_simulation)
        #sigma2_sim_4=np.append(sigma2_sim_4, sigma2)
        #y_sim_4=np.append(y_sim_4, y)

        #print(response_simulation)
       # print(sigma2)
## end save

        return response_simulation

    with pm.Model() as experimenter_model:
        sigma2 = pm.InverseGamma("sigma2", ig_1, ig_2)
        # Instead of sampling x_tilde, you can make it an implicit part of the simulator_fn
        y = pm.Normal("y", x_sim, np.sqrt(sigma2))  
        #pm.InverseGamma("sigma_hat", 2, 1, initval=0.8)

        pm.Simulator(
            "z",
            fn = simulator_fn, 
            distance = "gaussian",
            sum_stat = "identity",
            epsilon = eps,
            params = (y, sigma2),  
            observed = z_sim,
        )
        return experimenter_model
#     trace = pm.sample(chains=2, cores=1, idata_kwargs={"log_likelihood": False})
    

In [None]:
def runModel_ABC(sigma2_true, N, eps, ig_1, ig_2, ndraws, nchains, ncores, threshold, corr_threshold):

    #observation_var = observation_sigma**2
    #response_noise = response_sigma**2


    x_sim, y_sim, z_sim = generateData(N, sigma2_true )
    
    model_experimenter = experimenter(x_sim, z_sim, eps, ig_1, ig_2)
    
    trace = pm.sample_smc(model = model_experimenter, draws = ndraws, chains = nchains, cores = ncores, idata_kwargs={"log_likelihood": False}, threshold = threshold, correlation_threshold = corr_threshold)

    global result
    result = {'inputs':(sigma2_true, N, eps, ig_1, ig_2, ndraws, nchains, ncores), 
              'x_sim': x_sim, 
              'y_sim':y_sim,
              'z_sim': z_sim,
              'model_experimenter': model_experimenter,
              'trace': trace}

    return result

### for analytical posterior


In [1]:
def experimenter_analyticPost(x_sim, z_sim, eps, ig_1, ig_2):
    
    #compiled_ideal_observer_model = compileIdObs(x_sim) #for defining we need an input with the same shape
    
    def simulator_fn(rng, y, sigma2, size):
        i = rng.integers(2**32)
        
        xpost_mean = 1/(1 + sigma2) * y # analytically derived response mean
        xpost_var = sigma2 / (1 + sigma2) # analytically derived response std


        response_simulation = np.random.normal(xpost_mean, np.sqrt(xpost_var)) #st.multivariate_normal(mean=xpost_mean, cov=np.identity(len(xpost_mean))*xpost_var).rvs(1)
        
        ## for debugging
        #global Xpost_sim_3
        #global sigma2_sim_3
        #global y_sim_3

        #Xpost_sim_3=np.append(Xpost_sim_3, response_simulation)
        #sigma2_sim_3=np.append(sigma2_sim_3, sigma2)
        #y_sim_3=np.append(y_sim_3, y)

        #print(sigma2)
        # for debugging

        return response_simulation

    with pm.Model() as experimenter_model:
        sigma2 = pm.InverseGamma("sigma2", ig_1, ig_2)
        # Instead of sampling x_tilde, you can make it an implicit part of the simulator_fn
        y = pm.Normal("y", x_sim, np.sqrt(sigma2))    ## pm.MvNormal("y", x_sim, np.identity(len(x_sim))*sigma2)  ## 
        #pm.InverseGamma("sigma_hat", 2, 1, initval=0.8)

        pm.Simulator(
            "z",
            fn = simulator_fn, 
            distance = "gaussian",
            sum_stat = "identity",
            epsilon = eps,
            params = (y, sigma2),  
            observed = z_sim,
        )
        return experimenter_model

In [None]:
def runModel_ABC_analyticPost(sigma2_true, N, eps, ig_1, ig_2, ndraws, nchains, ncores):

    #observation_var = observation_sigma**2
    #response_noise = response_sigma**2


    x_sim, y_sim, z_sim = generateData(N, sigma2_true )
    
    model_experimenter = experimenter_analyticPost(x_sim, z_sim, eps, ig_1, ig_2)
    
    trace = pm.sample_smc(model = model_experimenter, draws = ndraws, chains = nchains, cores = ncores, idata_kwargs={"log_likelihood": False})

    global result
    result = {'inputs':(sigma2_true, N, eps, ig_1, ig_2, ndraws, nchains, ncores), 
              'x_sim': x_sim, 
              'y_sim':y_sim,
              'z_sim': z_sim,
              'model_experimenter': model_experimenter,
              'trace': trace}

    return result

### Plotting functions

In [13]:
def xhat_given_x_sigma2_pdf(xhat, x, sigma2):
    m = 1 / (1 + sigma2) * x
    v = sigma2 / (1 + sigma2) ** 2 + sigma2 / (1 + sigma2)
    mv_xhat = st.multivariate_normal(mean = m, cov = np.identity(len(xhat)) * v)
    return mv_xhat.pdf(xhat)

In [14]:
def sigma2_given_xhat_x_pdf(xhat, x, sigma2, iv_1, iv_2):
   pdf_xhat = xhat_given_x_sigma2_pdf(xhat, x, sigma2)
   mv_x = st.multivariate_normal(mean = np.zeros(len(x)), cov = np.identity(len(x)))
   pdf_x = mv_x.pdf(x)
   pdf_sigma2 = st.invgamma.pdf(sigma2, a = iv_1, loc = 0, scale = iv_2)
   return pdf_xhat * pdf_x * pdf_sigma2

In [15]:
def forplot_sigma_posterior_fromstrore(trace, stimulus_simulation, iv_1, iv_2):
    x = np.linspace(0.01, 50, 10000) 

    Ys = np.empty(len(x))
    i = 0
    for s in x:
        y = sigma2_given_xhat_x_pdf(trace.observed_data.z.values, stimulus_simulation, s, iv_1, iv_2)
        Ys[i] = y
        i += 1
        
    dx = x[2] - x[1]
    mass = np.sum(dx * Ys)
    Ys_normalized = Ys / mass

    return x, Ys_normalized

In [2]:
""" def forplot_sigma_posterior_fromstrore(trace, stimulus_simulation, iv_1, iv_2):
    x = np.linspace(0.01, 50, 10000) 

    Ys = np.empty(len(x))
    i = 0
    for s in x:
        y = sigma2_given_xhat_x_pdf(trace.observed_data.response_simulation.values, stimulus_simulation, s, iv_1, iv_2)
        Ys[i] = y
        i += 1
        
    dx = x[2] - x[1]
    mass = np.sum(dx * Ys)
    Ys_normalized = Ys / mass

    return x, Ys_normalized """

In [31]:
def plot_posterior_hist_fromstore(trace, stimulus_simulation, sigma2, inv_1, inv_2, N, chain_num, draw, rt):
    plt.hist(trace.posterior.sigma2[0,:], bins = np.logspace(np.log10(0.01), np.log10(50.0), 50), density = True, color = 'gray', alpha = 0.8)
    plt.axvline(x = sigma2,  label = '$\sigma^2$ used for data simulation')
    x = np.linspace (0.01, 50, 1000) 

    #calculate pdf of Gamma distribution for each x-value
    y = st.invgamma.pdf(x, a = inv_1, loc = 0, scale = inv_2)
    plt.plot(x, y, linestyle = 'dashed',  label = 'prior' )

    sigmaPost_x, sigmaPost_value = forplot_sigma_posterior_fromstrore(trace, stimulus_simulation, inv_1, inv_2)
    plt.plot(sigmaPost_x, sigmaPost_value,  label = 'analytic posterior' )

    ax=plt.gca()
    ax.set_xscale('log')
    ax.set_xlim(0.01, 50)
    ax.set_ylim(0.0, 10)

    plt.xlabel('$\sigma^2$')
    plt.legend(loc="upper right")
    plt.title('$\sigma^2$ posterior \n \n N:'+str(N)+'\n prior: IG('+str(inv_1)+','+str(inv_2)+')' +'\n chain num:' + str(chain_num)+', draws:'+str(draw)+'\n rt:'+rt)
    plt.show()