Code for generating results for the Vietnam draft and earnings data (Figure 4(c)).

In [None]:
import numpy
import sympy
import pandas
import numpy as np
import pandas as pd
import sympy as sp
import datetime
import copy
import attr
import time
import pickle
import os
import functools

import matplotlib.pyplot as plt
from matplotlib import rcParams
import collections

from scipy import interpolate
from scipy import stats

from scipy.optimize import minimize

import warnings
from IPython.display import clear_output

HOME_DIR = ""
warnings.simplefilter(action='ignore', category=UserWarning)

In [None]:
# Load the 1951-cohort draft data from HOME_DIR and drop the CSV's "index"
# column, leaving the `eligible` and `earnings_normed` columns.
df_angrist = pd.read_csv(os.path.join(HOME_DIR, "angrist_1951_cohort.csv")).drop("index", axis=1)
print(df_angrist.shape)
col = "earnings_normed"
# Standardize the `earnings_normed` column: subtract its mean and divide by
# its standard deviation.
df_angrist[col] = (df_angrist[col] - np.mean(df_angrist[col])) / np.std(df_angrist[col])
df_angrist.head()

(2117, 2)


Unnamed: 0,eligible,earnings_normed
0,0,0.232759
1,1,0.36723
2,1,0.030211
3,1,0.228705
4,0,0.705876


In [None]:
class ModelParams:
  """Ground-truth parameters of the simulated draft/earnings model.

  All values are calibrated in `__init__` from the module-level `df_angrist`
  frame and two hard-coded veteran rates. `generate_data_samples` consumes
  them as

      X = 1[alpha * Z + c + u_x > 0]
      Y = beta * X + gamma + c_x * u_x + u_y

  where Z is the `eligible` indicator and u_x is standard normal.

  Attributes (all plain floats):
    c, alpha: intercept and slope of the latent threshold model for X.
    beta: effect of X on Y; this is the quantity the strategies estimate.
    gamma: intercept of the outcome equation.
    c_x: loading of u_x in the outcome equation (hard-coded to -0.5).
    epsilon_y_var: residual variance left for u_y.
  """

  def __init__(self):
    # P(Z = 1) and P(Z = 0), estimated from the data.
    p_z_1 = np.mean(df_angrist["eligible"])
    p_z_0 = 1 - p_z_1

    # theta_1 := P(veteran|eligible=1) = 0.2831
    # theta_0 := P(veteran|eligible=0) = 0.1468
    theta_1 = 0.2831
    theta_0 = 0.1468
    
    # With u_x ~ N(0, 1), P(X=1 | Z=0) = Phi(c) and P(X=1 | Z=1) = Phi(alpha + c),
    # so these two lines reproduce theta_0 and theta_1 exactly.
    self.c = stats.norm.ppf(theta_0)
    self.alpha = stats.norm.ppf(theta_1) - self.c

    # NOTE: these two group means are only needed by the commented-out
    # alternative formula for beta below; they are otherwise unused.
    y_mean_eligble_1 = np.mean(df_angrist[df_angrist["eligible"] == 1]["earnings_normed"])
    y_mean_eligble_0 = np.mean(df_angrist[df_angrist["eligible"] == 0]["earnings_normed"])
    # self.beta = (y_mean_eligble_1 - y_mean_eligble_0) / (theta_1 - theta_0)

    # E[X] by the law of total probability.
    mean_veteran = theta_1 * p_z_1 + theta_0 * p_z_0
    # beta = E[Y * Z] / (E[X * Z] - E[X] * E[Z]). The denominator is Cov(X, Z);
    # the numerator equals Cov(Y, Z) only if `earnings_normed` has zero mean
    # (it is standardized when the data is loaded).
    self.beta = np.mean(df_angrist["earnings_normed"] * df_angrist["eligible"]) / (theta_1*p_z_1 -  mean_veteran*p_z_1)
    # Intercept chosen so that E[Y] matches the empirical mean of the outcome.
    self.gamma = np.mean(df_angrist["earnings_normed"]) - self.beta * mean_veteran

    # Var(X) by the law of total variance: Var(E[X|Z]) + E[Var(X|Z)].
    var_veteran = (
        p_z_1 * p_z_0 * (theta_1 - theta_0)**2 +
        p_z_1 * theta_1 * (1 - theta_1) + p_z_0 * theta_0 * (1 - theta_0)
    )
    # Cov(X, u_x) = E[u_x * 1[u_x > -t]] = phi(t), averaged over the threshold
    # t = c (Z = 0) and t = alpha + c (Z = 1); phi is the standard normal pdf.
    cov_X_epsilon_x = (1 / np.sqrt(2 * np.pi)) * (np.exp(-self.c**2 / 2) * p_z_0 + np.exp(-(self.alpha + self.c)**2 / 2) * p_z_1)

    self.c_x = -.5
    # Residual variance: Var(Y) minus the variance explained by beta * X and
    # c_x * u_x (including their covariance term).
    # NOTE: `generate_data_samples` draws u_y with variance epsilon_y_var / 10.
    self.epsilon_y_var = np.var(df_angrist["earnings_normed"]) - self.beta**2 * var_veteran - self.c_x**2 - 2 * self.beta * self.c_x * cov_X_epsilon_x
    
  def get_true_param(self):
    """Returns `beta`, the parameter the estimators are scored against."""
    return self.beta
  
  def __str__(self):
    """Returns a one-line summary of all parameters (`c` is printed as c_star)."""
    return "beta=%f, c_star=%f, alpha=%f, c_x=%f, epsilon_y_var=%f, gamma=%f" % (
        self.get_true_param(), self.c, self.alpha, self.c_x, self.epsilon_y_var, self.gamma
    )
    
# Module-level ground truth: read by `generate_data_samples` callers and by
# the strategies' `get_squared_error`, and pushed to the parallel engines.
truth = ModelParams()
print(truth)

beta=-0.431270, c_star=-1.050257, alpha=0.476600, c_x=-0.500000, epsilon_y_var=0.605823, gamma=0.083441


