In [1]:
import time
from functools import partial
from os import getpid

import chaospy as cp
import numpy as np
import pandas as pd
import respy as rp
from joblib import Parallel, delayed

## Preparation

Draw a sample of random input parameters.  
For simplicity, I draw 10 parameter vectors and use `joblib.Parallel` for parallel computing.

In [3]:
# Load the kw_94_one model options, the parameter table and the parameter constraints.
_, base_options = rp.get_example_model("kw_94_one", with_data=False)
base_params = pd.read_pickle("./input/respy-se-collection/params_kw_94_one_se.pkl")
constraints = rp.get_parameter_constraints("kw_94_one")
# Drop the "sdcorr" constraint on the shock parameters.
constraints.remove({"loc": "shocks_sdcorr", "type": "sdcorr"})

# Mean and covariance of the sampling distribution. Only the first
# N_KW94_PARAMS rows of the parameter table are sampled.
N_KW94_PARAMS = 27
mean = base_params["value"].to_numpy()[:N_KW94_PARAMS]
cov = pd.read_pickle("./input/respy-se-collection/covariance_kw_94_one.pkl").to_numpy()
assert cov.shape == (mean.size, mean.size), "covariance must match the sampled mean"

In [4]:
# Sample input parameters from a multivariate normal built from the mean and
# covariance loaded above; the fixed seed makes the draws reproducible.
np.random.seed(123)
distribution = cp.MvNormal(loc=mean, scale=cov)
# Transpose the sample so each list item is one parameter vector.
input_params_10 = [draw for draw in distribution.sample(10).T]  # 10 draws for simplicity

## Quantity of interest to evaluate with multiprocessing

In [5]:
def quantitiy_of_interest(input_params, *args, subsidy=500):
    """Change in mean maximum education experience caused by a policy shift.

    Builds a respy parameter frame from one sampled KW94 parameter vector,
    simulates the baseline model and a policy model in which the
    ``("nonpec_edu", "at_least_twelve_exp_edu")`` parameter is raised by
    ``subsidy``, and returns the difference of the two outcomes. The outcome
    is the mean over ``Identifier`` groups of the maximum ``Experience_Edu``.

    Parameters
    ----------
    input_params : array-like
        The 27 sampled parameters, in the order of the first 27 rows of
        ``params_kw_94_one_se.pkl``.
    *args
        Ignored; accepted so existing call sites keep working.
    subsidy : float, optional
        Amount added to ``at_least_twelve_exp_edu`` in the policy scenario.
        Defaults to 500.

    Returns
    -------
    float
        ``policy_edu - base_edu``.

    Raises
    ------
    ValueError
        If ``input_params`` does not contain exactly 27 values.

    Notes
    -----
    NOTE(review): running this in several processes at once has failed with
    ``UncompressError`` inside ``rp.get_simulate_func``. A plausible cause is
    that concurrent workers share respy's on-disk cache files. Giving each
    worker its own cache location, if the installed respy exposes such an
    option, is worth trying. TODO: confirm.
    """
    n_kw94_params = 27

    # Validate up front. Otherwise a length mismatch surfaces as a generic
    # pandas error when the Series below is built.
    if len(input_params) != n_kw94_params:
        raise ValueError("Length of KW94 vector must be 27.")

    # Load everything inside the function so it does not rely on notebook
    # globals when joblib runs it in a worker process.
    rp_params, base_options = rp.get_example_model("kw_94_one", with_data=False)
    se_params = pd.read_pickle("./input/respy-se-collection/params_kw_94_one_se.pkl")

    sampled = pd.Series(data=input_params, index=se_params.index[:n_kw94_params])
    # Rows 27:31 are not sampled; take them unchanged from the example model.
    fixed = rp_params.iloc[n_kw94_params:31, 0]
    params = pd.concat([sampled, fixed]).to_frame("value")

    simulate = rp.get_simulate_func(params, base_options)

    def _mean_max_edu(sim_params):
        # Per-Identifier maximum of Experience_Edu, averaged over identifiers.
        sim_df = simulate(sim_params)
        return sim_df.groupby("Identifier")["Experience_Edu"].max().mean()

    base_edu = _mean_max_edu(params.copy())

    policy_params = params.copy()
    policy_params.loc[("nonpec_edu", "at_least_twelve_exp_edu"), "value"] += subsidy
    policy_edu = _mean_max_edu(policy_params)

    change_mean_edu = policy_edu - base_edu

    print("I'm process", getpid(), ':', change_mean_edu)

    return change_mean_edu

<span style='color:blue'> "quantitiy_of_interest()" with joblib, 10 input draws and 1 process (n_jobs=1 runs the draws sequentially in one process): works </span>

In [6]:
# n_jobs=1: joblib evaluates the draws one after another in this process.
Parallel(n_jobs=1)(delayed(quantitiy_of_interest)(draw) for draw in input_params_10)

I'm process 63202 : 1.391
I'm process 63202 : 1.3960000000000008
I'm process 63202 : 1.5569999999999986
I'm process 63202 : 1.564
I'm process 63202 : 1.5259999999999998
I'm process 63202 : 1.5120000000000005
I'm process 63202 : 1.4510000000000005
I'm process 63202 : 1.5310000000000006
I'm process 63202 : 1.4900000000000002
I'm process 63202 : 1.6509999999999998


