In [1]:
import os
import json
import pickle
import numpy as np

# Raw data location

**In order to run this notebook, you need to first download the raw data from Dryad.**  
https://doi.org/10.5061/dryad.bnzs7h4c4

```DATA_DIR``` in the cell below must provide a path to the raw data folder.

In [2]:
DATA_DIR = 'raw_data' # <--- Fill in after downloading the raw data.

#  
# Pre-processing functions
#  
## Analyzing resource flow between agents
BaseFlow sets up a base class used by subsequent analysis classes

In [3]:
class BaseFlow:
    def __init__(self, log, remap_by_skill=False):
        self._log = log
        self._t = len(self._log['states']) - 1

        self._s0 = self._log['states'][0]
        self._num_agents = len(self._s0) - 1

        self._remap_by_skill = bool(remap_by_skill)
        if self._remap_by_skill:
            assert 'build_payment' in self._log['states'][0]['0']
            build_skills = np.array([self._s0[str(i)]['build_payment'] for i in range(self.num_agents)])
            self._aidx = np.argsort(build_skills).tolist()
        else:
            self._aidx = list(range(self.num_agents))

        self._decay_array = np.arange(self._t)[::-1]

    @property
    def num_agents(self):
        return self._num_agents

    @property
    def remap_by_skill(self):
        return self._remap_by_skill

    @property
    def aidx(self):
        return self._aidx

    @property
    def env_flow(self):
        raise NotImplementedError

    @property
    def interaction_flow(self):
        raise NotImplementedError

    def get_weighted_sum(self, x, decay):
        n = x.shape[0]
        assert 0 <= decay <= 1.0
        t_back = self._decay_array[-n:]
        t_weights = decay ** t_back
        t_weights[t_weights < 0.01] = 0.0
        for _ in range(1, len(x.shape)):
            t_weights = t_weights[:, None]
        return np.sum(x * t_weights, axis=0)

    def direction_flow(self, dst, src, t_s=0, t_e=None, decay=1.0):
        dst = int(dst)
        assert 0 <= dst < self.num_agents

        if t_e is None:
            t_e = self._t

        # From the environment to an agent
        if isinstance(src, str) or dst == src:
            src = src.lower()[0]
            assert src == 'e'

            to_dst_from_env = self.get_weighted_sum(self.env_flow[t_s:t_e, dst], decay)
            return to_dst_from_env

        # From an agent to another agent
        else:
            src = int(src)
            assert 0 <=src < self.num_agents

            to_dst_from_src = self.get_weighted_sum(self.interaction_flow[t_s:t_e, dst, src], decay)
            return to_dst_from_src

    def get_net_flow_matrix(self, t_s=0, t_e=None, decay=1.0):
        if t_e is None:
            t_e = self._t

        # Diagonal entries denote flow created by interactions with the environment (compute last)
        # Off-diagnonal entries denote flow created by interactions with other agents

        # First, compute direction flows for agent-to-agent interactions
        flow_matrix = self.get_weighted_sum(self.interaction_flow[t_s:t_e], decay)

        # Subtract incoming from outgoing and rectify (so that edges only show positive net flow)
        flow_matrix = np.maximum(0, flow_matrix - flow_matrix.T)

        # Add flow from the environment
        flow_matrix = flow_matrix + np.diag(self.get_weighted_sum(self.env_flow[t_s:t_e], decay))

        return flow_matrix

ResourceFlow tracks the flow of resources (i.e. Stone or Wood) from gathering and between agents via trading

