In [58]:
import gymnasium as gym
import gym_puddle
import numpy as np
from stable_baselines3 import DQN
from stable_baselines3 import PPO
from stable_baselines3.dqn import MlpPolicy as DQNPolicy
from stable_baselines3.ppo import MlpPolicy as PPOPolicy

import time
import json

import matplotlib.pyplot as plt
import numpy as np

from IPython import display
import pyvirtualdisplay
import cv2

import libs.tiles3 as tc
import random
from util.kanerva import BaseKanervaCoder

# Default seed for the notebook; also the default `seed` of tabularQlearning.
selected_seed = 0
# Seed NumPy's global RNG from the same constant so the two cannot drift apart.
np.random.seed(selected_seed)


In [59]:

# Some functions to help with visualization and interaction with the environment

def visualize(frames, video_name = "/Video/video.mp4"):
    """Save a sequence of frames as an mp4 video using cv2.

    Args:
        frames: Sequence of image arrays of identical shape
            (height, width, channels), in playback order. They are assumed to
            be RGB, as returned by gymnasium's "rgb_array" render mode
            (TODO confirm for other callers), and are converted to the BGR
            channel order that OpenCV writes.
        video_name: Output path of the video file. Note that the default is an
            absolute path under the filesystem root.

    Returns:
        None. Nothing is written when `frames` is empty.
    """
    if len(frames) == 0:
        # Nothing to encode; frames[0] below would raise IndexError.
        return

    import warnings

    height, width, _ = frames[0].shape
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    # 30 frames per second; OpenCV expects the size as (width, height).
    video_writer = cv2.VideoWriter(video_name, fourcc, 30, (width, height))
    try:
        if not video_writer.isOpened():
            # Surface a failed open (e.g. missing directory), which would
            # otherwise go unnoticed.
            warnings.warn(f"Could not open video file for writing: {video_name}")
            return
        for frame in frames:
            video_writer.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    finally:
        # Always release the writer so the file handle is closed.
        video_writer.release()

def online_rendering(image):
    """Show one frame inline, replacing the previously displayed output.

    The frame is drawn with a one-pixel black border on the axes created by
    prepare_display(). If prepare_display() has not been called, the current
    matplotlib axes are used instead.

    Args:
        image: Array of shape (height, width, 3) holding the frame to show.
    """
    axes = globals().get("ax")
    if axes is None:
        axes = plt.gca()
    # Drop the previously drawn frame so images do not pile up on the axes.
    axes.clear()
    axes.axis('off')
    # Black canvas two pixels larger in each dimension; the image goes in the middle.
    img_with_frame = np.zeros((image.shape[0]+2, image.shape[1]+2, 3), dtype=np.uint8)
    img_with_frame[1:-1, 1:-1, :] = image
    axes.imshow(img_with_frame)
    display.display(axes.figure)
    display.clear_output(wait=True)


def prepare_display():
    """Prepare the display for online rendering of the frames in the game.

    Starts an invisible virtual display and creates the figure and axes used
    by online_rendering(). `fig` and `ax` are stored as module-level names
    because online_rendering() reads `ax` from the module namespace.
    """
    global _display, fig, ax
    # Keep a module-level reference so the virtual display can be stopped later.
    _display = pyvirtualdisplay.Display(visible=False,size=(1400, 900))
    _display.start()
    fig, ax = plt.subplots(figsize=(5, 5))
    ax.axis('off')


def get_action():
    """Prompt until a valid movement key is entered and return its action index.

    Keys are case-insensitive and surrounding whitespace is ignored.

    Returns:
        int: 3 for "w", 0 for "a", 2 for "s", 1 for "d".
    """
    key_to_action = {"w": 3, "a": 0, "s": 2, "d": 1}
    while True:
        # Normalise the key so that upper-case input maps to an action
        # instead of falling through every branch and returning None.
        key = input("Enter action (w/a/s/d): ").strip().lower()
        if key in key_to_action:
            return key_to_action[key]

