## Reinforcement Learning Practice

Here we use Q Learning which will train an agent to traverse an array finding the finishline (#3) while
avoiding landmines (#2s).

e.g. Here we want to get #1 to go step by step to reach #3. In this 
case it needs to go left, then up 4 times... if instead it goes up on the 
first move, it hits a landmine and game over!

```
[[0. 0. 0. 3. 0.]
 [2. 0. 0. 0. 0.]
 [0. 2. 0. 0. 0.]
 [0. 0. 0. 0. 2.]
 [0. 0. 0. 0. 1.]]
```

Initialize env

In [63]:
import numpy as np
import pandas as pd
import random

# initialize a numpy array which will be our env/state
def initialize_state():
  state = np.zeros((5,5))  
  
  # starting point
  state[4,np.random.randint(0,5)] = 1

  # landmines
  state[3,np.random.randint(0,5)] = 2
  state[2,np.random.randint(0,5)] = 2
  state[1,np.random.randint(0,5)] = 2

  # Finishline
  state[0,np.random.randint(0,5)] = 3

  return state

In [64]:
state = initialize_state()
state

array([[0., 0., 0., 3., 0.],
       [0., 2., 0., 0., 0.],
       [0., 0., 0., 0., 2.],
       [0., 2., 0., 0., 0.],
       [0., 0., 0., 0., 1.]])

In [65]:
def get_current_position(state):
  current_position = np.argwhere(state == 1)[0]
  return current_position

In [66]:
current_position = get_current_position(state)
current_position

array([4, 4])

In [67]:
def get_actions(state):
  current_position = np.argwhere(state == 1)[0]

  d_actions = {}
 
  up = np.array([current_position[0]-1,current_position[1]])
  right = np.array([current_position[0],current_position[1]+1])
  left = np.array([current_position[0],current_position[1]-1])

  # Dont move if on the edges or top
  if current_position[0] == 0:
    up = [0,up[1]]
  if current_position[1] == 4:
    right = [right[0],4]
  if current_position[1] == 0:
    left = [left[0],0]
  
  d_actions['up'] = up
  d_actions['right'] = right
  d_actions['left'] = left

  return d_actions

In [76]:
actions = get_actions(state)
actions

{'up': array([3, 4]), 'right': [4, 4], 'left': array([4, 3])}

In [98]:
# Figure out Q table by listing all available states and actions
# e.g. 
#. state, up, right, left
#. [0,0], 0,  0,  0
#.  ...
#. [4,5], 0,  0,  0
#
 
def initialize_q_table(state):
  states = [(x,y) for x in range(0,5) for y in range(0,5)]
  actions = ["up","right","left"]
  df = pd.DataFrame(np.zeros((25,3)),columns=actions)
  df['state'] = states

  df = df[['state','up','right','left']]
  
  return df

In [99]:
q = initialize_q_table(state)
q

Unnamed: 0,state,up,right,left
0,"(0, 0)",0.0,0.0,0.0
1,"(0, 1)",0.0,0.0,0.0
2,"(0, 2)",0.0,0.0,0.0
3,"(0, 3)",0.0,0.0,0.0
4,"(0, 4)",0.0,0.0,0.0
5,"(1, 0)",0.0,0.0,0.0
6,"(1, 1)",0.0,0.0,0.0
7,"(1, 2)",0.0,0.0,0.0
8,"(1, 3)",0.0,0.0,0.0
9,"(1, 4)",0.0,0.0,0.0


In [147]:
def propose_action(state,q,epsilon=.10):
  # current position
  current_position = get_current_position(state)
  print(f'current position is : {current_position}')

  # Actions
  actions = get_actions(state)
  print(f'possible actions are : {actions}')

  # See check if state is at initial value if so then we select a random action
  if ((q[q['state'] == (current_position[0],current_position[1])]['up'] + \
      q[q['state'] == (current_position[0],current_position[1])]['right'] + \
      q[q['state'] == (current_position[0],current_position[1])]['left']) == 0.0).item() == True:
      print('initial move for this state, random action...')

      proposed_action = random.choice(list(actions.items()))     
      print(f'proposed action is : {proposed_action}')

      return proposed_action
  
  # Epsilon greedy approach
  elif np.random.uniform(0,1,1) > epsilon:
    # See check if state is at initial value if so then we select a random action
    print('Choosing random action ... \n')
    proposed_action = random.choice(list(actions.items()))      
    print(f'proposed action is : {proposed_action}')

    return proposed_action

  else:
    print('selecting action based on q table...')
    # We want to select the move with the highest q value
    proposed_action_dir = q[q['state'] == (current_position[0],current_position[1])]\
      [['up','right','left']].idxmax(axis=1).item()
    
    proposed_action = list({
      key:value for key,value in actions.items() if key == proposed_action_dir
    }.items())[0]
    
    return proposed_action

In [148]:
proposed_action = propose_action(state,q)
proposed_action


current position is : [2 0]
possible actions are : {'up': array([1, 0]), 'right': array([2, 1]), 'left': [2, 0]}
Choosing random action ... 

proposed action is : ('right', array([2, 1]))


('right', array([2, 1]))

Now that we propse a new state, we will give our agent a reward and 
update the Q table

In [149]:
def best_state_action_q_value(state,q,proposed_action):
    current_position = get_current_position(state)
    actions = get_actions(state)
    proposed_action_dir = q[q['state'] == (current_position[0],current_position[1])]\
      [['up','right','left']].idxmax(axis=1).item()
    
    proposed_action = list({
      key:value for key,value in actions.items() if key == proposed_action_dir
    }.items())[0]

    # Find Q value
    max_val = float(q[q['state'] == (current_position[0],current_position[1])].max(axis=1))


    return proposed_action, max_val


In [150]:
best_state_action_q_value(state,q,proposed_action)

(('up', array([1, 0])), 0.9766024949366847)

In [151]:
def reward_and_update_q_table(state,q,proposed_action,lr,gamma):
  # Check which direction move is proposed to update q table
  current_position = get_current_position(state)
  move = proposed_action[0]

  # Calculate best state and max_val
  best_state, max_val = best_state_action_q_value(state,q,proposed_action)
  
  # Up moves
  if move == 'up':
    print('UP!')

    # Check if proposed move is out of bounds
    if current_position[0] == 0:
      reward = -10
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'up'] += lr *(reward + gamma*max_val - q.loc[q['state']== (int(current_position[0]),int(current_position[1])),'up'])

    # Check for landmine
    elif (state[int(proposed_action[1][0]),int(proposed_action[1][1])] == 2.0):
      reward = -10
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'up'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'up']) 
    
    # Check for finish line
    elif (state[int(proposed_action[1][0]),int(proposed_action[1][1])] == 3.0):
      reward = 10
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'up'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'up'])
    
    # Else give small penalty to incentivise quicker moves TODO
    else:
      reward = 1
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'up'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'up'])

  # Right moves
  if move == 'right':
    print('RIGHT!')
    
    # Check if proposed move is out of bounds 
    if current_position[1] == 4:
      reward = -10
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'right'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'right'])
    
    # Check for landmine
    elif (state[int(proposed_action[1][0]),int(proposed_action[1][1])] == 2.0):
      reward = -10
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'right'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'right'])
    
    # Check for finish line
    elif (state[int(proposed_action[1][0]),int(proposed_action[1][1])] == 3.0):
      reward = 10
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'right'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'right'])
    
    else:
      reward = 1
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'right'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'right'])

  # Left moves
  if move == 'left':
    print('LEFT!')

    # Check if proposed move is out of bounds
    if current_position[1] == 0:
      reward = -10
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'left'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'left'])
    
    # Check for landmine
    elif (state[int(proposed_action[1][0]),int(proposed_action[1][1])] == 2.0):
      reward = -10
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'left'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'left'])
    
    # Check for finish line
    elif (state[int(proposed_action[1][0]),int(proposed_action[1][1])] == 3.0):
      reward = 100
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'left'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'left'])      
    
    else:
      reward = 1
      q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'left'] += lr *(reward + gamma*max_val - q.loc[q['state'] == (int(current_position[0]),int(current_position[1])),'left'])

  return reward, q

