# Cluster Size Distribution (HPC)

## Pip install

In [None]:
!pip install dask
!pip install crtoolbox
!pip install numpy
!pip install matplotlib
!pip install distributed

## Imports

In [None]:
# Dask imports
from dask.distributed import Client, as_completed
from dask_jobqueue import SLURMCluster

# Basic imports
import os
import sys
import shutil
import numpy as np
from matplotlib import pyplot as plt
import pandas as pd

# Import supporting functions
from crtoolbox.lib.boundary import *
from crtoolbox.lib.regression import *

# Import bootstrap functions
from crtoolbox.bootstrap import *

# Import data generation
from crtoolbox.tests.generate_2d_data import *

## Simulation Functions

Bootstrap function (wild bootstrap with Rademacher sign multipliers)

In [None]:
def bootstrap(resids, bdry_weights, sigma_bdry_interp_concat, muhat_grad_bdry_interp_concat, n, n_boot=5000, verbose=True):
    """
    Wild (Rademacher) bootstrap of the boundary integral.

    In each of ``n_boot`` replicates every subject's residuals are multiplied
    by a random sign (+1/-1). At each boundary point, and separately for the
    outer and inner side, a bootstrap t-statistic ``sqrt(n) * mean / std``
    (std with ddof=1) is formed across subjects. The two sides are combined
    with the interpolation weights and the replicate value
    ``sum(sigma * G_boot / |grad muhat|)`` is stored.

    Parameters
    ----------
    resids : numpy.ndarray
        Standardised residuals sampled along the boundary. Axis 0 indexes
        subjects and the last axis the side of the boundary (0 = outer,
        1 = inner). Presumably shape ``(n, m, 2)`` for ``m`` boundary points
        -- TODO confirm against crtoolbox's ``get_bdry_values_concat``.
    bdry_weights : numpy.ndarray
        Interpolation weights; ``[..., 0]`` multiplies the outer statistic and
        ``[..., 1]`` the inner one. Must broadcast against the per-point
        statistics.
    sigma_bdry_interp_concat : numpy.ndarray
        Standard deviation interpolated onto the boundary.
    muhat_grad_bdry_interp_concat : numpy.ndarray
        Gradient magnitude of muhat interpolated onto the boundary.
    n : int
        Number of subjects (length of axis 0 of ``resids``).
    n_boot : int, optional
        Number of bootstrap replicates (default 5000).
    verbose : bool, optional
        If True (the default, matching the previous behaviour) print the index
        of every replicate. Pass False on cluster runs to avoid writing
        ``n_boot`` log lines per simulation.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(1, n_boot)`` holding the replicate values sorted in
        ascending order.
    """
    # One Rademacher multiplier per subject, broadcast over the remaining two
    # axes of ``resids``.
    boot_dim = np.array([n, 1, 1])

    # The interpolation weights are identical for every replicate, so split
    # them into their outer/inner parts once, outside the loop.
    outer_weights = bdry_weights[...,0]
    inner_weights = bdry_weights[...,1]

    # Storage for the replicate values
    integral_values = np.zeros((1,n_boot))

    for b in np.arange(n_boot):

        if verbose:
            print('Boot: ' + str(b))

        # Random signs in {-1, +1}
        boot_vars = 2*np.random.randint(0,2,boot_dim,dtype="int8")-1

        # Sign-flipped residuals
        boot_resids = boot_vars*resids

        # Per-point mean across subjects for each side of the boundary.
        # (Author's note: computing the two sides separately was observed to
        # be much faster; the reason has not been established.)
        mean_outer = np.sum(boot_resids[...,0], axis=0)/n
        mean_inner = np.sum(boot_resids[...,1], axis=0)/n

        # Per-point sample variance across subjects (ddof = 1)
        ssq_outer = np.sum((boot_resids[...,0]-mean_outer)**2, axis=0)/(n-1)
        ssq_inner = np.sum((boot_resids[...,1]-mean_inner)**2, axis=0)/(n-1)

        # Bootstrap t-statistics on each side of the boundary
        boot_G_outer = np.sqrt(n)*mean_outer/np.sqrt(ssq_outer)
        boot_G_inner = np.sqrt(n)*mean_inner/np.sqrt(ssq_inner)

        # Interpolate the statistic onto the boundary
        boot_G_bdry_interp_concat = inner_weights*boot_G_inner + outer_weights*boot_G_outer

        # Bootstrap estimate of the boundary integral
        integral_values[0,b] = np.sum(sigma_bdry_interp_concat*boot_G_bdry_interp_concat/muhat_grad_bdry_interp_concat)

    # Sort the replicates in ascending order (an empirical quantile function)
    integral_values.sort(axis=1)

    return integral_values