In [60]:
class tabularQlearning:
    """Epsilon-greedy Q-learning over a sparse set of active features.

    A state is given as the indices (or a boolean mask) of the Q-table rows
    that are active for it; a plain integer is treated as one active row.
    The value of an action is the sum of its active rows:
    Q(s, a) = sum_i q_table[i, a]. Both action selection and the learning
    update use this same definition.
    """

    def __init__(self, num_feature, num_actions, alpha=0.1, gamma=0.9, epsilon=0.05, seed = selected_seed):
        """Create an agent with an all-zero Q-table.

        Args:
            num_feature: Number of rows (features) in the Q-table.
            num_actions: Number of actions (columns of the Q-table).
            alpha: Step size; the fraction of the TD error on Q(s, a) that a
                single update corrects.
            gamma: Discount factor.
            epsilon: Probability of choosing a uniformly random action.
            seed: Seed applied to NumPy's global random generator.
        """
        self.num_feature = num_feature
        self.num_actions = num_actions
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon

        # Initialize Q-table with zeros
        self.q_table = np.zeros((self.num_feature, num_actions))

        self.seed = seed
        # Note: this reseeds NumPy's *global* generator, not a private one.
        np.random.seed(self.seed)

    def _action_values(self, state):
        """Return the vector Q(state, .) as the sum of the active rows."""
        return self.q_table[np.atleast_1d(state)].sum(axis=0)

    def choose_action(self, state):
        """Choose an action using epsilon-greedy policy"""
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.num_actions)
        return self._action_values(state).argmax()

    def update(self, state, action, reward, next_state, done=False):
        """Apply one Q-learning update for the transition (s, a, r, s').

        The TD error is computed on the summed value Q(s, a), bootstrapping
        from max_a' Q(s', a'), and is spread evenly over the active rows so
        that Q(s, a) moves by alpha * TD error.

        Args:
            state: Active feature indices (or mask / single index) of s.
            action: Index of the action taken.
            reward: Reward received.
            next_state: Active feature indices (or mask / single index) of s'.
            done: If True, s' is terminal and no bootstrap value is added.
        """
        active = np.atleast_1d(state)
        current = self.q_table[active, action]
        if current.size == 0:
            # No active feature, so there is nothing to update.
            return
        bootstrap = 0.0 if done else self._action_values(next_state).max()
        td_error = reward + self.gamma * bootstrap - current.sum()
        # Divide the step among the active rows so their sum moves by alpha * td_error.
        self.q_table[active, action] = current + (self.alpha / current.size) * td_error

    def get_q_table(self):
        """Return the Q-table array itself (not a copy)."""
        return self.q_table

## Without Parameter Tuning

In [None]:

# Experiment configuration for the run without parameter tuning.
# Maps scenario number -> list of test-episode rewards.
scenario_episode_rewards = {}
# Scenarios are numbered 1..num_scenarios (one pw{n}.json config each).
num_scenarios = 5
# Number of Kanerva prototypes; also the number of rows of the Q-table.
num_features = 1500
# Passed to BaseKanervaCoder as n_closest.
n_closest = 8
# Training episodes per scenario.
num_episodes = 15000

# Number of test episodes recorded per scenario.
number_test = 100

# Maximum number of steps in one test episode.
max_video_length = 70

def e_greedy_policy(state, epsilon=0.01):
    """Pick an action epsilon-greedily from the module-level `agent`.

    Action values are the column sums of the Q-table rows selected by
    `state`. Reads the globals `agent` and `num_actions`.

    Args:
        state: Index expression selecting the active rows of the Q-table.
        epsilon: Probability of returning a uniformly random action.
    """
    action_values = agent.get_q_table()[state].sum(axis=0)
    explore = np.random.rand() < epsilon
    return np.random.choice(num_actions) if explore else action_values.argmax()

