<a href="https://colab.research.google.com/github/burhanusman/RL-cliff-walking/blob/master/Cliff_RL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import numpy as np
from copy import copy
import random

In [0]:
class CliffRoad():
    def __init__(self,nrow,ncol):
      self.ncol=ncol
      self.nrow=nrow
      #0 - road, 1=Edge, 2=Cliff , 4 =Start 5=Win
      self.road=np.zeros((self.nrow,self.ncol),np.int8)
      self.road[0,:]=1
      self.road[-1,1:-1]=2
      self.road[-1,0]=4
      self.road[-1,-1]=5
      self.current_state=[nrow-1,0]
      self.agent_path=[[0,0]]
      self.reward_array=np.zeros((self.nrow,self.ncol))
      self.full_action_list=[0,1,2,3]

      #reward setting 1
      self.reward_array[self.road!=2]=-1
      self.reward_array[self.road==2]=-100

      #reward setting 2
      # self.reward_array[self.road==1]=-1
      # self.reward_array[self.road==2]=-100
      # self.reward_array[self.road==5]=100

    def displayRoad(self):
      print(self.road)
    
    #actions - 0:Up, 1=Right, 2=Down, 3=Left
    def get_possible_actions(self,state):
      row=state[0]
      col=state[1]
      full_action_list=self.full_action_list.copy()
      last_row=self.nrow-1
      last_col=self.ncol-1
      if col==0:
        full_action_list.remove(3)
      if row==0:
        full_action_list.remove(0)
      if col==last_col:
        full_action_list.remove(1)
      if row==last_row:
        full_action_list.remove(2)
      return full_action_list
    
    def move_up(self,state):
      return [state[0]-1,state[1]]
    def move_down(self,state):
      return [state[0]+1,state[1]]
    def move_left(self,state):
      return [state[0],state[1]-1]
    def move_right(self,state):
      return [state[0],state[1]+1]
    
    def is_done(self,state):
      last_row=self.nrow-1
      last_col=self.ncol-1
      if state[0]==last_row and (state[1] != 0):
        return True
      return False

    def step(self,action):
      done=False
      obs=self.current_state
      if action==0:
        next_state=self.move_up(obs)
      if action==1:
        next_state=self.move_right(obs)
      if action==2:
        next_state=self.move_down(obs)
      if action==3:
        next_state=self.move_left(obs)
      reward=self.reward_array[next_state[0],next_state[1]]
      done=self.is_done(next_state)
      self.current_state=next_state
      return next_state,reward,done

    def show_agent_position(self):
      agent_on_road=self.road.copy()
      pos=self.current_state
      agent_on_road[pos[0],pos[1]]=8
      print(agent_on_road)

    def reset(self):
      self.current_state=[self.nrow-1,0]

### Defining the agent and the requred q-table
Number of states = row*col

Ideally the Q table should contain number of states on rows, and actions on the columsn


In [0]:
class Agent():
  def __init__(self,road):
    self.road=road
    nrow=road.nrow
    ncol=road.ncol
    state_number=nrow*ncol
    action_number=len(road.full_action_list)
    self.q=np.full((state_number,action_number),0.5)
    self.policy=np.zeros_like(road.road)

  def state_encode(self,state):
    ncol=self.road.ncol
    encoded_state=state[0]*ncol+state[1]
    return encoded_state
  
  def update_current_policy(self):
    for i in range(self.policy.shape[0]):
      for j in range(self.policy.shape[1]):
        state=[i,j]
        action_set=self.road.get_possible_actions(state)
        q_state=self.state_encode(state)
        self.policy[i,j]=action_set[np.argmax(self.q[q_state,action_set])]

  def q_learn(self,episodes=100,alpha=.5,gamma=.99,epsilon=0.1):
    for i in range(episodes):
      self.road.reset()
      done=False
      while not done:
        state=self.state_encode(self.road.current_state)
        action_set=self.road.get_possible_actions(road.current_state)
        explore_p=random.random()

        #eps-policy for selecting next action
        if explore_p<epsilon:
          action=random.choice(action_set)
        else:
          action=action_set[np.argmax(self.q[state,action_set])]
        
        #Taking the above action, moving to the next state, and getting the corresponding reward.
        next_state,reward,done=road.step(action)
        q_next_state=self.state_encode(next_state)

        #greedy-policy for selecting the next q-value
        next_action_set=self.road.get_possible_actions(next_state)
        #Greedy update 
        self.q[state,action]=self.q[state,action]+alpha*(reward+gamma*np.max(self.q[q_next_state,next_action_set])-self.q[state,action])
    self.update_current_policy()

  def sarsa(self,episodes=100,alpha=.5,gamma=.99,epsilon=0.1):
    for i in range(episodes):
      self.road.reset()
      done=False
      while not done:
        state=self.state_encode(self.road.current_state)
        action_set=self.road.get_possible_actions(road.current_state)
        explore_p=random.random()

        #eps-policy for selecting next action
        if explore_p<epsilon:
          action=random.choice(action_set)
        else:
          action=action_set[np.argmax(self.q[state,action_set])]
        
        #Taking the above action, moving to the next state, and getting the corresponding reward.
        next_state,reward,done=road.step(action)
        q_next_state=self.state_encode(next_state)

        #SARSA - We follow the eps-policy for finding the Q-value of next state
        next_action_set=self.road.get_possible_actions(next_state)
        explore_p=random.random()
        if explore_p<epsilon:
          next_action=random.choice(next_action_set)
        else:
          next_action=next_action_set[np.argmax(self.q[q_next_state,next_action_set])]
        #Greedy update 
        self.q[state,action]=self.q[state,action]+alpha*(reward+gamma*self.q[q_next_state,next_action]-self.q[state,action])
    self.update_current_policy()

    def erase_learning(self):
      self.q=0.5
      self.policy=0



