1. Reward table use to store source_state + action + target state as key, value is the reward
2. Transition table store historical counter as dictionary. It use state + action as key, value is another dictionary, which is the number of states it ran. Such as at "state 0" execute "action 1", we get 3 times to "state 4", 7 times to "state 5", so the key value will be (0,1) key match to value {4: 3, 5: 7}
3. Value table will store the value it calculated.

In [1]:
import gym
import collections
from tensorboardX import SummaryWriter

In [2]:
ENV_NAME = "FrozenLake-v1"
#ENV_NAME = "FrozenLake8x8-v1"
GAMMA = 0.9
TEST_EPISODES = 20

![bandicam%202022-07-10%2023-48-27-318.jpg](attachment:bandicam%202022-07-10%2023-48-27-318.jpg)

In [3]:
class Agent:
    #we defined reward table, transition table and value table here
    def __init__(self):
        self.env = gym.make(ENV_NAME)
        self.state = self.env.reset()
        self.rewards = collections.defaultdict(float)
        self.transits = collections.defaultdict(collections.Counter)
        self.values = collections.defaultdict(float)
    
    #we collect random experience from environment and update reward, trasition table.
    #cross entropy can only learn in full episode, but value iteration can learn in N steps without finishing episode.
    def play_n_random_steps(self, count):
        for _ in range(count):
            action = self.env.action_space.sample()
            new_state, reward, is_done, _ = self.env.step(action)
            self.rewards[(self.state, action, new_state)] = reward
            self.transits[(self.state, action)][new_state] += 1
            self.state = self.env.reset() if is_done else new_state
            
    #this function will get the probability with target_counts on states, and sum the target count as total, so 
    #total = p1 + p2, and we will calculate action value with (p1/total)(reward on state s1 + gamma * state_value(s1))
    #refer to the photo on the block above
    def calc_action_value(self, state, action):
        target_counts = self.transits[(state, action)]
        total = sum(target_counts.values())
        action_value = 0.0
        for tgt_state, count in target_counts.items():
            reward = self.rewards[(state, action, tgt_state)]
            action_value += (count / total) * (reward + GAMMA * self.values[tgt_state])
        return action_value
    
    #we calculate the best action in the state by action_value, because the action choosing process is deterministic,
    #play_n_random_steps() explore enough data for training, so we can converge to a good value
    def select_action(self, state):
        best_action, best_value = None, None
        for action in range(self.env.action_space.n):
            action_value = self.calc_action_value(state, action)
            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        return best_action
    
    #this function get the best action and play one episode, it is for replay test episode, but we don't want to mess 
    #with current environment, so we put the state in 2nd environment, and play to collect reward
    def play_episode(self, env):
        total_reward = 0.0
        state = env.reset()
        while True:
            action = self.select_action(state)
            new_state, reward, is_done, _ = env.step(action)
            self.rewards[(state, action, new_state)] = reward
            self.transits[(state, action)][new_state] += 1
            total_reward += reward
            if is_done:
                break
            state = new_state
        return total_reward
    
    #we visit all state and get the value, and we use the largest state_value to renew our current state_value
    def value_iteration(self):
        for state in range(self.env.observation_space.n):
            state_values = [self.calc_action_value(state, action) for action in range(self.env.action_space.n)]
            self.values[state] = max(state_values)

In [4]:
if __name__ == "__main__":
    #we create environment, agent and TensorBoard writer
    test_env = gym.make(ENV_NAME)
    agent = Agent()
    writer = SummaryWriter(comment="-v-learning")
    
    #we play 100 random steps and fill in the reward and transfer table, then we do value iteration on all states
    iter_no = 0
    best_reward = 0.0
    while True:
        iter_no += 1
        agent.play_n_random_steps(100)
        agent.value_iteration()
        
        #we use value table as our policy to test different rounds, get best mean reward and write to TensorBoard
        reward = 0.0
        for _ in range(TEST_EPISODES):
            reward += agent.play_episode(test_env)
        reward /= TEST_EPISODES
        writer.add_scalar("reward", reward, iter_no)
        if reward > best_reward:
            print("Best reward updated %.3f -> %.3f" %(best_reward, reward))
            best_reward = reward
        if reward > 0.80 :
            print("Solved in %d iterations!" %iter_no)
            break
    writer.close()

Best reward updated 0.000 -> 0.100
Best reward updated 0.100 -> 0.300
Best reward updated 0.300 -> 0.600
Best reward updated 0.600 -> 0.700
Best reward updated 0.700 -> 0.750
Best reward updated 0.750 -> 0.800
Best reward updated 0.800 -> 0.900
Solved in 276 iterations!


# Q-learning vs cross entropy:
- the result need 6 to 10 steps, cross entropy is hard to know what have done right or which step is wrong, but state action can evaluate each action state value pair to know which is better, also it need less data as sample, therefore has higher sample efficiency

- also q-learning don't need a complete episode to start learning, it can learn even in just 1 example, but it need a successful example to converge, so in large and more complicated environment, it maybe hard to achieve. The good example is 8*8 FrozenLake, it needs 50 to 400 iterations to converge, once a successful episode appears, it can converge quickly.