In [15]:
import gymnasium as gym
import torch
from torch import nn
from torch import optim
from collections import deque
import random
import numpy as np

In [10]:
env = gym.make("LunarLander-v3")
env.reset()

terminated = False
truncated = False

while not (terminated or truncated):
    action = 1
    obs, reward, terminated, truncated, info = env.step(action)
    env.render()

env.close()

  gym.logger.warn(


In [11]:
print(f'Ilość możliwych akcji: {env.action_space.n}')

Ilość możliwych akcji: 4


In [12]:
OPTIMIZE_WITH_HARDWARE = False

device = torch.device('cpu')
if OPTIMIZE_WITH_HARDWARE:
    if torch.backends.mps.is_available():
        device = torch.device('mps')
        print(f'Selected device: MPS (Metal Performance Shaders)')
    elif torch.backends.cuda.is_available():
        device = torch.device('cuda')
        print(f'Selected device: GPU with CUDA support')
else:
    print(f'Selected device: CPU')

Selected device: CPU


In [14]:
class DQNetwork(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_size)
    
    def forward(self, state):
        x = torch.nn.functional.leaky_relu(self.fc1(state), negative_slope=0.01)
        x = torch.nn.functional.leaky_relu(self.fc2(x), negative_slope=0.01)
        return self.fc3(x)

In [None]:
# Hiperparametry treningu sieci DQN
LEARNING_RATE = 0.001
BATCH_SIZE = 16

class DQNAgent(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQNAgent, self).__init__()
        self.state_size = state_size        # ilość informacji dot. stanu środowiska
        self.action_size = action_size      # ilość akcji, które agent może wykonać
        self.discount_factor = 0.99         # współczynnik spadku wartości nagrody
        self.epsilon_greedy = 1.0           # początkowy współczynnik losowości (1 = 100% losowości)
        self.epsilon_greedy_min = 0.1       # minimalny współczynnik losowości
        self.epsilon_greedy_decay = 0.995   # zmniejszanie stopnia losowości co iterację o 5%
        self.memory = deque(maxlen=1000)    # kolekcja przechowująca 1000 ostatnich zdarzeń
        self.train_start = 500              # liczba zdarzeń, od której zaczynamy trenować model

        self.model = DQNetwork(state_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=LEARNING_RATE)
        self.criterion = nn.MSELoss()

    # Zapisuje podjętą akcję w danym stanie i jej skutki 
    def memorize(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    # Wybiera akcje dla danego stanu. Jeśli aktualnie model
    # nie eksploruje (wykonuje losową akcje) to wybierana jest
    # akcja o najlepszym potencjale (najwyższa wartość nagrody)
    def act(self, state):
        if np.random.rand() <= self.epsilon_greedy:
            return random.randrange(self.action_size)
        # unsqueeze zapewnia odpowiedni wymiar [batch_size, state_size]
        # PyTorch narzuca format danych treningowych w postaci tensora, który
        # w pierwszym wymiarze zawiera informację i ilości paczek a następnie same
        # dane treningowe, dlatego 'unsqueeze' rozszerza wymiar danych mimo tego, że
        # mamy tylko jedną paczkę w tej funkcji
        state = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            q_values_predicted = self.model(state)
        return torch.argmax(q_values_predicted).item()
    

    def replay(self):
        # Nie zaczynamy trenować modelu dopóki nie zbierzemy
        # minimalnej ilości danych w buforze memory
        if len(self.memory) < self.train_start:
            return
        
        data_batch = random.sample(self.memory, BATCH_SIZE) # Losujemy paczkę danych do treningu
        
        total_mse_loss = 0
        for state, action, reward, next_state, done in data_batch:
            state = torch.FloatTensor(state)
            next_state = torch.FloatTensor(next_state)
            reward = torch.FloatTensor([reward])
            discounted_reward = reward
            if not done:
                discounted_reward += self.discount_factor * torch.max(self.model(next_state))
            
            dqn_prediction = self.model(state)
            true_reward = dqn_prediction.clone()     # Tworzymy klon aby nadpisać wynik dla akcji niżej
            true_reward[action] = discounted_reward  # Nadpisujemy wartość nagrody dla wykonanej akcji
            
            loss = self.criterion(dqn_prediction, true_reward)
            
            self.optimizer.zero_grad()  # Zerujemy gradient
            loss.backward()             # Liczymy gradient
            self.optimizer.step()       # Aktualizujemy wagi sieci

            total_mse_loss += loss.item()
        
        # Jeśli nie doszliśmy do minimalnej wartości współczynnika
        # eksploracji to nadal go zmniejszamy z każdą iteracją
        if self.epsilon_greedy > self.epsilon_greedy_min:
            self.epsilon_greedy *= self.epsilon_greedy_decay
        
        return total_mse_loss / BATCH_SIZE # zwracamy średni błąd MSE