In [1]:
# load modules
#--------------------------------------------------------------
import numpy as np
import pandas as pd
import sys, os, time
import datetime
import re
from scipy.stats import norm
import matplotlib.pyplot as plt
import ray
sys.path.append('../..')

# import helpers
from src.utils import interp, get_xs, Params, get_xtv
from src.plots import scatter_plot, line_plot


from src.models.hierarchical_gaussian import Hierarchical_Gaussian
from src.mcmc_diagnostics.diagnostic import MCMCDiagnostic
from src.utils.serialize import pickle_obj, unpickle_obj, load_json, save_json
from src.utils.params import hash_dict

import torch
import hamiltorch
from hamiltorch import Sampler

# import sampler classes
from src.sampling_algorithms import MaskedLocalBPS, LocalBPS
from src.sampling_algorithms.masked_bps.masked_bps_output import OutputReader
# plot settings
from matplotlib import rc
rc('text', usetex=False)

import arviz as az
from arviz.stats import ess

az.style.use('arviz-darkgrid')

In [2]:
parent_dir = '../..'
# Prepend the project root to PYTHONPATH so that processes started later inherit it
# (presumably the Ray workers, which do not see the sys.path edit from the import cell).
os.environ["PYTHONPATH"] = ":".join([parent_dir, os.environ.get("PYTHONPATH", "")])


def _make_experiment_params(rho, refresh_rate, num_factors, run_time):
    """Build the Params for one benchmark configuration.

    The dict below is passed through hash_dict(params.param_dict()) to name the
    experiment output directories, so its key order is kept exactly as before
    in case that hash is order-sensitive.
    """
    return Params({
        "rho": rho,
        "refresh_rate": refresh_rate,
        "num_local": num_factors,
        "global_mu": 0.,
        "global_prec": 1.,
        "local_mu": 0.,
        "local_prec": 1.,
        'run_time': run_time,
        'switch_prob': 0.8,
        'num_workers': 46,
    })


# Full factorial grid, ordered refresh_rate (outermost) > rho > run_time > num_factors (innermost).
params_list = [
    _make_experiment_params(rho, refresh_rate, num_factors, run_time)
    for refresh_rate in (0.1, 0.01, 0.001)
    for rho in (0.5,)
    for run_time in (60, 90, 600, 900)
    for num_factors in (10, 45, 100, 150)
]

output_dir = "./"

In [3]:
# Columns of the aggregated benchmark table; the check cells append one row per experiment.
res_cols = 'num_factors iter_speed ess_speed sampler'.split()
agg_res = list()

In [4]:

# Ask OpenBLAS / MKL to use a single thread each.
# NOTE(review): numpy is already imported in the first cell, so this probably does not
# change the BLAS already loaded in this process; it is inherited by processes started
# afterwards (e.g. those launched by ray.init) — confirm this is the intent.
os.environ.update({'OPENBLAS_NUM_THREADS': '1', 'MKL_NUM_THREADS': '1'})

In [5]:
# Restart Ray so no state from an earlier session carries over.
ray.shutdown()
ray_details = ray.init(
    memory=5 * 10**10,               # bytes for worker processes
    object_store_memory=9 * 10**10,  # bytes for the object store
    include_webui=True,              # serve the Ray dashboard
)

2020-02-20 14:46:30,540	INFO resource_spec.py:205 -- Starting Ray with 46.53 GiB memory available for workers and up to 83.82 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).



View the dashboard at http://<ray-head-host>:8080/?token=<redacted>



## Masked BPS

In [6]:


def split_mask_into_groups(factor_indices, mask, num_workers = 45):
    """Partition factor positions into groups, one group per sub-sampler.

    Only ``len(factor_indices)`` and ``mask[0]`` are used; the returned groups
    contain positions (numpy integers), not the values of ``factor_indices``.

    - ``mask[0]`` non-zero: a single group holding every position 0..n-1.
    - ``mask[0]`` zero: position 0 is left out. Positions 1..n-1 are dealt
      round-robin into ``num_workers`` groups (group k gets positions with
      ``p % num_workers == k``) when n > num_workers; otherwise each position
      gets its own singleton group.
    """
    n_factors = len(factor_indices)

    if not (mask[0] == 0):
        return [list(np.arange(0, n_factors))]

    positions = np.arange(1, n_factors)
    if n_factors <= num_workers:
        return [[pos] for pos in positions]

    groups = [[] for _ in range(num_workers)]
    for pos in positions:
        groups[pos % num_workers].append(pos)
    return groups

def split_mask_func(factor_ind, mask):
    """Adapter handed to MaskedLocalBPS as ``split_mask_fn``.

    Reads ``num_workers`` from the module-level ``params`` at call time, i.e.
    from whichever configuration the surrounding experiment loop last assigned.
    """
    worker_count = params.num_workers
    return split_mask_into_groups(factor_ind, mask, num_workers=worker_count)

