Importing the required libraries

In [None]:
import gym
import numpy as np
import random
from collections import deque
import os
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt

In [None]:
output_dir = 'model_output'
# Create the directory for saved model output. exist_ok=True avoids the
# check-then-create race and makes re-running this cell a no-op.
os.makedirs(output_dir, exist_ok=True)
# Start every run with an empty score log; the training loop appends to it.
with open("scores.txt", "w") as f:
    f.write("")

At first, I tried the Atari game Breakout-v4, but that didn't work out. Installation and debugging were difficult because the Atari version of gym doesn't work with Python >= 3.10. So I tried to create a conda environment, but the different installations all got messed up, and Google Colab didn't work at the time either, so I switched back to the CartPole-v1 game.

In [None]:
# Create the CartPole environment; render_mode is set to "rgb_array".
env = gym.make(
    'CartPole-v1',
    render_mode="rgb_array",
)

In [None]:
# ---- Hyperparameters ----

# Optimiser step size and the number of transitions sampled per replay call.
learning_rate = 0.0005
batch_size = 32

# Length of the training run, replay-memory size and where weights are saved.
n_episodes = 50
deque_memory_size = 2000
output_dir = 'model_output/cartpole/'

# Epsilon-greedy exploration: start fully random, multiply by the decay
# factor after each episode, and stop decaying once the floor is reached.
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995

# Discount factor applied to the estimated value of the next state.
gamma = 0.999

# ---- Environment dimensions ----

# 4 = cart position, cart velocity, pole angle, pole velocity at the tip
state_size = env.observation_space.shape[0]
# two actions: left and right
action_size = env.action_space.n

The next cell defines the Deep Q-Network (DQN) model. It builds a sequential model with TensorFlow's Keras API: two dense layers with ReLU activation functions, followed by an output layer with a linear activation function that predicts a Q-value for each possible action.
The model is compiled with the Mean Squared Error (MSE) loss function and the Adam optimizer, which prepares the DQN model for training.

In [None]:
# state_size = input dimension = 4 and action_size = output dimension = 2
# Define the DQN model
def build_model():
    """Build and compile the Q-network.

    The network maps a state vector of length ``state_size`` to one Q-value
    per action (``action_size`` outputs) through two hidden layers of 24
    ReLU units each. It is compiled with MSE loss and an Adam optimizer
    using the module-level ``learning_rate``.

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    model = models.Sequential([
        # Declare the input shape with an explicit Input layer instead of the
        # legacy ``input_dim`` keyword on the first Dense layer.
        layers.Input(shape=(state_size,)),
        layers.Dense(24, activation='relu'),
        layers.Dense(24, activation='relu'),
        # Linear output: Q-values are unbounded regression targets.
        layers.Dense(action_size, activation='linear'),
    ])
    model.compile(
        loss='mse',
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
    )
    return model

In [None]:
# Define the agent class
class DQNAgent:
    """Deep Q-learning agent with experience replay and a target network.

    The agent reads its settings (``state_size``, ``action_size``, ``gamma``,
    ``epsilon``, ``epsilon_decay``, ``epsilon_min``, ``deque_memory_size``)
    from the module-level hyperparameters when it is created, and builds both
    of its networks with ``build_model()``.
    """

    def __init__(self):
        self.state_size = state_size
        self.action_size = action_size
        # Bounded replay memory: once full, the oldest transitions drop out.
        self.memory = deque(maxlen=deque_memory_size)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.model = build_model()  # primary model, trained in replay()
        self.target_model = build_model()  # target model, used for bootstrapping
        self.update_target_model()  # start both networks from the same weights

    def update_target_model(self):
        """Copy the primary model's weights into the target model."""
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """Store one transition ``(state, action, reward, next_state, done)``."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action with an epsilon-greedy policy.

        Args:
            state: Observation to act on; it is passed to the model unchanged,
                so the caller must supply it in the batch form the model
                expects (the training loop uses shape ``(1, state_size)``).

        Returns:
            The index of the chosen action as an ``int``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # verbose=0: without it Keras prints a progress bar on every call.
        act_values = self.model.predict(state, verbose=0)
        return int(np.argmax(act_values[0]))

    def replay(self, batch_size):
        """Train the primary model on a random minibatch from memory.

        Does nothing until at least ``batch_size`` transitions are stored.
        For every sampled transition the target for the taken action is the
        reward if the transition was terminal, otherwise the reward plus
        ``gamma`` times the target model's largest Q-value for the next
        state. Q-values of the other actions are left at the primary model's
        own predictions so they contribute no error.

        Args:
            batch_size: Number of transitions to sample.
        """
        if batch_size < 1 or len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)

        states = np.array([t[0] for t in minibatch]).reshape(batch_size, self.state_size)
        actions = np.array([t[1] for t in minibatch])
        rewards = np.array([t[2] for t in minibatch], dtype=float)
        next_states = np.array([t[3] for t in minibatch]).reshape(batch_size, self.state_size)
        dones = np.array([t[4] for t in minibatch], dtype=bool)

        # Two batched forward passes replace two predict() calls per sample.
        targets = self.model.predict(states, verbose=0)
        next_q = self.target_model.predict(next_states, verbose=0)

        bootstrapped = rewards + self.gamma * np.amax(next_q, axis=1)
        targets[np.arange(batch_size), actions] = np.where(dones, rewards, bootstrapped)

        # batch_size=1 with shuffle=False keeps one gradient step per sampled
        # transition, in sample order, while issuing a single fit() call.
        self.model.fit(states, targets, batch_size=1, shuffle=False, epochs=1, verbose=0)

    def load(self, name):
        """Load primary-model weights from ``name`` and sync the target model."""
        self.model.load_weights(name)
        # Without this the target network would keep its previous weights.
        self.update_target_model()

    def save(self, name):
        """Save the primary model's weights to ``name``."""
        self.model.save_weights(name)


