## Import Dependencies ##

In [1]:
import tensorflow as tf
print(tf.__version__)

2.16.1


In [2]:
import os
import gymnasium as gym
import random
import numpy as np
import cv2
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Activation, Flatten, Dense, MaxPooling2D, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard

## Test Random Environment ##

In [3]:
environment_name = 'MsPacman-v4'
env = gym.make(environment_name, render_mode="human")
# Set render_fps in metadata
env.metadata['render_fps'] = 500  # You can adjust this value as needed
# used for rescaling and using grayscale
state_size = (88, 80, 1)
# height, width, channels = env.observation_space.shape --old one that uses original size of image
action_size = env.action_space.n

In [4]:
env.unwrapped.get_action_meanings()

['NOOP',
 'UP',
 'RIGHT',
 'LEFT',
 'DOWN',
 'UPRIGHT',
 'UPLEFT',
 'DOWNRIGHT',
 'DOWNLEFT']

In [5]:
episodes = 5
for episode in range(1,episodes+1):
    state, info = env.reset()
    done = False
    score = 0

    while not done:
        env.render()
        action = env.action_space.sample()
        n_state, reward, terminated, truncated, info = env.step(action)
        score += reward

        done = terminated or truncated
    print('Episode:{} Score:{}'.format(episode, score))
env.close()

Episode:1 Score:230.0
Episode:2 Score:250.0
Episode:3 Score:260.0
Episode:4 Score:300.0


KeyboardInterrupt: 

## Prepocess the game screen ##

In [6]:
color = np.array([210, 164, 74]).mean()

def preprocess_state(state):
    # Crop and resize the image to 88x80
    image = cv2.resize(state[1:176:2, ::2], (80, 88))
    
    # Convert the image to grayscale
    image = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    
    # Improve image contrast by setting specific color to 0
    image[image == color] = 0
    
    # Normalize the image
    image = (image - 128) / 128.0
    
    # Reshape the image to add batch dimension and channel dimension
    image = np.expand_dims(image, axis=(0, -1))
    
    return image

## Creat Deep learning Model with Keras ##

In [7]:
class DQN:
    def __init__(self, state_size, action_size, log_dir):
        self.state_size = state_size
        self.action_size = action_size
        self.replay_buffer = deque(maxlen=5000)
        self.gamma = 0.9
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.update_rate = 1000
        self.main_network = self.build_network()
        self.target_network = self.build_network()
        self.update_target_network()
        self.tensorboard = TensorBoard(log_dir=log_dir)
    
    def build_network(self):
        model = Sequential()
        model.add(Input(shape=self.state_size))
        model.add(Conv2D(32, (8, 8), strides=(4, 4), padding='same', activation='relu'))
        model.add(Conv2D(64, (4, 4), strides=(2, 2), padding='same', activation='relu'))
        model.add(Conv2D(64, (3, 3), strides=(1, 1), padding='same', activation='relu'))
        model.add(Flatten())
        model.add(Dense(512, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(optimizer=Adam(), loss='mse')
        return model
    
    def store_transition(self, state, action, reward, next_state, done):
        self.replay_buffer.append((state, action, reward, next_state, done))
    
    def epsilon_greedy(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_size)
        Q_values = self.main_network.predict(state)
        return np.argmax(Q_values[0])
    
    def train(self, batch_size):
        if len(self.replay_buffer) < batch_size:
            return
        
        minibatch = random.sample(self.replay_buffer, batch_size)
        
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target += self.gamma * np.amax(self.target_network.predict(next_state)[0])
            
            target_f = self.main_network.predict(state)
            target_f[0][action] = target
            
            self.main_network.fit(state, target_f, epochs=1, verbose=0)
        
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
    
    def update_target_network(self):
        self.target_network.set_weights(self.main_network.get_weights())

    def save_model(self, path):
        self.main_network.save_weights(path)
    
    def load_model(self, path):
        self.main_network.load_weights(path)
        self.update_target_network()


## Training the network ##

In [11]:
episodes = 5
num_timesteps = 500
batch_size = 8
num_screens = 4    
log_dir = os.path.join('Training', 'Logs')
os.makedirs(log_dir, exist_ok=True)
agent = DQN(state_size, action_size, log_dir)


In [12]:
for episode in range(1, episodes + 1):
    state, info = env.reset()
    state = preprocess_state(state)
    frames = deque([state] * num_screens, maxlen=num_screens)  # Initialize deque with the initial state repeated
    done = False
    score = 0
    time_step = 0

    while not done and time_step < num_timesteps:
        env.render()
        
        stacked_state = np.concatenate(frames, axis=-1)  # Stack the frames to form the state
        action = agent.epsilon_greedy(stacked_state)
        next_state, reward, terminated, truncated, info = env.step(action)
        next_state = preprocess_state(next_state)
        frames.append(next_state)  # Add new frame to the deque
        
        stacked_next_state = np.concatenate(frames, axis=-1)  # Stack the frames for the next state
        agent.store_transition(stacked_state, action, reward, stacked_next_state, terminated or truncated)
        state = next_state
        score += reward
        time_step += 1

        # Update the target network
        if time_step % agent.update_rate == 0:
            agent.update_target_network()

        # Train the agent
        if len(agent.replay_buffer) > batch_size:
            agent.train(batch_size)

        done = terminated or truncated

    print('Episode:{} Score:{}'.format(episode, score))

env.close()

ValueError: Exception encountered when calling Sequential.call().

[1mInput 0 of layer "conv2d_15" is incompatible with the layer: expected axis -1 of input shape to have value 1, but received input with shape (1, 88, 80, 4)[0m

Arguments received by Sequential.call():
  • inputs=tf.Tensor(shape=(1, 88, 80, 4), dtype=float32)
  • training=False
  • mask=None

## Saving the Model ##

In [None]:
# Save the model
model_path = os.path.join('Training', 'Saved_Models', 'DQN_MsPacman.h5')
os.makedirs(os.path.dirname(model_path), exist_ok=True)
agent.save_model(model_path)

In [None]:
env.close()

In [None]:
# Load the model and evaluate
agent.load_model(model_path)

In [None]:
# Example evaluation loop
for episode in range(1, 6):
    state, info = env.reset()
    state = preprocess_state(state)
    frames = deque([state] * num_screens, maxlen=num_screens)
    done = False
    score = 0

    while not done:
        env.render()
        
        stacked_state = np.concatenate(frames, axis=-1)
        action = agent.epsilon_greedy(stacked_state)
        next_state, reward, terminated, truncated, info = env.step(action)
        next_state = preprocess_state(next_state)
        frames.append(next_state)
        
        state = next_state
        score += reward

        done = terminated or truncated

    print('Episode:{} Score:{}'.format(episode, score))

env.close()