# My implementation of Q-learning

1. off-policy

2. TD learning method

3. epsilon-greedy search

Import the required libraries.

In [10]:
import random
from pathlib import Path

import gymnasium as gym
import imageio
import numpy as np
%matplotlib inline

Create the FrozenLake environment with Gymnasium and inspect its observation and action spaces.

In [11]:
# 4x4 FrozenLake with slipping disabled; render_mode="rgb_array" is what
# record_video relies on when it collects frames from render().
env = gym.make(
    "FrozenLake-v1",
    map_name="4x4",
    is_slippery=False,
    render_mode="rgb_array",
)

In [12]:
# Show the environment's observation and action spaces (sizes are read from them in the next cell).
print(f'observation space: {env.observation_space}')
print(f'action space: {env.action_space}')

observation space: Discrete(16)
action space: Discrete(4)


In [13]:
# Sizes of the discrete spaces; together they give the Q-table shape
# (state_space rows, action_space columns).
state_space = env.observation_space.n
action_space = env.action_space.n

print("There are ", state_space, " possible states")
print("There are ", action_space, " possible actions")

There are  16  possible states
There are  4  possible actions


## build an agent to play the game

In [14]:
class agent():
    """Tabular Q-learning agent for an environment with discrete states and actions.

    The agent keeps a ``(state_space, action_space)`` Q-table, explores with an
    epsilon-greedy policy whose epsilon decays exponentially with the episode
    index, and applies the one-step Q-learning (off-policy TD) update after
    every environment step.

    The environment is expected to follow the API used below:
    ``reset()`` returns ``(state, info)`` and ``step(action)`` returns
    ``(observation, reward, terminated, truncated, info)``.
    """
    def __init__(self,
                 env,
                 state_space,
                 action_space,
                 lr,
                 gamma,
                 n_training_episode,
                 n_eval_episode,
                 min_epsilon,
                 max_epsilon,
                 decay_rate,
                 max_step
                 ):
        """Store the environment and hyperparameters and create an all-zero Q-table.

        Args:
            env: environment the agent interacts with.
            state_space: number of discrete states (Q-table rows).
            action_space: number of discrete actions (Q-table columns).
            lr: learning rate of the Q-learning update.
            gamma: discount factor applied to the bootstrapped value.
            n_training_episode: number of episodes run by ``train``.
            n_eval_episode: number of episodes run by ``evaluate``.
            min_epsilon: exploration rate the decay schedule converges to.
            max_epsilon: exploration rate at episode 0.
            decay_rate: exponential decay rate of epsilon per episode.
            max_step: maximum number of steps per episode.
        """
        self.q_table = np.zeros((state_space, action_space))
        self.env = env
        self.lr = lr
        self.gamma = gamma
        self.n_training_episode = n_training_episode
        self.n_eval_episode = n_eval_episode
        self.min_epsilon = min_epsilon
        self.max_epsilon = max_epsilon
        self.decay_rate = decay_rate
        self.max_step = max_step

    def act(self, state, ep=None):
        """Choose an action for ``state``.

        Args:
            state: current state index.
            ep: exploration rate. ``None`` selects the greedy action;
                otherwise a random action is sampled with probability ``ep``.

        Returns:
            The chosen action index.
        """
        # Explore: only draw a random number when an epsilon was supplied.
        if ep is not None and random.random() <= ep:
            return self.env.action_space.sample()
        # Exploit: action with the highest Q-value (ties resolve to the lowest index).
        return np.argmax(self.q_table[state])

    def step(self, state, ep=None):
        """Pick an action for ``state`` and apply it to the environment.

        Returns:
            Tuple ``(action, observation, reward, terminated, truncated)``.
        """
        action = self.act(state, ep)
        observation, reward, terminated, truncated, info = self.env.step(action)
        return action, observation, reward, terminated, truncated

    def update(self, state, ep):
        """Take one environment step from ``state`` and update the Q-table.

        Returns:
            Tuple ``(q_table, next_obs, terminated, truncated)``.
        """
        action, next_obs, reward, terminated, truncated = self.step(state, ep)
        # A terminal state has no future return, so do not bootstrap from it.
        # A truncated (time-limited) episode still bootstraps normally.
        future_value = 0.0 if terminated else np.max(self.q_table[next_obs])
        td_target = reward + self.gamma * future_value
        td_error = td_target - self.q_table[state][action]
        self.q_table[state][action] += self.lr * td_error
        return self.q_table, next_obs, terminated, truncated

    def train(self):
        """Run ``n_training_episode`` episodes of epsilon-greedy Q-learning."""
        for episode in range(self.n_training_episode):
            # Exponentially decay epsilon from max_epsilon towards min_epsilon.
            ep = self.min_epsilon + (self.max_epsilon - self.min_epsilon) * np.exp(-self.decay_rate * episode)

            state, info = self.env.reset()

            for _ in range(self.max_step):
                # Act epsilon-greedily, observe the transition, update Q(s, a).
                self.q_table, next_state, terminated, truncated = self.update(state=state, ep=ep)

                if terminated or truncated:
                    break

                state = next_state

    def evaluate(self):
        """Run ``n_eval_episode`` greedy episodes.

        Returns:
            Tuple ``(mean_rewards, std_rewards)`` of the per-episode total reward.
        """
        episode_rewards = []

        for episode in range(self.n_eval_episode):
            state, info = self.env.reset()
            total_rewards_ep = 0

            for _ in range(self.max_step):
                # ep is omitted, so the action is purely greedy.
                action, next_state, reward, terminated, truncated = self.step(state)
                total_rewards_ep += reward

                if terminated or truncated:
                    break

                state = next_state

            episode_rewards.append(total_rewards_ep)

        mean_rewards = np.mean(episode_rewards)
        std_rewards = np.std(episode_rewards)

        return mean_rewards, std_rewards

    def record_video(self, save_path, fps=1):
        """Play one greedy episode and save the rendered frames to ``save_path``.

        Args:
            save_path: output file passed to ``imageio.mimsave``.
            fps: frames per second of the saved video.
        """
        terminated = False
        truncated = False
        state, info = self.env.reset(seed=random.randint(0, 500))
        imgs = [self.env.render()]
        # Stop on either termination or truncation. The previous condition
        # (`not terminated or truncated`) kept looping once truncated was True.
        while not (terminated or truncated):
            # Take the action with the maximum expected future reward in this state.
            action = np.argmax(self.q_table[state])
            state, reward, terminated, truncated, info = self.env.step(action)
            # Render the agent's own environment rather than a notebook global.
            imgs.append(self.env.render())
        imageio.mimsave(save_path, [np.array(img) for img in imgs], fps=fps)
        
        
        

