In [1]:
import os
from pathlib import Path

import yaml

# Location of the experiment config. The default is the original local path;
# set the AI_ECONOMIST_CONFIG environment variable to point at another file
# so the notebook can run on a different machine without editing this cell.
CONFIG_PATH = Path(
    os.environ.get(
        "AI_ECONOMIST_CONFIG",
        "/Users/henry/Documents/new-ml/ai-economist/tutorials/rllib/test/config.yaml",
    )
)

# safe_load only builds plain Python objects (no arbitrary object construction).
with CONFIG_PATH.open("r", encoding="utf-8") as f:
    config = yaml.safe_load(f)


In [2]:
# Pull the "env" section out of the loaded config. `.get` yields None when the
# key is absent, so a missing section only surfaces later, when the value is
# unpacked with `**env_config`.
env_config = config.get("env")

In [3]:
from ai_economist import foundation
# NOTE(review): LayoutFromFile is not referenced in the visible cells;
# presumably it is imported for a side effect such as scenario registration — confirm.
from ai_economist.foundation.scenarios.simple_wood_and_stone.layout_from_file import LayoutFromFile

# Build the environment, forwarding every entry of env_config as a keyword argument.
env = foundation.make_env_instance(**env_config)


Inside covid19_components.py: 0 GPUs are available.
No GPUs found! Running the simulation on a CPU.
Inside covid19_env.py: 0 GPUs are available.
No GPUs found! Running the simulation on a CPU.


In [4]:
# Reset the environment and keep the initial observation it returns.
obs = env.reset()

In [5]:
# Inspect the top-level keys of the observation dict.
obs.keys()

dict_keys(['0', '1', '2', '3', 'p'])

In [6]:
# Inspect the observation fields stored under the 'p' key
# (presumably the planner agent — TODO confirm).
obs['p'].keys()

dict_keys(['time', 'flat', 'p0', 'p1', 'p2', 'p3', 'action_mask'])

In [7]:
import numpy as np


def recursive_list_to_np_array(d):
    """Return a copy of a (possibly nested) dict with every leaf as a numpy array.

    Conversion rules, applied to each value:
      * dict           -> converted recursively
      * list           -> ``np.array(value)``
      * np.ndarray     -> kept as-is (same object, not copied)
      * int / float / numpy integer / numpy floating
                       -> wrapped in a length-1 array, ``np.array([value])``

    Args:
        d: The dict to convert. The input dict itself is not modified.

    Returns:
        A new dict with the same keys and converted values.

    Raises:
        AssertionError: If ``d`` is not a dict, or if any value has a type
            other than those listed above. The message names the offending
            key and type so failures in deeply nested observations can be
            located.
    """
    if not isinstance(d, dict):
        raise AssertionError(
            "Expected a dict, got {}.".format(type(d).__name__)
        )

    converted = {}
    for key, value in d.items():
        if isinstance(value, dict):
            converted[key] = recursive_list_to_np_array(value)
        elif isinstance(value, list):
            converted[key] = np.array(value)
        elif isinstance(value, np.ndarray):
            converted[key] = value
        elif isinstance(value, (float, int, np.floating, np.integer)):
            # Scalars become 1-element arrays so every leaf has a shape.
            converted[key] = np.array([value])
        else:
            raise AssertionError(
                "Unsupported value type {} for key {!r}.".format(
                    type(value).__name__, key
                )
            )
    return converted

In [8]:
# Convert every leaf of the raw observation dict into a numpy array.
nobs = recursive_list_to_np_array(obs)

In [9]:
# Display the converted observation stored under the 'p' key.
nobs['p']

