In [1]:
import numpy as np

In [2]:
nan = np.nan

In [3]:
# Transition probabilities, indexed as T[state, action, next_state].
# Each row for an available (state, action) pair sums to 1; rows filled with nan
# mark actions that are not available in that state (see possible_actions).

T = np.array([
    [[0.7, 0.3, 0], [1, 0,0], [0.8, 0.2, 0]],
    [[0, 1, 0], [nan, nan, nan], [0, 0, 1]],
    [[nan, nan, nan], [0.8, 0.1, 0.1], [nan, nan, nan]],
])

In [4]:
# Rewards, indexed as R[state, action, next_state]: the reward received when
# taking `action` in `state` and landing in `next_state`.
# Rows filled with nan mark actions that are not available in that state.

R = np.array([
    [[10, 0, 0], [0, 0, 0], [0, 0, 0]],
    [[0, 0, 0], [nan, nan, nan], [0, 0, -50]],
    [[nan, nan, nan], [40, 0, 0], [nan, nan, nan]],
])

In [5]:
# Actions available in each state: possible_actions[s] lists the valid action indices for state s.
possible_actions = [[0,1,2],[0,2], [1]]
# Discount factor (the name `lr` is misleading: it is used as the discount, not as a learning rate).
lr = 0.95
# Number of iterations (full sweeps over the states) of the value-iteration algorithms below.
n_iterate = 1000

In [6]:
#function to define V-value of each state.
'''We start by initialize the value funtion at each state with impossible'''
def V_values(lr, n_iterate, T, R, possible_actions):
    '''
    Compute the state values of an MDP by synchronous value iteration.

    Inputs: lr discount factor applied to the value of the next state
            n_iterate number of sweeps over all the states
            T table of transition probabilities, indexed T[s, a, sp]
            R table of rewards, indexed R[s, a, sp]
            possible_actions list giving, for each state, the actions that can be taken in it
    Return V vector holding one value per state
    '''
    # The number of states comes from the inputs, so any MDP size is supported.
    n_states = len(possible_actions)
    V = np.zeros(n_states)
    for _ in range(n_iterate):
        # Freeze the previous sweep so every state is updated from the same estimates.
        V_prev = V.copy()
        for s in range(n_states):
            # Bellman optimality backup: best expected return over the available actions.
            # Only available actions are visited, so the nan rows of T and R are never read.
            V[s] = max(
                np.dot(T[s, a], R[s, a] + lr * V_prev)
                for a in possible_actions[s]
            )
    return V

In [7]:
# np.max([np.sum([] for sp in range(3))] for a in possible_actions[s])

In [8]:
V_values(lr,n_iterate, T, R, possible_actions)

array([21.89925005,  1.17982024, 53.87349498])

In [9]:
def Q_values(lr, n_iterate, T, R, possible_actions):
    '''
    Compute the state-action values of an MDP by synchronous Q-value iteration.

    Inputs: lr discount factor applied to the value of the next state
            n_iterate number of sweeps over all the (state, action) pairs
            T table of transition probabilities, indexed T[s, a, sp]
            R table of rewards, indexed R[s, a, sp]
            possible_actions list giving, for each state, the actions that can be taken in it
    Return Q matrix of Q values, one row per state and one column per action;
             actions that are not available in a state keep the value -inf
    '''
    # Table dimensions come from the inputs, so any MDP size is supported.
    n_states = len(possible_actions)
    n_actions = np.shape(T)[1]
    # -inf marks unavailable actions so that max/argmax never selects them.
    Q = np.full((n_states, n_actions), -np.inf)
    for state, actions in enumerate(possible_actions):
        Q[state, actions] = 0.0
    for _ in range(n_iterate):
        Q_prev = Q.copy()
        # Value of acting greedily from each next state, taken from the previous sweep.
        best_next = Q_prev.max(axis=1)
        for s in range(n_states):
            for a in possible_actions[s]:
                Q[s, a] = np.dot(T[s, a], R[s, a] + lr * best_next)
    return Q

**By running the Q-value iteration algorithm, we find the following result:**

In [10]:
Q = Q_values(lr,n_iterate, T, R, possible_actions)
Q

array([[21.89925005, 20.80428755, 16.86759588],
       [ 1.12082922,        -inf,  1.17982024],
       [       -inf, 53.87349498,        -inf]])

**This algorithm is very useful in reinforcement learning because it gives the value of performing each specific action in each state. Since the goal here is to find the optimal policy that maximizes the cumulative reward, taking the argmax() over the actions of each state gives the best action to take in that state.**

