# Parallelization with Dask

In [1]:
import numpy as np
import ipyparallel as ipp
import itertools
from distributed import progress
import pandas as pd
from typing import NamedTuple

import smpsite as smp

## 1. Dask Setup

In [None]:
number_of_clusters = 60

rc = ipp.Cluster(n=number_of_clusters).start_and_connect_sync()

In [None]:
dask_client = rc.become_dask()
dask_client

In [None]:
# Check to see if the threads are ready
dview = rc[:]
len(dview)

## 2. Macro function definition

In [5]:
def ipp_simulate_estimations(N,
                             n0,
                             kappa_within_site, 
                             site_lat,
                             site_long,
                             outlier_rate, 
                             secular_method,
                             kappa_secular, 
                             ignore_outliers, 
                             seed, 
                             n_sim):
    
    class Params(NamedTuple):
        """
        Macro to encapsulate all the parameters in the sampling model.
        """

        # Number of sites
        N : int
        # Number of samples per site
        n0 : int

        # Concentration parameter within site
        kappa_within_site : float    

        # Latitude and longitude of site
        site_lat  : float 
        site_long : float

        # Proportion of outliers to be sampled from uniform distribution
        outlier_rate : float

        # Method to sample secular variation. Options are ("tk03", "G", "Fisher")
        secular_method : str 
        kappa_secular : float    # Just needed for Fisher sampler
        
        
    params = Params(N=N,
                    n0=n0, 
                    kappa_within_site=kappa_within_site,
                    site_lat=site_lat, 
                    site_long=site_long, 
                    outlier_rate=outlier_rate, 
                    secular_method=secular_method,
                    kappa_secular=kappa_secular)
    
    
    # Create all samples
    df_tot = smp.simulate_estimations(params, 
                                      n_iters=n_sim,
                                      ignore_outliers=ignore_outliers, 
                                      seed=seed)
    
    simulation_hash = hash((N, n0, kappa_within_site, site_lat, site_long, outlier_rate, secular_method, kappa_secular))
    df_tot["hash"] = simulation_hash
    
    return df_tot

## 3. Parameter space exploration

### 3.1. Figure 1

In [6]:
min_n, max_n = 1, 300
n_simulations = 10000
N_max = 100
n0_max = 20

params_iter = {'N': np.arange(1, N_max+1, 1),
               'n0': np.arange(1, n0_max+1, 1), 
               'kappa_within_site': 50,
               'site_lat': [30.0],
               'site_long': [0.0], 
               'outlier_rate': [0.0],
               'secular_method': ["G"], 
               'kappa_secular': [np.nan],
               'ignore_outliers': [False]}

output_file_name = "fig1a_bis_{}sim".format(n_simulations)

In [7]:
params_iter_mesh = np.meshgrid(*[params_iter[key] for key in params_iter.keys()])

for i, key in enumerate(params_iter.keys()):
    params_iter[key] = params_iter_mesh[i].ravel()
    
all_n_tot = params_iter['N'] * params_iter['n0']
valid_index = (min_n <= all_n_tot) & (all_n_tot <= max_n) 

n_tasks = np.sum(valid_index)
print("Total number of simulations: ", n_tasks)

indices = np.arange(n_tasks)
np.random.shuffle(indices)

for key in params_iter.keys():
    params_iter[key] = params_iter[key][valid_index]
    # Shuffle
    params_iter[key] = params_iter[key][indices]

params_iter["seed"] = np.random.randint(0, 2**32-1, n_tasks)
params_iter["n_sim"] = np.repeat(n_simulations, n_tasks)

Total number of simulations:  824


### 3.2. Figure 2

In [12]:
min_n, max_n = 50, 50

params_iter = {'N': [10, 50],
               'n0': [5, 1], 
               'kappa_within_site': np.arange(10, 101, 10),
               'site_lat': np.arange(0, 81, 10),
               'site_long': [0.0], 
               'outlier_rate': np.arange(0.10, 0.40, .02),
               'secular_method': ["G"], 
               'kappa_secular': [np.nan],
               'ignore_outliers': [False]}

In [13]:
params_iter_mesh = np.meshgrid(*[params_iter[key] for key in params_iter.keys()])

for i, key in enumerate(params_iter.keys()):
    params_iter[key] = params_iter_mesh[i].ravel()
    
all_n_tot = params_iter['N'] * params_iter['n0']
valid_index = (min_n <= all_n_tot) & (all_n_tot <= max_n)

n_tasks = np.sum(valid_index)
print("Total number of simulations: ", n_tasks)

indices = np.arange(n_tasks)
np.random.shuffle(indices)

for key in params_iter.keys():
    params_iter[key] = params_iter[key][valid_index]
    # Shuffle
    params_iter[key] = params_iter[key][indices]

params_iter["seed"] = np.random.randint(0, 2**32-1, n_tasks)

params_iter['ignore_outliers'] = params_iter['n0'] == 5
params_iter["n_sim"] = np.repeat(n_simulations, n_tasks)

Total number of simulations:  2880


### Figure 2 - Function of N

In [14]:
min_n, max_n = 1, 300

params_iter = {'N': np.arange(min_n, max_n, 1),
               'n0': np.arange(1, 8, 1), 
               'kappa_within_site': 50,
               'site_lat': [30.0],
               'site_long': [0.0], 
               'outlier_rate': [0.0],
               'secular_method': ["G"], 
               'kappa_secular': [np.nan],
               'ignore_outliers': [False]}

