In [3]:
import numpy as np
from collections import defaultdict

In [4]:
class GridWorld:
    def __init__(self, grid_size=(4, 4), goal_state=(3, 3), obstacles=None):
        self.grid_size = grid_size
        self.goal_state = goal_state
        self.obstacles = obstacles if obstacles is not None else []
        self.reset()

    def reset(self):
        # 重置智能体位置到随机位置（不是目标位置或障碍物位置）
        self.agent_pos = (np.random.randint(0, self.grid_size[0]), np.random.randint(0, self.grid_size[1]))
        while self.agent_pos == self.goal_state:
            self.agent_pos = (np.random.randint(0, self.grid_size[0]), np.random.randint(0, self.grid_size[1]))
        return self.agent_pos

    def step(self, action):
        x, y = self.agent_pos
        if action == 0:  # 上
            new_pos = (max(x - 1, 0), y)
        elif action == 1:  # 下
            new_pos = (min(x + 1, self.grid_size[0] - 1), y)
        elif action == 2:  # 左
            new_pos = (x, max(y - 1, 0))
        elif action == 3:  # 右
            new_pos = (x, min(y + 1, self.grid_size[1] - 1))

        if new_pos in self.obstacles:
            reward = -10
        elif new_pos == self.agent_pos:
            reward = -5
        else:
            reward = -1
        
        self.agent_pos = new_pos
        
        if self.agent_pos == self.goal_state:
            return self.agent_pos, 0, True, {}
        else:
            return self.agent_pos, reward, False, {}

In [16]:
class ModelFreeAlgorithm:
    def __init__(self, env, num_episodes, gamma = 0.5):
        self.env = env
        self.num_episodes = num_episodes
        self.gamma = gamma
        
    def mc_exploring_starts(self):
        # 初始化动作价值函数
        Q = defaultdict(lambda: np.zeros(4))
        # 动作-状态对的回报
        returns = defaultdict(list)
        
        for episode_num in range(self.num_episodes):
            state = self.env.reset()
            action = np.random.randint(0, 4)
            
            episode = []
            done = False
            while not done:
                next_state, reward, done, _ = self.env.step(action)
                episode.append((state, action, reward))
                state = next_state
                if not done:
                    action = np.random.choice(4)
            
            G = 0
            episode.reverse()
            for (state, action, reward) in episode:
                G = G*self.gamma + reward
                if (state, action) not in [(x[0], x[1]) for x in episode[:-1]]:
                    returns[(state, action)].append(G)
                    Q[state][action] = np.mean(returns[(state, action)])
            
        return Q
        
         
    def mc_eplison_greedy(self, epsilon):
        Q = defaultdict(lambda: np.zeros(4))
        returns = defaultdict(list)
        
        def epsilon_greedy_policy(state):
            if np.random.rand() < epsilon:
                return np.random.choice(4)
            else:
                return np.argmax(Q[state])
            
        for episode_num in range(self.num_episodes):
            state = self.env.reset()

            done = False
            episode = []
            while not done:
                action = epsilon_greedy_policy(state)
                next_state, reward, done, _ = self.env.step(action)
                episode.append((state, action, reward))
                state = next_state
            
            G = 0
            episode.reverse()
            visited = set()
            for (state, action, reward) in episode:
                G = reward + self.gamma * G
                if (state, action) not in visited:
                    visited.add((state,action))
                    returns[(state,action)].append(G)
                    Q[state][action] = np.mean(returns[(state,action)])
        return Q
    
    def td_zero(self, alpha):
        V = defaultdict(float)
        
        for episode_num in range(self.num_episodes):
            state = self.env.reset()
            
            done = False
            while not done:
                action = np.random.choice(4)
                
                next_state, reward, done, _ = self.env.step(action)
                V[state] = V[state] - alpha * (V[state] -(reward + self.gamma * V[next_state]))
                
                state = next_state
        return V
    
    def td_sarsa():
        pass
    
    def td_QLearning():
        pass

In [17]:
obstacles = [(1, 1), (1, 2), (2, 1)]  # 在网格中定义障碍物

np.random.seed(42)

env = GridWorld(obstacles=obstacles)
algorithm = ModelFreeAlgorithm(env = env, num_episodes = 10000)

print("================  MC-eplison-greedy  ================")
Q = algorithm.mc_eplison_greedy(epsilon=0.1)
# 打印学习到的Q值
for state in Q:
    print(f"State {state}: {Q[state]}")
    
print("================  MC-exploring-starts  ================")

Q = algorithm.mc_exploring_starts()
for state in Q:
    print(f"State {state}: {Q[state]}")
    

print("==============  TD(0)  ==================")
V = algorithm.td_zero(alpha=0.01)
for state in V:
    print(f"State {state}: {V[state]}")

State (0, 2): [ -6.00758605 -10.88764373  -2.3473743   -1.97561373]
State (0, 3): [-5.90803272 -1.71429395 -2.1751342  -5.94169463]
State (1, 3): [ -1.96315174  -1.08556337 -10.87449526  -5.78334989]
State (0, 1): [ -6.17698482 -11.38636357  -2.24149554  -2.1585491 ]
State (0, 0): [-6.24001417 -2.3853661  -6.22104456 -2.13934744]
State (1, 1): [ -2.42803006 -11.42932129  -2.16333371 -11.09649886]
State (2, 0): [ -2.31035477  -2.46950954  -6.2524009  -11.34587915]
State (1, 0): [ -2.16020313  -2.70605515  -6.31832807 -11.51759168]
State (1, 2): [ -2.15483784  -1.80981429 -11.44470215  -1.63247486]
State (2, 2): [-11.12997136  -1.13903537 -10.96421606  -1.08206908]
State (2, 1): [-11.17061359  -2.03827374  -2.27177153  -1.61018196]
State (2, 3): [-1.6872093   0.         -1.77985848 -5.04680851]
State (3, 2): [-2.06742157 -5.025      -1.82991625  0.        ]
State (3, 0): [-2.36997883 -6.24339645 -6.26935096 -1.82586316]
State (3, 1): [-11.08789062  -6.05934837  -2.27573866  -1.07393732]
