In [None]:
import os

import PIL
from PIL import Image
from hyperopt import hp
from hyperopt import fmin, tpe
from hyperopt import space_eval
from stable_baselines3 import DQN
# from stable_baselines3.common.evaluation import evaluate_policy

from utils import make_env

In [None]:
# Best mean evaluation score seen so far across hyperopt trials. It starts at
# negative infinity so that any finite average compares greater than it.
best_avg_rwd = -float("inf")

In [None]:
def dict_to_str(args):
    """Flatten a mapping into one string of back-to-back ``key_value`` pairs.

    Pairs are emitted in the mapping's iteration order and are NOT separated
    from each other, e.g. ``{"a": 1, "b": 2}`` becomes ``"a_1b_2"``.

    Args:
        args: Mapping whose keys and values are formatted with ``str.format``
            semantics (f-string interpolation).

    Returns:
        The concatenated string; an empty string for an empty mapping.
    """
    return "".join(f"{name}_{setting}" for name, setting in args.items())

def save_frames_as_gif(frames, file_path, duration=30, loop=0):
    """Write a sequence of image arrays to disk as an animated GIF.

    Args:
        frames: Iterable of arrays accepted by ``PIL.Image.fromarray``, in
            playback order. ``None`` is treated like an empty sequence.
        file_path: Destination of the GIF. When it is a ``str`` or path-like
            object, missing parent directories are created first.
        duration: Display time of each frame in milliseconds.
        loop: Number of times the animation repeats; ``0`` loops forever.

    Returns:
        None. When there are no frames, nothing is written and a notice is
        printed instead of failing with ``TypeError``/``IndexError``.
    """
    if frames is None:
        frames = ()

    # Use the explicitly imported ``Image`` submodule: a bare ``import PIL``
    # does not guarantee that ``PIL.Image`` has been loaded.
    frame_images = [Image.fromarray(frame) for frame in frames]

    # Check the converted list rather than ``frames`` itself, so generators
    # and array-like inputs are handled without an ambiguous truth test.
    if not frame_images:
        print(f"No frames to save; skipping {file_path}.")
        return

    # Pillow does not create missing directories when saving.
    if isinstance(file_path, (str, os.PathLike)):
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    first_frame, *remaining_frames = frame_images
    first_frame.save(
        file_path,
        format='GIF',
        append_images=remaining_frames,
        save_all=True,
        duration=duration,
        loop=loop,
    )

In [None]:
def rwd_func_1(info, prev_info):
    """Reward equal to the score gained since the previous step.

    Args:
        info: Mapping for the current step; its ``"score"`` entry is read.
        prev_info: Mapping for the previous step; its ``"score"`` entry is read.

    Returns:
        The score difference ``info["score"] - prev_info["score"]`` with a
        weight of 1, added to a base reward of 0.
    """
    base_reward = 0
    score_gain = info["score"] - prev_info["score"]
    return base_reward + 1 * score_gain

def rwd_func_2(info, prev_info):
    """Score-gain reward with a flat per-step cost and a life-loss penalty.

    Args:
        info: Mapping for the current step; ``"score"`` and ``"life"`` are read.
        prev_info: Mapping for the previous step; ``"score"`` and ``"life"``
            are read.

    Returns:
        ``-0.1 + 1.1 * score_gain``, reduced by a further ``0.9`` when
        ``info["life"]`` is lower than ``prev_info["life"]``.
    """
    score_gain = info["score"] - prev_info["score"]

    # Flat step cost first, then the weighted score gain.
    total = -0.1 + 1.1 * score_gain

    if info["life"] < prev_info["life"]:
        # The life counter dropped during this step.
        return total - 0.9
    return total

def rwd_func_3(info, prev_info):
    """Reward like ``rwd_func_2`` plus a penalty for jumping across the grid.

    A jump is detected when, on either axis of ``"head pos"``, the head went
    from index ``size - 1`` to ``0`` or from ``0`` to ``size - 1`` between the
    two steps (presumably the head wrapping around the board edge).

    Args:
        info: Mapping for the current step; ``"head pos"``, ``"size"``,
            ``"score"`` and ``"life"`` are read.
        prev_info: Mapping for the previous step; ``"head pos"``, ``"score"``
            and ``"life"`` are read.

    Returns:
        The sum of: ``-0.5`` if a jump was detected, ``-0.1`` per step,
        ``1.1 * score_gain``, and ``-0.9`` if ``"life"`` decreased.
    """
    last_index = info["size"] - 1
    before, after = prev_info["head pos"], info["head pos"]

    def jumped_across(axis):
        # True when the head moved between index 0 and the last index on this axis.
        return (before[axis] == last_index and after[axis] == 0) or (
            before[axis] == 0 and after[axis] == last_index)

    total = -0.5 if (jumped_across(0) or jumped_across(1)) else 0
    total = total - 0.1 + 1.1 * (info["score"] - prev_info["score"])

    if info["life"] < prev_info["life"]:
        total -= 0.9
    return total

def rwd_func_4(info, prev_info):
    """Reward like ``rwd_func_3`` plus shaping on the head-to-food distance.

    Args:
        info: Mapping for the current step; ``"head food dist"``,
            ``"head pos"``, ``"size"``, ``"score"`` and ``"life"`` are read.
        prev_info: Mapping for the previous step; ``"head food dist"``,
            ``"head pos"``, ``"score"`` and ``"life"`` are read.

    Returns:
        The sum of: ``+0.2`` if ``"head food dist"`` strictly decreased,
        otherwise ``-0.1``; ``-0.5`` if the head moved between index ``0`` and
        index ``size - 1`` on either axis; ``-0.1`` per step;
        ``1.1 * score_gain``; and ``-0.9`` if ``"life"`` decreased.
    """
    # Distance shaping: an unchanged distance counts as "not closer".
    got_closer = info["head food dist"] < prev_info["head food dist"]
    total = 0.2 if got_closer else -0.1

    edge = info["size"] - 1
    old_head, new_head = prev_info["head pos"], info["head pos"]
    crossed_edge = any(
        (old_head[axis] == edge and new_head[axis] == 0)
        or (old_head[axis] == 0 and new_head[axis] == edge)
        for axis in (0, 1)
    )
    if crossed_edge:
        total -= 0.5

    total = total - 0.1 + 1.1 * (info["score"] - prev_info["score"])

    if info["life"] < prev_info["life"]:
        total -= 0.9
    return total