In [4]:
class ResourceFlow(BaseFlow):
    def __init__(self, log, resource, remap_by_skill=False):
        super().__init__(log, remap_by_skill)

        assert 'Gather' in self._log
        self._resource = resource

        self._gathers = np.zeros((self._t, self.num_agents))
        self._trades  = np.zeros((self._t, self.num_agents, self.num_agents))
        self._prices  = np.zeros((self._t, self.num_agents, self.num_agents))

        self._organize_gathers()
        self._organize_trades()

    @property
    def resource(self):
        return self._resource

    @property
    def gathers(self):
        return self._gathers

    @property
    def trades(self):
        return self._trades
    
    @property
    def prices(self):
        return self._prices

    def _organize_gathers(self):
        self._gathers *= 0

        for t, gathers in enumerate(self._log['Gather']):
            for gather in gathers:
                if gather['resource'] == self.resource:
                    self._gathers[t, gather['agent']] += gather['n']

        self._gathers = self._gathers[:, self.aidx]

    def _organize_trades(self):
        self._trades *= 0

        if 'Trade' not in self._log:
            return

        for t, trades in enumerate(self._log['Trade']):
            if isinstance(trades, dict):
                trades = trades['trades']
            for trade in trades:
                if trade['commodity'] == self.resource:
                    self._trades[t, trade['buyer'], trade['seller']] += 1
                    self._prices[t, trade['buyer'], trade['seller']] += trade['price']

        self._trades = self._trades[:, self.aidx]
        self._trades = self._trades[:, :, self.aidx]
        self._prices = self._prices[:, self.aidx]
        self._prices = self._prices[:, :, self.aidx]
        

    @property
    def env_flow(self):
        return self.gathers

    @property
    def interaction_flow(self):
        return self.trades

IncomeFlow tracks the flow of Coin from building and between agents via trading, and (optionally) as resulting from redistribution of tax revenue.

In [5]:
class IncomeFlow(BaseFlow):
    def __init__(self, log, include_redistribution=False, remap_by_skill=False):
        super().__init__(log, remap_by_skill)

        assert 'Build' in self._log

        self._include_redistribution = bool(include_redistribution)

        self._builds = np.zeros((self._t, self.num_agents))
        self._trades = np.zeros((self._t, self.num_agents, self.num_agents))
        self._redist = np.zeros((self._t, self.num_agents, self.num_agents))

        self._organize_builds()
        self._organize_trades()
        self._organize_redist()

    @property
    def include_redistribution(self):
        return self._include_redistribution

    @property
    def builds(self):
        return self._builds

    @property
    def trades(self):
        return self._trades

    @property
    def redist(self):
        return self._redist

    def _organize_builds(self):
        self._builds *= 0

        for t, builds in enumerate(self._log['Build']):
            if isinstance(builds, dict):
                builds = builds['builds']
            for build in builds:
                self._builds[t, build['builder']] += build['income']

        self._builds = self._builds[:, self.aidx]

    def _organize_trades(self):
        self._trades *= 0

        if 'Trade' not in self._log:
            return

        for t, trades in enumerate(self._log['Trade']):
            if isinstance(trades, dict):
                trades = trades['trades']
            for trade in trades:
                self._trades[t, trade['seller'], trade['buyer']] += trade['income']

        self._trades = self._trades[:, self.aidx]
        self._trades = self._trades[:, :, self.aidx]

    def _organize_redist(self):
        self._redist *= 0

        if 'PeriodicTax' not in self._log:
            return

        if not self.include_redistribution:
            return

        for t, taxes in enumerate(self._log['PeriodicTax']):
            if taxes:
                for i in range(self.num_agents):
                    tax_paid = taxes[str(i)]['tax_paid']
                    redist_share = tax_paid / self.num_agents
                    for dst in range(self.num_agents):
                        if dst == i:
                            continue
                        self._redist[t, dst, i] = redist_share

        self._redist = self._redist[:, self.aidx]
        self._redist = self._redist[:, :, self.aidx]

    @property
    def env_flow(self):
        return self.builds

    @property
    def interaction_flow(self):
        return self.trades + self.redist

## Statistics functions
Helpers:

In [6]:
def get_num_agents(log):
    """Get the number of non-planner agents."""
    s0 = log['states'][0]
    return len(s0) - 1

def sort_by_skill(array_getter_func):
    """Useful decorator to automatically return agent-wise statistics as sorted by build skill."""
    def sorted_array_getter_func(log):
        s0 = log['states'][0]
        skills = [s0[str(i)]['build_payment'] for i in range(get_num_agents(log))]
        s_idx = np.argsort(skills)
        
        out = array_getter_func(log)
        
        if isinstance(out, np.ndarray):
            return out[s_idx]
        if isinstance(out, (tuple, list)):
            return [array[s_idx] for array in out]
        if isinstance(out, dict):
            return {k: v[s_idx] for k, v in out.items()}
        raise NotImplementedError
        
    return sorted_array_getter_func    