In [11]:
# Greedy policy: for each state (row of Q) pick the action with the largest Q-value.
# Unavailable actions hold -inf in Q, so argmax never selects them.
best_act = np.argmax(Q, axis = 1)
print('The best action to take at each state is {}.'.format(best_act))

The best action to take at each state is [0 2 1].


**Temporal difference learning algorithm**
RL problems with discrete actions can be modeled as a Markov decision process (MDP). In TD learning, the agent has only partial information about the MDP. In general, we assume that the agent initially knows only the possible states and the actions it can take.

In [12]:
###      TD(0)

In [13]:
lr_rate0= 0.05
lr_rate_decay=0.1
gamma= 0.95
n_iterates = 1000

def V_value_TD(lr_rate0, lr_rate_decay, gamma, T, R, n_iterates, actions_per_state=None):
    '''
    Estimate state values with TD(0) while following a uniformly random policy.

    The agent starts in state 0, samples one transition per iteration from T
    and moves the value of the state it just left towards the one-step target
    r + gamma * V[next state].

    Inputs: lr_rate0 initial learning rate
            lr_rate_decay decay of the learning rate: lr_rate0 / (1 + iterate*lr_rate_decay)
            gamma discount factor
            T table of transition probabilities, indexed T[s, a, sp]
            R table of rewards, indexed R[s, a, sp]
            n_iterates number of sampled transitions
            actions_per_state list giving, for each state, the actions that can be taken in it;
                              when omitted, the notebook-level possible_actions is used
    Return V vector holding one value estimate per state
    '''
    if actions_per_state is None:
        # Backward-compatible default: fall back to the table defined earlier in the notebook.
        actions_per_state = possible_actions
    n_states = len(actions_per_state)
    V = np.zeros(n_states)
    s = 0
    for iterate in range(n_iterates):
        lr_rate = lr_rate0 / (1 + iterate*lr_rate_decay)
        # Exploration policy: uniform over the actions available in the current state.
        a = np.random.choice(actions_per_state[s])
        sp = np.random.choice(range(n_states), p=T[s, a])
        r = R[s, a, sp]
        # The right-hand side is fully evaluated before the assignment, so no copy of V is needed.
        V[s] = (1 - lr_rate)*V[s] + lr_rate*(r + gamma*V[sp])
        s = sp
    return V
        
        

In [14]:
V_value_TD(lr_rate0, lr_rate_decay, gamma, T, R, n_iterates)

array([  1.89843966, -10.33080827,   7.63232731])

In [15]:
def Q_value_TD(lr_rate0, lr_rate_decay, gamma, T, R, n_iterates, actions_per_state=None):
    '''
    Estimate state-action values with Q-learning while exploring uniformly at random.

    The agent starts in state 0, samples one transition per iteration from T
    and moves Q[s, a] towards the target r + gamma * max(Q[next state]).

    Inputs: lr_rate0 initial learning rate
            lr_rate_decay decay of the learning rate: lr_rate0 / (1 + iterate*lr_rate_decay)
            gamma discount factor
            T table of transition probabilities, indexed T[s, a, sp]
            R table of rewards, indexed R[s, a, sp]
            n_iterates number of sampled transitions
            actions_per_state list giving, for each state, the actions that can be taken in it;
                              when omitted, the notebook-level possible_actions is used
    Return Q matrix of Q values, one row per state and one column per action;
             actions that are not available in a state keep the value -inf
    '''
    if actions_per_state is None:
        # Backward-compatible default: fall back to the table defined earlier in the notebook.
        actions_per_state = possible_actions
    n_states = len(actions_per_state)
    n_actions = np.shape(T)[1]
    # -inf marks unavailable actions so that max/argmax never selects them.
    Q = np.full((n_states, n_actions), -np.inf)
    for state, actions in enumerate(actions_per_state):
        Q[state, actions] = 0
    s = 0
    for iterate in range(n_iterates):
        lr_rate = lr_rate0 / (1 + iterate*lr_rate_decay)
        # Exploration policy: uniform over the actions available in the current state.
        a = np.random.choice(actions_per_state[s])
        sp = np.random.choice(range(n_states), p=T[s, a])
        r = R[s, a, sp]
        # The right-hand side is fully evaluated before the assignment, so no copy of Q is needed.
        Q[s, a] = (1 - lr_rate)*Q[s, a] + lr_rate*(r + gamma*np.max(Q[sp]))
        s = sp
    return Q