In [None]:
def objective(args):
    global best_avg_rwd


    match args["env_rwd_func"]:
        case 0:
            env_rwd_func = None
        case 1:
            env_rwd_func = rwd_func_1
        case 2:
            env_rwd_func = rwd_func_2
        case 3:
            env_rwd_func = rwd_func_3
        case 4:
            env_rwd_func = rwd_func_4

    env = make_env(env_rwd_func=env_rwd_func, max_step=args["env_max_step"], num_stack=args["env_num_stack"])


    model = DQN(
        "MlpPolicy", env, 
        learning_rate=args["learning_rate"], learning_starts=args["learning_starts"], buffer_size=args["buffer_size"], 
        batch_size=args["batch_size"], tau=args["tau"], train_freq=args["train_freq"], gamma=args["gamma"], 
        max_grad_norm=args["max_grad_norm"], gradient_steps=args["gradient_steps"], target_update_interval=args["target_update_interval"], 
        exploration_fraction=args["exploration_fraction"], 
        verbose=0, tensorboard_log="logs/DQN hp-tuning/no.1"
    )
    model.learn(total_timesteps=5_000_000, log_interval=10, tb_log_name=dict_to_str(args))


    env.close()
    del env
    env = make_env(max_step=5000, num_stack=args["env_num_stack"])


    # avg_rwd, _ = evaluate_policy(model, env, n_eval_episodes=100, deterministic=False)
    avg_rwd = 0
    bestscore_frames = None
    bestscore = 0
    for episode in range(100):
        state = env.reset()
        done = False
        score = 0
        frames = []
        while not done:
            action = model.predict(state, deterministic=False)
            state, reward, done, info = env.step(action)
            obs = env.render("rgb_array")
            frames.append(obs)
        score = info["score"]
        avg_rwd += score
        if score > bestscore:
            bestscore = score
            bestscore_frames = frames.copy()
    avg_rwd /= 100

    if avg_rwd > best_avg_rwd:
        best_avg_rwd = avg_rwd

        print("Saving new best model.")
        model.save(f"models/hp/A2C_{avg_rwd}.zip")

        print("Saving new best model's gif.")            
        save_frames_as_gif(bestscore_frames, f"rl videos/hp/DQN_{avg_rwd}.gif")


    env.close()
    del env


    print(f'---Average Episode Reward of opt-epoch:', avg_rwd)
    return -avg_rwd

In [None]:
def get_space():
    """Build the hyperopt search space consumed by ``objective``.

    Returns:
        A dict mapping each hyperparameter name to a hyperopt expression:
        ``hp.choice`` over a fixed list of options, or ``hp.uniform`` over a
        ``(low, high)`` range. Each expression's label equals its dict key.
    """
    # (label, sampler, positional arguments after the label), in output order.
    dimensions = (
        ('env_rwd_func', hp.choice, ([0, 1, 2, 3, 4],)),
        ('env_max_step', hp.choice, ([500, 1000, 1500, 2000, 2500, 3000, 5000],)),
        ('env_num_stack', hp.choice, ([2, 3, 4, 5, 6, 8, 10, 15, 20],)),
        ('learning_rate', hp.choice, ([1e-6, 3e-6, 6e-6, 1e-5, 3e-5, 6e-5, 1e-4, 3e-4, 6e-4, 1e-3, 3e-3, 6e-3, 1e-2],)),
        ('learning_starts', hp.choice, ([10_000, 25_000, 50_000, 75_000, 100_000, 150_000, 500_000, 1_000_000],)),
        # NOTE(review): 50_000_000 comes first and breaks the ascending order
        # of the remaining sizes - confirm it is not a typo for 50_000.
        ('buffer_size', hp.choice, ([50_000_000, 100_000, 500_000, 1_000_000, 2_000_000, 5_000_000],)),
        ('batch_size', hp.choice, ([16, 32, 64, 128, 256, 512],)),
        ('tau', hp.uniform, (0.8, 1)),
        ('train_freq', hp.choice, ([2, 4, 5, 8, 10, 16, 20, 32, 50, 64, 128, 256, 400, 512, 800, 1024, 1500, 2048],)),
        ('gamma', hp.choice, ([0.70, 0.80, 0.90, 0.95, 0.99, 0.995],)),
        ('max_grad_norm', hp.uniform, (0.3, 30)),
        ('gradient_steps', hp.choice, ([-1, 1, 2, 4, 8, 16, 32],)),
        ('target_update_interval', hp.choice, ([1_000, 2_000, 5_000, 10_000, 20_000, 50_000, 100_000],)),
        ('exploration_fraction', hp.uniform, (0.01, 0.1)),
    )
    return {label: sampler(label, *params) for label, sampler, params in dimensions}

In [None]:
# Run 100 TPE-guided trials of ``objective`` over the space from ``get_space``.
best = fmin(
    fn=objective,
    space=get_space(),
    algo=tpe.suggest,
    max_evals=100,
)

In [None]:
# ``fmin`` reports hp.choice dimensions as option *indices*, not values.
# Decode them against the search space so the displayed configuration shows
# the actual hyperparameter values.
space_eval(get_space(), best)