In [1]:
import plotly.graph_objects as go
import numpy as np
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "notebook"
figs_folder = '/Users/alfredoparra/Library/CloudStorage/GoogleDrive-parra.h.alfredo@gmail.com/My Drive/My Documents/Career & Work/QRI/2024-08 Cluster headaches/Figures/'


In [3]:
def transform_intensity(intensities, method='linear', max_value=100, power=2, base=np.e, scaling_factor=1.0):
    if method == 'linear':
        return intensities * (max_value / 10)
    
    elif method == 'piecewise_linear':
        breakpoint = 8
        lower_slope = (max_value / 2) / breakpoint
        upper_slope = (max_value / 2) / (10 - breakpoint)
        return np.where(intensities <= breakpoint,
                        lower_slope * intensities,
                        (max_value / 2) + upper_slope * (intensities - breakpoint))
    
    elif method == 'power':
        return (intensities / 10)**power * max_value
    
    elif method == 'exponential':
        return (base**(intensities/scaling_factor) - 1) * (max_value / (base**(10/scaling_factor) - 1))
    else:
        raise ValueError("Invalid method.")

def plot_intensity_transformations(max_value=1):
    intensities = np.linspace(0, 10, 101)  # 0 to 10 in steps of 0.1

    fig = go.Figure()
    
    methods = ['linear', 'power', 'exponential']
    labels = ['Linear', 'Power', 'Exponential']
    colors = ['rgb(31, 119, 180)', 'rgb(255, 127, 14)', 'rgb(44, 160, 44)']
    markers = ['circle', 'square', 'diamond']
    
    for method, label, color, marker in zip(methods, labels, colors, markers):
        transformed = transform_intensity(intensities, method=method, max_value=max_value, power=2)
        fig.add_trace(go.Scatter(
            x=intensities,
            y=transformed,
            mode='lines+markers',
            name=label,
            line=dict(color=color, width=2),
            marker=dict(
                color=color,
                size=[10 if x.is_integer() else 0 for x in intensities],
                symbol=marker,
                line=dict(width=2, color='white')  # Add a white border to markers
            )
        ))

    fig.update_layout(
        xaxis_title='Original pain intensity',
        yaxis_title='Transformed intensity (weight)',
        template='plotly_white',
        width=500,
        height=500,
        legend=dict(
            orientation="v",
            yanchor="top",
            y=.9,
            xanchor="left",
            x=0.1,
            itemsizing='constant',  # This ensures all legend items are the same size
            font=dict(size=12),
            bordercolor="Grey",
            borderwidth=1,
        ),
        xaxis=dict(
            range=[0, 10],
            showgrid=True,
            gridwidth=1,
            gridcolor='rgba(0, 0, 0, 0.1)',
            color='black',
            linecolor='black',
            ticks='outside',
            tickcolor='black',
            ticklen=6,
            tickmode='array',
            tickvals=list(range(11)),  # 0 to 10
            ticktext=[str(i) for i in range(11)],
        ),
        yaxis=dict(
            range=[0, 1],
            tickmode='array',
            tickvals=[i/10 for i in range(11)],  # 0 to 1 in steps of 0.1
            ticktext=['0'].append([f'{i/10:.1f}' for i in range(1,11)]),
            showgrid=True,
            gridwidth=1,
            gridcolor='rgba(0, 0, 0, 0.1)',
            color='black',
            linecolor='black',
            ticks='outside',
            tickcolor='black',
            ticklen=6,
        ),
        margin=dict(l=50, r=50, t=10, b=50),

    )

    fig.show()
    fig.write_image(figs_folder + "Intensity-weights.png", scale=4)
    fig.write_image(figs_folder + "Intensity-weights.svg")

# Call the function
plot_intensity_transformations(max_value=1)

In [3]:
from scipy.stats import truncnorm, gaussian_kde

