# 以 Monte Carlo 求解 FrozenLake

In [1]:
# 匯入numpy套件，用於數值計算與陣列處理
import numpy as np

# 匯入random模組，用於隨機數生成
import random

# 匯入sys模組，用於與系統互動
import sys

# 匯入gymnasium套件，用於建立和操作強化學習環境
import gymnasium as gym

In [2]:
# 建立FrozenLake-v1環境，is_slippery = False表示湖面不滑(動作是確定性的)
# 預設地圖大小為4x4，若要切換為較大的地圖可加入參數map_name = "8x8"
env = gym.make('FrozenLake-v1', is_slippery = False)  # , map_name = "8x8"

In [3]:
# 建立隨機策略函式(每個狀態下的每個動作機率平均)
def create_random_policy(env):
    # 建立一個空字典，用來儲存每個狀態對應的動作機率分布
    policy = {}

    # 逐一遍歷所有狀態(從0到狀態數-1)
    for state in range(env.observation_space.n):
        # 建立一個空字典，儲存該狀態下每個動作的機率
        action_probabilities = {}

        # 逐一遍歷所有可能動作(從0到動作數-1)
        for action in range(env.action_space.n):
            # 對於每個動作，分配相同的機率(均勻分布，機率 = 1 / 動作總數)
            action_probabilities[action] = 1 / env.action_space.n

        # 將該狀態下的動作機率分布加入整體策略字典中
        policy[state] = action_probabilities

    # 傳回整個隨機策略(每個狀態對應一個動作機率分布)
    return policy

# 建立狀態-行動Q值表(初始為0.0)
def create_state_action_dictionary(env, policy):
    # 建立一個空字典，用來儲存每個狀態-動作對應的Q值
    Q = {}

    # 遍歷策略中的每個狀態(也就是所有可能的狀態)
    for state in policy.keys():
        # 對於該狀態，建立一個動作字典，初始每個動作的Q值都為0.0
        Q[state] = {action: 0.0 for action in range(env.action_space.n)}

    # 傳回Q值表，結構為Q[state][action] = 值
    return Q

In [4]:
# 執行一回合遊戲，依據給定的策略(policy)與環境(env)模擬
def run_game(env, policy):
    s, info = env.reset()  # 重置環境，取得起始狀態s(和環境資訊info)
    episode = []           # 儲存整個episode的狀態、動作、獎勵序列
    done = False           # 回合是否結束(初始為False)

    # 只要回合尚未結束，就持續進行
    while not done:
        # 儲存單一時間步的[狀態, 動作, 獎勵]
        timestep = []

        # 將當前狀態加入時間步資料中
        timestep.append(s)

        # 根據策略進行動作選擇：使用機率分布抽樣一個動作
        n = random.uniform(0, sum(policy[s].values()))

        # 先從0到該狀態所有動作機率總和中隨機抽一個數字
        top_range = 0

        # 依照機率累加的方式進行抽樣(等同於roulette wheel selection)
        for prob in policy[s].items():  # prob是(tuple)，第一個元素是動作action，第二個元素是該動作的機率
            top_range += prob[1]        # 將該動作的機率累加到top_range，形成機率區間邊界
            if n < top_range:           # 如果隨機數n落在目前累積的機率區間內
                action = prob[0]        # 就選擇該動作
                break                   # 找到動作後跳出迴圈

        # 根據選定的動作與當前狀態與環境互動，取得新狀態與獎勵
        s, reward, terminated, truncated, info = env.step(action)

        # 判斷是否為終止狀態(遊戲結束)
        done = terminated or truncated

        # 將這個時間步的[動作, 獎勵]加入timestep
        timestep.append(action)
        timestep.append(reward)

        # 將該時間步加入整個episode紀錄
        episode.append(timestep)

    # 傳回完整的episode
    return episode

In [5]:
# 定義蒙地卡羅強化學習函式
# env：環境物件
# episodes：執行的回合數(預設1000)
# policy：依據該策略決定每個狀態下執行的動作機率分布(預設None，會建立隨機策略)
# epsilon：ε-greedy策略中探索的機率(預設0.01)

