# Mountain Car - Q учење

## Проблем

Во оваа тетратка се решава проблемот на Автомобил на планински пат(Mountain Car) со користење на алгоритмот Q учење. Целта е агентот-количката, која се наоѓа помеѓу 2 рида, да научи да се искачи на десниот рид и да стигне до знаменцето кое се наоѓа на неговиот врв. Повеќе детали за проблемот може да се најдат на [официјалната страна на библиотеката Gymnasium.](https://gymnasium.farama.org/environments/classic_control/mountain_car/)


![Mountain_Car](images/mountain_car.gif)

### Што претставува **Q-учење**? 
([Книга AI Crash Course, Chapter 7](https://e-kursevi.feit.ukim.edu.mk/pluginfile.php/109092/mod_resource/content/1/Crash%20Course%20-%20Reinforcement%20Learning%2C%20Deep%20Learning%2C%20and%20Artificial%20Intelligence%20with%20Python%20%282019%29.pdf))
1. Q-учењето е модел на учење со поттикнување (Reinforcement Learning);
2. Q-учењето функционира според принципот на влезови (состојби) и излези (акции);
3. Q-учењето се изведува во унапред дефинирана околина, која ги содржи состојбите (влезовите), акциите (излезите) и наградите;
4. Q-учењето се моделира со Марков процес на одлучување (Markov Decision Process);
5. Q-учењето користи режим на тренирање, при што се учат параметри наречени Q-вредности, и режим на инференција;
6. Постојат конечен број на состојби (нема бесконечен број можни влезови);
7. Постојат конечен број на акции (може да се изведат само одреден број на акции).


([Q-learning, од Википедија](https://en.wikipedia.org/wiki/Q-learning))

Q-учењето е алгоритам за учење со поттикнување кој тренира агент да доделува вредности на можните акции врз основа на моменталната состојба, без да му биде потребен модел на околината (model-free). 

In [1]:
import random
import statistics
from collections import defaultdict, deque
import threading
import numpy as np
import gymnasium as gym
import plotly.graph_objects as go
import ipywidgets as widgets

**ALPHA** и **GAMMA** се хиперпараметри кои се користат за пресметка на Белмановата равенка. ALPHA е стапка на учење(0,1), повисока вредност на ALPHA послабо учи алгоритмот. GAMMA(0,1) кажува колку се важни акциите што се преземаат во иднина. **EPSILON**(0,1) е хиперпараметар кој контролира колкав дел од акциите ќе бидат случајно избрани, а колкав дел ќе бидат базирани на наученото знаење.

In [2]:
ALPHA = 0.3 
GAMMA = 0.95 
EPSILON = 0.22  

## Состојбениот вектор на овој проблем е ($x$, $\dot{x}$),  односно позиција и брзина на агентот-количката

#### Q-учењето е алгоритам кој работи со дискретни вредности на состојбениот вектор. Поради тоа, континуалните вредности на позицијата и брзината на агентот треба да се дискретизираат.Ова се реализира со помош на функцијата np.linspace, која како влез прима: минимална вредност и максимална вредност на состојбената променлива како и број на поделци (кошнички). Во овој пример се користат 100 кошнички, со што се овозможува соодветна дискретизација на континуалниот простор на состојби.

In [3]:
N_BINS_X = 100
N_BINS_X_DOT = 100

In [4]:
x_low, x_high = -1.2, 0.6  
x_dot_low, x_dot_high = -0.07, 0.07 

bins_x = np.linspace(x_low, x_high, num=N_BINS_X)
bins_x_dot = np.linspace(x_dot_low, x_dot_high, num=N_BINS_X_DOT)

BINS = [bins_x, bins_x_dot]

In [5]:
def get_state_from_observation(observation):
    """
    Го дискретизира набљудувањето за да го претвори во дискретна состојба погодна за Q-учење.

     Влезни аргументи:
        observation (np.ndarray): Влезен вектор што ја претставува моменталната состојба на агентот,
                                  содржи континуални вредности (позиција и брзина).

    Излез:
        tuple: Дискретизирана состојба изразена како торка, каде секој елемент покажува 
               во која кошничка се наоѓа соодветната континуална вредност.
    """
    return tuple([np.digitize(var, bins) for var, bins in zip(observation, BINS)])   

In [6]:
def policy(env, i_state, Q, episode):    
    """
    Ја дефинира полисата според која агентот избира акција, користејќи ε-greedy стратегија.

    Влезни аргументи:
        env (gym.Env): Околината во која агентот се наоѓа. Се користи за избор на случајна акција.
        i_state (tuple): Дискретизирана моментална состојба на агентот.
        Q (dict): Q-табела што содржи Q-вредности за секоја состојба и можни акции.
        episode (int): Тековна епизода од тренирањето, може да се користи за адаптација на ε.

    Излез:
        int: Акција што агентот треба да ја изврши, или најдобрата позната акција (експлоатација),
             или случајна акција (експлорација), во зависност од ε.
    """
    if np.random.rand() < EPSILON or not Q[i_state]:
        return env.action_space.sample()
    return max(Q[i_state], key=Q[i_state].get)

In [7]:
#Bellman equation
def update_q(Q, i_state, i_next_state, action, reward):
    """
    Ја ажурира Q-табелата според Белмановата равенка за Q-учење.

    Влезни аргументи:
        Q (dict): Q-табела што ги содржи Q-вредностите за сите состојби и акции.
        i_state (tuple): Моменталната (дискретизирана) состојба на агентот.
        i_next_state (tuple): Наредната состојба добиена по извршување на акцијата.
        action (int): Акцијата што била избрана во моменталната состојба.
        reward (float): Наградата добиена по извршувањето на акцијата.

    Алгоритам:
        - Прво се пресметува највисоката Q-вредност за наредната состојба.
        - Потоа се пресметува Темпоралната Разлика (TD error).
        - Конечно, Q-вредноста за дадената состојба и акција се ажурира според TD error и факторот на учење (ALPHA).
    """
    max_q_value = max(Q[i_next_state].values(), default=0)
    TD = reward + GAMMA * max_q_value - Q[i_state][action]
    Q[i_state][action] += ALPHA * TD

In [8]:
def dashboard(training, step_log=1000):
    h_rewards = deque(maxlen=step_log)

    dashboard.STOP_TRAINING = False
    bt_stop = widgets.Button(description='Stop Training', button_style='danger')
    def f_stop(bt_stop):
        dashboard.STOP_TRAINING = True
    bt_stop.on_click(f_stop)

    fig = go.FigureWidget()
    fig.add_scatter(x=[], y=[], mode='markers+lines')
    fig.layout.yaxis.rangemode = 'tozero'
    fig.layout.title = 'Dashboard for training RL agents'
    fig.layout.xaxis.title = 'Episode'
    fig.layout.yaxis.title = 'Reward'

    def update_sc_reward(episode, reward):
        with fig.batch_update():
            fig.data[0].x += (episode,)
            fig.data[0].y += (reward,)
    
    def sim(*args, **kwargs):
        display(bt_stop)
        display(fig)
        for episode, reward in training(*args, **kwargs):
            h_rewards.append(reward)
            if episode % step_log == 0:
                update_sc_reward(episode, np.mean(h_rewards))
            if dashboard.STOP_TRAINING:
                break
        bt_stop.disabled = True
    
    def thread(*args, **kwargs):
        t = threading.Thread(target=sim, args=args, kwargs=kwargs)
        t.start()
        return t
    return thread

In [9]:
@dashboard
def train_q_agent(Q, n_episodes):
    """
    Го тренира агентот со Q-учење во средината MountainCar-v0.

    Влезни аргументи:
        Q (defaultdict): Q-табела што ги содржи Q-вредностите за секоја состојба и акција.
        n_episodes (int): Број на епизоди за кои агентот ќе се тренира.

    Опис:
        - Во секоја епизода, околината се ресетира и агентот започнува од случајна почетна состојба.
        - Се врши дискретизација на набљудувањето во состојба со `get_state_from_observation`.
        - Додека епизодата не заврши (поради успех или истек на време), агентот избира акција според ε-greedy стратегија
        - Се добива ново набљудување и награда, и Q-табелата се ажурира со `update_q`.
        - По завршување на секоја епизода, се враќа бројот на епизодата и вкупната награда.

    Излез:
        Generator: Tuple `(episode, episode_reward)` за секоја епизода.
    """
    with gym.make('MountainCar-v0') as env:
        for episode in range(n_episodes):
            observation, _ = env.reset()    
            i_state = get_state_from_observation(observation)  
            episode_reward = 0
            terminated = truncated = False  
            while not (terminated or truncated):
                action = policy(env, i_state, Q, episode)
                observation, reward, terminated, truncated, _ = env.step(action)  
                i_next_state = get_state_from_observation(observation)
                episode_reward += reward
                update_q(Q, i_state, i_next_state, action, reward)
                i_state = i_next_state
            yield episode, episode_reward


Q = defaultdict(lambda: defaultdict(int))
train_q_agent(Q, n_episodes=100000)    

Button(button_style='danger', description='Stop Training', style=ButtonStyle())

<Thread(Thread-5 (sim), started 139637483898560)>

FigureWidget({
    'data': [{'mode': 'markers+lines',
              'type': 'scatter',
              'uid': '36c8d038-8035-4a27-ad5a-b5e93ebfede2',
              'x': [],
              'y': []}],
    'layout': {'template': '...',
               'title': {'text': 'Dashboard for training RL agents'},
               'xaxis': {'title': {'text': 'Episode'}},
               'yaxis': {'rangemode': 'tozero', 'title': {'text': 'Reward'}}}
})

#### На графикот е прикажана вредноста на наградата што агентот ја добива во секоја епизода. Агентот добива награда -1 за секоја временска единица додека не го достигне знаменцето. На почетокот на тренирањето наградата изнесува -200, односно агентот не го достигнува знаменцето пред да заврши епизодата, епизодата за тренирање завршува после 200  временски единици. Како се зголемуваат епизодите, може да се увиде дека наградата расте, агентот го достигнува знаменцето, односно учи. Наградата достигнува стационарна вредност -162.

In [11]:
def visualise_q_agent(Q, n_episodes=1):
    """
    Ја прикажува (визуелизира) работата на агентот кој е трениран со Q-учење во MountainCar околината.

    Влезни аргументи:
        Q (defaultdict): Q-табела со научените Q-вредности за секоја состојба и акција.
        n_episodes (int): Број на епизоди што ќе се визуелизираат (стандардно 1).

    Опис:
        - За секоја епизода, агентот почнува од иницијална состојба.
        - Ако не постои запишана акција за дадената состојба, се избира случајна акција.
        - Ако има достапни Q-вредности, се избира акцијата со највисока вредност (експлоатација).
        - Процесот продолжува сè додека не заврши епизодата (успешно или со истекување на времето).
    """
    with gym.make('MountainCar-v0', render_mode='human') as env:
        for episode in range(n_episodes):
            state, _ = env.reset()
            i_state = get_state_from_observation(state)
            terminated = truncated = False
            while not (terminated or truncated):
                if not Q[i_state]:
                    action = env.action_space.sample()        
                else:
                    action = max(Q[i_state], key=Q[i_state].get)    
                state, _, terminated, truncated, _ = env.step(action)
                i_state = get_state_from_observation(state)


visualise_q_agent(Q, 1)