In [1]:
import gym
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib import animation
mpl.rc('animation', html='jshtml')
import warnings
warnings.filterwarnings('ignore')

### Vizualizálási függvények

In [2]:
def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch,

def plot_animation(frames, repeat=True, interval=40):
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = animation.FuncAnimation(
        fig, update_scene, fargs=(frames, patch),
        frames=len(frames), repeat=repeat, interval=interval)
    plt.close()
    return anim

### Egyszerű mozgatása az ágensnek

In [3]:
# Egyszerű mozgatása az ágensnek 
env = gym.make('CartPole-v1')
obs = env.reset()
print(obs) # környezeti változók
print(env.action_space) # lehetséges lépések könyvtára

obs, reward, done, info = env.step(1) # egy lépés jobbra

print(obs) # változók újra
print(reward) # jutalom 
print(done) # vége az epizódnak?
print(info) # extra adatok debugging-hoz

frames=[]
for i in range(100):
    env.step(1)
    img = env.render(mode="rgb_array")
    frames.append(img)

env.close()
plot_animation(frames)

[-0.04515433 -0.02118124  0.02400017  0.02524274]
Discrete(2)
[-0.04557796  0.17358845  0.02450502 -0.25977229]
1.0
False
{}


### Véletlen mozgatása az ágensnek

In [4]:
env = gym.make('CartPole-v1')
env.reset() # minden környezeti változó visszaállítása
frames = [] 
for _ in range(100):
    env.step(np.random.randint(2)) # véletlen cselekvés végrehajtása
    img = env.render(mode="rgb_array")
    frames.append(img)
env.close() # Környezet bezárása
plot_animation(frames)

### CartPole egyszerű Hardcode szabályrendszerrel

In [9]:
import gym
env = gym.make('CartPole-v1')

def basic_policy(obs): # jutalmak 
    angle = obs[2]
    return 0 if angle < 0 else 1 # az oszlop balra vagy jobbra dől? 

totals = []
for episode in range(500): # 500 epizód
    episode_rewards = 0 # kezdeti jutalom
    obs = env.reset() # környezet init
    
    for step in range(200): # 200 válasz iterációnként
        action = basic_policy(obs) # jelenlegi szög
        obs, reward, done, info = env.step(action) # válasz
        episode_rewards += reward # jutalmak mentése
        if done: # kilépés
            break
    totals.append(episode_rewards) 

print('átlag', np.mean(totals))
print('szórás', np.std(totals))
print('min', np.min(totals))
print('max', np.max(totals))
env.close()

átlag 42.064
szórás 8.844653978534152
min 24.0
max 68.0


### Ábrázoljunk egy epizódot!

In [10]:
env.seed(42)
frames = []
obs = env.reset()
for step in range(200):
    img = env.render(mode="rgb_array") # bitmap-be konvertálás
    frames.append(img) # hozzáadja a listához
    action = basic_policy(obs)
    obs, reward, done, info = env.step(action)
    if done:
        env.close()
        break
    
plot_animation(frames) # végigmegy a listán és minden bitmap-et lejátszik

### Neurális hálózat irányelvek

In [11]:
import tensorflow as tf
from tensorflow import keras
keras.backend.clear_session()

In [12]:
n_inputs = 4 # env.observation_space.shape[0]

model = keras.models.Sequential([ # hálózat definiálása
    keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]),
    keras.layers.Dense(1, activation="sigmoid")])

In [13]:
# függvény ami ábrázolja a neurális hálózat által adott predikciókat
def render_policy_net(model, n_max_steps=200, seed=42, done_close=True, set_env=False):
    frames = []
    env = gym.make("CartPole-v1")
    obs = env.reset()
    for step in range(n_max_steps):
        frames.append(env.render(mode="rgb_array"))
        left_proba = model.predict(obs.reshape(1, -1)) # neurális háló modell predikciója
        # mintavétel egyenletes eloszlású halmazból, becsült valószínűség küszöbbel
        action = int(np.random.rand() > left_proba) 
        obs, reward, done, info = env.step(action) # lépés végrehajtása
        if done_close and done:
            break
    env.close()
    return frames

