In [None]:
import pickle

class Q_learning_agent:
    """Tabular Q-learning agent for a four-dimensional continuous observation.

    The observation components (position, velocity, pole angle, pole angular
    velocity) are bucketed with ``np.digitize`` so that a dense NumPy array
    can be used as the Q table.

    Attributes:
        env: OpenAI Gym environment
        q: Q table, indexed [pos_bin, vel_bin, angle_bin, angular_vel_bin, action]
        pos_space: position bin edges
        vel_space: velocity bin edges
        angle_space: pole angle bin edges
        angular_vel_space: pole angular velocity bin edges

        alpha: learning rate
        gamma: discount factor
        epsilon: exploration rate
        epsilon_decay: amount subtracted from epsilon after every training episode
        epsilon_min: lower bound that epsilon is never decayed below
        rng: NumPy random generator used for the explore/exploit draw
    """

    # Number of most recent episodes averaged when reporting / plotting rewards
    reward_window = 100

    def __init__(self, env):
        """
        Build the bin edges, the zero-initialised Q table and the hyperparameters

        Args:
            env: environment exposing reset(), step(), render(), close() and a
                discrete action_space with attribute n and method sample()
        """
        # Environment
        self.env = env

        # Divide the continuous state space, position, velocity, pole angle, and pole angular velocity, into discrete bins using linspace
        # Observations outside these ranges fall into the first / last overflow bin
        self.pos_space = np.linspace(-2.4, 2.4, 7)
        self.vel_space = np.linspace(-4.0, 4.0, 7)
        self.angle_space = np.linspace(-.2095, .2095, 7)
        self.angular_vel_space = np.linspace(-4.0, 4.0, 7)

        # Initialize Q table
        # Add +1 because np.digitize returns values from 0 to len(bins)
        self.q = np.zeros((
            len(self.pos_space) + 1,
            len(self.vel_space) + 1,
            len(self.angle_space) + 1,
            len(self.angular_vel_space) + 1,
            self.env.action_space.n,
        ))

        # Hyperparameters
        self.alpha = 0.1 # learning rate
        self.gamma = 0.95 # discount factor
        self.epsilon = 0.1 # exploration rate
        self.epsilon_decay = 0.00001 # exploration rate decay
        self.epsilon_min = 0.01 # floor for the exploration rate (must be below the initial epsilon for decay to have any effect)

        self.rng = np.random.default_rng() # Random number generator

    def _discretize(self, state):
        """
        Map a continuous observation to its Q-table index

        Args:
            state: sequence whose first four entries are position, velocity,
                pole angle and pole angular velocity

        Returns:
            tuple of four bin indices, usable directly as ``self.q[index]``
        """
        return (
            np.digitize(state[0], self.pos_space),
            np.digitize(state[1], self.vel_space),
            np.digitize(state[2], self.angle_space),
            np.digitize(state[3], self.angular_vel_space),
        )

    def _reset_env(self):
        """
        Reset the environment and return only the observation

        Newer Gym / Gymnasium versions return ``(observation, info)`` from
        reset() while older Gym versions return the bare observation; both
        shapes are accepted.
        """
        result = self.env.reset()
        if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
            return result[0]
        return result

    def _step_env(self, action):
        """
        Advance the environment by one step

        Accepts both the 4-tuple ``(obs, reward, done, info)`` and the 5-tuple
        ``(obs, reward, terminated, truncated, info)`` step results.

        Returns:
            (observation, reward, done) where done is True once the episode
            is over for any reason
        """
        result = self.env.step(action)
        if len(result) == 5:
            observation, reward, terminated, truncated, _ = result
            return observation, reward, bool(terminated or truncated)
        observation, reward, done, _ = result
        return observation, reward, done

    def _recent_mean(self, rewards_per_episode):
        """Mean reward over the last ``reward_window`` episodes."""
        return np.mean(rewards_per_episode[-self.reward_window:])

    def _decay_epsilon(self):
        """Lower epsilon by one decay step without going below epsilon_min."""
        self.epsilon = max(self.epsilon - self.epsilon_decay, self.epsilon_min)

    def run_one_episode(self, is_training=True, render=False):
        """
        Run one episode of the environment

        Args:
            is_training: boolean to indicate if the agent is training or testing;
                exploration and Q-table updates only happen when True
            render: boolean to indicate if env.render() is called before every step

        Returns:
            sum of the rewards collected during the episode
        """
        # Reset environment and discretize the initial state
        state_idx = self._discretize(self._reset_env())

        done = False
        rewards = 0

        # Loop through one episode
        while not done:
            if render:
                self.env.render()

            # Choose action based on epsilon-greedy policy: Explore or Exploit
            if is_training and self.rng.random() < self.epsilon:
                action = self.env.action_space.sample() # Explore
            else:
                action = np.argmax(self.q[state_idx]) # Exploit

            # Take action, get new state and reward
            new_state, reward, done = self._step_env(action)
            new_state_idx = self._discretize(new_state)

            # Update Q table based on Bellman equation
            if is_training:
                entry = state_idx + (action,)
                td_target = reward + self.gamma * np.max(self.q[new_state_idx])
                self.q[entry] += self.alpha * (td_target - self.q[entry])

            # Update to new state
            state_idx = new_state_idx
            rewards += reward
        return rewards

    def train(self, min_rewards=300, plot=False, max_episodes=None):
        """
        Train the agent until the recent mean reward exceeds a threshold

        The environment is closed when training stops.

        Args:
            min_rewards: training stops once the mean reward of the last
                ``reward_window`` episodes is greater than this value
            plot: boolean to indicate if the rewards should be plotted
            max_episodes: optional upper bound on the number of episodes; None
                (the default) keeps training until min_rewards is exceeded,
                which never happens if the environment cannot reach it
        """
        rewards_per_episode = []
        i = 0

        # Loop through episodes
        while max_episodes is None or i < max_episodes:
            rewards = self.run_one_episode() # Run one episode
            rewards_per_episode.append(rewards) # Append rewards
            mean_rewards = self._recent_mean(rewards_per_episode)
            if i % 100 == 0:
                print(f'Episode: {i} Rewards: {rewards}  Epsilon: {self.epsilon:0.2f}  Mean Rewards {mean_rewards:0.1f}')
            if mean_rewards > min_rewards:
                break
            self._decay_epsilon()
            i += 1
        self.env.close()

        # plot the rewards
        if plot:
            self.plot_rewards(rewards_per_episode)

    def train_by_episodes(self, num_episodes=1000, plot=False):
        """
        Train the agent for a fixed number of episodes

        The environment is closed when training stops.

        Args:
            num_episodes: number of episodes to train
            plot: boolean to indicate if the rewards should be plotted
        """
        rewards_per_episode = []
        for i in range(num_episodes):
            rewards = self.run_one_episode()
            rewards_per_episode.append(rewards)
            mean_rewards = self._recent_mean(rewards_per_episode)
            if i % 100 == 0:
                print(f'Episode: {i} Rewards: {rewards}  Epsilon: {self.epsilon:0.2f}  Mean Rewards {mean_rewards:0.1f}')
            self._decay_epsilon()
        self.env.close()

        # plot the rewards
        if plot:
            self.plot_rewards(rewards_per_episode)

    def test(self, num_episodes=100, plot=True):
        """
        Test the agent with the greedy policy (no exploration, no learning)

        Args:
            num_episodes: number of episodes to test
            plot: boolean to indicate if the rewards should be plotted and
                saved to 'cartpoleQLAgent_test.png'

        Returns:
            mean reward over the tested episodes
        """
        rewards_per_episode = []
        for i in range(num_episodes):
            rewards = self.run_one_episode(is_training=False) # Run one episode
            rewards_per_episode.append(rewards) # Append rewards
            print(f'Episode: {i+1} Rewards: {rewards}')
        average_reward = np.mean(rewards_per_episode)
        print(f'Cummulative average rewards: {average_reward}')

        # plot the rewards
        if plot:
            plt.figure() # fresh figure so earlier plots are not drawn into this image
            plt.plot(rewards_per_episode)
            plt.xlabel('Episodes')
            plt.ylabel('Rewards')
            plt.title('Rewards vs Episodes')
            plt.savefig('cartpoleQLAgent_test.png')

        return average_reward

    def show_action(self):
        """
        Reset the environment and pick the greedy action for the initial state

        Returns:
            (state, action): the initial observation and the action with the
            highest Q value for it
        """
        state = self._reset_env()
        return state, self.predict(state)

    def predict(self, state):
        """
        Predict the greedy action for a state

        Args:
            state: state to predict the action

        Returns:
            index of the action with the highest Q value for the state
        """
        return np.argmax(self.q[self._discretize(state)])

    def save_model(self, filename='cartpoleQLAgent.pkl'):
        """
        Save the Q table so the trained agent can be restored later

        Only the Q table is written; hyperparameters and bin edges are not.

        Args:
            filename: name of the file to save the model
        """
        with open(filename, 'wb') as f:
            pickle.dump(self.q, f)

    def load_model(self, filename='cartpoleQLAgent.pkl'):
        """
        Load a previously saved Q table from a file

        Args:
            filename: name of the file to load the model
        """
        # SECURITY: pickle can execute arbitrary code while loading;
        # only load files that come from a trusted source.
        with open(filename, 'rb') as f:
            self.q = pickle.load(f)

    def plot_rewards(self, rewards_array):
        """
        Plot the running mean of the rewards and save it to 'cartpoleQLAgent.png'

        Args:
            rewards_array: array of rewards, one entry per episode
        """
        # Same window as the mean reported during training
        mean_rewards = [
            np.mean(rewards_array[max(0, t - self.reward_window + 1):t + 1])
            for t in range(len(rewards_array))
        ]
        plt.figure() # fresh figure so earlier plots are not drawn into this image
        plt.plot(mean_rewards)
        plt.xlabel('Episodes')
        plt.ylabel('Mean Rewards')
        plt.title(f'Mean Rewards vs Episodes (lr={self.alpha}, er={self.epsilon:0.2f})')
        plt.savefig('cartpoleQLAgent.png')