In [1]:
import pickle
import random
import time
from os.path import exists

import gymnasium as gym
import minigrid
import numpy as np  # imported explicitly; previously only reachable via the star import below
from minigrid.wrappers import *
from torch.utils.tensorboard import SummaryWriter

pygame 2.5.0 (SDL 2.28.0, Python 3.9.13)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# Preprocess observation state
def preprocess(observation):
    """Collapse a 3-D observation into a 2-D grid taken from its first channel.

    The observation is flattened in column-major ('F') order, the first
    ``rows * cols`` values (i.e. everything belonging to channel 0) are kept,
    and those values are laid out row-major ('C') as a ``rows x cols`` array.
    For a square observation the result equals ``observation[:, :, 0].T``.

    Parameters
    ----------
    observation : numpy.ndarray
        Array with exactly three dimensions ``(rows, cols, channels)``.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(rows, cols)``.
    """
    # Use the argument itself; the previous version silently read the global
    # ``obs`` and ignored whatever was passed in.
    (rows, cols, channels) = observation.shape
    first_channel = np.reshape(observation, [rows * cols * channels, 1], order='F')[0:rows * cols]
    return np.reshape(first_channel, [rows, cols], order='C')

In [3]:
#state hash function
def hashState(state):
    """Return the raw bytes of ``state`` so it can serve as a dictionary key.

    ``state`` must expose a ``tobytes()`` method (e.g. a NumPy array).
    """
    return state.tobytes()

# Q-learning

In [55]:
# Random-search hyperparameter tuning for tabular Q-learning.

# Make the gym environment; the wrapper keeps only the image part of the observation
env = gym.make('MiniGrid-Empty-8x8-v0')
env = ImgObsWrapper(env)
# Read the step limit from the base environment: attribute forwarding through
# wrappers (env.max_steps) is deprecated in Gymnasium 0.29 and removed in 1.0.
max_steps = env.unwrapped.max_steps
# Highest action index. random.randint is inclusive on both ends, so the agent
# uses the numActions + 1 actions 0..numActions.
numActions = 2
N = 30
best_result = 0
# Defined up front so the final report cannot hit a NameError when no
# configuration reaches a positive average reward.
best_params = None
best_Q = None

for n in range(N):

    # tabular value-function: hashed state -> array of action values
    Q = {}

    # sample a random hyperparameter configuration
    episodes = 3000
    epsilon_max = np.round(np.random.rand()/2 + 0.5, 2)
    epsilon_min = np.round(np.random.rand()/2, 2)
    alpha = np.round(np.random.rand(), 2)
    discount = np.round(np.random.rand(), 2)
    total_reward = 0

    epsilon = epsilon_max
    for e in range(episodes):
        # reset the environment
        obs, _ = env.reset()
        currentS = preprocess(obs)
        for i in range(0, max_steps):

            # hash state
            currentS_Key = hashState(currentS)

            if currentS_Key not in Q:
                Q[currentS_Key] = np.zeros(numActions + 1)

            # epsilon-greedy action selection
            if random.random() < epsilon:
                # explore: uniformly random action
                a = random.randint(0, numActions)
            else:
                # exploit: action with the highest value in the current state
                a = np.argmax(Q[currentS_Key])

            # take action 'a', receive 'reward' and observe the next state
            obs, reward, done, truncated, info = env.step(a)

            # extract the next state from the observation and hash it
            nextS = preprocess(obs)
            nextS_Key = hashState(nextS)
            if nextS_Key not in Q:
                Q[nextS_Key] = np.zeros(numActions + 1)

            # Q-learning bootstraps from the greedy action in the next state
            nextA = np.argmax(Q[nextS_Key])

            # update value in Q-table
            Q[currentS_Key][a] = Q[currentS_Key][a] + alpha*(reward+discount*Q[nextS_Key][nextA]-Q[currentS_Key][a])

            if done or truncated:
                # episode ended, either by termination or by truncation
                steps_done = i
                break

            # episode continues: the next state becomes the current state
            currentS = nextS

        # only the reward of the episode's last step is accumulated
        total_reward += reward

        # anneal epsilon
        epsilon = max(epsilon*0.999, epsilon_min)

    if (n % 10 == 0 and n > 0):
        print('Done training iteration', n)

    # keep the configuration with the best average per-episode reward
    if total_reward/episodes > best_result:
        best_result = total_reward/episodes
        best_params = [epsilon_max, epsilon_min, alpha, discount]
        best_Q = Q.copy()

