In [3]:
# Setup

import ultimate_sleuthbuilder as usb

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import os
import concurrent.futures

# Set parameters
N = 32 # Sequence length
sample_size = 100 # Sample size
trials = 1000 # Number of trials

test_string = "00111110010100001110010000111110"


In [None]:
# Generate sampling distribution of p-values (single core)
usb.set_use_db(True)
statistics_df = usb.get_statistics(N)

mean_p_values = []
for trial in range(trials):
    # Generate a sample of n random sequences of length N
    sequences = [''.join(np.random.choice(['0', '1'], N)) for _ in range(sample_size)]
    p_values = []
    for seq in sequences:
        sample_stats = usb.analyze_sequence(seq, statistics_df)
        p_values.append(sample_stats['p_value'])
    # sample_df = usb.analyze_sequence_set(sequences)
    mean_p_values.append(np.mean(p_values))

# mean_p_values = np.array(mean_p_values)
sampling_distribution = pd.DataFrame(mean_p_values, columns=['Mean P-value'])

# Save to csv
sampling_distribution.to_csv('data/mean_p_values_v1000.csv', index=False)

In [5]:
# Generate sampling distribution of p-values (multi-core)

print(f'Number of cores available: {os.cpu_count()}')

usb.set_use_db(True)
statistics_df = usb.get_statistics(N)
# usb.set_use_db(False)

# Top-level function to run a single trial (wrapper for ProcessPoolExecutor)
def perform_trial(args):
    sample_size, N = args
    sequences = [''.join(np.random.choice(['0', '1'], N)) for _ in range(sample_size)]
    p_values = []
    for seq in sequences:
        sample_stats = usb.analyze_sequence(seq, statistics_df)
        p_values.append(sample_stats['p_value'])
    # sample_df = usb.analyze_sequence_set(sequences)
    # return sample_df['p_value'].mean()
    return np.mean(p_values)

# Function to run all trials
def run_trials(trials, sample_size, N):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        mean_p_values = list(executor.map(perform_trial, [(sample_size, N) for _ in range(trials)]))
    return mean_p_values

# Run trials
mean_p_values = run_trials(trials, sample_size, N)

# Save results to csv
sampling_distribution = pd.DataFrame(mean_p_values, columns=['Mean P-value'])
sampling_distribution.to_csv('data/mean_p_values.csv', index=False)


Number of cores available: 4


In [None]:
# Calculate margin of error for various confidence levels

db_path = usb.get_db_path()  # Get the path to the database
key = usb.get_db_key('summary/p_value')

z_scores = {0.90 : 1.645, 0.95 : 1.960, 0.99 : 2.576, 0.999 : 3.291}
confidence_levels = z_scores.keys()
margin_of_errors = {level : [] for level in confidence_levels}
for level in confidence_levels:
    # Open the hd5 database at usb.get_db_path()

    with pd.HDFStore(db_path, mode='r') as store:  # Open the store in append mode
        summary_df = store[key]
        
        # Get standard deviation for sequences of length N
        std_dev = summary_df.loc[N, 'std_dev']
        margin_of_error = z_scores[level] * std_dev / np.sqrt(sample_size)
        margin_of_errors[level].append(margin_of_error)

print('Margin of Error for Various Confidence Levels')
for level in confidence_levels:
    print(f'{level} confidence level: {margin_of_errors[level]}')