Build-skill-sorted, agent-wise stats:

In [7]:
@sort_by_skill
def get_income_and_tax_stats(log):
    """Get build-skill-sorted, agent-wise stats related to labor, income, and taxes."""
    nA = get_num_agents(log)
    
    if 'PeriodicTax' not in log:
        tax_paid = np.zeros(nA)
        lump_sum = np.zeros(nA)
        
    else:
        periods = log['PeriodicTax'][99::100]
        tax_paid = np.array([[period[str(i)]['tax_paid'] for i in range(nA)] for period in periods]).sum(0)
        lump_sum = np.array([[period[str(i)]['lump_sum'] for i in range(nA)] for period in periods]).sum(0)
        
    net_tax_paid = tax_paid - lump_sum
        
    sT = log['states'][-1]
    
    post_tax_income = np.array([sT[str(i)]['inventory']['Coin']+sT[str(i)]['escrow']['Coin'] for i in range(nA)])
    pre_tax_income = post_tax_income + net_tax_paid
    
    labor = np.array([sT[str(i)]['endogenous']['Labor'] for i in range(nA)])
    
    return {
        'pre_tax_income': pre_tax_income,
        'post_tax_income': post_tax_income,
        'tax_paid': tax_paid,
        'lump_sum': lump_sum,
        'net_tax_paid': net_tax_paid,
        'labor': labor,
    }

@sort_by_skill
def get_net_trade_income(log):
    """Get build-skill-sorted, agent-wise net income specifically from trading."""
    i_flow = IncomeFlow(log, include_redistribution=False, remap_by_skill=False)
    trade_flow = i_flow.trades.sum(0)
    revenue = trade_flow.sum(1)
    cost = trade_flow.sum(0)
    return {
        'net_trade_income': revenue - cost
    }

@sort_by_skill
def get_net_build_income(log):
    """Get build-skill-sorted, agent-wise net income specifically from building."""
    i_flow = IncomeFlow(log, include_redistribution=False, remap_by_skill=False)
    return {
        'net_build_income': i_flow.builds.sum(0)
    }

@sort_by_skill
def get_gather_info(log):
    """Get build-skill-sorted, agent-wise resource gathering counts."""
    w_flow = ResourceFlow(log, 'Wood', remap_by_skill=False)
    s_flow = ResourceFlow(log, 'Stone', remap_by_skill=False)
    return {
        'total_wood_collected':  w_flow.gathers.sum(0),
        'total_stone_collected': s_flow.gathers.sum(0),
        'total_resources_collected': w_flow.gathers.sum(0) + s_flow.gathers.sum(0),
    }

@sort_by_skill
def get_tax_gaming_info(log):
    """Get build-skill-sorted, agent-wise tax due based on actual incomes and temporally smoothed incomes."""
    if 'PeriodicTax' not in log:
        zs = np.zeros(get_num_agents(log))
        return {
            'tax_burden_actual_inc': zs,
            'tax_burden_average_inc': zs
        }
    
    def taxes_due(income, cutoffs, rates):
        """Return the total amount of taxes due at this income level."""
        past_cutoff = np.maximum(0, income - cutoffs)
        bracket_sizes = np.concatenate([cutoffs[1:]-cutoffs[:-1], [np.inf]])
        bin_income = np.minimum(bracket_sizes, past_cutoff)
        bin_taxes = rates * bin_income
        return np.sum(bin_taxes)
    
    nA = get_num_agents(log)
    
    periods = log['PeriodicTax'][99::100]
    
    incomes = np.array([[period[str(i)]['income'] for i in range(nA)] for period in periods])
    avg_inc = incomes.mean(0)
    
    due_actual_inc  = np.zeros((len(periods), nA))
    due_average_inc = np.zeros((len(periods), nA))
    
    for ip, period in enumerate(periods):
        cutoffs = np.array(period['cutoffs'])
        rates = np.array(period['schedule'])
        for ia in range(nA):
            due_actual_inc[ip, ia] = taxes_due(incomes[ip, ia], cutoffs, rates)
            due_average_inc[ip, ia] = taxes_due(avg_inc[ia], cutoffs, rates)
            
    return {
        'tax_burden_actual_inc': due_actual_inc.sum(0),
        'tax_burden_average_inc': due_average_inc.sum(0),
    }

