# Assignment 16 - Cart Pole Q-learning
### Author: Michał Krępa 6475383

This project is a solution to the assignment given during the AI for Robotics I course at UniGe.
The project number is 16.

## Project Structure
The project consists of three components:
- `CartPole`: A wrapper class for the Gymnasium CartPole environment
- `Q_learning`: An agent that implements the Q-learning algorithm
- `main.py`: The main function, which sets all the variables

## Requirements
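To run the project, a few dependencies are needed.

- `gymnasium` - Library maintained by the Farama Foundation (the successor to OpenAI Gym) that provides the API and the required environments
- `numpy` - Library for numerical arrays, used here for the discretization and for storing the Q-table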


In [1]:
!pip install gymnasium
!pip install numpy



In [2]:
import gymnasium as gym
import numpy as np
import random
import pickle

### CartPole class
This is a wrapper class for Gymnasium's CartPole environment. I created it to follow the example given during the lab sessions. While writing it I ran into the main challenge of this project: the environment produces a continuous observation space, which has to be discretized before it can be used to index a Q-table. After some struggle, I ended up discretizing each of the four observation variables with 10 evenly spaced bin edges, which gives 11 possible bin indices per variable.
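
As a small illustration of this discretization, the sketch below bins a few made-up observations with `np.digitize`. The bin edges are the ones used in `digitize_state`; the sample observation values and the helper name `digitize_observation` are assumptions for illustration only.

```python
import numpy as np

# Bin edges copied from digitize_state: 10 evenly spaced edges per variable
BIN_EDGES = [
    np.linspace(-2.4, 2.4, 10),        # cart position
    np.linspace(-4, 4, 10),            # cart velocity
    np.linspace(-0.2095, 0.2095, 10),  # pole angle in rad
    np.linspace(-4, 4, 10),            # pole angular velocity
]

def digitize_observation(obs):
    """Hypothetical helper: map a continuous observation to four bin indices."""
    return [int(np.digitize(value, edges)) for value, edges in zip(obs, BIN_EDGES)]

# Made-up observation: cart centered and at rest, pole upright
center = digitize_observation([0.0, 0.0, 0.0, 0.0])

# Values below the first edge map to index 0, values above the last edge to index 10
low = digitize_observation([-3.0, -5.0, -0.3, -5.0])
high = digitize_observation([3.0, 5.0, 0.3, 5.0])

print(center, low, high)  # [5, 5, 5, 5] [0, 0, 0, 0] [10, 10, 10, 10]
```

Because the indices run from 0 to 10, each variable needs 11 slots in the Q-table, not 10.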

Additionally, this wrapper class has some member functions that return specific values; see the code for details.

Creating an object of this class with `is_learning` set to `False` turns on the visualization of the CartPole environment. This is not recommended while training the model, as rendering prolongs the training time significantly.

In [3]:
class CartPole:
    """
        Wrapper class for CartPole environment

        Attributes:
            _env: The Gym environment for the Cart Pole game.
            _currState (np.array): The current state of the environment.
            _isTerminated (bool): Flag indicating whether the current episode has ended.
    """
    def __init__(self, is_learning = False):
        """
        Initializes the CartPole environment

        Args:
            is_learning (bool): Flag to determine if the environment is for learning or visualization.
        """
        # Define whether we want to visualize
        if is_learning:
            self._env = gym.make('CartPole-v1')
        else:
            self._env = gym.make('CartPole-v1', render_mode = "human")
        self._currState = self._env.reset()[0]
        self._isTerminated = False


    def digitize_state(self, state):
        """
        Digitizes the continuous state into discrete values for Q-table.
        
        Args:
            state (np.array): The current state of the environment.

        Returns:
            list: A list representing the digitized state.
        """
        pos_space = np.linspace(-2.4, 2.4, 10)
        vel_space = np.linspace(-4, 4, 10)
        ang_space = np.linspace(-.2095, .2095, 10)
        ang_vel_space = np.linspace(-4, 4, 10)
        
        new_state_p = np.digitize(state[0], pos_space)
        new_state_v = np.digitize(state[1], vel_space)
        new_state_a = np.digitize(state[2], ang_space)
        new_state_av= np.digitize(state[3], ang_vel_space)
        new_state_dig = [new_state_p, new_state_v, new_state_a, new_state_av]
        return new_state_dig

    def do_action(self, action):
        """
        Performs a step in the environment. Gets the observation and reward, and checks if the game is over.

        Args:
            action (int): An action passed to the environment
        Returns:
            new_state: Discrete state after the action is taken
            reward: Reward based on the taken action
        """
        # step() returns (observation, reward, terminated, truncated, info);
        # truncated and info are discarded, so only termination ends an episode
        new_state, reward, self._isTerminated, _, _ = self._env.step(action)
        # Update the current state
        self._currState = new_state
        return self.digitize_state(new_state), reward
    
    def reset_env(self):
        """ Resets the environment """
        self._currState = self._env.reset()[0]
        self._isTerminated = False

    def get_current_state(self):
        """ Gets the discrete state of the environment """
        return self.digitize_state(self._currState)
    
    def get_action_space(self):
        """Returns the size of the action space"""
        return self._env.action_space.n
    
    def is_game_over(self):
        """ Returns boolean determining if game is over"""
        return self._isTerminated

### Q_learning Agent
The class containing the Q-learning agent follows, or at least attempts to follow, the style shown during the lab sessions. A few things inside this class are worth pointing out:

- `Q_table` - The Q-table is not a dictionary of dictionaries as it was in class, because I found that difficult to do for a 4-dimensional state space (or observation space). I am also not that experienced with Python, so I created a 4-dimensional array over the discretized states with one extra axis for the action space (which in our case is simple: the cart can only go left or right, so two values). The result is an `11x11x11x11x2` array (29,282 entries), which in my opinion is quite big, but I considered it necessary to get reasonably precise results in the long run.


- **Policy** - I am using the *Epsilon Greedy Policy* with one modification: a boolean learning flag distinguishes whether we want to explore or only use the values from the Q-table.

- **Epsilon Decay Rate** - While researching for this project I noticed that the Epsilon Greedy Policy can be combined with an epsilon that decays over the episodes, so the agent starts with fully random choices and then relies more and more on the Q-table. I was unsure whether I could use this, but in the end it is in the code with a comment next to it. To turn it off, simply comment out or remove that one line:

```python
            # Epsilon Decay rate 
            self._epsilon = max(self._epsilon - self._epsilon/self._episodes, 0)
```
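
To see what this update does over a full run, the sketch below applies it in isolation. Each episode multiplies epsilon by `1 - 1/episodes`, so after all episodes epsilon ends near `exp(-1)` of its start value rather than at zero. The function name `decayed_epsilon` is an assumption for illustration; the start value 1 and the 100000 episodes are the values set in the main function.

```python
import math

def decayed_epsilon(epsilon, episodes, steps):
    """Hypothetical helper: apply the decay update `steps` times."""
    for _ in range(steps):
        epsilon = max(epsilon - epsilon / episodes, 0)
    return epsilon

episodes = 100000
eps_mid = decayed_epsilon(1.0, episodes, 81500)
eps_end = decayed_epsilon(1.0, episodes, episodes)

print(f"after 81500 episodes: {eps_mid:0.2f}")  # 0.44
print(f"after all episodes:   {eps_end:0.2f}")  # 0.37, close to exp(-1)
```

The value 0.44 after 81500 episodes agrees with the epsilon shown in the sample log line for episode 81500.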

- `apply` - This function runs the whole simulation. It works in two modes depending on the `isLearning` flag: the first is meant for learning, the second uses the Q-table to "show off" the capabilities of the trained model.
If we run the agent with `isLearning` set to `True`, at the end we get a `Q_table.pkl` file. This is the filled Q-table, which is loaded when `isLearning` is set to `False`. Finally, this function also provides some statistics to see whether the model is learning efficiently. Every 100 episodes it prints a message like the example below:
```
Episode: 81500 Rewards: 142.0  Epsilon: 0.44  Mean Rewards 80.8
```
Here Episode is the current episode number, Rewards is the total reward obtained in that episode, Epsilon is the current value of epsilon, and Mean Rewards is the average score over the last 100 episodes.
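
The pieces listed above can be put together in a compact sketch: a zero-initialized `11x11x11x11x2` table, an epsilon-greedy choice, and one temporal difference update. This is a simplified stand-alone illustration, not the class itself; the function names `choose_action` and `td_update`, the sample states and the reward of 1.0 are assumptions, while `gamma = 0.7` and `alpha = 0.1` are the values set in the main function.

```python
import random
import numpy as np

q_table = np.zeros((11, 11, 11, 11, 2))  # 4 discretized state axes + 2 actions

def choose_action(q_table, state, epsilon, is_learning=True):
    """Epsilon-greedy: explore with probability epsilon, otherwise exploit."""
    if is_learning and random.random() < epsilon:
        return random.choice([0, 1])
    return int(np.argmax(q_table[tuple(state)]))

def td_update(q_table, state, action, reward, next_state, alpha, gamma):
    """One Q-learning step: Q(s,a) += alpha * (r + gamma * max Q(s',.) - Q(s,a))."""
    idx = tuple(state) + (action,)
    td_target = reward + gamma * np.max(q_table[tuple(next_state)])
    q_table[idx] += alpha * (td_target - q_table[idx])

state, next_state = [5, 5, 5, 5], [5, 6, 5, 4]  # made-up discrete states
td_update(q_table, state, action=1, reward=1.0, next_state=next_state,
          alpha=0.1, gamma=0.7)

# Only the visited entry changed: 0 + 0.1 * (1.0 + 0.7 * 0 - 0) = 0.1
print(q_table[5, 5, 5, 5])
# With exploration switched off, the greedy choice is now action 1
greedy = choose_action(q_table, state, epsilon=1.0, is_learning=False)
print(greedy)
```

Indexing with a tuple keeps the update on one short line, which avoids repeating the four state indices for every access.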


In [10]:



class Q_learning:
    """
        Implementation of the Q-learning algorithm for the CartPole environment.

        Attributes:
            _env (CartPole): Cart Pole env
            _gamma (float): The discount factor
            _alpha (float): The learning rate.
            _epsilon (float): The exploration rate.
            _episodes (int): The number of episodes for training
            _isLearning (bool): Flag indicating whether the agent is in learning mode.
            Q_table (np.array): The Q-table, stores state-action values
    """
    def __init__(self, env, gamma, alpha, epsilon, episodes, isLearning = True):
        """
            Initializes Q-learning agent.

            Works in two ways. When the isLearning flag is set to True,
            it initializes the Q-table as a zero-filled np.array, else it tries to load it from file.
            Args:
                env (CartPole): The Cart Pole environment.
                gamma (float): The discount factor.
                alpha (float): The learning rate.
                epsilon (float): The exploration rate.
                episodes (int): The number of episodes for training.
                isLearning (bool): Flag to determine if the agent is in learning mode.
        """
        self._env = env
        self._gamma = gamma
        self._alpha = alpha
        self._epsilon = epsilon
        self._episodes = episodes
        self._isLearning = isLearning

        # Initialize Q_Table
        if self._isLearning: 
            # State is given as continuous set of variables
            # we need to cut it into pieces to be able to learn
            # The limits here are the limits for our game to be over
            pos_space = np.linspace(-2.4, 2.4, 10)
            vel_space = np.linspace(-4, 4, 10)
            ang_space = np.linspace(-.2095, .2095, 10) #value in rad
            ang_vel_space = np.linspace(-4, 4, 10)
            self.Q_table = np.zeros((len(pos_space)+1, len(vel_space)+1, 
                                    len(ang_space)+1, len(ang_vel_space)+1, self._env.get_action_space())) #11x11x11x11x2
        else:
            # Load the model
            with open('Q_table.pkl', 'rb') as f:
                self.Q_table = pickle.load(f)

    def policy(self, state):
        """ 
        Epsilon Greedy Policy

        Function works in two modes:
            If isLearning is True, decides at random whether to choose a random action or
            the best action according to the Q_table. The higher epsilon, the higher the chance of a random action.
            When isLearning is False, the policy only chooses actions based on the Q_table.
        
        Args:
            state: Discrete state of the environment

        Returns:
            int: The chosen action (0 = left, 1 = right)
        """
        if self._isLearning and np.random.random() < self._epsilon:
            # Choose an action at random with probability epsilon
            return random.choice([0,1]) # only two actions - left or right
        else:
            # Choose the best action according to Q_table with probability 1-epsilon
            return np.argmax(self.Q_table[state[0], state[1], state[2], state[3], :])

    def apply(self):
        """
        Executes the Q-learning algorithm over a specified number of episodes.

        This method runs the Q-learning algorithm, updating the Q-table based on the interactions
        with the environment. It implements an epsilon-greedy policy for action selection and applies 
        temporal difference learning for updating the Q-table.
        Additionally, the method handles epsilon decay, which reduces exploration over time,
        and prints out the progress every 100 episodes.

        The method performs the following steps in each episode:
        - Interacts with the environment to obtain states, rewards, and new states.
        - Updates the Q-table using the temporal difference
        - Applies epsilon decay to gradually shift from exploration to exploitation.
        - Tracks and logs the rewards for each episode.

        At the end of the training, the updated Q-table is saved to a file (if in learning mode), 
        and the average reward across all episodes is calculated and printed to the output.
        """

        total_episode_rewards = []  # Total reward of every episode
        for episode in range(self._episodes):
            rewards = 0  # Total reward of the current episode
            while not self._env.is_game_over():
                # get the current state
                curr_state = self._env.get_current_state()
                action = self.policy(curr_state)
                next_state, reward = self._env.do_action(action)
                # Choose maximum Q-value for next state
                max_next_value = np.max(self.Q_table[tuple(next_state)])
                # Temporal difference update
                idx = tuple(curr_state) + (action,)
                td_error = reward + self._gamma * max_next_value - self.Q_table[idx]
                self.Q_table[idx] += self._alpha * td_error
                rewards += reward

            # Reset before new episode
            self._env.reset_env()

            # Epsilon Decay rate 
            self._epsilon = max(self._epsilon - self._epsilon/self._episodes, 0)
            
            # Store the episode reward (once per episode, so the means are not skewed)
            total_episode_rewards.append(rewards)
            # Mean over the last 100 episodes
            mean_rewards = np.mean(total_episode_rewards[-100:])
            # Every 100 episodes display rewards
            if episode % 100 == 0:
                print(f'Episode: {episode} Rewards: {rewards}  Epsilon: {self._epsilon:0.2f}  Mean Rewards {mean_rewards:0.1f}')
        
        # Save Q table to file
        if self._isLearning:
            with open('Q_table.pkl', 'wb') as f:
                pickle.dump(self.Q_table, f)

        # Calculate the mean
        print("Average reward after all episodes: ", np.mean(total_episode_rewards))


### The main function
Finally, the main function defines the constants `gamma`, `alpha` and `epsilon`, and the number of `episodes`.
For this project I chose a large number of episodes because of the size of the Q-table: an object that big needs many steps and episodes before it is filled.

This is probably not the best setup for this environment and Q-learning. While playing with different values I got varying results; sometimes running more than 100 000 episodes led to rewards above 1k in a single episode. In the end, though, this setup works fine: after around 20k episodes the average reward starts to increase, and it keeps increasing until the end of the simulation.
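
One way to check how much of such a table actually gets filled is to count the states whose action values are no longer all zero. The sketch below does this on a made-up table; the helper name `visited_fraction` is hypothetical, and treating "non-zero" as "visited" is an assumption that seems reasonable here because the table starts at zero and CartPole gives a positive reward on every step.

```python
import numpy as np

def visited_fraction(q_table):
    """Hypothetical helper: share of discrete states with a non-zero action value."""
    visited = np.any(q_table != 0, axis=-1)  # one boolean per discrete state
    return visited.sum() / visited.size

q_table = np.zeros((11, 11, 11, 11, 2))
print(q_table.size)  # 29282 entries in total (11**4 * 2)

# Pretend two states were updated during training (made-up values)
q_table[5, 5, 5, 5, 1] = 0.1
q_table[5, 6, 5, 4, 0] = 0.3

fraction = visited_fraction(q_table)
print(f"{fraction:.6f}")  # 2 of 14641 states
```

Applied to a table loaded from `Q_table.pkl`, a low fraction would suggest that many bins are never reached during training.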

In [12]:
def main():
    gamma = 0.7 # Discount rate
    alpha = 0.1 # Learning rate
    epsilon = 1 # How much we want to explore 
    episodes = 100000 # Number of episodes

    isLearning = False # Set to True to train, False to test the trained model

    cart_pole = CartPole(isLearning)
    agent = Q_learning(cart_pole, gamma, alpha, epsilon, episodes, isLearning)
    agent.apply()

if __name__ == "__main__":
    main()


Episode: 0 Rewards: 262.0  Epsilon: 1.00  Mean Rewards 262.0