In [None]:
def generate_data_samples(num_samples, model, k=0.5):
  """Simulates a dataset of (Z, X, Y, SEL) rows from the calibrated model.

  Z is resampled with replacement from the `eligible` column of the
  module-level `df_angrist`; X and Y are then generated as
      X = 1[model.alpha * Z + model.c + u_x > 0]
      Y = model.beta * X + model.gamma + model.c_x * u_x + u_y
  with u_x ~ N(0, 1) and u_y ~ N(0, model.epsilon_y_var / 10).

  Args:
    num_samples: number of rows to generate.
    model: object exposing alpha, c, beta, gamma, c_x and epsilon_y_var
      (e.g. a `ModelParams` instance).
    k: fraction of rows (the leading ones) whose SEL flag is set to 1.

  Returns:
    A pandas DataFrame with columns "Z", "X", "Y", "SEL" and a fresh
    0..num_samples-1 index.
  """
  # Full module names are used because this function is also shipped to
  # ipyparallel engines, where the modules are imported without aliases.
  np = numpy
  pd = pandas

  # Uniform draw of row indices with replacement. The one-hot multinomial
  # form is kept (rather than e.g. randint) so that the sequence of random
  # numbers consumed, and hence any seeded run, stays unchanged.
  df_idxs = np.argmax(np.random.multinomial(1, np.ones((df_angrist.shape[0],)) / df_angrist.shape[0],
                                            size=(num_samples,)), axis=-1)
  eligible = df_angrist["eligible"].loc[df_idxs]
  # NOTE: resampled earnings are drawn alongside Z but not used below; Y is
  # generated from the structural equation instead.
  earnings = df_angrist["earnings_normed"].loc[df_idxs]

  u_x = np.random.normal(size=(num_samples,))
  # u_y = np.random.normal(scale=np.sqrt(model.epsilon_y_var / 100), size=(num_samples,))
  u_y = np.random.normal(scale=np.sqrt(model.epsilon_y_var / 10), size=(num_samples,))

  def get_Z():
    return eligible
  
  def get_X(Z, u_x):
    X_star = model.alpha * Z + model.c + u_x
    # `np.float` was only an alias of the builtin `float` and was removed in
    # NumPy 1.24; the builtin yields the same float64 result.
    return (X_star > 0).astype(float)
  
  def get_Y(X, u_x, u_y):
    return model.beta * X + model.gamma + model.c_x * u_x + u_y
  
  Z = get_Z()
  X = get_X(Z, u_x)
  Y = get_Y(X, u_x, u_y)

  # The first int(num_samples * k) rows are flagged SEL = 1, the rest SEL = 0.
  SEL = np.zeros_like(Y, np.int32)
  SEL[:int(SEL.shape[0] * k)] = 1

  df = pd.DataFrame({
      "Z": Z,
      "X": X,
      "Y": Y,
      "SEL": SEL
  }).reset_index(drop=True)
  return df

In [None]:
# Draw one example dataset from the ground-truth model, with half of the rows
# flagged SEL = 1.
df = generate_data_samples(num_samples=10000, model=truth, k=0.5)
df.head()

Unnamed: 0,Z,X,Y,SEL
0,1,0.0,0.145354,1
1,0,0.0,0.091405,1
2,0,0.0,-0.231617,1
3,1,0.0,0.694866,1
4,0,0.0,-0.420311,1


In [None]:
class GMMEqs:
  """Symbolically derives the moment conditions used by `GMM`.

  The equations are built once with sympy and turned into numpy-callable
  functions with `lambdify`, so a single instance can be shared by many `GMM`
  objects.
  """

  def __init__(self):
    self.moment_eqs, self.jacobian_eqs, self.true_param_grad, self.moment_reweighting = self._get_equations()
  
  def get(self):
    """Returns (moment_eqs, jacobian_eqs, true_param_grad, moment_reweighting)."""
    return self.moment_eqs, self.jacobian_eqs, self.true_param_grad, self.moment_reweighting
  
  def _get_equations(self):
    """Builds and lambdifies the moments, their Jacobian and the target gradient.

    Returns:
      A 4-tuple:
        moments: list of 4 callables taking (s1, s0, S, Z, X, Y, y1, y0, t1, t0).
        jacobian: 4x4 nested list of callables with the same arguments;
          entry [i][j] is d(moment i) / d(param j).
        true_param_grad: list of 4 callables taking (y1, y0, t1, t0); entry j
          is d(beta) / d(param j).
        moment_reweighting: callable taking (s1, s0) and returning the 4x4
          reweighting matrix.
    """
    # Full module name: the class is also shipped to ipyparallel engines.
    sp = sympy
    
    # s1/s0 are the weights of the two moment groups; S, Z, X, Y are data columns.
    df_symbols = sp.symbols('s1, s0, S, Z, X, Y')
    s1, s0, S, Z, X, Y = df_symbols
    # y1/y0 multiply Z and (1 - Z) in the Y moments; t1/t0 do the same for X.
    params = sp.symbols("y1, y0, t1, t0")
    y1, y0, t1, t0 = params

    # Target parameter expressed through the four nuisance parameters.
    beta = (y1 - y0) / (t1 - t0)

    # moment weights
    m_wts = [s1, s1, s0, s0]
    # Entry (i, j) stays 0 unless moments i and j share the same weight symbol,
    # in which case it is that symbol. With the purely symbolic weights above,
    # the `== 1` branch is never taken.
    moment_reweighting = sp.zeros(len(m_wts), len(m_wts))
    for i, mw1 in enumerate(m_wts):
      for j, mw2 in enumerate(m_wts):
        if mw1 == 1 or mw2 == 1:
          moment_reweighting[i, j] = mw1 * mw2
        elif mw1 == mw2:
          moment_reweighting[i, j] = mw1

    # Rows with S = 1 feed the outcome (Y) moments; rows with S = 0 feed the
    # treatment (X) moments.
    m1 = m_wts[0] * S * (Z * (Y - y1))
    m2 = m_wts[1] * S * ((1-Z) * (Y - y0))
    m3 = m_wts[2] * (1-S) * (Z*(X - t1))
    m4 = m_wts[3] * (1-S) * ((1-Z)*(X - t0))

    all_symbols = df_symbols + params
    moments = [m1, m2, m3, m4]
    # Differentiate while `moments` still holds symbolic expressions; it is
    # rebound to lambdified callables further down.
    jacobian = []
    for mom in moments:
      jac_row = []
      for p in params:
        eq = sp.simplify(sp.diff(mom, p))
        jac_row.append(sp.lambdify(all_symbols, eq, "numpy"))
      jacobian.append(jac_row)
    
    true_param_grad = []
    for p in params:
      eq = sp.simplify(sp.diff(beta, p))
      true_param_grad.append(sp.lambdify(params, eq, "numpy"))
    
    moments = [sp.lambdify(all_symbols, eq, "numpy") for eq in moments]
    moment_reweighting = sp.lambdify((s1, s0), moment_reweighting, "numpy")
    return moments, jacobian, true_param_grad, moment_reweighting

