In [11]:
import gymnasium as gym
import numpy as np
import random
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tqdm import tqdm
import os


from tensorflow.keras.losses import MeanSquaredError
#from tensorflow.keras.models import load_model
from keras.models import load_model
import time

In [3]:
# --- Hyperparameters ---------------------------------------------------------
# Run length and exploration.
episodes = 1     # number of training episodes
epsilon = 0.1    # exploration rate: probability of picking a random action

# Optimisation: Adam on the mean squared error of the Q-value difference.
learning_rate = 0.00025
loss_function = 'mse'
optimizer = optimizers.Adam(learning_rate=learning_rate)


In [4]:
def create_cnn(input_shape, num_actions):
    """Build the (uncompiled) convolutional network used to score actions.

    The network is six ReLU convolutions, a flatten, a 512-unit ReLU dense
    layer and a linear output layer with one unit per action.

    Args:
        input_shape: Shape passed to the Keras ``Input`` layer.
        num_actions: Number of units in the linear output layer.

    Returns:
        A ``Sequential`` Keras model; the caller is responsible for compiling it.
    """
    # (filters, kernel_size, strides) of each convolution, in network order.
    # (1, 1) is Conv2D's default stride, spelled out so every row has one shape.
    conv_specs = (
        (64, (8, 8), (4, 4)),
        (128, (4, 4), (2, 2)),
        (256, (3, 3), (2, 2)),
        (256, (3, 3), (1, 1)),
        (256, (3, 3), (1, 1)),
        (256, (3, 3), (1, 1)),
    )

    stack = [layers.Input(shape=input_shape)]
    stack.extend(
        layers.Conv2D(filters, kernel, strides=strides, activation='relu')
        for filters, kernel, strides in conv_specs
    )
    stack.append(layers.Flatten())
    stack.append(layers.Dense(512, activation='relu'))
    stack.append(layers.Dense(num_actions, activation='linear'))
    return models.Sequential(stack)


In [5]:
# Training environment; with render_mode='rgb_array' no window is opened.
env = gym.make('ALE/Frogger-v5', render_mode='rgb_array')
input_shape = env.observation_space.shape  # shape of one observation frame
num_actions = env.action_space.n  # number of discrete actions

# Build the network and compile it from the values in the hyperparameter cell,
# so the learning rate and loss are defined in exactly one place.
# A fresh Adam instance is created here (rather than reusing the module-level
# `optimizer`) so that re-running this cell never binds an optimizer that has
# already been attached to a previously created model.
model = create_cnn(input_shape, num_actions)
model.compile(optimizer=optimizers.Adam(learning_rate=learning_rate), loss=loss_function)


