# DQN Implmentation

### Installations

In [12]:
# Install environment and agent
!pip install highway-env
!pip install --upgrade sympy torch

2024-12-17 16:24:05.748 python[2021:12877892] +[IMKClient subclass]: chose IMKClient_Legacy
2024-12-17 16:24:05.748 python[2021:12877892] +[IMKInputSession subclass]: chose IMKInputSession_Legacy



### Learning using existing model

The following is the pesudocode that will be followed when creating the DQN

Useful: https://www.youtube.com/watch?v=RVMpm86equc&list=PL58zEckBH8fCMIVzQCRSZVPUp3ZAVagWi&index=2

https://github.com/saashanair/rl-series/tree/master/dqn

https://github.com/johnnycode8/gym_solutions/blob/main/frozen_lake_dql.py

<img src="DQN.png" style="width: 900px;" align="left"/>




In [10]:
import gymnasium as gym
import highway_env
import numpy as np
import random
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributions as dist

# Define model
class MLPNetwork(nn.Module):
    def __init__(self, in_states, h1_nodes, out_actions):
        super(MLPNetwork, self).__init__()

        # Define network layers
        self.fc1 = nn.Linear(in_states, h1_nodes)   # first fully connected layer
        self.out = nn.Linear(h1_nodes, out_actions) # output layer
        self.out2 = nn.Linear(out_actions, 1) # output layer
        self.softmax = nn.Softmax(dim=1)
        
        self.optimizer = optim.Adam(self.parameters(), lr=1e-3)

    def forward(self, x):
        x = F.relu(self.fc1(x)) # Apply rectified linear unit (ReLU) activation
        x = self.out(x)         
        x = self.out2(x)         
        x = self.softmax(x)
        return x
    
    def save_model(self, filename):
        filename = filename + ".pth"
        torch.save(self.state_dict(), filename)
    
    def load_model(self, filename, device):
        filename = filename + ".pth"
        self.load_state_dict(torch.load(filename, map_location=device))
        
