In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import scipy.stats as stats
from scipy.stats import logistic
import pymc3 as pm
import theano
import theano.tensor as tt
import arviz as az
from scipy.integrate import quad
from scipy.stats import norm

# Get data for all emotions

In [2]:
# load rating data from pilot study 2
all_data = pd.read_csv("/Users/jadeserfaty/Library/Mobile Documents/com~apple~CloudDocs/code/lido/introspection_task/pilot_study_2_data/pilot_study_2_both_parts_rating_data/rating_data_anxiety.csv")

In [3]:
all_data.head()

## Choose one emotion

In [4]:
# so let's start by testing out anxiety for example
emotion_focus = "anxiety"
all_data_emo = all_data[all_data["emotionName"] == emotion_focus].copy()

# Define functions for bayesian model

In [5]:
def f(v, mu, s):
    return 0.5 * (1 + np.tanh((v - mu) / (2 * s)))

def f_theano(v,mu,s):
    return 0.5 * (1 + tt.tanh((v - mu) / (2 * s)))

def f_inv(v_tilde, mu, s):
    v_tilde = np.clip(v_tilde, 1e-8, 1 - 1e-8) # find another way of doing this but basically will always need to clip because extremeties don't work in this model
    result = mu + s * np.log(v_tilde / (1 - v_tilde))
    return result

# The first derivative of the inverse CDF
def phi_prime(v_tilde, s):
    return s / (v_tilde * (1 - v_tilde))

# The second derivative of the inverse CDF
def phi_double_prime(v_tilde, s):
    return s * (2 * v_tilde - 1) / (v_tilde ** 2 * (1 - v_tilde) ** 2)

def f_prime_theano(v, mu, s):
    p = f_theano(v, mu, s)
    return p * (1 - p)

def g(v_hat):
    return 1 / (1 + np.exp(-v_hat))

def g_inv_theano(v):
    return tt.log(v / (1 - v))

def g_inv_prime_theano(v):
    return 1 / (v * (1 - v))

# Define the sech function using theano
def sech(x):
    return 2 / (tt.exp(x) + tt.exp(-x))

# Define the log probability function for the logistic distribution
def logp_logistic(v, mu, s):
    # logistic PDF transformed to log-probability
    return tt.log(sech((v - mu) / (2 * s))**2 / (4 * s))

# Adaptation of functions using theanos, with numpy for visualization of prior distribution
def sech_bis(x):
    return 2 / (np.exp(x) + np.exp(-x))

def logp_logistic_bis(v, mu, s):
    return np.log(sech_bis((v - mu) / (2 * s))**2 / (4 * s))

# Fit Bayesian model

In [10]:
# Parameters
delta = 0.05 

In [9]:
all_participants = np.unique(all_data_emo["subject_id"])

In [None]:
posterior_distributions_all_participants = {}

In [48]:
participant_no_convergence = ["yv4dL8QeFhgdGWHbmE7mGTBeWfk2", "yG2DqQEFfTUa8ia9kV3TXMS3Xvh2", "pEws4YaPQLhWeNFNtVVwDIx4vvM2", "il7v5DrkOOUZT4zsi6xGPE74lhC3", "aM1CxLBZisfaOXevMwqsUy7nQE23", "FcfUH1l7WOdZFIhDl2RNobD9bJq1", "HrVEfoH7KFTgCClM7OFCQ2S0Azz1"]
print(len(participant_no_convergence))

In [56]:
# TODO: modify code such that I save for each participant their behavioral normalized data too

