# Zadanie 5

Celem ćwiczenia jest implementacja algorytmu Q-learning.

Następnie należy stworzyć agenta rozwiązującego problem [Taxi](https://gymnasium.farama.org/environments/toy_text/taxi/). Problem dostępny jest w pakiecie **gym**.

Punktacja (max 7 pkt):
- Implementacja algorytmu Q-learning. [3 pkt]
- Eksperymenty dla różnych wartości hiperparametrów [2 pkt]
- Jakość kodu [1 pkt]
- Wnioski [1 pkt]


In [1]:
# do  wizualizacji
#  https://stackoverflow.com/questions/50107530/how-to-render-openai-gym-in-google-colab
!apt-get install python-opengl -y
!apt install xvfb -y
!pip install pyvirtualdisplay
!pip install piglet
from pyvirtualdisplay import Display
Display().start()
from IPython import display
import matplotlib.pyplot as plt
%matplotlib inline

import numpy as np
import gym
import random
import time

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
E: Unable to locate package python-opengl
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.2).
0 upgraded, 0 newly installed, 0 to remove and 15 not upgraded.


In [2]:
!pip install gym --upgrade



In [10]:
# auxiliary function for tests
def get_state_info_taxi(state:int):
    state_info = state
    destination = state_info%4
    state_info = (state_info - destination)//4
    passenger_location = state_info%5
    state_info = (state_info - passenger_location)//5
    taxi_col = state_info%5
    state_info = (state_info - taxi_col)//5
    taxi_row = state_info
    return f"Destination: {destination}, Passenger_location: {passenger_location}, Taxi_row: {taxi_row}, Taxi_col: {taxi_col}"


