# Using dask to export parameters from MCMC HDF5 batches and into ascii file

# Libraries

In [72]:
import sys, os, glob
sys.path.insert(0, '/freya/ptmp/mpa/minh/python_script')
from pathlib import Path

import h5py as h5
import numpy as np
import scipy
import dask
from dask.distributed import Client, LocalCluster
import dask.array as da
import dask.dataframe as dd

import matplotlib
matplotlib.style.use('Minh_paper_style')
%matplotlib inline
import matplotlib.pyplot as plt
from matplotlib.ticker import (FixedLocator, MultipleLocator, ScalarFormatter, FixedFormatter, FormatStrFormatter,
                               AutoMinorLocator)
from matplotlib.colors import ListedColormap
from matplotlib import animation
from colormap import RdBu_cmap, Planck_cmap
plt.set_cmap('RdBu_r')
plt.rcParams.update({
    "text.usetex": False,
    "font.sans-serif": "Fira Sans",
})
alpha=1.
dir_prefix_ak='/freya/ptmp/mpa/akostic/eftcode_sampling/'
dir_prefix_mn='/freya/ptmp/mpa/minh/eftcode_run/'
common_path='paperChains/2lpt_tests/higherbias_nonzero_zero_blapl/free_phases/mockseed_111321/'
dir_path=dir_prefix_ak+common_path

<Figure size 800x600 with 0 Axes>

# Get list of chain batches

In [91]:
filename_prefix='Lambda_0.1_NGeul_192_seed_1121_'
p=Path(dir_path)
sorted_filenames=sorted(p.glob(filename_prefix+'[0-9]*.h5'), 
                       key=lambda path: int(path.stem.rsplit("_", 1)[1]))
sorted_filenames