# Train and test one Q-learning agent per scenario (no parameter tuning).
for gms in range(1, num_scenarios+1):

    np.random.seed(gms)
    # NOTE(review): machine-specific absolute path; point this at your local
    # gym_puddle/env_configs directory before running elsewhere.
    json_file = '/Users/hadiaghazadeh/Library/CloudStorage/OneDrive-UniversityofCalgary/@upperboundCompetition/gym-puddle/gym_puddle/env_configs/pw{}.json'.format(gms)

    with open(json_file) as f:
        env_setup = json.load(f)

    env = gym.make(
        "PuddleWorld-v0",
        start=env_setup["start"],
        goal=env_setup["goal"],
        goal_threshold=env_setup["goal_threshold"],
        noise=env_setup["noise"],
        thrust=env_setup["thrust"],
        puddle_top_left=env_setup["puddle_top_left"],
        puddle_width=env_setup["puddle_width"],
    )

    rep = BaseKanervaCoder(env.observation_space, n_prototypes= num_features, n_closest= n_closest, random_seed= gms)

    ## simulate the agent in the environment
    num_actions = len(env.get_wrapper_attr("actions"))
    agent = tabularQlearning(num_feature= num_features, num_actions=num_actions,alpha=0.1, gamma=0.9, epsilon=0.05, seed = gms)

    # Train the agent
    for episode in range(num_episodes):
        obs, info = env.reset()
        state = rep.get_features(obs)
        done = trunc = False
        # Stop on truncation as well as termination: an environment must be
        # reset, not stepped further, once it reports truncated=True.
        while not (done or trunc):
            action = agent.choose_action(state)
            next_obs, reward, done, trunc, _ = env.step(action)
            next_state = rep.get_features(next_obs)
            agent.update(state, action, reward, next_state)
            state = next_state

    #Test the trained model

    scenario_episode_rewards[gms] = [0]*number_test

    # `counter` only advances when an episode terminates within
    # max_video_length steps; other attempts are repeated with the same seed.
    counter  = 0
    while counter < number_test:

        np.random.seed(counter)

        obs, info = env.reset()
        total_reward = 0
        episode_rewards = []
        frames = []
        observation = obs

        for time_step in range(max_video_length):

            frames.append(env.render())

            action = e_greedy_policy(rep.get_features(observation))

            observation, reward, done, trunc, _ = env.step(action)
            total_reward += reward
            image = env.render()
            #online_rendering(image) #uncomment this line to see the online rendering of the environment frame by frame
            frames.append(image)

            if done:
                episode_rewards.append(total_reward)
                total_reward = 0
                break
            if trunc:
                # Truncated before terminating: discard this attempt.
                break

        if episode_rewards:

            scenario_episode_rewards[gms][counter] = float(round(episode_rewards[0],2))
            counter += 1

    # Close the environment once, after it is no longer needed.
    env.close()

    print(f"Scenario {gms} is done")


## Parameter Tuning with Bayesian Optimization

In [None]:
import numpy as np
import json
from skopt import gp_minimize
from skopt.space import Real, Integer
from skopt.utils import use_named_args
 
# Experiment configuration for the run with Bayesian parameter tuning.
# Maps scenario number -> list of test-episode rewards.
scenario_episode_rewards = {}
# Scenarios are numbered 1..num_scenarios (one pw{n}.json config each).
num_scenarios = 5
# Number of Kanerva prototypes; also the number of rows of the Q-table.
num_features = 1500
# Passed to BaseKanervaCoder as n_closest.
n_closest = 8
# Training episodes per agent (per objective evaluation and for the final agent).
num_episodes = 15000

# Number of test episodes recorded per scenario.
number_test = 100