In [14]:
frames = render_policy_net(model, n_max_steps=100, done_close=False)
plot_animation(frames)

### Játszassuk a neurális hálózatot egymás mellett 50 különböző környezetben, 5000 iteráción keresztül.
#### Ez a hálózat a klasszikus felügyelt módon tanulja el a megfelelő cselekvéseket: 0->bal, 1->jobb. Kategorikus keresztentrópia költségfüggvénnyel.

In [15]:
n_environments = 50
n_iterations = 5000

envs = [gym.make("CartPole-v1") for _ in range(n_environments)] # 50 környezet létrehozása
for index, env in enumerate(envs): # random generáló bázisértékek állítása
    env.seed(index)
np.random.seed(42)
observations = [env.reset() for env in envs] # környezetek inícializálása
optimizer = keras.optimizers.RMSprop() # gradiensek számolása RMSprop-pal
loss_fn = keras.losses.binary_crossentropy # költségfüggvény

for iteration in range(n_iterations): 
    # ha a szög < 0, akkor cselekvés P(bal) = 1., másképpen P(bal) = 0. ==> TANÍTÓ HALMAZ
    target_probas = np.array([([1.] if obs[2] < 0 else [0.]) for obs in observations])
    
    with tf.GradientTape() as tape: # automatikus differenciálás
        left_probas = model(np.array(observations))
        loss = tf.reduce_mean(loss_fn(target_probas, left_probas)) # költség kiszámítása
    
    print("\rIteration: {}, Loss: {:.3f}".format(iteration, loss.numpy()), end="") # költség kiíratás
    
    grads = tape.gradient(loss, model.trainable_variables) # gradiensek kiszámítása
    optimizer.apply_gradients(zip(grads, model.trainable_variables)) # súlyok frissítése gradiensek szerint
    actions = (np.random.rand(n_environments, 1) > left_probas.numpy()).astype(np.int32) # új cselekvések
    
    for env_index, env in enumerate(envs): # minden 
        obs, reward, done, info = env.step(actions[env_index][0]) # cselekvések végrehajtása
        observations[env_index] = obs if not done else env.reset() # ha vége az epizódnak akkor reset

for env in envs: # minden környezet bezárása
    env.close() 

Iteration: 4999, Loss: 0.137

In [16]:
frames = render_policy_net(model) 
plot_animation(frames)

### Irányelv gradiensek
A megerősített tanulásban nem lehet hagyományos módon a model.fit() metódussal dolgozni, ezért van szükség a tanítható súlyok manuális frissítésére.
#### Kezdjük azzal, hogy csinálunk egy függvény, ami egyetlen lépést hajt végre a modell felhasználásával. Egyelőre azt feltételezzük, hogy bármely cselekvés, amit csinál az a helyes, hogy ki tudjuk számítani a gradienseket.
#### Ha a left_proba magas, a cselekvés valószínűleg hamis (0) lesz.  

In [17]:
def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis]) # predikció kérése a modelltől
        action = (tf.random.uniform([1, 1]) > left_proba) # cselekvés a valószínűség-eloszlás szerint (igaz-hamis)
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32) # Boolean átalakítása számmá
        loss = tf.reduce_mean(loss_fn(y_target, left_proba)) # költség kiszámítása
    grads = tape.gradient(loss, model.trainable_variables) # gradiensek kiszámítása
    
    obs, reward, done, info = env.step(int(action[0, 0].numpy())) # lépés végrehajtása
    return obs, reward, done, grads # környezeti változók visszatérítése

#### Csináljunk egy olyan függvényt, ami az előzőleg definiáltat veszi alapul, és egy epizódot sokszor lefuttat, majd visszatéríti a jutalmakat és a gradienseket minden epizódhoz és lépéshez tartozóan.

