### Ustawienie hardware

In [35]:
import torch

OPTIMIZE_WITH_HARDWARE = True

device = torch.device('cpu')
if OPTIMIZE_WITH_HARDWARE:
    if torch.backends.mps.is_available():
        device = torch.device('mps')
        print(f'Selected device: MPS')
    elif torch.cuda.is_available():
        device = torch.device('cuda')
        print(f'Selected device: GPU with CUDA support')
        print(f'CUDA device name: {torch.cuda.get_device_name()}')
        print(f'CUDA device count: {torch.cuda.device_count()}')
        print(f'CUDA device index: {torch.cuda.current_device()}')
else:
    print(f'Selected device: CPU')
    

Selected device: GPU with CUDA support
CUDA device name: Quadro M1200
CUDA device count: 1
CUDA device index: 0


# Ogólne przedstawienie problemu

In [36]:
import gymnasium as gym
env = gym.make("LunarLander-v3")

In [37]:
import warnings
warnings.filterwarnings('ignore')

env.reset()

termined = False
truncated = False

while not (termined or truncated):
    action = 1
    obs, reward, terminated, truncated, info  = env.step(action)
    env.render()
    
env.close()

## Co zwraca środowisko?

- pozycja pozioma lądownika (x);
- pozycja pionowa lądownika (y)
- prędkość pozioma lądownika
- prędkość pionowa lądownika
- kąt nachylenia lądownika
- prędkość kątowa lądownika
- czy noga nr 1 (lewa) lądownika dotyka podłoża
- czy noga nr 2 (prawa) lądownika dotyka podłoża

## Ile akcji może wykonać agent?

In [38]:
print(f'Ilość możliwych akcji: {env.action_space.n}')

Ilość możliwych akcji: 4


1. Brak działania
2. Uruchomiony główny silnik
3. Uruchomiony lewy silnik
4. Uruchomiony peawy silnik

## Struktura sieci głębokiej

- Otrzymujemy stan w postaci wektora ośmiu wyżej wymienionych parametrów. Definiujemy 3 liniowe warstwy, tzn. przetwrzające dane liniowo, realizujące sume ważoną.
- Pierwsza warstwa przyjmuje stan w którym znajduje się łazik (state_size), rozszerza na 128 parametrów.
- Druga warstwa przyjmuje 128 parametrów i na wyjściu ma 128 parametrów.
- Trzecia warstwa która będzie podawać wartość Q dla każdej możliwej do podjęcia akcji przyjmuje 128 parametrów i zwraca 4 wyjścia (action_size)

In [39]:
from torch import nn as nn # importujemy moduł nn z biblioteki torch
class DQNetwork(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_size)
        
    def forward(self, state):
        x = torch.nn.leaky_relu(self.fc1(state), negative_slope=0.01)
        x = torch.nn.leaky_relu(self.fc2(x), negative_slope=0.01)
        x = self.fc3(state)
        return x

## Definicja agenta sieci DQN

In [40]:
from collections import deque # impor kolejki deque z modułu collections
import numpy as np
import random

# Hyperparametry sieci neuronowej
BATCH_SIZE = 16
LEARNING_RATE = 0.001

class DQNAgent(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQNetwork, self).__init__()
        self.input_size = input_size # ilość wejść, ilo informacji dot. stanu środowiska
        self.output_size = output_size # ilość wyjść, ilość możliwych akcji
        self.discount_factor = 0.99 # współczynnik dyskontujący
        self.epsilon_greedy = 1.0 # początkowya wartość losowego wyboru akcji
        self.epsilon_min = 0.01 # minimalna wartość losowego wyboru akcji
        self.epsilon_greedy_decay = 0.995 # współczynnik zmniejszający wartość losowego wyboru akcji (co iteracje o 5%)
        self.train_start = 500 # ilość próbek w pamięci agenta, po której rozpoczynamy uczenie
        self.memory = deque(maxlen=1000) # pamięć agenta

        self.model = DQNetwork(input_size, output_size).to(device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=LEARNING_RATE)
        self.loss_function = nn.MSELoss()

    def memorize(self, state, action, reward, next_state, done): 
        '''metoda dodająca informacje do pamięci agenta'''
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        '''wybiera akcję na podstawie stanu, jeśli model zwróci wartość większą niż epsilon_greedy,
        to wybieramy akcję o największej wartości, w przeciwnym wypadku wybieramy losową akcję. unsqueeze(0) dodaje dodatkowy wymiar [BATCH_SIZE, input_size].
        Pytorch operuje na tensorach, który w pierwszym wymiarze zawiera informacje i ilość paczek, następnie dane treningowe.
        "unsqueeze(0)" dodaje dodatkowy wymiar, który jest wymagany przez model, mimo że jest jedna paczka w funkcji.'''
        
        if np.random.rand() <= self.epsilon_greedy:
            return random.randrange(self.output_size)
        
        state = torch.FloatTensor(state).to(device).unsqueeze(0)
        with torch.no_grad():
            q_values_predicted = self.model(state)
            return torch.argmax(q_values_predicted).item()

    def replay(self):
        '''metoda ucząca model na podstawie próbek z pamięci agenta'''
        total_mse_loss = 0
        if len(self.memory) < self.train_start:
            return
        
        data_batch = random.sample(self.memory, BATCH_SIZE) # losujemy próbki z pamięci agenta

        for state, action, reward, next_state, done in data_batch:
            state = torch.FloatTensor(state).to(device)
            next_state = torch.FloatTensor(next_state).to(device) #następny stan który zwróci środowisko dla danego doświdczenia
            reward = torch.FloatTensor([reward]).to(device)
            discount_reward = reward
            if not done:
                discount_reward = reward + torch.max(self.model(next_state)) * self.discount_factor # obliczamy zniżone nagrody
            
            dqnprediction = self.model(state)
            true_reward = dqnprediction.clone() # klonujemy wartość predykcji
            true_reward[action] = discount_reward # nadpisujemy wartość nagrody dla danej akcji

            loss = self.loss_function(dqnprediction, true_reward)

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step() # aktualizujemy wagi modelu
            total_mse_loss += loss.item()

        if self.epsilon_greedy > self.epsilon_min:
            self.epsilon_greedy *= self.epsilon_greedy_decay # zmniejszamy wartość losowego wyboru akcji
        
        return total_mse_loss/BATCH_SIZE # zwracamy średni błąd kwadratowy