Simulation function

In [None]:
# Sample a field along the estimated boundary and interpolate it onto the boundary.
def _interp_on_bdry(field, bdry_locs, bdry_weights):
    """Sample ``field`` at ``bdry_locs`` and interpolate with ``bdry_weights``.

    Thin wrapper around crtoolbox's ``get_bdry_values_concat`` followed by
    ``get_bdry_vals_interpolated_concat``.
    """
    bdry_vals = get_bdry_values_concat(field, bdry_locs)
    return get_bdry_vals_interpolated_concat(bdry_vals, bdry_weights)


# Stack every subject's standardised boundary residuals along axis 0.
def _stack_bdry_resids(resid_files, sigma, bdry_locs, n):
    """Return the standardised residuals of the first ``n`` subjects along the boundary.

    Each residual image is read, divided by ``sigma``, given a leading
    singleton axis, sampled at ``bdry_locs``; the per-subject results are
    concatenated along axis 0 in subject order. Collecting them in a list and
    concatenating once avoids re-copying a growing array for every subject.
    """
    per_subject = []
    for j in range(n):
        resid = read_images(resid_files[j])[...,0]
        resid = (resid/sigma).reshape((1,) + resid.shape)
        per_subject.append(get_bdry_values_concat(resid, bdry_locs))
    return np.concatenate(per_subject, axis=0)