In [18]:
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = [] # ide jönnek a jutalmak
    all_grads = [] # ide jönnek a gradiensek
    
    for episode in range(n_episodes): # epizódokon iterálás (több modell) 
        current_rewards = [] # egy epizód jutalma
        current_grads = [] # egy epizód gradiense
        obs = env.reset() # környezet visszaállítása
        
        for step in range(n_max_steps): # epizódon iterálás (egy modell cselekvései)
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn) # egy lépés futtatása
            current_rewards.append(reward) # jutalmak elmentése
            current_grads.append(grads) # gradiensek elmentése
            if done: # véges állapot
                break 
                
        all_rewards.append(current_rewards) # összes epizód jutalma 
        all_grads.append(current_grads) # összes epizód gradiense
    return all_rewards, all_grads # visszatérés

### A PG algoritmus lejátszat egy epizódot a modellel több alkalommal, majd visszamegy, és megnézi a jutalmakat.
#### A jutalmakat diszkontálja és normalizálja. Tehát kellenek függvények a diszkontált jutalmak kiszámítására. A második a diszkontált jutalmakat fogja epizódok szerint normalizálni.

In [19]:
def discount_rewards(rewards, discount_rate): # jutalmak diszkontálása
    discounted = np.array(rewards) # jutalmak tömbbé alakítása
    for step in range(len(rewards) - 2, -1, -1): # iterálás a jutalmakon és mindegyik diszkontálása
        discounted[step] += discounted[step + 1] * discount_rate
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_rate):
    # minden jutalom diszkontálása
    all_discounted_rewards = [discount_rewards(rewards, discount_rate) for rewards in all_rewards] 
    
    flat_rewards = np.concatenate(all_discounted_rewards) # diszkontált jutalmak összefűzése
    reward_mean = flat_rewards.mean() # jutalmak átlagolása
    reward_std = flat_rewards.std() # jutalmak szórása
    
    # visszatérés minden diszkontált jutalom normalizált megfelelőjével
    return [(discounted_rewards - reward_mean) / reward_std for discounted_rewards in all_discounted_rewards]

### Próbáljuk ki a kódot

In [20]:
print('diszkontált jutalmak: ', discount_rewards([10, 0, -50], discount_rate=0.8))
print()
print('diszkontált és normlaizált jutalmak: ', discount_and_normalize_rewards([[10, 0, -50], [10, 20]], discount_rate=0.8))


diszkontált jutalmak:  [-22 -40 -50]

diszkontált és normlaizált jutalmak:  [array([-0.28435071, -0.86597718, -1.18910299]), array([1.26665318, 1.0727777 ])]


### Neurális hálózat definiálása
#### 1 rejtett réteg, 5 teljesen becsatolt neuronnal. 1 szigmoid aktivációs out neuron

In [21]:
optimizer = keras.optimizers.Adam(lr=0.01)
loss_fn = keras.losses.binary_crossentropy

keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Dense(5, activation="elu", input_shape=[4]),
    keras.layers.Dense(1, activation="sigmoid")])

### A tanítási ciklus:
- Minden tanítási iterációban, a ciklus meghívja a play_multiple_episodes() függvényt, ami a játékot 10-szer lejátssza, majd visszatéríti a gradienseket.
- Aztán meghívja a discount_and_normalize_rewards() függvényt, hogy kiszámolja minden cselekvés normalizált előnyét, amit a kódban final_reward-nak nevezünk. 
- Ezután végig megyünk minden tanítási változón, és mindegyikre kiszámítjuk a gradiensek súlyozott átlagát, minden epizód és minden lépés szerint, a final_reward által normalizálva
- Végül, alkalmazzuk a kiszámolt gradienseket az optimalizáló segítségével: a modell tanítható változói (remélhetőleg) javulnak, és az irányelv is finomhangolódik. 

In [22]:
env = gym.make("CartPole-v1")

