# Lunar Lander Box2D-lösning med DQN (Deep Q-Network)
-----
#### Karim Kanji & Sebastian Fallström
#### Preskriptiv Analytik, IA-20



In [3]:
!pip install gymnasium
!pip install stable_baselines3
!pip install ufal.pybox2d
!pip install tensorflow
!pip install gymnasium
!pip install torch
!pip install numpy



In [4]:
import gym
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import base64, io

import numpy as np
from collections import deque, namedtuple

# For visualization
from gym.wrappers.monitoring import video_recorder
from IPython.display import HTML
from IPython import display 
import glob

  from .autonotebook import tqdm as notebook_tqdm


In [10]:
import torch

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
# Every tensor and network in this notebook is moved with `.to(device)`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [11]:
import torch

# Report whether PyTorch can see a CUDA device.
print(
    "CUDA is available. Using GPU."
    if torch.cuda.is_available()
    else "CUDA is not available. Using CPU."
)


CUDA is available. Using GPU.


In [12]:
class ReplayBuffer:
    """Fixed-size FIFO store of experience tuples with uniform random sampling.

    Sampled batches are converted to tensors and moved to the module-level
    ``device``.
    """

    def __init__(self, action_size, buffer_size, batch_size, seed):
        """Create an empty buffer.

        Args:
            action_size: Number of actions; stored but not used by the buffer.
            buffer_size: Maximum number of experiences kept; the oldest entries
                are dropped first once the buffer is full.
            batch_size: Number of experiences returned by ``sample``.
            seed: Seed applied to Python's global ``random`` module.
        """
        self.action_size = action_size
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple(
            "Experience",
            field_names=["state", "action", "reward", "next_state", "done"],
        )
        # random.seed() returns None, so seed the RNG and keep the value itself.
        random.seed(seed)
        self.seed = seed

    def add(self, state, action, reward, next_state, done):
        """Append one transition to the buffer."""
        self.memory.append(
            self.experience(state, action, reward, next_state, done)
        )

    def sample(self):
        """Return a random batch as ``(states, actions, rewards, next_states, dones)``.

        Each element is a tensor with one row per sampled experience. Actions
        are converted with ``.long()``; all other fields with ``.float()``.

        Raises:
            ValueError: If fewer than ``batch_size`` experiences are stored
                (raised by ``random.sample``).
        """
        experiences = random.sample(self.memory, k=self.batch_size)

        def stack(field):
            # One row per experience; scalar fields become a column vector.
            return np.vstack([getattr(e, field) for e in experiences])

        states = torch.from_numpy(stack("state")).float().to(device)
        actions = torch.from_numpy(stack("action")).long().to(device)
        rewards = torch.from_numpy(stack("reward")).float().to(device)
        next_states = torch.from_numpy(stack("next_state")).float().to(device)
        # Booleans are cast to 0/1 so they can mask the bootstrap term as floats.
        dones = torch.from_numpy(stack("done").astype(np.uint8)).float().to(device)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the number of experiences currently stored."""
        return len(self.memory)

In [13]:
# Environment used for training; no render_mode is passed here.
env = gym.make('LunarLander-v2')


# Hyperparameters
BUFFER_SIZE = 100_000  # capacity handed to ReplayBuffer
BATCH_SIZE = 64        # experiences per sampled minibatch
GAMMA = 0.99           # discount factor passed to DQNAgent.learn
TAU = 0.001            # interpolation factor passed to DQNAgent.soft_update
LR = 0.0005            # learning rate of the Adam optimizer
UPDATE_EVERY = 4       # learn once every this many DQNAgent.step calls



# DQN Neural network architecture
class QNetwork(nn.Module):
    """Fully connected network mapping a state to one value per action.

    Two hidden layers of 64 units with ReLU activations, followed by a linear
    output layer of ``action_size`` units.
    """

    def __init__(self, state_size, action_size, seed):
        """Seed PyTorch's global RNG, then build the three linear layers."""
        super().__init__()
        self.seed = torch.manual_seed(seed)
        hidden_units = 64
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_size)

    def forward(self, state):
        """Return the output of the final linear layer for ``state``."""
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)


