In [1]:
import sys

import gym

# Record the gym release and the interpreter build this notebook ran under
print(gym.__version__)
print(sys.version)

0.26.2
3.9.4 (tags/v3.9.4:1f2e308, Apr  6 2021, 13:40:21) [MSC v.1928 64 bit (AMD64)]


In [3]:
import random
import gym
import math
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Input
from keras.optimizers.schedules import ExponentialDecay
from keras.optimizers import Adam

# Training Parameters (module-level settings read by the functions defined below)
n_episodes = 1000  # Number of episodes run() iterates over
n_win_ticks = 195  # Mean survival time (ticks) over the recent-score window at which run() reports success; NOTE(review): confirm this bar is the intended one for CartPole-v1
max_env_steps = None  # Optional per-episode step cap; None means no override is applied
gamma = 1.0  # Discount factor applied to the next-state value in replay()
epsilon = 1.0  # Upper bound on the exploration rate returned by get_epsilon()
epsilon_min = 0.01  # Lower bound on the exploration rate returned by get_epsilon()
epsilon_decay = 0.995  # Scale factor inside get_epsilon()'s log10 schedule
alpha = 0.01  # Initial learning rate handed to the ExponentialDecay schedule
alpha_decay = 0.01  # NOTE(review): not referenced anywhere in the visible code
batch_size = 64  # Maximum number of transitions sampled per replay() call
monitor = False  # NOTE(review): not referenced in the visible code; rendering is selected by render_mode in gym.make
quiet = False  # When True, run() suppresses its progress messages

# Environment Parameters
memory = deque(maxlen=100000)  # Replay buffer; the oldest transitions are dropped once full

# Create the CartPole environment. The per-episode step cap is enforced by the
# TimeLimit wrapper that gym.make installs, so it has to be supplied here:
# assigning `env.max_episode_steps` after construction only creates an unused
# attribute on the wrapper and leaves the real limit unchanged. Passing None
# keeps the limit registered for 'CartPole-v1'.
env = gym.make('CartPole-v1', render_mode='human', max_episode_steps=max_env_steps)

# Model Definition
# Fully connected Q-network: 4 state features in, one linear output per action (2 actions).
model = Sequential([
    Input(shape=(4,)),
    Dense(24, activation='relu'),
    Dense(48, activation='relu'),
    Dense(2, activation='linear'),
])

# Learning Rate Schedule: start at `alpha` and multiply by `decay_rate` once per
# `decay_steps` optimizer steps (staircase=True makes the drop step-wise).
lr_schedule = ExponentialDecay(
    initial_learning_rate=alpha,
    decay_steps=100000,
    decay_rate=0.96,
    staircase=True,
)

# Mean-squared-error regression on Q-values, optimized with Adam on the schedule above
model.compile(optimizer=Adam(learning_rate=lr_schedule), loss='mse')

def remember(state, action, reward, next_state, done):
    """Append one (state, action, reward, next_state, done) transition to the replay memory."""
    transition = (state, action, reward, next_state, done)
    memory.append(transition)

def choose_action(state, epsilon):
    """Pick an action with an epsilon-greedy rule.

    With probability `epsilon` a random action is sampled from the
    environment's action space; otherwise the action whose predicted
    Q-value is largest for `state` is returned.
    """
    # Explore: the uniform draw fell inside the epsilon band
    if np.random.random() <= epsilon:
        return env.action_space.sample()

    # Exploit: greedy action under the current Q-network
    q_values = model.predict(state, verbose=0)
    return np.argmax(q_values)

def get_epsilon(t):
    """Return the exploration rate for index `t`.

    The rate follows ``1 - log10((t + 1) * epsilon_decay)`` and is then
    clamped to the interval ``[epsilon_min, epsilon]``.
    """
    scheduled = 1.0 - math.log10((t + 1) * epsilon_decay)
    capped = min(epsilon, scheduled)  # never exceed the configured ceiling
    return max(epsilon_min, capped)  # never drop below the configured floor

def preprocess_state(state):
    """Convert `state` to a float32 array shaped ``(1, len(state))`` for the model."""
    vector = np.array(state, dtype=np.float32)
    # A leading batch axis of size 1 lets the result be fed to the model directly
    return vector.reshape(1, len(vector))

