In [27]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import gymnasium as gym

import random
from collections import deque

# Choose the compute device in priority order: CUDA, then MPS, then plain CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# Notebook-level environment created with render_mode="human". The training cell
# further down rebinds `env` to a fresh instance built without a render mode.
env = gym.make("Acrobot-v1", render_mode = "human")

""" theta_1 = absolute angle of first link
    theta_2 = angle of second link in relation to first link
    state: [cos(theta_1), sin(theta_1), cos(theta_2), sin(theta_2), omega of theta_1, omega of theta_2]
    action: [left, nothing, right]
    left is minus, right is plus 

    Constant -1 reward at every step; 0 at termination
    Termination condition: Free end reaches target height (-cos(theta1) - cos(theta2 + theta1) > 1.0)
    Truncation condition: Episode length > 500
    
    The agent's job is to reach the target height in as few steps as possible.
"""

def generate_episodes(n_ep, env, reward_floor=-99):
    """Roll out `n_ep` episodes in `env` using uniformly random actions.

    Args:
        n_ep: Number of episodes to run.
        env: Gymnasium-style environment exposing `reset()`, `step(action)`
            (returning a 5-tuple) and `action_space.sample()`.
        reward_floor: An episode is abandoned as soon as its cumulative reward
            drops below this value. The default of -99 matches the cut-off
            that used to be hard-coded here.

    Returns:
        A list holding the cumulative reward of each episode, in run order.
    """
    episode_returns = []

    for _ in range(n_ep):
        env.reset()
        total_reward = 0

        while True:
            action = env.action_space.sample()
            _, reward, terminated, truncated, _ = env.step(action)
            total_reward += reward

            # `truncated` must end the rollout too: stepping an environment
            # after it has signalled a time-limit cut-off is not valid.
            if terminated or truncated or total_reward < reward_floor:
                break

        episode_returns.append(total_reward)

    return episode_returns

In [33]:
def epsilon_greedy(env, Q_net, S, epsilon):
    """Pick an action epsilon-greedily with respect to `Q_net`.

    With probability `epsilon` a random action is drawn from
    `env.action_space`; otherwise the action with the largest Q-value for `S`
    is chosen.

    Args:
        env: Environment whose `action_space.sample()` supplies random actions.
        Q_net: Callable mapping a state tensor to Q-values.
        S: State tensor for a single state (the argmax is taken over the whole
            network output, not per batch row).
        epsilon: Exploration probability.

    Returns:
        A 0-dimensional int64 tensor holding the action index. Both branches
        return the same shape and dtype, so `int(result)` is always valid.
    """
    # One draw from torch's RNG decides between exploiting and exploring.
    if torch.rand(1).item() > epsilon:
        # Call the module itself (not `.forward`) so registered hooks run, and
        # skip autograd bookkeeping: action selection is never differentiated.
        with torch.no_grad():
            return torch.argmax(Q_net(S))

    return torch.tensor(int(env.action_space.sample()), dtype=torch.int64)

class DQN(nn.Module):
    """Fully connected Q-network: input -> hidden -> hidden -> one output per action.

    Args:
        in_dim: Size of the state vector fed to the network.
        out_dim: Number of outputs (one Q-value per action).
        hidden_dim: Width of both hidden layers. Defaults to 200, the value
            that was previously fixed in the constructor.
    """

    def __init__(self, in_dim, out_dim, hidden_dim=200) -> None:
        super().__init__()
        # note to self: nn.Linear() represents the transformation, not the matrices themselves.
        self.il = nn.Linear(in_dim, hidden_dim)
        self.hl = nn.Linear(hidden_dim, hidden_dim)
        self.ol = nn.Linear(hidden_dim, out_dim)
        # Parameter-free activation; it adds no entries to the state dict.
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return the Q-values for `x`; no activation is applied to the output layer."""
        x = self.relu(self.il(x))
        x = self.relu(self.hl(x))
        return self.ol(x)
        
class Replay_Memory():
    """Fixed-capacity store of (state, action, reward, next_state, done) tuples."""

    def __init__(self, cap):
        # A deque with `maxlen` discards its oldest entry once `cap` is exceeded.
        self.memory = deque(maxlen=cap)

    def push(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Draw `batch_size` distinct transitions and stack each field into a tensor.

        Returns:
            A 5-tuple (states, actions, rewards, next_states, dones); each
            element is the `torch.stack` of that field over the drawn batch.
        """
        drawn = random.sample(self.memory, batch_size)

        # Transpose the list of transitions into per-field columns and stack them.
        states, actions, rewards, next_states, dones = (
            torch.stack(column) for column in zip(*drawn)
        )
        return states, actions, rewards, next_states, dones

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

