 Task 1: Tabular Q-Learning Update

Task 1.1- Q-table initialization


In [None]:
import numpy as np

In [None]:
!pip install numpy==1.23.5
# Ignore this cell for Tasks 1-4. The numpy pin is needed for Task 5: it was added while debugging a gym/numpy version-compatibility error.

In [None]:
def init_q_table(n_states, n_actions):
    """Return a new Q-table of shape (n_states, n_actions) filled with zeros."""
    table_shape = (n_states, n_actions)
    return np.zeros(table_shape)

Task 1.2- Q-table update

In [None]:
 def q_update(Q, s, a, r, s_next, alpha, gamma):
     """Apply one tabular Q-learning update to Q[s, a] in place and return Q.

     The entry moves a fraction ``alpha`` of the way towards the one-step
     target ``r + gamma * max_a' Q[s_next, a']``.
     """
     greedy_next_value = np.max(Q[s_next])
     # TD error: one-step bootstrapped target minus the current estimate.
     delta = (r + gamma * greedy_next_value) - Q[s, a]
     Q[s, a] += alpha * delta
     return Q

Task 2: ε-Greedy Policy on a Custom GridWorld


In [None]:
import matplotlib.pyplot as plt

class GridWorld:
    """Square grid environment whose first and last cells are terminal.

    States are numbered row-major from 0 to ``size * size - 1``. A step that
    lands on a terminal state yields reward 0 and ends the episode; every
    other step yields reward -1. Moves that would leave the grid keep the
    agent in place.
    """

    def __init__(self, size=4):
        self.size = size
        self.n_states = size * size
        self.n_actions = 4  # 0=up, 1=down, 2=left, 3=right
        self.terminal_states = [0, self.n_states - 1]
        self.reset()

    def reset(self):
        """Place the agent on state 1 (a non-terminal cell) and return it."""
        self.state = 1
        return self.state

    def step(self, action):
        """Apply ``action`` and return ``(new_state, reward, done)``.

        Calling this while already on a terminal state returns
        ``(state, 0, True)`` without moving. Unknown action codes leave the
        agent where it is (and still cost -1).
        """
        grid_row, grid_col = divmod(self.state, self.size)

        if self.state in self.terminal_states:
            return self.state, 0, True  # Terminal states are absorbing

        far_edge = self.size - 1
        if action == 0:        # Up
            if grid_row > 0:
                grid_row -= 1
        elif action == 1:      # Down
            if grid_row < far_edge:
                grid_row += 1
        elif action == 2:      # Left
            if grid_col > 0:
                grid_col -= 1
        elif action == 3:      # Right
            if grid_col < far_edge:
                grid_col += 1

        landed = grid_row * self.size + grid_col
        self.state = landed
        finished = landed in self.terminal_states
        return landed, (0 if finished else -1), finished

    def render(self):
        """Print the grid: 'A' marks the agent, 'T' terminals, '.' other cells."""
        cells = [['.'] * self.size for _ in range(self.size)]
        agent_row, agent_col = divmod(self.state, self.size)
        cells[agent_row][agent_col] = 'A'
        # Terminals are drawn last, so they hide the agent when it sits on one.
        for terminal in self.terminal_states:
            t_row, t_col = divmod(terminal, self.size)
            cells[t_row][t_col] = 'T'
        print('\n'.join(' '.join(line) for line in cells))
        print()

def select_action(Q, state, epsilon):
    """Pick an action for ``state`` with an epsilon-greedy rule.

    With probability ``epsilon`` a uniformly random action index is returned;
    otherwise the index of the largest entry in ``Q[state]``.
    """
    explore = np.random.rand() < epsilon
    if not explore:
        return np.argmax(Q[state])  # exploit
    n_actions = Q.shape[1]
    return np.random.randint(n_actions)  # explore

