<a href="https://colab.research.google.com/github/hohaithuy/AI-Pacman-CS106/blob/main/CS106_Q_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import gym
import numpy as np
import random

In [2]:
# Build the FrozenLake-v0 environment; its observation_space.n and action_space.n
# give the Q-table dimensions used throughout the notebook.
env = gym.make('FrozenLake-v0')

# Q-Learning

In [3]:
# Initialize the Q-value table to all zeros (not randomly):
# one row per state and one column per action.
q_table = np.zeros((env.observation_space.n, env.action_space.n))
print(q_table)

[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]


In [4]:
# --- Hyperparameters ---------------------------------------------------------

# Training schedule
num_episodes = 20_000          # number of training episodes
num_steps_per_episode = 100    # upper bound on steps taken inside one episode

# Temporal-difference update
learning_rate = 0.1            # weight of the new TD target in each Q-value update
gamma = 0.99                   # discount applied to the bootstrapped next-state value

# Epsilon-greedy exploration, decayed per episode as
#   epsilon = min_epsilon + (max_epsilon - min_epsilon) * exp(-epsilon_decay_rate * episode)
min_epsilon = 0.01
max_epsilon = 1.0
epsilon_decay_rate = 0.005

In [5]:
def q_learning(env, num_episodes, num_steps_per_episode, learning_rate, gamma, max_epsilon, min_epsilon, epsilon_decay_rate):
    """Train a tabular Q-learning agent with a decaying epsilon-greedy policy.

    Args:
        env: Environment exposing ``observation_space.n``, ``action_space.n``,
            ``action_space.sample()``, ``reset()`` (returns the start state index)
            and ``step(action)`` (returns ``(next_state, reward, done, info)``).
        num_episodes: Number of training episodes to run.
        num_steps_per_episode: Maximum number of steps taken in one episode.
        learning_rate: Weight given to the new TD target in each update.
        gamma: Discount factor applied to the best next-state Q-value.
        max_epsilon: Exploration probability at episode 0.
        min_epsilon: Value the exploration probability decays towards.
        epsilon_decay_rate: Rate of the exponential decay, applied per episode.

    Returns:
        Tuple ``(q_table, rewards_all)``: the learned table of shape
        ``(observation_space.n, action_space.n)`` and the list of summed
        rewards, one entry per episode.
    """
    q_table = np.zeros((env.observation_space.n, env.action_space.n))
    rewards_all = []
    for episode in range(num_episodes):
        state = env.reset()

        reward_episode = 0.0
        # Exploration probability decays exponentially with the episode index.
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-epsilon_decay_rate*episode)
        for step in range(num_steps_per_episode):
            # Epsilon-greedy: explore with probability epsilon, otherwise act greedily.
            exploration = random.uniform(0,1)
            if exploration < epsilon:
                action = env.action_space.sample()
            else:
                action = np.argmax(q_table[state, :])

            next_state, reward, done, _ = env.step(action)
            # Off-policy target: bootstrap from the best action value in the next state.
            q_table[state, action] = q_table[state, action] * (1 - learning_rate) + learning_rate * (reward + gamma * np.max(q_table[next_state,:]))

            reward_episode += reward
            state = next_state

            if done:
                break
        rewards_all.append(reward_episode)

    # Report the index of the last episode once training ends. Guarded because the
    # loop variable does not exist when no episode ran (num_episodes <= 0).
    if num_episodes > 0:
        print(f'Episode {num_episodes - 1} finished')
    return q_table, rewards_all

### TEST

In [6]:
# Train with Q-learning; keep the learned table and the per-episode reward sums.
q_table, rewards_all = q_learning(
    env=env,
    num_episodes=num_episodes,
    num_steps_per_episode=num_steps_per_episode,
    learning_rate=learning_rate,
    gamma=gamma,
    max_epsilon=max_epsilon,
    min_epsilon=min_epsilon,
    epsilon_decay_rate=epsilon_decay_rate,
)

Episode 19999 finished