class GMM:
  """Generalized-method-of-moments estimator over a (Z, X, Y, SEL) dataset.

  The parameter vector is [y1, y0, t1, t0]; the quantity of interest is
  (y1 - y0) / (t1 - t0), see `get_ate_estimate`.

  Args:
    df: pandas DataFrame with columns "SEL", "Z", "X" and "Y".
    gmm_eqs: object whose `get()` returns the tuple
      (moment_eqs, jacobian_eqs, true_param_grad_eqs, moment_reweighting),
      e.g. a `GMMEqs` instance.
  """

  def __init__(self, df, gmm_eqs):
    # Full module name: the class is also shipped to ipyparallel engines.
    np = numpy

    # Columns are extracted once as raw arrays; the equations are evaluated on
    # these arrays rather than on pandas objects.
    self.df_dict = {k: df[k].values for k in df.columns}
    self.df = self.df_dict
    (self.moment_eqs, self.jacobian_eqs, self.true_param_grad_eqs,
     self.moment_reweighting) = gmm_eqs.get()
    self.num_samples = len(df)
    self.num_moments = len(self.moment_eqs)
    # Scratch buffer of shape (num_moments, num_samples), overwritten by every
    # call to `_momconds_arr`.
    self.momconds_arr = np.zeros((self.num_moments, self.num_samples))
  
  def momconds(self, params):
    r"""Returns the empirical moment vector (equivalent of \hat{g}_n(\theta))."""
    np = numpy

    return np.sum(self._momconds_arr(params), axis=-1) / self.num_samples
  
  def _compute_moment_covariance(self, params):
    """Estimates the (uncentered) moment covariance from the empirical moments.

    Its inverse is the optimal GMM weight matrix.
    """
    moms = self._momconds_arr(params)
    return (moms @ moms.T) / self.num_samples
  
  def _get_objective_fn(self, weight_matrix_inv):
    """Returns the GMM objective g(params)^T W g(params).

    Args:
      weight_matrix_inv: inverse of the weight matrix W; it is applied through
        a linear solve rather than by inverting it explicitly.
    """
    np = numpy
    
    def objective(params):
      moms = self.momconds(params)
      w_inv_mom = np.linalg.solve(weight_matrix_inv, moms)
      return moms.T @ w_inv_mom
    
    return objective
  
  def _momconds_arr(self, params):
    """Evaluates every moment on every sample with unit moment weights.

    Returns:
      The shared (num_moments, num_samples) buffer `self.momconds_arr`; it is
      overwritten by the next call.
    """
    p = params
    df = self.df_dict

    for i, eq in enumerate(self.moment_eqs):
      self.momconds_arr[i, :] = eq(S=df["SEL"], Z=df["Z"],
                                   X=df["X"], Y=df["Y"],
                                   s0=1, s1=1,
                                   y1=p[0], y0=p[1], t1=p[2], t0=p[3])
    
    return self.momconds_arr

  def _get_asymptotic_variance(self, k, moment_covariance, params):
    """Variance of the target estimate if a fraction `k` of rows had SEL = 1.

    The Jacobian and `moment_covariance` are rescaled from the fraction
    observed in the data (`current_k`) to the hypothetical fraction `k`, and
    the variance of (y1 - y0) / (t1 - t0) is obtained from the gradient of the
    target with respect to the parameters.

    Args:
      k: hypothetical fraction of rows with SEL = 1.
      moment_covariance: moment covariance estimated at `params`.
      params: parameter vector [y1, y0, t1, t0].

    Returns:
      The scalar grad^T (J^T Omega^-1 J)^-1 grad.
    """
    np = numpy

    current_k = np.mean(self.df["SEL"])

    p = params
    df = self.df
    n = self.num_samples

    # Weights that rescale each moment group from current_k to k.
    s1 = k / current_k
    s0 = (1 - k) / (1 - current_k)

    jacobian = np.zeros((self.num_moments, len(p)))

    for i, jac_row in enumerate(self.jacobian_eqs):
      for j, eq in enumerate(jac_row):
        jacobian[i, j] = np.sum(eq(S=df["SEL"], Z=df["Z"],
                                   X=df["X"], Y=df["Y"],
                                   s1=s1, s0=s0,
                                   y1=p[0], y0=p[1], t1=p[2], t0=p[3])) / n

    moment_covariance_reweight = self.moment_reweighting(s1=s1, s0=s0)
    
    # Element-wise product: each covariance entry is scaled by its weight.
    moment_covariance = moment_covariance * moment_covariance_reweight

    mom_cov_inv_jac = np.linalg.solve(moment_covariance, jacobian)
    variance_matrix = np.linalg.inv(jacobian.T @ mom_cov_inv_jac)

    true_param_grad = np.zeros((variance_matrix.shape[0]))
    for j, eq in enumerate(self.true_param_grad_eqs):
      true_param_grad[j] = eq(y1=p[0], y0=p[1], t1=p[2], t0=p[3])
    
    return true_param_grad.T @ variance_matrix @ true_param_grad
  
  def get_ate_estimate(self, params):
    """Returns the target estimate (y1 - y0) / (t1 - t0) for `params`."""
    return (params[0] - params[1]) / (params[2] - params[3])

  def _optimize_find_parameters(self, weight_matrix_inv, initial_guess=None):
    """Minimizes the GMM objective for a fixed weight matrix.

    t1 and t0 are constrained to [0.01, 0.99]; y1 and y0 are unbounded.
    """
    np = numpy

    initial_guess = np.array([1, 1, 0.5, 0.5]) if initial_guess is None else initial_guess
    res = minimize(self._get_objective_fn(weight_matrix_inv), initial_guess,
                   bounds=[
                           (-np.inf, np.inf),
                           (-np.inf, np.inf),
                           (0.01, 0.99),
                           (0.01, 0.99),
                   ],
                  )
    return res.x
  
  def find_parameters(self, num_iters=2, weight_matrix_reg=None):
    """Runs iterated GMM.

    The first iteration uses the identity weight matrix; each later iteration
    re-weights with the moment covariance estimated at the previous solution
    and warm-starts from that solution.

    Args:
      num_iters: number of optimization rounds (must be at least 1).
      weight_matrix_reg: optional ridge term added to the diagonal of the
        moment covariance after every round.

    Returns:
      (params, moment_covariance) from the last round.

    Raises:
      ValueError: if `num_iters` is smaller than 1.
    """
    np = numpy

    if num_iters < 1:
      # Previously this fell through to an UnboundLocalError on `params`.
      raise ValueError("num_iters must be at least 1")

    weight_matrix_inv = np.eye(self.num_moments)
    params = None
    for _ in range(num_iters):
      params = self._optimize_find_parameters(weight_matrix_inv, initial_guess=params)
      weight_matrix_inv = self._compute_moment_covariance(params)

      if weight_matrix_reg is not None:
        weight_matrix_inv += weight_matrix_reg * np.eye(weight_matrix_inv.shape[0])
    
    return params, weight_matrix_inv
  
  def find_optimal_k(self, moment_covariance, params):
    """Finds the SEL = 1 fraction that minimizes the asymptotic variance.

    The search is restricted to [0.05, 0.95]. A solution that lands exactly on
    a bound is reported as the corresponding extreme, 0 or 1.
    """
    np = numpy
    
    initial_guess = np.array([0.5])
    lower_bound = 0.05
    upper_bound = 0.95
    res = minimize(lambda x: self._get_asymptotic_variance(x[0], moment_covariance, params),
                   initial_guess, bounds=[(lower_bound, upper_bound)])
    
    if res.x[0] == lower_bound:
      return 0
    
    if res.x[0] == upper_bound:
      return 1
      
    return res.x[0]