env.close()

print('Best average reward is', best_result, 'achieved with hyperparmater configuration:', best_params)

Done training iteration 10
Done training iteration 20
Best average reward is 0.9219403645833336 achieved with hyperparmater configuration: [0.56, 0.12, 0.89, 0.67]


In [62]:
# Train Q-learning with the best hyperparameters found by the tuning cell

# Make the gym environment
env = gym.make('MiniGrid-Empty-8x8-v0')
writer = SummaryWriter()

# declare the variable to store the tabular value-function
Q = {}
# Highest action index. random.randint is inclusive on both ends, so the agent
# uses the numActions + 1 actions 0..numActions.
numActions = 2
episodes = 3000
epsilon_max = 0.56
epsilon_min = 0.12
alpha = 0.89
discount = 0.67
# Read the step limit from the base environment: gym.make already returns a
# wrapped env, and attribute forwarding through wrappers is deprecated in
# Gymnasium 0.29 and removed in 1.0.
max_steps = env.unwrapped.max_steps

# Use a wrapper so the observation only contains the grid information
env = ImgObsWrapper(env)

print('Start training...')
epsilon = epsilon_max
total_reward = 0
for e in range(episodes):
    # reset the environment
    obs, _ = env.reset()
    currentS = preprocess(obs)
    for i in range(0, max_steps):

        # hash state
        currentS_Key = hashState(currentS)

        if currentS_Key not in Q:
            Q[currentS_Key] = np.zeros(numActions + 1)

        # epsilon-greedy action selection
        if random.random() < epsilon:
            # explore: uniformly random action
            a = random.randint(0, numActions)
        else:
            # exploit: action with the highest value in the current state
            a = np.argmax(Q[currentS_Key])

        # take action 'a', receive 'reward' and observe the next state
        obs, reward, done, truncated, info = env.step(a)

        # extract the next state from the observation and hash it
        nextS = preprocess(obs)
        nextS_Key = hashState(nextS)
        if nextS_Key not in Q:
            Q[nextS_Key] = np.zeros(numActions + 1)

        # Q-learning bootstraps from the greedy action in the next state
        nextA = np.argmax(Q[nextS_Key])

        # update value in Q-table
        Q[currentS_Key][a] = Q[currentS_Key][a] + alpha*(reward+discount*Q[nextS_Key][nextA]-Q[currentS_Key][a])

        if done or truncated:
            # episode ended, either by termination or by truncation
            steps_done = i
            break

        # episode continues: the next state becomes the current state
        currentS = nextS

    # log the running average of the last-step reward per episode
    total_reward += reward
    writer.add_scalar("Reward/training", total_reward/(e+1), e)

    # anneal epsilon
    epsilon = max(epsilon*0.999, epsilon_min)

print('Done training...')
env.close()
writer.flush()
writer.close()

filename = 'q-learn_qtable.pickle'

# Saving the value-function to file (the with-block closes the handle itself)
with open(filename, 'wb') as handle:
    pickle.dump(Q, handle, protocol=pickle.HIGHEST_PROTOCOL)

Start training...
Done training...


In [None]:
%tensorboard --logdir=runs

In [14]:
# Loading the value-function from file
filename = 'q-learn_qtable.pickle'
if exists(filename):
    print('Loading existing Q values')
    # Load data (deserialize). NOTE: pickle.load can execute arbitrary code,
    # so only load files that were written by this notebook.
    with open(filename, 'rb') as handle:
        # the with-block closes the handle on exit; no explicit close() needed
        Q = pickle.load(handle)
else:
    print('Filename %s does not exist, could not load data' % filename) 

Loading existing Q values


In [15]:
# Visualize the greedy policy stored in Q for a single rendered episode