n_iterations = 150 # hány tanítási ciklus legyen
n_episodes_per_update = 10 # ciklusonkként hány modell játsszon
n_max_steps = 200 # összesen hány lépés engedélyezett a végállapotig
discount_rate = 0.95 # diszkont faktor

for iteration in range(n_iterations):
    # játék szimuláció futtatása 10-szer, gradiensek kiszámolása
    all_rewards, all_grads = play_multiple_episodes(env, n_episodes_per_update, n_max_steps, model, loss_fn)
    
    total_rewards = sum(map(sum, all_rewards)) # összes jutalom kiszámítása
    print("\rIteration: {}, mean rewards: {:.1f}".format(iteration, total_rewards / n_episodes_per_update), end="")
    all_final_rewards = discount_and_normalize_rewards(all_rewards, discount_rate) # diszkontálás és normalizálás
    all_mean_grads = [] # ide jönnek a gradiensek átlagai
    
    for var_index in range(len(model.trainable_variables)): # végigdarlás a tanítható változókon
        
        mean_grads = tf.reduce_mean( # gradiensek átlagainak kiszámítása (reduce_mean: tenzorban lévő összes elem átlaga)
            [final_reward * all_grads[episode_index][step][var_index] # jutalom × gradiens szorzat
             for episode_index, final_rewards in enumerate(all_final_rewards) # minden epizódban
                 for step, final_reward in enumerate(final_rewards)], axis=0) # minden lépésre
        
        all_mean_grads.append(mean_grads) # átlagos gradiensek hozzáfűzése az összeshez
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables)) # paraméter frissítések a gradiensekkel

env.close()

Iteration: 149, mean rewards: 190.6

In [None]:
from keras.models import load_model
# model.save('CartPole_model_neural.h5') Ezzel lehet elmenteni a saját modellt
model = load_model('CartPole_model_neural.h5')

In [23]:
frames = render_policy_net(model)
plot_animation(frames)

### Markov láncok

In [24]:
np.random.seed(42)

transition_probabilities = [ # [s, s'] valószínűség párok
        [0.7, 0.2, 0.0, 0.1],  # s0-ból s1, s2, s3-ba
        [0.0, 0.0, 0.9, 0.1],  # s1-ből a többibe
        [0.0, 1.0, 0.0, 0.0],  # s2-ből a többibe
        [0.0, 0.0, 0.0, 1.0]]  # s3-ból a többibe

n_max_steps = 50

def print_sequence():
    current_state = 0 # kezdőállapot felvétele
    print("States:", end=" ")
    for step in range(n_max_steps): # lépések szimulálása a markov lánc állapotain belül
        print(current_state, end=" ")
        if current_state == 3: # ha elértünk a végső állapotba
            break
            
        current_state = np.random.choice(range(4), p=transition_probabilities[current_state]) # új állapot választása
    else:
        print("...", end="")
    print()

for _ in range(10):
    print_sequence()

States: 0 0 3 
States: 0 1 2 1 2 1 2 1 2 1 3 
States: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 
States: 0 3 
States: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 
States: 0 1 3 
States: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 ...
States: 0 0 3 
States: 0 0 0 1 2 1 2 1 3 
States: 0 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 2 1 3 


### Q-tanulás

In [25]:
import time 
import math 

In [26]:
# Kezdeti értékek felvétele
env = gym.make("CartPole-v1")

LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 60000

total = 0
total_reward = 0
prior_reward = 0

Observation = [30, 30, 50, 50]
np_array_win_size = np.array([0.25, 0.25, 0.01, 0.1])

epsilon = 1
epsilon_decay_value = 0.99995

In [27]:
# Q-tábla inícializálása
q_table = np.random.uniform(low=0, high=1, size=(Observation + [env.action_space.n]))
q_table.shape

(30, 30, 50, 50, 2)

