In [None]:
import random
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

from scores.score_logger import ScoreLogger

# Gym environment id passed to gym.make() and to the ScoreLogger constructor.
ENV_NAME = "CartPole-v1"

# Discount factor applied to the best next-state Q-value when building the
# replay target (reward + GAMMA * max Q(next_state)).
GAMMA = 0.95
# Starting Adam learning rate. The adaptive schedule in DQNSolver clamps the
# rate between 1% of this value and this value itself.
INITIAL_LEARNING_RATE = 0.001

# Maximum number of transitions retained in the replay deque.
MEMORY_SIZE = 1000000
# Number of transitions sampled from memory on each experience-replay call.
BATCH_SIZE = 20

# Epsilon-greedy schedule: the exploration rate starts at EXPLORATION_MAX, is
# multiplied by EXPLORATION_DECAY after each replay pass, and never drops
# below EXPLORATION_MIN.
EXPLORATION_MAX = 1.0
EXPLORATION_MIN = 0.01
EXPLORATION_DECAY = 0.995


class DQNSolver:
    """Deep Q-Network agent with epsilon-greedy exploration and experience replay.

    The agent owns a small fully connected Keras network that maps one
    observation to one Q-value per action, a bounded replay memory of
    transitions, an exploration rate that decays as training proceeds, and a
    learning rate that is nudged up or down after every episode.
    """

    def __init__(self, observation_space, action_space):
        """Build the Q-network and initialise the agent's training state.

        Args:
            observation_space: Number of features in one observation; used as
                the input width of the network.
            action_space: Number of discrete actions; used as the output
                width of the network and as the range for random actions.
        """
        self.exploration_rate = EXPLORATION_MAX
        self.learning_rate = INITIAL_LEARNING_RATE
        # Metric of the previous episode; adaptive_learning_rate() compares
        # the next metric against it.
        self.previous_performance_metric = 0

        self.action_space = action_space
        self.memory = deque(maxlen=MEMORY_SIZE)

        self.model = Sequential()
        self.model.add(Dense(24, input_shape=(observation_space,), activation="relu"))
        self.model.add(Dense(24, activation="relu"))
        # Linear output: raw Q-value estimates, one per action.
        self.model.add(Dense(self.action_space, activation="linear"))
        # `learning_rate` is the supported keyword; the legacy `lr` alias is
        # deprecated and rejected by newer Keras releases.
        self.model.compile(loss="mse", optimizer=Adam(learning_rate=self.learning_rate))

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory.

        Once the memory holds MEMORY_SIZE items the deque discards the oldest
        entry for every new one.
        """
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action for ``state`` using an epsilon-greedy policy.

        Args:
            state: Observation in the batch form the network expects (the
                caller in this file reshapes it to ``[1, observation_space]``).

        Returns:
            int: A uniformly random action with probability
            ``self.exploration_rate``, otherwise the action with the highest
            predicted Q-value.
        """
        if np.random.rand() < self.exploration_rate:
            return random.randrange(self.action_space)
        # verbose=0: recent Keras versions otherwise print a progress bar for
        # every single predict call.
        q_values = self.model.predict(state, verbose=0)
        # Cast so both branches return a plain int.
        return int(np.argmax(q_values[0]))

    def experience_replay(self):
        """Train the network on a random mini-batch of remembered transitions.

        Does nothing until the memory holds at least BATCH_SIZE transitions.
        Each sampled transition is fitted individually, so later targets in
        the same batch are computed with the already-updated network. After
        the batch, the exploration rate is decayed and clamped to
        EXPLORATION_MIN.
        """
        if len(self.memory) < BATCH_SIZE:
            return
        batch = random.sample(self.memory, BATCH_SIZE)
        for state, action, reward, state_next, terminal in batch:
            # Terminal transitions have no future return to bootstrap from.
            q_update = reward
            if not terminal:
                next_q_values = self.model.predict(state_next, verbose=0)
                q_update = reward + GAMMA * np.amax(next_q_values[0])
            # Only the taken action's Q-value is replaced, so the loss is
            # zero for every other action.
            q_values = self.model.predict(state, verbose=0)
            q_values[0][action] = q_update
            self.model.fit(state, q_values, verbose=0)
        self.exploration_rate *= EXPLORATION_DECAY
        self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)

    def reward_shaping(self, state, reward, terminal):
        """Return a shaped reward for the transition that led to ``state``.

        Args:
            state: Flat observation indexable by position; index 0 is read as
                the cart position and index 2 as the pole angle.
            reward: Reward reported by the environment.
            terminal: Whether the episode ended on this step.

        Returns:
            ``-100`` when ``terminal`` is true; otherwise ``reward`` plus a
            bonus that grows as the pole angle and the cart position approach
            zero.
        """
        pole_angle = state[2]
        cart_position = state[0]
        if terminal:
            return -100  # Penalize heavily for terminal state
        angle_bonus = 1.0 - abs(pole_angle)
        position_bonus = 0.5 - abs(cart_position) * 0.5
        return reward + angle_bonus + position_bonus

    def adaptive_learning_rate(self, performance_metric):
        """Adjust the optimizer's learning rate from the latest episode metric.

        The rate is multiplied by 0.95 when ``performance_metric`` beats the
        previous one and by 1.05 otherwise, then clamped to the range
        ``[INITIAL_LEARNING_RATE * 0.01, INITIAL_LEARNING_RATE]`` and written
        to the compiled optimizer.

        Args:
            performance_metric: Score of the episode that just finished (the
                caller in this file passes the number of steps survived).
        """
        if performance_metric > self.previous_performance_metric:
            # Improving: take smaller steps for stability.
            self.learning_rate *= 0.95
        else:
            # Not improving: take larger steps (capped at the initial rate below).
            self.learning_rate *= 1.05
        self.learning_rate = max(INITIAL_LEARNING_RATE * 0.01, min(self.learning_rate, INITIAL_LEARNING_RATE))
        self.model.optimizer.learning_rate.assign(self.learning_rate)
        self.previous_performance_metric = performance_metric


