In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym
from nle import nethack
from torch.distributions import Categorical

# Device that observation tensors are moved to before they reach the policy.
device = torch.device("cpu")

# Index of each named field inside an observation's `blstats` vector.
# Only the two coordinate entries are read by the active code in this notebook.
STATS_INDICES = dict(
    x_coordinate=0,
    y_coordinate=1,
    score=9,
    health_points=10,
    health_points_max=11,
    hunger_level=18,
)

# The four cardinal-direction moves, listed in N, E, S, W order.
# NOTE(review): nothing visible in this notebook reads ACTIONS; MyAgent.act
# returns a raw integer index instead of looking an action up here.
ACTIONS = [
    getattr(nethack.CompassCardinalDirection, heading)
    for heading in ("N", "E", "S", "W")
]

def crop_glyphs(glyphs, x, y, size=7):
    """Return a flattened square window of ``glyphs`` located around (x, y).

    The window is ``2 * size`` cells wide and tall. It nominally covers
    columns ``x - size`` .. ``x + size - 1`` and rows ``y - size`` ..
    ``y + size - 1``. If that range would extend past column 0 / row 0 or
    past the bounds ``x_max = 79`` / ``y_max = 21``, the window is shifted
    back inside rather than truncated, so the result always holds
    ``(2 * size) ** 2`` values.

    Args:
        glyphs: 2-D array-like indexed as ``glyphs[row][col]``. It is expected
            to span at least 21 rows by 79 columns.
        x: Column index the window is positioned around.
        y: Row index the window is positioned around.
        size: Half the side length of the window.

    Returns:
        A new 1-D ``numpy`` array with the window's cells in row-major order.

    Raises:
        ValueError: If ``size`` is negative, or if a ``2 * size`` window
            cannot fit inside the 79 x 21 bounds (previously this case
            silently wrapped around to the opposite edge via negative indices).
    """
    x_max = 79
    y_max = 21

    span = 2 * size
    if size < 0 or span > min(x_max, y_max):
        raise ValueError(
            'size={} gives a {}x{} window that does not fit in a {}x{} map'.format(
                size, span, span, x_max, y_max))

    # Clamp the window's first column/row so the whole span stays in bounds:
    # never below 0, never so high that start + span would pass the maximum.
    x_start = min(max(int(x) - size, 0), x_max - span)
    y_start = min(max(int(y) - size, 0), y_max - span)

    # A single slice replaces the per-cell Python loop; flatten() always
    # copies, so the caller never receives a view into the observation buffer.
    grid = np.asarray(glyphs)
    return grid[y_start:y_start + span, x_start:x_start + span].flatten()

def transform_observation(observation):
    """Build the policy's input features from a raw observation.

    Reads the x and y coordinate entries from ``observation['blstats']``,
    crops ``observation['chars']`` around that position with ``crop_glyphs``
    (default window size), and divides the cropped values by 255.

    Args:
        observation: Mapping that provides at least the ``'blstats'`` and
            ``'chars'`` entries.

    Returns:
        The cropped character window after division by 255, in whatever
        shape ``crop_glyphs`` returns it.
    """
    stats = observation['blstats']
    pos_x = stats[STATS_INDICES['x_coordinate']]
    pos_y = stats[STATS_INDICES['y_coordinate']]

    window = crop_glyphs(observation['chars'], pos_x, pos_y)

    # Fixed divisor rather than per-window statistics.
    # NOTE(review): presumably chars are 8-bit codes, so this maps them
    # into [0, 1] — confirm against the environment's observation spec.
    char_ceiling = 255
    return window / char_ceiling


class Policy(nn.Module):
    """Fully connected policy network that outputs action probabilities.

    Four linear layers (obs_size -> 256 -> 128 -> 64 -> act_size) with ReLU
    after each of the first three. Dropout (p=0.5) is applied only to the
    output of the first linear layer, before its ReLU.

    Attribute names and their registration order are kept stable so that
    modules pickled from this class still line up with ``forward``.
    """

    def __init__(self, obs_size, act_size):
        super().__init__()
        width_1, width_2, width_3 = 256, 128, 64

        self.affine1 = nn.Linear(obs_size, width_1)
        self.dropout = nn.Dropout(p=0.5)
        self.affine2 = nn.Linear(width_1, width_2)
        self.affine3 = nn.Linear(width_2, width_3)
        self.affine4 = nn.Linear(width_3, act_size)

        # Per-episode buffers; this class only creates them empty.
        self.saved_log_probs = []
        self.rewards = []

    def forward(self, x):
        """Map a 2-D batch of observations to per-row action probabilities.

        Softmax is taken over dimension 1, so ``x`` must have a leading
        batch dimension.
        """
        hidden = F.relu(self.dropout(self.affine1(x)))
        hidden = F.relu(self.affine2(hidden))
        hidden = F.relu(self.affine3(hidden))
        return F.softmax(self.affine4(hidden), dim=1)