In [16]:
Q_value = Q_value_TD(lr_rate0, lr_rate_decay, gamma, T, R, n_iterates)
Q_value

array([[  4.00773988,   0.89898776,   0.82596686],
       [  0.        ,         -inf, -10.66362466],
       [        -inf,   8.21306868,         -inf]])

In [17]:
#Optimal policy
np.argmax(Q_value, axis = 1)

array([0, 0, 1])

In [18]:
####      TD(1)

In [19]:
lr_rate0= 0.05
lr_rate_decay=0.1
gamma= 0.95
n_iterates = 1000

def V_value_TD1(lr_rate0, lr_rate_decay, gamma, T, R, n_iterates):
    '''
    Estimate state values from sampled transitions, weighting each update by a
    per-state trace vector S.

    Inputs: lr_rate0 initial learning rate
            lr_rate_decay decay of the learning rate: lr_rate0 / (1 + iterate*lr_rate_decay)
            gamma discount factor
            T table of transition probabilities, indexed T[s, a, sp]
            R table of rewards, indexed R[s, a, sp]
            n_iterates number of sampled transitions
    Return (V, S): the vector of state values and the final trace vector

    The available actions are read from the notebook-level possible_actions,
    and the number of states is fixed to 3.

    NOTE(review): this does not look like the usual eligibility-trace update
    (every state updated by the same TD error, traces decayed by gamma*lambda).
    Here only the current state is updated and the trace of the next state is
    scaled by the learning rate -- confirm that this is the intended algorithm.
    '''
    V = np.full(3, -np.inf)
    # S starts at zero, so the first update of a state leaves its value unchanged.
    S = np.zeros(3)
    for state, actions in enumerate(possible_actions):
        V[state] = 0.0
    s = 0
    for iterate in range(n_iterates):
        # Snapshots taken before this step's in-place updates.
        V_prev = V.copy()
        S_prev = S.copy()
        lr_rate = lr_rate0 / (1 + iterate*lr_rate_decay)
        # Uniformly random action among those available in the current state.
        a = np.random.choice(possible_actions[s])
        sp = np.random.choice(range(3), p = T[s,a])
        r = R[s,a,sp]
        # TD(0)-style update whose effective step size is lr_rate * S[s].
        V[s] = (1 - lr_rate*S[s])*V_prev[s] + lr_rate*(r + gamma*V_prev[sp])*S[s]
        # Bump the trace of the state that was just left ...
        S[s] = S_prev[s] + 1
        s = sp
        # ... then shrink the trace of the state just entered (this hits the freshly
        # bumped trace when the transition is a self-loop, sp == s).
        S[s] = lr_rate*S[s]
        
        
    return V,S
V_value_TD1(lr_rate0, lr_rate_decay, gamma, T, R, n_iterates)

(array([ 0.0511583 , -0.08553505,  0.0572742 ]),
 array([4.95786065e-04, 1.00049727e+00, 1.00049678e+00]))

In [20]:
#!pip3 install gym


**Implementation of Q-learning using a gym environment.**

In [21]:
import gym
import torch
import time
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy
from gym.envs.registration import register
from IPython.display import clear_output
# Register a deterministic (non-slippery) 4x4 FrozenLake variant.
try:
    register(
        id='FrozenLakeNolip-v0',
        entry_point='gym.envs.toy_text:FrozenLakeEnv',
        kwargs={'map_name' : '4x4', 'is_slippery':False},
        max_episode_steps=100,
        reward_threshold=0.78, # optimum = .8196
    )
except gym.error.Error:
    # Re-running this cell registers the same id twice; gym versions that reject
    # that raise gym.error.Error, which is safe to ignore here. Any other failure
    # (bad arguments, missing entry point, interrupts) is allowed to surface.
    pass

env_names = 'FrozenLakeNolip-v0'
env = gym.make(env_names)
print('Observation spaces:', env.observation_space)
print('Actions spaces:', env.action_space)
type(env.action_space)


Observation spaces: Discrete(16)
Actions spaces: Discrete(4)


gym.spaces.discrete.Discrete

In [22]:
class Qagent():
    '''Baseline agent that picks its actions uniformly at random.'''

    def __init__(self, env):
        # Only the number of discrete actions is kept from the environment.
        self.action_size = env.action_space.n

    def get_action(self, state):
        '''Return a uniformly random action index; `state` is not used.'''
        candidates = range(self.action_size)
        return np.random.choice(candidates)