class DQNAgent():
    """DQN agent with a replay buffer and a soft-updated target network.

    Relies on names defined elsewhere in this file: ``QNetwork``,
    ``ReplayBuffer``, ``device`` and the hyperparameters ``BUFFER_SIZE``,
    ``BATCH_SIZE``, ``GAMMA``, ``TAU``, ``LR`` and ``UPDATE_EVERY``.
    """

    def __init__(self, state_size, action_size, seed):
        """Build the local/target networks, the optimizer and the replay buffer.

        Args:
            state_size: Input size of the Q-networks.
            action_size: Number of discrete actions (output size of the networks).
            seed: Seed applied to Python's ``random`` module and forwarded to
                the networks and the replay buffer.
        """
        self.state_size = state_size
        self.action_size = action_size
        # random.seed() returns None, so seed the RNG and keep the value itself.
        random.seed(seed)
        self.seed = seed

        # Q-Network: the local network is trained, the target network tracks it.
        self.qnetwork_local = QNetwork(state_size, action_size, seed).to(device)
        self.qnetwork_target = QNetwork(state_size, action_size, seed).to(device)
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=LR)

        # Replay memory
        self.memory = ReplayBuffer(action_size, BUFFER_SIZE, BATCH_SIZE, seed)
        # Step counter used to learn only every UPDATE_EVERY calls to step().
        self.t_step = 0

    def step(self, state, action, reward, next_state, done):
        """Store one transition and, every ``UPDATE_EVERY`` calls, learn from a batch."""
        self.memory.add(state, action, reward, next_state, done)

        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step != 0:
            return
        # Learn only once the buffer holds more than one batch of experiences.
        if len(self.memory) > BATCH_SIZE:
            experiences = self.memory.sample()
            self.learn(experiences, GAMMA)

    def act(self, state, eps=0.):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        Args:
            state: NumPy array holding the current observation.
            eps: Exploration threshold; a uniformly random action is returned
                when ``random.random() <= eps``.

        Returns:
            The chosen action index as a Python ``int``.
        """
        # Decide first, so no forward pass is spent on an exploratory action.
        if random.random() <= eps:
            return random.choice(range(self.action_size))

        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        self.qnetwork_local.eval()
        with torch.no_grad():
            action_values = self.qnetwork_local(state)
        self.qnetwork_local.train()

        return int(np.argmax(action_values.cpu().numpy()))

    def learn(self, experiences, gamma):
        """Run one gradient step on the local network, then soft-update the target.

        Args:
            experiences: Tuple ``(states, actions, rewards, next_states, dones)``
                of tensors as returned by ``ReplayBuffer.sample``.
            gamma: Discount factor applied to the bootstrapped next-state value.
        """
        states, actions, rewards, next_states, dones = experiences

        # Targets are constants for the loss, so they are built without gradients.
        with torch.no_grad():
            # Highest target-network value for each next state, as a column.
            q_targets_next = self.qnetwork_target(next_states).max(1)[0].unsqueeze(1)
            # Bellman target; (1 - dones) removes the bootstrap term at episode end.
            q_targets = rewards + gamma * q_targets_next * (1 - dones)

        # Local-network value of the action that was actually taken.
        q_expected = self.qnetwork_local(states).gather(1, actions)

        loss = F.mse_loss(q_expected, q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # ------------------- update target network ------------------- #
        self.soft_update(self.qnetwork_local, self.qnetwork_target, TAU)

    def soft_update(self, local_model, target_model, tau):
        """Blend the local parameters into the target parameters in place.

        Each target parameter becomes ``tau * local + (1 - tau) * target``.
        """
        with torch.no_grad():
            for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
                target_param.copy_(tau * local_param + (1.0 - tau) * target_param)
       


def dqn(n_episodes=2000, max_t=1000, eps_start=1.0, eps_end=0.01, eps_decay=0.995):
    """Train the module-level ``agent`` on the module-level ``env``.

    Args:
        n_episodes: Maximum number of training episodes.
        max_t: Maximum number of steps per episode.
        eps_start: Initial epsilon for epsilon-greedy action selection.
        eps_end: Lower bound for epsilon.
        eps_decay: Factor epsilon is multiplied by after every episode.

    Returns:
        List with the total reward of every episode that was run.

    Training stops early, and the local network weights are written to
    ``checkpoint.pth``, once the mean score of the last (up to) 100 episodes
    reaches 200.
    """
    scores = []
    scores_window = deque(maxlen=100)  # most recent scores for the running mean
    eps = eps_start
    for i_episode in range(1, n_episodes+1):
        # reset() returns (observation, info) in newer APIs and a bare
        # observation in older ones; accept both.
        reset_result = env.reset()
        state_data = reset_result[0] if isinstance(reset_result, tuple) else reset_result
        score = 0
        for _ in range(max_t):
            action = agent.act(state_data, eps)

            result = env.step(action)
            if len(result) == 5:
                # Five values: (obs, reward, terminated, truncated, info).
                next_state, reward, terminated, truncated = result[:4]
            else:
                # Four values: (obs, reward, done, info).
                next_state, reward, terminated = result[:3]
                truncated = False

            # Only `terminated` is stored, so a time-limit cut-off does not
            # zero the bootstrap term in the learning target.
            agent.step(state_data, action, reward, next_state, terminated)
            state_data = next_state
            score += reward
            # A truncated episode must not be stepped again either.
            if terminated or truncated:
                break
        scores_window.append(score)
        scores.append(score)
        eps = max(eps_end, eps_decay*eps)

        average = np.mean(scores_window)
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, average), end="")
        if i_episode % 100 == 0:
            print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, average))
        if average >= 200.0:
            print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode-100, average))
            torch.save(agent.qnetwork_local.state_dict(), 'checkpoint.pth')
            break
    return scores

# Build the agent (state_size=8, action_size=4) and start training.
# dqn() reads the module-level `agent` and `env`, so both must exist before the call.
agent = DQNAgent(state_size=8, action_size=4, seed=0)    
scores = dqn()


Episode 100	Average Score: -182.37
Episode 200	Average Score: -126.21
Episode 283	Average Score: -67.799

KeyboardInterrupt: 

In [14]:
# Requires QNetwork, DQNAgent and `device` to be defined by the earlier cells.

# Create a fresh agent with the same sizes and seed as the one that was trained.
agent = DQNAgent(state_size=8, action_size=4, seed=0)

# Load the saved weights into the agent's local Q-network only; the target
# network keeps its fresh initialisation.
# NOTE(review): torch.load unpickles the file - only load checkpoints you trust
# (recent PyTorch versions offer weights_only=True; confirm the installed version).
checkpoint = torch.load('checkpoint.pth', map_location=device)
agent.qnetwork_local.load_state_dict(checkpoint)


<All keys matched successfully>

In [15]:
def demo(env, agent, num_episodes=5):
    """Run ``agent`` greedily in ``env`` and print the total reward per episode.

    Args:
        env: Environment providing ``reset``, ``step``, ``render`` and ``close``.
            ``step`` may return four values (obs, reward, done, info) or five
            (obs, reward, terminated, truncated, info).
        agent: Object whose ``act(state, eps=...)`` returns an action.
        num_episodes: Number of episodes to play.

    The environment is always closed before returning, including when an
    episode is interrupted or raises.
    """
    try:
        for i_episode in range(num_episodes):
            state = env.reset()
            # Newer APIs return (observation, info); keep only the observation.
            if isinstance(state, tuple):
                state = state[0]

            total_reward = 0
            done = False
            while not done:
                env.render()

                # eps=0.0 disables exploration: always take the greedy action.
                action = agent.act(state, eps=0.0)

                step_result = env.step(action)
                next_state, reward, done = step_result[:3]
                if len(step_result) == 5:
                    # Also stop on truncation (e.g. a time limit); otherwise the
                    # loop would keep stepping an episode that is already over.
                    done = bool(done or step_result[3])

                if isinstance(next_state, tuple):
                    next_state = next_state[0]

                total_reward += reward
                state = next_state

            print(f"Episode: {i_episode + 1}, Total Reward: {total_reward:.2f}")
    finally:
        env.close()



In [18]:
# Re-create the environment with render_mode="human"; this rebinds the
# module-level `env` that was used for training.
env = gym.make('LunarLander-v2', render_mode="human")

# Play 10 episodes with the loaded agent (demo() acts with eps=0.0 and closes
# the environment when it finishes).
demo(env, agent, num_episodes=10)


Episode: 1, Total Reward: 244.23
Episode: 2, Total Reward: 258.19
Episode: 3, Total Reward: 222.46


KeyboardInterrupt: 

## Rapport om Användning av DQN-Metoden för Lunar Lander
#### Inledning
I detta projekt har Deep Q-Network (DQN) metoden använts för att träna en agent för att lösa Lunar Lander-miljön. DQN är välkänd för sin förmåga att effektivt balansera mellan utforskning och exploatering inom förstärkningsinlärning, vilket är avgörande för att uppnå framgång i komplexa miljöer som Lunar Lander.

#### Metod
DQN-metoden använder en neural nätverksarkitektur för att optimera agentens beslutsprocess. I det här projektet är agentens mål att framgångsrikt landa en månlandare på en förutbestämd plats. Vi har justerat flera nyckelparametrar, inklusive lärandehastigheten, rabattfaktorn och utforskningsgraden, för att finjustera agentens prestanda under träningen.

#### Träningsresultat
Efter genomförd träning har agenten utvärderats över flera episoder. De statistiska resultaten från denna utvärdering är som följer:

Genomsnittlig belöning: *201.05* <br>
Dessa resultat visar på agentens förmåga att konsekvent uppnå höga belöningar, vilket indikerar en framgångsrik landning i många episoder. 

#### Slutsats
Användningen av DQN-metoden i detta projekt har visat sig vara mycket framgångsrik. Agenten har lärt sig att effektivt navigera och lösa uppgifterna som presenteras i Lunar Lander-miljön. Denna framgång understryker DQN-metodens effektivitet och dess förmåga att hantera utmanande uppgifter inom området förstärkningsinlärning.

#### Källor
Kang, C. (2021, May 7). Deep Q-Network (DQN) on LunarLander-v2. Chan’s Jupyter. https://goodboychan.github.io/python/reinforcement_learning/pytorch/udacity/2021/05/07/DQN-LunarLander.html

<br>
OpenAI. (2023). Svar på fråga om bl.a. ["DQN method for solving Lunar Lander environment"]. ChatGPT version 4/3.5.