env = gym.make('MiniGrid-Empty-8x8-v0', render_mode='human')
env = ImgObsWrapper(env)
# Take the step limit from this environment instead of relying on a value
# left in the kernel by another cell. env.unwrapped is used because attribute
# forwarding through wrappers is deprecated in Gymnasium 0.29 and removed in 1.0.
max_steps = env.unwrapped.max_steps
# reset the environment
obs, _ = env.reset()
currentS = preprocess(obs)
for i in range(0, max_steps):

    currentS_Key = hashState(currentS)

    # states never seen during training get an all-zero row (argmax -> action 0)
    if currentS_Key not in Q:
        Q[currentS_Key] = np.zeros(3)

    # Choose the greedy action
    a = np.argmax(Q[currentS_Key])

    # take action 'a', receive 'reward' and observe the next state
    obs, reward, done, truncated, info = env.step(a)
    # extract the next state from the observation
    nextS = preprocess(obs)

    if done:
        # the environment reported termination
        print('Finished episode successfully taking %d steps and receiving reward %f' % (i+1, reward))
        break
    if truncated:
        # the environment reported truncation before termination
        print('Truncated episode taking %d steps and receiving reward %f' % (i+1, reward))
        break

    # episode continues: the next state becomes the current state
    currentS = nextS
env.close()

Finished episode successfully taking 12 steps and receiving reward 0.957812


In [64]:
# Evaluate the greedy policy stored in Q for 1000 episodes
env = gym.make('MiniGrid-Empty-8x8-v0')
env = ImgObsWrapper(env)
# Take the step limit from this environment instead of relying on a value
# left in the kernel by another cell. env.unwrapped is used because attribute
# forwarding through wrappers is deprecated in Gymnasium 0.29 and removed in 1.0.
max_steps = env.unwrapped.max_steps
mean_reward = 0
completion_rate = 0
average_steps = 0

for run in range(1000):
    # reset the environment
    obs, _ = env.reset()
    currentS = preprocess(obs)

    for i in range(0, max_steps):

        # hash state
        currentS_Key = hashState(currentS)

        # states never seen during training get an all-zero row (argmax -> action 0)
        if currentS_Key not in Q:
            Q[currentS_Key] = np.zeros(3)

        # Choose the greedy action
        a = np.argmax(Q[currentS_Key])

        # take action 'a', receive 'reward' and observe the next state
        obs, reward, done, truncated, info = env.step(a)

        # extract the next state from the observation
        nextS = preprocess(obs)

        if done or truncated:
            # episode over: record its last reward and its length
            mean_reward += reward
            average_steps += i+1
            if done:
                # only terminated episodes count as completed
                completion_rate += 1
            break

        # episode continues: the next state becomes the current state
        currentS = nextS

env.close()

# turn the accumulated sums into per-episode averages
completion_rate /= 1000
average_steps /= 1000
print('Completion rate is', completion_rate)
print('Average number of steps is', average_steps)
print('Average reward over 1000 runs is', np.round(mean_reward/1000, 4))

Completion rate is 1.0
Average number of steps is 12.0
Average reward over 1000 runs is 0.9578


# SARSA

In [65]:
# Random-search hyperparameter tuning for tabular SARSA.