In [152]:
reward, q = reward_and_update_q_table(state,q,proposed_action,.05,.01)
print(f'reward: {reward}')
q

RIGHT!
reward: -10


Unnamed: 0,state,up,right,left
0,"(0, 0)",0.0,0.05,0.0
1,"(0, 1)",0.0,0.396517,0.054744
2,"(0, 2)",0.0,3.965174,0.0
3,"(0, 3)",0.0,0.0,0.0
4,"(0, 4)",0.441123,3.210356,97.660249
5,"(1, 0)",0.0,0.928925,-0.488184
6,"(1, 1)",0.833211,0.0,0.0
7,"(1, 2)",0.0,0.0,0.0
8,"(1, 3)",0.99875,0.0,0.0
9,"(1, 4)",1.024161,-0.5,0.0


In [153]:
def take_action(state,proposed_action):
  # again get current position
  current_position = get_current_position(state)
  # Create new state
  new_state = state

  # Swap position of the 1 (move the 1 to the proposed position)
  new_state[current_position[0],current_position[1]] = 0
  new_state[proposed_action[1][0],proposed_action[1][1]] = 1
  
  return new_state

In [154]:
print(state)
print('\n\n New State: \n')
new_state = take_action(state,proposed_action)
print(new_state)

[[0. 0. 0. 3. 0.]
 [0. 0. 2. 0. 0.]
 [1. 2. 0. 0. 0.]
 [0. 0. 0. 0. 2.]
 [0. 0. 0. 0. 0.]]


 New State: 