[1.391,
 1.3960000000000008,
 1.5569999999999986,
 1.564,
 1.5259999999999998,
 1.5120000000000005,
 1.4510000000000005,
 1.5310000000000006,
 1.4900000000000002,
 1.6509999999999998]

<span style='color:red'> "quantitiy_of_interest()" with multiprocessing, 10 input draws on all 8 processors: sometimes fails, sometimes works </span>

In [11]:
# n_jobs=-1 uses all available cores; this call works only sometimes.
Parallel(n_jobs=-1)(delayed(quantitiy_of_interest)(draw) for draw in input_params_10)

[1.391,
 1.3960000000000008,
 1.5569999999999986,
 1.564,
 1.5259999999999998,
 1.5120000000000005,
 1.4510000000000005,
 1.5310000000000006,
 1.4900000000000002,
 1.6509999999999998]

<span style='color:red'> "quantitiy_of_interest()" with multiprocessing, 100 input draws on all 8 processors: fails </span>

In [15]:
# Re-seed so this cell produces the same sample regardless of which cells ran
# before it; otherwise it inherits whatever global RNG state earlier runs left.
np.random.seed(123)
input_params_100 = list(distribution.sample(100).T)
Parallel(n_jobs=-1)(delayed(quantitiy_of_interest)(draw) for draw in input_params_100)

UncompressError: Error while decompressing: invalid input

## Debugging: `quantitiy_of_interest` runs fine under multiprocessing up to the line `simulate = rp.get_simulate_func(...)`

Create the input arguments for `rp.get_simulate_func(params, options)`.

In [20]:
def params_idx_respy(input_params, *args):
    """Transfer one vector of sampled parameters to respy's params format.

    Parameters
    ----------
    input_params : array-like
        The 27 sampled parameters, in the order of the first 27 rows of
        ``params_kw_94_one_se.pkl``.
    *args
        Ignored; accepted so existing call sites keep working.

    Returns
    -------
    pandas.DataFrame
        A single ``"value"`` column. The first 27 rows carry ``input_params``
        indexed like the SE params file. They are followed by rows 27:31 of
        the ``kw_94_one`` example model's params.

    Raises
    ------
    ValueError
        If ``input_params`` does not contain exactly 27 values.
    """
    n_kw94_params = 27

    # Validate before building the Series, which would otherwise fail with a
    # generic pandas length-mismatch error.
    if len(input_params) != n_kw94_params:
        raise ValueError("Length of KW94 vector must be 27.")

    # Only the example model's params are needed here, not its options.
    rp_params, _ = rp.get_example_model("kw_94_one", with_data=False)
    se_params = pd.read_pickle("./input/respy-se-collection/params_kw_94_one_se.pkl")

    sampled = pd.Series(data=input_params, index=se_params.index[:n_kw94_params])
    # Rows 27:31 are not sampled; take them unchanged from the example model.
    fixed = rp_params.iloc[n_kw94_params:31, 0]
    return pd.concat([sampled, fixed]).to_frame("value")

In [21]:
# NOTE(review): this assignment rebinds `params_idx_respy` from the function
# defined above to the list of results. Re-running this cell without first
# re-running the definition cell therefore raises
# TypeError ('list' object is not callable). Later cells read the list under
# this name, so the name is kept here.
start = time.perf_counter()  # perf_counter is the recommended clock for durations
params_idx_respy = Parallel(n_jobs=-1)(delayed(params_idx_respy)(draw) for draw in input_params_10)
end = time.perf_counter()
print(f'\nTime to complete: {end - start:.2f}s\n')


Time to complete: 0.08s



In [22]:
# Inspect the respy-formatted parameters of the 10th (last) of the 10 draws.
params_idx_respy[9]

Unnamed: 0_level_0,Unnamed: 1_level_0,value
category,name,Unnamed: 2_level_1
delta,delta,0.949831
wage_a,constant,9.21028
wage_a,exp_edu,0.038065
wage_a,exp_a,0.032887
wage_a,exp_a_square,-0.000497
wage_a,exp_b,4.8e-05
wage_a,exp_b_square,2e-06
wage_b,constant,8.48375
wage_b,exp_edu,0.069812
wage_b,exp_b,0.06679


In [23]:
# Bind the baseline options once, so each joblib task only has to pass `params`.
get_simulate_func = partial(rp.get_simulate_func, options=base_options)

<span style='color:blue'> "rp.get_simulate_func()" with joblib, 10 input draws and 1 process (n_jobs=1): works </span>

In [24]:
# `simulate` holds a list with one simulate function per draw.
start = time.perf_counter()  # perf_counter is the recommended clock for durations
simulate = Parallel(n_jobs=1)(delayed(get_simulate_func)(params) for params in params_idx_respy)
end = time.perf_counter()
print(f'\nTime to complete: {end - start:.2f}s\n')


Time to complete: 44.70s




<span style='color:red'> "rp.get_simulate_func()" with multiprocessing, 10 input draws on 8 processors: fails </span>

In [32]:
# Same as the sequential run, but on all cores. This raised UncompressError.
# NOTE(review): the concurrent get_simulate_func calls may be clobbering
# shared on-disk cache files. TODO: confirm where the installed respy caches.
start = time.perf_counter()  # perf_counter is the recommended clock for durations
simulate = Parallel(n_jobs=-1)(delayed(get_simulate_func)(params) for params in params_idx_respy)
end = time.perf_counter()
print(f'\nTime to complete: {end - start:.2f}s\n')

UncompressError: Error while decompressing: invalid input