## Initializing the environment

In [1]:
import gymnasium as gym
import numpy as np
import pygame

# Initialize the environment
env = gym.make("LunarLander-v3", render_mode="human") 
env.reset(seed=42)

# play one complete episode
while True:
    action = env.action_space.sample()  # Sample a random action
    observation, reward, terminated, truncated, info = env.step(action)  # Take the action in the environment

    if terminated or truncated:
        print("Episode finished")
        break

env.close()  # Close the environment

  from pkg_resources import resource_stream, resource_exists


Episode finished


## Safe Agent (keep the lander from touching the ground)

In [2]:
class SafeAgent:
    def act(self, observation):
        # minimum height
        MIN_HEIGHT = 1
        
        # if the lander is too low, apply upward force
        if observation[1] < MIN_HEIGHT:
            return 2
        else:
            return 0
        
def play_episode(agent, env):
    observation, info = env.reset(seed=42)
    done = False
    total_reward = 0

    while not done:
        action = agent.act(observation)
        observation, reward, terminated, truncated, info = env.step(action)
        total_reward += reward
        done = terminated or truncated

    return total_reward

env = gym.make("LunarLander-v3", render_mode="human")
agent = SafeAgent()
total_reward = play_episode(agent, env)
print(f"Total reward: {total_reward}")
env.close()  # Close the environment

Total reward: -260.82875108605117


## Stable Agent (Keeps the lander stable in air)

In [3]:
class StableAgent:
    def act(self, observation):
        """
            It will operate via the following rules:

            1. If below height of 1: action = 2 (main engine)
            2. If angle is above π/50: action = 1 (fire right engine)
            3. If angle is above π/50: action = 1 (fire left engine)
            4. If x distance is above 0.4: action = 3 (fire left engine)
            5. If x distance is below -0.4: action = 1 (fire left engine)
            6. If below height of 1.5: action = 2 (main engine)
            7. Else: action = 0 (do nothing)
        """
        MIN_HEIGHT = 1
        MAX_ANGLE = 3.14 / 50
        MAX_X_DISTANCE = 0.4

        if observation[1] < MIN_HEIGHT:
            return 2
        elif observation[6] > MAX_ANGLE:
            return 1
        elif observation[6] < -MAX_ANGLE:
            return 3
        elif observation[0] > MAX_X_DISTANCE:
            return 3
        elif observation[0] < -MAX_X_DISTANCE:
            return 1
        elif observation[1] < 1.5:
            return 2
        else:
            return 0
        


env = gym.make("LunarLander-v3", render_mode="human")
agent = StableAgent()
total_reward = play_episode(agent, env)
print(f"Total reward: {total_reward}")
env.close()  # Close the environment


Total reward: -549.4518126418703


## Deep lerning method to land the lunar lander

In [4]:
# DQN algorithm
import torch

class DQN(torch.nn.Module):
    def __init__(self, state_size, action_size, hidden_size=64):
        '''
            Architecture of the DQN model:
            1. Input layer
            2. Hidden layer
            3. Output layer
        '''
        super(DQN, self).__init__()
        self.layer1 = torch.nn.Linear(state_size, hidden_size)
        self.layer2 = torch.nn.Linear(hidden_size, hidden_size)
        self.layer3 = torch.nn.Linear(hidden_size, action_size)
        
    def forward(self, state):
        x = torch.relu(self.layer1(state))
        x = torch.relu(self.layer2(x))
        return self.layer3(x)
    

        

## Replay Buffer

In [5]:
import numpy as np
import random
from collections import deque

class ReplayBuffer:
    def __init__(self, buffer_size=10000):
        self.buffer = deque(maxlen=buffer_size)
        
    def push(self, state, action, reward, next_state, done):
        """
        Push a transition into the replay buffer.
        
        :param state: The current state
        :param action: The action taken
        :param reward: The reward received
        :param next_state: The next state after taking the action
        :param done: Whether the episode has ended
        """
        self.buffer.append((state, action, reward, next_state, done))
        
    def sample(self, batch_size):
        """
        Sample a batch of transitions from the replay buffer.
        
        :param batch_size: The number of transitions to sample
        :return: tuple of numpy.ndarray - A batch of transitions
        """
        states, actions, rewards, next_states, dones = zip(*random.sample(self.buffer, batch_size))
        return np.stack(states), actions, rewards, np.stack(next_states), dones
    
    def __len__(self):
        """
        Get the current size of the replay buffer.
        
        :return: The number of transitions in the buffer
        """
        return len(self.buffer)


## Define the DQN agent