[[0. 0. 0. 3. 0.]
 [0. 0. 2. 0. 0.]
 [0. 1. 0. 0. 0.]
 [0. 0. 0. 0. 2.]
 [0. 0. 0. 0. 0.]]


## Here we initalize an enviornment with Q table and take a step

In [155]:
state = initialize_state()
q = initialize_q_table(state)

print(state)
print(q)

[[3. 0. 0. 0. 0.]
 [0. 0. 0. 0. 2.]
 [0. 0. 0. 2. 0.]
 [2. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0.]]
     state   up  right  left
0   (0, 0)  0.0    0.0   0.0
1   (0, 1)  0.0    0.0   0.0
2   (0, 2)  0.0    0.0   0.0
3   (0, 3)  0.0    0.0   0.0
4   (0, 4)  0.0    0.0   0.0
5   (1, 0)  0.0    0.0   0.0
6   (1, 1)  0.0    0.0   0.0
7   (1, 2)  0.0    0.0   0.0
8   (1, 3)  0.0    0.0   0.0
9   (1, 4)  0.0    0.0   0.0
10  (2, 0)  0.0    0.0   0.0
11  (2, 1)  0.0    0.0   0.0
12  (2, 2)  0.0    0.0   0.0
13  (2, 3)  0.0    0.0   0.0
14  (2, 4)  0.0    0.0   0.0
15  (3, 0)  0.0    0.0   0.0
16  (3, 1)  0.0    0.0   0.0
17  (3, 2)  0.0    0.0   0.0
18  (3, 3)  0.0    0.0   0.0
19  (3, 4)  0.0    0.0   0.0
20  (4, 0)  0.0    0.0   0.0
21  (4, 1)  0.0    0.0   0.0
22  (4, 2)  0.0    0.0   0.0
23  (4, 3)  0.0    0.0   0.0
24  (4, 4)  0.0    0.0   0.0


In [156]:
# Taking a step
proposed_action = propose_action(state,q)
proposed_action
print(reward_and_update_q_table(state,q,proposed_action,lr=.1,gamma=.95))

current position is : [4 2]
possible actions are : {'up': array([3, 2]), 'right': array([4, 3]), 'left': array([4, 1])}
initial move for this state, random action...
proposed action is : ('up', array([3, 2]))
UP!
(1,      state   up  right  left
0   (0, 0)  0.0    0.0   0.0
1   (0, 1)  0.0    0.0   0.0
2   (0, 2)  0.0    0.0   0.0
3   (0, 3)  0.0    0.0   0.0
4   (0, 4)  0.0    0.0   0.0
5   (1, 0)  0.0    0.0   0.0
6   (1, 1)  0.0    0.0   0.0
7   (1, 2)  0.0    0.0   0.0
8   (1, 3)  0.0    0.0   0.0
9   (1, 4)  0.0    0.0   0.0
10  (2, 0)  0.0    0.0   0.0
11  (2, 1)  0.0    0.0   0.0
12  (2, 2)  0.0    0.0   0.0
13  (2, 3)  0.0    0.0   0.0
14  (2, 4)  0.0    0.0   0.0
15  (3, 0)  0.0    0.0   0.0
16  (3, 1)  0.0    0.0   0.0
17  (3, 2)  0.0    0.0   0.0
18  (3, 3)  0.0    0.0   0.0
19  (3, 4)  0.0    0.0   0.0
20  (4, 0)  0.0    0.0   0.0
21  (4, 1)  0.0    0.0   0.0
22  (4, 2)  0.1    0.0   0.0
23  (4, 3)  0.0    0.0   0.0
24  (4, 4)  0.0    0.0   0.0)


