In [None]:
# Mount Google Drive inside the Colab VM at /content/drive; the DDQN agent below
# writes and reads its weight checkpoints under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Use the %pip magic so the packages are installed into the running kernel's
# environment, and pin gymnasium to the version this notebook was executed with.
%pip install pygame gymnasium==0.29.1


Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m12.6 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1


In [None]:
import numpy as np
import random
from collections import deque
from tensorflow.keras.layers import Input, Dense, Concatenate, Conv2D, Flatten,MaxPool2D
from tensorflow.keras.models import Model,Sequential
from tensorflow.keras.optimizers import Adam,SGD
from tensorflow.keras.losses import MeanSquaredError, MSE,mean_squared_error
from tensorflow import reduce_mean, convert_to_tensor, squeeze, float32, GradientTape
import gymnasium as gym
import numpy as np
import time
from PIL import Image
import os

In [None]:
class DDQN:
    """
    Double Deep Q Network agent backed by an experience replay memory.

    The online ``model`` selects the greedy next action and the ``target_model``
    evaluates it when the training targets are computed.
    """

    # Folder that contains the ``weights`` checkpoint directory used by save()/load().
    # Override on the class or on an instance to store checkpoints elsewhere.
    WEIGHTS_ROOT = "/content/drive/MyDrive/Colab Notebooks/abs_206009"

    def __init__(self, state_space_shape, num_actions, model, target_model, learning_rate=0.1,
                 discount_factor=0.95, batch_size=16, memory_size=100):
        """
        Initializes Double Deep Q Network agent.
        :param state_space_shape: shape of the observation space (tuple or int)
        :param num_actions: number of actions
        :param model: Keras model
        :param target_model: Keras model
        :param learning_rate: learning rate (only stored on the agent; this class never
                              hands it to an optimizer)
        :param discount_factor: discount factor
        :param batch_size: batch size
        :param memory_size: maximum size of the experience replay memory
        """
        self.state_space_shape = state_space_shape
        self.num_actions = num_actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.batch_size = batch_size
        self.memory = deque(maxlen=memory_size)
        self.model = model
        self.target_model = target_model
        self.update_target_model()

    def _as_batch(self, samples):
        """
        Stacks observations into one array of shape (len(samples),) + state shape.
        :param samples: sequence of observations
        :return: numpy array holding one observation per row
        """
        if isinstance(self.state_space_shape, tuple):
            batch_shape = (len(samples),) + self.state_space_shape
        else:
            batch_shape = (len(samples), self.state_space_shape)
        return np.asarray(samples).reshape(batch_shape)

    def update_memory(self, state, action, reward, next_state, done):
        """
        Adds experience tuple to experience replay memory.
        :param state: current state
        :param action: performed action
        :param reward: reward received for performing action
        :param next_state: next state
        :param done: if episode has terminated after performing the action in the current state
        """
        self.memory.append((state, action, reward, next_state, done))

    def update_target_model(self):
        """
        Synchronize the target model with the main model.
        """
        self.target_model.set_weights(self.model.get_weights())

    def get_action(self, state, epsilon):
        """
        Returns an action following epsilon greedy policy for the current state.
        :param state: current state
        :param epsilon: exploration rate, i.e. probability of picking a uniformly random action
        :return: index of the selected action
        """
        # Explore with probability exactly `epsilon`; the uniform draw below already
        # spreads that probability over every action, so no extra offset is needed.
        if np.random.random() < epsilon:
            return np.random.randint(0, self.num_actions)
        q_values = self.model.predict(self._as_batch([state]), verbose=0)[0]
        return np.argmax(q_values)

    def load(self, model_name, episode):
        """
        Loads the weights of the model at specified episode checkpoint.
        :param model_name: name of the model
        :param episode: episode checkpoint
        """
        self.model.load_weights(f'{self.WEIGHTS_ROOT}/weights/{model_name}_{episode}.h5')

    def save(self, model_name, episode):
        """
        Stores the weights of the model at specified episode checkpoint.
        :param model_name: name of the model
        :param episode: episode checkpoint
        """
        # makedirs also creates missing parent folders and tolerates an existing directory.
        os.makedirs(f"{self.WEIGHTS_ROOT}/weights", exist_ok=True)
        self.model.save_weights(f'{self.WEIGHTS_ROOT}/weights/{model_name}_{episode}.h5')

    def train(self):
        """
        Performs one step of model training on a random minibatch from the replay memory.
        :return: value returned by the model's train_on_batch, or None when the memory is empty
        """
        batch_size = min(self.batch_size, len(self.memory))
        if batch_size == 0:
            # Nothing has been recorded yet, so there is nothing to fit.
            return None
        minibatch = random.sample(self.memory, batch_size)

        states, actions, rewards, next_states, dones = zip(*minibatch)
        states = self._as_batch(states)
        next_states = self._as_batch(next_states)
        actions = np.asarray(actions, dtype=int)
        rewards = np.asarray(rewards, dtype=float)
        dones = np.asarray(dones, dtype=bool)

        # One forward pass per network for the whole minibatch instead of one per sample.
        targets = np.array(self.model.predict(states, verbose=0), dtype=float)
        next_q_online = np.asarray(self.model.predict(next_states, verbose=0))
        next_q_target = np.asarray(self.target_model.predict(next_states, verbose=0))

        rows = np.arange(batch_size)
        # Double DQN: the online model chooses the next action, the target model scores it.
        best_next_actions = np.argmax(next_q_online, axis=1)
        bootstrapped = rewards + self.discount_factor * next_q_target[rows, best_next_actions]
        # Terminal transitions have no future value, so their target is the bare reward.
        targets[rows, actions] = np.where(dones, rewards, bootstrapped)

        return self.model.train_on_batch(states, targets)