In [45]:
for participant in all_participants: 
    if participant not in posterior_distributions_all_participants.keys() and participant not in ["yv4dL8QeFhgdGWHbmE7mGTBeWfk2", "yG2DqQEFfTUa8ia9kV3TXMS3Xvh2", "pEws4YaPQLhWeNFNtVVwDIx4vvM2", "il7v5DrkOOUZT4zsi6xGPE74lhC3", "aM1CxLBZisfaOXevMwqsUy7nQE23", "FcfUH1l7WOdZFIhDl2RNobD9bJq1", "HrVEfoH7KFTgCClM7OFCQ2S0Azz1"]:
        print(participant)
        # we're fitting the model for each emotion and for each participant
        participant_emo = all_data_emo[all_data_emo["subject_id"] == participant].copy()

        # Extract the number of videos 
        num_videos = len(participant_emo)

        # Define the parameters for the prior distribution
        # Normalize the 'average_rating' to a 0-1 scale
        participant_emo.loc[:, 'normalized_rating_phase1'] = (participant_emo['rating_phase1'] - 1) / (7 - 1)
        participant_emo.loc[:, 'normalized_rating_phase2'] = (participant_emo['rating_phase2'] - 1) / (7 - 1)

        # Compute the average of the normalized ratings
        participant_emo.loc[:, 'normalized_average_rating'] = participant_emo.loc[:, ['normalized_rating_phase1', 'normalized_rating_phase2']].mean(axis=1)
        participant_emo.loc[:, 'normalized_variance_rating'] = participant_emo.loc[:, ['normalized_rating_phase1', 'normalized_rating_phase2']].std(axis=1)

        # Plot of 'normalized_average_rating' so normalized average ratings on anxiety for participant of focus 
        plt.figure(figsize=(10, 6))
        plt.hist(participant_emo['normalized_average_rating'], bins=10, color='skyblue', edgecolor='black')
        plt.title('Histogram of Normalized Average Ratings')
        plt.xlabel('Normalized Average Rating')
        plt.ylabel('Frequency')
        plt.show()

        # Calculate new parameters on the normalized scale
        mu_empirical = participant_emo['normalized_average_rating'].mean()
        s_empirical = participant_emo['normalized_average_rating'].std()

        print("Estimated Prior Mean:", mu_empirical)
        print("Estimated Prior Standard Deviation:", s_empirical)

        # Plot priors for visualization 
        # Define the range for v
        v_values = np.linspace(0, 1, 100)

        # Plot Uniform distribution for v
        plt.figure(figsize=(12, 4))
        plt.subplot(1, 3, 1)
        plt.plot(v_values, np.ones_like(v_values), label='Uniform(0,1)')
        plt.title('Uniform Prior for v')
        plt.xlabel('v')
        plt.ylabel('Probability Density')
        plt.legend()

        # Plot HalfNormal distribution for standard deviations
        sigma_values = np.linspace(0, 3 * s_empirical, 100)
        plt.subplot(1, 3, 2)
        plt.plot(sigma_values, (np.sqrt(2/np.pi) * np.exp(-0.5 * (sigma_values/s_empirical)**2)), label=f'HalfNormal({np.round(s_empirical,3)})')
        plt.title('HalfNormal Prior for Standard Deviations')
        plt.xlabel('sigma')
        plt.ylabel('Probability Density')
        plt.legend()

        plt.subplot(1, 3, 3)
        logistic_prior = logp_logistic_bis(v_values, mu_empirical, s_empirical)
        plt.plot(v_values, np.exp(logistic_prior), label=f'Logistic Prior\nmu={np.round(mu_empirical,3)}, s={np.round(s_empirical,3)}')
        plt.title('Custom Logistic Prior')
        plt.xlabel('v')
        plt.ylabel('Log Probability Density')
        plt.legend()

        plt.tight_layout()
        plt.show()

        if __name__ == '__main__':
            # Bayesian model Setup
            with pm.Model() as model:
                # Prior distributions for parameters
                mu = pm.Normal('mu', mu=mu_empirical, sigma=s_empirical)
                s = pm.HalfNormal('s', sigma=s_empirical)
                sigma = pm.HalfNormal('sigma', sigma=1)
                sigma_ext = pm.HalfNormal('sigma_ext', sigma=1)

                # Observations
                # Convert simulated_noisy_ratings from a NumPy array to a Theano tensor by using it as observed data
                observed_noisy_ratings = pm.Data("observed_noisy_ratings", participant_emo['normalized_average_rating'])

                # Define the latent variable v with a logistic prior
                v = pm.Uniform('v', 0, 1, shape=num_videos)  # Starting with a Uniform prior for simplicity
                # Add the custom logistic prior using a Potential
                pm.Potential('v_prior_potential', logp_logistic(v, mu, s))

                # Likelihood function components
                phi_prime_val = phi_prime(v, s)
                phi_double_prime_val = phi_double_prime(v, s)
                F_prime_v0_val = f_prime_theano(v, mu, s)
                g_inv_prime_val = g_inv_prime_theano(observed_noisy_ratings)

                # The mean and variance for the likelihood normal distribution
                mean_likelihood = v + phi_double_prime_val * sigma**2
                sd_likelihood = tt.sqrt((phi_prime_val**2) * sigma**2 + sigma_ext)

                # Define the observed likelihood
                observed = pm.Normal('observed',
                                     mu=mean_likelihood,
                                     sigma=sd_likelihood,
                                     observed=observed_noisy_ratings)

                # Include the multiplicative factors in the likelihood using a Potential
                # The Potential object is used to add unnormalized log-probability terms to the model's joint density. 
                # These terms are often used for including domain-specific constraints or additional likelihood terms 
                # that don't conform to standard probability distributions.
                # Ensure that F_prime_v0 and g_inv_prime are theano tensors to enable automatic differentiation
                likelihood_factors = pm.Potential('likelihood_factors',
                                                  tt.log(F_prime_v0_val) +
                                                  tt.log(g_inv_prime_val))


                # Sampling from the posterior
                trace = pm.sample(2000, tune=1000, cores=2, target_accept=0.95, return_inferencedata=False)

                # Check the convergence
                # visualize the trace of each parameter to check for convergence visually
                pm.plot_trace(trace)
                # provide a statistical summary of the trace (mean, standard deviation, quantiles, etc.). 
                summary_stats = pm.summary(trace).round(2)

                # Save posterior distributions for particpant
                posterior_distributions_all_participants[participant] = {}
                posterior_distributions_all_participants[participant]["summary_stats"] = summary_stats

                # The Gelman-Rubin diagnostic (or Rhat)
                # Assesses the convergence of the MCMC chains by comparing the variance between chains to the variance within chains. 
                # A value close to 1 (typically, 1.01 or lower) indicates good convergence.
                gelman_rubin = az.rhat(trace)
                print("Gelman-Rubin Diagnostic:")
                print(gelman_rubin)

                # Effective Sample Size
                # Measures the number of independent-like samples in the chain. 
                # Higher values are better, as they indicate more information and less autocorrelation.
                effective_sample_size = az.ess(trace)
                print("Effective Sample Size:")
                print(effective_sample_size)

                # Convergence Diagnostics
                # checks for autocorrelation within each parameter's trace. 
                # High autocorrelation can indicate slow mixing and might suggest that more tuning 
                # or a higher number of iterations are needed during sampling.
                az.plot_autocorr(trace)

                # Posterior Predictive Checks
                # Generates new data points from the posterior distribution of the model parameters to assess 
                # how well the model could have predicted the observed data. 
                # This is crucial for validating the model's predictive power.
                ppc = pm.sample_posterior_predictive(trace, var_names=["observed"])

                # We'll plot this once we do the adjustments for the predictions
    #             az.plot_ppc(az.from_pymc3(posterior_predictive=adjusted_ppc, model=model))

                # Extract the predicted data
                predicted_data = ppc['observed']
                posterior_distributions_all_participants[participant]["predicted_data"] = predicted_data