# This function will run n_sim simulations and save the results to a csv file.
def run_simulation(n_sim, n, sim_instance, out_dir,n_boot=5000):
    """Run ``n_sim`` simulations with ``n`` subjects and append the results to CSV files.

    Each simulation:
      1. generates 2D data (circular smooth signal plus smooth noise),
      2. fits an intercept-only regression to obtain muhat, sigma and residuals,
      3. records the cluster-size CLT statistic ``(|AcHat| - |Ac|) / tau``,
      4. records the boundary integral ``sum(sigma * G / |grad muhat|)``,
      5. records ``n_boot`` wild-bootstrap replicates of that integral,
      6. deletes the intermediate data files it generated.

    Parameters
    ----------
    n_sim : int
        Number of simulations to run.
    n : int
        Number of subjects per simulation.
    sim_instance : int
        Instance number; used only to name this instance's scratch directory.
    out_dir : str
        Root results directory. Results go to ``<out_dir>/<n>subjects``.
    n_boot : int, optional
        Number of bootstrap replicates per simulation (default 5000).

    Notes
    -----
    The results (sorted CLT statistics, sorted integrals, and the
    ``(n_sim, n_boot)`` bootstrap array) are appended with ``append_to_file``
    to ``clt_est_store.csv``, ``sim_integral_values.csv`` and
    ``boot_integral_values.csv``. These files are shared by every instance
    with the same ``n``.
    """
    # Results for every instance with ``n`` subjects are written here
    out_dir = os.path.join(out_dir, str(n) + 'subjects')

    # Scratch directory for this instance's generated data and regression output
    data_dir = os.path.join(out_dir, 'instance_' + str(sim_instance))

    # Threshold c defining the excursion sets {mu > c} and {muhat > c}
    c = 2

    # Design matrix with only an intercept, so the parameter estimate from the
    # regression is the mean of the data
    X = np.ones((n,1))

    # Scaling of the CLT statistic
    tau = 1/np.sqrt(n)

    # Per-simulation outputs
    clt_est_store = np.zeros(n_sim)
    sim_integral_values = np.zeros(n_sim)
    boot_integral_values = np.zeros((n_sim,n_boot))

    for i in np.arange(n_sim):

        print('Sim: ' + str(i))

        # Circular smooth signal
        signal = CircleSignal(r=250, fwhm=[100,100], mag=3,dim=[1000,1000])
        signal.generate()
        mu = signal.mu

        # Smooth noise for n subjects
        noise = Noise(var=1,fwhm=[60,60],n=n,dim=[1000,1000])

        # Generate the data and fit the regression
        data_files, mu_file = generate_data_2D(signal, noise, out_dir=data_dir)
        muhat_file, sigma_file, resid_files = regression(data_files, X, out_dir=data_dir)

        muhat = read_image(muhat_file)
        sigma = read_image(sigma_file)

        # G = (muhat-mu)/(sigma/sqrt(n))
        G = (muhat-mu)/(sigma/np.sqrt(n))

        # True and estimated excursion sets
        Ac = mu > c
        AcHat = muhat > c

        # CLT statistic for the cluster size (sizes are counts of grid points)
        clt_est_store[i] = (np.sum(AcHat) - np.sum(Ac))/tau

        # Boundary of AcHat and the interpolation weights along it
        AcHat_bdry_map = get_bdry_maps(muhat, c)
        AcHat_bdry_locs = get_bdry_locs(AcHat_bdry_map)
        muhat_bdry_vals_concat = get_bdry_values_concat(muhat, AcHat_bdry_locs)
        AcHat_bdry_weights_concat = get_bdry_weights_concat(muhat_bdry_vals_concat, c)

        # G and sigma interpolated along the boundary of AcHat
        G_bdry_interp_concat = _interp_on_bdry(G, AcHat_bdry_locs, AcHat_bdry_weights_concat)
        sigma_bdry_interp_concat = _interp_on_bdry(sigma, AcHat_bdry_locs, AcHat_bdry_weights_concat)

        # Magnitude of the finite-difference gradient of muhat, interpolated
        # along the boundary of AcHat
        muhat_grad_parts = np.gradient(muhat)
        muhat_grad = np.sqrt(muhat_grad_parts[0]**2 + muhat_grad_parts[1]**2)
        muhat_grad_bdry_interp_concat = _interp_on_bdry(muhat_grad, AcHat_bdry_locs, AcHat_bdry_weights_concat)

        # Boundary integral for this simulation
        sim_integral_values[i] = np.sum(sigma_bdry_interp_concat*G_bdry_interp_concat/muhat_grad_bdry_interp_concat)

        # Standardised residuals of all subjects along the boundary
        resids = _stack_bdry_resids(resid_files, sigma, AcHat_bdry_locs, n)

        # Bootstrap the integral
        boot_integral_values[i,:] = bootstrap(resids, AcHat_bdry_weights_concat, sigma_bdry_interp_concat,muhat_grad_bdry_interp_concat, n, n_boot=n_boot)

        # Remove this simulation's intermediate files (previously only the
        # final simulation's files were removed, after the loop)
        remove_files([mu_file, *data_files, muhat_file, sigma_file, *resid_files])

    # Sort and save the CLT statistics
    clt_est_store = np.sort(clt_est_store)
    append_to_file(os.path.join(out_dir,'clt_est_store.csv'), clt_est_store)

    # Sort and save the integral values
    sim_integral_values = np.sort(sim_integral_values)
    append_to_file(os.path.join(out_dir,'sim_integral_values.csv'), sim_integral_values)

    # Save the bootstrap integral values (each row already sorted by bootstrap)
    append_to_file(os.path.join(out_dir,'boot_integral_values.csv'), boot_integral_values)
        

Cleanup function