In [None]:
# Sanity check on the example dataset: fit the parameters with two GMM rounds,
# compare the resulting estimate against the ground truth, and compute the
# variance-minimizing SEL fraction.
gmm = GMM(df, GMMEqs())
params, moment_covariance = gmm.find_parameters(num_iters=2)

print("Estimated:")
print(params)
print(gmm.get_ate_estimate(params))
print("True params:")
print(truth)

optimal_k = gmm.find_optimal_k(moment_covariance, params)
print("Optimal k: %f" % optimal_k)

# Drop the demo objects so later cells cannot pick them up by accident.
del gmm, params, moment_covariance, optimal_k

Estimated:
[-0.04482065  0.03092293  0.27816271  0.1508664 ]
-0.5950178504639706
True params:
beta=-0.431270, c_star=-1.050257, alpha=0.476600, c_x=-0.500000, epsilon_y_var=0.605823, gamma=0.083441
Optimal k: 0.731171


In [None]:
class SampleRevealer:
  """Reveals rows of a pre-generated dataset batch by batch under a budget.

  Rows are consumed in order. For every revealed batch the "SEL" column of
  `df` is overwritten in place: the leading part of the batch gets SEL = 1 and
  the remainder SEL = 0.

  Args:
    budget: total budget; each revealed row costs `cost_per_reveal` (1).
    df: pandas DataFrame with a "SEL" column; it is modified in place.
  """

  def __init__(self, budget, df):
    self.initial_budget = budget
    self.budget = budget
    self.buffer_size = len(df)
    self.df = df
    self.counter = 0  # number of rows revealed so far
    self.cost_per_reveal = 1

  def reset(self):
    """Restores the full budget and rewinds to the first row.

    The "SEL" values written by earlier reveals are left untouched.
    """
    self.counter = 0
    self.budget = self.initial_budget
  
  def is_budget_left(self, samples_to_reveal=1):
    """Returns True if the budget covers `samples_to_reveal` more rows.

    The argument defaults to 1 so that callers which only ask "can anything
    more be revealed?" may omit it.
    """
    return self.budget >= (self.cost_per_reveal * samples_to_reveal)
  
  def reveal(self, reveal_k, samples_to_reveal):
    """Reveals the next `samples_to_reveal` rows and charges the budget.

    Args:
      reveal_k: fraction of the batch to flag SEL = 1; the count is
        int(reveal_k * samples_to_reveal), i.e. truncated.
      samples_to_reveal: size of the batch.

    Raises:
      ValueError: if the batch would reach or pass the end of the buffer.
    """
    if self.counter + samples_to_reveal >= self.buffer_size:
      raise ValueError("no buffer")
    
    observe_count = int(reveal_k * samples_to_reveal)
    batch_start = self.counter
    batch_split = batch_start + observe_count
    batch_end = batch_start + samples_to_reveal

    # Write through the frame's positional indexer instead of `Series.values`,
    # which is not guaranteed to be a writable view of the frame (it is
    # read-only under pandas Copy-on-Write).
    sel_col = self.df.columns.get_loc("SEL")
    self.df.iloc[batch_start:batch_split, sel_col] = 1
    self.df.iloc[batch_split:batch_end, sel_col] = 0

    self.counter += samples_to_reveal
    self.budget -= (self.cost_per_reveal * samples_to_reveal)
  
  def get_dataset(self):
    """Returns the rows revealed so far (a positional slice of `df`)."""
    return self.df[:self.counter]

In [None]:
def batch_fractions_to_sizes(horizon, batch_fractions):
  """Converts cumulative batch fractions into per-batch sample counts.

  Args:
    horizon: total number of samples.
    batch_fractions: cumulative fractions of `horizon` at which a batch ends;
      the final batch always ends at `horizon`.

  Returns:
    A list with len(batch_fractions) + 1 batch sizes that sum to `horizon`.
    Each boundary is truncated with int().
  """
  boundaries = [0]
  boundaries.extend(int(horizon * fraction) for fraction in batch_fractions)
  boundaries.append(horizon)

  return [end - start for start, end in zip(boundaries[:-1], boundaries[1:])]

In [None]:
def compute_reveal_k(current_samples, samples_to_reveal, current_k, target_k):
  """Fraction of the next batch to flag so the overall fraction hits the target.

  Args:
    current_samples: number of samples collected so far.
    samples_to_reveal: size of the upcoming batch.
    current_k: flagged fraction among the samples collected so far.
    target_k: desired flagged fraction once the batch has been added.

  Returns:
    The required fraction for the batch, clipped to the range [0, 1].
  """
  total_after_batch = current_samples + samples_to_reveal
  flagged_needed = target_k * total_after_batch - current_k * current_samples
  unclipped = flagged_needed / samples_to_reveal

  floored = max(0, unclipped)
  return min(1, floored)