In [7]:
# Inspect the Q-table returned by q_learning (rows index states, columns index actions).
q_table

array([[0.59815267, 0.52704808, 0.51828891, 0.50471752],
       [0.23055112, 0.28765303, 0.23299557, 0.43090304],
       [0.3975804 , 0.22651537, 0.23006189, 0.30742742],
       [0.1887658 , 0.        , 0.        , 0.        ],
       [0.62072763, 0.35982193, 0.32465072, 0.43216566],
       [0.        , 0.        , 0.        , 0.        ],
       [0.15283387, 0.12008668, 0.327298  , 0.10527145],
       [0.        , 0.        , 0.        , 0.        ],
       [0.43030108, 0.39065775, 0.43333165, 0.65899706],
       [0.35539006, 0.70523842, 0.47541626, 0.36466339],
       [0.76550288, 0.35993733, 0.33011948, 0.31737901],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.46299143, 0.59684331, 0.82336505, 0.44776503],
       [0.75899014, 0.94633304, 0.80260274, 0.70904058],
       [0.        , 0.        , 0.        , 0.        ]])

In [8]:
# Total reward over the whole run, followed by the totals of selected
# 1000-episode windows; (None, None) is the full list and (19000, None)
# runs from episode 19000 to the end.
print(
    *(
        sum(rewards_all[lo:hi])
        for lo, hi in [
            (None, None),
            (0, 1000),
            (1000, 2000),
            (2000, 3000),
            (9000, 10000),
            (11000, 12000),
            (14000, 15000),
            (19000, None),
        ]
    ),
    sep='\n',
)

12575.0
161.0
426.0
489.0
687.0
680.0
698.0
704.0


In [9]:
def play(env, q_table, render=False):
    """Run one episode acting greedily with respect to ``q_table``.

    Args:
        env: Environment exposing ``reset()`` (returns the start state index),
            ``step(action)`` (returns ``(next_state, reward, done, info)``) and,
            when ``render`` is true, ``render()``.
        q_table: 2-D array indexed as ``q_table[state, action]``.
        render: If true, draw the environment after every step, pause 0.2 s and
            clear the previous frame between steps.

    Returns:
        Tuple ``(total_reward, steps)`` for the episode.
    """
    clear_output = None
    if render:
        # Imported lazily so that callers who never render need neither module.
        import time
        try:
            from IPython.display import clear_output
        except ImportError:
            # Outside IPython there is no output area to clear; just keep drawing frames.
            clear_output = None

    state = env.reset()
    total_reward = 0
    steps = 0
    done = False
    while not done:
        action = np.argmax(q_table[state, :])
        next_state, reward, done, _ = env.step(action)
        total_reward += reward
        steps += 1
        if render:
            env.render()
            time.sleep(0.2)
            # Keep the final frame visible; only clear between intermediate steps.
            if not done and clear_output is not None:
                clear_output(wait=True)
        state = next_state

    return (total_reward, steps)

In [10]:
def play_multiple_times(env, q_table, max_episodes):
    """Play ``max_episodes`` greedy episodes and print summary statistics.

    An episode counts as a success when its total reward is positive. The
    average step count is taken over successful episodes only.

    Args:
        env: Environment passed through to ``play``.
        q_table: Q-table passed through to ``play``.
        max_episodes: Number of episodes to play.
    """
    success = 0
    list_of_steps = []
    for _ in range(max_episodes):
        total_reward, steps = play(env, q_table)

        if total_reward > 0:
            success += 1
            list_of_steps.append(steps)

    # np.mean([]) emits a RuntimeWarning before returning nan; when no episode
    # succeeded, report nan directly so the printed text stays the same.
    average_steps = np.mean(list_of_steps) if list_of_steps else float('nan')

    print(f'Number of successes: {success}/{max_episodes}')
    print(f'Average number of steps: {average_steps}')

In [11]:
# Evaluate the greedy policy from q_table over 1000 episodes; prints the number of
# successful episodes and the mean step count of those successes.
play_multiple_times(env, q_table, 1000)