def q_update(Q, s, a, r, s_next, alpha, gamma):
    """Run a single Q-learning backup on ``Q[s, a]`` (in place) and return ``Q``."""
    # One-step target: immediate reward plus discounted greedy value of s_next.
    target = r + gamma * np.max(Q[s_next])
    step = alpha * (target - Q[s, a])
    Q[s, a] += step
    return Q

def train_agent(epsilon, alpha=0.1, gamma=0.99, episodes=500):
    """Train a tabular Q-learning agent on a fresh GridWorld.

    Args:
        epsilon: Probability of picking a random action at each step.
        alpha: Learning rate of the Q-update (default 0.1).
        gamma: Discount factor (default 0.99).
        episodes: Number of training episodes (default 500).

    Returns:
        A list with the total (undiscounted) reward of every episode, in
        training order.
    """
    env = GridWorld()
    Q = np.zeros((env.n_states, env.n_actions))  # Initialize Q-table

    reward_log = []  # Total reward per episode
    for _ in range(episodes):
        state = env.reset()
        done = False
        total_reward = 0

        while not done:
            action = select_action(Q, state, epsilon)
            next_state, reward, done = env.step(action)
            Q = q_update(Q, state, action, reward, next_state, alpha, gamma)
            state = next_state
            total_reward += reward

        reward_log.append(total_reward)
    return reward_log


# Train one agent per exploration rate (0.1 first, then 0.2) and keep both reward logs.
rewards_eps_01, rewards_eps_02 = [train_agent(eps) for eps in (0.1, 0.2)]





In [None]:
import numpy as np
import matplotlib.pyplot as plt

def moving_average(data, window_size=50):
    """Return the simple moving average of ``data`` over full windows only.

    Args:
        data: 1-D sequence of numbers.
        window_size: Number of consecutive samples averaged per output value.

    Returns:
        A float array of length ``len(data) - window_size + 1``; an empty
        array when ``data`` has fewer than ``window_size`` samples.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")
    values = np.asarray(data, dtype=float)
    # np.convolve(mode='valid') silently swaps its operands when the kernel is
    # longer than the data and then returns values that are not window means,
    # so short (or empty) input is answered with an empty result instead.
    if values.size < window_size:
        return np.empty(0, dtype=float)
    kernel = np.ones(window_size) / window_size
    return np.convolve(values, kernel, mode='valid')

# Assume these are your reward logs from training with different epsilons
# rewards_eps_01 = train_agent(0.1)
# rewards_eps_02 = train_agent(0.2)

# Smooth both reward logs with a 50-episode window.
ma_rewards_01 = moving_average(rewards_eps_01, 50)
ma_rewards_02 = moving_average(rewards_eps_02, 50)

# Draw the two smoothed curves on one wide figure for side-by-side comparison.
plt.figure(figsize=(20,6))
for smoothed, curve_label in ((ma_rewards_01, 'ε=0.1'), (ma_rewards_02, 'ε=0.2')):
    plt.plot(range(len(smoothed)), smoothed, label=curve_label)

plt.title('Moving Average Reward vs Episodes for Different ε')
plt.xlabel('Episode')
plt.ylabel('Moving Average Reward (window=50)')
plt.legend()
plt.grid(True)
plt.show()



Task 3: Experience Replay Buffer


In [None]:
import numpy as np
import random

# Replay Buffer Implementation:

from collections import deque

class ReplayBuffer:
    """Fixed-capacity store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry once capacity is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one (state, action, reward, next_state, done) transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them column-wise.

        Returns a 5-tuple of NumPy arrays: states (float32), actions (int64),
        rewards (float32), next_states (float32) and dones (bool).
        """
        drawn = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*drawn)
        return (
            np.stack(states).astype(np.float32),
            np.array(actions, dtype=np.int64),
            np.array(rewards, dtype=np.float32),
            np.stack(next_states).astype(np.float32),
            np.array(dones, dtype=np.bool_),
        )

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

 #Create a ReplayBuffer instance with capacity 100

# 1. Build an empty replay buffer that holds at most 100 transitions.
replay_buffer = ReplayBuffer(capacity=100)

# 2. Fill it to capacity with synthetic transitions.
for _ in range(100):
    state = np.random.randint(0, 10, size=(4,))        # 4-element integer state vector
    action = np.random.randint(0, 4)                   # integer action in 0..3
    reward = np.random.uniform(-1, 1)                  # float reward in [-1, 1)
    next_state = np.random.randint(0, 10, size=(4,))   # 4-element integer next state
    done = np.random.choice([True, False])             # random episode-end flag
    replay_buffer.push(state, action, reward, next_state, done)

# 3. Draw one batch of 32 transitions.
batch_size = 32
states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

# 4. Report the shape and dtype of every returned array.
print("Sampled batch shapes and types:")
for field_name, field in (
    ("States:", states),
    ("Actions:", actions),
    ("Rewards:", rewards),
    ("Next States:", next_states),
    ("Dones:", dones),
):
    print(field_name, field.shape, field.dtype)
# Buffer entries: the helper below prints every stored transition (disabled by wrapping it in a string literal).
"""
def print_buffer(replay_buffer):
    for i, (state, action, reward, next_state, done) in enumerate(replay_buffer.buffer):
        print(f"Transition #{i+1}")
        print(f"  State      : {state}")
        print(f"  Action     : {action}")
        print(f"  Reward     : {reward}")
        print(f"  Next State : {next_state}")
        print(f"  Done       : {done}")
        print("-" * 40)

