In [80]:
# 2026/2/2
# zhangzhong
# my q-learning

In [81]:
# 首先就是环境和目标
# 我们得先有一个环境和一个目标，才能训练一个agent在这个环境里达成这个目标
# 这个环境会被建模为一个状态机
# 这里我们假设一个一维的数轴，我们最开始在原点，我们的目标是移动到x（如5）的位置，一次移动一格
# 那么环境就会有状态，这里可以用一个整数来表示 state = 0, 1, 2 ...
# 同时，在不同的状态下，可以采取相应的动作，会导致状态的转变，同时获得一定的reward
# 因为目前的环境相对比较简单，这里的reward是静态的，和环境绑定的，所以直接在step函数里面返回reward就行了
from typing import Literal

class Environment:
    """One-dimensional walk over the integer states 0..goal.

    The walker starts at state 0 and moves one cell per action. Every
    action is answered with a reward: ``goal_reward`` when the resulting
    state is the goal, ``step_reward`` otherwise.

    Args:
        goal: Index of the target state; it is also the right-hand boundary.
        goal_reward: Reward returned when an action ends on the goal.
        step_reward: Reward returned for every other action.
    """

    def __init__(self, goal: int = 5, goal_reward: float = 10, step_reward: float = -1) -> None:
        self.state: int = 0
        self.goal: int = goal
        self.goal_reward: float = goal_reward
        self.step_reward: float = step_reward

    def init(self) -> None:
        """Put the walker back on the start state 0."""
        self.state = 0

    def act(self, action: Literal["LEFT", "RIGHT"]) -> float:
        """Apply ``action``, update ``self.state`` and return the reward.

        Moves are clamped to [0, goal]: going LEFT at 0 or RIGHT at the
        goal leaves the state unchanged.

        Raises:
            ValueError: if ``action`` is neither "LEFT" nor "RIGHT". The
                state is left untouched in that case.
        """
        if action == "LEFT":
            self.state = max(0, self.state - 1)
        elif action == "RIGHT":
            self.state = min(self.goal, self.state + 1)
        else:
            # An unknown action used to be ignored silently while still
            # paying a step reward, which hides typos in the caller.
            raise ValueError(f"unknown action: {action!r}")

        return self.goal_reward if self.hit_goal() else self.step_reward

    def hit_goal(self) -> bool:
        """Return True when the current state is the goal state."""
        return self.state == self.goal


In [82]:
goal = 5
myenv = Environment(goal=goal)

# Check 1: stepping LEFT from the start state is clamped at 0.
r = myenv.act(action="LEFT")
print(r, myenv.state)

# Check 2: place the walker one cell before the goal, then step RIGHT onto it.
myenv.state = goal - 1
r = myenv.act(action="RIGHT")
print(r, myenv.state)

-1 0
10 5


In [83]:
# 接下来就是定义一个Agent，这个Agent可以操作环境，并拿到环境给出的反馈（reward）
# 它可以选择某种策略，来决定自己在某种状态下的下一步动作
# 这里先实现epsilon-greedy策略：以epsilon的概率随机选择动作，否则选择当前Q值最大的动作

import random

class Agent:
    """Epsilon-greedy actor over a tabular Q function.

    Args:
        env: Environment the agent observes. Only ``env.state`` and
            ``env.hit_goal()`` are used here; the agent never calls
            ``env.act`` itself.
        Q: Table mapping state -> {action: value}. The dict is stored by
            reference (not copied), so updates made to it by a training
            loop are visible to the agent. When omitted, each agent gets
            its own fresh empty table.
        epsilon: Probability of returning a uniformly random action
            instead of the greedy one.
    """

    ACTIONS: tuple = ("LEFT", "RIGHT")

    def __init__(
        self,
        env: "Environment",
        Q: "dict[int, dict[str, float]] | None" = None,
        epsilon: float = 1.0,
    ) -> None:
        # A `{}` default would be created once and shared by every agent
        # built without an explicit table. Test against None rather than
        # truthiness so that a caller-supplied *empty* dict is still kept
        # by reference.
        self.Q = Q if Q is not None else {}
        self.epsilon = epsilon
        self.env = env

    def step(self) -> str:
        """Choose the next action for the environment's current state.

        With probability ``epsilon`` a random action is returned. Otherwise
        the action with the largest Q value is returned; ties go to the
        first action in the row's insertion order. If the table has no
        entry for the current state there is nothing to be greedy about,
        so a random action is returned instead of raising ``KeyError``.
        """
        state: int = self.env.state

        # Draw the exploration sample first so the random stream is
        # consumed exactly as before for states that are in the table.
        if random.random() < self.epsilon:
            return random.choice(self.ACTIONS)

        action_values = self.Q.get(state)
        if not action_values:
            return random.choice(self.ACTIONS)
        return max(action_values, key=action_values.get)

    def hit_goal(self) -> bool:
        """Return True when the observed environment has reached its goal."""
        return self.env.hit_goal()
            

In [84]:
# Roll out an agent built with the default epsilon (1.0, so every action is
# random) from the start state; stop at the goal or after 100 steps.
myenv.init()
myagent = Agent(env=myenv)

for s in range(100):
    action = myagent.step()
    myenv.act(action=action)
    print(f"current step: {s}, current state: {myenv.state}")
    # myagent observes myenv, so asking the environment directly is equivalent.
    if myenv.hit_goal():
        break

current step: 0, current state: 0
current step: 1, current state: 0
current step: 2, current state: 0
current step: 3, current state: 0
current step: 4, current state: 0
current step: 5, current state: 1
current step: 6, current state: 0
current step: 7, current state: 1
current step: 8, current state: 2
current step: 9, current state: 3
current step: 10, current state: 4
current step: 11, current state: 5