In [None]:
def build_model1(state_space_shape, num_actions):
    """
    Builds and compiles the first Q-network variant.

    Two 16-unit Dense layers created without an activation argument, followed by a
    linear output layer with one unit per action. Compiled with SGD (learning rate
    0.001) and mean squared error loss.
    :param state_space_shape: shape of the observation space, used as the input shape
    :param num_actions: number of actions (size of the output layer)
    :return: compiled Keras model
    """
    network = Sequential([
        Dense(16, input_shape=state_space_shape),
        Dense(16),
        Dense(num_actions, activation='linear'),
    ])
    network.compile(optimizer=SGD(learning_rate=0.001), loss=mean_squared_error)
    return network

In [None]:
def build_model2(state_space_shape, num_actions):
    """
    Builds and compiles the second Q-network variant.

    An input-sized Dense layer created without an activation argument, then ReLU
    layers of 12, 16 and 8 units, and a linear output layer with one unit per action.
    Compiled with Adam (learning rate 0.001) and mean squared error loss.
    :param state_space_shape: shape of the observation space; its first entry also sets
                              the width of the first layer
    :param num_actions: number of actions (size of the output layer)
    :return: compiled Keras model
    """
    input_width = state_space_shape[0]
    network = Sequential([
        Dense(input_width, input_shape=state_space_shape),
        Dense(12, activation="relu"),
        Dense(16, activation="relu"),
        Dense(8, activation="relu"),
        Dense(num_actions, activation='linear'),
    ])
    network.compile(optimizer=Adam(learning_rate=0.001), loss=mean_squared_error)
    return network

In [None]:
def build_model3(state_space_shape,num_actions):
    """
    Builds and compiles the third Q-network variant.

    An input-sized ReLU layer, then ReLU layers of 16 and 8 units, and a linear output
    layer with one unit per action. Compiled with Adam (learning rate 0.0001) and the
    "mse" loss.
    :param state_space_shape: shape of the observation space; its first entry also sets
                              the width of the first layer
    :param num_actions: number of actions (size of the output layer)
    :return: compiled Keras model
    """
    input_width = state_space_shape[0]
    network = Sequential([
        Dense(input_width, input_shape=state_space_shape, activation="relu"),
        Dense(16, activation="relu"),
        Dense(8, activation="relu"),
        Dense(num_actions, activation="linear"),
    ])
    optimizer = Adam(learning_rate=0.0001)
    network.compile(optimizer=optimizer, loss="mse")
    return network

In [None]:
# Create the Acrobot-v1 environment that the hyper-parameter and training cells use.
env = gym.make("Acrobot-v1")
# Initial reset; returns the first observation and an info dict (discarded here).
state,_=env.reset()

In [None]:
# Problem dimensions, read from the environment created above.
state_space_shape = env.observation_space.shape
num_actions = env.action_space.n
# Training schedule.
num_episodes = 600  # number of episodes each training cell runs
num_steps_per_episode = 1000  # intended cap on the number of steps in one episode
# Epsilon-greedy exploration schedule.
epsilon = 0.35  # starting exploration rate
epsilon_decay=0.95  # multiplicative decay applied after every episode
epsilon_final=0.1  # lower bound for the exploration rate

In [None]:
# Train a DDQN agent that uses the build_model1 architecture.
model = build_model1(state_space_shape, num_actions)
target_model = build_model1(state_space_shape, num_actions)
agent = DDQN(state_space_shape, num_actions, model, target_model,memory_size=1000000)

# Decay a cell-local copy so the configured starting `epsilon` is not overwritten;
# re-running this cell (or running another training cell) then starts from the
# same exploration rate instead of an already-decayed one.
exploration_rate = epsilon