In [None]:
# Reset previous simulation run
def reset_sim(n, sim_instance, out_dir):
    """Delete the outputs of a previous ``run_simulation`` run.

    Removes the scratch directory ``<out_dir>/<n>subjects/instance_<sim_instance>``
    and the result CSVs in ``<out_dir>/<n>subjects``. Paths that do not exist
    are skipped, so the function is safe to call repeatedly (e.g. once per
    instance).

    Note: the result CSVs are shared by every instance with ``n`` subjects, so
    resetting any single instance deletes the collected results of all of them.

    Parameters
    ----------
    n : int
        Number of subjects of the run to reset.
    sim_instance : int
        Instance whose scratch directory is removed.
    out_dir : str
        Root results directory passed to ``run_simulation``.
    """
    # Output directory for this number of subjects
    out_dir = os.path.join(out_dir, str(n) + 'subjects')

    # Scratch directory for this instance
    data_dir = os.path.join(out_dir, 'instance_' + str(sim_instance))

    if os.path.exists(data_dir):
        shutil.rmtree(data_dir)

    # Remove only the result files that exist. A partially written set (some
    # CSVs present, others missing) is cleaned up without relying on how a
    # bulk-delete helper treats missing paths.
    for fname in ('clt_est_store.csv', 'sim_integral_values.csv', 'boot_integral_values.csv'):
        path = os.path.join(out_dir, fname)
        if os.path.isfile(path):
            os.remove(path)


## Test Run

Run a simulation instance.

In [None]:
# Output directory (feel free to change this to your desired output directory)
out_dir = os.path.join(os.getcwd(),'results')

# Number of simulations
n_sim = 30

# Number of subjects
n = 100

# Simulation instance number (only used to name the scratch directory)
sim_instance = 39

# Set to True to run n_sim simulations with n subjects for this instance
# (5000 bootstrap replicates each). The plotting cell below reads the files
# this writes, so it needs a completed run.
run_test = False

if run_test:
    run_simulation(n_sim, n, sim_instance, out_dir, 5000)

Plot the results.

In [None]:
# Results directory for this number of subjects
data_dir = os.path.join(out_dir, str(n) + 'subjects')

# Read in results. The 1D quantities are flattened with ravel() so the code
# works whether the CSV stores them as a row or as a single column.
clt_est_store = pd.read_csv(os.path.join(data_dir,'clt_est_store.csv'), header=None).values.ravel()
sim_integral_values = pd.read_csv(os.path.join(data_dir,'sim_integral_values.csv'), header=None).values.ravel()
boot_integral_values = pd.read_csv(os.path.join(data_dir,'boot_integral_values.csv'), header=None).values

# Sort the flattened values. The files are appended to on every run, so they
# are not globally sorted; sorting the unflattened column vector would sort
# each length-1 row and change nothing.
clt_sorted = np.sort(clt_est_store)
sim_sorted = np.sort(sim_integral_values)

# Empirical CDF of the cluster-size CLT statistic
plt.plot(clt_sorted, np.linspace(0, 1, len(clt_sorted), endpoint=False), label='Empirical (|AcHat|-|Ac|)/tau')

# Bootstrap CDF of the integral, averaged over simulations (each row of the
# bootstrap array is sorted, so column k is the k-th bootstrap quantile)
boot_mean = np.mean(boot_integral_values, axis=0)
plt.plot(boot_mean, np.linspace(0, 1, len(boot_mean), endpoint=False), label='Bootstrap Integral')

# Empirical CDF of the integral over simulations
plt.plot(sim_sorted, np.linspace(0, 1, len(sim_sorted), endpoint=False), label='Empirical Integral')

plt.legend()

Clean up the test run.

In [None]:
reset_sim(n, sim_instance, out_dir)

## Cluster Submission

Simulation variables for running on the cluster.

In [None]:
# Total number of simulations to run per sample size
n_sim = 500

# Number of simulation instances (one run_simulation task each) submitted to the cluster
n_node = 200

