# **Import Libraries**

In [1]:
import gymnasium as gym
import numpy as np
import tensorflow as tf
from keras import models, layers, optimizers
import random
import time
from collections import deque
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import clear_output

In [2]:
physical_devices = tf.config.list_physical_devices('GPU')

if physical_devices:
    print("GPU is available")
    print(physical_devices)
else:
    print("GPU is not available")

GPU is available
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


# **Frogger Enviroment**

In [3]:
# create env with gymnasium (use ram or rgb state)
env = gym.make(
    "ALE/Frogger-v5", # "ALE/Frogger-ram-v5" or "ALE/Frogger-v5"
    obs_type="grayscale", # ram, grescale, rgb
    render_mode="rgb_array", # rgb_array or human
    difficulty = 0, # [0, 1]
    mode = 0 # [0, 1, 2]
    ) 

env.reset()
print(env.step(0))
print(f"State Frame Size: {env.observation_space}")
print(f"Number Of Actions: {env.action_space.n}")

actions_space = possible_actions = np.array(np.identity(env.action_space.n,dtype=int).tolist())
print(f"Possible Actions: \n {actions_space}")

env.reset()
observation = env.step(1)
print(f"Obervation: \n {observation[0]}")
print(observation[0].shape)

(array([[  0,   0,   0, ...,   0,   0,   0],
       [  0,   0,   0, ...,   0,   0,   0],
       [  0,   0,   0, ...,   0,   0,   0],
       ...,
       [104, 104, 104, ..., 104, 104, 104],
       [104, 104, 104, ..., 104, 104, 104],
       [104, 104, 104, ..., 104, 104, 104]], dtype=uint8), 0.0, False, False, {'lives': 4, 'episode_frame_number': 4, 'frame_number': 4})
State Frame Size: Box(0, 255, (210, 160), uint8)
Number Of Actions: 5
Possible Actions: 
 [[1 0 0 0 0]
 [0 1 0 0 0]
 [0 0 1 0 0]
 [0 0 0 1 0]
 [0 0 0 0 1]]
