In [None]:
import gym
import pygame
import numpy as np
import custom_frozen_lake
import time

import random
import matplotlib.pyplot as plt
from pathlib import Path
import pandas as pd
from datetime import datetime

# Initialise pygame's modules up front; the helper functions in this notebook
# poll pygame events to wait for a key press between rendered frames.
pygame.init(); 

# Selects the state representation used by set_up()/train_model():
#   True  -> env.observation_space is indexed as a two-component space and the
#            Q-table is 3-D: (component 0 size, component 1 size, n_actions).
#   False -> env.observation_space is read as a single discrete space and the
#            Q-table is 2-D: (n_states, n_actions).
DIR_STATE_FLAG = True

In [None]:
def test_map_generation(manual: bool=False):
    """Smoke-test environment creation by building and rendering one map.

    Creates a 'CustomFrozenLake' environment (map_size=6, frozen_p=0.6, with no
    fixed ``desc``/``map_name``), prints whatever ``env.reset()`` returns and
    renders the initial frame.

    Parameters
    ----------
    manual : bool
        If truthy, block after rendering until a key is pressed in the pygame
        window. Otherwise pause for one second.
    """
    for _ in range(1):
        env = gym.make(id='CustomFrozenLake', desc=None, map_name=None, map_size=6, frozen_p=0.6)
        # try/finally: always release the environment (and its window), even if
        # reset/render raises, and close every environment the loop creates.
        try:
            print(env.reset())
            env.render()
            if manual:
                # Discard events until a key press arrives.
                while pygame.event.wait().type != pygame.KEYDOWN:
                    pass
            else:
                time.sleep(1)
        finally:
            env.close()

In [None]:
def test_actions(manual: bool=False, max_steps: int=20):
    """Smoke-test stepping by driving the environment with random actions.

    Creates a 'CustomFrozenLake' environment (map_size=6, frozen_p=0.6), then
    repeatedly renders, samples a random action, steps, and prints
    ``(observation, reward, action)`` until the episode reports ``done`` or
    ``max_steps`` steps have been taken.

    Parameters
    ----------
    manual : bool
        If truthy, wait for a key press after each rendered frame. Otherwise
        pause for one second per frame.
    max_steps : int
        Upper bound on the number of steps (default 20).
    """
    env = gym.make(id='CustomFrozenLake', desc=None, map_name=None, map_size=6, frozen_p=0.6)
    # try/finally: the environment is closed even if render/step raises.
    try:
        env.reset()
        for _ in range(max_steps):
            env.render()
            if manual:
                # Discard events until a key press arrives.
                while pygame.event.wait().type != pygame.KEYDOWN:
                    pass
            else:
                time.sleep(1)

            action = env.action_space.sample()
            # 4-tuple step API: the trailing info dict is not needed here.
            obs, reward, done, _info = env.step(action)
            print(obs, reward, action)
            if done:
                break
    finally:
        env.close()

In [None]:
# test_map_generation()
# test_actions(1)

In [None]:
def set_up(map_size: int = 8, frozen_p: float = 0.7):
    """Create a slippery 'CustomFrozenLake' environment and a zeroed Q-table.

    The Q-table shape follows the module-level DIR_STATE_FLAG:
      * True  -> (observation_space[0].n, observation_space[1].n, n_actions)
      * False -> (observation_space.n, n_actions)

    Parameters
    ----------
    map_size : int
        Forwarded to the environment as ``map_size``.
    frozen_p : float
        Forwarded to the environment as ``frozen_p``.

    Returns
    -------
    tuple
        ``(env, qtable)`` with ``qtable`` an all-zero ``np.ndarray``.
    """
    env = gym.make(
        id='CustomFrozenLake',
        is_slippery=True,
        desc=None,
        map_name=None,
        map_size=map_size,
        frozen_p=frozen_p,
    )
    n_actions = env.action_space.n

    if DIR_STATE_FLAG:
        obs_space = env.observation_space
        table_shape = (obs_space[0].n, obs_space[1].n, n_actions)
    else:
        table_shape = (env.observation_space.n, n_actions)

    return (env, np.zeros(table_shape))