In [0]:
road=CliffRoad(5,8)
agent=Agent(road)

In [0]:
agent.sarsa()

In [203]:
agent.policy

array([[1, 1, 1, 1, 1, 1, 2, 2],
       [1, 0, 0, 0, 1, 0, 1, 2],
       [0, 0, 0, 1, 1, 3, 1, 2],
       [0, 0, 0, 0, 0, 0, 0, 0]], dtype=int8)

In [0]:
agent.q_learn()

In [184]:
agent.policy

array([[1, 1, 2, 1, 2, 2, 2, 2],
       [1, 3, 2, 1, 1, 1, 2, 2],
       [1, 0, 2, 1, 1, 2, 1, 2],
       [1, 1, 1, 1, 1, 1, 1, 2],
       [0, 0, 0, 0, 0, 0, 0, 0]], dtype=int8)

In [0]:
def state_encode(state,ncol):
  encoded_state=state[0]*ncol+state[1]
  if encoded_state<0:
    return encoded_state%4
  return encoded_state

In [65]:
random.random()

0.39565831149966535

In [0]:

#Learning Parameters
episodes=10
alpha=.5
gamma=.99
epsilon=0.1



road=CliffRoad(4,8)
nrow=road.nrow
ncol=road.ncol
state_number=nrow*ncol
action_number=4
#initializing q-table
q=np.full((state_number,action_number),0.5)
exp_i=0
for i in range(1000):
  road.reset()
  done=False
  while not done:
    state=state_encode(road.current_state,ncol)
    action_set=road.get_possible_actions(road.current_state)
    
    explore_p=random.random()
    if explore_p<epsilon:
      action=random.choice(action_set)
      # exp_i+=1
      # print("Exploring:",exp_i,road.current_state)

    else:
      action=action_set[np.argmax(q[state,action_set])]
    next_state,reward,done=road.step(action)
    q_next_state=state_encode(next_state,ncol)
    next_action_set=road.get_possible_actions(next_state)
    next_action=next_action_set[np.argmax(q[q_next_state,next_action_set])]

    #update 
    q[state,action]=q[state,action]+alpha*(reward+gamma*q[q_next_state,next_action]-q[state,action])
    # road.show_agent_position()
    # print(road.current_state)



In [0]:

#Learning Parameters
episodes=10
alpha=.5
gamma=.99
epsilon=0.1



road=CliffRoad(4,8)
nrow=road.nrow
ncol=road.ncol
state_number=nrow*ncol
action_number=4
#initializing q-table
q=np.full((state_number,action_number),0.5)
exp_i=0
for i in range(1000):
  road.reset()
  done=False
  while not done:
    state=state_encode(road.current_state,ncol)
    action_set=road.get_possible_actions(road.current_state)
    
    explore_p=random.random()
    if explore_p<epsilon:
      action=random.choice(action_set)
      # exp_i+=1
      # print("Exploring:",exp_i,road.current_state)
    else:
      action=action_set[np.argmax(q[state,action_set])]

    next_state,reward,done=road.step(action)
    q_next_state=state_encode(next_state,ncol)
    next_action_set=road.get_possible_actions(next_state)

    explore_p=random.random()
    if explore_p<epsilon:
      next_action=random.choice(next_action_set)
    else:
      next_action=next_action_set[np.argmax(q[q_next_state,next_action_set])]

    #update #sarsa

    q[state,action]=q[state,action]+alpha*(reward+gamma*q[q_next_state,next_action]-q[state,action])
    # road.show_agent_position()
    # print(road.current_state)



In [0]:
action_set=road.get_possible_actions(road.current_state)
explore_p=.2


In [0]:
action=action_set[np.argmax(q[state,action_set])]

In [125]:
next_action_set=road.get_possible_actions(next_state)
next_action_set

[0, 1, 3]