{'time': array([0.]),
 'flat': array([0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ,
        0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ,
        0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ,
        0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ,
        0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ,
        0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  ,
        0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 0.  , 1.  , 0.  ,
        0.  , 0.  , 0.  , 0.  , 0.01, 0.  , 0.  , 0.  , 0.  ],
       dtype=float32),
 'p0': array([0., 0., 0., 0., 0., 0.], dtype=float32),
 'p1': array([0., 0., 0., 0., 0., 0.], dtype=float32),
 'p2': array([0., 0., 0., 0., 0., 0.], dtype=float32),
 'p3': array([0., 0., 0., 0., 0., 0.], dtype=float32),
 'action_mask': array([1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 1., 1., 1., 1., 0., 0., 0.,

In [14]:
import warnings
from gym import spaces

def _dict_to_spaces_dict(obs):
    """Build a ``gym.spaces.Dict`` that mirrors the structure of an observation dict.

    Each value is mapped as follows:
      * list                      -> converted with ``np.array`` and treated as an array
      * int / float / numpy scalar -> wrapped in a length-1 array
      * np.ndarray                -> a ``spaces.Box`` with the array's shape and dtype
      * dict                      -> converted recursively into a nested ``spaces.Dict``

    Box bounds start at +/-1e20 and are halved until, after gym casts them to
    the array's dtype, every lower bound is negative and every upper bound is
    positive.

    Args:
        obs: Observation dict (possibly nested).

    Returns:
        A ``spaces.Dict`` with one entry per key of ``obs``.

    Raises:
        TypeError: If a value is none of the supported types.
        ValueError: If no symmetric bound with low < 0 < high can be
            represented in the value's dtype.
    """
    dict_of_spaces = {}
    for k, v in obs.items():

        # list of lists are listified np arrays
        _v = v
        if isinstance(v, list):
            _v = np.array(v)
        elif isinstance(v, (int, float, np.floating, np.integer)):
            _v = np.array([v])

        # assign Space
        if isinstance(_v, np.ndarray):
            x = float(1e20)
            # Warnings for extreme values
            if np.max(_v) > x:
                warnings.warn("Input is too large!")
            if np.min(_v) < -x:
                warnings.warn("Input is too small!")
            box = spaces.Box(low=-x, high=x, shape=_v.shape, dtype=_v.dtype)

            # Shrink the bounds until they survive the cast to the target dtype
            # (e.g. +/-1e20 overflows integer dtypes).
            while not ((box.low < 0).all() and (box.high > 0).all()):
                x = x // 2
                if x == 0:
                    # With zero bounds, low < 0 can never hold; without this
                    # guard the loop would spin forever.
                    raise ValueError(
                        "Cannot find valid Box bounds for key {!r} with dtype {}.".format(
                            k, _v.dtype
                        )
                    )
                box = spaces.Box(low=-x, high=x, shape=_v.shape, dtype=_v.dtype)

            dict_of_spaces[k] = box

        elif isinstance(_v, dict):
            # Module-level function: recurse directly (there is no `self` here).
            dict_of_spaces[k] = _dict_to_spaces_dict(_v)
        else:
            raise TypeError(
                "Unsupported observation type {} for key {!r}.".format(
                    type(_v).__name__, k
                )
            )
    return spaces.Dict(dict_of_spaces)

In [16]:
# Build the space for the 'p' observation and list the keys of the resulting Dict space.
_dict_to_spaces_dict(obs['p']).keys()

odict_keys(['action_mask', 'flat', 'p0', 'p1', 'p2', 'p3', 'time'])

In [27]:
# Inspect the observation fields stored under the '0' key.
obs['0'].keys()

dict_keys(['world-map', 'world-idx_map', 'time', 'flat', 'action_mask'])

In [None]:
class KerasConvLSTM(RecurrentTFModelV2):
    """
    The model used in the paper "The AI Economist: Optimal Economic Policy
    Design via Two-level Deep Reinforcement Learning"
    (https://arxiv.org/abs/2108.02755)
    We combine convolutional, fully connected, and recurrent layers to process
    spatial, non-spatial, and historical information, respectively.
    For recurrent components, each agent maintains its own hidden state.
    """

    # NOTE(review): RecurrentTFModelV2, tf, Dict, Box, _MASK_NAME and
    # apply_logit_mask are not imported or defined in the visible notebook
    # cells; they must be in scope before this cell can run.
    # NOTE(review): the docstring above mentions convolutional layers, but the
    # __init__ shown here builds only Dense, LayerNormalization and LSTM layers.

    custom_name = "keras_conv_lstm"

    def __init__(self, obs_space, action_space, num_outputs, model_config, name):
        """Build the Keras model holding the policy and value networks.

        Reads its hyperparameters from ``model_config["custom_options"]``,
        creates one Keras input per entry of the Dict observation space, and
        builds two structurally identical towers (tagged "_pol" and "_val"):
        ``num_fc`` Dense layers, a LayerNormalization, an LSTM and a linear
        output projection. The policy logits are passed through
        ``apply_logit_mask`` together with the ``_MASK_NAME`` input.

        Raises:
            TypeError: If the (unwrapped) observation space is not a Dict.
            KeyError: If a required key is missing from ``custom_options``.
        """
        super().__init__(obs_space, action_space, num_outputs, model_config, name)

        # All keys except "generic_name" are required (plain indexing raises KeyError).
        # NOTE(review): input_emb_vocab, emb_dim, num_conv and generic_name are
        # read but not used anywhere in the visible body.
        input_emb_vocab = self.model_config["custom_options"]["input_emb_vocab"]
        emb_dim = self.model_config["custom_options"]["idx_emb_dim"]
        num_conv = self.model_config["custom_options"]["num_conv"]
        num_fc = self.model_config["custom_options"]["num_fc"]
        fc_dim = self.model_config["custom_options"]["fc_dim"]
        cell_size = self.model_config["custom_options"]["lstm_cell_size"]
        generic_name = self.model_config["custom_options"].get("generic_name", None)

        self.cell_size = cell_size

        # Use the wrapped space's `original_space` when the attribute exists.
        if hasattr(obs_space, "original_space"):
            obs_space = obs_space.original_space

        # Only Dict observation spaces are supported; Box gets a more specific message.
        if not isinstance(obs_space, Dict):
            if isinstance(obs_space, Box):
                raise TypeError(
                    "({}) Observation space should be a gym Dict."
                    " Is a Box of shape {}".format(name, obs_space.shape)
                )
            raise TypeError(
                "({}) Observation space should be a gym Dict."
                " Is {} instead.".format(name, type(obs_space))
            )

        # Define input layers
        self._input_keys = []
        non_conv_input_keys = []
        input_dict = {}
        # NOTE(review): the conv_* and found_* locals below are initialized
        # but never read or updated in the visible body.
        conv_shape_r = None
        conv_shape_c = None
        conv_map_channels = None
        conv_idx_channels = None
        found_world_map = False
        found_world_idx = False
        for k, v in obs_space.spaces.items():
            # Prepend an unspecified-length axis to the per-step shape
            # (presumably the time axis consumed by the LSTM — TODO confirm).
            shape = (None,) + v.shape
            input_dict[k] = tf.keras.layers.Input(shape=shape, name=k)
            self._input_keys.append(k)
            # Every input except the action mask is fed to the dense layers.
            if k == _MASK_NAME:
                pass
            else:
                non_conv_input_keys.append(k)

        # Cell state and hidden state for the
        # policy and value function networks.
        state_in_h_p = tf.keras.layers.Input(shape=(cell_size,), name="h_pol")
        state_in_c_p = tf.keras.layers.Input(shape=(cell_size,), name="c_pol")
        state_in_h_v = tf.keras.layers.Input(shape=(cell_size,), name="h_val")
        state_in_c_v = tf.keras.layers.Input(shape=(cell_size,), name="c_val")
        seq_in = tf.keras.layers.Input(shape=(), name="seq_in")

        # Determine which of the inputs are treated as non-conv inputs
        non_conv_inputs = tf.keras.layers.concatenate([input_dict[k] for k in non_conv_input_keys])

        logits, values, state_h_p, state_c_p, state_h_v, state_c_v = (None,None,None,None,None,None,)

        # Define the policy and value function models
        for tag in ["_pol", "_val"]:
            if tag == "_pol":
                state_in = [state_in_h_p, state_in_c_p]
            elif tag == "_val":
                state_in = [state_in_h_v, state_in_c_v]
            else:
                raise NotImplementedError

            dense = non_conv_inputs

            # Preprocess observation with hidden layers and send to LSTM cell
            for i in range(num_fc):
                layer = tf.keras.layers.Dense(fc_dim, activation=tf.nn.relu, name="dense{}".format(i + 1) + tag)
                dense = layer(dense)

            dense = tf.keras.layers.LayerNormalization(name="layer_norm" + tag)(dense)

            # seq_in is turned into a mask via tf.sequence_mask and passed to the LSTM.
            lstm_out, state_h, state_c = tf.keras.layers.LSTM(
                cell_size, return_sequences=True, return_state=True, name="lstm" + tag
            )(inputs=dense, mask=tf.sequence_mask(seq_in), initial_state=state_in)

            # Project LSTM output to logits or value
            output = tf.keras.layers.Dense(
                self.num_outputs if tag == "_pol" else 1,
                activation=tf.keras.activations.linear,
                name="logits" if tag == "_pol" else "value",
            )(lstm_out)

            if tag == "_pol":
                state_h_p, state_c_p = state_h, state_c
                # Raises KeyError if the observation space has no _MASK_NAME entry.
                logits = apply_logit_mask(output, input_dict[_MASK_NAME])
            elif tag == "_val":
                state_h_v, state_c_v = state_h, state_c
                values = output
            else:
                raise NotImplementedError

        self.input_dict = input_dict

        # This will be set in the forward_rnn() call below
        self._value_out = None

        # Sanity check: both loop iterations must have filled in their outputs.
        for out in [logits, values, state_h_p, state_c_p, state_h_v, state_c_v]:
            assert out is not None

        # Create the RNN model
        # NOTE(review): self._extract_input_list is not defined in the visible
        # part of this class; it must exist on the class or a base class.
        self.rnn_model = tf.keras.Model(
            inputs=self._extract_input_list(input_dict)
            + [seq_in, state_in_h_p, state_in_c_p, state_in_h_v, state_in_c_v],
            outputs=[logits, values, state_h_p, state_c_p, state_h_v, state_c_v],
        )
        self.register_variables(self.rnn_model.variables)