class MyAgent:
    """Agent that samples actions from a pre-trained policy loaded from disk."""

    # Checkpoint used when the caller does not pass ``model_path``.
    DEFAULT_MODEL_PATH = '/home/clarise/Desktop/COMS7053A - RL/mod_chars2.pt'

    def __init__(self, observation_space, action_space, seeds, model_path=None):
        """Load the saved policy and switch it to evaluation mode.

        Args:
            observation_space: Accepted for interface compatibility; unused.
            action_space: Accepted for interface compatibility; unused.
            seeds: Accepted for interface compatibility; unused.
            model_path: Optional checkpoint location. Defaults to
                ``DEFAULT_MODEL_PATH`` so existing three-argument callers
                behave as before.
        """
        if model_path is None:
            model_path = self.DEFAULT_MODEL_PATH

        # SECURITY: torch.load unpickles the file, which can execute arbitrary
        # code. Only load checkpoints from a trusted source.
        # map_location keeps the weights on the same device that `act` moves
        # its inputs to, even if the checkpoint was saved from another device.
        # NOTE(review): recent PyTorch releases default to weights_only=True,
        # which rejects fully pickled modules — confirm against the installed
        # version.
        policy_model = torch.load(model_path, map_location=device)
        self.model = policy_model.eval()

    def act(self, state):
        """Sample one action for ``state`` and return it as an integer.

        Args:
            state: Raw observation, passed straight to ``transform_observation``.

        Returns:
            The sampled category index plus one.
        """
        obs = transform_observation(state)
        # unsqueeze(0) adds the batch dimension the policy's softmax(dim=1) needs.
        state_obs = torch.from_numpy(obs).float().to(device).unsqueeze(0)

        # Pure inference: skip autograd bookkeeping on every environment step.
        with torch.no_grad():
            action_probs = self.model(state_obs)

        action = Categorical(action_probs).sample()
        # NOTE(review): the +1 presumably skips a non-movement action at
        # index 0 of the environment's action list — confirm against the
        # NLE version in use.
        return action.item() + 1

In [3]:
import numpy as np
import gym
import nle
import random


def run_episode(env):
    """Play a single episode with a freshly built agent and return its score.

    The environment is reset first, then a new ``MyAgent`` is constructed
    from the environment's spaces and current seeds. The agent's action is
    fed to ``env.step`` until the environment reports ``done``.

    Args:
        env: Environment exposing ``reset``, ``step``, ``get_seeds``,
            ``observation_space`` and ``action_space``.

    Returns:
        Sum of the rewards collected over the episode, as a float.
    """
    state = env.reset()

    # A new agent per episode, built only after the reset.
    agent = MyAgent(env.observation_space, env.action_space, seeds=env.get_seeds())

    total_reward = 0.0
    finished = False
    while not finished:
        chosen = agent.act(state)
        state, step_reward, finished, _ = env.step(chosen)
        total_reward += step_reward
    return total_reward


if __name__ == '__main__':
    # Evaluation protocol: every seed is played `num_runs` times, the returns
    # are averaged per seed, and the mean of those per-seed averages is printed.
    seeds = [1, 2, 3, 4, 5]
    num_runs = 10

    # Initialise environment
    env = gym.make("NetHackScore-v0")

    rewards = []
    try:
        for seed in seeds:
            # The environment is seeded once per seed value, not once per run.
            # NOTE(review): the arguments are presumably (core seed, display
            # seed, reseed flag) — confirm against the NLE `seed` signature.
            env.seed(seed, seed, False)
            seed_rewards = []
            for i in range(num_runs):
                print('Seed {} Run {}'.format(seed, i))
                seed_rewards.append(run_episode(env))
            rewards.append(np.mean(seed_rewards))
    finally:
        # Release the environment even if an episode raises part-way through.
        env.close()

    # Average of the per-seed mean returns.
    print(np.mean(rewards))

Seed 1 Run 0
Seed 1 Run 1
Seed 1 Run 2
Seed 1 Run 3
Seed 1 Run 4
Seed 1 Run 5
Seed 1 Run 6
Seed 1 Run 7
Seed 1 Run 8
Seed 1 Run 9
Seed 2 Run 0
Seed 2 Run 1
Seed 2 Run 2
Seed 2 Run 3
Seed 2 Run 4
Seed 2 Run 5
Seed 2 Run 6
Seed 2 Run 7
Seed 2 Run 8
Seed 2 Run 9
Seed 3 Run 0
Seed 3 Run 1
Seed 3 Run 2
Seed 3 Run 3
Seed 3 Run 4
Seed 3 Run 5
Seed 3 Run 6
Seed 3 Run 7
Seed 3 Run 8
Seed 3 Run 9
Seed 4 Run 0
Seed 4 Run 1
Seed 4 Run 2
Seed 4 Run 3
Seed 4 Run 4
Seed 4 Run 5
Seed 4 Run 6
Seed 4 Run 7
Seed 4 Run 8
Seed 4 Run 9
Seed 5 Run 0
Seed 5 Run 1
Seed 5 Run 2
Seed 5 Run 3
Seed 5 Run 4
Seed 5 Run 5
Seed 5 Run 6
Seed 5 Run 7
Seed 5 Run 8
Seed 5 Run 9
21.02060000000001