def generate_max_pain_intensity(is_treated, size, weight_study_1=0.5, weight_severe=0.8):
    def discretize(values, bins):
        return np.digitize(values, bins) * 0.1

    if not is_treated:
        # Data for untreated patients
        data1 = np.array([9.5, 7.5, 5.5, 3.5, 1.5])  # Study 1 (Russell)
        freq1 = np.array([23, 17, 20, 5, 12])
        data2 = np.array([9.5, 8.5, 7.5, 6.5])  # Study 2 (Torelli & Manzoni)
        freq2 = np.array([29, 7, 3, 3])
        weight_study_2 = 1 - weight_study_1
        n_severe = int(np.round(size * weight_severe))
        n_mild = size - n_severe
        # Calculate weighted mean
        mean1 = np.average(data1, weights=freq1)
        mean2 = np.average(data2, weights=freq2)
        mean_severe = mean1 * weight_study_1 + mean2 * weight_study_2

        # Calculate weighted standard deviation
        variance1 = np.average((data1 - mean1)**2, weights=freq1)
        variance2 = np.average((data2 - mean2)**2, weights=freq2)
        std_severe = np.sqrt(variance1 * weight_study_1 + variance2 * weight_study_2)

        # Parameters for milder attacks (similar to your treated patient approach)
        mean_mild = mean_severe * 0.7
        std_mild = std_severe * 1.0

        # Truncation bounds
        lower, upper = 0, 10

        # Generate samples for severe attacks
        a_severe, b_severe = (lower - mean_severe) / std_severe, (upper - mean_severe) / std_severe
        severe_samples = truncnorm.rvs(a_severe, b_severe, loc=mean_severe, scale=std_severe, size=n_severe)

        # Generate samples for mild attacks
        a_mild, b_mild = (lower - mean_mild) / std_mild, (upper - mean_mild) / std_mild
        mild_samples = truncnorm.rvs(a_mild, b_mild, loc=mean_mild, scale=std_mild, size=n_mild)

        # Combine the samples
        continuous_samples = np.concatenate([severe_samples, mild_samples])        
    else:
        # Parameters for treated patients (truncated normal distribution, Snoer data)
        weight_severe = weight_severe * 0.8
        n_severe = int(np.round(size * weight_severe))
        n_mild = size - n_severe
        median_severe = 7.3
        q1_severe, q3_severe = 5.9, 8.7
        mean_severe = median_severe
        std_severe = (q3_severe - q1_severe) / 1.34  # Approximate std from IQR

        # Parameters for milder attacks
        mean_mild = mean_severe * 0.5
        std_mild = std_severe * 1.0

        lower, upper = 0, 10
        
        # Generate samples for severe attacks
        a_severe, b_severe = (lower - mean_severe) / std_severe, (upper - mean_severe) / std_severe
        severe_samples = truncnorm.rvs(a_severe, b_severe, loc=mean_severe, scale=std_severe, size=n_severe)
        
        # Generate samples for mild attacks
        a_mild, b_mild = (lower - mean_mild) / std_mild, (upper - mean_mild) / std_mild
        mild_samples = truncnorm.rvs(a_mild, b_mild, loc=mean_mild, scale=std_mild, size=n_mild)
        
        # Combine the samples
        continuous_samples = np.concatenate([severe_samples, mild_samples])

    # Discretize to 0.1 steps
    bins = np.arange(0, 10.1, 0.1)
    intensities = discretize(continuous_samples, bins)

    return intensities

from scipy.optimize import minimize

# Generate data
np.random.seed(42)
size = 10000
measured_data = generate_max_pain_intensity(is_treated=False, size=size)

# Calculate the kernel density estimation for measured data
x_range = np.linspace(0, 10, 200)
x_range_extended = np.linspace(0, 16, 280)
kde_measured = gaussian_kde(measured_data)
y_measured = kde_measured(x_range)

# Parameters for theoretical distribution
weight_severe = 0.8
mean_severe = 7.5
std_severe = 2.0
mean_mild = mean_severe * 0.7
std_mild = std_severe * 0.8

# Generate theoretical distribution
y_theoretical = (weight_severe * np.exp(-0.5 * ((x_range_extended - mean_severe)/std_severe)**2) / (std_severe * np.sqrt(2*np.pi)) +
                (1-weight_severe) * np.exp(-0.5 * ((x_range_extended - mean_mild)/std_mild)**2) / (std_mild * np.sqrt(2*np.pi)))
y_theoretical = y_theoretical / np.sum(y_theoretical) * len(x_range_extended) * 0.05

# Reconstruct distribution using single Gaussian
cutoff = 9.5
data_below_cutoff = measured_data[measured_data < cutoff]

# Fit Gaussian to data below cutoff
mu_fit = np.mean(data_below_cutoff)
sigma_fit = np.std(data_below_cutoff)

from scipy.stats import truncnorm
from scipy.optimize import minimize

def truncated_normal_log_likelihood(params):
    mu, sigma = params
    a = (0 - mu) / sigma  # lower bound
    b = (cutoff - mu) / sigma  # upper bound
    return -np.sum(truncnorm.logpdf(data_below_cutoff, a, b, mu, sigma))

# Initial guess using uncorrected estimates
initial_params = [mu_fit, sigma_fit]

# Fit truncated normal to data below cutoff
result = minimize(truncated_normal_log_likelihood, initial_params, method='Nelder-Mead')
mu_corrected, sigma_corrected = result.x

# Generate reconstructed distribution using corrected parameters
y_reconstructed = np.exp(-0.5 * ((x_range_extended - mu_corrected)/sigma_corrected)**2) / (sigma_corrected * np.sqrt(2*np.pi))
y_reconstructed = y_reconstructed / np.sum(y_reconstructed) * len(x_range_extended) * 0.05