[PosixPath('/freya/ptmp/mpa/akostic/eftcode_sampling/paperChains/2lpt_tests/higherbias_nonzero_zero_blapl/free_phases/mockseed_111321/Lambda_0.1_NGeul_192_seed_1121_1.h5'),
 PosixPath('/freya/ptmp/mpa/akostic/eftcode_sampling/paperChains/2lpt_tests/higherbias_nonzero_zero_blapl/free_phases/mockseed_111321/Lambda_0.1_NGeul_192_seed_1121_2.h5'),
 PosixPath('/freya/ptmp/mpa/akostic/eftcode_sampling/paperChains/2lpt_tests/higherbias_nonzero_zero_blapl/free_phases/mockseed_111321/Lambda_0.1_NGeul_192_seed_1121_3.h5'),
 PosixPath('/freya/ptmp/mpa/akostic/eftcode_sampling/paperChains/2lpt_tests/higherbias_nonzero_zero_blapl/free_phases/mockseed_111321/Lambda_0.1_NGeul_192_seed_1121_4.h5'),
 PosixPath('/freya/ptmp/mpa/akostic/eftcode_sampling/paperChains/2lpt_tests/higherbias_nonzero_zero_blapl/free_phases/mockseed_111321/Lambda_0.1_NGeul_192_seed_1121_5.h5'),
 PosixPath('/freya/ptmp/mpa/akostic/eftcode_sampling/paperChains/2lpt_tests/higherbias_nonzero_zero_blapl/free_phases/mockseed_111321/L

# Get parameter list

In [82]:
with h5.File(sorted_filenames[0],'r') as fhandle:
    param_list=[key for key in fhandle['pars_0'].keys()]
param_list

['alpha',
 'b_delta',
 'b_lapl(delta)',
 'b_sigma sigma',
 'b_tr[M^(1) M^(1)]',
 'sigma',
 'sigmaEpsk2',
 'sigmaEpsk4']

# Get parameter chains

## Utility functions

In [4]:
def get_param_value_chain(chain,sample_range,param_list=param_list):
    params=np.empty(((sample_range[1]-sample_range[0]+1),len(param_list),))
    params[:]=np.nan
    param_value_chain = []
    with h5.File(chain,'r') as fhandle:
        for s,sample in enumerate(np.arange(sample_range[0],sample_range[1]+1,dtype=int)):
            for p, param in enumerate(param_list):
                params[s,p]=fhandle['pars_'+str(sample)][param][0]
    return params

def get_min_sample_in_batch(batch_path,keyword='sample_range'):
    with h5.File(batch_path) as fhandle:
        min_sample=fhandle['sample_range'][()]
    return min_sample

def get_sample_range_in_batch(batch_path,keyword='pars_'):
    with h5.File(batch_path) as fhandle:
        batch_keys=list(fhandle.keys())
    sample_id = []
    for key in batch_keys:
        if keyword in key:
            sample_id.append(key.replace(keyword,''))
    sample_id = np.asarray(sample_id,dtype=int)
    return np.array([sample_id.min(), sample_id.max()])

def unnest_tuple_of_tuples(nested_tuple):
    return [param for tup in nested_tuple for param in tup]

## Setup dask client

In [5]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(threads_per_worker=10,n_workers=8,memory_limit='20GB')
client = Client(cluster)
client

2022-12-18 21:49:08,623 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-80p6xgo0', purging


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 80,Total memory: 149.01 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:33589,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 80
Started: Just now,Total memory: 149.01 GiB

0,1
Comm: tcp://127.0.0.1:39851,Total threads: 10
Dashboard: http://127.0.0.1:44683/status,Memory: 18.63 GiB
Nanny: tcp://127.0.0.1:40397,
Local directory: /tmp/dask-worker-space/worker-nm63bvam,Local directory: /tmp/dask-worker-space/worker-nm63bvam

0,1
Comm: tcp://127.0.0.1:44757,Total threads: 10
Dashboard: http://127.0.0.1:40927/status,Memory: 18.63 GiB
Nanny: tcp://127.0.0.1:34977,
Local directory: /tmp/dask-worker-space/worker-499cz5sk,Local directory: /tmp/dask-worker-space/worker-499cz5sk

0,1
Comm: tcp://127.0.0.1:41957,Total threads: 10
Dashboard: http://127.0.0.1:35975/status,Memory: 18.63 GiB
Nanny: tcp://127.0.0.1:46169,
Local directory: /tmp/dask-worker-space/worker-pf1tq0dv,Local directory: /tmp/dask-worker-space/worker-pf1tq0dv

0,1
Comm: tcp://127.0.0.1:41901,Total threads: 10
Dashboard: http://127.0.0.1:35607/status,Memory: 18.63 GiB
Nanny: tcp://127.0.0.1:41759,
Local directory: /tmp/dask-worker-space/worker-kbbs6mxa,Local directory: /tmp/dask-worker-space/worker-kbbs6mxa

0,1
Comm: tcp://127.0.0.1:38747,Total threads: 10
Dashboard: http://127.0.0.1:40817/status,Memory: 18.63 GiB
Nanny: tcp://127.0.0.1:38117,
Local directory: /tmp/dask-worker-space/worker-owaxzsx4,Local directory: /tmp/dask-worker-space/worker-owaxzsx4

0,1
Comm: tcp://127.0.0.1:40223,Total threads: 10
Dashboard: http://127.0.0.1:46373/status,Memory: 18.63 GiB
Nanny: tcp://127.0.0.1:40207,
Local directory: /tmp/dask-worker-space/worker-newkvd03,Local directory: /tmp/dask-worker-space/worker-newkvd03

0,1
Comm: tcp://127.0.0.1:37205,Total threads: 10
Dashboard: http://127.0.0.1:46671/status,Memory: 18.63 GiB
Nanny: tcp://127.0.0.1:39733,
Local directory: /tmp/dask-worker-space/worker-1gv5y1rh,Local directory: /tmp/dask-worker-space/worker-1gv5y1rh

0,1
Comm: tcp://127.0.0.1:38509,Total threads: 10
Dashboard: http://127.0.0.1:33907/status,Memory: 18.63 GiB
Nanny: tcp://127.0.0.1:38885,
Local directory: /tmp/dask-worker-space/worker-scrl69wl,Local directory: /tmp/dask-worker-space/worker-scrl69wl


## Get sample ranges for all MCMC batches

In [92]:
min_sample_futures=client.map(get_min_sample_in_batch,sorted_filenames)
sample_range_futures=client.map(get_sample_range_in_batch,sorted_filenames)

In [93]:
min_sample=np.asarray(client.gather(min_sample_futures));
sample_range=np.asarray(client.gather(sample_range_futures));

## Get parameters from all MCMC batches

In [94]:
params_futures=client.map(get_param_value_chain,sorted_filenames,sample_range)
params_futures

[<Future: pending, key: get_param_value_chain-a822764b3553a5e3d9d4a7c0b6da88c2>,
 <Future: pending, key: get_param_value_chain-dc9eb0e181c2e29aca50ace1106786b6>,
 <Future: pending, key: get_param_value_chain-24d07e493a2bc6fbd2e379097758fa91>,
 <Future: pending, key: get_param_value_chain-bfb087a3e3b1a2da2ca9fdaac36ca282>,
 <Future: pending, key: get_param_value_chain-bb2ef3ca0354c4bcc32a5e7cc9c15efd>,
 <Future: pending, key: get_param_value_chain-24933cddd4f814fa6ad4d5f1923df62b>,
 <Future: pending, key: get_param_value_chain-4285e90d3455be9f77a49ec7ff31a902>,
 <Future: pending, key: get_param_value_chain-ae6ab271dc8ac5aa06e391e26dadf9b9>,
 <Future: pending, key: get_param_value_chain-5fce41a365a75fdbf4d660336b6a3d99>,
 <Future: pending, key: get_param_value_chain-f12147d1f1a402087964d6f6bb64fac5>,
 <Future: pending, key: get_param_value_chain-ce36c11e5e6956de491f8f5df779818b>,
 <Future: pending, key: get_param_value_chain-677891bdf01b9b2623704f493830e4f1>,
 <Future: pending, key: get_

In [95]:
params=client.gather(params_futures)

## Reshape the chain of parameters before output

In [96]:
param_chain=unnest_tuple_of_tuples(params)
param_chain_block=np.block(param_chain)
param_chain_reshaped=param_chain_block.reshape(int(param_chain_block.shape[0]/len(param_list)),len(param_list))

## Output

In [97]:
param_outfile_path=dir_prefix_mn+common_path+filename_prefix+'chain_param.txt'
np.savetxt(param_outfile_path,param_chain_reshaped,header='\t'.join(param_list))