General, non-agent-specific stats:

In [8]:
def get_price_info(log):
    """Get average trading prices for Wood and Stone."""
    w_flow = ResourceFlow(log, 'Wood', remap_by_skill=False)
    s_flow = ResourceFlow(log, 'Stone', remap_by_skill=False)
    return {
        'avg_wood_price':  w_flow.prices.sum() / w_flow.trades.sum(),
        'avg_stone_price': s_flow.prices.sum() / s_flow.trades.sum(),
    }

def get_avg_tax_schedule(log):
    """Get the average tax schedule throughout the episode."""
    if 'PeriodicTax' not in log:
        return {'avg_tax_schedule': np.zeros(7)}
    periods = log['PeriodicTax'][99::100]
    return {
        'avg_tax_schedule': np.array([period['schedule'] for period in periods]).mean(0)
    }


def get_bracket_occupancies(log):
    """Get the fraction of incomes within each tax bracket throughout the episode."""
    def get_all_incomes(log):
        incomes = []
        nA = get_num_agents(log)

        if 'PeriodicTax' not in log:
            def get_agent_coin(idx, t):
                return log['states'][t][str(idx)]['inventory']['Coin'] + log['states'][t][str(idx)]['escrow']['Coin']
            for idx in range(nA):
                last_coin = get_agent_coin(idx, 0)
                for t in [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]:
                    this_coin = get_agent_coin(idx, t)
                    incomes.append(this_coin - last_coin)
                    last_coin = float(this_coin)

        else:
            for p in log['PeriodicTax'][99::100]:
                for i in range(nA):
                    incomes.append(p[str(i)]['income'])

        return np.array(incomes)
    
    incomes = get_all_incomes(log)
    
    bracket_cutoffs = np.array([-np.inf, 9700, 39475, 84200, 160725, 204100, 510300]) / 1000
    
    bracket_idx = np.sum(incomes[:, None] > bracket_cutoffs[None], 1) - 1
    
    return {
#         'raw_incomes': incomes,
        'avg_bracket_occupancy': np.array([np.mean(bracket_idx == i) for i in range(7)])
    }

## Combined pre-processing function

In [9]:
def collect_stats(log):
    """Processes a dense log into a stats dictionary using the pre-processing functions."""
    stats = {}
    stats.update(get_income_and_tax_stats(log))
    stats.update(get_net_trade_income(log))
    stats.update(get_net_build_income(log))
    stats.update(get_gather_info(log))
    stats.update(get_tax_gaming_info(log))
    stats.update(get_avg_tax_schedule(log))
    stats.update(get_bracket_occupancies(log))
    stats.update(get_price_info(log))
    return stats

#  
# Functions to batch process the raw data

In [10]:
def process_dense_logs_of_run(run_dir, verbose=True):
    """Process the dense logs from this run directory that were used in the paper's analyses."""
    
    if not os.path.isdir(run_dir):
        raise FileNotFoundError("The run directory does not exist.")
        
    logs_dir = os.path.join(run_dir, 'experiment_dense_logs')
    if not os.path.isdir(logs_dir):
        raise FileNotFoundError('There do not appear to be any paper logs in this run directory.')
    paper_log_filenames = os.listdir(logs_dir)
    paper_log_filenames = sorted(paper_log_filenames, key=lambda f: int(f.split('.')[0].split('_')[-1]))

    if verbose:
        print('Processing {} dense logs from {}'.format(len(paper_log_filenames), run_dir))
    
    episode_dicts = []
    for file in paper_log_filenames:
        log_path = os.path.join(logs_dir, file)
        with open(log_path, 'r') as f:
            log = json.load(f)
        episode_dicts.append(
            collect_stats(log)
        )
        
    return episode_dicts