In [15]:
params_iter_mesh = np.meshgrid(*[params_iter[key] for key in params_iter.keys()])

for i, key in enumerate(params_iter.keys()):
    params_iter[key] = params_iter_mesh[i].ravel()
    
all_n_tot = params_iter['N'] * params_iter['n0']
valid_index = (min_n <= all_n_tot) & (all_n_tot <= max_n) #& (all_n_tot % 5 == 0)

n_tasks = np.sum(valid_index)
print("Total number of simulations: ", n_tasks)

indices = np.arange(n_tasks)
np.random.shuffle(indices)

for key in params_iter.keys():
    params_iter[key] = params_iter[key][valid_index]
    # Shuffle
    params_iter[key] = params_iter[key][indices]

params_iter["seed"] = np.random.randint(0, 2**32-1, n_tasks)
params_iter["n_sim"] = np.repeat(n_simulations, n_tasks)

Total number of simulations:  776


### Figure 2 - Function of p

In [8]:
smp.kappa2angular(20)

array(18.27871119933776)

In [10]:
smp.kappa_from_latitude(30, degrees=True)

array(34.70455294405772)

In [16]:
min_n, max_n = 1, 300
k_max = 5
n_simulations = 5000

params_iter = {'N': np.arange(min_n, max_n, 1),
               'n0': [1, k_max], 
               'kappa_within_site': 20,
               'site_lat': [30.0],
               'site_long': [0.0], 
               'outlier_rate': np.arange(0.0, 0.7, 0.1),
               'secular_method': ["G"], 
               'kappa_secular': [np.nan],
               'ignore_outliers': [False]}

output_file_name = "fig2b_{}sim".format(n_simulations)

In [17]:
params_iter_mesh = np.meshgrid(*[params_iter[key] for key in params_iter.keys()])

for i, key in enumerate(params_iter.keys()):
    params_iter[key] = params_iter_mesh[i].ravel()
    
all_n_tot = params_iter['N'] * params_iter['n0']
valid_index = (min_n <= all_n_tot) & (all_n_tot <= max_n) & np.logical_or(all_n_tot % k_max == 0, all_n_tot == 1)

n_tasks = np.sum(valid_index)
print("Total number of simulations: ", n_tasks)

indices = np.arange(n_tasks)
np.random.shuffle(indices)

for key in params_iter.keys():
    params_iter[key] = params_iter[key][valid_index]
    # Shuffle
    params_iter[key] = params_iter[key][indices]

params_iter["ignore_outliers"] = params_iter["n0"] == k_max
params_iter["seed"] = np.random.randint(0, 2**32-1, n_tasks)
params_iter["n_sim"] = np.repeat(n_simulations, n_tasks)

Total number of simulations:  840


### Figure 3 - Intersection

In [18]:
min_n, max_n = 40, 40

params_iter = {'N': [8, 40],
               'n0': [1, 5], 
               'kappa_within_site': 50,
               'site_lat': [30.0],
               'site_long': [0.0], 
               'outlier_rate': np.arange(0, 0.61, 0.02),
               'secular_method': ["G"], 
               'kappa_secular': [np.nan],
               'ignore_outliers': [False]}

In [19]:
params_iter_mesh = np.meshgrid(*[params_iter[key] for key in params_iter.keys()])

for i, key in enumerate(params_iter.keys()):
    params_iter[key] = params_iter_mesh[i].ravel()
    
all_n_tot = params_iter['N'] * params_iter['n0']
valid_index = (min_n <= all_n_tot) & (all_n_tot <= max_n) #& np.logical_or(all_n_tot % 10 == 0, all_n_tot == 1)

n_tasks = np.sum(valid_index)
print("Total number of simulations: ", n_tasks)

indices = np.arange(n_tasks)
np.random.shuffle(indices)

for key in params_iter.keys():
    params_iter[key] = params_iter[key][valid_index]
    # Shuffle
    params_iter[key] = params_iter[key][indices]

params_iter["ignore_outliers"] = params_iter["n0"] == 5
params_iter["seed"] = np.random.randint(0, 2**32-1, n_tasks)
params_iter["n_sim"] = np.repeat(n_simulations, n_tasks)

Total number of simulations:  62


### Figure 3 - Heatmap

## 4. Run Simulation

In [9]:
task = dask_client.map(ipp_simulate_estimations, 
                       params_iter['N'],
                       params_iter['n0'],
                       params_iter['kappa_within_site'], 
                       params_iter['site_lat'], 
                       params_iter['site_long'], 
                       params_iter['outlier_rate'],
                       params_iter['secular_method'], 
                       params_iter['kappa_secular'], 
                       params_iter['ignore_outliers'], 
                       params_iter['seed'], 
                       params_iter["n_sim"])

res = dask_client.submit(pd.concat, task)

progress(res)

VBox()

In [None]:
%%time
df_all = res.result()

In [None]:
df_all

In [None]:
df_all.to_csv('../outputs/' + output_file_name + '_total.csv')

In [None]:
# Summary table
df_summary = df_all.groupby("hash", as_index=False).apply(smp.summary_simulations)
df_summary

In [None]:
df_summary.to_csv('../outputs/' + output_file_name + '_summary.csv')

In [15]:
1 + 1

2