## Here we define and run an episode

In [157]:
def episode(state,lr,gamma):
  # Print state
  print('Initial State ... \n')
  print(state)
  print('\n')

  # initialize q table
  q = initialize_q_table(state)
  print('Initial Q Table ... \n')
  print(q)
  print('\n')

  # initalize a count variable for reward
  r = 0
  while r < 10:
    current_position = get_current_position(state)
    proposed_action = propose_action(state,q)
    reward, q = reward_and_update_q_table(state,q,proposed_action,lr=.1,gamma=.95)
    print(f'reward is {reward} \n')
    print(q)
    # Check for landmine
    if state[int(proposed_action[1][0]),int(proposed_action[1][1])] == 2.0:
      print('Oh no you stepped on a landmine! \n')
      print(state)
      break
    elif state[int(proposed_action[1][0]),int(proposed_action[1][1])] == 3.0:
      print('Success! You have reached the finish line! \n')
      print(state)
      break
    else:
      state = take_action(state,proposed_action)
      print('Current State ... \n')
      print(state)
      print('\n')      
      r += 1
      if r == 10:
        print('TIME RAN OUT ... ')

In [158]:
# Running 1 episode 
state = initialize_state()

lr = 0.05
gamma = 0.99
episode(state,lr,gamma)

Initial State ... 

[[0. 0. 3. 0. 0.]
 [2. 0. 0. 0. 0.]
 [0. 2. 0. 0. 0.]
 [0. 0. 2. 0. 0.]
 [0. 0. 1. 0. 0.]]


Initial Q Table ... 

     state   up  right  left
0   (0, 0)  0.0    0.0   0.0
1   (0, 1)  0.0    0.0   0.0
2   (0, 2)  0.0    0.0   0.0
3   (0, 3)  0.0    0.0   0.0
4   (0, 4)  0.0    0.0   0.0
5   (1, 0)  0.0    0.0   0.0
6   (1, 1)  0.0    0.0   0.0
7   (1, 2)  0.0    0.0   0.0
8   (1, 3)  0.0    0.0   0.0
9   (1, 4)  0.0    0.0   0.0
10  (2, 0)  0.0    0.0   0.0
11  (2, 1)  0.0    0.0   0.0
12  (2, 2)  0.0    0.0   0.0
13  (2, 3)  0.0    0.0   0.0
14  (2, 4)  0.0    0.0   0.0
15  (3, 0)  0.0    0.0   0.0
16  (3, 1)  0.0    0.0   0.0
17  (3, 2)  0.0    0.0   0.0
18  (3, 3)  0.0    0.0   0.0
19  (3, 4)  0.0    0.0   0.0
20  (4, 0)  0.0    0.0   0.0
21  (4, 1)  0.0    0.0   0.0
22  (4, 2)  0.0    0.0   0.0
23  (4, 3)  0.0    0.0   0.0
24  (4, 4)  0.0    0.0   0.0