# NOTE(review): not referenced anywhere in the visible code.
number_test_param = 5000
# Maximum number of steps in one test episode.
max_video_length = 70
# Epsilon-greedy action selection on top of the module-level agent's Q-table
def e_greedy_policy(state, epsilon=0.01):
    """Return an epsilon-greedy action for `state`.

    Uses the globals `agent` (for its Q-table) and `num_actions`. The greedy
    action maximises the column sums of the Q-table rows selected by `state`.
    """
    summed_q = np.sum(agent.get_q_table()[state], axis=0)
    if not (np.random.rand() < epsilon):
        return summed_q.argmax()
    return np.random.choice(num_actions)

# Define the objective function for Bayesian optimization
@use_named_args([
    Real(0.01, 0.2, name='alpha'),
    Real(0.8, 0.99, name='gamma'),
    Real(0.01, 0.2, name='epsilon'),
])
def objective(alpha, gamma, epsilon):
    """Train a fresh agent with the given hyperparameters and score it.

    Reads the module-level `env`, `rep`, `num_actions`, `num_features`,
    `num_episodes`, `max_video_length` and `gms` set by the scenario loop.

    Args:
        alpha: Learning rate of the agent.
        gamma: Discount factor of the agent.
        epsilon: Exploration rate used during training.

    Returns:
        float: Negative total reward of a single evaluation episode capped at
        `max_video_length` steps (negated because gp_minimize minimizes).
        An episode that does not finish within the cap is scored by the
        reward accumulated so far.
    """
    agent = tabularQlearning(num_feature=num_features, num_actions=num_actions, alpha=alpha, gamma=gamma, epsilon=epsilon, seed=gms)

    # Train the candidate agent.
    for episode in range(num_episodes):
        obs, info = env.reset()
        state = rep.get_features(obs)
        done = trunc = False
        # Stop on truncation as well as on termination.
        while not (done or trunc):
            action = agent.choose_action(state)
            next_obs, reward, done, trunc, _ = env.step(action)
            next_state = rep.get_features(next_obs)
            agent.update(state, action, reward, next_state)
            state = next_state

    # test the model for parameter tuning
    # Evaluate the agent trained above. The module-level e_greedy_policy is
    # deliberately not used here: it reads the global `agent`, not this one.
    eval_epsilon = 0.01
    q_table = agent.get_q_table()
    observation, info = env.reset()
    total_reward = 0

    for time_step in range(max_video_length):
        q = q_table[rep.get_features(observation)].sum(axis=0)
        if np.random.rand() < eval_epsilon:
            action = np.random.choice(num_actions)
        else:
            action = q.argmax()

        observation, reward, done, trunc, _ = env.step(action)
        total_reward += reward

        if done or trunc:
            break

    # The environment is shared with the caller and reused by later
    # evaluations, so it is not closed here.
    return -float(total_reward)  # negated because gp_minimize minimizes