# Create the plot
fig = go.Figure()

# Create arrays for markers at integer x values
x_markers = np.arange(0, 11)  # Integer values 0 to 10
x_markers_extended = np.arange(0, 17)  # Integer values 0 to 16

# Get y values for markers by interpolating from the existing distributions
from scipy.interpolate import interp1d

# Create interpolation functions
f_measured = interp1d(x_range, y_measured)
f_theoretical = interp1d(x_range_extended, y_theoretical)
f_reconstructed = interp1d(x_range_extended, y_reconstructed)

# Get y values for markers
y_measured_markers = f_measured(x_markers)
y_theoretical_markers = f_theoretical(x_markers_extended)
y_reconstructed_markers = f_reconstructed(x_markers_extended)

# Add traces
fig.add_trace(go.Scatter(
    x=x_range,
    y=y_measured,
    name='Measured',
    mode='lines',
    line=dict(color='rgb(31, 119, 180)', width=2)
))

fig.add_trace(go.Scatter(
    x=x_markers,
    y=y_measured_markers,
    mode='markers',
    marker=dict(symbol='circle', size=6, color='rgb(31, 119, 180)'),
    showlegend=False
))

fig.add_trace(go.Scatter(
    x=x_range_extended,
    y=y_theoretical,
    name='Theoretical',
    mode='lines',
    line=dict(color='rgb(255, 127, 14)', width=2)
))

fig.add_trace(go.Scatter(
    x=x_markers_extended,
    y=y_theoretical_markers,
    mode='markers',
    marker=dict(symbol='square', size=6, color='rgb(255, 127, 14)'),
    showlegend=False
))

fig.add_trace(go.Scatter(
    x=x_range_extended,
    y=y_reconstructed,
    name='Reconstructed',
    mode='lines',
    line=dict(color='rgb(44, 160, 44)', width=2)
))

fig.add_trace(go.Scatter(
    x=x_markers_extended,
    y=y_reconstructed_markers,
    mode='markers',
    marker=dict(symbol='diamond', size=6, color='rgb(44, 160, 44)'),
    showlegend=False
))

# Update layout
fig.update_layout(
    template='plotly_white',
    width=550,
    height=500,
    xaxis=dict(
        title='Pain Intensity',
        range=[0, 16],
        showgrid=True,
        gridwidth=1,
        gridcolor='rgba(0, 0, 0, 0.1)',
        color='black',
        linecolor='black',
        ticks='outside',
        tickcolor='black',
        ticklen=6,
        dtick=1
    ),
    yaxis=dict(
        title='Density',
        showgrid=True,
        gridwidth=1,
        gridcolor='rgba(0, 0, 0, 0.1)',
        color='black',
        linecolor='black',
        ticks='outside',
        tickcolor='black',
        ticklen=6
    ),
    legend=dict(
        orientation="v",
        yanchor="top",
        y=0.95,
        xanchor="left",
        x=0.01,
        bgcolor='rgba(255, 255, 255, 0.8)',
        bordercolor="Grey",
        borderwidth=1
    ),
    margin=dict(l=20, r=50, t=20, b=50)
)

# Add vertical lines
fig.add_vline(x=10, line_dash="dash", line_color="grey")

fig.show()
#fig.write_image(figs_folder + "Ceiling-effect.png", scale=4)

# For measured distribution (need to account for the ceiling effect)
measured_mean = np.mean(measured_data)
measured_median = np.median(measured_data)
measured_sd = np.std(measured_data)

# For theoretical distribution (use the continuous values)
dx = x_range_extended[1] - x_range_extended[0]
theoretical_mean = np.sum(x_range_extended * y_theoretical) * dx / np.sum(y_theoretical * dx)
theoretical_median = x_range_extended[np.argmin(np.abs(np.cumsum(y_theoretical) * dx - 0.5 * np.sum(y_theoretical * dx)))]
theoretical_sd = np.sqrt(np.sum((x_range_extended - theoretical_mean)**2 * y_theoretical) * dx / np.sum(y_theoretical * dx))

print(f"Measured Distribution:      Mean = {measured_mean:.2f}, Median = {measured_median:.2f}, SD = {measured_sd:.2f}")
print(f"Theoretical Distribution:   Mean = {theoretical_mean:.2f}, Median = {theoretical_median:.2f}, SD = {theoretical_sd:.2f}")
print(f"Reconstructed Distribution: Mean = {mu_corrected:.2f}, Median = {mu_corrected:.2f}, SD = {sigma_corrected:.2f}")

Measured Distribution:      Mean = 6.83, Median = 7.00, SD = 1.88
Theoretical Distribution:   Mean = 7.05, Median = 7.00, SD = 2.12
Reconstructed Distribution: Mean = 7.27, Median = 7.27, SD = 2.25