In [None]:
class StrategyRunResult:
  """Per-batch record of one strategy run, stored as four parallel lists."""

  def __init__(self):
    self.budgets_used, self.squared_errors = [], []
    self.optimal_ks, self.current_ks = [], []
  
  def append(self, budget_used, squared_error, optimal_k, current_k):
    """Records the state reached after one batch."""
    series_and_values = (
        (self.budgets_used, budget_used),
        (self.squared_errors, squared_error),
        (self.optimal_ks, optimal_k),
        (self.current_ks, current_k),
    )
    for series, value in series_and_values:
      series.append(value)


In [None]:
class OracleStrategy:
  """Batch strategy that steers the SEL = 1 fraction towards a fixed target.

  The target fraction `optimal_k` is supplied by the caller and is never
  re-estimated from the data.

  Args:
    sample_revealer: `SampleRevealer` providing the samples and the budget.
    gmm_equations: equations object handed to every `GMM` that is fitted.
    optimal_k: target fraction of samples with SEL = 1.
    horizon: total number of samples to collect.
    batch_fractions: cumulative fractions of `horizon` at which batches end.
  """

  def __init__(self, sample_revealer, gmm_equations, optimal_k, horizon, batch_fractions):
    self.sample_revealer = sample_revealer
    self.name = 'oracle'
    self.batch_sizes = batch_fractions_to_sizes(horizon, batch_fractions)
    self.optimal_k = optimal_k
    self.gmm_equations = gmm_equations
  
  def get_current_df_vals(self):
    """Returns (fraction of revealed rows with SEL = 1, number of revealed rows)."""
    # Full module name: the class is also shipped to ipyparallel engines.
    np = numpy

    dataset = self.sample_revealer.get_dataset()

    # "left"/"right" are the row counts with SEL = 1 and SEL = 0 respectively.
    len_left_corner = np.sum(dataset["SEL"])
    len_right_corner = len(dataset) - len_left_corner
    total = (len_left_corner + len_right_corner)
    return len_left_corner / total, total

  def can_step(self):
    """Returns True while the budget covers at least one more sample.

    `is_budget_left` needs the number of samples to check for; the previous
    argument-less call raised a TypeError.
    """
    return self.sample_revealer.is_budget_left(1)
  
  def get_and_store_params(self):
    """Fits a two-round GMM on the revealed data and caches the result.

    Sets `self.gmm`, `self.params` and `self.moment_covariance`.
    """
    dataset = self.sample_revealer.get_dataset()
    self.gmm = GMM(dataset, self.gmm_equations)

    self.params, self.moment_covariance = self.gmm.find_parameters(num_iters=2)
    return self.params

  def get_squared_error(self):
    """Squared error of the current estimate against the module-level `truth`."""
    error = (truth.get_true_param() - self.gmm.get_ate_estimate(self.params))**2
    return error
  
  def execute_run(self):
    """Reveals all batches, re-fitting the parameters after each one.

    Returns:
      A `StrategyRunResult` with one entry per batch, or None when the first
      batch is degenerate (mean of X or of Z at most 0.1).
    """
    np = numpy

    result = StrategyRunResult()
    for i, batch_size in enumerate(self.batch_sizes):
      if i == 0:
        # Nothing revealed yet, so the whole first batch uses the target itself.
        current_k, current_samples = 0, 0
      else:
        current_k, current_samples = self.get_current_df_vals()
      reveal_k = compute_reveal_k(current_samples, batch_size, current_k, self.optimal_k)

      self.sample_revealer.reveal(reveal_k=reveal_k, samples_to_reveal=batch_size)

      if i == 0:
        df = self.sample_revealer.get_dataset()
        if np.mean(df["X"]) <= 0.1 or np.mean(df["Z"]) <= 0.1:
          return None

      _ = self.get_and_store_params()

      current_k, _ = self.get_current_df_vals()

      result.append(
          self.sample_revealer.initial_budget - self.sample_revealer.budget,
          self.get_squared_error(), self.optimal_k, current_k,
      )
    
    return result

In [None]:
class ETCStrategy:
  """Strategy that estimates the target fraction once, after the first batch.

  The first batch is revealed with half of its rows flagged SEL = 1. The
  variance-minimizing fraction is then estimated from that batch and kept
  fixed for all remaining batches (name `etc`; presumably
  "explore-then-commit").

  Args:
    sample_revealer: `SampleRevealer` providing the samples and the budget.
    gmm_equations: equations object handed to every `GMM` that is fitted.
    horizon: total number of samples to collect.
    batch_fractions: cumulative fractions of `horizon` at which batches end.
  """

  def __init__(self, sample_revealer, gmm_equations, horizon, batch_fractions):
    self.sample_revealer = sample_revealer
    self.name = 'etc'
    # We assume that the first batch size is exploration.
    self.batch_sizes = batch_fractions_to_sizes(horizon, batch_fractions)
    self.gmm_equations = gmm_equations
    # Estimated by `execute_run` once the first batch has been revealed.
    self.optimal_k = None
  
  def get_current_df_vals(self):
    """Returns (fraction of revealed rows with SEL = 1, number of revealed rows)."""
    # Full module name: the class is also shipped to ipyparallel engines.
    np = numpy

    dataset = self.sample_revealer.get_dataset()

    # "left"/"right" are the row counts with SEL = 1 and SEL = 0 respectively.
    len_left_corner = np.sum(dataset["SEL"])
    len_right_corner = len(dataset) - len_left_corner
    total = (len_left_corner + len_right_corner)
    return len_left_corner / total, total

  def can_step(self):
    """Returns True while the budget covers at least one more sample.

    `is_budget_left` needs the number of samples to check for; the previous
    argument-less call raised a TypeError.
    """
    return self.sample_revealer.is_budget_left(1)
  
  def get_and_store_params(self):
    """Fits a two-round GMM on the revealed data and caches the result.

    Sets `self.gmm`, `self.params` and `self.moment_covariance`.
    """
    dataset = self.sample_revealer.get_dataset()
    self.gmm = GMM(dataset, self.gmm_equations)
    self.params, self.moment_covariance = self.gmm.find_parameters(num_iters=2)
    return self.params

  def get_squared_error(self):
    """Squared error of the current estimate against the module-level `truth`."""
    error = (truth.get_true_param() - self.gmm.get_ate_estimate(self.params))**2
    return error
  
  def execute_run(self):
    """Reveals all batches, re-fitting the parameters after each one.

    Returns:
      A `StrategyRunResult` with one entry per batch, or None when the first
      batch is degenerate (too few rows with X = 1 or Z = 1 in either the
      SEL = 1 or the SEL = 0 part).
    """
    np = numpy

    result = StrategyRunResult()
    for i, batch_size in enumerate(self.batch_sizes):
      if i == 0:
        reveal_k = 0.5
      else:
        current_k, current_samples = self.get_current_df_vals()
        reveal_k = compute_reveal_k(current_samples, batch_size, current_k, self.optimal_k)

      self.sample_revealer.reveal(reveal_k=reveal_k, samples_to_reveal=batch_size)

      if i == 0:
        # Reject runs whose exploration batch cannot identify the parameters.
        df = self.sample_revealer.get_dataset()
        if np.mean(df["X"]*df["SEL"]) < 0.05 or np.mean(df["X"]*(1-df["SEL"])) < 0.05:
          return None
        if np.mean(df["Z"]*df["SEL"]) < 0.05 or np.mean(df["Z"]*(1-df["SEL"])) < 0.05:
          return None

      params = self.get_and_store_params()

      if i == 0:
        # Commit: the target fraction is estimated only once.
        self.optimal_k = self.gmm.find_optimal_k(self.moment_covariance, params)

      current_k, _ = self.get_current_df_vals()

      result.append(
          self.sample_revealer.initial_budget - self.sample_revealer.budget,
          self.get_squared_error(), self.optimal_k, current_k,
      )
    
    return result