In [28]:
# Függvény ami segít a Q-táblából kiválasztani a megfelelő állapotot
def get_discrete_state(state):
    discrete_state = state/np_array_win_size+ np.array([15,10,1,10])
    return tuple(discrete_state.astype(np.int))

In [29]:
for episode in range(EPISODES + 1): # iterálás az epizódokon 
    t0 = time.time() # kezdeti idő beállítása
    discrete_state = get_discrete_state(env.reset()) # diszkrét indítás a környezetben 
    done = False
    episode_reward = 0 # a jutalom 0-án indul

    if episode % 2000 == 0: # epizód számának logolása
        print(" Epizód: " + str(episode))
    
    frames = []
    while not done: 
        if np.random.random() > epsilon:
            action = np.argmax(q_table[discrete_state]) # tegyen valamilyen koordinált cselekvést
        else:
            action = np.random.randint(0, env.action_space.n) # tegyen random cselekvést

        new_state, reward, done, _ = env.step(action) # cselekvés, hogy megkapjuk az új állapotot, és 'done' státusz

        episode_reward += reward # jutalom hozzáadása

        new_discrete_state = get_discrete_state(new_state) # új állapot kiválasztása

        if episode % 2000 == 0: # render
            frames.append(env.render(mode="rgb_array"))

        if not done: # Q-tábla frissítése
            max_future_q = np.max(q_table[new_discrete_state])
            current_q = q_table[discrete_state + (action,)]

            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
            q_table[discrete_state + (action,)] = new_q

        discrete_state = new_discrete_state # új állapot
    
    if len(frames)>0: # tanulás lejátszása minden 2000 epizód után
        plot_animation(frames)
    
    if epsilon > 0.05: # epszilon módosítása
        if episode_reward > prior_reward and episode > 10000:
            epsilon = math.pow(epsilon_decay_value, episode - 10000) # hatványozás epizód szerint

            if episode % 500 == 0:
                print(" Epszilon: " + str(epsilon))

    t1 = time.time() # lezárult az epizód
    episode_total = t1 - t0 # epizód teljes ideje
    total = total + episode_total

    total_reward += episode_reward # epizód teljes jutalma
    prior_reward = episode_reward

    if episode % 1000 == 0: # Minden 1000 epizódonként írassa ki az átlagos időt és jutalmat
        mean = total / 1000 # idő 
        total = 0

        mean_reward = total_reward / 1000 # jutalom
        total_reward = 0
        
        print("\rIdő átlag: {}, Jutalom átlag: {:.3f}".format(mean, mean_reward), end="")
env.close()

 Epizód: 0
Idő átlag: 0.0009484050273895263, Jutalom átlag: 22.646 Epizód: 2000
Idő átlag: 0.0008567647933959961, Jutalom átlag: 22.295 Epizód: 4000
Idő átlag: 0.0009144268035888672, Jutalom átlag: 22.144 Epizód: 6000
Idő átlag: 0.0009145319461822509, Jutalom átlag: 22.733 Epizód: 8000
Idő átlag: 0.0008694899082183838, Jutalom átlag: 22.753 Epizód: 10000
Idő átlag: 0.0011115436553955078, Jutalom átlag: 21.616 Epszilon: 0.9753093024395111
 Epszilon: 0.9512282354250458
Idő átlag: 0.0009149436950683594, Jutalom átlag: 22.777 Epszilon: 0.9277417467531685
 Epizód: 12000
 Epszilon: 0.9048351558698463
Idő átlag: 0.0011129095554351807, Jutalom átlag: 25.912 Epizód: 14000
 Epszilon: 0.818726659298009
Idő átlag: 0.002571077823638916, Jutalom átlag: 27.105 Epszilon: 0.7985117269685725
 Epszilon: 0.7787959154194878
Idő átlag: 0.0012510199546813966, Jutalom átlag: 29.081 Epszilon: 0.7595669010105212
 Epizód: 16000
Idő átlag: 0.0013467669486999512, Jutalom átlag: 34.243 Epizód: 18000
Idő átlag: 0.00