In [7]:
# run masked bps
model_output_dir = os.path.join(output_dir, "masked_bps")


if not os.path.exists(model_output_dir):
        os.mkdir(model_output_dir)
    
for param_index in range(len(params_list)):
    params = params_list[param_index]
    param_hash = hash_dict(params.param_dict())
    
    ## set up output dir
    dir_name = "experiment_{0}".format(param_hash)
    dir_path = os.path.join(model_output_dir, dir_name)
    if not os.path.exists(dir_path):
        os.mkdir(dir_path)
    
        # serialize params
        param_filepath = os.path.join(dir_path, "params.json")
        params.save_to_file(param_filepath)

        ## Define model
        model = Hierarchical_Gaussian(params)
        
        # masked sampler
        #-----------------------------------------------------------------------------
        init_mask = model.sample_mask(0, model.num_params)
        sample_mask_fn = lambda : model.sample_mask(0,model.num_params)
        
        # Shutdown and init ray
        if ray.is_initialized():
            ray.shutdown()

        ray.init(memory=5*10**10, object_store_memory = 9*10**10)
        

        # init values
        init_x = np.array([np.random.rand() for _ in range(model.num_params)])
        init_v = np.array([np.random.rand() for _ in range(model.num_params)])

        # init sampler
        #--------------------------------------------------------------------------------------
        mlbps = MaskedLocalBPS(init_x = init_x,
                       init_v = init_v,
                       init_mask = init_mask,
                       factor_graph=model,
                       bounce_fns=model.bounce_fns,
                       refresh_rate=model.params.refresh_rate,
                       split_mask_fn= split_mask_func,
                       sample_mask_fn=sample_mask_fn,
                       max_number_sub_samplers = params.num_workers)

        # run sampler
        print('Masked Local BPS')
        print(params)
        start = datetime.datetime.now()
        print(start)
        res = mlbps.simulate_for_time(params.run_time, output_dir=dir_path)
        results, groups, masks = res
        stop = datetime.datetime.now()
        print(stop)
        time_delta = (stop-start).seconds
        print("Duration: {0}s".format(time_delta))
        
        output_file = os.path.join(dir_path, "time_delta.pickle")
        pickle_obj(time_delta, output_file)



In [8]:
# Reset the aggregated results table before the check cells repopulate it.
res_cols = list(('num_factors', 'iter_speed', 'ess_speed', 'sampler'))
agg_res = []

In [None]:
#check masked bps
model_output_dir = os.path.join(output_dir, "masked_bps")
masked_bps_diagnostics = []
    
for param_index in range(len(params_list)):
    params = params_list[param_index]
    param_hash = hash_dict(params.param_dict())

    
    ## set up output dir
    dir_name = "experiment_{0}".format(param_hash)
    dir_path = os.path.join(model_output_dir, dir_name)
    if os.path.exists(dir_path):
        print(params)
                ## Define model
        model = Hierarchical_Gaussian(params)
        
        # masked sampler
        #-----------------------------------------------------------------------------
        init_mask = model.sample_mask(0, model.num_params)
        sample_mask_fn = lambda : model.sample_mask(0,model.num_params)
        
        # Shutdown and init ray
        if ray.is_initialized():
            ray.shutdown()

        ray.init(memory=5*10**10, object_store_memory = 9*10**10)
        

        # init values
        init_x = np.array([np.random.rand() for _ in range(model.num_params)])
        init_v = np.array([np.random.rand() for _ in range(model.num_params)])

        # init sampler
        #--------------------------------------------------------------------------------------
        mlbps = MaskedLocalBPS(init_x = init_x,
                       init_v = init_v,
                       init_mask = init_mask,
                       factor_graph=model,
                       bounce_fns=model.bounce_fns,
                       refresh_rate=model.params.refresh_rate,
                       split_mask_fn= split_mask_func,
                       sample_mask_fn=sample_mask_fn,
                       max_number_sub_samplers = params.num_workers)
        output_reader = OutputReader(mlbps)
        output_file = os.path.join(dir_path, "time_delta.pickle")
        time_delta = unpickle_obj(output_file)
        output, num_iterations = output_reader.read_output(dir_path, verbose = False, inplace =False)

        chains = {}
        for i in range(mlbps.d):
            x, v, t, mask = output[i]['x'], output[i]['v'], output[i]['t'], output[i]['mask']
            x = np.array(x)
            v = np.array(v)
            t = np.array(t)
            mask = np.array(mask)
            nsim = len(x)
            xs = interp(x, t, v*mask, num_intervals= nsim*12)
            chains["x_{0}".format(i)] = xs
        mcmc_diagnostic_obj = MCMCDiagnostic(chains)
        masked_bps_diagnostics.append(mcmc_diagnostic_obj)
        esses = [mcmc_diagnostic_obj.ess('x_{0}'.format(i)) for i in range(mlbps.d)]
        
        
        iteration_speed = num_iterations/time_delta
        ess_speed  = np.mean(esses)/time_delta
        agg_res.append([params.num_local, iteration_speed, ess_speed, 'masked_bps'])
        print(params)
        print("ESS /S: {0}".format(ess_speed))
        print("Iterations /S: {0}".format(iteration_speed))
        print("\n")

