# Project 1: Simple 3X3 obstacle avoiding game
### Set up env and actions
Env is made up of x and y co-ordinates

In [10]:
import numpy as np
import random
from typing import Tuple, List

GRID_SIZE = 3
START = (0,0)
GOAL = (GRID_SIZE-1, GRID_SIZE-1)

OBSTACLE = (1,1)

ACTIONS = [
    (-1,0),
    (0,1),
    (0,-1),
    (1,0)
]

In [2]:
def is_valid_state(state: Tuple[int, int]) -> bool:
    return (0 <= state[0] < GRID_SIZE and 0 <= state[1] < GRID_SIZE and state != OBSTACLE)

In [3]:
def get_next_state(state: Tuple[int, int], action: Tuple[int, int]) -> Tuple[int, int]:
    next_state = (state[0] + action[0], state[1] + action[1])
    return next_state if is_valid_state(next_state) else state

In [4]:
EPSILON = 0.3
ALPHA = 0.3
GAMMA = 0.99
EPISODES = 10000

In [5]:
def get_reward(state: Tuple[int, int], next_state: Tuple[int, int]) -> int:
    if next_state == GOAL:
        return 100
    elif next_state == OBSTACLE and next_state == state:
        return -10
    else:
        return -1

In [6]:
def choose_action(state: Tuple[int, int], q_table: np.ndarray) -> Tuple[int, int]:
    if random.uniform(0,1) < EPSILON:
        return random.choice(ACTIONS)
    else:
        return ACTIONS[np.argmax(q_table[state])]

In [8]:
def update_q_table(q_table: np.ndarray, state: Tuple[int, int], action: Tuple[int, int], reward: int, next_state: Tuple[int, int]) -> None:
    action_idx = ACTIONS.index(action)
    old_q_value = q_table[state][action_idx]
    q_table[state][action_idx] = old_q_value + ALPHA * (reward + GAMMA * np.max(q_table[next_state]) - old_q_value)

In [13]:
def train_agent() -> np.ndarray:
    q_table = np.zeros((GRID_SIZE, GRID_SIZE, len(ACTIONS)))
    for _ in range(EPISODES):
        state = START
        while state != GOAL:
            action = choose_action(state, q_table)
            next_state = get_next_state(state, action)
            reward = get_reward(state, next_state)
            update_q_table(q_table, state, action, reward, next_state)
            state = next_state

    return q_table

q_table = train_agent()

In [14]:
q_table

array([[[ 92.119202,  94.0598  ,  92.119202,  94.0598  ],
        [ 94.0598  ,  96.02    ,  92.119202,  94.0598  ],
        [ 96.02    ,  96.02    ,  94.0598  ,  98.      ]],

       [[ 92.119202,  94.0598  ,  94.0598  ,  96.02    ],
        [  0.      ,   0.      ,   0.      ,   0.      ],
        [ 96.02    ,  98.      ,  98.      , 100.      ]],

       [[ 94.0598  ,  98.      ,  96.02    ,  96.02    ],
        [ 98.      , 100.      ,  96.02    ,  98.      ],
        [  0.      ,   0.      ,   0.      ,   0.      ]]])

In [18]:
q_table[0][0][1]

94.05979999999994

# Project 2: Balancing a Pole using Gymnasium

In [2]:
pip install tensorflow

Collecting tensorflow
  Downloading tensorflow-2.19.0-cp311-cp311-macosx_12_0_arm64.whl.metadata (4.0 kB)
Collecting absl-py>=1.0.0 (from tensorflow)
  Downloading absl_py-2.2.0-py3-none-any.whl.metadata (2.4 kB)
Collecting astunparse>=1.6.0 (from tensorflow)
  Downloading astunparse-1.6.3-py2.py3-none-any.whl.metadata (4.4 kB)
Collecting gast!=0.5.0,!=0.5.1,!=0.5.2,>=0.2.1 (from tensorflow)
  Downloading gast-0.6.0-py3-none-any.whl.metadata (1.3 kB)
Collecting google-pasta>=0.1.1 (from tensorflow)
  Downloading google_pasta-0.2.0-py3-none-any.whl.metadata (814 bytes)