# Call it with your ReplayBuffer instance
print_buffer(replay_buffer)

"""


Task 4: Deep Q-Network with Target Copy


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DQNPolicy(nn.Module):
    """Online Q-network: an MLP that maps an observation to one Q-value per action.

    Architecture: obs_dim -> 128 -> 128 -> n_actions with ReLU activations
    between the linear layers.
    """

    def __init__(self, obs_dim, n_actions):
        super().__init__()
        hidden = 128
        layers = [
            nn.Linear(obs_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_actions),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the Q-values for observation batch ``x`` (last dim = obs_dim)."""
        q_values = self.net(x)
        return q_values

# DQNTarget is identical to DQNPolicy
class DQNTarget(nn.Module):
    """Target Q-network: obs_dim -> 128 -> 128 -> n_actions MLP with ReLU activations."""

    def __init__(self, obs_dim, n_actions):
        super().__init__()
        widths = (obs_dim, 128, 128)
        blocks = []
        # One Linear + ReLU pair per hidden layer, then the linear output head.
        for fan_in, fan_out in zip(widths, widths[1:]):
            blocks += [nn.Linear(fan_in, fan_out), nn.ReLU()]
        blocks.append(nn.Linear(widths[-1], n_actions))
        self.net = nn.Sequential(*blocks)

    def forward(self, x):
        """Return the Q-values the network assigns to observation batch ``x``."""
        return self.net(x)
def update_target(policy_net, target_net):
    """Overwrite every parameter of ``target_net`` with ``policy_net``'s values."""
    policy_weights = policy_net.state_dict()
    target_net.load_state_dict(policy_weights)


Task 5: Full DQN Training Loop on CartPole-v1


In [None]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

# Hyperparameters
N = 1000              # Number of training episodes
M = N // 2            # Episodes over which ε decays linearly from epsilon_start to epsilon_end
X = 10                # Target-network sync period, in episodes
gamma = 0.99          # Discount factor
batch_size = 64       # Transitions sampled from the replay buffer per gradient step
learning_rate = 1e-3  # Adam step size
buffer_capacity = 10000  # Maximum number of transitions kept in the replay buffer

# Environment and its dimensions.
# new_step_api=True: the training loop unpacks step() as
# (obs, reward, terminated, truncated, info).
env = gym.make("CartPole-v1",new_step_api=True)
obs_dim = env.observation_space.shape[0]
n_actions = env.action_space.n