# Perform Bayesian optimization for each scenario
for gms in range(1, num_scenarios+1):

    np.random.seed(gms)
    # NOTE(review): machine-specific absolute path; point this at your local
    # gym_puddle/env_configs directory before running elsewhere.
    json_file = '/Users/hadiaghazadeh/Library/CloudStorage/OneDrive-UniversityofCalgary/@upperboundCompetition/gym-puddle/gym_puddle/env_configs/pw{}.json'.format(gms)

    with open(json_file) as f:
        env_setup = json.load(f)

    env = gym.make(
        "PuddleWorld-v0",
        start=env_setup["start"],
        goal=env_setup["goal"],
        goal_threshold=env_setup["goal_threshold"],
        noise=env_setup["noise"],
        thrust=env_setup["thrust"],
        puddle_top_left=env_setup["puddle_top_left"],
        puddle_width=env_setup["puddle_width"],
    )

    rep = BaseKanervaCoder(env.observation_space, n_prototypes=num_features, n_closest=n_closest, random_seed=gms)

    num_actions = len(env.get_wrapper_attr("actions"))

    # Perform Bayesian optimization for this scenario
    res = gp_minimize(objective,                  # the function to minimize
                      [(0.01, 0.2),              # the bounds on each dimension of x
                       (0.8, 0.99),
                       (0.01, 0.2)],
                      acq_func="EI",            # the acquisition function
                      n_calls=10,                # the number of evaluations of f
                      random_state=gms)          # the random seed

    best_alpha, best_gamma, best_epsilon = res.x
    print(f"Best parameters for Scenario {gms}: alpha={best_alpha}, gamma={best_gamma}, epsilon={best_epsilon}")

    # Train a fresh agent with the best parameters
    best_agent = tabularQlearning(num_feature=num_features, num_actions=num_actions, alpha=best_alpha, gamma=best_gamma, epsilon=best_epsilon, seed=gms)

    for episode in range(num_episodes):
        obs, info = env.reset()
        state = rep.get_features(obs)
        done = trunc = False
        # Stop on truncation as well as on termination.
        while not (done or trunc):
            action = best_agent.choose_action(state)
            next_obs, reward, done, trunc, _ = env.step(action)
            next_state = rep.get_features(next_obs)
            # Update best_agent itself; updating another agent here would
            # leave the agent that is tested below untrained.
            best_agent.update(state, action, reward, next_state)
            state = next_state

    #Test the trained model

    scenario_episode_rewards[gms] = [0]*number_test

    # `counter` only advances when an episode terminates within
    # max_video_length steps; other attempts are repeated with the same seed.
    counter  = 0
    while counter < number_test:

        np.random.seed(counter)

        obs, info = env.reset()
        total_reward = 0
        episode_rewards = []
        frames = []
        observation = obs

        for time_step in range(max_video_length):

            frames.append(env.render())

            #action = e_greedy_policy(rep.get_features(observation))

            # Acts with best_agent's own (training) epsilon.
            action = best_agent.choose_action(rep.get_features(observation))
            observation, reward, done, trunc, _ = env.step(action)
            total_reward += reward
            image = env.render()
            #online_rendering(image) #uncomment this line to see the online rendering of the environment frame by frame
            frames.append(image)

            if done:
                episode_rewards.append(total_reward)
                total_reward = 0
                break
            if trunc:
                # Truncated before terminating: discard this attempt.
                break

        if episode_rewards:
            scenario_episode_rewards[gms][counter] = float(round(episode_rewards[0],3))
            counter += 1

    # Close the environment once, after it is no longer needed.
    env.close()

    print(f"Scenario {gms} is done")

print("Parameter tuning and testing for all scenarios are done.")



In [62]:

import pandas as pd

# Define the column names: a 1-based seed ID followed by one reward column
# per scenario (ep_reward_pw1 ... ep_reward_pw{num_scenarios}).
reward_columns = {j: f'ep_reward_pw{j}' for j in range(1, num_scenarios+1)}
columns = ['seed_ID'] + list(reward_columns.values())

# Create the DataFrame in one step from whole columns instead of appending
# one row at a time to an empty frame. Row i holds the reward of test
# episode i for every scenario, and seed_ID is i+1.
df = pd.DataFrame(
    {
        'seed_ID': list(range(1, number_test+1)),
        **{
            name: scenario_episode_rewards[j][:number_test]
            for j, name in reward_columns.items()
        },
    },
    columns=columns,
)



In [None]:
# Preview the first rows of the results table as a quick sanity check.
df.head()

In [None]:
# Per-column mean and standard deviation of the results table
mean, std = df.mean(), df.std()

# Average of the per-scenario means; position 0 (seed_ID) is skipped
np.mean(mean.iloc[1:])

In [None]:

# Define the file name for saving the results (relative to the current working directory)
csv_file_name = "submission2.csv"

# Save the DataFrame to a CSV file; index=False leaves the row index out of the file
df.to_csv(csv_file_name, index=False)

# Report the file name that was written
print("Results saved successfully to", csv_file_name)