def process_dense_logs_of_group(group_dir, verbose=True):
    """Process all the dense logs from this run group that were used in the paper's analyses."""
    
    if not os.path.isdir(group_dir):
        raise FileNotFoundError("The group directory does not exist.")
        
    if verbose:
        print('Processing dense logs under {}'.format(group_dir))
        
    run_dirs = []
    run_idx = 1
    while True:
        run_dir = os.path.join(group_dir, 'seed_{}'.format(run_idx))
        if os.path.isdir(run_dir):
            run_dirs.append(run_dir)
            run_idx += 1
        else:
            break
            
    episode_dicts = []
    for idx, run_dir in enumerate(run_dirs):
        if verbose:
            print('\tSeed {}'.format(idx+1))
        run_dicts = process_dense_logs_of_run(run_dir, verbose=False)
        episode_dicts += run_dicts
        
    return episode_dicts

In [11]:
def get_statistics_dictionary_of_setting(scenario, objective, save=True, verbose=True):
    """
    Gets the group-wise dictionary of processed stats for a setting (defined by the scenario and objective).
    
    Args:
        scenario (str): The name of the scenario used in this setting.
        objective (str): The name of the social welfare objective used in this setting.
        save (bool): Whether to save the statistics dictionary. Defaults to True.
        verbose (bool): Whether to use verbose printouts. Defaults to True.
        
    Returns:
        stats_dict (dict): A dictionary of {group: episode_dict_list}, where each group is one of the 4 tax
            models and episode_dict_list is a list of statistics processed from all of the dense logs
            associated with that group (for the given objective, if applicable).
    """
    
    scenario = scenario.lower()
    objective = objective.lower()
    
    assert scenario in [
        'open_quadrant_4',
        'open_quadrant_10',
        'split_012',
        'split_27',
        'split_45',
        'split_789',
    ]
    
    assert objective in [
        'eq*prod',
        'iiwu',
    ]
    
    scenario_dir = os.path.join(DATA_DIR, scenario)
    
    obj_tag = '({})'.format(objective)
    has_specific_saez = 'saez'+obj_tag in os.listdir(scenario_dir)
    
    # We will process 4 groups: Free Market, US Federal, Saez Formula, and AI Economist
    group_dirs = {
        'Free Market': os.path.join(scenario_dir, 'fm'),
        'US Federal': os.path.join(scenario_dir, 'us'),
        'Saez Formula': os.path.join(scenario_dir, 'saez' + (obj_tag if has_specific_saez else '')),
        'AI Economist': os.path.join(scenario_dir, 'aie' + obj_tag)
    }
    
    # Process each group
    stats_dict = {}
    for group, group_dir in group_dirs.items():
        if verbose:
            print(group)
        stats_dict[group] = process_dense_logs_of_group(group_dir, verbose=verbose)
    
    if save:
        filename = 'preprocessed_dense_logs_stats_dict-{}-{}.pkl'.format(
            scenario,
            'eq_times_prod' if objective=='eq*prod' else 'iiwu'
        )
        if verbose:
            print('\n\nSaving processed statistics to:\n\t{}'.format(filename))
        with open(filename, 'wb') as f:
            pickle.dump(stats_dict, f)
            
    return stats_dict

#  
# Perform the pre-processing

**Note**: The preprocessed data used by the paper are already stored in  
```preprocessed_dense_logs_stats_dict-open_quadrant_4-eq_times_prod-used_in_paper_analyses.pkl```.

This notebook serves as a reference for how that preprocessing was performed.
Additionally, it allows you to process other settings and save them for analysis in  
```Experiment-analysis-and-visualization.ipynb```.

An example for the code you would run to do so is provided in the next cell. It is, by default, commented so you don't accidentally run it.

In [12]:
# get_statistics_dictionary_of_setting(
#     scenario='open_quadrant_4',
#     objective='iiwu',
#     save=True,
#     verbose=True,
# )