**Deep Q Learning Project RL**

General Task Description:

-> Given a fixed neural network, you will train an agent which will be able to play cartpole V1 on different pole lengths, e.g. make a generalist.

1. IMPORT WHAT WE NEED

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import gymnasium as gym
from collections import deque
import matplotlib.pyplot as plt
from test_script import QNetwork
from test_script import bar_plot, test_pole_length, test_script

2. THE BASE DEEP Q LEARNING 

In [2]:
#replay buffer
class ReplayBuffer:
    """
    Replay Buffer to store experience tuples for deep q learning.
    The replay buffer stores experiences from many episodes and randomly samples them during training.
    """
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)
    
    def __len__(self):
        return len(self.buffer)


def select_action(state, policy_net, epsilon, action_dim):
    """
    Select action using epsilon-greedy policy - did it with epsilon-greedy because of Assignent 1
    """
    if random.random() < epsilon:
        return random.randrange(action_dim)
    else:
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = policy_net(state_tensor)
            return q_values.argmax().item()


def deep_q_learning(epsilon, gamma, alpha, q_network, n_episodes, 
                    pole_lengths=None, env_name='CartPole-v1',
                    batch_size=64, buffer_capacity=50000, 
                    update_target_every=10, epsilon_decay=0.995, 
                    epsilon_min=0.01):
    """
    Deep q learning agent for CartPole-v1 environment with varying pole lengths.
    
    param: epsilon : float - initial exploration rate
    param: gamma : float - discount factor
    param: alpha : float - learning rate
    param: q_network : QNetwork or None - pre-initialized network or None to create new one
    param: n_episodes : int - number of training episodes
    param: pole_lengths : array-like or None - array of pole lengths to train on (default: linspace(0.4, 1.8, 30))
    param: env_name : str - gym environment name
    param: batch_size : int - batch size for training
    param: buffer_capacity : int - replay buffer capacity
    param: update_target_every : int - how often to update target network
    param: epsilon_decay : float - epsilon decay rate per episode
    param: epsilon_min : float - minimum epsilon value

    return: tuple : (policy_net, target_net, episode_returns)
        - trained networks and list of episode rewards
    """

    # initialization of environment
    env = gym.make(env_name)
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    # initialization of networks if not provided
    if q_network is None:
        policy_net = QNetwork(state_dim, action_dim)
        target_net = QNetwork(state_dim, action_dim)
        target_net.load_state_dict(policy_net.state_dict())
        target_net.eval()
    else:
        policy_net = q_network
        target_net = QNetwork(state_dim, action_dim)
        target_net.load_state_dict(policy_net.state_dict())
        target_net.eval()

    # initialization of optimizer
    optimizer = optim.Adam(policy_net.parameters(), lr=alpha)

    # initialization of replay buffer
    replay_buffer = ReplayBuffer(buffer_capacity)
    
    # pole lengths for training
    if pole_lengths is None:
        pole_lengths = np.linspace(0.4, 1.8, 30)

    # storing episode returns for plotting
    episode_returns = []
    
    # copy of current epsilon value for decay
    epsi = epsilon
    
    # training loop
    for episode in range(n_episodes):
        # randomly select pole length for this episode (we need to figure an experimental setup)
        pole_length = np.random.choice(pole_lengths)
        env.unwrapped.length = pole_length
        
        # reset environment
        state = env.reset()[0]
        episode_reward = 0.0
        
        # epsilon decay
        if epsi > epsilon_min:
            epsi = max(epsilon_min, epsi * epsilon_decay)
        
        # episode loop (1 episode = 1 pole length)
        done = False
        
        while not done:
            # select action
            action = select_action(state, policy_net, epsi, action_dim)
            
            # take step
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # store transition in replay buffer
            replay_buffer.push(state, action, reward, next_state, float(done))

            # deep q learning update (using mini-batch from replay buffer)
            if len(replay_buffer) >= batch_size:
                # sample batch from replay buffer
                batch = replay_buffer.sample(batch_size)
                states, actions, rewards, next_states, dones = zip(*batch)
                
                # convert to tensors 
                states_t = torch.FloatTensor(states)
                actions_t = torch.LongTensor(actions).unsqueeze(1)
                rewards_t = torch.FloatTensor(rewards).unsqueeze(1)
                next_states_t = torch.FloatTensor(next_states)
                dones_t = torch.FloatTensor(dones).unsqueeze(1)

                #get current q values
                current_q = policy_net(states_t).gather(1, actions_t)
                
                # target values
                with torch.no_grad():
                    next_max = target_net(next_states_t).max(1)[0].unsqueeze(1)
                    td_target = rewards_t + gamma * next_max * (1 - dones_t)
                
                # loss calc
                loss = nn.MSELoss()(current_q, td_target)
                
                # backprop and optimize + gradient clipping
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 1.0)
                optimizer.step()
            
            episode_reward += reward
            state = next_state
        
        
        #update target network periodically
        if episode % update_target_every == 0:
            target_net.load_state_dict(policy_net.state_dict())

        #store episode reward
        episode_returns.append(episode_reward)
        
        #only for seeing the progress
        if episode % 100 == 0:
            avg_reward = np.mean(episode_returns[-100:]) if len(episode_returns) >= 100 else np.mean(episode_returns)
            print(f"Episode {episode}/{n_episodes} | "
                  f"Avg Reward: {avg_reward:.1f} | "
                  f"Epsilon: {epsi:.3f}")
    
    env.close()
    
    return (policy_net, target_net, episode_returns)

In [3]:
#Test Replay Buffer
# --- IGNORE ---
rb=ReplayBuffer(3)
rb.push([0,0,0,0], 1, 1.0, [1,1,1,1], False)
print(len(rb), rb.sample(1)[0])

1 ([0, 0, 0, 0], 1, 1.0, [1, 1, 1, 1], False)


In [5]:
#Test the deep q learning function
# --- IGNORE ---
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# --- training test ---
policy_net, target_net, returns = deep_q_learning(
    epsilon=1.0,
    gamma=0.99,
    alpha=1e-3,
    q_network=None,           # create fresh nets
    n_episodes=400, 
    pole_lengths=np.linspace(0.4, 1.2, 5),
    env_name='CartPole-v1',
    batch_size=32,
    buffer_capacity=10000,
    update_target_every=5,
    epsilon_decay=0.98,
    epsilon_min=0.05
)

print("----finished training----")
print("Last 3 episode returns:", returns[-3:] if len(returns) >= 3 else returns)

Episode 0/400 | Avg Reward: 12.0 | Epsilon: 0.980
Episode 100/400 | Avg Reward: 126.2 | Epsilon: 0.130
Episode 200/400 | Avg Reward: 272.3 | Epsilon: 0.050
Episode 300/400 | Avg Reward: 252.0 | Epsilon: 0.050
----finished training----
Last 3 episode returns: [500.0, 214.0, 500.0]


2.1 Approach 1: Adaptive Reward 

Smth like: reward = reward + abs((1 - abs(angle)/12 degrees)) + abs((1 - abs(position)/2.4)) - 0.2*abs(angular velocity) - 0.2*abs(cart velocity)

2.2 Approach 2: Scrappy Adversial - hitting the weak spots

2.3 Approach 3: ????

3. MAIN TRAINING FUNCTION

4. TESTING BUT JUST FOR US - not using test_script.py

5. MAIN EXECUTION WITH PLOTTING