for episode in range(1, num_episodes + 1):
    rewards = 0
    steps = 0
    state, _ = env.reset()
    terminated = False
    truncated = False
    # Stop on a real terminal state, on the environment's own time-limit cutoff
    # (`truncated`), or when the configured per-episode step budget is used up.
    while not (terminated or truncated) and steps < num_steps_per_episode:
        steps += 1
        action = agent.get_action(state, exploration_rate)
        new_state, reward, terminated, truncated, _ = env.step(action)

        if terminated:
            # Reward shaping: the fewer steps needed to terminate, the larger the bonus.
            reward=int(10_000/steps)

        rewards += reward

        # Only a true termination is stored as `done`; a time-limit cutoff must still
        # bootstrap from the next state when targets are computed.
        agent.update_memory(state, action, reward, new_state, terminated)
        state = new_state

    print(f"Episode {episode}/{num_episodes}, reward: {rewards}")

    agent.train()
    exploration_rate = max(exploration_rate * epsilon_decay, epsilon_final)

    # Every 10 episodes: checkpoint the online weights and refresh the target network.
    if episode % 10 == 0:
        agent.save("ddqn_acrobot_model1", episode)
        agent.update_target_model()


In [None]:
# Train a DDQN agent that uses the build_model2 architecture.
model = build_model2(state_space_shape, num_actions)
target_model = build_model2(state_space_shape, num_actions)
agent = DDQN(state_space_shape, num_actions, model, target_model,memory_size=1000000)

# Decay a cell-local copy so the configured starting `epsilon` is not overwritten;
# re-running this cell (or running another training cell) then starts from the
# same exploration rate instead of an already-decayed one.
exploration_rate = epsilon

for episode in range(1, num_episodes + 1):
    rewards = 0
    steps = 0
    state, _ = env.reset()
    terminated = False
    truncated = False
    # Stop on a real terminal state, on the environment's own time-limit cutoff
    # (`truncated`), or when the configured per-episode step budget is used up.
    while not (terminated or truncated) and steps < num_steps_per_episode:
        steps += 1
        action = agent.get_action(state, exploration_rate)
        new_state, reward, terminated, truncated, _ = env.step(action)

        if terminated:
            # Reward shaping: the fewer steps needed to terminate, the larger the bonus.
            reward=int(10_000/steps)

        rewards += reward

        # Only a true termination is stored as `done`; a time-limit cutoff must still
        # bootstrap from the next state when targets are computed.
        agent.update_memory(state, action, reward, new_state, terminated)
        state = new_state

    print(f"Episode {episode}/{num_episodes}, reward: {rewards}")

    agent.train()
    exploration_rate = max(exploration_rate * epsilon_decay, epsilon_final)

    # Every 10 episodes: checkpoint the online weights and refresh the target network.
    if episode % 10 == 0:
        agent.save("ddqn_acrobot_model2", episode)
        agent.update_target_model()


In [None]:
# Train a DDQN agent that uses the build_model3 architecture.
model = build_model3(state_space_shape, num_actions)
target_model = build_model3(state_space_shape, num_actions)
agent = DDQN(state_space_shape, num_actions, model, target_model,memory_size=1000000)

# Decay a cell-local copy so the configured starting `epsilon` is not overwritten;
# re-running this cell (or running another training cell) then starts from the
# same exploration rate instead of an already-decayed one.
exploration_rate = epsilon

for episode in range(1, num_episodes + 1):
    rewards = 0
    steps = 0
    state, _ = env.reset()
    terminated = False
    truncated = False
    # Stop on a real terminal state, on the environment's own time-limit cutoff
    # (`truncated`), or when the configured per-episode step budget is used up.
    while not (terminated or truncated) and steps < num_steps_per_episode:
        steps += 1
        action = agent.get_action(state, exploration_rate)
        new_state, reward, terminated, truncated, _ = env.step(action)

        if terminated:
            # Reward shaping: the fewer steps needed to terminate, the larger the bonus.
            reward=int(10_000/steps)

        rewards += reward

        # Only a true termination is stored as `done`; a time-limit cutoff must still
        # bootstrap from the next state when targets are computed.
        agent.update_memory(state, action, reward, new_state, terminated)
        state = new_state

    print(f"Episode {episode}/{num_episodes}, reward: {rewards}")

    agent.train()
    exploration_rate = max(exploration_rate * epsilon_decay, epsilon_final)

    # Every 10 episodes: checkpoint the online weights and refresh the target network.
    if episode % 10 == 0:
        agent.save("ddqn_acrobot_model3", episode)
        agent.update_target_model()