class QLearningSolver:
    """Class containing the Q-learning algorithm that might be used for different discrete environments."""

    def __init__(
        self,
        observation_space: int,
        action_space: int,
        learning_rate: float = 0.1,
        gamma: float = 0.9,
        epsilon: float = 0.1,
        env = gym.make("Taxi-v3")
    ):
        self.observation_space = observation_space
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.env = env
        self.q_table = np.zeros([self.observation_space, self.action_space])  # Q-Table initiation
        self.rendered_images = []
        self.is_game_solved = False

    def build_q_table(self, episodes: int, steps_number: int, print_plot = False):
        penalties_per_episode = []
        timesteps_per_episode = []
        rewards_per_episode = []
        for _ in range(episodes):
            state = self.env.reset()
            print(state)
            penalties = 0
            rewards = 0
            for steps in range(steps_number):
                if random.random() < self.epsilon:
                    action = self.env.action_space.sample()
                else:
                    print(self.q_table[state])
                    if not any(self.q_table[state]):  # when Q-table empty for current state to better exploration
                        action = self.env.action_space.sample()
                    else:
                        action = np.argmax(self.q_table[state])

                new_state, reward, terminated, info = env.step(action)
                if reward < 0:
                    penalties += 1
                if reward > 0:
                    rewards += 1
                q_for_current_state = self.q_table[state][action]
                q_for_next_state = self.q_table[new_state][np.argmax(self.q_table[new_state])]
                reward =  q_for_current_state + self.learning_rate * (reward + self.gamma * q_for_next_state- q_for_current_state)
                self.update(state, action, reward)
                state = new_state
                if terminated:
                    break

            rewards_per_episode.append(rewards)
            penalties_per_episode.append(penalties)
            timesteps_per_episode.append(steps)

        if print_plot:
            fig, ax = plt.subplots(figsize = (20, 4))
            ax.set_title("Timesteps to complete level")
            ax.plot(range(episodes), timesteps_per_episode)
            plt.show()

            fig, ax = plt.subplots(figsize = (20, 4))
            ax.set_title("Number of penalties per episode")
            line1, = ax.plot(range(episodes), penalties_per_episode)
            line2, = ax.plot(range(episodes), rewards_per_episode)
            line1.set_label('Penalties')
            line2.set_label('Rewards')
            ax.legend()
            plt.show()

    def test(self, state: np.ndarray):
        self.rendered_images.append(env.render('rgb_array'))
        action = self.get_best_action(state)
        new_state, reward, terminated, info = env.step(action)
        self.rendered_images.append(env.render('rgb_array'))

        limit = 30  # only for 'CliffWalking-v0' where there's no truncation
        while not terminated:
            action = self.get_best_action(new_state)
            new_state, reward, terminated, info = env.step(action)
            self.rendered_images.append(env.render('rgb_array'))
            limit -= 1
            if limit == 0 and env.spec.id == 'CliffWalking-v0': return False

        self.is_game_solved = not info['TimeLimit.truncated']

        # now works only for discrete games such as 'CliffWalking-v0', 'Taxi-v3' and 'FrozenLake-v1' ('4x4', '8x8')
        if (env.spec.id == 'CliffWalking-v0' and new_state != 36):
            pass
        elif (env.spec.id == 'Taxi-v3' and state%4 == ((new_state- new_state%4)//4)%5):
            pass
        elif (env.spec.id == 'FrozenLake-v1') and (env.observation_space.n-1) == new_state:
            pass
        else:
            self.is_game_solved = False
        return self.is_game_solved

    def print_q_table(self):
        print(self.q_table)

    def __call__(self, state: np.ndarray, action: np.ndarray) -> np.ndarray:
        """Return Q-value of given state and action."""
        return self.q_table[state][action]

    def update(self, state: np.ndarray, action: np.ndarray, reward: float) -> None:
        """Update Q-value of given state and action."""
        self.q_table[state][action] = reward

    def get_best_action(self, state: np.ndarray) -> np.ndarray:
        """Return action that maximizes Q-value for a given state."""
        if not any(self.q_table[state]): # if no best action then random
            return self.env.action_space.sample()
        return np.argmax(self.q_table[state])

    def animation(self):
        img = plt.imshow(self.rendered_images[0])
        for render in self.rendered_images:
            img.set_data(render)
            display.display(plt.gcf())
            display.clear_output(wait=True)
            time.sleep(0.4)

    def __repr__(self):
        """Elegant representation of Q-learning solver."""
        self.animation()
        if self.is_game_solved:
            return "Game solved correctly!"
        else:
            return "Game hasn't been solved!"

    def __str__(self):
        return self.__repr__()

In [8]:
env = gym.make("Taxi-v3")
print(env.reset())
action = env.action_space.sample()
print(action)

print(env.step(action))
'''
qls = QLearningSolver(env.observation_space.n, env.action_space.n, env=env)
qls.build_q_table(2500, 100)
qls.print_q_table()'''

(342, {'prob': 1.0, 'action_mask': array([1, 1, 0, 1, 0, 0], dtype=int8)})
2
(342, -1, False, False, {'prob': 1.0, 'action_mask': array([1, 1, 0, 1, 0, 0], dtype=int8)})


'\nqls = QLearningSolver(env.observation_space.n, env.action_space.n, env=env)\nqls.build_q_table(2500, 100)\nqls.print_q_table()'

# Eksperymenty

In [11]:
env = gym.make("Taxi-v3")
qls = QLearningSolver(env.observation_space.n, env.action_space.n, env=env)
qls.build_q_table(2500, 100, print_plot = True)
print("Game solved: ", qls.test(env.reset()))

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
[-0.19 -0.1  -0.1  -0.1  -1.   -1.  ]
[-0.1 -0.1 -0.1 -0.1  0.   0. ]
[-0.1 -0.1 -0.1 -0.1 -1.   0. ]
[-0.1 -0.1 -0.1 -0.1 -1.  -1. ]
[-0.1 -0.1 -0.1 -0.1  0.   0. ]
[-0.1 -0.1 -0.1 -0.1 -1.   0. ]
[-0.1 -0.1 -0.1 -0.1 -1.  -1. ]
[-0.199 -0.1   -0.1   -0.1   -1.    -1.   ]
[-0.19 -0.1  -0.1  -0.1  -1.   -1.  ]
[-0.1  0.   0.  -0.1  0.  -1. ]
[ 0.   0.   0.  -0.1  0.   0. ]
[-0.1 -0.1  0.  -0.1  0.  -1. ]
[-0.1 -0.1 -0.1 -0.1  0.  -1. ]
[-0.1 -0.1 -0.1 -0.1 -1.  -1. ]
[-0.19 -0.19 -0.1  -0.1  -1.   -1.  ]
[-0.19  -0.19  -0.199 -0.1   -1.909 -1.   ]
[-0.19 -0.1  -0.19 -0.1  -1.   -1.  ]
[-0.19 -0.1  -0.1  -0.1  -1.   -1.  ]
[-0.1  0.  -0.1  0.   0.   0. ]
[0. 0. 0. 0. 0. 0.]
[ 0.  -0.1  0.   0.   0.   0. ]
[-0.1 -0.1 -0.1  0.   0.   0. ]
[-0.1  0.   0.   0.   0.   0. ]
[0. 0. 0. 0. 0. 0.]
[ 0.   0.   0.  -0.1  0.   0. ]
[-0.1 -0.1  0.   0.   0.   0. ]
[-0.1 -0.1 -0.1 -0.1  0.   0. ]
[-0.1 -0.1 -0.1 -0.1 -1.   0. ]
[-0.1 -0.

KeyboardInterrupt: ignored

In [None]:
print(qls)

In [None]:
successes = 0
for _ in range(100):
    if qls.test(env.reset()):
        successes +=1
print("Average_accuracy: ", successes/100)

In [None]:
env = gym.make("CliffWalking-v0")
qls = QLearningSolver(env.observation_space.n, env.action_space.n, env=env)
qls.build_q_table(600, 100, print_plot = True)
print("Game solved: ", qls.test(env.reset()))

In [None]:
print(qls)

In [None]:
env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False)
qls = QLearningSolver(env.observation_space.n, env.action_space.n, env=env)
qls.build_q_table(250, 100, print_plot = True)
print("Game solved: ", qls.test(env.reset()))

In [None]:
print(qls)

In [None]:
env = gym.make("FrozenLake-v1", map_name="8x8", is_slippery=False)
qls = QLearningSolver(env.observation_space.n, env.action_space.n, env=env)
qls.build_q_table(1500, 100, print_plot = True)
print("Game solved: ", qls.test(env.reset()))

In [None]:
print(qls)

# Wnioski

Zaimplementowany algorytm Q-learningu działa poprawnie. W zależności od złożoności środowiska potrzebujemy różnej liczby  epizodów oraz ruchów na epizod.