In [None]:
## 可以看到，即便是随机的策略，我们也能在比较少的步数内完成这个任务，但是有的时候，步数会多至数十步
# 现在引入 Q-Learning

# Q-learning的目的就是学一个Q函数，它可以给出在特定状态下，执行某个动作的期望reward（相对），那么我们就可以根据
# 各个动作的期望reward，来决定选择哪个动作

# Q[state][action] is the estimated value of taking `action` in `state`.
#
# Update rule:
#   Q = Q + lr*(Target - Q)
#   Target = reward + df*V
#
# Value(s): the largest expected reward obtainable from state s.
# In Q-learning, Value(s) = max over a of Q(s, a), e.g. max{Q(1, LEFT), Q(1, RIGHT)}.

## step 1: initialize Q table (one all-zero row per state 0..goal)
Q: dict[int, dict[Literal["LEFT", "RIGHT"], float]] = {
    state: {"LEFT": 0, "RIGHT": 0} for state in range(goal + 1)
}

def Value(Q: dict, state: int) -> float:
    """Return V(state): the largest action value stored in ``Q[state]``.

    Raises ``KeyError`` when ``state`` has no row in ``Q``.
    """
    action_values = Q[state]
    return max(action_values[action] for action in action_values)

# Step size of each update: Q += learning_rate * (target - Q).
learning_rate = 0.1
# Weight given to the estimated value of the next state inside the target.
discount_factor = 0.99

def train_iter(env: "Environment", agent: "Agent", max_steps: int = 10_000) -> None:
    """Run one training episode and update ``agent.Q`` in place.

    The environment is reset, then the agent acts until the goal is hit or
    ``max_steps`` actions have been taken. After each action the entry for
    the (state, action) pair just taken is moved towards the target:

        target = reward                               if the goal was hit
        target = reward + discount_factor * V(s')     otherwise

    Args:
        env: Environment to train in; it is reset at the start.
        agent: Agent whose ``step()`` picks the actions and whose ``Q``
            table is updated. Updating the agent's own table (rather than
            a module-level one) guarantees that the table being learned
            is the table the agent acts on.
        max_steps: Upper bound on the number of actions in the episode, so
            that a policy which never reaches the goal cannot hang the
            notebook.
    """
    q_table = agent.Q

    def _ensure_row(state: int) -> dict:
        # Rows missing from the table start at zero instead of raising KeyError.
        return q_table.setdefault(state, {"LEFT": 0.0, "RIGHT": 0.0})

    env.init()

    for _ in range(max_steps):
        if env.hit_goal():
            break

        old_state = env.state
        # Make sure the row exists before the agent looks it up.
        _ensure_row(old_state)

        action = agent.step()
        reward = env.act(action=action)
        new_state = env.state

        if env.hit_goal():
            # Terminal transition: nothing follows the goal state, so the
            # target is the bare reward and does not bootstrap from the
            # goal row.
            target = reward
        else:
            _ensure_row(new_state)
            target = reward + discount_factor * Value(q_table, new_state)

        td_error = target - q_table[old_state][action]
        q_table[old_state][action] += learning_rate * td_error




In [86]:
from tqdm import tqdm

max_epochs = 100
# Build the environment from the same `goal` that sized the Q-table.
# Relying on Environment's default goal would silently desynchronise the
# two if `goal` were ever changed.
myenv = Environment(goal=goal)
# Share the Q-table with the agent so the values learned by train_iter are
# the ones the agent acts on; epsilon=0.2 keeps 20% random exploration.
myagent = Agent(env=myenv, Q=Q, epsilon=0.2)
for epoch in tqdm(range(max_epochs)):
    train_iter(myenv, agent=myagent)


100%|██████████| 100/100 [00:00<00:00, 59459.94it/s]


In [87]:
# Show the Q-table after training, as {state: {action: value}}.
print(Q)

{0: {'LEFT': 1.3121877582039159, 'RIGHT': 5.547534122599602}, 1: {'LEFT': 1.324880720475138, 'RIGHT': 6.693199985677159}, 2: {'LEFT': 2.1869727898967604, 'RIGHT': 7.800410397656253}, 3: {'LEFT': 4.490920320800705, 'RIGHT': 8.897459175890468}, 4: {'LEFT': 3.0416152490515262, 'RIGHT': 9.999734386011124}, 5: {'LEFT': 0, 'RIGHT': 0}}


In [None]:
# Now run the environment using the current Q-table.
# Evaluation uses a separate agent with epsilon=0.0 so that every action is
# the greedy one. Reusing `myagent` (epsilon=0.2) would still take a random
# action 20% of the time and would not measure the learned policy itself.
# When RIGHT has the larger value in every state before the goal, this walks
# straight to the goal; the 100-step cap keeps the loop finite otherwise.
myenv.init()
eval_agent = Agent(env=myenv, Q=Q, epsilon=0.0)

for s in range(100):
    action = eval_agent.step()
    myenv.act(action)
    print(f"current step: {s}, current state: {myenv.state}")
    if eval_agent.hit_goal():
        break

current step: 0, current state: 1
current step: 1, current state: 2
current step: 2, current state: 3
current step: 3, current state: 4
current step: 4, current state: 5


In [None]:
# congratulations！我们写的代码基本上是正确的
# TODO：现在还有几个问题需要深入研究
# 1. state == goal的情况如何处理，对照参考实现，看看我们的实现有哪些地方做得不好？
# 2. 深入的剖析训练的过程，分析一下Q是如何变化的？