current position is : [4 2]
possible actions are : {'up': array([3, 2]), 'right': array([4, 3]), 'left': arra

## Putting it all together... 
We will train an agent to solve one setup of the env over multiple episodes


In [164]:
# Learning rate, gamma (discount rate), epsilon (how greedy to be in selecting)
lr = 0.05
gamma = 0.95
epsilon = .1

# initialize an enviornment and q-table
initial_state = initialize_state()
q = initialize_q_table(initial_state)

# Number of episodes or training rounds
num_ep = 100

# Total rewards
rewards = []

for episode in range(0,num_ep):
  print(f"Beginning episode {episode} ...  \n\n\n")

  # Reinitialize state
  state = np.copy(initial_state)
  print('Initial State ... \n')
  print(state)
  print('\n')
 
  # Q Table
  print('Q Table ... \n')
  print(q)
  print('\n')

  # Epsilon
  print('Epsilon ... \n')
  print(epsilon)
  print('\n')

  # Run episode ... 
  # initalize a count variable for reward
  episode_total_reward = 0
  stop = 0
  while stop < 10:
    current_position = get_current_position(state)
    proposed_action = propose_action(state,q)
    print(proposed_action)
    reward, q = reward_and_update_q_table(state,q,proposed_action,lr=lr,gamma=gamma)
    print(reward)
    print(q)

    episode_total_reward += reward

    # Check for landmine
    if state[int(proposed_action[1][0]),int(proposed_action[1][1])] == 2.0:
      print('Oh no you stepped on a landmine! \n')
      print(state)
      stop = 10
    
    # Check for finishline
    elif state[int(proposed_action[1][0]),int(proposed_action[1][1])] == 3.0:
      print('Success! You have reached the finish line! \n')
      print(state)
      stop = 10
    
    else:
      state = take_action(state,proposed_action)
      print('Current State ... \n')
      print(state)
      print('\n')      
      
      # Count down so agent needs to complete maze in 10 steps
      stop += 1
      if stop == 10:
        print('TIME RAN OUT ... ')
  epsilon = epsilon+.005
  if epsilon >= 1:
    epsilon = .95
  rewards.append(episode_total_reward)

11  (2, 1)  0.000000  0.000000  0.000000
12  (2, 2)  0.000000  0.199251 -1.407997
13  (2, 3)  0.216457  0.165256  0.397329
14  (2, 4)  0.210806 -0.970369  0.251328
15  (3, 0)  0.301284  0.206012 -2.984231
16  (3, 1) -1.421506 -3.665571  0.149625
17  (3, 2)  0.000000  0.000000  0.000000
18  (3, 3)  0.446149  0.333994 -1.780314
19  (3, 4)  0.364481 -2.601105  0.301392
20  (4, 0)  0.579495  0.640339 -5.395184
21  (4, 1)  0.590049  0.909860  1.224359
22  (4, 2) -8.007965  2.074355  2.150540
23  (4, 3)  0.788226  1.170671  1.149070
24  (4, 4)  0.478616 -5.217901  0.785174
Current State ... 

[[0. 0. 0. 0. 3.]
 [2. 0. 0. 0. 0.]
 [0. 2. 0. 0. 0.]
 [0. 0. 2. 0. 0.]
 [0. 0. 1. 0. 0.]]


current position is : [4 2]
possible actions are : {'up': array([3, 2]), 'right': array([4, 3]), 'left': array([4, 1])}
Choosing random action ... 

proposed action is : ('up', array([3, 2]))
('up', array([3, 2]))
UP!
-10
     state        up     right      left
0   (0, 0)  0.000000  0.050000  0.000000
1   (0, 1

In [165]:
rewards

[-1,
 -10,
 -12,
 -10,
 -34,
 -4,
 -12,
 -10,
 -8,
 17,
 -8,
 -12,
 -10,
 -10,
 -1,
 -1,
 -10,
 -10,
 10,
 -8,
 -1,
 -6,
 17,
 -10,
 -10,
 -34,
 -10,
 7,
 -10,
 -6,
 -8,
 5,
 -4,
 -1,
 -10,
 -10,
 -16,
 -10,
 -10,
 -26,
 -10,
 -10,
 -8,
 -8,
 -36,
 -10,
 -8,
 -10,
 -6,
 -10,
 -16,
 -10,
 -10,
 -10,
 -8,
 -10,
 -10,
 -6,
 -8,
 -23,
 -36,
 -16,
 -10,
 -8,
 -6,
 -10,
 -10,
 -14,
 -36,
 -10,
 -10,
 -1,
 -10,
 -10,
 -10,
 -8,
 -24,
 -16,
 -8,
 -4,
 -4,
 -5,
 -10,
 -8,
 -10,
 -10,
 -10,
 -10,
 10,
 -1,
 -10,
 10,
 -4,
 -6,
 -1,
 -4,
 -14,
 -6,
 -10,
 10]

In [50]:
proposed_action

array([4, 4])

In [30]:
# Using a Q table to run an episode
def new_episode(initial_state,q):
  # Print state, q table
  print('Initial State ... \n')
  print(initial_state)
  print('\n')

  print('Initial Q Table ... \n')
  print(q)
  print('\n')

  state = np.copy(initial_state)
  # initalize a count variable for reward
  r = 0
  while r < 10:
    proposed_action = propose_action(state,q)
    # Check for landmine
    if state[int(proposed_action[0]),int(proposed_action[1])] == 2.0:
      print('Oh no you stepped on a landmine! \n')
      print(state)
      break
    elif state[int(proposed_action[0]),int(proposed_action[1])] == 3.0:
      print('Success! You have reached the finish line! \n')
      print(state)
      break
    else:
      state = take_action(state,proposed_action)
      print('Current State ... \n')
      print(state)
      print('\n')      
      r += 1
      if r == 10:
        print('TIME RAN OUT ... ')

In [None]:
# Look at how the q table does
new_episode(initial_state,q)