def DQL(_env, n_ep, batch_size=64, gamma=0.99, lr=1e-3, target_sync_steps=100,
        buffer_cap=100000, epsilon_min=0.01, epsilon_decay=0.001):
    """Train a DQN on `_env` using experience replay and a target network.

    Args:
        _env: Gymnasium-style environment with a 1-D observation space and a
            discrete action space (`observation_space.shape[0]` and
            `action_space.n` are read).
        n_ep: Number of training episodes.
        batch_size: Minibatch size drawn from the replay buffer; learning
            starts once the buffer holds more than this many transitions.
        gamma: Discount factor used in the bootstrapped target.
        lr: Learning rate for the Adam optimiser.
        target_sync_steps: The target network is overwritten with the
            behaviour network every this many environment steps.
        buffer_cap: Maximum number of transitions kept in the replay buffer.
        epsilon_min: Floor of the exploration rate.
        epsilon_decay: Per-episode exponential decay constant for epsilon.

    Returns:
        The trained behaviour network.
    """
    state_dim = _env.observation_space.shape[0]
    action_dim = _env.action_space.n

    replay_buffer = Replay_Memory(buffer_cap)
    behaviour_net = DQN(state_dim, action_dim).to(device)
    target_net = DQN(state_dim, action_dim).to(device)

    # Start both networks from identical weights; the target net is never trained directly.
    target_net.load_state_dict(behaviour_net.state_dict())
    target_net.eval()

    optimizer = torch.optim.Adam(behaviour_net.parameters(), lr=lr)
    loss_func = nn.MSELoss()

    n_steps = 0

    for i in range(n_ep):
        S, info = _env.reset()
        S = torch.tensor(S, dtype=torch.float32, device=device)

        # Epsilon depends only on the episode index, so compute it once per episode.
        epsilon = epsilon_min + (1 - epsilon_min) * np.exp(-epsilon_decay * i)

        episode_over = False
        while not episode_over:
            # Sample exploratory actions from the environment being trained on
            # (the `_env` argument), not from a notebook-level global.
            A = int(epsilon_greedy(_env, behaviour_net, S, epsilon))
            S_prime, R, terminated, truncated, info = _env.step(A)
            episode_over = terminated or truncated
            n_steps += 1

            S_prime = torch.tensor(S_prime, dtype=torch.float32, device=device)
            A = torch.tensor([A], dtype=torch.int64, device=device)  # int64 is required by gather()
            R = torch.tensor([R], dtype=torch.float32, device=device)
            # Only genuine termination should switch off bootstrapping. A
            # time-limit truncation ends the rollout, but the next state still
            # has value, so it is stored as non-terminal.
            terminal = torch.tensor([terminated], dtype=torch.float32, device=device)

            replay_buffer.push(S, A, R, S_prime, terminal)

            S = S_prime

            if terminated:
                print("Episode", i + 1)
                print("success!")

            if len(replay_buffer) > batch_size:
                state_batch, action_batch, reward_batch, next_state_batch, terminal_batch = (
                    replay_buffer.sample(batch_size)
                )

                # Q(s, a) for the actions that were actually taken.
                q_values = behaviour_net(state_batch)
                specific_qs = q_values.gather(1, action_batch)

                # max_a' Q_target(s', a'), kept out of the autograd graph.
                with torch.no_grad():
                    specific_target_qs = target_net(next_state_batch).max(1)[0].unsqueeze(1)

                # TD target: r + gamma * max_a' Q_target(s', a') for non-terminal s'.
                target_q_values = reward_batch + (gamma * specific_target_qs * (1 - terminal_batch))

                loss = loss_func(specific_qs, target_q_values)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            if (n_steps % target_sync_steps) == 0:
                target_net.load_state_dict(behaviour_net.state_dict())

    return behaviour_net

In [34]:
# Train on an environment created without a render mode. The try/finally makes
# sure the environment is released even if training fails or is interrupted.
env = gym.make("Acrobot-v1")
try:
    Q_net = DQL(env, 100)
finally:
    env.close()

In [54]:
# saving the neural net

# torch.save(Q_net.state_dict(), "models/acrobot_DQN_100_iters")

In [57]:
from gymnasium.wrappers import RecordVideo

def greedy(Q_net, S):
    """Return the index of the largest Q-value that `Q_net` predicts for `S`.

    Args:
        Q_net: Callable mapping a state tensor to Q-values.
        S: State tensor for a single state (the argmax is taken over the whole
            network output).

    Returns:
        A 0-dimensional integer tensor holding the chosen action index.
    """
    # Call the module itself (not `.forward`) so registered hooks run, and skip
    # autograd bookkeeping since this is pure inference.
    with torch.no_grad():
        return torch.argmax(Q_net(S))
    
def generate_episode(policy, _env, Q_net):
    """Run one episode in `_env`, choosing each action with `policy`.

    Args:
        policy: Callable `policy(Q_net, observation)` returning an action
            index. If a non-callable value is passed, `greedy` is used, which
            is what this function always did before.
        _env: Gymnasium-style environment to step through.
        Q_net: Network handed to `policy` for action selection.

    Returns:
        The cumulative reward collected over the episode.
    """
    select_action = policy if callable(policy) else greedy

    observation, info = _env.reset()
    observation = torch.tensor(observation, dtype=torch.float32, device=device)
    total_reward = 0.0

    while True:
        action = int(select_action(Q_net, observation))
        observation, reward, terminated, truncated, info = _env.step(action)
        observation = torch.tensor(observation, dtype=torch.float32, device=device)
        total_reward += reward

        # NOTE(review): `reward` is the per-step reward. The notes in this
        # notebook describe Acrobot rewards as -1 or 0, so the last condition
        # never fires there; it is kept unchanged for other environments.
        if terminated or truncated or reward < -100:
            break

    return total_reward

# Create the evaluation environment first so that the network dimensions are
# read from it rather than from whatever `env` an earlier cell left behind.
env = gym.make("Acrobot-v1", render_mode="rgb_array")

try:
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    Q_net_load = DQN(state_dim, action_dim).to(device)
    # map_location lets a checkpoint saved on one device load on another.
    Q_net_load.load_state_dict(
        torch.load("models/acrobot_DQN_100_iters", map_location=device, weights_only=True)
    )
    Q_net_load.eval()

    # Seed the base environment once, before it is wrapped for recording.
    observation, info = env.reset(seed=42)

    env = RecordVideo(env, video_folder="./videos")

    for _ in range(1):
        generate_episode(greedy, env, Q_net_load)
finally:
    # Runs on failure too, so the environment (and the recorder wrapping it,
    # if one was created) is always closed.
    env.close()