In [1]:
#Import list
%load_ext autoreload
%autoreload 2
import warnings
import xlsxwriter
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import Generative_model
from scipy import stats
import pandas as pd

import copy

#all pymc3 requirement
import pymc3 as pm
import theano.tensor as tt
import theano as th
import arviz as az


from pymc3 import Model, Normal, Slice, sample, Uniform, Binomial, HalfNormal
from pymc3.distributions import Interpolated



plt.style.use("seaborn-darkgrid")
print(f"Running on PyMC3 v{pm.__version__}")



Running on PyMC3 v3.11.5


  plt.style.use("seaborn-darkgrid")


In [2]:

#Function to generate samples
#The function will add the results to the previous data set
# it will also add the total samples used per degree to a list called n_samples_per_degree
def generate_samples(mu,sigma,n_samples,n_samples_per_degree,degree,data_set):
    #create normal distribution
    normal_dist = stats.norm(loc = mu, scale=sigma)
    #sample data given amount of samples and distribution
    responses = np.random.binomial(n_samples,p=normal_dist.cdf(degree))
    #add responses to data set with +8 shift to account for degrees 
    data_set[degree+8] += responses
    #add amount of samples to n_samples_per_degree
    n_samples_per_degree[degree+8] += n_samples
    #return file
    return data_set,n_samples_per_degree
    



In [3]:

#this function calculates a cumulative densistiy function based on a given mean and std. This is used to convert the input variables.
def cumulative_normal(x, mean, sigma, s=np.sqrt(2)):
    # Cumulative distribution function for the standard normal distribution
    return 0.5 + 0.5 * tt.erf((x-mean)/(sigma*s))

In [4]:
#function to calculate entropy
def Calculate_entropy(lambda_list, theta):
    # Initialize lists
    # Probability of clockwise
    pcw = np.zeros(len(theta))
    # Probability of counter clockwise
    pccw = np.zeros(len(theta))  # Corrected to create a new zero array instead of sharing the reference

    # The dimensions of the input lambdas 
    dimensions_lambda = np.shape(lambda_list)
    
    # Initialization of empty list for the probability of a given lambda with a given theta being clockwise
    param_theta_cw = np.zeros((dimensions_lambda[0], len(theta)))
    param_theta_ccw = np.zeros((dimensions_lambda[0], len(theta)))  # Corrected to create a new zero array instead of sharing the reference

    # Go over all theta values
    for idx, theta_val in enumerate(theta):     
        Pcw_theta_lambda = []
        for lamb in lambda_list:
            # Given a specific lambda and degree (theta) clockwise probability
            Pcw_theta_lambda.append(stats.norm.cdf(theta_val, lamb[0], lamb[1]))
        # What is the total probability of clockwise for this degree (theta) 
        pcw[idx] = sum(Pcw_theta_lambda) / dimensions_lambda[0]
        # Probability counterclockwise
        pccw[idx] = 1 - pcw[idx]

    # Clip lists to avoid NaN
    pcw = np.clip(pcw, 1e-6, 1 - 1e-6)
    pccw = np.clip(pccw, 1e-6, 1 - 1e-6)

    # For each parameter pair
    for lamb_idx, lamb in enumerate(lambda_list):      # does this just calculate the mean? 
        # For each theta_val
        for idx, theta_val in enumerate(theta):
            # Find given probability
            p_cw = stats.norm.cdf(theta_val, lamb[0], lamb[1])
            p_ccw = 1 - p_cw
            #Calculate probability of a lambda given theta and direction (cw or ccw)
            param_theta_cw[lamb_idx, idx] = p_cw / pcw[idx]
            param_theta_ccw[lamb_idx, idx] = p_ccw / pccw[idx]

    # Calculate the log value of the probabilities
    # Clip is included to avoid NaN values
    log_param_theta_cw = np.log(np.clip(param_theta_cw, 1e-6, None))
    log_param_theta_ccw = np.log(np.clip(param_theta_ccw, 1e-6, None))

    # Calculate individual multiplications
    pre_sum_cw = log_param_theta_cw * param_theta_cw
    pre_sum_ccw = log_param_theta_ccw * param_theta_ccw
    # Calculate H (entropy component for cw and ccw)
    H_cw = np.sum(pre_sum_cw, axis=0)
    H_ccw = np.sum(pre_sum_ccw, axis=0)

    # Entropy calculation
    entropy = H_cw * pcw + H_ccw * pccw
    return entropy,(np.argmax(entropy)-8)

