## RL Agents using Q-Learning and Vanilla Policy Gradient

I am Rishabh Jain and this is my task of making agents using q-learning and vpg

### STEP 1: Importing Libraries

In [50]:
import gymnasium as gym
import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn
import random
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import seaborn as sns
from torch.distributions import Categorical

### STEP 2: Making the Environment

In [51]:
env = gym.make("FrozenLake-v1", map_name="8x8", is_slippery=True)
state_size = env.observation_space.n
action_size = env.action_space.n
np.random.seed(1)
random.seed(1)
torch.manual_seed(1)

<torch._C.Generator at 0x7cdd2bf89690>

### STEP 3: The code for the Q-Learning

In [52]:
episodes = 20000
gamma = 0.99
alpha = 0.1
epsilon = 1.0
epsilon_decay = 0.9999999999999999
min_epsilon = 0.3

q_table = np.zeros((state_size, action_size))
q_rewards = []
success_count = 0

for ep in range(episodes):
    state, _ = env.reset(seed=1)
    done = False
    total_reward = 0
    while not done:
        if np.random.rand() < epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(q_table[state])

        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        q_table[state, action] += alpha * (
            reward + gamma * np.max(q_table[next_state]) - q_table[state, action]
        )

        state = next_state
        total_reward += reward

    q_rewards.append(total_reward)
    if total_reward > 0:
        success_count += 1

    epsilon = max(min_epsilon, epsilon * epsilon_decay)

print(f"Total Successes: {success_count}/{episodes}")
print(f"Success Rate: {success_count / episodes * 100:.2f}%")
print(q_table)