In [None]:

# Train the agent
agent = DQNAgent()
scores = []  # total reward collected in each episode

# output_dir is a sub-directory of the model output folder; create it before
# training so the final save cannot fail on a missing directory.
os.makedirs(output_dir, exist_ok=True)

for e in range(n_episodes):
    reset_result = env.reset()
    # Newer gym versions return (observation, info); older ones return only
    # the observation. Either way the state is reshaped to (1, state_size).
    observation = reset_result[0] if isinstance(reset_result, tuple) else reset_result
    state = np.array(observation).reshape([1, state_size])
    done = False
    tReward = 0
    while not done:
        # env.render()
        action = agent.act(state)
        step_result = env.step(action)

        next_state = np.array(step_result[0]).reshape([1, state_size])
        reward = step_result[1]
        terminated = step_result[2]
        # The 5-tuple step API reports time-limit truncation separately from
        # termination; the episode must end on either one.
        truncated = step_result[3] if len(step_result) == 5 else False
        done = bool(terminated or truncated)

        # Store only the termination flag so that a transition cut off by
        # the time limit still bootstraps from its next state.
        agent.remember(state, action, reward, next_state, terminated)
        state = next_state

        tReward += reward
        agent.replay(batch_size)

    # Refresh the target network once per episode.
    agent.update_target_model()
    scores.append(tReward)
    summary = f"Episode: {e}/{n_episodes}, Score: {tReward}, Epsilon: {agent.epsilon:.2}"
    print(summary)
    # appending the score in a file
    with open('scores.txt', 'a') as file:
        file.write(summary + "\n")
    if agent.epsilon > agent.epsilon_min:
        agent.epsilon *= agent.epsilon_decay
agent.save(os.path.join(output_dir, 'model.weights.h5'))

In [None]:
# Show the raw per-episode totals first ...
print(scores)
# ... then draw them as a curve over the episode index.
plt.plot(scores)
plt.xlabel('Episode #')
plt.ylabel('Total Rewards in each episode')
plt.show()