In [23]:
class Agent(Qagent):
    '''
    Tabular Q-learning agent with an epsilon-greedy policy.

    Inputs: env environment exposing discrete observation_space.n and action_space.n
            discount_rate discount factor applied to the value of the next state
            learning_rate step size of the Q-value update
            eps initial probability of taking a random action
            eps_decay factor applied to eps at the end of every episode
    '''
    def __init__(self, env, discount_rate= 0.95, learning_rate=0.01, eps=1.0, eps_decay=0.99):
        super().__init__(env)
        self.state_size = env.observation_space.n
        self.discount_rate = discount_rate
        self.learning_rate = learning_rate
        # Exploration schedule; the defaults reproduce the previously hard-coded values.
        self.eps = eps
        self.eps_decay = eps_decay
        self.model()

    def model(self):
        '''Create the Q-table with small random values, one row per state and one column per action.'''
        self.q_value = 1e-5 * np.random.random((self.state_size, self.action_size))

    def get_action(self, state):
        '''Return a random action with probability eps, otherwise the greedy action for `state`.'''
        # Row of the Q-table holding the Q-values of the given state.
        q_state = self.q_value[state]
        # Exploitation: best known action.
        action_greedy = np.argmax(q_state)
        # Exploration: uniformly random action.
        action_random = super().get_action(state)

        return action_random if np.random.random() < self.eps else action_greedy

    def train(self, lis):
        '''
        Apply one Q-learning update.

        Inputs: lis tuple (state, action, next_state, reward, done) describing one transition
        '''
        state, action, next_state, reward, done = lis
        # No bootstrapping from a terminal transition: its future value is 0.
        best_next = 0.0 if done else np.max(self.q_value[next_state])
        q_target = reward + self.discount_rate * best_next
        q_update = q_target - self.q_value[state, action]
        self.q_value[state, action] += self.learning_rate * q_update

        if done:
            # Explore a little less after every finished episode.
            self.eps = self.eps * self.eps_decay
            
agent = Agent(env)       

In [24]:
# Train the agent for 100 episodes, displaying the environment and Q-table after every step.
# total_reward accumulates over ALL episodes; it is not reset between episodes.
total_reward = 0
for ep in range(100):
    # This code expects reset() to return the initial state and step() to return 4 values.
    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)
        next_state, reward, done, info = env.step(action)
        # One Q-learning update per transition; the agent also decays eps when done is True.
        agent.train((state, action, next_state, reward, done))
        state = next_state
        total_reward += reward
        print('s:', state, 'a:',action, 'reward', reward)
        print('Episode {}, Total rewards {}, eps {}'.format(ep,total_reward, agent.eps))
        env.render()
        print(agent.q_value)
        # Short pause so each frame is visible, then replace the output with the next frame.
        time.sleep(0.02)
        clear_output(wait = True)

s: 7 a: 2 reward 0.0
Episode 99, Total rewards 0.0, eps 0.36603234127322926
  (Right)
SFFF
FHF[41mH[0m
FFFH
HFFG
[[1.34869220e-06 4.93477675e-06 1.83780042e-06 1.13875006e-06]
 [2.35948087e-06 5.13239350e-06 4.31452393e-06 3.84756194e-06]
 [4.71510025e-06 4.49426278e-06 2.54052352e-06 8.84518014e-06]
 [2.95526798e-06 1.47662952e-06 4.94839095e-06 5.21141904e-06]
 [3.11375222e-06 6.51904582e-06 4.26869991e-06 2.01342465e-06]
 [4.66667789e-06 9.93355509e-06 7.57952725e-06 1.43648310e-06]
 [5.25899291e-06 1.49967465e-06 8.95327793e-06 4.31683483e-06]
 [7.89940565e-06 4.02403357e-06 2.08605244e-06 5.14905450e-06]
 [6.83257529e-06 2.75563296e-06 8.63417828e-06 8.63600701e-06]
 [8.23722746e-06 5.37449313e-06 1.14977336e-06 7.71455373e-06]
 [4.91238699e-06 3.10009473e-06 3.41096310e-06 7.89398952e-06]
 [9.08476572e-06 4.16949191e-06 6.22525720e-06 3.04035491e-06]
 [3.93638855e-06 3.41547650e-06 9.40045242e-06 2.79368534e-06]
 [2.90180057e-06 5.65149163e-06 2.07846231e-06 4.94128739e-06]
 [8