In [6]:
class DQNAgent:
    def __init__(self, state_size=8, action_size=4, hidden_size=64, learning_rate=1e-3, gamma=0.99, buffer_size=10000, batch_size=64):
        """
        Initialize the DQN agent with the necessary parameters.
        Args:
            state_size (int): The size of the state space.
            action_size (int): The size of the action space.
            hidden_size (int): The size of the hidden layer in the DQN model.
            learning_rate (float): The learning rate for the optimizer.
            gamma (float): The discount factor for future rewards.
            buffer_size (int): The size of the replay buffer.
            batch_size (int): The size of the batch for training.
        """
        
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        self.state_size = state_size
        self.action_size = action_size
        self.hidden_size = hidden_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        
        # define the target and online DQN networks
        self.target_network = DQN(state_size, action_size, hidden_size).to(self.device)
        self.q_network = DQN(state_size, action_size, hidden_size).to(self.device)
        
        # Set weights of target network to be the same as those of the q network
        self.target_network.load_state_dict(self.q_network.state_dict())
        
        self.target_network.eval()

        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=learning_rate)
        
        self.memory = ReplayBuffer(buffer_size)
        
    def step(self, state, action, reward, next_state, done):
        """
        Store the transition in the replay buffer.
        
        :param state: The current state
        :param action: The action taken
        :param reward: The reward received
        :param next_state: The next state after taking the action
        :param done: Whether the episode has ended
        """
        self.memory.push(state, action, reward, next_state, done)
        
        # if the buffer has enough samples, update the model
        if len(self.memory) > self.batch_size:
            self.update_model()
            
    def act(self, state, epsilon):
        '''
            Choose an action based on the current state and the epsilon-greedy policy.
            :param state: The current state
            :param epsilon: The probability of choosing a random action
        '''
        
        if random.random() < epsilon:
            return random.randint(0, self.action_size - 1)      
        
        else:
            # convert state to tensor and pass it through the q network
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            self.q_network.eval()
            
            with torch.no_grad():
                action_values = self.q_network(state_tensor)
            
            self.q_network.train()
            # get the action with the highest value    
            return np.argmax(action_values.cpu().data.numpy())
        
    def update_model(self):
        """
        Update the Q-network using a batch of transitions from the replay buffer.
        """
        
        # sample the batch from the replay buffer
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        
        # convert numpy arrays to tensors
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)
        
        # compute the Q-values for the current states - forward pass through the q network
        q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # compute the Q-values for the next states - forward pass through the target network
        next_q_values = self.target_network(next_states).max(1)[0]
        
        # compute the target Q-values
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values
        
        # compute the loss
        loss = torch.nn.functional.mse_loss(q_values, target_q_values.detach())
        
        self.optimizer.zero_grad()  # zero the gradients
        
        loss.backward()
        
        self.optimizer.step()
        
        
    def update_target_network(self):
        """
        Update the target network by copying the weights from the Q-network.
        """
        self.target_network.load_state_dict(self.q_network.state_dict())

## Training the agent

In [8]:
def train_agent(agent, env, n_episodes=2000, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995, target_update_freq=10):
    """
    Train the DQN agent in the LunarLander environment.
    
    :param agent: The DQN agent
    :param env: The LunarLander environment
    :param n_episodes: The number of episodes to train for
    :param epsilon_start: The initial value of epsilon for the epsilon-greedy policy
    :param epsilon_end: The final value of epsilon for the epsilon-greedy policy
    :param epsilon_decay: The decay rate for epsilon
    :param target_update_freq: The frequency of updating the target network
    """
    
    scores = []
    scores_window = deque(maxlen=100)
    epsilon = epsilon_start
    
    for episode in range(n_episodes):
        state, _ = env.reset()
        done = False
        total_reward = 0
        
        while not done:
            action = agent.act(state, epsilon)
            # take the action in the environment
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            agent.step(state, action, reward, next_state, done)
            
            state = next_state
            total_reward += reward
            
        scores.append(total_reward)
        scores_window.append(total_reward)
        epsilon = max(epsilon_end, epsilon * epsilon_decay)
        
        # print the progress
        if episode % 100 == 0:
            print(f"Episode {episode}, Average Score: {np.mean(scores_window):.2f}, Epsilon: {epsilon:.2f}")

        # update the target network
        if episode % target_update_freq == 0:
            agent.update_target_network()
            
        # stop training if the average score is above a threshold
        if np.mean(scores_window) >= 200:
            print(f"Environment solved in {episode} episodes!")
            break
    return scores

env = gym.make("LunarLander-v3", render_mode="human")
agent = DQNAgent(state_size=8, action_size=4)
# scores = train_agent(agent, env)

# load the trained model
agent.q_network.load_state_dict(torch.load("q_network.pth"))
agent.target_network.load_state_dict(torch.load("target_network.pth"))
agent.q_network.eval()

