<h2>Q - Learning Algorithm</h2>

In [1]:
import numpy as np
import random
from typing import Tuple, List

# Define the grid world -> environment
GRID_SIZE = 3
START = (0, 0)
GOAL = (2, 2)
OBSTACLE = (1, 1)

# Define Actions -> action space
ACTIONS = [
    (-1, 0), # up
    (0, 1), # right
    (1, 0), # down
    (0, -1) # left
] 

In [2]:
def is_valid_state(state: Tuple[int, int]) -> bool:
    return (0 <= state[0] < GRID_SIZE and
            0 <= state[1] < GRID_SIZE and
            state != OBSTACLE)

In [3]:
def get_next_state(state: Tuple[int, int], action: Tuple[int, int]) -> Tuple[int, int]:
    next_state = (state[0] + action[0], state[1] + action[1])
    return next_state if is_valid_state(next_state) else state

In [4]:
# Defining Q - Learning parameters
EPSILON = 0.3
ALPHA = 0.3
GAMMA = 0.99
EPISODES = 10000

In [5]:
def get_reward(state: Tuple[int, int], next_state: Tuple[int, int]) -> int:
    if next_state == GOAL:
        return 100
    elif next_state == OBSTACLE or next_state == state:
        return -10
    else: return -1

In [6]:
def choose_action(state: Tuple[int, int], q_table: np.ndarray) -> Tuple[int, int]:
    if random.uniform(0, 1) < EPSILON:
        return random.choice(ACTIONS)
    else:
        return ACTIONS[np.argmax(q_table[state])]

In [7]:
# The Bellman Equation
def update_q_table(q_table: np.ndarray, state: Tuple[int, int], action: Tuple[int, int], 
                   reward: int, next_state: Tuple[int, int]) -> None:
    action_idx = ACTIONS.index(action)
    q_table[state][action_idx] += ALPHA * (reward + GAMMA * np.max(q_table[next_state]) - q_table[state][action_idx])

In [8]:
# Training
def train_agent() -> np.ndarray:
    q_table = np.zeros((GRID_SIZE, GRID_SIZE, len(ACTIONS)))
    
    for _ in range(EPISODES):
        state = START
        while state != GOAL:
            action = choose_action(state, q_table)
            next_state = get_next_state(state, action)
            reward = get_reward(state, next_state)
            update_q_table(q_table, state, action, reward, next_state)
            state = next_state
    
    return q_table

# Train the agent
q_table = train_agent()

In [9]:
# Now that we have a final Q-Table representing our trained agent, we can visualize what was learned during the training

def visualize_q_table_as_grid(q_table: np.ndarray) -> None:
    """Visualize the Q-table as a grid with all action values for each state."""
    action_symbols = ['^', '>', 'v', '<']
    
    print("\nDetailed Q-table Grid:")
    
    # Header
    header = "   |" + "|".join(f"   ({i},{j})   " for i in range(GRID_SIZE) for j in range(GRID_SIZE)) + "|"
    print(header)
    print("-" * len(header))

    for action_idx, action_symbol in enumerate(action_symbols):
        row = f" {action_symbol} |"
        for i in range(GRID_SIZE):
            for j in range(GRID_SIZE):
                if (i, j) == GOAL:
                    cell = "   GOAL    "
                elif (i, j) == OBSTACLE:
                    cell = " OBSTACLE  "
                else:
                    q_value = q_table[i, j, action_idx]
                    cell = f" {q_value:9.2f} "
                row += cell + "|"
        print(row)
        print("-" * len(header))

def visualize_best_actions_grid(q_table: np.ndarray) -> None:
    """Visualize the best action and its Q-value for each state in a grid."""
    action_symbols = ['^', '>', 'v', '<']
    
    print("\nBest Actions Grid:")
    header = "-" * (14 * GRID_SIZE + 1)
    print(header)

    for i in range(GRID_SIZE):
        row = "| "
        for j in range(GRID_SIZE):
            if (i, j) == GOAL:
                cell = "   GOAL    "
            elif (i, j) == OBSTACLE:
                cell = " OBSTACLE  "
            else:
                best_action_idx = np.argmax(q_table[i, j])
                best_q_value = q_table[i, j, best_action_idx]
                cell = f"{action_symbols[best_action_idx]}:{best_q_value:7.2f}  "
            row += cell + " | "
        print(row)
        print(header)

In [10]:
# Visualize the Q-table as a grid
visualize_q_table_as_grid(q_table)

# Visualize the best actions and their Q-values in a grid
visualize_best_actions_grid(q_table)


Detailed Q-table Grid:
   |   (0,0)   |   (0,1)   |   (0,2)   |   (1,0)   |   (1,1)   |   (1,2)   |   (2,0)   |   (2,1)   |   (2,2)   |
----------------------------------------------------------------------------------------------------------------
 ^ |     83.12 |     85.06 |     87.02 |     92.12 | OBSTACLE  |     96.02 |     94.06 |     89.00 |   GOAL    |
