In [1]:
import numpy as np
import gym

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.layers import Dense, Conv2D, MaxPool2D, Flatten
from tensorflow.keras.optimizers import Adam
import tensorflow_probability as tfp

import matplotlib.pyplot as plt 

Init Plugin
Init Graph Optimizer
Init Kernel


In [2]:
class PolicyGradientNetwork(keras.Model):
    
    # Initialize network and archtitecture
    def __init__(self, n_actions, fc1_dims=256, fc2_dims=256):
        super(PolicyGradientNetwork,self).__init__()
        self.fc1_dims = fc1_dims
        self.fc2_dims = fc2_dims 
        self.n_actions = n_actions
        
        self.conv1 = Conv2D(filters=8, kernel_size=(2,2) , padding = 'same', activation='relu')
        self.maxpool1 = MaxPool2D(pool_size=(2, 2))
        self.conv2 = Conv2D(filters=16, kernel_size=(2,2) , padding = 'same', activation='relu')
        self.maxpool2 =MaxPool2D(pool_size=(2, 2))
        self.flatten = Flatten()
        
        self.fc1 = Dense(self.fc1_dims, activation = 'relu')
        # sself.fc2 = Dense(self.fc2_dims, activation = 'relu')
        self.pi = Dense(self.n_actions, activation = 'softmax')
    
    # Forward Pass
    def call(self,state):
        
        value = self.conv1(state)
        value = self.maxpool1(value)
        
        value = self.conv2(value)
        value = self.maxpool2(value)
        
        value = self.flatten(value)
        
        value = self.fc1(value)
        #value = self.fc2(value)
        pi = self.pi(value)
        
        return pi 

In [3]:
class Agent:
    def __init__(self, alpha = 0.003, gamma = 0.99, n_actions = 4, fc1_dims = 256, fc2_dims = 256):
        self.gamma = gamma
        self.alpha = alpha
        self.n_actions = n_actions
        
        # Keep tracks of the actions we take, the states we visit and the rewards we get 
        self.state_memory = [] 
        self.action_memory = [] 
        self.reward_memory = [] 
        
        # Policy Network 
        self.policy = PolicyGradientNetwork(n_actions=n_actions)
        self.policy.compile(optimizer=Adam(learning_rate=self.alpha))
    
    def choose_action(self, observation):
        # Convert numpy array to tensorflow tensor
        state = tf.convert_to_tensor([observation], dtype=tf.float32)
        
        # Then we want to pass that state through the policy network 
        # Which gives us the probability distribution for choosing each action 
        probs = self.policy(state)
        
        # Used when you have a set of discrete classes and you want to choose one of classes with a particular proba
        action_probs = tfp.distributions.Categorical(probs=probs)
        action = action_probs.sample()
        
        return action.numpy()[0]
    
    def store_transition(self, observation, action, reward):
        # Stores the transitions of the current episode 
        self.state_memory.append(observation)
        self.action_memory.append(action)
        self.reward_memory.append(reward)
        
    def learn(self):
        # Now we need a function to actually perform the agents learning 
        
        # Convert actions and rewards to tensors
        actions = tf.convert_to_tensor(self.action_memory, dtype=tf.float32)
        rewards = tf.convert_to_tensor(self.reward_memory, dtype=tf.float32)
        
        # Calculate sum of future rewards which follow each time step 
        G = np.zeros_like(rewards)
        # Now we iterate over the agents memory 
        for t in range(len(rewards)):
            G_sum = 0 
            discount = 1 # Present step 
            for k in range(t, len(rewards)): # Goes from current time step to end of the episode 
                G_sum += rewards[k]*discount
                discount *= self.gamma
            G[t]= G_sum 
        
        # Now we can calculate the gradient at each time step in the episode 
        with tf.GradientTape() as tape:
            loss = 0 
            # Now workign back through the agent's memory 
            for idx, (g,state) in enumerate(zip(G,self.state_memory)):
                state = tf.convert_to_tensor([state], dtype=tf.float32)
                probs = self.policy(state)
                action_probs = tfp.distributions.Categorical(probs=probs)
                # Log probability for the gradient calculation
                log_prob = action_probs.log_prob(actions[idx])
                loss += -g * tf.squeeze(log_prob)
                
        gradient = tape.gradient(loss, self.policy.trainable_variables)
        self.policy.optimizer.apply_gradients(zip(gradient, self.policy.trainable_variables))
        
        # Now at the end of every episode we need to zero out (empty) our agent's memory 
        
        self.state_memory = [] 
        self.action_memory = [] 
        self.reward_memory = [] 

In [4]:
import gym
import minihack
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from nle import nethack
import time

In [5]:
env = gym.make("MiniHack-Quest-Hard-v0", observation_keys=["glyphs","pixel"])
#display_screen(env.reset())

In [6]:
env.action_space.n

78

In [7]:
#def glyph_array(state):
    #return state['glyphs_crop'].flatten()

In [8]:
21*79

1659

In [9]:
def glyph_array(state):
    #glyphs = state['glyphs'].flatten()
    #glyphs = glyphs[glyphs!=2359]
    glyphs= state['glyphs']
    glyphs = glyphs/2383
    glyphs = glyphs.reshape((21,79,1))
    return glyphs

In [None]:
agent = Agent(alpha=0.0005, gamma =0.9, n_actions = env.action_space.n)

score_history = [] 
n_episodes = 5000

for i in range(n_episodes):
    done = False
    score = 0 
    observation = glyph_array(env.reset())
    
    while not done:
        action = agent.choose_action(observation)
        #print(action)
        observation_, reward, done, info = env.step(action)
        observation_ = glyph_array(observation_)
        agent.store_transition(observation, action, reward)
        observation = observation_ 
        score += reward 
    score_history.append(score)
    
    agent.learn()
    
    avg_score = np.mean(score_history[-100:])
    print('episode ', i, 'score %.1f' % score, 'avg score %.1f' % avg_score)
    
plt.plot(score_history)

Metal device set to: Apple M1


2021-10-27 19:41:21.067664: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2021-10-27 19:41:21.067786: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [None]:
#def glyph_array(state):
   #glyphs = state['glyphs'].flatten()
    #glyphs = glyphs[glyphs!=2359]
    #glyphs = glyphs/2383
   # return glyphs

In [None]:
def display_screen(state): # state in rgb 
    screen = Image.fromarray(np.uint8(state['pixel']))
    display(screen)

In [None]:
observation = env.reset()
done = False 
while not done:
    display_screen(observation)
    action = agent.choose_action(glyph_array(observation))
    observation_, reward, done, info = env.step(action)
    observation = observation_ 