DQN(
  (layer1): Linear(in_features=8, out_features=64, bias=True)
  (layer2): Linear(in_features=64, out_features=64, bias=True)
  (layer3): Linear(in_features=64, out_features=4, bias=True)
)

In [None]:
# save the models
torch.save(agent.q_network.state_dict(), "q_network.pth")
torch.save(agent.target_network.state_dict(), "target_network.pth")



## Test the model 

In [11]:
env = gym.make("LunarLander-v3", render_mode="human")

def play_DQN_episode(env, agent):
    """
    Play a single episode using the trained DQN agent.
    
    :param env: The LunarLander environment
    :param agent: The trained DQN agent
    """
    state, _ = env.reset()
    done = False
    total_reward = 0
    
    while not done:
        action = agent.act(state, 0)  # use epsilon=0 for testing
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        
        state = next_state
        total_reward += reward
        
        env.render()  # render the environment
        
    return total_reward

total_reward = play_DQN_episode(env, agent)
print(f"Total reward: {total_reward}")
env.close()  # Close the environment

KeyboardInterrupt: 

## Double DQN model

In [None]:
class DDQNAgent:
    def __init__(self, state_size=8, action_size=4, hidden_size=64, learning_rate=1e-3, gamma=0.99, buffer_size=10000, batch_size=64):
        """
        Initialize the DQN agent with the necessary parameters.
        Args:
            state_size (int): The size of the state space.
            action_size (int): The size of the action space.
            hidden_size (int): The size of the hidden layer in the DQN model.
            learning_rate (float): The learning rate for the optimizer.
            gamma (float): The discount factor for future rewards.
            buffer_size (int): The size of the replay buffer.
            batch_size (int): The size of the batch for training.
        """
        
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        self.state_size = state_size
        self.action_size = action_size
        self.hidden_size = hidden_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        
        # define the target and online DQN networks
        self.target_network = DQN(state_size, action_size, hidden_size).to(self.device)
        self.q_network = DQN(state_size, action_size, hidden_size).to(self.device)
        
        # Set weights of target network to be the same as those of the q network
        self.target_network.load_state_dict(self.q_network.state_dict())
        
        self.target_network.eval()

        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=learning_rate)
        
        self.memory = ReplayBuffer(buffer_size)
        
    def step(self, state, action, reward, next_state, done):
        """
        Store the transition in the replay buffer.
        
        :param state: The current state
        :param action: The action taken
        :param reward: The reward received
        :param next_state: The next state after taking the action
        :param done: Whether the episode has ended
        """
        self.memory.push(state, action, reward, next_state, done)
        
        # if the buffer has enough samples, update the model
        if len(self.memory) > self.batch_size:
            self.update_model()
            
    def act(self, state, epsilon):
        '''
            Choose an action based on the current state and the epsilon-greedy policy.
            :param state: The current state
            :param epsilon: The probability of choosing a random action
        '''
        
        if random.random() < epsilon:
            return random.randint(0, self.action_size - 1)      
        
        else:
            # convert state to tensor and pass it through the q network
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            self.q_network.eval()
            
            with torch.no_grad():
                action_values = self.q_network(state_tensor)
            
            self.q_network.train()
            # get the action with the highest value    
            return np.argmax(action_values.cpu().data.numpy())
        
    def update_model(self):
        """
        Update the Q-network using a batch of transitions from the replay buffer.
        """
        
        # sample the batch from the replay buffer
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)
        
        # convert numpy arrays to tensors
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)
        
        # compute the Q-values for the current states - forward pass through the q network
        q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # the only difference from DQN is here
        # get the actions for the next states using the q network
        next_actions = self.q_network(next_states).argmax(1)  # get the actions for the next states
        # compute the Q-values for the next states - forward pass through the target network
        next_q_values = self.target_network(next_states).gather(1, next_actions.unsqueeze(1)).squeeze(1)

        # compute the target Q-values
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_values
        
        # compute the loss
        loss = torch.nn.functional.mse_loss(q_values, target_q_values.detach())
        
        self.optimizer.zero_grad()  # zero the gradients
        
        loss.backward()
        
        self.optimizer.step()
        
        
    def update_target_network(self):
        """
        Update the target network by copying the weights from the Q-network.
        """
        self.target_network.load_state_dict(self.q_network.state_dict())
        

env = gym.make("LunarLander-v3", render_mode="human")
agent = DDQNAgent(state_size=8, action_size=4)
scores = train_agent(agent, env)

score = play_DQN_episode(env, agent)
print(f"Total reward: {score}")
env.close()  # Close the environment

## Dueling Deep Q-Networks (Dueling-DQN)