Number of successes: 708/1000
Average number of steps: 36.693502824858754


# SARSA

In [12]:
# Initialize the Q-value table to all zeros (not randomly):
# one row per state and one column per action.
q_table = np.zeros((env.observation_space.n, env.action_space.n))
print(q_table)

[[0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]
 [0. 0. 0. 0.]]


In [13]:
# --- Hyperparameters ---------------------------------------------------------

# Training schedule
num_episodes = 20_000          # number of training episodes
num_steps_per_episode = 100    # upper bound on steps taken inside one episode

# Temporal-difference update
learning_rate = 0.1            # weight of the new TD target in each Q-value update
gamma = 0.99                   # discount applied to the bootstrapped next-step value

# Epsilon-greedy exploration, decayed per episode as
#   epsilon = min_epsilon + (max_epsilon - min_epsilon) * exp(-epsilon_decay_rate * episode)
min_epsilon = 0.01
max_epsilon = 1.0
epsilon_decay_rate = 0.005

In [14]:
def sarsa(env, num_episodes, num_steps_per_episode, learning_rate, gamma, max_epsilon, min_epsilon, epsilon_decay_rate):
    """Train a tabular SARSA agent with a decaying epsilon-greedy policy.

    Args:
        env: Environment exposing ``observation_space.n``, ``action_space.n``,
            ``action_space.sample()``, ``reset()`` (returns the start state index)
            and ``step(action)`` (returns ``(next_state, reward, done, info)``).
        num_episodes: Number of training episodes to run.
        num_steps_per_episode: Maximum number of steps taken in one episode.
        learning_rate: Weight given to the new TD target in each update.
        gamma: Discount factor applied to the next state-action value.
        max_epsilon: Exploration probability at episode 0.
        min_epsilon: Value the exploration probability decays towards.
        epsilon_decay_rate: Rate of the exponential decay, applied per episode.

    Returns:
        Tuple ``(q_table, rewards_all)``: the learned table of shape
        ``(observation_space.n, action_space.n)`` and the list of summed
        rewards, one entry per episode.
    """
    q_table = np.zeros((env.observation_space.n, env.action_space.n))
    rewards_all = []
    for episode in range(num_episodes):
        state = env.reset()

        reward_episode = 0.0
        # Exploration probability decays exponentially with the episode index.
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-epsilon_decay_rate*episode)

        # Pick the first action epsilon-greedily before entering the step loop.
        exploration = random.uniform(0,1)
        if exploration < epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(q_table[state, :])

        for step in range(num_steps_per_episode):

            next_state, reward, done, _ = env.step(action)

            # Choose the action that will actually be taken next (same epsilon-greedy rule).
            exploration = random.uniform(0,1)
            if exploration < epsilon:
                next_action = env.action_space.sample()
            else:
                next_action = np.argmax(q_table[next_state, :])

            # On-policy target: bootstrap from the value of the chosen next action.
            q_table[state, action] = q_table[state, action] * (1 - learning_rate) + learning_rate * (reward + gamma * q_table[next_state, next_action])

            reward_episode += reward
            state = next_state
            action = next_action

            if done:
                break
        rewards_all.append(reward_episode)

    # Report the index of the last episode once training ends. Guarded because the
    # loop variable does not exist when no episode ran (num_episodes <= 0).
    if num_episodes > 0:
        print(f'Episode {num_episodes - 1} finished')
    return q_table, rewards_all

### TEST

In [15]:
# Train with SARSA; keep the learned table and the per-episode reward sums.
q_table, rewards_all = sarsa(
    env=env,
    num_episodes=num_episodes,
    num_steps_per_episode=num_steps_per_episode,
    learning_rate=learning_rate,
    gamma=gamma,
    max_epsilon=max_epsilon,
    min_epsilon=min_epsilon,
    epsilon_decay_rate=epsilon_decay_rate,
)

Episode 19999 finished


In [16]:
# Inspect the Q-table returned by sarsa (rows index states, columns index actions).
q_table

