# In [1]:
import os
import h5py
import json
import pandas as pd
import tensorflow as tf
import numpy as np
from tensorflow.keras.models import load_model, Model
from tensorflow.keras.layers import Concatenate, ZeroPadding1D
from util.helper import ensure_dir


def load_h5_model_patch_input_layer(path):
    """Load a Keras HDF5 model after renaming ``batch_shape`` in its stored config.

    The ``model_config`` attribute of the HDF5 file is read and every layer
    config key ``batch_shape`` is renamed to ``batch_input_shape``. If at
    least one layer was renamed, the patched config is written back to the
    file in place; otherwise the file is left untouched (and only needs to be
    readable). The model is then loaded with ``load_model``.

    Args:
        path: Path to the HDF5 model file. The file is modified on disk when
            a patch is required.

    Returns:
        The model returned by ``load_model(path, compile=False, ...)``.

    Raises:
        ValueError: If the file has no ``model_config`` attribute.
    """
    # Read-only first: a file that needs no patch is never opened for writing.
    with h5py.File(path, 'r') as f:
        model_config = f.attrs.get('model_config')

    if model_config is None:
        raise ValueError("No model config found in file.")
    if isinstance(model_config, bytes):
        model_config = model_config.decode('utf-8')
    model_config_json = json.loads(model_config)

    # NOTE(review): presumably the file was saved by a Keras version that
    # writes `batch_shape` while the loader in use expects
    # `batch_input_shape` -- confirm against the Keras versions involved.
    patched = False
    for layer in model_config_json['config']['layers']:
        config = layer['config']
        if 'batch_shape' in config:
            config['batch_input_shape'] = config.pop('batch_shape')
            patched = True

    if patched:
        # Reopen for writing only when the stored config actually changed.
        with h5py.File(path, 'r+') as f:
            f.attrs.modify('model_config', json.dumps(model_config_json).encode('utf-8'))

    return load_model(path, compile=False, custom_objects={'Functional': Model})


def sim_rnn(model_path, data, export_path, file_name, n_cells):
    """Run a saved model on *data* from an all-zero hidden state and export a CSV.

    The model at *model_path* is loaded via
    ``load_h5_model_patch_input_layer`` and called with ``[data, hidden]``.
    The element at index 1 of the model's output is taken as the policies,
    and the first entry along its leading axis is written to
    ``export_path/file_name``.

    Args:
        model_path: Path to the HDF5 model file.
        data: Input tensor; its first dimension sets the number of rows of
            the initial hidden state.
        export_path: Directory for the CSV (created through ``ensure_dir``).
        file_name: Name of the CSV file inside *export_path*.
        n_cells: Width of the initial hidden-state tensor.
    """
    rnn = load_h5_model_patch_input_layer(model_path)

    # Both model inputs must be tensors, so the zero state is converted too.
    batch_size = data.shape[0]
    zero_state = np.zeros((batch_size, n_cells), dtype=np.float32)
    initial_hidden = tf.convert_to_tensor(zero_state, dtype=tf.float32)

    outputs = rnn([data, initial_hidden])
    policies = outputs[1]

    ensure_dir(export_path)
    first_sequence = policies[0].numpy()
    target = os.path.join(export_path, file_name)
    pd.DataFrame(first_sequence).to_csv(target)


def get_data(input_path):
    """Build the model's input tensor from an events CSV.

    The CSV must provide the columns ``'rnn action'``, ``'r1'`` and ``'r2'``.
    Each row's action string is one-hot encoded (``'[[1 0]]'`` -> ``[1, 0]``,
    ``'[[0 1]]'`` -> ``[0, 1]``; any other value stays ``[0, 0]``), and the
    reward of the chosen action is picked out of ``r1``/``r2``.

    Returns:
        A float32 tensor of shape ``(1, n_rows, 3)`` whose last axis is
        ``[reward, action_0, action_1]``, shifted one step along axis 1:
        step 0 is all zeros and step ``t`` holds the values of row ``t - 1``
        (the last row is dropped).
    """
    frame = pd.read_csv(input_path, header=0, sep=',', quotechar='"', keep_default_na=False)

    # One-hot encode the textual action labels; unmatched rows remain zero.
    labels = np.array(frame['rnn action'])
    one_hot = np.zeros((frame.shape[0], 2))
    for label, encoding in (('[[1 0]]', [1, 0]), ('[[0 1]]', [0, 1])):
        one_hot[labels == label] = encoding

    # Keep only the reward belonging to the action that was taken.
    both_rewards = np.stack([frame['r1'].to_numpy(), frame['r2'].to_numpy()], axis=1)
    chosen_reward = (both_rewards * one_hot).sum(axis=1)

    # Add the leading batch axis (and a trailing feature axis for the reward).
    action_t = tf.convert_to_tensor(one_hot[np.newaxis], dtype=tf.float32)
    reward_t = tf.convert_to_tensor(chosen_reward[np.newaxis, :, np.newaxis], dtype=tf.float32)

    merged = Concatenate(axis=2)([reward_t, action_t])

    # Prepend one zero step, then drop the final step: a shift by one.
    padded = ZeroPadding1D(padding=[1, 0])(merged)
    return padded[:, :-1, :]


if __name__ == '__main__':
    # Active configuration: GPT. The model and output locations do not depend
    # on the run index, so they are set once before the loop.
    model_path = 'trained_model/RNN_learner_single/cells_5gpt/model-49900.weights_final.h5'
    output_path = 'evaluate_model/RNN_sim/gpt/policies/'
    # Gemini alternative (swap in together with the Gemini data_path below):
    #model_path = 'trained_model/RNN_learner_single/cells_5_gemini/model-49900.weights_final.h5'
    #output_path = 'evaluate_model/RNN_sim/geminienv/policies/'

    # Event files are numbered 1 through 9.
    for i in range(1, 10):
        data_path = f'evaluate_model/RNN_adv_sim_400000_eps_0.01_lr_0.001/gpt/events_{i}.csv'
        # Gemini alternative:
        #data_path = f'evaluate_model/RNN_adv_sim_400000_eps_0.01_lr_0.001/gemini_geminienv/events_{i}.csv'
        file_name = f'policies_{i}.csv'

        sim_rnn(model_path, get_data(data_path), output_path, file_name, n_cells=5)