In [None]:
class ETGreedyStrategy:
  """Strategy that re-estimates the target fraction after every batch.

  The first batch is revealed with half of its rows flagged SEL = 1. After
  each batch the variance-minimizing fraction is re-estimated and used for the
  next one (name `etg`; presumably "explore-then-greedy").

  Args:
    sample_revealer: `SampleRevealer` providing the samples and the budget.
    gmm_equations: equations object handed to every `GMM` that is fitted.
    horizon: total number of samples to collect.
    batch_fractions: cumulative fractions of `horizon` at which batches end.
  """

  def __init__(self, sample_revealer, gmm_equations, horizon, batch_fractions):
    self.sample_revealer = sample_revealer
    self.name = 'etg'
    # We assume that the first batch size is exploration.
    self.batch_sizes = batch_fractions_to_sizes(horizon, batch_fractions)
    self.gmm_equations = gmm_equations
    # Ridge term added to the moment covariance on all but the last step.
    self.weight_matrix_reg = 0.01
    # Re-estimated by `execute_run` after every batch.
    self.optimal_k = None
  
  def get_current_df_vals(self):
    """Returns (fraction of revealed rows with SEL = 1, number of revealed rows)."""
    # Full module name: the class is also shipped to ipyparallel engines.
    np = numpy

    dataset = self.sample_revealer.get_dataset()

    # "left"/"right" are the row counts with SEL = 1 and SEL = 0 respectively.
    len_left_corner = np.sum(dataset["SEL"])
    len_right_corner = len(dataset) - len_left_corner
    total = (len_left_corner + len_right_corner)
    return len_left_corner / total, total

  def can_step(self):
    """Returns True while the budget covers at least one more sample.

    `is_budget_left` needs the number of samples to check for; the previous
    argument-less call raised a TypeError.
    """
    return self.sample_revealer.is_budget_left(1)
  
  def get_and_store_params(self, is_last_step=False):
    """Fits a two-round GMM on the revealed data and caches the result.

    Sets `self.gmm`, `self.params` and `self.moment_covariance`.

    Args:
      is_last_step: when True the fit is unregularized; otherwise
        `self.weight_matrix_reg` is added to the moment covariance.
    """
    dataset = self.sample_revealer.get_dataset()
    self.gmm = GMM(dataset, self.gmm_equations)
    if is_last_step:
      self.params, self.moment_covariance = self.gmm.find_parameters(num_iters=2)
    else:
      self.params, self.moment_covariance = (
          self.gmm.find_parameters(num_iters=2, weight_matrix_reg=self.weight_matrix_reg)
      )
    return self.params

  def get_squared_error(self):
    """Squared error of the current estimate against the module-level `truth`."""
    error = (truth.get_true_param() - self.gmm.get_ate_estimate(self.params))**2
    return error
  
  def execute_run(self):
    """Reveals all batches, re-fitting parameters and target after each one.

    Returns:
      A `StrategyRunResult` with one entry per batch, or None when the first
      batch is degenerate (too few rows with X = 1 or Z = 1 in either the
      SEL = 1 or the SEL = 0 part).
    """
    np = numpy

    result = StrategyRunResult()
    last_step = len(self.batch_sizes) - 1
    for i, batch_size in enumerate(self.batch_sizes):
      if i == 0:
        reveal_k = 0.5
      else:
        current_k, current_samples = self.get_current_df_vals()
        reveal_k = compute_reveal_k(current_samples, batch_size, current_k, self.optimal_k)

      self.sample_revealer.reveal(reveal_k=reveal_k, samples_to_reveal=batch_size)
      
      if i == 0:
        # Reject runs whose exploration batch cannot identify the parameters.
        df = self.sample_revealer.get_dataset()
        if np.mean(df["X"]*df["SEL"]) < 0.05 or np.mean(df["X"]*(1-df["SEL"])) < 0.05:
          return None
        if np.mean(df["Z"]*df["SEL"]) < 0.05 or np.mean(df["Z"]*(1-df["SEL"])) < 0.05:
          return None

      params = self.get_and_store_params(is_last_step=(i == last_step))

      # Greedy: the target fraction is refreshed after every batch.
      self.optimal_k = self.gmm.find_optimal_k(self.moment_covariance, params)

      current_k, _ = self.get_current_df_vals()

      result.append(
          self.sample_revealer.initial_budget - self.sample_revealer.budget,
          self.get_squared_error(), self.optimal_k, current_k,
      )
    
    return result