# Online and target networks; the target starts as an exact copy of the policy.
policy_net = DQNPolicy(obs_dim, n_actions)
target_net = DQNTarget(obs_dim, n_actions)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

# Only the policy network is optimised; the target is refreshed by copying weights.
optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
loss_fn = nn.MSELoss()
replay_buffer = ReplayBuffer(buffer_capacity)

episode_rewards = []  # Total reward of every finished episode

# ε-greedy exploration schedule
epsilon_start = 1.0
epsilon_end = 0.01
epsilon_decay = (epsilon_start - epsilon_end) / M  # Per-episode decrement of ε

for episode in range(1, N + 1):
    # reset() returns either the observation or an (observation, info) tuple,
    # depending on the gym version.
    result = env.reset()
    state = result[0] if isinstance(result, tuple) else result

    total_reward = 0

    # ε depends only on the episode index, so compute it once per episode.
    epsilon = max(epsilon_end, epsilon_start - episode * epsilon_decay)

    done = False
    while not done:
        # ε-greedy action selection.
        if np.random.rand() < epsilon:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                state_tensor = torch.FloatTensor(state).unsqueeze(0)
                q_values = policy_net(state_tensor)
                action = q_values.argmax().item()

        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        # Store only `terminated` as the done flag: a time-limit truncation
        # ends the episode but is not a terminal state, so the TD target must
        # still bootstrap from next_state in that case.
        replay_buffer.push(state, action, reward, next_state, terminated)
        state = next_state
        total_reward += reward

        # One gradient step per environment step once a full batch is available.
        if len(replay_buffer) >= batch_size:
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
            states = torch.FloatTensor(states)
            actions = torch.LongTensor(actions).unsqueeze(1)
            rewards = torch.FloatTensor(rewards).unsqueeze(1)
            next_states = torch.FloatTensor(next_states)
            dones = torch.FloatTensor(dones).unsqueeze(1)

            # Q(s, a) for the actions that were actually taken.
            q_values = policy_net(states).gather(1, actions)

            # r + γ · max_a' Q_target(s', a'), with the bootstrap term zeroed
            # for terminal transitions.
            with torch.no_grad():
                max_next_q_values = target_net(next_states).max(dim=1, keepdim=True)[0]
                target_q_values = rewards + gamma * max_next_q_values * (1 - dones)

            loss = loss_fn(q_values, target_q_values)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    episode_rewards.append(total_reward)

    # Sync the target network with the policy network every X episodes.
    if episode % X == 0:
        target_net.load_state_dict(policy_net.state_dict())

    # Progress report ten times over the whole run.
    if episode % (N // 10) == 0:
        avg_reward = np.mean(episode_rewards[-(N // 10):])
        print(f"Episode {episode}, Avg reward over last {N // 10}: {avg_reward:.2f}")

# Plotting
# Trailing moving average over exactly `window` episodes (fewer at the start).
# The slice starts at i - window + 1 so that it never spans window + 1 entries.
window = max(1, N // 10)
moving_avg = [np.mean(episode_rewards[max(0, i - window + 1):i + 1]) for i in range(N)]

plt.plot(range(N), episode_rewards, label='Reward')
plt.plot(range(N), moving_avg, label='Moving Avg', linewidth=2)
plt.xlabel("Episode")
plt.ylabel("Total Reward")
plt.title("DQN Training on CartPole")
plt.legend()
plt.grid(True)
plt.show()

# Final success rate: share of episodes whose total reward reached the threshold.
# NOTE(review): 195 is the score usually quoted for CartPole-v0; this run uses
# CartPole-v1, whose episodes can last longer — confirm the intended threshold.
success_count = sum(r >= 195 for r in episode_rewards)
success_rate = 100 * success_count / N
print(f"Success Rate: {success_rate:.2f}%")