## Local BPS

In [10]:
# run local bps
model_output_dir = os.path.join(output_dir, "local_bps")


if not os.path.exists(model_output_dir):
        os.mkdir(model_output_dir)
    
for param_index in range(len(params_list)):
    params = params_list[param_index]
    param_hash = hash_dict(params.param_dict())
    
    ## set up output dir
    dir_name = "experiment_{0}".format(param_hash)
    dir_path = os.path.join(model_output_dir, dir_name)
    if not os.path.exists(dir_path):
        os.mkdir(dir_path)
    
        # serialize params
        param_filepath = os.path.join(dir_path, "params.json")
        params.save_to_file(param_filepath)

        ## Define model
        model = Hierarchical_Gaussian(params)
        
        
        # init values
        init_x = np.array([np.random.rand() for _ in range(model.num_params)])
        init_v = np.array([np.random.rand() for _ in range(model.num_params)])

        # init sampler
        #--------------------------------------------------------------------------------------
        lbps = LocalBPS(init_x = init_x,
                       init_v = init_v,
                       factor_graph=model,
                       bounce_fns=model.bounce_fns,
                       refresh_rate=model.params.refresh_rate)

        # run sampler
        print('Local BPS')
        start = datetime.datetime.now()
        print(start)
        res = lbps.simulate_for_time(params.run_time)
        stop = datetime.datetime.now()
        print(stop)
        time_delta = (stop-start).seconds
        print("Duration: {0}s".format(time_delta))
        
        output_file = os.path.join(dir_path, "chains.pickle")
        pickle_obj(res, output_file)
        
        output_file = os.path.join(dir_path, "time_delta.pickle")
        pickle_obj(time_delta, output_file)

In [None]:
#check local bps
model_output_dir = os.path.join(output_dir, "local_bps")
local_bps_diagnostics = []
    
for param_index in range(len(params_list)):
    params = params_list[param_index]
    param_hash = hash_dict(params.param_dict())

    
    ## set up output dir
    dir_name = "experiment_{0}".format(param_hash)
    dir_path = os.path.join(model_output_dir, dir_name)
    if os.path.exists(dir_path):
        print(params)
        output_file = os.path.join(dir_path, "chains.pickle")
        res = unpickle_obj(output_file)

        output_file = os.path.join(dir_path, "time_delta.pickle")
        time_delta = unpickle_obj(output_file)

        chains = {}
        d = params.num_local
        for i in range(d):
            x1,v1,t1=get_xtv(res,i)
            nsim = len(x1)
            x = interp(x1,t1,v1, num_intervals=nsim*12)
            chains["x_{0}".format(i)] = x

        mcmc_diagnostic_obj = MCMCDiagnostic(chains)
        esses = [mcmc_diagnostic_obj.ess('x_{0}'.format(i)) for i in range(d)]
        local_bps_diagnostics.append(mcmc_diagnostic_obj)
        
        ess_speed = np.mean(esses)/time_delta
        iteration_speed = np.shape(res)[0]/time_delta
        
        agg_res.append([params.num_local, iteration_speed, ess_speed, 'local_bps'])
        print(params)
        print("ESS /S: {0}".format(ess_speed))
        print("Iterations /S: {0}".format(iteration_speed))
        print("\n")

## HMC

In [12]:
# HMC settings; the names match hamiltorch.sample keyword arguments.
num_samples = 10 ** 4
step_size = 0.3
num_steps_per_sample = 5

In [13]:
# run hmc 
model_output_dir = os.path.join(output_dir, "hmc")

if not os.path.exists(model_output_dir):
        os.mkdir(model_output_dir)
        