In [126]:
next_action=next_action_set[np.argmax(q[28,next_action_set])]
next_action

0

In [0]:
next_state,reward,done=road.step(action)

In [0]:
reward

In [130]:
next_state

[3, 4]

In [0]:
action_set=road.get_possible_actions(road.current_state)

explore_p=random.random()
if explore_p<epsilon:
  action=random.choice(action_set)
  exp_i+=1
  print("Exploring:",exp_i,road.current_state)

else:
  action=action_set[np.argmax(q[state,action_set])]
next_state,reward,done=road.step(action)
q_next_state=state_encode(next_state,ncol)
next_action_set=road.get_possible_actions(next_state)
next_action=next_action_set[np.argmax(q[q_next_state,next_action_set])]

#update 
q[state,action]=q[state,action]+alpha*(reward+gamma*q[q_next_state,next_action]-q[state,action])

In [0]:
road.current_state=[2,4]

In [119]:
state_encode(road.current_state,8)

20

In [113]:
road.reward_array

array([[  -1.,   -1.,   -1.,   -1.,   -1.,   -1.,   -1.,   -1.],
       [  -1.,   -1.,   -1.,   -1.,   -1.,   -1.,   -1.,   -1.],
       [  -1.,   -1.,   -1.,   -1.,   -1.,   -1.,   -1.,   -1.],
       [  -1., -100., -100., -100., -100., -100., -100.,   -1.]])

In [0]:
def get_policy(road,q):
  nrow=road.nrow
  ncol=road.ncol
  policy_array=np.zeros_like(road.road)
  for i in range(nrow):
    for j in range(ncol):
      state=[i,j]
      action_set=road.get_possible_actions(state)
      q_state=state_encode(state,ncol)
      policy_array[i,j]=action_set[np.argmax(q[q_state,action_set])]
  return policy_array

In [137]:
get_policy(road,q)

array([[1, 2, 1, 1, 2, 2, 2, 2],
       [1, 1, 1, 2, 1, 2, 1, 2],
       [1, 1, 1, 1, 1, 1, 1, 2],
       [0, 0, 0, 0, 0, 0, 0, 0]], dtype=int8)

In [139]:
get_policy(road,q)

array([[1, 1, 1, 1, 1, 1, 1, 2],
       [0, 1, 1, 1, 0, 0, 1, 2],
       [0, 0, 0, 0, 0, 3, 1, 2],
       [0, 0, 0, 0, 0, 0, 0, 0]], dtype=int8)

In [0]:
action_set=road.get_possible_actions()
action=action_set[np.argmax(q[state,action_set])]

In [0]:
def infer(q,road):
  road.reset()
  done=False
  while not done:
    state=state_encode(road.current_state,road.ncol)
    action_set=road.get_possible_actions(road.current_state)
    action=action_set[np.argmax(q[state,action_set])]
    next_state,reward,done=road.step(action)
    road.show_agent_position()

In [87]:
infer(q,road)

[[1 1 1 1 1 1 1 1]
 [0 0 0 0 0 0 0 0]
 [8 0 0 0 0 0 0 0]
 [4 2 2 2 2 2 2 5]]
[[1 1 1 1 1 1 1 1]
 [0 0 0 0 0 0 0 0]
 [0 8 0 0 0 0 0 0]
 [4 2 2 2 2 2 2 5]]
[[1 1 1 1 1 1 1 1]
 [0 0 0 0 0 0 0 0]
 [0 0 8 0 0 0 0 0]
 [4 2 2 2 2 2 2 5]]
[[1 1 1 1 1 1 1 1]
 [0 0 0 0 0 0 0 0]
 [0 0 0 8 0 0 0 0]
 [4 2 2 2 2 2 2 5]]
[[1 1 1 1 1 1 1 1]
 [0 0 0 0 0 0 0 0]
 [0 0 0 0 8 0 0 0]
 [4 2 2 2 2 2 2 5]]
[[1 1 1 1 1 1 1 1]
 [0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 0]
 [4 2 2 2 8 2 2 5]]


In [0]:
def run_episode():
  episodes=10
  alpha=.5
  gamma=.99

  road=CliffRoad(4,8)
  nrow=road.nrow
  ncol=road.ncol
  state_number=nrow*ncol
  action_number=4
  done=False

  #initializing q-table
  q=np.full((state_number,action_number),0.5)
  while not done:
    state=state_encode(road.current_state,road.ncol)
    action_set=road.get_possible_actions()
    print(road.current_state)
    print(action_set)
    action=action_set[np.argmax(q[state,action_set])]
    next_state,reward,done=road.step(action)
    q_next_state=state_encode(next_state,ncol)
    #update 
    q[state,action]=q[state,action]+alpha*(reward+gamma*q[q_next_state,action]-q[state,action])
    road.show_agent_position()
  print(q)