Obervation: 
 [[  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 [  0   0   0 ...   0   0   0]
 ...
 [104 104 104 ... 104 104 104]
 [104 104 104 ... 104 104 104]
 [104 104 104 ... 104 104 104]]
(210, 160)


# **Hyperparameter**

In [4]:
env = gym.make("ALE/Frogger-v5", obs_type="grayscale", render_mode="rgb_array", difficulty = 0, mode = 0) 

# MODEL Hyperparameters
STATE_SIZE = env.observation_space.shape[0]
ACTIONS_SIZE = env.action_space.n
ACTIONS = list(range(0, ACTIONS_SIZE))
LEARNING_RATE = 0.001 # Learning Rate (alpha)

# AGENT Hyperparameters (epsilon greedy strategy)
EPSILON = 1.0
EPSILON_MIN = 0.001 # EPSELON value where exploreation stops
EPSILON_DECAY_RATE = 0.9995 # the higher the longer the exploreation takes (Linear Decay: EPSELON * DECAY_RATE)
GAMMA = 0.95 # Discounting rate (lower -> agent thinks more long term)

# TRAINING Hyperparameters
RENDER_INTERVAL = 10 # Intervall when the game is rendered
TOTAL_EPISODES = 1000000
REPLAY_INTERVAL = 10 # Replay every x steps (retrain model) 
MINI_BATCHES_REPLAY = 32
REPLAY_BUFFER_MEMORY = 3000
STACKED_FRAMES_SIZE = 4
AVERAGE_WINDOW = 10

# **Deep Q-Learning Neural Network Model**

In [5]:
def build_dqn(input_shape=(210, 160, 4), ACTIONS_SIZE=5):
    model = models.Sequential([
        layers.Conv2D(16, (3, 3), activation='relu', input_shape=input_shape, padding='same'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(32, (3, 3), activation='relu', padding='same'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu', padding='same'),
        layers.MaxPooling2D((2, 2)),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(ACTIONS_SIZE, activation='linear')  # Linear activation for Q-values
    ])

    optimizer = optimizers.Adam(learning_rate=0.001)
    
    model.compile(optimizer=optimizer,
                  loss='mse',  # Use 'mse' for Q-learning
                  metrics=['accuracy'])

    return model

# **Stacked Frames**

In [6]:
STACKED_FRAMES = deque([np.zeros((STATE_SIZE), dtype=np.int32) for i in range(STACKED_FRAMES_SIZE)], maxlen=4)

def stack_frames(stacked_frames, state, is_new_episode):
    if is_new_episode:
        # clear stack for new episode
        stacked_frames = deque([np.zeros((STATE_SIZE), dtype=np.int32) for i in range(STACKED_FRAMES_SIZE)], maxlen=4)
        
        # Add the same frame 4 times to the deque since its a new episode
        stacked_frames.append(state)
        stacked_frames.append(state)
        stacked_frames.append(state)
        stacked_frames.append(state)
        
        # Stack the frames with numpy (join all 4 frames)
        stacked_frames_array = np.stack(stacked_frames, axis=2)

    elif not is_new_episode:
        # append new frame and remove oldest frame
        stacked_frames.append(state)

        # Stack the frames with numpy (join all 4 frames)
        stacked_frames_array = np.stack(stacked_frames, axis=2) 

    stack_expanded = stacked_frames_array.reshape((1,) + stacked_frames_array.shape)
    return stack_expanded, stacked_frames

# **Preprocess Frames**

In [7]:
def preprocess_frames(frame):
    
    # resize frame to fit for cnn
    #print("before", frame.shape)
    #frame = np.reshape(frame, (210, 160))
    #print(frame.shape)
    
    # normalize pixel values to [-1, 1]
    frame = frame / 127.5 - 1.0 
    frame = frame.astype(np.float32)

    return frame

    # TODO: flatten


# **Replay Buffer**

In [8]:
class ReplayBuffer:
    def __init__(self):
        # deque that ther are only max REPLAY_BUFFER_MEMORY items in the list
        # deque = remove oldest item
        self.buffer = deque(maxlen=REPLAY_BUFFER_MEMORY)
    
    def add(self, experience):
        # add item to buffer
        self.buffer.append(experience)
    
    def sample(self):
        return random.sample(self.buffer, MINI_BATCHES_REPLAY) # 16 (MINI_BATCHES_REPLAY) samples to retrain the mdoel

# **DQN Agent**

In [9]:
class DQNAgent:
    def __init__(self):
        self.memory = ReplayBuffer()
        self.EPSILON = EPSILON
        self.model = build_dqn()
        self.target_model = build_dqn()
        self.update_target_model()
    
    def update_target_model(self):
        self.target_model.set_weights(self.model.get_weights())
    
    def remember(self, state, action, reward, next_state, done):
        # flattend_array_state = np.expand_dims(state.reshape(-1), axis=0)
        # flattend_array_next_state = np.expand_dims(next_state.reshape(-1), axis=0)
        self.memory.add((state, action, reward, next_state, done))
    
    def predict_action(self, stacked_array):
        # flatten array for model
        #flattend_array = np.expand_dims(stacked_array.reshape(-1), axis=0)

        if np.random.rand() <= self.EPSILON:
            return random.randrange(ACTIONS_SIZE)
        q_values = self.model.predict(stacked_array, verbose=0)

        action = np.argmax(q_values[0])
        #print(q_values)
        #print(q_values[0])
        #print(action)
        return action
    
    def replay(self):
        print("Replay")
        minibatch = self.memory.sample()
        i = 0
        for state, action, reward, next_state, done in tqdm(minibatch):
            i += 1
            
            # Predict Target Q-Values
            target = self.model.predict(state, verbose=0)
            if done:
                # If the episode is done the target Q-value for the taken action is set to the received reward
                target[0][action] = reward
            elif not done:
                # If the episode is not done, the target Q-value for the taken action is updated using the Bellman equation
                t = self.target_model.predict(next_state, verbose=0)[0]
                target[0][action] = reward + GAMMA * np.amax(t)
            self.model.fit(state, target, epochs=1, verbose=0)
                
        if self.EPSILON > EPSILON_MIN:
            self.EPSILON *= EPSILON_DECAY_RATE

    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)

# **Reward Function**

In [10]:
def calculate_reward(initial_reward:int, distance:int, distance_before:int, total_reward, loss_of_live:bool, lives:int, lives_before):
    # add distance to reward going forward 
    if distance > distance_before:
        reward = distance
    else:
        reward = total_reward

    # add reward for moving forward
    initial_reward *= 2
    reward += initial_reward

    # reduce reward when colliding
    if lives < lives_before:
        print(f"loss_of_live with {distance} steps forward")
        if reward <= 2:
            reward = 0
        else:
            reward -= 2

    return reward



# **Reward Graph**

In [11]:
%matplotlib qt
average_rewards = []
rewards_per_episode = []
average_distance_travelled_per_episode = []

plt.ion()

# Create a figure and two subplots
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 12))