for param_index in range(len(params_list)):
    params = params_list[param_index]
    param_hash = hash_dict(params.param_dict())
    
    ## set up output dir
    dir_name = "experiment_{0}".format(param_hash)
    dir_path = os.path.join(model_output_dir, dir_name)
    if not os.path.exists(dir_path):
        os.mkdir(dir_path)
    
        # serialize params
        param_filepath = os.path.join(dir_path, "params.json")
        params.save_to_file(param_filepath)
    
        # define model
        rho = params.rho
        local_prec = params.local_prec**2
        mu = params.local_mu

        prec = np.array([[local_prec, rho*local_prec], [rho*local_prec, local_prec]])
        Sig = np.linalg.pinv(prec)

        mu2 = torch.tensor(np.array([mu]))
        mu1 = torch.tensor(np.array([mu]))
        Sig2 = torch.tensor(np.array([[Sig[1, 1]]]))
        Sig1 = torch.tensor(np.array([[Sig[0, 0]]]))
        Sig21 = torch.tensor(np.array([[Sig[0, 1]]]))
        Sig12 = Sig21.T

        inv_sig2 = torch.pinverse(Sig2)
        sig_bar = Sig1 - Sig12 * inv_sig2*Sig12.T
        inv_sig = torch.pinverse(sig_bar)
        transform = Sig12 *inv_sig2
        sig_bar = Sig1 - transform * Sig21

        def conditional_func(x1, x2):
            mu_bar = mu1 + transform * (x2 - mu2)
            stddev = torch.sqrt(sig_bar)
            return torch.distributions.Normal(mu_bar, stddev).log_prob(x1).sum()


        def log_prob_x0(x):
            mean = torch.tensor(params.global_mu)
            stddev = torch.tensor(params.global_prec)  

            return torch.distributions.Normal(mean, stddev).log_prob(x).sum()

        def log_prob_func(x):
            total = log_prob_x0(x[0])
            for i in range(len(x)-1):
                total += conditional_func(x[i+1],x[0])
            return total
        
        hamiltorch.set_random_seed(123)
        params_init = torch.tensor(np.random.random(params.num_local))
        
        print('HMC')
        start = datetime.datetime.now()
        print(start)
        params_hmc = hamiltorch.sample(log_prob_func=log_prob_func, 
                                       num_samples = num_samples,
                                       params_init=params_init)
        stop = datetime.datetime.now()
        time_delta = (stop-start).seconds
        numpy_chains = np.swapaxes(torch.stack(params_hmc).numpy(), 0,1)
        
        output_file = os.path.join(dir_path, "chains.pickle")
        pickle_obj(numpy_chains, output_file)
        
        output_file = os.path.join(dir_path, "time_delta.pickle")
        pickle_obj(time_delta, output_file)

In [None]:
#check hmc
model_output_dir = os.path.join(output_dir, "hmc")

    
for param_index in range(len(params_list)):
    params = params_list[param_index]
    param_hash = hash_dict(params.param_dict())

    
    ## set up output dir
    dir_name = "experiment_{0}".format(param_hash)
    dir_path = os.path.join(model_output_dir, dir_name)
    if os.path.exists(dir_path):
        print(params)
        output_file = os.path.join(dir_path, "chains.pickle")
        numpy_chains = unpickle_obj(output_file)

        output_file = os.path.join(dir_path, "time_delta.pickle")
        time_delta = unpickle_obj(output_file)
    
        chains = {}
        d = np.shape(numpy_chains)[0]
        for i in range(d):
            chains['x_{0}'.format(i)] = numpy_chains[i]

        mcmc_diagnostic_obj = MCMCDiagnostic(chains)
        esses = [mcmc_diagnostic_obj.ess('x_{0}'.format(i)) for i in range(d)]
        iteration_speed = np.shape(numpy_chains)[1]/time_delta
        ess_speed  = np.mean(esses)/time_delta
        agg_res.append([params.num_local, iteration_speed, ess_speed, 'hmc'])
        
        print(params)
        print("ESS /S: {0}".format(ess_speed))
        print("Iterations /S: {0}".format(iteration_speed))
        print("\n")

In [15]:
# One row per checked experiment, as collected in agg_res by the check cells.
res_df = pd.DataFrame(data=agg_res, columns=res_cols)


In [17]:
# ESS per second: one row per sampler, one column per number of factors.
# params_list has several (refresh_rate, run_time) configurations per
# (sampler, num_factors) pair, and agg_res does not record those settings.
# set_index(...).unstack() therefore raises on the duplicate keys, so the
# duplicates are averaged instead. When a pair has a single entry, its value is
# returned unchanged, giving the same table layout as before.
ess_plot = res_df.pivot_table(
    index='sampler',
    columns='num_factors',
    values=['ess_speed'],
    aggfunc='mean',
)

ess_plot

Unnamed: 0_level_0,ess_speed,ess_speed,ess_speed,ess_speed
num_factors,10,45,100,150
sampler,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
hmc,1.830472,0.371143,0.159302,0.024271
local_bps,38.427131,1.936605,0.476188,0.032398
masked_bps,29.529284,4.979264,0.977649,0.453831