class CNN(nn.Module):
    def __init__(self, input_shape, num_actions):
        super(CNN, self).__init__()
        # greyscale Image is(stack,height,width)
        stack, height, width = input_shape
        self.conv = nn.Sequential(
            # first Conv layer
            nn.Conv2d(stack,32,kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=2),

            # second Conv layer
            nn.Conv2d(32,64,kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=2),
        )
        
        # This is for finding the size to dense more robust compared to decision manually
        with torch.no_grad():
                # Torch uses(1,channels,height,width)
                test = torch.zeros(1, stack, height, width)
                find_conv_size = self.conv(test)
                conv_size = find_conv_size.numel()
        self.fc = nn.Linear(conv_size,num_actions)
        self.softmax = nn.Softmax(dim=0)
        self.optimizer = optim.Adam(self.parameters(), lr=1e-3)

    def forward(self,x):
        x = self.conv(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc(x)
        x = x.unsqueeze(2)
        x = self.softmax(x)
        return x

    
    def save_model(self, filename):
        filename = filename + ".pth"
        torch.save(self.state_dict(), filename)

    def load_model(self, filename, device):
        filename = filename + ".pth"
        self.load_state_dict(torch.load(filename, map_location=device))
    
# Define memory for Experience Replay
class ReplayMemory():
    def __init__(self, capacity, device):
        self.capacity = capacity
        self.buffer_state = []
        self.buffer_action = []
        self.buffer_next_state = []
        self.buffer_reward = []
        self.buffer_done = []
        self.index = 0
        self.memory = []
        self.device = device
    
    def store(self, state, action, next_state, reward, done):
        if len(self.buffer_state) < self.capacity:
            self.buffer_state.append(state)
            self.buffer_action.append(action)
            self.buffer_next_state.append(next_state)
            self.buffer_reward.append(reward)
            self.buffer_done.append(done)
        else:
            self.buffer_state[self.index] = state
            self.buffer_action[self.index] = action
            self.buffer_next_state[self.index] = next_state
            self.buffer_reward[self.index] = reward
            self.buffer_done[self.index] = done

        self.index = (self.index+1)%self.capacity # for circular memory
        # TODO: prioritize some memories

    def sample(self, batch_size):
        if batch_size >  len(self.buffer_state):
            batch_size = len(self.buffer_state)
        
        indices_to_sample = random.sample(range(len(self.buffer_state)), batch_size)
        
        states = torch.from_numpy(np.array(self.buffer_state)[indices_to_sample]).float().to(self.device)
        actions = torch.from_numpy(np.array(self.buffer_action)[indices_to_sample]).to(self.device)
        next_states = torch.from_numpy(np.array(self.buffer_next_state)[indices_to_sample]).float().to(self.device)
        rewards = torch.from_numpy(np.array(self.buffer_reward)[indices_to_sample]).float().to(self.device)
        done = torch.from_numpy(np.array(self.buffer_done)[indices_to_sample]).to(self.device)

        return states, actions, next_states, rewards, done

    def __len__(self):
        return len(self.memory)

class DQNAgent:
    def __init__(self, epsilon, discount, batch_size, device, memory_capacity):
        # weights
        self.w1 = []
        
        self.q_net = {}
        self.w2 = []
        self.q_target_net = self.q_net # the same
        self.policy = {}

        self.epsilon = epsilon
        self.discount = discount
        self.batch_size = batch_size
        self.device = device
        
        # the weights of q1 and q2 should be the same
        self.memory_capacity = memory_capacity
        self.memory = {} # this is the memory buffer -> setting a limit
        
    def create_network(self, env, policy):
        if policy == "CnnPolicy":
            self.create_CNN(env)
        
        if policy == "MlpPolicy":
            self.create_MLP_Network(env)
    
    def create_CNN(self, env):
        self.num_states = env.observation_space.shape
        self.num_actions = env.action_space.n

        self.q_net = CNN(self.num_states, self.num_actions).to(self.device)
        self.q_target_net = CNN(self.num_states, self.num_actions).to(self.device)
        self.update_target_network()
    
    def create_MLP_Network(self, env):
        # the lanes
        self.num_states = env.observation_space.shape[1]
        self.num_actions = env.action_space.n

        self.q_net = MLPNetwork(self.num_states, self.num_states, self.num_actions).to(self.device)
        self.q_target_net = MLPNetwork(self.num_states, self.num_states, self.num_actions).to(self.device)
        self.update_target_network()
    
    def update_target_network(self):
        # make the weights and biases the same
        self.q_target_net.load_state_dict(self.q_net.state_dict())
    
    def learn(self, env, policy):
        self.create_network(env, policy)
        
        self.memory = ReplayMemory(capacity=self.memory_capacity, device=self.device)

        env.reset()
        num_episodes = 1
        prefill_num = 2

        self.prefill_memory(env, prefill_num)

        print("training...")
        
        for _ in range(num_episodes):
            state = env.reset()[0]
            
            # True when agent reaches the end states (colliding or passing the time)
            done = False 
            # True when agent takes more than 100 actions    
            truncated = False  
            # while(not done and not truncated):
            for i in range(1):
                # choose best action
                action = self.get_action(state)
                next_state, reward, done, truncated, info = env.step(action)
                self.memory.store(state=state, action=action, next_state=next_state, reward=reward, done=done)
                self.experience_replay()
                
                self.update_target_network()
                
                state = next_state
                
                # TODO: add metrics here
        
    # either the policies are able to get miltuple actions and into the NN or the input of NN should be able to handle all of these
    # output (one of): {0: 'LANE_LEFT', 1: 'IDLE', 2: 'LANE_RIGHT', 3: 'FASTER', 4: 'SLOWER'}
    def get_action(self, state):
        if random.random() <= self.epsilon: # amount of exploration reduces with the epsilon value
            return random.randrange(self.num_actions)
        
        state = torch.tensor(np.array([state]), dtype=torch.float32).to(self.device)
        actions = self.q_net(state)
        return torch.argmax(actions).item()             

    def experience_replay(self):
        states, actions, next_states, rewards, dones = self.memory.sample(self.batch_size)
        
        q_pred = self.q_net(states)
        # q value of the action taken
        q_pred = q_pred.gather(1, actions.view(-1, 1, 1)) 
        q_pred = q_pred.squeeze(1)
        
        q_target = self.q_target_net(next_states)
        q_target = q_target.gather(1, actions.view(-1, 1, 1))
        q_target = q_target.squeeze(1)
        
        # setting Q(s',a') to 0 when the current state is a terminal state
        q_target[dones] = 0.0
        
        y_j = rewards + (self.discount * q_target)
        y_j = y_j.mean(dim=0, keepdim=True)
        y_j = y_j.view(-1, 1)
        
        # calculate the loss as the mean-squared error of yj and qpred
        self.q_net.optimizer.zero_grad()
        loss = F.mse_loss(y_j, q_pred).mean()
        loss.backward()
        self.q_net.optimizer.step()
        
    def prefill_memory(self, env, prefill_num):
        print("prefilling memory...")
        for _ in range(prefill_num):
            done = False
            truncated = False
            state = env.reset()[0]

            while not done and not truncated:
                action = env.action_space.sample()
                next_state, reward, done, truncated, info = env.step(action)
                self.memory.store(state=state, 
                                    action=action, 
                                    next_state=next_state, 
                                    reward=reward, 
                                    done=done)
    def save_model(self, filename):
        self.q_net.save_model(filename)
        
    def load_model(self, env, filename):
        self.create_CNN(env)
        self.q_net.load_model(filename, self.device)
        # turn into evaluation model for test
        self.q_net.eval()

In [8]:
config = {}
policy = "CnnPolicy"
# policy = "MlpPolicy"

if policy == "CnnPolicy":
    config={
        "observation": {
            "type": "GrayscaleObservation",
            "observation_shape": (128, 64),
            "stack_size": 4,
            "weights": [0.2989, 0.5870, 0.1140],  # weights for RGB conversion keep this conversion this is in the highway env page
            "scaling": 1.75,
        },
    }
else:
    config = {
        "observation": {
            "type": "Kinematics",
            "vehicles_count": 5,
            "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
            "features_range": {
                "x": [-100, 100],
                "y": [-100, 100],
                "vx": [-20, 20],
                "vy": [-20, 20]
            },
            "absolute": False,
            "order": "sorted"
        }
    }

In [10]:

dqn_agent = DQNAgent(epsilon=0.2, discount=1e-2, batch_size=10, device=torch.device("mps"), memory_capacity=5000)
env = gym.make('highway-fast-v0', render_mode='rgb_array', config=config)
dqn_agent.learn(env, policy=policy)

prefilling memory...
training...


In [327]:
dqn_agent.save_model("highway_dqn_model1")

### Test the Model

In [11]:
import gymnasium as gym
import highway_env
import random
import torch
env = gym.make('highway-v0', render_mode='rgb_array', config=config)
dqn_agent_test = DQNAgent(epsilon=0.2, discount=1e-2, batch_size=10, device=torch.device("mps"), memory_capacity=5000)
dqn_agent_test.load_model(env, "highway_dqn_model")

for i in range(10):
    state = env.reset()[0]  
    done = False      
    truncated = False 

    # Agent navigates map until it falls into a hole (terminated), reaches goal (terminated), or has taken 200 actions (truncated).
    while(not done and not truncated):  
        # Select best action   
        action = dqn_agent_test.get_action(state)
        next_state, reward, done, truncated, info = env.step(action)
        env.render()

  self.load_state_dict(torch.load(filename, map_location=device))


AttributeError: 'DQNAgent' object has no attribute 'epslon'