array([[0.53753331, 0.47856003, 0.49161675, 0.4758208 ],
       [0.31531931, 0.39583936, 0.22441657, 0.48169993],
       [0.38460171, 0.3806945 , 0.35239827, 0.44552165],
       [0.27545707, 0.22215938, 0.23299918, 0.42419435],
       [0.54959234, 0.32951249, 0.35363378, 0.3160213 ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.35004829, 0.14647783, 0.18716203, 0.16056844],
       [0.        , 0.        , 0.        , 0.        ],
       [0.31804055, 0.41171648, 0.28963147, 0.59465981],
       [0.42457454, 0.62121669, 0.38198078, 0.45631307],
       [0.61653384, 0.42645396, 0.19783077, 0.28958207],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.44801434, 0.39994589, 0.71284392, 0.56214048],
       [0.71701333, 0.90090435, 0.74842935, 0.76764452],
       [0.        , 0.        , 0.        , 0.        ]])

In [17]:
# Total reward over the whole run, followed by the totals of selected
# 1000-episode windows; (None, None) is the full list and (19000, None)
# runs from episode 19000 to the end.
print(
    *(
        sum(rewards_all[lo:hi])
        for lo, hi in [
            (None, None),
            (0, 1000),
            (1000, 2000),
            (2000, 3000),
            (9000, 10000),
            (11000, 12000),
            (14000, 15000),
            (19000, None),
        ]
    ),
    sep='\n',
)

12981.0
239.0
597.0
648.0
658.0
676.0
698.0
693.0


In [18]:
def play(env, q_table, render=False):
    """Run one episode acting greedily with respect to ``q_table``.

    Args:
        env: Environment exposing ``reset()`` (returns the start state index),
            ``step(action)`` (returns ``(next_state, reward, done, info)``) and,
            when ``render`` is true, ``render()``.
        q_table: 2-D array indexed as ``q_table[state, action]``.
        render: If true, draw the environment after every step, pause 0.2 s and
            clear the previous frame between steps.

    Returns:
        Tuple ``(total_reward, steps)`` for the episode.
    """
    clear_output = None
    if render:
        # Imported lazily so that callers who never render need neither module.
        import time
        try:
            from IPython.display import clear_output
        except ImportError:
            # Outside IPython there is no output area to clear; just keep drawing frames.
            clear_output = None

    state = env.reset()
    total_reward = 0
    steps = 0
    done = False
    while not done:
        action = np.argmax(q_table[state, :])
        next_state, reward, done, _ = env.step(action)
        total_reward += reward
        steps += 1
        if render:
            env.render()
            time.sleep(0.2)
            # Keep the final frame visible; only clear between intermediate steps.
            if not done and clear_output is not None:
                clear_output(wait=True)
        state = next_state

    return (total_reward, steps)

In [19]:
def play_multiple_times(env, q_table, max_episodes):
    """Play ``max_episodes`` greedy episodes and print summary statistics.

    An episode counts as a success when its total reward is positive. The
    average step count is taken over successful episodes only.

    Args:
        env: Environment passed through to ``play``.
        q_table: Q-table passed through to ``play``.
        max_episodes: Number of episodes to play.
    """
    success = 0
    list_of_steps = []
    for _ in range(max_episodes):
        total_reward, steps = play(env, q_table)

        if total_reward > 0:
            success += 1
            list_of_steps.append(steps)

    # np.mean([]) emits a RuntimeWarning before returning nan; when no episode
    # succeeded, report nan directly so the printed text stays the same.
    average_steps = np.mean(list_of_steps) if list_of_steps else float('nan')

    print(f'Number of successes: {success}/{max_episodes}')
    print(f'Average number of steps: {average_steps}')

In [20]:
# Evaluate the greedy policy from q_table over 1000 episodes; prints the number of
# successful episodes and the mean step count of those successes.
play_multiple_times(env, q_table, 1000)

Number of successes: 750/1000
Average number of steps: 37.48133333333333