In [8]:
def train_model(model, episodes, save_interval=10, gamma=0.99, save_dir="model_weights"):
    """Train ``model`` on the global ``env`` with epsilon-greedy one-step Q-learning.

    On every environment step the agent acts (a random action with probability
    given by the global ``epsilon``, otherwise the action with the highest
    predicted value), and the network takes one gradient step that moves the
    value of the chosen action towards the TD target
    ``reward + gamma * max_a Q(next_state, a)`` (just ``reward`` when the
    episode terminated). The values of the other actions are left at their
    current predictions so they contribute no error.

    Args:
        model: Compiled Keras model mapping a batch of observations to one
            value per action (``train_on_batch`` requires a compiled model).
        episodes: Number of episodes to play.
        save_interval: A checkpoint is written every ``save_interval`` episodes.
        gamma: Discount factor applied to the bootstrapped next-state value.
        save_dir: Directory the checkpoints are written to; created if missing.

    Returns:
        List with the total reward of each episode, in order.
    """
    episode_rewards = []

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(save_dir, exist_ok=True)

    with tqdm(total=episodes, desc="Episode", unit='episode') as pbar:
        for e in range(episodes):
            state = np.array(env.reset()[0])
            done = False
            total_reward = 0

            while not done:
                state_batch = np.expand_dims(state, axis=0)
                # Needed both for greedy action selection and as the base of
                # the training target below.
                q_values = model.predict(state_batch, verbose=0)

                # Epsilon-greedy: explore with probability `epsilon` (global).
                if np.random.rand() <= epsilon:
                    action = env.action_space.sample()
                else:
                    action = np.argmax(q_values[0])

                next_state, reward, terminated, truncated, info = env.step(action)
                next_state = np.array(next_state)
                total_reward += reward

                # One-step TD target. Bootstrap from the next state unless the
                # episode truly terminated; a truncation (time limit) is not a
                # terminal state, so it still bootstraps.
                target = reward
                if not terminated:
                    next_q = model.predict(np.expand_dims(next_state, axis=0), verbose=0)
                    target = reward + gamma * np.max(next_q[0])

                targets = np.array(q_values, copy=True)
                targets[0, action] = target
                model.train_on_batch(state_batch, targets)

                # Advance the agent's view of the world; without this every
                # decision would be made from the episode's first frame.
                state = next_state
                done = terminated or truncated

            # Update progress bar
            pbar.update(1)
            pbar.set_description(f"Episode: {e+1}, Reward: {total_reward}")

            episode_rewards.append(total_reward)

            # Save the model every 'save_interval' episodes
            if (e + 1) % save_interval == 0:
                model_path = os.path.join(save_dir, f'model_episode_{e + 1}V2.h5')
                model.save(model_path)
                print(f"Saved model at episode {e + 1} to {model_path}")

    # Print overall training results (skipped when no episode was played,
    # where max() of an empty list would raise).
    if episode_rewards:
        print(f"Average Reward: {np.mean(episode_rewards)}")
        print(f"Best Reward: {max(episode_rewards)}")
    return episode_rewards
# Example usage. The environment is closed even if training raises or the
# cell is interrupted, so no emulator instance is left open.
try:
    train_model(model, 500)  # Adjust as needed for your setup
finally:
    env.close()

In [15]:
def test_models(weights_folder, input_shape, num_actions, episodes_per_model=50):
    weight_files = [f for f in os.listdir(weights_folder) if f.endswith('.h5')]
    
    for weight_file in weight_files:
        model = create_cnn(input_shape, num_actions)  # Re-create the model architecture
        model_path = os.path.join(weights_folder, weight_file)
        model.load_weights(model_path)  # Load the weights
        print(f"Testing model with weights: {weight_file}")

        total_rewards = []
        total_times = []

        for _ in range(episodes_per_model):
            start_time = time.time()
            observation, info = env.reset()  # Unpack the tuple to get the observation and info
            state = np.array(observation)
            done = False
            total_reward = 0

            while not done:
                q_values = model.predict(np.expand_dims(state, axis=0), verbose=0)
                action = np.argmax(q_values[0])
                next_observation, reward, terminated, truncated, info = env.step(action)
                next_state = np.array(next_observation)
                total_reward += reward
                if terminated or truncated:
                    done = True
                state = next_state

            episode_time = time.time() - start_time
            total_rewards.append(total_reward)
            total_times.append(episode_time)

        average_reward = np.mean(total_rewards)
        average_time = np.mean(total_times)
        print(f"Average Reward: {average_reward:.2f}, Average Time: {average_time:.2f} seconds per episode")


In [21]:
# Evaluation environment with an on-screen window (render_mode='human').
env = gym.make('ALE/Frogger-v5', render_mode='human')
try:
    input_shape = env.observation_space.shape
    num_actions = env.action_space.n
    test_models("model_weights/V2", input_shape, num_actions, 1)
finally:
    # Runs on normal completion, on errors and on a manual interrupt, so the
    # environment is always closed.
    env.close()

Testing model with weights: model_episode_200V2.h5


KeyboardInterrupt: 

In [22]:
# Explicit cleanup: the recorded KeyboardInterrupt above suggests the previous
# cell was stopped before its own env.close() ran, so close the environment here.
env.close()