In [None]:
%pip install gymnasium[classic-control]
%pip install tensorflow
%pip install matplotlib
%pip install tqdm

import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
import random
import time
from gymnasium import wrappers
from collections import deque
from collections import defaultdict
from tqdm import tqdm

In [20]:
class RLAgent:
    """Tabular Q-learning agent with an epsilon-greedy behaviour policy.

    Q-values are stored in a dictionary keyed by the observation (converted
    to a tuple). Each entry is an array with one value per action; an
    observation that has not been looked up before starts with all zeros.
    """

    def __init__(
        self,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        action_space: int,
        discount_factor: float,

    ):
        """Initialize a Reinforcement Learning agent with an empty dictionary
        of state-action values (q_values), a learning rate and an epsilon.

        Args:
            learning_rate: Step size applied to each temporal-difference update
            initial_epsilon: The initial epsilon (exploration probability)
            epsilon_decay: Factor epsilon is multiplied by in `decay_epsilon`
            final_epsilon: Lower bound below which epsilon is never decayed
            action_space: The number of actions available in the environment
            discount_factor: The discount factor for computing the Q-value
        """
        self.actions = action_space
        # Unseen observations get an all-zero row the first time they are read.
        self.q_values = defaultdict(lambda: np.zeros(self.actions))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon
        # Temporal-difference error of every `update` call, in call order.
        self.training_error = []

    @staticmethod
    def _to_key(obs) -> tuple:
        """Convert an observation into a hashable Q-table key.

        `np.ravel` flattens the observation first, so scalar observations
        (for example from a discrete observation space) and nested sequences
        are accepted in addition to 1-D arrays. For a 1-D array the result is
        the same tuple that `tuple(obs)` produces.
        """
        return tuple(np.ravel(obs))

    def get_action(self, obs: np.ndarray) -> int:
        """
        Returns the best known action with probability (1 - epsilon),
        otherwise a uniformly random action to ensure exploration.
        """
        # exploitation: pick the action with the highest stored Q-value
        if random.random() > self.epsilon:
            return int(np.argmax(self.q_values[self._to_key(obs)]))

        # exploration: cast so both branches return a built-in int
        return int(np.random.choice(self.actions))

    def update(
        self,
        obs: np.ndarray,
        action: int,
        reward: float,
        terminated: bool,
        next_obs: np.ndarray,
    ):
        """Updates the Q-value of `action` in `obs` with one Q-learning step.

        Args:
            obs: Observation the action was taken in
            action: Index of the action that was taken
            reward: Reward received for the transition
            terminated: Whether `next_obs` is terminal; if so, no future
                value is bootstrapped from it
            next_obs: Observation reached after taking the action
        """
        # convert the observations to hashable keys
        state = self._to_key(obs)
        next_state = self._to_key(next_obs)

        # best value reachable from the next observation (zero when terminal)
        future_q_value = (not terminated) * np.max(self.q_values[next_state])

        # difference between the bootstrapped target and the current estimate
        temporal_difference = (
            reward
            + self.discount_factor * future_q_value
            - self.q_values[state][action]
        )

        # move the current estimate towards the target
        self.q_values[state][action] += self.lr * temporal_difference

        # store the training error, the goal is to reduce it
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        """Multiply epsilon by `epsilon_decay`, never going below `final_epsilon`."""

        self.epsilon = max(self.final_epsilon, self.epsilon * self.epsilon_decay)

In [21]:
def _moving_average(values, window):
    """Return the moving average of `values`, using full windows only."""
    series = np.asarray(values, dtype=float).flatten()
    if series.size == 0:
        # np.convolve rejects empty input; there is nothing to smooth
        return series
    # never use a window longer than the data, otherwise np.convolve swaps
    # its operands and the result is no longer an average
    window = max(1, min(int(window), series.size))
    return np.convolve(series, np.ones(window), mode="valid") / window


