# 5. Gyakorlat: Mély $Q$-tanulás

In [1]:
%pip install torch

Note: you may need to restart the kernel to use updated packages.


In [5]:
# Könyvtárak
import os
import gym
import random
import warnings
import matplotlib
import base64, io
import numpy as np
from collections import deque
from collections import namedtuple
import matplotlib.pyplot as plt
from matplotlib import animation
from scipy.signal import fftconvolve

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Konfigurációk
seed = 0
np.random.seed(seed)
random.seed(seed)
%matplotlib inline
matplotlib.rc('animation', html='jshtml')
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
warnings.filterwarnings('ignore')

## Segédfüggvények

In [6]:
# Jeleneti változó inicializálása
def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch,

# Képkockák lejátszása egymás után
def plot_animation(frames, repeat=True, interval=40):
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = animation.FuncAnimation(fig, update_scene, fargs=(frames, patch), frames=len(frames), repeat=repeat, interval=interval)
    plt.close()
    return anim

# Mozgóátlagolás (gyors Fourier-transzformáció konvolúcióval)
def window_avg(lst, window_size):
    window_size = int(window_size)
    kernel = np.ones(window_size) / window_size
    return fftconvolve(lst, kernel, mode='valid')

# Teljes átlagolás
def rolling_avg(lst):
    lst_sum = np.cumsum(lst)
    lst_avg = lst_sum / (np.arange(len(lst)) + 1)
    return list(lst_avg)

---
## Környezet

In [7]:
env = gym.make('FrozenLake-v1', desc=None, map_name="8x8", is_slippery=True, render_mode='rgb_array')

print(f"Cselekvések tere (A): {env.action_space}")
print(f'Állapotok tere: {env.observation_space}')

# Cselekvések társítása kiíratási műveletekhez
action_mapping = {
    0: "←",
    1: "↓",
    2: "→",
    3: "↑"
}

A = list(action_mapping.keys())  # Cselekvési tér

Cselekvések tere (A): Discrete(4)
Állapotok tere: Discrete(64)


---
## $Q$-hálózat az ügynökhöz

In [8]:
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size):  # Architektúra definíció
        super(QNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, 64)  # Input réteg (teljesen becsatolt)
        self.fc2 = nn.Linear(64, 64)  # Rejtett réteg (teljesen becsatolt)
        self.fc3 = nn.Linear(64, action_size)  # Output réteg (teljesen becsatolt)
        
    def forward(self, state):  # Előrecsatolási csővezeték
        x = self.fc1(state)  # Első rétegen áramoltatás
        x = F.relu(x)  # Első réteg aktivációs függvénye (ReLu)
        x = self.fc2(x)
        x = F.relu(x)
        return self.fc3(x)  # Output előállítása

---
## Tapasztalat visszajátszás az ügynökhöz

In [None]:
class ReplayBuffer:
    def __init__(self, buffer_size, batch_size):
        self.batch_size = batch_size
        self.memory = deque(maxlen=buffer_size)
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])  # Nevesített tömb típus létrehozása
    
    def add(self, state, action, reward, next_state, done):
        e = self.experience(state, action, reward, next_state, done)  # Egy nevesített tömb létrehozása a megkapott paraméterekkel
        self.memory.append(e)
    
    def sample(self):
        experiences = random.sample(self.memory, k=self.batch_size)  # Véletlen minta a memóriából

        # A véletlen minta átalakítása
        states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float()
        actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long()
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float()
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float()
        dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float()
  
        return states, actions, rewards, next_states, dones

    def __len__(self):  # Ha a __len__ függvény definiálva van le lehet kérdezni a struktúra hosszát a len() függvénnyel
        return len(self.memory)

---
## Ügynök

In [None]:
class Agent():
    def __init__(self, alpha, gamma, state_size, action_size, buffer_size, batch_size, update_every):
        # Belső paraméterek
        self.t_step = 0
        self.alpha = alpha  # Tanulási sebesség
        self.gamma = gamma  # Diszkont faktor
        self.batch_size = batch_size  # Kötegméret a tanuláshoz (ekkora mintát fog venni az ügynök a tapasztalat visszajátszásból)
        self.action_size = action_size  # Lehetséges cselekvések száma (ekkora lesz a Q-hálózat output rétege)
        self.buffer_size = buffer_size  # Buffer méret (összesen ennyi rekord lesz a tapasztalat memóriában)
        self.update_every = update_every  # Ennyi tanítási iterációnként fog történni egy tanítási lépés
        # Hálózat és belső memória
        self.qnetwork = QNetwork(state_size, action_size)  # Q-hálózat
        self.optimizer = optim.Adam(self.qnetwork.parameters(), lr=alpha)  # Optimalizáló algoritmus
        self.memory = ReplayBuffer(action_size, buffer_size, batch_size)  # Tapasztalat memória
        # Követési struktúrák
        self.mse_track = []  # MSE hiba követése
        self.action_stack = np.zeros((0, 2))  # Cselekvések követése
        self.state_action_stack = np.zeros((0, action_size))  # Állapotok követése

    def step(self, state, action, reward, next_state, done):
        self.memory.add(state, action, reward, next_state, done)  # A tapasztalat hozzáadása a tapasztalati memóriához
        self.action_stack = np.vstack([self.action_stack, np.array([action, reward])])
        self.t_step = (self.t_step + 1) % self.update_every  # Ha elér 0-hoz akkor tanítási ciklus fog történni

        if self.t_step == 0 and len(self.memory) > self.update_every:  # Ha az ügynök elért egy tanítási iterációt és van elég összegyűlt tapasztalat
            experiences = self.memory.sample()  # A tapasztalat memória véletlen mintázása
            self.learn(experiences, self.gamma)  # Tanulás a tapasztalattal

    def act(self, state, eps=0.):  # Cselekvés választása az állapot alapján
        state = torch.from_numpy(state).float().unsqueeze(0)
        self.qnetwork.eval()  # Hálózát átállítása kiértékelési módba
        with torch.no_grad():  # Predikció végrehajtása
            action_values = self.qnetwork(state)  
        self.qnetwork.train()  # Hálózat tanító módba állítása

        self.state_action_stack = np.vstack([self.state_action_stack, np.array(action_values)])  # Követési struktúrához hozzáfűzés

        if(random.random() > eps):  # Epszilon-mohó cselekvés választás
            return np.argmax(action_values.cpu().data.numpy())  # Legjobb cselekvés az aktuális állapotból
        else:
            return random.choice(np.arange(self.action_size))  # Véletlen cselekvés

    def learn(self, experiences):  # Ügynök tanulásának eljárása
        states, actions, rewards, next_states, dones = experiences  # Véletlen miniköteg kicsomagolása

        # Költség kiszámítása
        q_targets_next = self.qnetwork(next_states).detach().max(1)[0].unsqueeze(1)  # Q-értékek a következő állapotban
        q_targets = rewards + self.gamma * q_targets_next * (1 - dones)  # Q célértékek számítása
        q_expected = self.qnetwork(states).gather(1, actions)  # Q-értékek az aktuális állapotban
        
        # Költség számítása
        loss = F.mse_loss(q_expected, q_targets)
        self.mse_track.append(loss)
        self.optimizer.zero_grad()  # Gradiensek hozzájárulásának törlése a kötegelt költség számítása előtt
        loss.backward()  # Hiba visszaáramoltatása a hálózatba
        self.optimizer.step()  # Lépés végrehajtása az optimalizációs algoritmussal