In [4]:
import numpy as np
import plotly.graph_objects as go
from scipy.stats import norm
from scipy.optimize import minimize

# Parameters
mu_true = 7.5
sigma_true = 2
n_samples = 1000
fraction_of_tens = .01  # Fraction of ALL values that should be 10
measurement_noise = 0.5  # Standard deviation of measurement noise

# Generate true scores from the theoretical distribution
true_scores = np.random.normal(mu_true, sigma_true, n_samples)

# Add measurement noise
measured_scores = true_scores + np.random.normal(0, measurement_noise, n_samples)

# Determine how many values should be 10s
n_tens = int(n_samples * fraction_of_tens)

# Sort scores and set the highest n_tens scores to 10
sorted_indices = np.argsort(measured_scores)
measured_scores[sorted_indices[-n_tens:]] = 10

# Clip remaining values to [0, 10] range
measured_scores = np.clip(measured_scores, 0, 10)

# Create histogram of measured data (without density normalization)
bin_edges = np.arange(-0.25, 11.25, 0.5)  # Start at -0.25 to center the first bin at 0
hist, _ = np.histogram(measured_scores, bins=bin_edges)
bin_centers = np.arange(0, 11, 0.5)  # Direct centers at multiples of 0.5

# Reconstruction process
cutoff = 0
filtered_data = measured_scores[measured_scores >= cutoff]
filtered_data = filtered_data[filtered_data < 10]  # Also remove 10s

# Estimate parameters using MLE on filtered data
def neg_log_likelihood(params, data):
    mu, sigma = params
    return -np.sum(norm.logpdf(data, mu, sigma))

initial_guess = [10, 2]
result = minimize(neg_log_likelihood, initial_guess, args=(filtered_data,), method='Nelder-Mead')
mu_est, sigma_est = result.x

# Generate x ranges for plotting
x_range = np.linspace(0, 16, 200)

# Calculate theoretical and reconstructed distributions
# Scale the continuous distributions to match histogram counts
bin_width = bin_edges[1] - bin_edges[0]
scale_factor = n_samples * bin_width
y_theoretical = norm.pdf(x_range, mu_true, sigma_true) * scale_factor
y_reconstructed = norm.pdf(x_range, mu_est, sigma_est) * scale_factor

# Create the plot
fig = go.Figure()

# Add measured distribution (histogram)
fig.add_trace(go.Bar(
    x=bin_centers,
    y=hist,
    name='Measured',
    marker_color='rgb(31, 119, 180)',
    opacity=0.7,
    width=0.5
))

# Add theoretical distribution
fig.add_trace(go.Scatter(
    x=x_range,
    y=y_theoretical,
    name='Theoretical',
    mode='lines',
    line=dict(color='rgb(255, 127, 14)', width=2)
))

# Add reconstructed distribution
fig.add_trace(go.Scatter(
    x=x_range,
    y=y_reconstructed,
    name=f'Reconstructed',
    mode='lines',
    line=dict(color='rgb(44, 160, 44)', width=2)
))

# Update layout
fig.update_layout(
    template='plotly_white',
    width=550,
    height=500,
    xaxis=dict(
        title='Pain intensity',
        range=[0, 16],
        showgrid=True,
        gridwidth=1,
        gridcolor='rgba(0, 0, 0, 0.1)',
        color='black',
        linecolor='black',
        ticks='outside',
        tickcolor='black',
        ticklen=6,
        dtick=1
    ),
    yaxis=dict(
        title='Count',
        showgrid=True,
        gridwidth=1,
        gridcolor='rgba(0, 0, 0, 0.1)',
        color='black',
        linecolor='black',
        ticks='outside',
        tickcolor='black',
        ticklen=6
    ),
    legend=dict(
        orientation="v",
        yanchor="top",
        y=0.95,
        xanchor="left",
        x=0.01,
        bgcolor='rgba(255, 255, 255, 0.8)',
        bordercolor="Grey",
        borderwidth=1
    ),
    margin=dict(l=20, r=50, t=20, b=50)
)

fig.show()
fig.write_image(figs_folder + "Ceiling-effect.png", scale=2)
fig.write_image(figs_folder + "Ceiling-effect.svg")

# Print some statistics
print(f"Number of scores at 10: {np.sum(measured_scores == 10)}")
print(f"Percentage of scores at 10: {np.mean(measured_scores == 10)*100:.1f}%")
print(f"True parameters: μ={mu_true}, σ={sigma_true}")
print(f"Estimated parameters: μ={mu_est:.1f}, σ={sigma_est:.1f}")

Number of scores at 10: 111
Percentage of scores at 10: 11.1%
True parameters: μ=7.5, σ=2
Estimated parameters: μ=7.1, σ=1.7