# REWARD PLOT
reward_line, = ax1.plot(rewards_per_episode, label='Reward per Episode')
average_line, = ax1.plot(average_rewards, label='Moving Average Reward')
ax1.set_xlabel('Episode')
ax1.set_ylabel('Reward')
ax1.set_title('Episode Reward and Moving Average Reward over Time')
ax1.legend()

# DISTANCE PLOT
distance_line, = ax2.plot(average_distance_travelled_per_episode, label='AVG. Distance')
ax2.set_xlabel('Episode')
ax2.set_ylabel('AVG. Distance Travelled')
ax2.set_title('Average Distance Travelled Per Episode')
ax2.legend()

plt.show()

# **Training**

In [12]:
%matplotlib qt

agent = DQNAgent()

for episode in range(TOTAL_EPISODES):
    # clear output of cell for every new episode
    clear_output(wait=True)
    # create env (human to render game and see actions)
    if episode % RENDER_INTERVAL == 0 and episode != 0:
        env = gym.make("ALE/Frogger-v5", obs_type="grayscale", render_mode="human", difficulty = 0, mode = 0) 
    else:
        env = gym.make("ALE/Frogger-v5", obs_type="grayscale", render_mode="rgb_array", difficulty = 0, mode = 0) 
    state = env.reset()[0]
    preprocessed_state = preprocess_frames(state)

    # reset episode variables
    done = False
    total_reward = 0
    step_count = 0
    is_new_episode = True
    distrance_travelled = 0
    distance_before = 0
    current_lives = 4
    lives_before = 4
    episode_max_travelled_distance = 0
    episode_step_distances = []

    # initilize episode stack
    stacked_array_state, STACKED_FRAMES = stack_frames(STACKED_FRAMES, preprocessed_state, is_new_episode)
    
    while not done:
        # update step_count
        step_count += 1

        # Predict action
        if is_new_episode and step_count <= 110:
            # jsut wait for first 100 steps because of initialization 
            action = 0
            next_state, reward, done, loss_of_live, info = env.step(0)
            state = next_state
        else:
            action = agent.predict_action(stacked_array_state)
            next_state, reward, done, loss_of_live, info = env.step(action)
            next_state_preprocessed = preprocess_frames(next_state)
            is_new_episode = False
            
            # Do Stacking
            stacked_array_next_state, STACKED_FRAMES = stack_frames(STACKED_FRAMES, next_state_preprocessed, is_new_episode)

            # updated distance
            if action == 1:
                distrance_travelled += 1
            if action == 4 and distrance_travelled > 0:
                distrance_travelled -= 1

            #update lives
            current_lives = info["lives"]

            # update reward
            total_reward = calculate_reward(reward, distrance_travelled, distance_before, total_reward, loss_of_live, current_lives, lives_before)
            total_reward += reward

            # store action infromation in memory
            agent.remember(stacked_array_state, action, reward, stacked_array_next_state, done)

            # set the next state to the current state
            stacked_array_state = stacked_array_next_state

            # Do Replay
            if step_count % REPLAY_INTERVAL == 0 and not is_new_episode and len(agent.memory.buffer) > MINI_BATCHES_REPLAY:
                agent.replay()

            if distrance_travelled == 7:
                print("👑")

            distance_before = distrance_travelled
            
            # if frog dies and loses a life
            if current_lives < lives_before:
                # update max travvellled distance in this episode
                if distrance_travelled > episode_max_travelled_distance:
                    episode_max_travelled_distance = distrance_travelled
                episode_step_distances.append(distrance_travelled)

                print(f"Distance travelled this run: {distrance_travelled} steps forward & {current_lives} lives left!")
                distrance_travelled = 0  
                lives_before = current_lives

            # if game finished (won or lose)
            if done:
                agent.update_target_model()

                rewards_per_episode.append(total_reward)
                average_distance_travelled_per_episode.append(np.mean(episode_step_distances))

                if len(rewards_per_episode) >= AVERAGE_WINDOW:
                    moving_average = np.mean(rewards_per_episode[-AVERAGE_WINDOW:])
                else:
                    moving_average = np.mean(rewards_per_episode)
                average_rewards.append(moving_average)

                step_text = f"Episode: {episode}/{TOTAL_EPISODES}, Total Reward: {total_reward}, Moving AVG. Reward: {moving_average}, Max trav. dist.: {episode_max_travelled_distance}, Epsilon: {agent.EPSILON:.2}"

                # Update REWARD PLOT
                reward_line.set_data(range(episode + 1), rewards_per_episode[:episode + 1])
                average_line.set_data(range(episode + 1), average_rewards[:episode + 1])
                ax1.set_xlim(0, episode + 1)
                ax1.set_ylim(0, max(max(rewards_per_episode[:episode + 1]), max(average_rewards[:episode + 1])) + 10)
                ax1.set_title(step_text)
                
                # Update DISTANCE PLOT
                distance_line.set_data(range(episode + 1), average_distance_travelled_per_episode[:episode + 1])
                ax2.set_xlim(0, episode + 1)
                ax2.set_ylim(0, max(average_distance_travelled_per_episode[:episode + 1]) + 10)
                
                # Redraw the figure
                fig.canvas.draw()
                fig.canvas.flush_events()
                
                # Save the figure
                fig.savefig(f'/plots/training.png')
                
                time.sleep(0.1)

                print(step_text)
                break


Replay


100%|██████████| 32/32 [00:12<00:00,  2.65it/s]


loss_of_live with 1 steps forward
Distance travelled this run: 1 steps forward & 3 lives left!
Replay


100%|██████████| 32/32 [00:11<00:00,  2.73it/s]


Replay


100%|██████████| 32/32 [00:11<00:00,  2.73it/s]


Replay


100%|██████████| 32/32 [00:11<00:00,  2.70it/s]


Replay


100%|██████████| 32/32 [00:12<00:00,  2.65it/s]


Replay


100%|██████████| 32/32 [00:12<00:00,  2.60it/s]


👑
Replay


100%|██████████| 32/32 [00:11<00:00,  2.74it/s]


loss_of_live with 9 steps forward
Distance travelled this run: 9 steps forward & 2 lives left!
Replay


100%|██████████| 32/32 [00:12<00:00,  2.66it/s]


Replay


100%|██████████| 32/32 [00:12<00:00,  2.65it/s]


Replay


 44%|████▍     | 14/32 [00:05<00:07,  2.52it/s]