In [None]:
def write_file(path: Path, file_name: str, array: np.ndarray, column_label: np.ndarray, row_label: np.ndarray):
    """Write ``array`` to ``<path>/<file_name>.csv``, optionally labelling its axes.

    Parameters
    ----------
    path : Path
        Output directory; created (with parents) if it does not exist.
    file_name : str
        File name without extension; ``'.csv'`` is appended.
    array : array-like
        Data to store. Anything ``pd.DataFrame`` accepts; a 1-D input is
        written as a single column.
    column_label : array-like or None
        Labels for the columns. Applied only when their count equals the
        number of columns; otherwise pandas' default integer labels are kept.
    row_label : array-like or None
        Labels for the rows, with the same matching rule as ``column_label``.

    Notes
    -----
    Labels are best-effort by design: a size mismatch (or ``None``) silently
    falls back to the default labels rather than raising.
    """
    path = Path(path)
    path.mkdir(exist_ok=True, parents=True)

    df = pd.DataFrame(array)
    # Compare against the frame's shape rather than array.shape so 1-D input
    # (which has no shape[1]) and plain nested lists are handled too.
    n_rows, n_cols = df.shape

    if column_label is not None:
        # np.asarray lets callers pass lists/tuples, not only ndarrays.
        column_label = np.asarray(column_label).ravel()
        if column_label.size == n_cols:
            df.columns = column_label
    if row_label is not None:
        row_label = np.asarray(row_label).ravel()
        if row_label.size == n_rows:
            df.index = row_label

    df.to_csv(path / (file_name + '.csv'))

In [None]:
def train_model(env: gym.Env, qtable: np.ndarray, manual: bool = False, frozen_p: float = 0.8,
                total_episodes: int=20000, learning_rate: float=0.6, max_steps: int=200, gamma: float=0.6,
                epsilon: float=1, max_epsilon: float=1, min_epsilon: float=0, decay_rate: float=0.00005,
                map_size: int=8) -> tuple:
    """Train ``qtable`` in place with epsilon-greedy tabular Q-learning.

    The first episode runs on the ``env`` passed in; after every episode a new
    environment is created with ``set_up(map_size=map_size, frozen_p=frozen_p)``
    and used for the next one.

    Parameters
    ----------
    env : gym.Env
        Environment for the first episode. It must use the 4-tuple ``step``
        API and return the observation directly from ``reset``. It is not
        closed here unless ``total_episodes`` is 0.
    qtable : np.ndarray
        Q-table to update in place; its layout must match DIR_STATE_FLAG
        (see ``set_up``).
    manual : bool
        If truthy, wait for a key press on paused episodes instead of sleeping.
    frozen_p : float
        Forwarded to ``set_up`` for the per-episode environments.
    total_episodes : int
        Number of training episodes.
    learning_rate : float
        Step size of the Q-value update.
    max_steps : int
        Step limit per episode.
    gamma : float
        Discount factor.
    epsilon : float
        Exploration probability for the first episode.
    max_epsilon, min_epsilon, decay_rate : float
        After each episode ``epsilon`` is set to
        ``min_epsilon + (max_epsilon - min_epsilon) * exp(-decay_rate * episode)``.
    map_size : int
        Forwarded to ``set_up`` for the per-episode environments. The default
        of 8 matches ``set_up``'s own default.

    Returns
    -------
    tuple
        ``(qtable, rewards_1000)`` where ``rewards_1000`` holds the summed
        episode rewards of each consecutive block of up to 1000 episodes.

    Side effects
    ------------
    Prints the number of steps that yielded a positive reward.
    """
    ep_reward = []
    # max(1, ...) keeps `episode % render_interval` valid when total_episodes < 10
    # (the integer division would otherwise give 0 -> ZeroDivisionError).
    render_interval = max(1, total_episodes // 10)
    win = 0
    # Remember the caller's environment so it is never closed mid-run: callers
    # such as tune_hyperparam pass the same env object to several runs.
    caller_env = env

    for episode in range(total_episodes):
        state = env.reset()
        done = False
        total_rewards = 0

        for step in range(max_steps):
            # Epsilon-greedy: exploit the best known action with probability
            # (1 - epsilon), otherwise explore with a random action.
            exp_exp_tradeoff = random.uniform(0, 1)
            if exp_exp_tradeoff > epsilon:
                action = np.argmax(qtable[state])
            else:
                action = env.action_space.sample()

            # Take the action (a) and observe the outcome state (s') and reward (r).
            new_state, reward, done, info = env.step(action)

            if episode % render_interval == 0:
                # NOTE(review): rendering is disabled below but the pause is still
                # taken on these episodes (0.5 s per step, or a blocking key wait
                # when `manual` is set). Re-enable render or drop the pause.
                #env.render()
                if manual:
                    while pygame.event.wait().type != pygame.KEYDOWN:
                        pass
                else:
                    time.sleep(0.5)

            # Q(s,a) := Q(s,a) + lr * [r + gamma * max_a' Q(s',a') - Q(s,a)]
            if DIR_STATE_FLAG:
                # state is a tuple here, so appending the action yields the full index.
                idx = state + (action,)
                qtable[idx] = qtable[idx] + learning_rate * (reward + gamma * np.max(qtable[new_state]) - qtable[idx])
            else:
                qtable[state, action] = qtable[state, action] + learning_rate * (reward + gamma * np.max(qtable[new_state, :]) - qtable[state, action])

            total_rewards += reward
            if reward > 0: win += 1

            state = new_state

            # The episode ends as soon as the environment reports it is done.
            if done:
                break

        # Reduce epsilon (less and less exploration as training progresses).
        epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*episode)
        ep_reward.append(total_rewards)

        # Swap in a fresh environment for the next episode, releasing the one
        # created here for the previous episode so they do not accumulate.
        if env is not caller_env:
            env.close()
        env, _ = set_up(map_size=map_size, frozen_p=frozen_p)

    # Sum episode rewards per consecutive block of 1000 episodes.
    rewards_1000 = np.add.reduceat(ep_reward, np.arange(0, len(ep_reward), 1000))
    env.close()
    print(win)
    return qtable, rewards_1000