In [None]:
def execute_strategy_iteration(strategy_name, iteration_num, horizon):
  """Runs one simulated experiment for the named strategy.

  Args:
    strategy_name: one of "etc_0.2", "etc_0.4", "oracle", "fixed_equal",
      "etg_0.1" or "etg_0.2".
    iteration_num: run index; only used by the (commented-out) seeding.
    horizon: number of samples the strategy may collect.

  Returns:
    Whatever the strategy's `execute_run` returns.

  Raises:
    ValueError: if `strategy_name` is not recognized.
  """
  # Full module name: this function is also shipped to ipyparallel engines.
  np = numpy

  gmm_equations = GMMEqs()

  # Uncomment the next two lines to replicate the exact runs used in the paper.
  # random_seed = 232281293 + iteration_num
  # np.random.seed(random_seed)
  df = generate_data_samples(num_samples=horizon + 1000, model=truth)
  np.random.seed(None)

  sample_revealer = SampleRevealer(budget=horizon, df=df)

  # strategy name -> (strategy class, strategy-specific keyword arguments)
  strategy_table = {
      "etc_0.2": (ETCStrategy, {"batch_fractions": [0.2, 0.9]}),
      "etc_0.4": (ETCStrategy, {"batch_fractions": [0.4, 0.9]}),
      "oracle": (OracleStrategy,
                 {"optimal_k": 0.7826, "batch_fractions": [0.8, 0.9]}),
      "fixed_equal": (OracleStrategy,
                      {"optimal_k": 0.50, "batch_fractions": [0.8, 0.9]}),
      "etg_0.1": (ETGreedyStrategy,
                  {"batch_fractions": [0.1, 0.2, 0.3, 0.4, 0.6, 0.8]}),
      "etg_0.2": (ETGreedyStrategy,
                  {"batch_fractions": [0.2, 0.4, 0.6, 0.8]}),
  }
  if strategy_name not in strategy_table:
    raise ValueError("invalid strategy_name")

  strategy_cls, strategy_kwargs = strategy_table[strategy_name]
  strategy = strategy_cls(sample_revealer, gmm_equations=gmm_equations,
                          horizon=horizon, **strategy_kwargs)

  return strategy.execute_run()

In [None]:
# Test a strategy: one short local run, printing the squared error after each
# batch.
# NOTE: `execute_run` returns None when the first batch is rejected as
# degenerate; in that case the print below raises AttributeError.

result = execute_strategy_iteration("etc_0.4", 1, horizon=1000)
print(result.squared_errors)

[1.4711640605397087, 0.11348665235017591, 0.07238505893981426]


**Executing the runs for each strategy in parallel.**