def plot_stats(env, rolling_length=500, training_error=None):
    """Plot smoothed episode returns, episode lengths and training errors.

    Args:
        env: Environment exposing `return_queue` and `length_queue`
            (as provided by the RecordEpisodeStatistics wrapper).
        rolling_length: Window size of the moving average. It is clamped to
            the amount of available data.
        training_error: Sequence of temporal-difference errors to plot. When
            omitted, the notebook-level `agent.training_error` is used.
    """
    if training_error is None:
        # backward compatible default: read the notebook-level agent
        training_error = agent.training_error

    panels = (
        ("Episode rewards", "Episode", env.return_queue),
        ("Episode lengths", "Episode", env.length_queue),
        ("Training Error", "Update step", training_error),
    )

    fig, axs = plt.subplots(ncols=3, figsize=(12, 5))
    for ax, (title, xlabel, values) in zip(axs, panels):
        # the same "valid" smoothing on every panel avoids the zero-padding
        # artefacts that pull the ends of the curves towards zero
        smoothed = _moving_average(values, rolling_length)
        ax.set_title(title)
        ax.set_xlabel(xlabel)
        ax.plot(range(len(smoothed)), smoothed)
    plt.tight_layout()
    plt.show()


In [23]:
# --- hyper-parameters --------------------------------------------------------
EPISODE = 500_000   # number of episodes to play
GAMMA = 0.99        # discount factor
LR = 0.001251       # q-table learning rate
EPS_MAX = 1.0       # initial exploration probability
EPS_MIN = 0.1       # final exploration probability
DECAY = 0.999       # epsilon is multiplied by this after every episode
N_BINS = 20         # grid cells per observation dimension for the Q-table
SEED = 0            # seed for the random generators and the first reset

# Start with high exploration probability
epsilon = EPS_MAX

reward_sum = 0
win = 0
scores = list()
time_scores = deque(maxlen=100)   # lengths of the 100 most recent episodes

# make runs repeatable: the agent draws from both `random` and `np.random`
random.seed(SEED)
np.random.seed(SEED)

env = gym.make('MountainCar-v0')
agent = RLAgent(
    learning_rate=LR,
    initial_epsilon=EPS_MAX,
    epsilon_decay=DECAY,
    final_epsilon=EPS_MIN,
    action_space=env.action_space.n,
    discount_factor=GAMMA,
)

# Keep statistics for every episode. The buffer size is passed positionally
# because its keyword was renamed between Gymnasium releases
# (deque_size -> buffer_length).
env = gym.wrappers.RecordEpisodeStatistics(env, EPISODE)

# The observations are continuous, so raw values almost never repeat and a
# Q-table keyed on them cannot generalise. Map them onto a fixed grid instead.
obs_low = env.observation_space.low
bin_width = (env.observation_space.high - obs_low) / N_BINS


def discretize(observation):
    """Map a continuous observation to integer grid indices in [0, N_BINS)."""
    cell = ((observation - obs_low) / bin_width).astype(int)
    # an observation sitting exactly on the upper bound belongs to the last cell
    return np.clip(cell, 0, N_BINS - 1)


for episode in range(EPISODE):
    # seed the environment once, on the very first reset only
    obs, info = env.reset(seed=SEED if episode == 0 else None)
    state = discretize(obs)
    done = False
    step = 0

    # play one episode
    while not done:

        # get an action according to epsilon greedy policy
        action = agent.get_action(state)

        # execute the action
        next_obs, reward, terminated, truncated, info = env.step(action)
        next_state = discretize(next_obs)

        # update the agent
        agent.update(state, action, reward, terminated, next_state)

        # the episode ends on success (terminated) or on the time limit
        done = terminated or truncated

        state = next_state
        step += 1

    # store the number of steps taken in this episode
    time_scores.append(step)

    # compute avg score (negated step count, matching the original report)
    scores_avg = np.mean(time_scores) * -1

    # the environment terminates only when the goal is reached
    if terminated:
        win += 1

    if episode % 200 == 0:
        print(f"Episode {episode}/{EPISODE}, e {agent.epsilon:.6f}, avg reward {scores_avg:.2f}, state {next_obs}, time {step}, win {win}")

    agent.decay_epsilon()

# plot stats
plot_stats(env)

Episode 0/500000, e 1.000000, avg reward -200.00, state [-0.5191217   0.00074906], time 200, win 0
Episode 200/500000, e 0.818649, avg reward -200.00, state [-0.4309433  -0.00298946], time 200, win 0
Episode 400/500000, e 0.670186, avg reward -200.00, state [-0.6146651   0.01069279], time 200, win 0
Episode 600/500000, e 0.548647, avg reward -200.00, state [-0.67314684  0.0033727 ], time 200, win 0
Episode 800/500000, e 0.449149, avg reward -200.00, state [-0.589977    0.00986916], time 200, win 0
Episode 1000/500000, e 0.367695, avg reward -200.00, state [-0.5316735   0.00805567], time 200, win 0


KeyboardInterrupt: ignored