def monte_carlo(env, episodes = 1000, policy = None, epsilon = 0.01):
    # 如果沒有提供策略，則建立一個隨機策略(每個動作均等機率)
    if not policy:
        policy = create_random_policy(env)

    # 初始化狀態-行動值函數Q，所有Q值初始為0
    Q = create_state_action_dictionary(env, policy)

    # 用於儲存狀態-行動對應的回報列表
    returns = {}

    # 進行指定數量的回合(episodes)
    for i_episode in range(episodes):
        # 每1000回合輸出進度提示
        if (i_episode + 1) % 1000 == 0:
            print(f"\r {(i_episode + 1)}/{episodes}回合.", end="")
            sys.stdout.flush()  # 清除緩衝區，即時輸出

        # 初始化累積報酬為0
        G = 0

        # 執行一回合遊戲，傳回該回合的(state, action, reward)序列
        episode = run_game(env = env, policy = policy)

        # 從回合最後一步開始往前計算累積報酬G
        for i in reversed(range(len(episode))):
            s_t, a_t, r_t = episode[i]  # 取出該時間步的狀態、動作與獎勵
            state_action = (s_t, a_t)   # 狀態-動作配對
            G += r_t                    # 累積報酬(從末端往前加)

            # 若此狀態-動作對是該回合第一次出現(首次訪問MC)
            if state_action not in [(x[0], x[1]) for x in episode[0: i]]:
                # 如果returns字典已有該狀態-動作的回報列表，則加入G
                if returns.get(state_action):
                    returns[state_action].append(G)
                else:
                    # 否則建立新的列表並加入G
                    returns[state_action] = [G]

                # 更新該狀態-動作對的Q值為平均回報
                Q[s_t][a_t] = sum(returns[state_action]) / len(returns[state_action])

                # 取得該狀態下所有動作的Q值清單
                Q_list = list(map(lambda x: x[1], Q[s_t].items()))

                # 找出Q值最高的動作索引(有多個則隨機選一個)
                indices = [i for i, x in enumerate(Q_list) if x == max(Q_list)]
                max_Q = random.choice(indices)

                # 設定ε-greedy策略，給最大Q值動作較大機率
                A_star = max_Q
                for a in policy[s_t].items():
                    if a[0] == A_star:
                        # 最佳動作的機率設為1 - ε + (ε平均分配部分)
                        policy[s_t][a[0]] = 1 - epsilon + (epsilon / abs(sum(policy[s_t].values())))
                    else:
                        # 其他動作機率均分剩餘ε部分
                        policy[s_t][a[0]] = (epsilon / abs(sum(policy[s_t].values())))

    # 傳回更新後的策略
    return policy

In [6]:
# 使用蒙地卡羅方法訓練策略，執行60000回合
policy = monte_carlo(env, episodes = 60000)

 60000/60000回合.

In [7]:
# 定義函式用來評估給定策略(policy)在指定環境(env)中成功率
def test_policy(policy, env):
    wins = 0        # 記錄成功次數(抵達終點)
    runs = 1000     # 測試次數

    # 執行指定次數的回合數
    for _ in range(runs):
        # 執行一回合遊戲，run_game傳回的是[(state, action, reward), ...]
        # 取最後一筆資料的reward值
        final_reward = run_game(env, policy)[-1][-1]

        # 如果最後一步的reward是1，代表該回合成功達成目標
        if final_reward == 1:
            # 成功次數累加
            wins += 1

    # 傳回成功率(成功次數/總測試次數)
    return wins / runs

# 呼叫測試函式，傳入訓練好的策略與環境
test_policy(policy, env)

0.97

In [8]:
# 定義函式play_episodes，讓agent在環境中依照策略執行多回合遊戲並統計結果
# environment：環境物件
# n_episodes：執行的回合數(預設1000)
# policy：依據該策略決定每個狀態下執行的動作機率分布(預設None，會建立隨機策略)

def play_episodes(environment, n_episodes = 1000, policy = None):
    wins = 0          # 記錄贏得回合數
    total_reward = 0  # 總獲得獎勵累積

    # 執行指定回合數的測試
    for episode in range(n_episodes):
        done = False                       # 回合結束旗標
        state, info = environment.reset()  # 重置環境並取得初始狀態

        # 持續進行直到回合結束
        while not done:
            # 根據當前狀態從策略中抽樣選擇動作
            n = random.uniform(0, sum(policy[state].values()))  # 生成0到總機率間的隨機數
            top_range = 0                                       # 初始化累積機率區間邊界，用於後續累加機率進行抽樣

            # 使用機率累積方式選擇動作(roulette wheel selection)
            for prob in policy[state].items():  # 逐一遍歷該狀態下的所有動作與其機率，prob是(動作, 機率)的tuple
                top_range += prob[1]            # 將目前動作的機率累加到top_range，建立累積機率區間邊界
                if n < top_range:               # 如果隨機數n落在累積機率區間內，表示選擇此動作
                    action = prob[0]            # 將動作從prob中取出，設定為要執行的動作
                    break                       # 找到動作後跳出迴圈，不再繼續累加

            # 執行選定動作，並取得下一狀態及獎勵
            next_state, reward, terminated, truncated, info = environment.step(action)

            # 判斷回合是否結束
            done = terminated or truncated

            # 累積本回合總獎勵
            total_reward += reward

            # 更新狀態為下一狀態
            state = next_state

            # 若回合結束且獲得成功獎勵(1.0)，贏次數加一
            if done and reward == 1.0:
                wins += 1

    # 計算每回合平均獎勵
    average_reward = total_reward / n_episodes

    # 傳回贏的次數、總獎勵、平均獎勵
    return wins, total_reward, average_reward

# 測試程式碼：執行1000回合並輸出結果
n_episodes = 1000
wins, total_reward, average_reward = play_episodes(env, n_episodes, policy)
print(f'number of wins over {n_episodes} episodes = {wins}')
print(f'average reward over {n_episodes} episodes = {average_reward}')

number of wins over 1000 episodes = 963
average reward over 1000 episodes = 0.963