In [None]:
def test_model(env: gym.Env, qtable: np.ndarray, manual: bool=True, max_steps: int=100, frozen_p: float = 0.8,
               total_episodes: int=100, map_size: int=8):
    """Evaluate a trained Q-table by acting greedily for several episodes.

    The first episode runs on the ``env`` passed in; after every episode a new
    environment is created with ``set_up(map_size=map_size, frozen_p=frozen_p)``.
    Every ``total_episodes // 5``-th episode is rendered step by step.

    Parameters
    ----------
    env : gym.Env
        Environment for the first episode, using the 4-tuple ``step`` API. It
        is not closed here unless ``total_episodes`` is 0.
    qtable : np.ndarray
        Q-table whose layout matches the environment's observations.
    manual : bool
        On rendered episodes, wait for a key press after each step if truthy;
        otherwise pause 0.3 s.
    max_steps : int
        Step limit per episode; reaching it is reported as a time-out.
    frozen_p : float
        Forwarded to ``set_up`` for the per-episode environments.
    total_episodes : int
        Number of evaluation episodes (default 100).
    map_size : int
        Forwarded to ``set_up`` for the per-episode environments. The default
        of 8 matches ``set_up``'s own default.

    Side effects
    ------------
    Prints per-episode results, then the total reward and the number of
    episodes that ended with a positive reward.
    """
    total_reward = 0
    win = 0
    # max(1, ...) keeps the modulo below valid when total_episodes < 5.
    render_interval = max(1, total_episodes // 5)
    # The caller's environment is never closed mid-run; only those created here are.
    caller_env = env

    for episode in range(total_episodes):
        state = env.reset()
        step = 0  # keeps the time-out message defined even when max_steps == 0
        done = False
        print("****************************************************")
        print("EPISODE ", episode)

        for step in range(max_steps):
            # Greedy policy: the action with the highest Q-value for this state.
            action = np.argmax(qtable[state])

            new_state, reward, done, info = env.step(action)
            if episode % render_interval == 0:
                env.render()
                if manual:
                    while pygame.event.wait().type != pygame.KEYDOWN:
                        pass
                else:
                    time.sleep(.3)
            if done:
                # Only the terminal step's reward is counted towards the totals.
                print("Number of steps", step + 1)
                print("Reward:", reward)
                total_reward += reward
                if reward > 0: win += 1
                break
            state = new_state
        if not done: print("Timed out", step + 1, "steps")

        # Swap in a fresh environment for the next episode, releasing the one
        # created here for the previous episode so they do not accumulate.
        if env is not caller_env:
            env.close()
        env, _ = set_up(map_size=map_size, frozen_p=frozen_p)
    print('Total rewards:', total_reward)
    print('Success rate:', win,'/',total_episodes)
    env.close()

In [None]:
def tune_hyperparam(env: gym.Env, qtable: np.ndarray, total_episodes: int = 20000, frozen_p: float = 0.8):
    """Grid-search learning rate and gamma, saving the final-block rewards as CSV.

    Trains a fresh all-zero Q-table for every (learning_rate, gamma) pair on a
    10 x 10 grid over [0.1, 0.9]. For each pair it records the last entry of
    ``rewards_1000`` returned by ``train_model`` (the summed reward of the
    final block of up to 1000 episodes). The grid is averaged over repeats and
    written to ``tune_hyperparam/<YYYYmmdd-HHMM>/<params>.csv`` with learning
    rates as row labels and gammas as column labels.

    Parameters
    ----------
    env : gym.Env
        Environment handed to every ``train_model`` call.
    qtable : np.ndarray
        Only its shape is used; each run starts from ``np.zeros(qtable.shape)``.
    total_episodes : int
        Episodes per training run. Must be non-zero because it divides the
        decay-rate expression.
    frozen_p : float
        Forwarded to ``train_model``.
    """
    n_learning_rates = 10
    n_gammas = 10
    n_repeats = 1
    learning_rates = np.linspace(0.1, 0.9, n_learning_rates, endpoint=True)
    gammas = np.linspace(0.1, 0.9, n_gammas, endpoint=True)
    rewards = []

    min_epsilon, max_epsilon = 0.0, 1.0
    # Solve min + (max - min) * exp(-decay * N) = 0.1 for decay, so epsilon has
    # decayed to about 0.1 by the final episode.
    decay_rate = -(np.log((0.1 - min_epsilon) / (max_epsilon - min_epsilon))) / total_episodes

    for _ in range(n_repeats):
        for learning_rate in learning_rates:
            for gamma in gammas:
                # Every configuration starts from an untrained table.
                qtable = np.zeros(qtable.shape)
                qtable, rewards_1000 = train_model(env, qtable, False, frozen_p, total_episodes, max_steps=100,
                                                    learning_rate=learning_rate, gamma=gamma,
                                                    min_epsilon=min_epsilon, max_epsilon=max_epsilon, decay_rate=decay_rate)
                rewards.append(rewards_1000[-1])

    # Axis 0: repeat, axis 1: learning rate (outer loop), axis 2: gamma (inner loop).
    rs_rewards = np.reshape(rewards, (-1, n_learning_rates, n_gammas))
    rs_rewards_mean = np.mean(rs_rewards, axis=0)

    now = datetime.now().strftime('%Y%m%d-%H%M')
    path = Path('tune_hyperparam') / now
    param_list = [total_episodes, min_epsilon, max_epsilon, decay_rate]
    file_name = '_'.join(map(str, param_list))

    # write_file takes (column_label, row_label): columns of the averaged grid
    # are gammas and rows are learning rates. These were previously swapped,
    # which went unnoticed only because both grids hold the same values.
    write_file(path, file_name, rs_rewards_mean, gammas, learning_rates)


In [None]:
def test_write_file():
    """Exercise write_file() with a 3x4 integer grid and matching axis labels.

    Writes ``test/test2/testfile.csv`` containing the values 0..11, with row
    labels 3..5 and column labels 4..7.
    """
    grid = np.arange(0, 12).reshape((3, 4))
    column_labels = np.arange(4, 8)
    row_labels = np.arange(3, 6)
    write_file(Path('test/test2'), 'testfile', grid, column_labels, row_labels)

In [None]:
# frozen_p = 0.8
# env, qtable = set_up(frozen_p = frozen_p)
# tune_hyperparam(env, qtable, frozen_p = frozen_p)

In [None]:
# --- Train a Q-table from scratch, then evaluate it -------------------------
# frozen_p is forwarded to the CustomFrozenLake environment by set_up()
# (presumably the share of frozen tiles in generated maps; confirm against
# custom_frozen_lake).
frozen_p = 0.8
env, qtable = set_up(frozen_p=frozen_p)
total_episodes = 100000
min_epsilon, max_epsilon = 0.0, 1.0

# calculate decay_rate needed to achieve 90% exploit chance at the final episode
# (i.e. epsilon has decayed to about 0.1 after total_episodes episodes)
decay_rate = -(np.log((0.1 - min_epsilon) / (max_epsilon - min_epsilon))) / total_episodes

# Positional arguments: manual=False, then frozen_p, total_episodes and
# learning_rate=0.8; max_steps keeps train_model's default.
qtable, rewards_1000 = train_model(env, qtable, False, frozen_p, total_episodes, 0.8, gamma=0.36667,
                                    min_epsilon=min_epsilon, max_epsilon=max_epsilon, decay_rate=decay_rate)
print(rewards_1000)
# train_model closes the environment it ends with, so build a new one for
# evaluation; the zeroed Q-table that set_up also returns is discarded.
env, _ = set_up(frozen_p=frozen_p)
print(qtable)
# manual=False: rendered episodes advance on a timer instead of key presses.
test_model(env, qtable, False)

# Learning curve: summed reward per block of 1000 training episodes.
plt.plot(rewards_1000)

In [None]:
# --- Evaluate a previously saved Q-table ------------------------------------
frozen_p = 0.8
env, _ = set_up(frozen_p=frozen_p)
# NOTE: 'custom_test.npy' must already exist in the working directory; the
# np.save call that would create it is commented out elsewhere in this notebook.
qtable = np.load('custom_test.npy')
print(qtable)
# manual=False: rendered episodes advance on a timer instead of key presses.
test_model(env, qtable, False, frozen_p=frozen_p)

In [None]:
# env, _ = set_up()
# print(qtable)
# test_model(env, qtable, False)

In [None]:
# np.save('custom_test.npy' ,qtable)

In [None]:
# qload = np.load('custom_test.npy')