In [49]:
print(len(posterior_distributions_all_participants))

In [58]:
for participant in posterior_distributions_all_participants.keys():
    
    participant_emo = all_data_emo[all_data_emo["subject_id"] == participant].copy()

    # Define the parameters for the prior distribution
    # Normalize the 'average_rating' to a 0-1 scale
    participant_emo.loc[:, 'normalized_rating_phase1'] = (participant_emo['rating_phase1'] - 1) / (7 - 1)
    participant_emo.loc[:, 'normalized_rating_phase2'] = (participant_emo['rating_phase2'] - 1) / (7 - 1)

    # Compute the average of the normalized ratings
    participant_emo.loc[:, 'normalized_average_rating'] = participant_emo.loc[:, ['normalized_rating_phase1', 'normalized_rating_phase2']].mean(axis=1)     
        
    fig = plt.figure()
    plt.hist(participant_emo["normalized_average_rating"], label= "behavioral")
    plt.hist(posterior_distributions_all_participants[participant]["summary_stats"]['mean'][4:], label = "posterior")
    plt.title(participant)
    plt.legend()

# Modeling choice

In [61]:
# Define the integrands for mu and sigma^2 calculations
def mu_integrand(v, v0, sigma, sigma_ext ,s):
    phi_prime_val = phi_prime(v , s)
    phi_double_prime_val =  phi_double_prime(v, s)
    density = norm.pdf(v, loc=v0 + phi_double_prime_val * sigma**2, scale=np.sqrt(phi_prime_val**2 * sigma**2 + sigma_ext))
    return g(v) * density