## train the agent

In [15]:
# hyperparameters
n_training_episode = 10000   # episodes run by train()
n_eval_episode = 100         # episodes run by evaluate()
lr = 0.7                     # learning rate of the Q-learning update
max_step = 99                # maximum steps per episode
gamma = 0.95                 # discount factor
max_epsilon = 1.0            # exploration rate at episode 0
min_epsilon = 0.05           # exploration rate the schedule decays towards
decay_rate = 0.0005          # exponential decay rate of epsilon per episode

# Seed the randomness used during training so that a fresh
# "Restart Kernel -> Run All" repeats the same exploration.
SEED = 42
random.seed(SEED)             # epsilon draw in agent.act
env.action_space.seed(SEED)   # random actions from action_space.sample()

# instantiate the agent
FrozenLakeAgent = agent(env,
                        state_space,
                        action_space,
                        lr,
                        gamma,
                        n_training_episode,
                        n_eval_episode,
                        min_epsilon,
                        max_epsilon,
                        decay_rate,
                        max_step)

In [16]:
# Run the full training loop, then print the learned Q-table
# (one row per state, one column per action).
FrozenLakeAgent.train()
print(FrozenLakeAgent.q_table)

[[0.73509189 0.77378094 0.77378094 0.73509189]
 [0.73509189 0.         0.81450625 0.77378094]
 [0.77378094 0.857375   0.77378094 0.81450625]
 [0.81450625 0.         0.77378094 0.77378094]
 [0.77378094 0.81450625 0.         0.73509189]
 [0.         0.         0.         0.        ]
 [0.         0.9025     0.         0.81450625]
 [0.         0.         0.         0.        ]
 [0.81450625 0.         0.857375   0.77378094]
 [0.81450625 0.9025     0.9025     0.        ]
 [0.857375   0.95       0.         0.857375  ]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.         0.9025     0.95       0.857375  ]
 [0.9025     0.95       1.         0.9025    ]
 [0.         0.         0.         0.        ]]


## evaluate the agent

In [17]:
# Greedy evaluation over n_eval_episode episodes: mean and standard deviation
# of the total reward per episode.
mean_rewards, std_rewards = FrozenLakeAgent.evaluate()
mean_rewards, std_rewards

(1.0, 0.0)

## record the video

In [18]:
# Write the replay relative to the notebook's working directory instead of a
# machine-specific absolute path, and make sure the target folder exists.
video_dir = Path("record_videos") / "FrozenLake"
video_dir.mkdir(parents=True, exist_ok=True)
save_path = str(video_dir / "replay.mp4")
FrozenLakeAgent.record_video(save_path=save_path,fps=1)