In [None]:
import numpy as np
import matplotlib.pyplot as plt
import csv
!pip install gymnasium
import gymnasium as gym
from tqdm import tqdm
import pickle



In [None]:
# number of buckets for position and velocity
pos_bucket_number = 20
vel_bucket_number = 20

These bucket counts set how finely the continuous observation space is divided into discrete bins, with each pair of bins (one for position, one for velocity) representing a state. More buckets mean smaller bins, which capture the car's situation in finer detail and allow more precise strategies, at the cost of a larger Q-table that takes more episodes to fill in.

In [None]:
action_space = [0, 1, 2]

There are 3 discrete deterministic actions:

0: Accelerate to the left

1: Don’t accelerate

2: Accelerate to the right

In [None]:
pos_space = np.linspace(-1.2, 0.6, pos_bucket_number)
vel_space = np.linspace(-0.07, 0.07, vel_bucket_number)

These are the bin edges for position and velocity.

Position lies in [-1.2, 0.6]

and velocity in [-0.07, 0.07].

In [None]:
def get_state(observation):
    # Car Position, Car Velocity
    (pos, vel) = observation
    # return bins of current observation state
    pos_bin = int(np.digitize(pos, pos_space))
    vel_bin = int(np.digitize(vel, vel_space))

    return (pos_bin, vel_bin)

By partitioning continuous observation data (such as position and velocity) into bins, we can obtain a discrete state representation. This state representation facilitates the understanding of the environment by reinforcement learning algorithms and enables the agent to make better decisions.
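To make the binning concrete, here is a minimal stdlib-only sketch. It assumes `bisect.bisect_right` as a stand-in for `np.digitize` (with default arguments and increasing edges, both follow the rule `edges[i-1] <= value < edges[i]`), and the helper names `linspace` and `to_bin` are hypothetical, introduced only for this illustration.

```python
from bisect import bisect_right

def linspace(lo, hi, n):
    """Return n evenly spaced edges from lo to hi (plain-Python stand-in for np.linspace)."""
    step = (hi - lo) / (n - 1)
    return [lo + i * step for i in range(n)]

def to_bin(value, edges):
    """Index i with edges[i-1] <= value < edges[i]; 0 below the first edge, len(edges) above the last."""
    return bisect_right(edges, value)

pos_edges = linspace(-1.2, 0.6, 20)
vel_edges = linspace(-0.07, 0.07, 20)

# Hypothetical observation: car near the valley bottom, moving slowly to the right
state = (to_bin(-0.5, pos_edges), to_bin(0.01, vel_edges))
print(state)

# Values outside the edges land in the outermost bins (0 and 20)
print(to_bin(-2.0, pos_edges), to_bin(1.0, pos_edges))
```

With 20 edges the index can take 21 values (0 through 20), which is why the Q-table needs one more entry per dimension than the bucket count.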

In [None]:
# find max action from Q (Q is a dict)
def max_action(Q, state, actions=action_space):
    values = np.array([Q[state, a] for a in actions])
    action = np.argmax(values)

    return action

The max_action() function is used to find the action with the highest value in the Q-table. This allows reinforcement learning algorithms to select the best action based on the current state.
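As a small illustration of greedy selection, the sketch below uses a hypothetical helper `greedy_action` and made-up Q-values. Unlike `max_action()`, it returns the action itself rather than its index in the list; the two coincide here because the actions are 0, 1 and 2. Ties go to the first action, which matches the behavior of `np.argmax`.

```python
def greedy_action(Q, state, actions=(0, 1, 2)):
    """Return the action with the highest Q-value; ties go to the first such action."""
    return max(actions, key=lambda a: Q[state, a])

# Hypothetical Q-values for a single discretized state
state = (8, 11)
Q = {(state, 0): -4.2, (state, 1): -3.9, (state, 2): -3.1}

best = greedy_action(Q, state)
print(best)  # 2, because -3.1 is the largest value
```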

In [None]:
# This function is a tool used to track and visualize the performance of a reinforcement learning algorithm.
def plot(scores, epsilons, num_of_episodes):
    fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, figsize=(20, 12), sharex=True)

    ax1.plot(scores, color='blue')
    ax1.set_xlabel('Episode', fontsize=16)
    ax1.set_ylabel('Score', color='blue', fontsize=16)
    ax1.tick_params(axis='y', labelcolor='blue', labelsize=14)
    ax1.set_ylim(-1000, 0)
    ax1.set_yticks(range(-1000, 1, 100))
    ax1.set_xlim(0, num_of_episodes)
    ax1.set_xticks(np.arange(0, num_of_episodes, 100))
    ax1.grid(alpha=0.4)

    ax2.plot(epsilons, color='red', linewidth=3)
    ax2.set_ylabel('Epsilon', color='red', fontsize=16)
    ax2.tick_params(axis='y', labelcolor='red', labelsize=14)
    ax2.set_ylim(0, 1)
    ax2.grid(alpha=0.4)

    mean_scores = [np.mean(scores[max(0, i - 99):(i + 1)]) for i in range(len(scores))]

    ax3 = ax1.twinx()
    ax3.plot(mean_scores, color='green', linewidth=3)
    ax3.set_ylabel('Mean score (last 100 episodes)', color='green', fontsize=16)
    ax3.tick_params(axis='y', labelcolor='green', labelsize=14)
    ax3.set_ylim(-1000, 0)
    ax3.set_yticks(range(-1000, 1, 100))
    ax3.spines['right'].set_visible(False)
    ax3.spines['left'].set_visible(False)
    ax3.yaxis.set_label_position('right')
    ax3.yaxis.tick_right()

    plt.title('num_of_episodes = ' + str(num_of_episodes), fontsize=20, color='green')

    plt.tight_layout()
    plt.savefig('scores_mcc.png')
    plt.show()

In [None]:
if __name__ == '__main__':
    num_of_episodes = 5000  # total number of episodes the agent runs during learning
    num_of_steps_per_episode = 1000  # maximum steps per episode; the episode is truncated if the goal is not reached by then
    alpha = 0.1  # learning rate: weight given to the new information from each update
    gamma = 0.9  # discount factor: higher values give more weight to future rewards
    eps = 1.  # epsilon: probability that the agent takes a random (exploratory) action

In [None]:
env = gym.make('MountainCar-v0', max_episode_steps=num_of_steps_per_episode)

In the Mountain Car environment, a car tries to climb a hill, but its engine is too weak to drive straight up. The car has to accelerate in the right direction at the right time, building up momentum, to reach the top. The observation consists of the car's current position and velocity, and there are three possible actions to choose from (left, stay, right).

This line creates the environment and assigns it to the env variable, making it ready to perform actions and interact with the environment.

In [None]:
    states = list()
    Q = {}

    scores = np.zeros(num_of_episodes)
    epsilons = np.zeros(num_of_episodes)

    for position in range(pos_bucket_number + 1):
        for velocity in range(vel_bucket_number + 1):
            states.append((position, velocity))

    for state in states:
        for action in action_space:
            Q[state, action] = 0

This cell prepares an empty dictionary Q to store the Q-values and builds the list of all possible discretized states. The ranges run to the bucket count plus one because np.digitize can return any index from 0 up to and including the number of bin edges.

The final loop initializes the Q-value of every state-action pair to 0.
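The same table can be built more compactly. The following is only an equivalent sketch using `itertools.product` and a dict comprehension, not a change to the notebook's approach; it also makes the table size easy to check.

```python
from itertools import product

pos_bucket_number = 20
vel_bucket_number = 20
action_space = [0, 1, 2]

# Bin indices run from 0 to bucket_number inclusive, hence the "+ 1"
states = list(product(range(pos_bucket_number + 1), range(vel_bucket_number + 1)))

# Same keys as the nested loops: (state, action) -> 0
Q = {(state, action): 0 for state in states for action in action_space}

print(len(states), len(Q))  # 441 1323
```

With 21 x 21 states and 3 actions the table holds 1323 entries, small enough for a plain dictionary.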

In [None]:
# create progress bar
progress_bar = tqdm(total=num_of_episodes, desc='Learning')

This progress bar is a tool used to track the progress of the learning process in the reinforcement learning algorithm.

In [None]:
    # learning loop
    for episode in range(num_of_episodes):
        terminated = False
        truncated = False

        obs, _ = env.reset()
        state = get_state(obs)

        score = 0

        while not (terminated or truncated):
            if np.random.random() < eps:
                action = np.random.choice(action_space)
            else:
                action = max_action(Q, state)

            obs_new, reward, terminated, truncated, info = env.step(action)
            score += reward

            # Q-learning update: bootstrap from the greedy action in the NEW state
            state_new = get_state(obs_new)
            action_new = max_action(Q, state_new)
            Q[state, action] = Q[state, action] + alpha*(reward + gamma*Q[state_new, action_new] - Q[state, action])

            state = state_new

**The terminated** variable represents the condition where the car has reached the goal and the episode is completed successfully. If the car reaches the top of the hill and accomplishes the task, the terminated value becomes True.

**The truncated** variable is used when the episode has a maximum step limit (num_of_steps_per_episode). If the car fails to reach the goal or satisfy other termination conditions within the maximum step limit, the truncated value becomes True. In this case, the episode ends, but it is not considered a successful completion.

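reward + gamma*Q[state_new, action_new] - Q[state, action]

This expression is the temporal-difference (TD) error: the gap between the new target (the reward plus the discounted value of the best action in the next state) and the current estimate. The update moves Q[state, action] toward the target by a fraction alpha of this error.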
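A single update can be worked through by hand. The sketch below uses a hypothetical helper `q_update` and made-up Q-values, with the notebook's alpha and gamma; MountainCar-v0 gives a reward of -1 on every step.

```python
def q_update(q_sa, reward, q_next_max, alpha=0.1, gamma=0.9):
    """Return (td_error, updated_q) for one Q-learning step."""
    td_error = reward + gamma * q_next_max - q_sa
    return td_error, q_sa + alpha * td_error

# Hypothetical values: current estimate 0, best next-state value -2
td_error, new_q = q_update(q_sa=0.0, reward=-1.0, q_next_max=-2.0)
print(td_error, new_q)  # approximately -2.8 and -0.28
```

The estimate moves only a tenth of the way toward the target, which keeps learning stable when individual transitions are noisy.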

In [None]:
        # decrease epsilon linearly; about halfway through training the policy becomes fully greedy
        eps = eps - 2/num_of_episodes if eps > 0.01 else 0.

This decreases epsilon over time, so the agent explores less and increasingly exploits what it has already learned. Epsilon is reduced by 2/num_of_episodes after each episode. Once it is no longer greater than 0.01, it is set to 0, so from roughly the halfway point of training the agent acts fully greedily.
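The schedule can be simulated on its own to see when exploration stops. This is a sketch with a hypothetical helper `epsilon_schedule` that applies the same rule as the line above; the exact switch-over episode depends on floating-point rounding, so it is printed rather than stated.

```python
def epsilon_schedule(num_of_episodes, start=1.0, floor=0.01):
    """Epsilon after each episode, using the same rule as the learning loop."""
    eps = start
    history = []
    for _ in range(num_of_episodes):
        eps = eps - 2 / num_of_episodes if eps > floor else 0.0
        history.append(eps)
    return history

history = epsilon_schedule(5000)
first_greedy = history.index(0.0)  # first episode after which epsilon is exactly 0
print(first_greedy, history[-1])
```

For 5000 episodes the switch happens just short of the halfway mark, leaving the second half of training for pure exploitation.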

In [None]:
        scores[episode] = score
        epsilons[episode] = eps


It saves the score and epsilon values obtained for each episode in the 'scores' and 'epsilons' arrays, respectively. These arrays can be used to track the progress of the learning process and visualize the results.
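Raw per-episode scores are noisy, so a windowed mean is easier to read. The sketch below uses a hypothetical helper `moving_average` that mirrors the `mean_scores` list comprehension in `plot()`, written in plain Python with made-up scores.

```python
def moving_average(scores, window=100):
    """Mean of the last `window` scores at each episode (fewer at the start)."""
    means = []
    for i in range(len(scores)):
        recent = scores[max(0, i - window + 1):i + 1]
        means.append(sum(recent) / len(recent))
    return means

# Hypothetical scores from four short episodes
print(moving_average([-200, -180, -160, -140], window=2))
# [-200.0, -190.0, -170.0, -150.0]
```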

In [None]:
        # every 100 episodes, compute the mean score
        # that is shown on the progress bar
        if episode % 100 == 0:
            # mean of the last 100 episodes (fewer at the start)
            mean = np.mean(scores[max(0, episode - 99):(episode + 1)])


In [None]:
        # Update the progress bar
        progress_bar.set_postfix(epsilon=f'{eps:.2f}',
                                 score=str(int(score)),
                                 mean_score=str(int(mean)))
        progress_bar.update(1)

    env.close()

    plot(scores, epsilons, num_of_episodes)

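This updates the progress bar with the current epsilon, the latest score, and the most recently computed mean score, then advances it by one episode.

After the learning loop completes, the environment is closed and the scores and epsilon values are plotted.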