def sigma_integrand(v, v0, mu_v_hat, sigma, sigma_ext, s):
    phi_prime_val = phi_prime(v, s)
    phi_double_prime_val =  phi_double_prime(v, s)
    density = norm.pdf(v, loc=v0 + phi_double_prime_val * sigma**2, scale=np.sqrt(phi_prime_val**2 * sigma**2 + sigma_ext))
    return (g(v) - mu_v_hat)**2 * density

# Calculate mu_v_hat and sigma_v_hat^2 for a given v0
def calculate_mu_sigma(v0, sigma, sigma_ext, s):
    # Calculate mu_v_hat
    mu_v_hat, _ = quad(mu_integrand, -np.inf, np.inf, args=(v0, sigma, sigma_ext, s))
    
    # Calculate sigma_v_hat^2 using mu_v_hat
    sigma_v_hat_squared, _ = quad(sigma_integrand, -np.inf, np.inf, args=(v0, mu_v_hat, sigma, sigma_ext, s))
    
    return mu_v_hat, sigma_v_hat_squared

In [62]:
# Define the sequence of subjective values v_hat on the rating scale (1 to 7 with step 0.01)
v_hat_sequence = np.arange(1, 7.01, 0.01)

In [30]:
# Placeholder for storing mu_v_hat and var_v_hat
mu_v_hat_values = np.zeros_like(v_hat_sequence)
var_v_hat_values = np.zeros_like(v_hat_sequence)

# Calculate mu_v_hat and var_v_hat for each v_hat in the sequence
for i, v_hat in enumerate(v_hat_sequence):
    mu_v_hat, var_v_hat = calculate_mu_sigma(v0 = v_hat, sigma = sigma_estimate, sigma_ext = sigma_ext_estimate, s = s_estimate)
    mu_v_hat_values[i] = mu_v_hat
    var_v_hat_values[i] = var_v_hat

In [32]:
var_v_hat_values

In [29]:
# Function to calculate choice consistency for a given pair of stimuli
def choice_consistency(mu_v_hat, var_v_hat, mu_v_hat_plus_delta, var_v_hat_plus_delta, mu_v_hat_min_delta, var_v_hat_min_delta):
    # Calculating choice consistency using the provided formula
    # This part involves calculating the cumulative distribution function values
    # for the normal distribution based on the differences in expected values and variances.
    consistency = 0.5 * (norm.cdf((mu_v_hat_plus_delta - mu_v_hat) / np.sqrt(var_v_hat_plus_delta - var_v_hat)) +
                         norm.cdf((mu_v_hat_min_delta - mu_v_hat) / np.sqrt(var_v_hat_min_delta - var_v_hat)))
    return consistency

In [None]:
# Example calculation for a pair of stimuli
# This is a simplified example; in practice, you would apply this to all pairs of stimuli in your choice task.
v_hat_1, v_hat_2 = 3, 5  # Example subjective values for two stimuli
v_hat_1_plus_delta, v_hat_2_plus_delta = v_hat_1 + delta, v_hat_2 + delta
v_hat_1_min_delta, v_hat_2_min_delta = v_hat_1 - delta, v_hat_2 - delta

mu_v_hat_1, var_v_hat_1 = calculate_mu_var(v_hat_1, lambda_param, sigma, sigma_ext)
mu_v_hat_2, var_v_hat_2 = calculate_mu_var(v_hat_2, lambda_param, sigma, sigma_ext)

mu_v_hat_1_plus_delta, var_v_hat_1_plus_delta = calculate_mu_var(v_hat_1_plus_delta, lambda_param, sigma, sigma_ext)
mu_v_hat_2_plus_delta, var_v_hat_2_plus_delta = calculate_mu_var(v_hat_2_plus_delta, lambda_param, sigma, sigma_ext)

mu_v_hat_1_min_delta, var_v_hat_1_min_delta = calculate_mu_var(v_hat_1_min_delta, lambda_param, sigma, sigma_ext)
mu_v_hat_2_min_delta, var_v_hat_2_min_delta = calculate_mu_var(v_hat_2_min_delta, lambda_param, sigma, sigma_ext)


consistency_1 = choice_consistency(mu_v_hat_1, var_v_hat_1, mu_v_hat_1_plus_delta, var_v_hat_1_plus_delta, mu_v_hat_1_min_delta, var_v_hat_1_min_delta)
consistency_2 = choice_consistency(mu_v_hat_2, var_v_hat_2, mu_v_hat_2_plus_delta, var_v_hat_2_plus_delta, mu_v_hat_2_min_delta, var_v_hat_2_min_delta)