# Make the gym environment; the wrapper keeps only the image part of the observation
env = gym.make('MiniGrid-Empty-8x8-v0')
env = ImgObsWrapper(env)
# Read the step limit from the base environment: attribute forwarding through
# wrappers (env.max_steps) is deprecated in Gymnasium 0.29 and removed in 1.0.
max_steps = env.unwrapped.max_steps
# Highest action index. random.randint is inclusive on both ends, so the agent
# uses the numActions + 1 actions 0..numActions.
numActions = 2
N = 30
best_result = 0
# Defined up front so the final report cannot hit a NameError when no
# configuration reaches a positive average reward.
best_params = None
best_Q = None
for n in range(N):

    # tabular value-function: hashed state -> array of action values
    Q = {}

    # sample a random hyperparameter configuration
    episodes = 3000
    epsilon_max = np.round(np.random.rand()/2 + 0.5, 2)
    epsilon_min = np.round(np.random.rand()/2, 2)
    alpha = np.round(np.random.rand(), 2)
    discount = np.round(np.random.rand(), 2)
    total_reward = 0

    epsilon = epsilon_max
    for e in range(episodes):
        # reset the environment
        obs, _ = env.reset()
        currentS = preprocess(obs)
        # the first action of every episode is drawn uniformly at random
        a = random.randint(0, numActions)
        for i in range(0, max_steps):

            # hash state
            currentS_Key = hashState(currentS)

            if currentS_Key not in Q:
                Q[currentS_Key] = np.zeros(numActions + 1)

            # take action 'a', receive 'reward' and observe the next state
            obs, reward, done, truncated, info = env.step(a)

            # extract the next state from the observation and hash it
            nextS = preprocess(obs)
            nextS_Key = hashState(nextS)
            if nextS_Key not in Q:
                Q[nextS_Key] = np.zeros(numActions + 1)

            # select the next action epsilon-greedily (on-policy)
            if random.random() < epsilon:
                # explore: uniformly random action
                nextA = random.randint(0, numActions)
            else:
                # exploit: action with the highest value in the next state
                nextA = np.argmax(Q[nextS_Key])

            # SARSA update: bootstrap from the action that will actually be taken
            Q[currentS_Key][a] = Q[currentS_Key][a] + alpha*(reward+discount*Q[nextS_Key][nextA]-Q[currentS_Key][a])

            if done or truncated:
                # episode ended, either by termination or by truncation
                steps_done = i
                break

            # carry the next state and action over to the next step
            currentS = nextS
            a = nextA

        # only the reward of the episode's last step is accumulated
        total_reward += reward

        # anneal epsilon
        epsilon = max(epsilon*0.999, epsilon_min)

    if (n % 10 == 0 and n > 0):
        print('Done training iteration', n)

    # keep the configuration with the best average per-episode reward
    if total_reward/episodes > best_result:
        best_result = total_reward/episodes
        best_params = [epsilon_max, epsilon_min, alpha, discount]
        best_Q = Q.copy()

env.close()

print('Best average reward is', best_result, 'achieved with hyperparamater configuration:', best_params)

Done training iteration 10
Done training iteration 20
Best average reward is 0.9108329427083401 achieved with hyperparamater configuration: [0.61, 0.03, 0.32, 0.44]


In [66]:
# Train SARSA with the best hyperparameters found by the tuning cell

# Make the gym environment
env = gym.make('MiniGrid-Empty-8x8-v0')
writer = SummaryWriter()

# declare the variable to store the tabular value-function
Q = {}
# Highest action index. random.randint is inclusive on both ends, so the agent
# uses the numActions + 1 actions 0..numActions.
numActions = 2
episodes = 3000
epsilon_max = 0.61
epsilon_min = 0.03
alpha = 0.32
discount = 0.44
# Read the step limit from the base environment: gym.make already returns a
# wrapped env, and attribute forwarding through wrappers is deprecated in
# Gymnasium 0.29 and removed in 1.0.
max_steps = env.unwrapped.max_steps

# Use a wrapper so the observation only contains the grid information
env = ImgObsWrapper(env)

print('Start training...')
epsilon = epsilon_max
total_reward = 0
for e in range(episodes):
    # reset the environment
    obs, _ = env.reset()
    currentS = preprocess(obs)
    # the first action of every episode is drawn uniformly at random
    a = random.randint(0, numActions)
    for i in range(0, max_steps):

        # hash state
        currentS_Key = hashState(currentS)

        if currentS_Key not in Q:
            Q[currentS_Key] = np.zeros(numActions + 1)

        # take action 'a', receive 'reward' and observe the next state
        obs, reward, done, truncated, info = env.step(a)

        # extract the next state from the observation and hash it
        nextS = preprocess(obs)
        nextS_Key = hashState(nextS)
        if nextS_Key not in Q:
            Q[nextS_Key] = np.zeros(numActions + 1)

        # select the next action epsilon-greedily (on-policy)
        if random.random() < epsilon:
            # explore: uniformly random action
            nextA = random.randint(0, numActions)
        else:
            # exploit: action with the highest value in the next state
            nextA = np.argmax(Q[nextS_Key])

        # SARSA update: bootstrap from the action that will actually be taken
        Q[currentS_Key][a] = Q[currentS_Key][a] + alpha*(reward+discount*Q[nextS_Key][nextA]-Q[currentS_Key][a])

        if done or truncated:
            # episode ended, either by termination or by truncation
            steps_done = i
            break

        # carry the next state and action over to the next step
        currentS = nextS
        a = nextA

    # log the running average of the last-step reward per episode
    total_reward += reward
    writer.add_scalar("Reward/training", total_reward/(e+1), e)

    # anneal epsilon
    epsilon = max(epsilon*0.999, epsilon_min)