# Number of simulations per instance. Round UP so that
# n_node * n_sim_per_node >= n_sim; floor division (500 // 200 == 2) would
# silently run only 400 simulations. The results cell keeps the first n_sim.
n_sim_per_node = -(-n_sim // n_node)

# Range of subject values (previously also run with 25, 50, 100)
n_subs = np.array([200])

Start the SLURM cluster, connect a client to it, and request 50 workers.

In [None]:
# Create the SLURM cluster. With no arguments dask-jobqueue takes the job
# settings (queue, cores, memory, walltime, ...) from the Dask configuration
# -- confirm it is set up on your system (e.g. ~/.config/dask/jobqueue.yaml).
cluster = SLURMCluster()

# --------------------------------------------------------------------------------
# Connect to client
# --------------------------------------------------------------------------------

# Connect a client to the cluster's scheduler
client = Client(cluster)

# Number of Dask workers to request
n_workers = 50

# Ask for n_workers workers (dask-jobqueue submits the SLURM jobs needed)
cluster.scale(n_workers)

In [None]:
# Futures for every submitted run_simulation task
futures = []

# One task per (number of subjects, instance) pair
for n in n_subs:
    for instance in np.arange(n_node):
        # Each task runs n_sim_per_node simulations with n subjects for this
        # instance (default n_boot). pure=False tells Dask the call is not a
        # pure function of its arguments (it draws random data).
        futures.append(client.submit(run_simulation, n_sim_per_node, n, instance, out_dir, pure=False))

# Block until every task has finished; re-raises the first worker exception.
# Works unchanged when no tasks were submitted.
client.gather(futures)

# Drop our references to the futures so the scheduler can release them
del futures

Shut down the client and the cluster.

In [None]:
# Disconnect the client first, then close the cluster itself so that the
# SLURM worker jobs are cancelled and released. (Calling client.shutdown()
# after client.close() would act on an already-disconnected client.)
client.close()
cluster.close()

Reset the simulation (set `reset = True` to delete all simulation outputs).

In [None]:
# Root results directory; must match the one the simulations wrote to
out_dir = os.path.join(os.getcwd(),'results')

# Safety switch: simulation outputs are deleted only when this is True
reset = False

if reset:
    # Instance numbers used by the submission cell
    instance_ids = np.arange(n_node)

    # Clear every (number of subjects, instance) combination
    for n in n_subs:
        for instance in instance_ids:
            reset_sim(n, instance, out_dir)

## View and sort results

In [None]:
# Number of subjects whose results are shown
n = 200

# Number of simulations to include; the cluster run may have produced more
# (n_node * n_sim_per_node), so only the first n_sim are used
n_sim = 500

# Results directory for this number of subjects
data_dir = os.path.join(out_dir, str(n) + 'subjects')

# Read in results. The 1D quantities are flattened with ravel() so the code
# works whether the CSV stores them as rows or as a single column.
clt_est_store = pd.read_csv(os.path.join(data_dir,'clt_est_store.csv'), header=None).values.ravel()
sim_integral_values = pd.read_csv(os.path.join(data_dir,'sim_integral_values.csv'), header=None).values.ravel()
boot_integral_values = pd.read_csv(os.path.join(data_dir,'boot_integral_values.csv'), header=None).values

# Keep the first n_sim simulations and sort the per-simulation values (the
# files are appended to by many instances, so they are not globally sorted)
clt_sorted = np.sort(clt_est_store[:n_sim])
sim_sorted = np.sort(sim_integral_values[:n_sim])
boot_sel = boot_integral_values[:n_sim, :]

# CDF heights for the bootstrap curves: one per bootstrap quantile level
boot_heights = np.linspace(0, 1, boot_sel.shape[1], endpoint=False)

# Empirical CDF of the cluster-size CLT statistic (|AcHat| - |Ac|) / tau
plt.plot(clt_sorted, np.linspace(0, 1, len(clt_sorted), endpoint=False), label='Empirical (|AcHat|-|Ac|)/tau')

# Bootstrap CDF of the integral averaged over simulations. Each row is sorted
# by bootstrap(), so column k is the k-th bootstrap quantile of one simulation.
plt.plot(np.mean(boot_sel, axis=0), boot_heights, label='Bootstrap Integral')

# 5%-95% band of the bootstrap quantiles across simulations. np.percentile
# takes q in [0, 100]; q=0.95 / 0.05 would select the 0.95th / 0.05th percentiles.
plt.plot(np.percentile(boot_sel, 95, axis=0), boot_heights, linestyle='--', label='Bootstrap Integral (95th pct across sims)')
plt.plot(np.percentile(boot_sel, 5, axis=0), boot_heights, linestyle='--', label='Bootstrap Integral (5th pct across sims)')

# Empirical CDF of the integral over simulations
plt.plot(sim_sorted, np.linspace(0, 1, len(sim_sorted), endpoint=False), label='Empirical Integral')

plt.title('Simulation results: ' + str(n) + ' subjects')
plt.legend()