Total Successes: 26/20000
Success Rate: 0.13%
[[1.42862053e-01 1.44274683e-01 1.43726643e-01 1.44647893e-01]
 [1.45033946e-01 1.46166970e-01 1.46875454e-01 1.47628717e-01]
 [1.50074175e-01 1.52192201e-01 1.52956074e-01 1.54190606e-01]
 [1.56587036e-01 1.58172265e-01 1.61007725e-01 1.62909171e-01]
 [1.62817476e-01 1.70298246e-01 1.67540633e-01 1.68634077e-01]
 [1.72427443e-01 1.74145581e-01 1.79251398e-01 1.74217393e-01]
 [1.83118206e-01 1.81702795e-01 1.87284189e-01 1.83163299e-01]
 [1.89339980e-01 1.88193543e-01 1.90360698e-01 1.87443385e-01]
 [1.41735407e-01 1.43090817e-01 1.42392069e-01 1.43572626e-01]
 [1.43516447e-01 1.44203394e-01 1.45772390e-01 1.46668118e-01]
 [1.43982317e-01 1.45397485e-01 1.45497234e-01 1.51934309e-01]
 [1.27041616e-01 1.13752162e-01 1.15369192e-01 1.55887948e-01]
 [1.49075593e-01 1.50392409e-01 1.56563471e-01 1.63926328e-01]
 [1.64343684e-01 1.69198242e-01 1.72710438e-01 1.73117598e-01]
 [1.84000050e-01 1.88680511e-01 1.89906775e-01 1.84581976e-01]
 [1.92769

### STEP 4: The code of VPG

In [None]:
class PolicyNetwork(nn.Module):
    def __init__(self, obs_dim, act_dim):
        super(PolicyNetwork, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(obs_dim, 64),
            nn.ReLU(),
            nn.Linear(64, act_dim),
        )

    def forward(self, x):
        x = self.fc(x)
        return torch.softmax(x, dim=-1)

def sample_action(policy, obs):
    obs = torch.tensor(obs, dtype=torch.float32)
    probs = policy(obs)
    dist = torch.distributions.Categorical(probs)
    action = dist.sample()
    return action.item(), dist.log_prob(action)

def compute_returns(rewards, gamma=0.99):
    G = 0
    returns = []
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)
    returns = torch.tensor(returns, dtype=torch.float32)
    return returns

def train_vpg(policy, optimizer, episodes=20000):
    vpg_rewards = []
    vpg_success = 0
    baseline = 0.0
    for episode in range(episodes):
        obs, _ = env.reset(seed=1)
        obs = np.eye(state_size)[obs]
        log_probs, rewards = [], []
        done = False

        while not done:
            action, log_prob = sample_action(policy, obs)
            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            log_probs.append(log_prob)
            rewards.append(reward)
            obs = np.eye(state_size)[next_obs]

        returns = compute_returns(rewards)
        baseline = 0.9 * baseline + 0.1 * returns.mean().item()
        advantage = returns - baseline
        loss = -torch.sum(torch.stack(log_probs) * advantage)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        episode_reward = sum(rewards)
        vpg_rewards.append(sum(rewards))
        if episode_reward > 0:
            vpg_success += 1
        if episode % 500 == 0:
            print(f"Episode {episode} — Reward: {sum(rewards)}")

    return vpg_rewards, vpg_success

obs_dim = state_size
act_dim = action_size
policy = PolicyNetwork(obs_dim, act_dim)
optimizer = optim.Adam(policy.parameters(), lr=1e-2)

vpg_rewards, vpg_success = train_vpg(policy, optimizer)
print(f"Total Successes: {vpg_success}/{episodes}")
print(f"Success Rate: {vpg_success / 20000 * 100:.2f}%")

Episode 0 — Reward: 0.0
Episode 500 — Reward: 0.0
Episode 1000 — Reward: 0.0
Episode 1500 — Reward: 0.0
Episode 2000 — Reward: 0.0
Episode 2500 — Reward: 0.0
Episode 3000 — Reward: 0.0
Episode 3500 — Reward: 0.0
Episode 4000 — Reward: 0.0
Episode 4500 — Reward: 0.0
Episode 5000 — Reward: 0.0
Episode 5500 — Reward: 0.0
Episode 6000 — Reward: 0.0
Episode 6500 — Reward: 0.0
Episode 7000 — Reward: 0.0
Episode 7500 — Reward: 0.0


### STEP 5: Visualization

In [None]:
def plot_q_table_heatmap(q_table, title="Q-Table Heatmap"):
    best_actions = np.argmax(q_table, axis=1).reshape((8, 8))
    plt.figure(figsize=(8, 6))
    sns.heatmap(best_actions, annot=True, cmap="YlGnBu", cbar=False, linewidths=0.5)
    plt.title(title)
    plt.xlabel("Column")
    plt.ylabel("Row")
    plt.show()

def plot_policy_probs(policy_net):
    grid = np.zeros((8, 8))
    action_grid = np.empty((8, 8), dtype=object)
    for state in range(state_size):
        one_hot = np.eye(state_size)[state]
        one_hot_tensor = torch.tensor(one_hot, dtype=torch.float32)
        with torch.no_grad():
            probs = policy_net(one_hot_tensor).numpy()
        grid[state // 8][state % 8] = np.max(probs)
        best_action = np.argmax(probs)
        symbols = ["←", "↓", "→", "↑"]
        action_grid[state // 8][state % 8] = symbols[best_action]

    plt.figure(figsize=(10, 6))
    sns.heatmap(grid, annot=action_grid, fmt="", cmap="coolwarm", linewidths=0.5)
    plt.title("Policy Probability Map (VPG)")
    plt.xlabel("Column")
    plt.ylabel("Row")
    plt.show()

def plot_learning_curve(rewards_q, rewards_vpg):
    plt.figure(figsize=(10, 5))
    plt.plot(rewards_q, label="Q-learning", alpha=0.6)
    plt.plot(rewards_vpg, label="VPG", alpha=0.6)
    plt.xlabel("Episode")
    plt.ylabel("Total Reward")
    plt.title("Learning Curve Comparison")
    plt.legend()
    plt.grid(True)
    plt.show()

plot_q_table_heatmap(q_table, title="Q-Learning: Best Actions per State")

plot_policy_probs(policy)

plot_learning_curve(rewards_q=q_rewards, rewards_vpg=vpg_rewards)


### STEP 6: Animation

In [None]:
from IPython.display import Image, display

tile_dict = {
    b'S': 'S',
    b'F': '.',
    b'H': 'H',
    b'G': 'G'
}
agent_icon = 'A'

desc = env.unwrapped.desc
rows, cols = desc.shape

state, _ = env.reset(seed=42)
episode_states = [state]
done = False
while not done:
    action = np.argmax(q_table[state])
    next_state, reward, terminated, truncated, _ = env.step(action)
    episode_states.append(next_state)
    state = next_state
    done = terminated or truncated

def get_rendered_frame(state_idx):
    s = episode_states[state_idx]
    r, c = divmod(s, cols)
    grid = [[tile_dict.get(desc[i, j], '?') for j in range(cols)] for i in range(rows)]
    grid[r][c] = agent_icon
    return grid

fig, ax = plt.subplots(figsize=(cols, rows))
plt.axis('off')
table = ax.table(cellText=get_rendered_frame(0), loc='center', cellLoc='center')
table.scale(1, 1.5)

def update(frame_idx):
    grid = get_rendered_frame(frame_idx)
    for i in range(rows):
        for j in range(cols):
            table[i, j].get_text().set_text(grid[i][j])
    return table

ani = animation.FuncAnimation(fig, update, frames=len(episode_states), interval=500, repeat=False)
gif_path = "/content/frozenlake_animation.gif"
ani.save(gif_path, writer='pillow', fps=2)

display(Image(filename=gif_path))
plt.show()