def normalize_state(state):
    """Rescale an observation by fixed per-feature divisors.

    Each of the four features is divided element-wise by the matching entry
    of ``[4.8, 3.5, 0.418, 3.5]``. Signs are preserved, so inputs within
    those magnitudes map to roughly ``[-1, 1]`` (not ``[0, 1]``); larger
    inputs are not clipped.

    NOTE(review): 4.8 and 0.418 presumably mirror CartPole's position and
    angle observation bounds, and 3.5 looks like a heuristic velocity scale;
    confirm against the environment.
    """
    feature_scales = np.array([4.8, 3.5, 0.418, 3.5])
    return state / feature_scales


def cartpole():
    """Train a DQNSolver on the ENV_NAME environment, one episode after another.

    Every step the agent picks an action, the environment reward is passed
    through ``reward_shaping``, the transition is stored, and one
    experience-replay pass is run. When an episode ends, the number of steps
    survived is printed, logged via ScoreLogger, and fed to the agent's
    adaptive learning-rate schedule.

    Both Gym API generations are accepted: ``reset()`` may return either the
    observation or an ``(observation, info)`` tuple, and ``step()`` may return
    either the 4-tuple ``(obs, reward, done, info)`` or the 5-tuple
    ``(obs, reward, terminated, truncated, info)``.

    The outer loop has no exit condition, so this function does not return on
    its own. NOTE(review): ScoreLogger presumably terminates the process once
    the task counts as solved; confirm in scores.score_logger.
    """
    env = gym.make(ENV_NAME)
    score_logger = ScoreLogger(ENV_NAME)
    observation_space = env.observation_space.shape[0]
    action_space = env.action_space.n
    dqn_solver = DQNSolver(observation_space, action_space)
    run = 0
    while True:
        run += 1
        reset_result = env.reset()
        # gym >= 0.26 returns (observation, info); older releases return the
        # observation alone.
        state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
        state = normalize_state(state)
        # The network expects a batch dimension: shape [1, observation_space].
        state = np.reshape(state, [1, observation_space])
        step = 0
        while True:
            step += 1
            # Uncomment to visualise training: env.render()
            action = dqn_solver.act(state)
            step_result = env.step(action)
            if len(step_result) == 5:
                # Newer API: the episode is over when it terminated or was
                # cut off by a time limit.
                state_next, reward, terminated, truncated, _ = step_result
                terminal = terminated or truncated
            else:
                state_next, reward, terminal, _ = step_result
            # Shaping reads the raw (un-normalised, flat) observation, so it
            # must run before normalisation and reshaping.
            reward = dqn_solver.reward_shaping(state_next, reward, terminal)
            state_next = normalize_state(state_next)
            state_next = np.reshape(state_next, [1, observation_space])
            dqn_solver.remember(state, action, reward, state_next, terminal)
            state = state_next
            if terminal:
                print(f"Run: {run}, exploration: {dqn_solver.exploration_rate}, score: {step}")
                score_logger.add_score(step, run)
                dqn_solver.adaptive_learning_rate(step)
                break
            dqn_solver.experience_replay()


# Start training only when this file is executed directly, not when it is
# imported as a module.
if __name__ == "__main__":
    cartpole()



In [None]:
# Second notebook cell: start the training loop again.
# NOTE(review): this call is unconditional, so it also runs on import if this
# export is ever used as a module; confirm that is intended.
cartpole()



You may see an error about not having an exit command. This error does not affect the program's functionality; it is a side effect of the steps taken to convert the code from Python 2.x to Python 3. You can safely disregard it.