In [None]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import gymnasium as gym

In [None]:
# --- Replay memory ---
BUFFER_SIZE = 10_000   # maximum number of transitions kept in the replay buffer
BATCH_SIZE = 64        # transitions drawn per optimisation step

# --- Learning ---
GAMMA = 0.99           # discount factor applied to future rewards
LR = 1e-3              # optimizer learning rate
TARGET_UPDATE = 10     # sync the target network every this many episodes

# --- Exploration (ε-greedy schedule) ---
EPS_START = 1.0        # ε at step 0
EPS_END = 0.01         # asymptotic ε
EPS_DECAY = 500        # time constant (in action-selection steps) of the exponential ε decay

In [None]:
class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions for experience replay.

    Sampling random mini-batches breaks up the correlation between
    consecutive transitions and smooths out learning.
    """

    def __init__(self, capacity):
        """Create an empty buffer holding at most ``capacity`` transitions."""
        # deque(maxlen=...) drops the oldest transition once the buffer is full.
        self.buffer = deque(maxlen=capacity)

    def push(self, s, a, r, s_next, done):
        """Append one transition ``(s, a, r, s_next, done)`` to the buffer."""
        self.buffer.append((s, a, r, s_next, done))

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            Tuple ``(states, actions, rewards, next_states, dones)`` of tensors
            whose first dimension is ``batch_size``. ``actions`` is int64; all
            other tensors are float32 (``dones`` is encoded as 0.0 / 1.0).

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        # Stack every field into a single ndarray before handing it to torch:
        # constructing a tensor straight from a tuple of ndarrays converts
        # element by element and is dramatically slower.
        return (
            torch.from_numpy(np.asarray(states, dtype=np.float32)),
            torch.from_numpy(np.asarray(actions, dtype=np.int64)),
            torch.from_numpy(np.asarray(rewards, dtype=np.float32)),
            torch.from_numpy(np.asarray(next_states, dtype=np.float32)),
            torch.from_numpy(np.asarray(dones, dtype=np.float32)),
        )

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)


In [None]:
class QNetwork(nn.Module):
    """Fully connected Q-value network with two ReLU hidden layers.

    Maps a batch of state vectors to one Q-value per action.

    Args:
        state_dim  : size of the last dimension of the input.
        action_dim : number of outputs (one Q-value per action).
        hidden_dim : width of both hidden layers (default 128).
    """

    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super().__init__()
        # Attribute names fc1/fc2/fc3 define the state_dict keys; keep them
        # stable so policy/target networks and saved weights stay compatible.
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        """Return raw (unbounded) Q-values of shape ``[..., action_dim]``."""
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        # No activation on the output layer: Q-values are unconstrained reals.
        return self.fc3(x)

In [None]:
# Seed every source of randomness once so that Restart & Run All reproduces
# the same training run.
SEED = 0
random.seed(SEED)        # ε-greedy exploration and replay-buffer sampling
np.random.seed(SEED)     # NumPy global RNG (not used directly here; seeded for safety)
torch.manual_seed(SEED)  # network weight initialisation

env         = gym.make("CartPole-v1")
# Seeding via reset() initialises the environment's own RNG; subsequent
# reset() calls without a seed continue from this stream.
env.reset(seed=SEED)
state_dim   = env.observation_space.shape[0]
action_dim  = env.action_space.n

# Online network (trained) and target network (periodically synced copy).
policy_net  = QNetwork(state_dim, action_dim)
target_net  = QNetwork(state_dim, action_dim)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()  # the target network is never trained directly

optimizer   = optim.Adam(policy_net.parameters(), lr=LR)
memory      = ReplayBuffer(BUFFER_SIZE)

In [None]:
steps_done = 0  # number of actions selected so far; drives the ε schedule


def select_action(state):
    """Pick an action ε-greedily for a single state.

    ε decays exponentially from EPS_START towards EPS_END as ``steps_done``
    grows; every call increments ``steps_done`` by one.

    Args:
        state: tensor holding exactly one state, batch-shaped ``[1, state_dim]``.

    Returns:
        A ``[1, 1]`` long tensor containing the chosen action index.
    """
    global steps_done
    decay = np.exp(-1. * steps_done / EPS_DECAY)
    eps = EPS_END + (EPS_START - EPS_END) * decay
    steps_done += 1

    explore = random.random() < eps
    if not explore:
        # Exploit: greedy action under the current policy network.
        with torch.no_grad():
            _, greedy_action = policy_net(state).max(1)
            return greedy_action.view(1, 1)

    # Explore: uniformly random action.
    return torch.tensor([[random.randrange(action_dim)]], dtype=torch.long)


In [None]:
def optimize_model():
    """Run one Double-DQN gradient step on a random replay mini-batch.

    Does nothing until the replay buffer holds at least BATCH_SIZE
    transitions. Updates ``policy_net`` in place through ``optimizer``.
    """
    if len(memory) < BATCH_SIZE:
        return

    states, actions, rewards, next_states, dones = memory.sample(BATCH_SIZE)

    # 1. Current estimate Q(s, a) from policy_net for the actions actually taken.
    q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    # 2-3. TD target. It is a constant w.r.t. the loss, so compute it without
    #      autograd tracking instead of building a graph and detaching later.
    with torch.no_grad():
        # Double DQN: a) policy_net selects the next action ...
        next_actions = policy_net(next_states).argmax(dim=1, keepdim=True)
        #             b) ... target_net evaluates it.
        next_q_values = target_net(next_states).gather(1, next_actions).squeeze(1)
        # r + γ·Q_target(s', argmax_a Q_policy(s', a)); (1 - dones) removes the
        # bootstrap term for transitions flagged as done.
        expected_q = rewards + (GAMMA * next_q_values * (1 - dones))

    # 4. MSE loss & backprop
    loss = nn.MSELoss()(q_values, expected_q)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


In [None]:
num_episodes = 3_000

for episode in range(1, num_episodes + 1):
    # 1) Gymnasium-style reset returns (obs, info)
    obs, info = env.reset()

    # 2) Turn the numpy array `obs` into a batch-shaped torch FloatTensor
    state = torch.from_numpy(obs).float().unsqueeze(0)  # shape: [1, state_dim]

    done = False
    while not done:
        action = select_action(state)

        # 3) Take a step: `terminated` marks a true terminal state, while
        #    `truncated` marks an external cut-off (e.g. the time limit).
        next_obs, reward, terminated, truncated, info = env.step(action.item())
        # Either flag ends the episode loop ...
        done = terminated or truncated

        # 4) Convert next_obs into the same shape
        next_state = torch.from_numpy(next_obs).float().unsqueeze(0)

        # ... but only `terminated` is stored as the done flag: a truncated
        # episode did not reach a terminal state, so its TD target must still
        # bootstrap from Q(s', ·). Storing `done` here would zero that term.
        memory.push(
            state.squeeze(0).numpy(),      # -> shape (state_dim,)
            action.item(),
            reward,
            next_state.squeeze(0).numpy(), # -> shape (state_dim,)
            terminated,
        )
        state = next_state

        optimize_model()

    # 5) Periodically sync the target network
    if episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())


In [None]:
def evaluate_policy(policy_net, device="cpu", num_episodes=5):
    """
    Play the greedy policy of ``policy_net`` on CartPole-v1 with on-screen rendering.

    A fresh ``gym.make("CartPole-v1", render_mode="human")`` environment is
    created for the evaluation and closed afterwards, even if an error occurs.
    The network is moved to ``device`` and switched to eval mode for the
    duration of the call; its previous device and train/eval mode are restored
    on exit so training can continue unchanged.

    Args:
        policy_net   : your PyTorch Q-network (policy_net)
        device       : "cpu" or "cuda"
        num_episodes : how many full episodes to play

    Returns:
        List of total rewards per episode.
    """
    # Remember the network's state so the evaluation leaves no side effects.
    was_training = policy_net.training
    first_param = next(policy_net.parameters(), None)
    original_device = first_param.device if first_param is not None else None

    env = gym.make("CartPole-v1", render_mode='human')
    returns = []

    try:
        policy_net.to(device).eval()       # eval mode (matters for dropout/batchnorm layers)

        for ep in range(1, num_episodes + 1):
            obs, info = env.reset()        # Gymnasium-style reset returns (obs, info)
            state = torch.from_numpy(obs).float().unsqueeze(0).to(device)  # [1, state_dim]
            ep_reward = 0.0
            done = False

            while not done:
                # With render_mode="human" the environment draws each frame
                # itself during reset()/step(); no explicit render() is needed.
                with torch.no_grad():
                    # Greedy action: index of the largest Q-value.
                    action = policy_net(state).argmax(dim=1).item()

                obs, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated

                state = torch.from_numpy(obs).float().unsqueeze(0).to(device)
                ep_reward += reward

            returns.append(ep_reward)
            print(f"Episode {ep:2d} ▶ Return: {ep_reward:.1f}")
    finally:
        env.close()                        # always release the render window
        policy_net.train(was_training)     # restore the previous train/eval mode
        if original_device is not None:
            policy_net.to(original_device) # undo the device move

    if returns:                            # avoid dividing by zero when num_episodes == 0
        avg_return = sum(returns) / len(returns)
        print(f"\nAverage return over {num_episodes} episodes: {avg_return:.2f}")
    return returns

In [None]:
# Use the GPU when one is present, otherwise fall back to the CPU so the
# notebook also runs on machines without CUDA.
evaluate_policy(
    policy_net,
    device="cuda" if torch.cuda.is_available() else "cpu",
    num_episodes=3,
)