Collecting libclang>=13.0.0 (from tensorflow)
  Downloading libclang-18.1.1-1-py2.py3-none-macosx_11_0_arm64.whl.metadata (5.2 kB)
Collecting opt-einsum>=2.3.2 (from tensorflow)
  Downloading opt_einsum-3.4.0-py3-none-any.whl.metadata (6.3 kB)
Collecting termcolor>=1.1.0 (from tensorflow)
  Downloading termcolor-2.5.0-py3-none-any.whl.metadata (6.1 kB)
Collecting tensorboard~=2.19.0 (from tensorflow)
  Downloading tensorb

In [20]:
pip install gymnasium
# pip install "gymnasium[classic-control]"

SyntaxError: invalid syntax (4176897279.py, line 1)

In [21]:
pip install "gymnasium[classic-control]"

Collecting pygame>=2.1.3 (from gymnasium[classic-control])
  Downloading pygame-2.6.1-cp311-cp311-macosx_11_0_arm64.whl.metadata (12 kB)
Downloading pygame-2.6.1-cp311-cp311-macosx_11_0_arm64.whl (12.4 MB)
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m16.0 MB/s[0m eta [36m0:00:00[0m MB/s[0m eta [36m0:00:01[0m
Installing collected packages: pygame
Successfully installed pygame-2.6.1
Note: you may need to restart the kernel to use updated packages.


In [3]:
import numpy as np
import random
from collections import deque
import gymnasium as gym
import tensorflow as tf
from tensorflow.keras import Model, layers
import os

In [4]:
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

print("Action size: ", action_size)
print("State size: ", state_size)

Action size:  2
State size:  4


In [5]:
class DQN(Model):
    def __init__(self, action_size, **kwargs):
        super(DQN, self).__init__(**kwargs)
        self.action_size = action_size
        self.d1 = layers.Dense(24, activation='relu', name='d1')
        self.d2 = layers.Dense(24, activation='relu', name='d2')
        self.d3 = layers.Dense(action_size, activation='linear', name='d3')

    def call(self, x):
        return self.d3(self.d2(self.d1(x)))

    def get_config(self):
        config = super(DQN, self).get_config()
        config.update({"action_size": self.action_size})
        return config

    @classmethod
    def from_config(cls, config):
        return cls(**config)

memory = deque(maxlen=2000)

In [6]:
class Agent:
    def __init__(self, state_size, action_size, gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995, learning_rate=0.001):
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.learning_rate = learning_rate

        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()

        self.optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate)

    def _build_model(self):
        return DQN(self.action_size)

    def update_target_model(self):
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        q_values = self.model(np.array([state]))
        return np.argmax(q_values[0].numpy())

    def save_model(self, filepath):
        self.model.save(filepath)

    def load_model(self, filepath):
        self.model = tf.keras.models.load_model(filepath, custom_objects={"DQN": DQN})
        self.target_model = tf.keras.models.load_model(filepath, custom_objects={"DQN": DQN})

    def replay(self, batch_size):
        minibatch = random.sample(memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            with tf.GradientTape() as tape:
                q_values = self.model(np.array([state]), training=True)
                q_value = q_values[0][action]

                if done:
                    target = reward
                else:
                    next_action = np.argmax(self.model(np.array([next_state]))[0].numpy())
                    t = self.target_model(np.array([next_state]))[0][next_action]
                    target = reward + self.gamma * t

                loss = tf.reduce_mean(tf.square(target - q_value))

            grads = tape.gradient(loss, self.model.trainable_variables)
            self.optimizer.apply_gradients(zip(grads, self.model. trainable_variables))

In [7]:
batch_size = 32           # Number of samples used for each training step
n_episodes = 500          # Total number of episodes to train on
gamma = 0.95              # Discount factor for future rewards (0 to 1)
epsilon = 1.0             # Initial exploration rate (1 = 100% random actions)
epsilon_min = 0.01        # Minimum exploration rate
epsilon_decay = 0.995     # Decay factor for epsilon after each episode
learning_rate = 0.001     # Step size for neural network weight updates
update_target_every = 5   # Number of episodes between target network updates

In [8]:
output_dir = './cartpole_model/'

# Initialize the Agent
agent = Agent(state_size, action_size, gamma=gamma, epsilon=epsilon, epsilon_min=epsilon_min, epsilon_decay=epsilon_decay, learning_rate=learning_rate)
done = False

# Main Script
for e in range(n_episodes):
    state = env.reset()[0]
    state = np.reshape(state, [1, state_size])
    total_reward = 0

    for time_t in range(500):
        action = agent.act(state[0])
        next_state, reward, done, truncated, _ = env.step(action)
        done = done or truncated
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state[0], action, reward, next_state[0], done)
        state = next_state
        total_reward += reward

        if done:
            print(f"Episode: {e}/{n_episodes}, Score: {time_t}, Epsilon: {agent.epsilon:.2f}")
            break

    if len(memory) > batch_size:
        loss = agent.replay(batch_size)

    # Update epsilon
    if agent.epsilon > agent.epsilon_min:
        agent.epsilon *= agent.epsilon_decay

    # Update target network
    if e % update_target_every == 0:
        agent.update_target_model()

    
    if e % 100 == 0:
        agent.save_model(os.path.join(output_dir, f'model_{e}.keras'))

agent.save_model(os.path.join(output_dir, f'model_500.keras'))

Episode: 0/500, Score: 22, Epsilon: 1.00
Episode: 1/500, Score: 70, Epsilon: 0.99


  return saving_lib.save_model(model, filepath)


Episode: 2/500, Score: 28, Epsilon: 0.99
Episode: 3/500, Score: 18, Epsilon: 0.99
Episode: 4/500, Score: 14, Epsilon: 0.98
Episode: 5/500, Score: 8, Epsilon: 0.98
Episode: 6/500, Score: 15, Epsilon: 0.97
Episode: 7/500, Score: 16, Epsilon: 0.97
Episode: 8/500, Score: 15, Epsilon: 0.96
Episode: 9/500, Score: 15, Epsilon: 0.96
Episode: 10/500, Score: 11, Epsilon: 0.95
Episode: 11/500, Score: 22, Epsilon: 0.95
Episode: 12/500, Score: 21, Epsilon: 0.94
Episode: 13/500, Score: 8, Epsilon: 0.94
Episode: 14/500, Score: 20, Epsilon: 0.93
Episode: 15/500, Score: 21, Epsilon: 0.93
Episode: 16/500, Score: 13, Epsilon: 0.92
Episode: 17/500, Score: 25, Epsilon: 0.92
Episode: 18/500, Score: 14, Epsilon: 0.91
Episode: 19/500, Score: 16, Epsilon: 0.91
Episode: 20/500, Score: 21, Epsilon: 0.90
Episode: 21/500, Score: 17, Epsilon: 0.90
Episode: 22/500, Score: 23, Epsilon: 0.90
Episode: 23/500, Score: 11, Epsilon: 0.89
Episode: 24/500, Score: 18, Epsilon: 0.89
Episode: 25/500, Score: 11, Epsilon: 0.88
Ep

In [10]:
def render_episode(agent, model_path, num_episodes=1):
    # Load the model
    agent.load_model(model_path)
    
    env = gym.make('CartPole-v1', render_mode='human')
    for episode in range(num_episodes):
        state, _ = env.reset()
        state = state.reshape(1, -1)
        done = False
        total_reward = 0
        while not done:
            env.render()
            action = agent.act(state)
            next_state, reward, done, truncated, _ = env.step(action)
            next_state = next_state.reshape(1, -1)
            total_reward += reward
            state = next_state
            if truncated:
                done = True
        print(f"Episode {episode + 1} reward: {total_reward}")
    env.close()

# Initializing saved model
state_size = 4
action_size = 2
agent = Agent(state_size, action_size)
agent.epsilon = 0.0  # 0.0 for pure exploitation

# Load the model and render episodes
model_path = "./cartpole_model/model_100.keras"
# model_path = "./cartpole_model/DQN_old.keras"
render_episode(agent, model_path, num_episodes=2)

Episode 1 reward: 27.0
Episode 2 reward: 38.0