def replay(batch_size, epsilon):
    """Fit the Q-network on one random minibatch drawn from the replay memory.

    For every sampled transition the target for the action that was taken is
    ``reward`` when the stored done flag is set, otherwise
    ``reward + gamma * max_a Q(next_state, a)``. Targets for the other actions
    are the network's current predictions, so they contribute no error.

    Args:
        batch_size: Maximum number of transitions to sample; fewer are used
            when the memory holds fewer.
        epsilon: Unused; kept so existing callers keep working. Rebinding this
            parameter would not change the module-level `epsilon` or what
            `get_epsilon()` returns, so no decay is attempted here.
    """
    sample_size = min(len(memory), batch_size)
    if sample_size <= 0:
        return  # Nothing to sample, so there is nothing to fit

    minibatch = random.sample(memory, sample_size)
    states, actions, rewards, next_states, dones = zip(*minibatch)

    # Transitions hold states as (1, n_features) rows; stacking them yields one
    # (batch, n_features) array so the network is evaluated twice per replay
    # instead of twice per sampled transition.
    states = np.vstack(states)
    next_states = np.vstack(next_states)

    targets = model.predict(states, verbose=0)
    next_best = np.max(model.predict(next_states, verbose=0), axis=1)

    rewards = np.asarray(rewards, dtype=np.float32)
    dones = np.asarray(dones, dtype=bool)
    actions = np.asarray(actions, dtype=np.intp)

    # Overwrite only the Q-value of the action actually taken in each row
    rows = np.arange(sample_size)
    targets[rows, actions] = np.where(dones, rewards, rewards + gamma * next_best)

    # Single gradient pass over the whole minibatch
    model.fit(states, targets, batch_size=sample_size, verbose=0)

def run():
    """Train the agent for up to `n_episodes` episodes.

    Each episode is played to completion with an epsilon-greedy policy while
    transitions are recorded, then one replay update is performed. Training
    stops early once the mean survival time over the most recent (up to 100)
    episodes reaches `n_win_ticks` and at least 100 episodes have passed.

    Returns:
        ``e - 100`` for the episode index ``e`` at which the task was solved,
        or `n_episodes` when it was never solved (including when
        `n_episodes` is 0).
    """
    scores = deque(maxlen=100)  # Survival times of the most recent episodes

    for e in range(n_episodes):
        # The exploration rate depends only on the episode index, so compute it once
        exploration = get_epsilon(e)
        state = preprocess_state(env.reset()[0])
        done = False
        i = 0  # Ticks survived in this episode

        while not done:
            action = choose_action(state, exploration)
            next_state, reward, terminated, truncated, _ = env.step(action)
            next_state = preprocess_state(next_state)
            done = terminated or truncated
            # No explicit env.render(): with render_mode='human' the environment
            # already draws each frame during step().
            # Store `terminated` rather than `done`: a time-limit truncation is
            # not a real terminal state, so replay() should still bootstrap
            # from next_state for it.
            remember(state, action, reward, next_state, terminated)
            state = next_state
            i += 1

        scores.append(i)
        mean_score = np.mean(scores)
        if mean_score >= n_win_ticks and e >= 100:
            if not quiet:
                print(f'Ran {e} episodes. Solved after {e - 100} trials')
            return e - 100
        if e % 20 == 0 and not quiet:
            print(f'[Episode {e}] - Mean survival time over last 100 episodes was {mean_score} ticks.')

        replay(batch_size, exploration)

    # Report the number of episodes actually run; the loop variable would be
    # one short of that, and undefined when n_episodes is 0.
    if not quiet:
        print(f'Did not solve after {n_episodes} episodes')
    return n_episodes

run()  # Start the training loop; the value returned by run() is not captured here


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[Episode 0] - Mean survival time over last 100 episodes was 12.0 ticks.
[Episode 20] - Mean survival time over last 100 episodes was 10.333333333333334 ticks.
[Episode 40] - Mean survival time over last 100 episodes was 24.048780487804876 ticks.
[Episode 60] - Mean survival time over last 100 episodes was 23.081967213114755 ticks.
[Episode 80] - Mean survival time over last 100 episodes was 24.814814814814813 ticks.
[Episode 100] - Mean survival time over last 100 episodes was 22.59 ticks.
[Episode 120] - Mean survival time over last 100 episodes was 24.48 ticks.
[Episode 140] - Mean survival time over last 100 episodes was 24.45 ticks.
[Episode 160] - Mean survival time over last 100 episodes was 27.63 ticks.
[Episode 180] - Mean survival time over last 100 episodes was 27.49 ticks.
[Episode 200] - Mean survival time over last 100 episodes was 28.19 ticks.
[Episode 220] - Mean survival time over last 100 episodes was 28.06 ticks.
[Episode 240] - Mean survival time over last 100 episod

611