In [5]:
#ground truth
mu,sigma = 1,2
stimuli = np.arange(-8,9)
#set up the data:
#number of samples used per iteration
samples_per_degree = np.zeros(17)
data_set = np.zeros(17)
n_samples = 1
# for i in range(-8,9):
#     data_set,samples_per_degree = generate_samples(mu,sigma,n_samples=1,n_samples_per_degree=samples_per_degree,degree=i,data_set=data_set)


In [6]:
column_names = ["Entropy_x " + str(i) for i in range(-8,9)]
column_names.append("mu")
column_names.append("sigma")
column_names.append("stimulus_used")
column_names.append("lambda_list")
print(column_names)
data_frame_adaptive = pd.DataFrame(columns=column_names)

['Entropy_x -8', 'Entropy_x -7', 'Entropy_x -6', 'Entropy_x -5', 'Entropy_x -4', 'Entropy_x -3', 'Entropy_x -2', 'Entropy_x -1', 'Entropy_x 0', 'Entropy_x 1', 'Entropy_x 2', 'Entropy_x 3', 'Entropy_x 4', 'Entropy_x 5', 'Entropy_x 6', 'Entropy_x 7', 'Entropy_x 8', 'mu', 'sigma', 'stimulus_used', 'lambda_list']


In [7]:
#set up the model:
model_cdf = pm.Model()

with model_cdf:
    curve_mean = pm.Uniform('mn', lower=-8, upper =8)#pm.Normal("curve_mu",mu=0,sigma=5)
    curve_std =pm.Gamma("sd",alpha=3,beta=1)#pm.HalfNormal("curve_sigma",sigma=1.5)

    #calculate the cdfs of the means and std to fit onto the observations. 
    theta = pm.Deterministic('theta', cumulative_normal(stimuli,curve_mean,curve_std))
    Y_obs = pm.Binomial("Y_obs", n=samples_per_degree, p=theta, observed=data_set)
    model_trace = pm.sample(500, cores=4)  # Sampling from the model


  return wrapped_(*args_, **kwargs_)
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [sd, mn]


In [None]:
lamb_test = [(model_trace["mn"][i],model_trace["sd"][i]) for i in range(len(model_trace["sd"]))]   
for i in lamb_test:
    #plot the line with a 0.01 opacity to ensure viewability
    plt.plot(stimuli,stats.norm(loc = i[0], scale=i[1]).cdf(stimuli),alpha=0.01,color="r")
#plot ground truth
plt.plot(stimuli,stats.norm(loc = mu, scale=sigma).cdf(stimuli),color="b")
plt.show()

In [None]:
iteration_counter = 0
while iteration_counter <20:
    #check mean if criteria is met
    mu_mean = np.mean(model_trace["mn"])
    sd_mean = np.mean(model_trace["sd"])
    
    
    #check if the mu and sigma are within the hitting mark
    #extracting posterior samples
    lambda_list_adaptive = [(model_trace["mn"][i],model_trace["sd"][i]) for i in range(len(model_trace["sd"]))]

    entropy,proposed_stim = Calculate_entropy(lambda_list_adaptive,stimuli)
    iteration_counter += 1
    print(proposed_stim)
    data_set,samples_per_degree = generate_samples(mu,sigma,1,samples_per_degree,proposed_stim,data_set)
    print(data_set)
    print(samples_per_degree)
    
    with model_cdf:
        #fit the new data to the observations
        Y_obs = pm.Binomial("Y_obs"+str(iteration_counter), n=samples_per_degree, p=theta, observed=data_set)
        #trace the posterior
        model_trace = pm.sample(500, cores=4)   # Sampling from the model
    
    
    data_frame_adaptive.loc[iteration_counter] = entropy.tolist() + [mu_mean, sd_mean,proposed_stim]+[lambda_list_adaptive]

In [None]:

lamb_test = [(model_trace["mn"][i],model_trace["sd"][i]) for i in range(len(model_trace["sd"]))]   
for i in lamb_test:
    #plot the line with a 0.01 opacity to ensure viewability
    plt.plot(stimuli,stats.norm(loc = i[0], scale=i[1]).cdf(stimuli),alpha=0.01,color="r")
#plot ground truth
plt.plot(stimuli,stats.norm(loc = mu, scale=sigma).cdf(stimuli),color="b")
plt.show()