----------------------------------------------------------------------------------------------------------------
 > |     94.06 |     96.02 |     87.02 |     85.06 | OBSTACLE  |     89.00 |     98.00 |    100.00 |   GOAL    |
----------------------------------------------------------------------------------------------------------------
 v |     94.06 |     85.06 |     98.00 |     96.02 | OBSTACLE  |    100.00 |     87.02 |     89.00 |   GOAL    |
----------------------------------------------------------------------------------------------------------------
 < |     83.12 |     92.12 |     94.06 |     85.06 | OBSTACLE  |     89.

<h2>DQN Algorithm</h2>

In [13]:
import numpy as np
import random
from collections import deque
import gymnasium as gym
import tensorflow as tf
from tensorflow.keras import Model, layers
import os

In [14]:
env = gym.make('CartPole-v1')

state_size = env.observation_space.shape[0]
action_size = env.action_space.n

print("State Size: ", state_size)
print("Action Size: ", action_size)

State Size:  4
Action Size:  2


In [15]:
class DQN(Model):
    def __init__(self, action_size, **kwargs):
        super(DQN, self).__init__(**kwargs)
        self.action_size = action_size
        self.d1 = layers.Dense(24, activation='relu', name='d1')
        self.d2 = layers.Dense(24, activation='relu', name='d2')
        self.d3 = layers.Dense(action_size, activation='linear', name='d3')

    def call(self, x):
        x = self.d1(x)
        x = self.d2(x)
        return self.d3(x)

    # Configs for loading the saved model file later on
    def get_config(self):
        config = super(DQN, self).get_config()
        config.update({"action_size": self.action_size})
        return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)

In [16]:
# Replay memory deque
memory = deque(maxlen=2000)

In [17]:
class Agent:
    def __init__(self, state_size, action_size, gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995, learning_rate=0.001):
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.learning_rate = learning_rate

        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()

        self.optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)

    def _build_model(self):
        return DQN(self.action_size)

    def update_target_model(self):
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        q_values = self.model(np.array([state]))
        return np.argmax(q_values[0].numpy())

    def save_model(self, filepath):
        self.model.save(filepath)

    def load_model(self, filepath):
        # Load the saved model from the specified filepath
        self.model = tf.keras.models.load_model(filepath, custom_objects={"DQN": DQN})
        self.target_model = tf.keras.models.load_model(filepath, custom_objects={"DQN": DQN})
        
    def replay(self, batch_size):
        minibatch = random.sample(memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            with tf.GradientTape() as tape:
                q_values = self.model(np.array([state]), training=True)
                q_value = q_values[0][action]

                if done:
                    target = reward
                else:
                    next_action = np.argmax(self.model(np.array([next_state]))[0].numpy())
                    t = self.target_model(np.array([next_state]))[0][next_action]
                    target = reward + self.gamma * t

                loss = tf.reduce_mean(tf.square(target - q_value))

            grads = tape.gradient(loss, self.model.trainable_variables)
            self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))

In [18]:
batch_size = 32           # Number of samples used for each training step
n_episodes = 100          # Total number of episodes to train on
gamma = 0.95              # Discount factor for future rewards (0 to 1)
epsilon = 1.0             # Initial exploration rate (1 = 100% random actions)
epsilon_min = 0.01        # Minimum exploration rate
epsilon_decay = 0.995     # Decay factor for epsilon after each episode
learning_rate = 0.001     # Step size for neural network weight updates
update_target_every = 5   # Number of episodes between target network updates

In [19]:
output_dir = './cartpole_model/'

# Initialize the Agent
agent = Agent(state_size, action_size, gamma=gamma, epsilon=epsilon, epsilon_min=epsilon_min, epsilon_decay=epsilon_decay, learning_rate=learning_rate)
done = False

# Main Script
for e in range(n_episodes):
    state = env.reset()[0]
    state = np.reshape(state, [1, state_size])
    total_reward = 0

    for time_t in range(500):
        action = agent.act(state[0])
        next_state, reward, done, truncated, _ = env.step(action)
        done = done or truncated
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state[0], action, reward, next_state[0], done)
        state = next_state
        total_reward += reward

        if done:
            print(f"Episode: {e}/{n_episodes}, Score: {time_t}, Epsilon: {agent.epsilon:.2f}")
            break

    if len(memory) > batch_size:
        loss = agent.replay(batch_size)

    # Update epsilon
    if agent.epsilon > agent.epsilon_min:
        agent.epsilon *= agent.epsilon_decay

    # Update target network
    if e % update_target_every == 0:
        agent.update_target_model()

    
    if e % 100 == 0:
        agent.save_model(os.path.join(output_dir, f'model_{e}.keras'))

agent.save_model(os.path.join(output_dir, f'model_500.keras'))

Episode: 0/100, Score: 22, Epsilon: 1.00


  return saving_lib.save_model(model, filepath)


FileNotFoundError: [Errno 2] No such file or directory: './cartpole_model/model_0.keras'