print(f"Choice consistency for stimulus 1: {consistency_1}")
print(f"Choice consistency for stimulus 2: {consistency_2}")

# Model repulsion

In [66]:
# Define the rate parameter for the exponential distribution
mu = 0.5  # Mean value for the prior
s = 0.1   # Scale value for the prior
# Set your noise levels
sigma = 0.01  # Start with a small amount of internal noise # The encoding noise
sigma_ext = 0.01  # Start with a small amount of external noise # The external noise after decoding
delta = 0.05 
num_videos = 30  # Assuming you have 30 videos per emotion
# num_participants = 100  # Number of participants

In [67]:
# ATTENTION RE RUN mu and s definition otherwise they are theano form 

# Different levels of internal noise to simulate
sigma_levels = [0.05, 0.1, 0.2,0.25,0.3, 0.4,0.45, 0.5]  # Different levels of internal noise

# Placeholder for results
results = []

# Loop over different levels of internal noise
for sigma in sigma_levels:
    # Generate true emotion intensities
    true_emotion_intensities = logistic.rvs(loc=mu, scale=s, size=num_videos)

    # Simulate the encoding process with internal noise
    encoded_responses = f(true_emotion_intensities, mu, s) + np.random.normal(0, sigma, size=num_videos)
    
    # Apply the decoding process
    decoded_estimates = f_inv(encoded_responses, mu, s)

    # Add external noise to the decoded estimates
    observed_noisy_ratings = decoded_estimates + np.random.normal(0, sigma_ext, size=num_videos)
    
    # Calculate bias as the difference between observed noisy ratings and true emotion intensities
    bias = observed_noisy_ratings - true_emotion_intensities
    
    # Store results
    results.append((sigma, true_emotion_intensities, observed_noisy_ratings, bias))

In [68]:
# Plotting
plt.figure(figsize=(14, 8))

for i, (sigma, true_intensities, noisy_ratings, bias) in enumerate(results):
    plt.subplot(2, 4, i+1)
    plt.scatter(true_intensities, bias, alpha=0.2)
    plt.title(f'Internal Noise Level: {sigma}')
    plt.xlabel('True Emotion Intensity')
    plt.ylabel('Bias (Noisy Rating - True Intensity)')
    plt.axhline(0, color='grey', linestyle='--')

plt.tight_layout()
plt.show()

In [69]:
# Plotting
plt.figure(figsize=(14, 8))

for i, (sigma, true_intensities, noisy_ratings, bias) in enumerate(results):
    # Sort the true intensities and corresponding biases
    sorted_indices = np.argsort(true_intensities)
    sorted_true_intensities = true_intensities[sorted_indices]
    sorted_bias = bias[sorted_indices]

    plt.subplot(2, 4, i+1)
    plt.plot(sorted_true_intensities, sorted_bias, '-o', markersize=3, alpha=0.6)  # Line plot with small markers
    plt.title(f'Internal Noise Level: {sigma}')
    plt.xlabel('True Emotion Intensity')
    plt.ylabel('Bias (Noisy Rating - True Intensity)')
    plt.axhline(0, color='grey', linestyle='--')

plt.tight_layout()
plt.show()

In [70]:
import numpy as np
import matplotlib.pyplot as plt

plt.figure(figsize=(14, 8))

degree = 3  # Degree of the polynomial fit, adjust based on your data

for i, (sigma, true_intensities, noisy_ratings, bias) in enumerate(results):
    # Sorting the true_intensities and corresponding bias for plotting
    sorted_indices = np.argsort(true_intensities)
    sorted_intensities = true_intensities[sorted_indices]
    sorted_bias = bias[sorted_indices]
    
    plt.subplot(2, 4, i+1)
    plt.scatter(sorted_intensities, sorted_bias, alpha=0.2)
    
    # Fit a polynomial regression for trend line
    coeffs = np.polyfit(sorted_intensities, sorted_bias, degree)
    polynomial = np.poly1d(coeffs)
    trendline = polynomial(sorted_intensities)
    
    plt.plot(sorted_intensities, trendline, color='red')  # Trend line
    
    plt.title(f'Internal Noise Level: {sigma}')
    plt.xlabel('True Emotion Intensity')
    plt.ylabel('Bias (Noisy Rating - True Intensity)')
    plt.axhline(0, color='grey', linestyle='--')

plt.tight_layout()
plt.show()