For the paper, we execute 12,000 runs for each strategy. We execute the runs
in parallel using
[`ipyparallel`](https://ipyparallel.readthedocs.io/en/latest/).

The following command starts the `ipyparallel` engines:
```
ipcluster start -n <num_engines>
```
 

In [None]:
import ipyparallel as ipp

In [None]:
# Verify that ipcluster is running and import the necessary Python packages.

# Connect to the running cluster and take a view over all of its engines.
parallel_client = ipp.Client(debug=False)
dview = parallel_client[:]
# Execute an identity map in parallel.
ar = dview.map(lambda x: x, (i for i in range(0, 2000000, 2)))
assert ar.get()[0] == 0

# Import the required Python packages.
# The modules are imported here without aliases; the code that is shipped to
# the engines therefore refers to `numpy`, `pandas` and `sympy` by full name.
with dview.sync_imports():
  from abc import ABC, abstractmethod
  import numpy
  import sympy
  import pandas
  import sympy
  import datetime
  import copy
  import attr
  import time
  import logging
  import itertools
  import pickle
  import os
  import functools
  import ipyparallel

  import collections

  from scipy.optimize import minimize
  from scipy import interpolate
  from scipy import stats
  import warnings

  # Fall back to the pure-Python `pickle` module where `cPickle` is missing.
  try:
    from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
  except ImportError:
    from pickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL

# Make sure ipyparallel is still able to execute functions.
dview = parallel_client[:]
ar = dview.map(lambda x: x, (i for i in range(0, 2000000, 2)))
assert ar.get()[0] == 0

importing ABC,abstractmethod from abc on engine(s)
importing numpy on engine(s)
importing sympy on engine(s)
importing pandas on engine(s)
importing datetime on engine(s)
importing copy on engine(s)
importing attr on engine(s)
importing time on engine(s)
importing logging on engine(s)
importing itertools on engine(s)
importing pickle on engine(s)
importing os on engine(s)
importing functools on engine(s)
importing ipyparallel on engine(s)
importing collections on engine(s)
importing minimize from scipy.optimize on engine(s)
importing interpolate from scipy on engine(s)
importing stats from scipy on engine(s)


In [None]:
def combine_parallel_results(async_result):
  """Stacks the per-run results of one parallel map into arrays.

  Runs that returned None (rejected as degenerate by the strategy) are
  skipped.

  Args:
    async_result: object whose `get()` returns a list of `StrategyRunResult`
      instances and/or None.

  Returns:
    (budgets, errors, optimal_ks, current_ks): `budgets` is the budget list
    of the first successful run; the other three are arrays with one row per
    successful run and one column per batch.

  Raises:
    ValueError: if every run returned None.
  """
  # Fetch once and drop failed runs up front. Previously the budgets were read
  # from element 0 even when that run had returned None.
  completed_runs = [res for res in async_result.get() if res is not None]
  if not completed_runs:
    raise ValueError("no successful runs to combine")

  budgets = completed_runs[0].budgets_used
  errors = np.vstack([res.squared_errors for res in completed_runs])
  optimal_ks = np.vstack([res.optimal_ks for res in completed_runs])
  current_ks = np.vstack([res.current_ks for res in completed_runs])

  return budgets, errors, optimal_ks, current_ks

def execute_strategy_in_parallel(strategy_name, horizon, iterations):
  """Runs `iterations` independent runs of a strategy on the cluster engines.

  Every helper, class and global the runs depend on is pushed to all engines
  first, then the run indices 0..iterations-1 are mapped over the engines.

  Returns:
    The result of `dview.map`, whose `get()` yields one entry per run.
  """
  engine_count = len(parallel_client.ids)
  dview = parallel_client[:]

  print("Executing %s over %d iterations across %d cores" % (strategy_name, iterations, engine_count))

  # (name on the engine, object to push), pushed in this order.
  engine_globals = (
      ("batch_fractions_to_sizes", batch_fractions_to_sizes),
      ("compute_reveal_k", compute_reveal_k),
      ("ModelParams", ModelParams),
      ("truth", truth),
      ("df_angrist", df_angrist),
      ("generate_data_samples", generate_data_samples),
      ("StrategyRunResult", StrategyRunResult),
      ("GMMEqs", GMMEqs),
      ("GMM", GMM),
      ("ETCStrategy", ETCStrategy),
      ("OracleStrategy", OracleStrategy),
      ("ETGreedyStrategy", ETGreedyStrategy),
      ("SampleRevealer", SampleRevealer),
      ("execute_strategy_iteration", execute_strategy_iteration),
  )
  for engine_name, obj in engine_globals:
    dview[engine_name] = obj

  def execute_iteration(i):
    return execute_strategy_iteration(strategy_name, i, horizon)

  return dview.map(execute_iteration, range(iterations))

In [None]:
def get_timeseries_for(strategy_names, horizons, iterations, results_dict):
  """Runs every strategy at every horizon and stores the combined results.

  `results_dict` is filled in place: `results_dict["truth"]` holds the
  ground-truth parameters and `results_dict[strategy][horizon]` a dict with
  the keys "budgets", "errors" (squared errors multiplied by the budget used),
  "optimal_ks" and "current_ks".
  """
  results_dict["truth"] = truth

  for strategy_name in strategy_names:
    for horizon in horizons:
      started_at = datetime.datetime.now()
      print("Timestamp start: %s, Strategy: %s, Horizon: %d, Iters: %d" % (
          started_at, strategy_name, horizon, iterations))
      combined = combine_parallel_results(
          execute_strategy_in_parallel(strategy_name, horizon, iterations))
      budgets, errors, optimal_ks, current_ks = combined
      print("Timestamp end: %s" % (datetime.datetime.now()))

      per_horizon = results_dict.setdefault(strategy_name, {})
      per_horizon[horizon] = {
          "budgets": budgets,
          "errors": errors * budgets,
          "optimal_ks": optimal_ks,
          "current_ks": current_ks,
      }

In [None]:
# Run every strategy at every horizon (12,000 runs each) on the ipyparallel
# engines and collect the outcomes in `results_dict`.
results_dict = {}
get_timeseries_for([
                    "oracle", "fixed_equal",
                    "etc_0.2", "etc_0.4",
                    "etg_0.1", "etg_0.2",
                    ],
                   horizons=[6000, 8000, 10000, 12000, 14000, 16000],
                   iterations=12*1000,
                   results_dict=results_dict)

In [None]:
# Save results to file.
# pickle.dump(results_dict,
#             open(os.path.join(HOME_DIR,
#                               "%s_veitnam_earnings_graph.pkl" % datetime.datetime.now()),
#                  "wb"))

### Plot the results

In [None]:
# For the color map:
# https://gist.github.com/AndiH/c957b4d769e628f506bd

# Tableau 20 Colors
tableau20 = [
    (31, 119, 180), (174, 199, 232), (255, 127, 14), (255, 187, 120),
    (44, 160, 44), (152, 223, 138), (214, 39, 40), (255, 152, 150),
    (148, 103, 189), (197, 176, 213), (140, 86, 75), (196, 156, 148),
    (227, 119, 194), (247, 182, 210), (127, 127, 127), (199, 199, 199),
    (188, 189, 34), (219, 219, 141), (23, 190, 207), (158, 218, 229),
]

# Tableau Color Blind 10
tableau20blind = [
    (0, 107, 164), (255, 128, 14), (171, 171, 171), (89, 89, 89),
    (95, 158, 209), (200, 82, 0), (137, 137, 137), (163, 200, 236),
    (255, 188, 121), (207, 207, 207),
]

# Rescale the 0-255 channel values to the 0-1 range.
tableau20 = [(r / 255., g / 255., b / 255.) for r, g, b in tableau20]
tableau20blind = [(r / 255., g / 255., b / 255.) for r, g, b in tableau20blind]
# Use with plt.plot(…, color=tableau[0],…)

In [None]:
def plot_regret_curve(results_dict):
  """Plots each strategy's relative regret against the oracle, per horizon.

  For every horizon the final-batch errors of a strategy are expressed as a
  percentage excess over the oracle's mean final-batch error at the same
  budget. The mean is drawn as a line and a 95% normal-approximation
  confidence interval as error bars.

  Args:
    results_dict: mapping strategy name -> horizon -> dict with the keys
      "budgets" and "errors", as filled by `get_timeseries_for`. It must
      contain "oracle" plus every strategy listed in the style table below.
  """
  # Mean final error of the oracle, keyed by the final budget used.
  oracle_mses = {}
  for timeseries in results_dict["oracle"].values():
    oracle_mses[timeseries["budgets"][-1]] = np.mean(timeseries["errors"][:, -1])

  def plot(axs, name, info, result, with_var=False):
    # One curve: `info` is [linestyle, color, marker].
    x_vals = []
    y_mean = []
    y_std = []
    num_runs = []

    for timeseries in result.values():
      final_budget = timeseries["budgets"][-1]
      final_errors = timeseries["errors"][:, -1]
      oracle_mse = oracle_mses[final_budget]

      y_scaled = ( final_errors - oracle_mse )  / oracle_mse * 100
      x_vals.append(final_budget)
      y_mean.append(np.mean(y_scaled))
      y_std.append(np.std(y_scaled))
      # Failed runs are dropped per horizon, so each horizon keeps its own count.
      num_runs.append(final_errors.shape[0])

    # Sort all series by budget so the line is drawn left to right.
    x_sort_idx = np.argsort(x_vals)
    x_vals = np.array(x_vals)[x_sort_idx]
    y_mean = np.array(y_mean)[x_sort_idx]
    y_std = np.array(y_std)[x_sort_idx]
    num_runs = np.array(num_runs)[x_sort_idx]

    plt.plot(x_vals, y_mean, label=name, color=info[1], linestyle=info[0],  marker=info[2])
    plt.legend()

    if with_var:
      # Previously every horizon used the run count of whichever horizon
      # happened to be iterated last.
      ci = 1.96 * y_std / np.sqrt(num_runs)
      axs.errorbar(x_vals, y_mean, yerr=ci, ls="none", color=info[1])
      
  name_to_linestyle_color = {
      "etc_0.2": ["dashdot", tableau20blind[1], "s"],
      "etc_0.4": ["solid", tableau20blind[2], "v"],
      "etg_0.1": ["dotted", tableau20blind[4], "^"],
      "etg_0.2": ["dashed", tableau20blind[6], "D"],
      "fixed_equal": ["solid", tableau20blind[8], ">"],
  }
  plt.title("Relative regret vs horizon")
  plt.xlabel("Total samples collected (Horizon)")
  plt.ylabel("Relative regret (%)")
  for name, info in name_to_linestyle_color.items():  
    plot(plt, name, info, results_dict[name], with_var=True)
  
  # plt.savefig(os.path.join(HOME_DIR, "figures/vietnam_draft_earnings_regret_curve.eps"), bbox_inches='tight', pad_inches=0.0)

In [None]:
# Draw the regret figure from the collected results.
plot_regret_curve(results_dict)