print('Done training...')
env.close()
writer.flush()
writer.close()

filename = 'SARSA_qtable.pickle'

# Saving the value-function to file (the with-block closes the handle itself)
with open(filename, 'wb') as handle:
    pickle.dump(Q, handle, protocol=pickle.HIGHEST_PROTOCOL)

Start training...
Done training...


In [12]:
# Loading the value-function from file
filename = 'SARSA_qtable.pickle'
if exists(filename):
    print('Loading existing Q values')
    # Load data (deserialize). NOTE: pickle.load can execute arbitrary code,
    # so only load files that were written by this notebook.
    with open(filename, 'rb') as handle:
        # the with-block closes the handle on exit; no explicit close() needed
        Q = pickle.load(handle)
else:
    print('Filename %s does not exist, could not load data' % filename) 

Loading existing Q values


In [13]:
# Visualize the greedy policy stored in Q for a single rendered episode

env = gym.make('MiniGrid-Empty-8x8-v0', render_mode='human')
env = ImgObsWrapper(env)
# Read the step limit from the base environment: attribute forwarding through
# wrappers (env.max_steps) is deprecated in Gymnasium 0.29 and removed in 1.0.
max_steps = env.unwrapped.max_steps
# reset the environment
obs, _ = env.reset()
# pause for two seconds after the reset before the agent starts acting
time.sleep(2)
currentS = preprocess(obs)
for i in range(0, max_steps):
    currentS_Key = hashState(currentS)

    # states never seen during training get an all-zero row (argmax -> action 0)
    if currentS_Key not in Q:
        Q[currentS_Key] = np.zeros(3)

    # Choose the greedy action
    a = np.argmax(Q[currentS_Key])

    # take action 'a', receive 'reward' and observe the next state
    obs, reward, done, truncated, info = env.step(a)
    # extract the next state from the observation
    nextS = preprocess(obs)

    if done:
        # the environment reported termination
        print('Finished episode successfully taking %d steps and receiving reward %f' % (i+1, reward))
        break
    if truncated:
        # the environment reported truncation before termination
        print('Truncated episode taking %d steps and receiving reward %f' % (i+1, reward))
        break

    # episode continues: the next state becomes the current state
    currentS = nextS
env.close()

Finished episode successfully taking 12 steps and receiving reward 0.957812


In [68]:
# Evaluate the greedy policy stored in Q for 1000 episodes
env = gym.make('MiniGrid-Empty-8x8-v0')
env = ImgObsWrapper(env)
# Take the step limit from this environment instead of relying on a value
# left in the kernel by another cell. env.unwrapped is used because attribute
# forwarding through wrappers is deprecated in Gymnasium 0.29 and removed in 1.0.
max_steps = env.unwrapped.max_steps
mean_reward = 0
completion_rate = 0
average_steps = 0

for run in range(1000):
    # reset the environment
    obs, _ = env.reset()
    currentS = preprocess(obs)

    for i in range(0, max_steps):

        # hash state
        currentS_Key = hashState(currentS)

        # states never seen during training get an all-zero row (argmax -> action 0)
        if currentS_Key not in Q:
            Q[currentS_Key] = np.zeros(3)

        # Choose the greedy action
        a = np.argmax(Q[currentS_Key])

        # take action 'a', receive 'reward' and observe the next state
        obs, reward, done, truncated, info = env.step(a)

        # extract the next state from the observation
        nextS = preprocess(obs)

        if done or truncated:
            # episode over: record its last reward and its length
            mean_reward += reward
            average_steps += i+1
            if done:
                # only terminated episodes count as completed
                completion_rate += 1
            break

        # episode continues: the next state becomes the current state
        currentS = nextS

env.close()

# turn the accumulated sums into per-episode averages
completion_rate /= 1000
average_steps /= 1000
print('Completion rate is', completion_rate)
print('Average number of steps is', average_steps)
print('Average reward over 1000 runs is', np.round(mean_reward/1000, 4))

Completion rate is 1.0
Average number of steps is 12.0
Average reward over 1000 runs is 0.9578
