# **This is an application of Reinforcement Learning (RL): active search using Q-value iteration.**

 This is the Zewail City agent, which is responsible for transferring the 4 objects in the Nano Building and the 4 objects in the Helmy Building to the Academic Building and the One Stop Shop (4 objects each).
 It is an active search.

# Importing Needed Libraries


In [None]:
from skimage import io
from PIL import Image
import numpy as np
import cv2
from google.colab.patches import cv2_imshow
import pandas as pd  
from math import inf
from time import time
from itertools import count
from random import random

## Reading the file that represents the Zewail City map through its available roads and crossroads:



In [None]:
# Parse the map file into ``map_dict``. Every non-blank line has the form
# ``KEY-a1,b1,a2,b2,...``: the text before '-' becomes the dictionary key and
# the comma-separated fields after it are grouped, in order, into
# consecutive ``[a, b]`` pairs.
map_dict = {}
with open("/content/ZewailCity_Map.txt") as data_file:
    for raw_line in data_file:
        line = raw_line.replace('\n', '')
        if not line.strip():
            # A blank (e.g. trailing) line carries no entry; splitting it on
            # '-' could not be unpacked into key and value.
            continue
        key, val = line.split('-')
        fields = val.split(',')
        map_dict[key] = [[fields[i], fields[i + 1]]
                         for i in range(0, len(fields), 2)]

## Function that represents the Q values as a Dataframe:

In [None]:
def Q_values_dataframe(q_dict, filename):
    '''
    Write a Q-table to a CSV file with one row per state and one column per action.

    inputs: 1. q_dict: dictionary whose keys are (state, action) pairs and
               whose values are the corresponding Q-values
            2. filename: path of the CSV file to write
    output: there is no return value, but a CSV file is created. Its
            "states/actions" column holds each distinct state (in order of
            first appearance), followed by one column per action (also in
            order of first appearance). Cells for (state, action) pairs that
            are not in q_dict are left empty.
    '''
    # Group the Q-values by state in a single pass instead of scanning every
    # row of the frame for every entry. Dictionaries keep insertion order, so
    # rows and columns come out in first-seen order.
    values_by_state = {}
    action_order = {}
    for key in q_dict:
        values_by_state.setdefault(key[0], {})[key[1]] = q_dict[key]
        action_order.setdefault(key[1], None)

    states = list(values_by_state)
    missing = float('nan')

    Q_df = pd.DataFrame()
    Q_df["states/actions"] = states
    for action in action_order:
        Q_df[action] = [values_by_state[state].get(action, missing)
                        for state in states]

    Q_df.to_csv(filename)
  





## Environment Formulation

We formulate an environment in a similar way to decision processes _without_ a way to know the transition model.


In [None]:
class Environment:
    '''
    Interface that every interactive environment has to implement.

    Only the expected methods are declared here: `actions` and `apply` raise
    NotImplementedError until a concrete subclass overrides them.
    '''

    def __init__(self):
        '''Set up the environment; the base class starts with no state (None).'''
        self.state = None

    def actions(self):
        '''Return an iterable of the actions applicable in the current state.'''
        raise NotImplementedError()

    def apply(self, action):
        '''
        Apply `action` to the current state of the environment.

        The outcome is not necessarily deterministic. Must be overridden by
        subclasses.
        '''
        raise NotImplementedError()


Combining Q-values with TD learning yields an iterative technique that does not need a transition model in learning nor in decision making!

$$Q_{i+1}(s,a) = Q_i(s,a) + \alpha (r_s + \gamma \times \max_{a'} Q_i(s',a') - Q_i(s,a))$$

In [None]:
def q_learning(env, q=None, n=None, f=lambda q, n: (q+1)/(n+1), alpha=lambda n:0.5, error=1e-6, verbose=False):
    '''
    Q-learning implementation that trains on an environment till no more actions can be taken.

    inputs: 1. env: environment exposing `state`, `discount`, `actions()` and
               `apply(action)`; the episode ends when `env.state` becomes None
            2. q: dictionary mapping (state, action) to its Q-value; updated
               in place. A fresh dictionary is created when omitted.
            3. n: dictionary mapping (state, action) to the number of times it
               was taken; updated in place. A fresh dictionary is created
               when omitted.
            4. f: selection function f(q_value, visit_count); the action with
               the highest score is taken
            5. alpha: learning rate as a function of the visit count
            6. error, verbose: accepted for compatibility; not used here
    output: (q, n, all_states) where all_states lists, in order, the state the
            agent was in before each step of this episode
    '''
    # Compare against None rather than using truthiness: an empty dictionary
    # handed in by the caller must be kept so it can be filled in place.
    if q is None:
        q = {}
    if n is None:
        n = {}
    all_states = []

    while env.state is not None:
        state = env.state
        action = max(env.actions(),
                     key=lambda candidate: f(q.get((state, candidate), 0), n.get((state, candidate), 0)))
        n[(state, action)] = n.get((state, action), 0) + 1
        reward = env.apply(action)
        all_states.append(state)

        # TD update: move Q(s, a) towards reward + discount * max_a' Q(s', a').
        # default=0 covers a successor state that offers no actions.
        old_value = q.get((state, action), 0)
        best_next = max((q.get((env.state, next_action), 0) for next_action in env.actions()), default=0)
        q[(state, action)] = old_value \
                           + alpha(n[state, action]) \
                           * (reward + env.discount * best_next - old_value)

    return q, n, all_states

In [None]:
def simulate(env_ctor,Int_check=False,intermediate_q_num=100, n_iterations=inf, duration=inf, **q_learning_params):
    '''
    A helper function to train for a fixed number of iterations or fixed time.

    inputs: 1. env_ctor: zero-argument callable returning a fresh environment
               for each episode
            2. Int_check: when true, the Q-table is written to
               'intermediate_q.csv' once, right after episode number
               intermediate_q_num
            3. intermediate_q_num: episode count at which that dump happens
            4. n_iterations: maximum number of episodes
            5. duration: maximum running time in seconds (checked before each
               episode starts)
            6. q_learning_params: forwarded to q_learning; 'q' and 'n' default
               to fresh dictionaries that are shared across all episodes
    output: (q, n, all_states) where all_states holds the states visited in
            the last episode, or an empty list if no episode was run
    '''
    for param in ('q', 'n'):
        if q_learning_params.get(param) is None:
            q_learning_params[param] = {}
    start_time = time()
    episode_counter = count()
    episodes_until_dump = intermediate_q_num
    # Defined up front so the return below also works when no episode runs.
    all_states = []
    while time() < start_time + duration and next(episode_counter) < n_iterations:
        env = env_ctor()
        _, _, all_states = q_learning(env, **q_learning_params)
        episodes_until_dump -= 1
        if Int_check and episodes_until_dump == 0:
            Q_values_dataframe(q_learning_params['q'], 'intermediate_q.csv')
    return q_learning_params['q'], q_learning_params['n'], all_states

## Agent Searching in ZC:
state=(cross_road,carry or not (Bool),NB,HB,AB,OSB)


*   cross_road-> that the agent is at
*   carry or not-> whether the agent is currently carrying an object (True/False)
*   NB-> number of items in Nano Building
*   HB-> number of items in Helmy Building
*   AB -> number of items in Academic Building
*   OSB-> number of items in One stop shop






In [None]:
from random import choice, randrange

class ZC_Agent(Environment):
    '''
    Dynamic Zewail City agent that transfers objects from one building to another.

    The state is the tuple (cross_road, carrying, NB, HB, AB, OSB): the
    crossroad the agent is at, whether it carries an object, and the number
    of objects in the Nano, Helmy, Academic and One Stop Shop buildings.
    A state of None marks the end of the episode.
    '''

    # Buildings where an object can be taken / dropped, mapped to the index of
    # that building's object counter inside the state tuple.
    _PICKUP_SLOT = {'NB': 2, 'HB': 3}
    _DROP_SLOT = {'AB': 4, 'OSB': 5}
    # (index, required value) pairs that all hold once every object has been
    # delivered and the agent's hands are empty.
    _GOAL = ((1, False), (2, 0), (3, 0), (4, 4), (5, 4))

    def __init__(self, map_dict, max_reward, inter_reward, discount, limit):
        '''
        map_dict: crossroad name -> list of pairs; from the second pair on,
                  each pair is (road name, crossroad that road leads to)
        max_reward: reward for finishing; its negative is the penalty for
                    running out of steps
        inter_reward: reward for every take / drop
        discount: discount factor read by the learner
        limit: number of steps allowed in one episode
        '''
        self.state = ('MG', False, 4, 4, 0, 0)
        self.map_dict = map_dict
        self.max_reward = max_reward
        self.inter_reward = inter_reward
        self.discount = discount
        self.limit = limit

    def actions(self):
        '''Return all possible actions from the current state ([] once the episode is over).'''
        if self.state is None:
            return []

        place = self.state[0]
        neighbours = self.map_dict[place]

        # At a source building with empty hands and objects left: must take.
        pickup = self._PICKUP_SLOT.get(place)
        if pickup is not None and self.state[1] == False and self.state[pickup] in {1, 2, 3, 4}:
            return ['take']

        # At a target building while carrying and with room left: must drop.
        dropoff = self._DROP_SLOT.get(place)
        if dropoff is not None and self.state[1] == True and self.state[dropoff] in {0, 1, 2, 3}:
            return ['drop']

        # Otherwise the roads leaving this crossroad; the first pair of the
        # map entry is skipped.
        return [entry[0] for entry in neighbours[1:]]

    def apply(self, action):
        '''Apply `action` to the current state, update the state and return the reward.'''
        slots = list(self.state)
        self.limit = self.limit - 1
        place = slots[0]

        # Everything delivered and the agent stands at a target building:
        # the episode ends with the full reward, whatever the action is.
        if place in ('AB', 'OSB') and all(slots[index] == wanted for index, wanted in self._GOAL):
            self.state = None
            return self.max_reward

        # Out of steps: the episode ends with the full penalty.
        if self.limit == 0:
            self.state = None
            return -self.max_reward

        if action == 'take' and place in self._PICKUP_SLOT:
            counter = self._PICKUP_SLOT[place]
            slots[1] = True
            slots[counter] = slots[counter] - 1
            self.state = tuple(slots)
            return self.inter_reward

        if action == 'drop' and place in self._DROP_SLOT:
            counter = self._DROP_SLOT[place]
            slots[1] = False
            slots[counter] = slots[counter] + 1
            self.state = tuple(slots)
            return self.inter_reward

        # Any other action is a road: move to the crossroad it leads to. An
        # unknown road leaves the state unchanged; either way the step costs 1.
        for entry in self.map_dict[place][1:]:
            if entry[0] == action:
                slots[0] = entry[1]
                self.state = tuple(slots)
        return -1

  


# Bonus

In [None]:
# Drawing data for every location on the map image, keyed by location name.
# Each value is [centre, thickness]: make_video passes the first element to
# cv2.circle as the circle centre (pixel coordinates) and the second as the
# circle's thickness.
d = {
    'MG':   [(160, 57), 30],
    'CR1':  [(200, 120), 18],
    'CR9':  [(360, 200), 18],
    'CR7':  [(390, 150), 18],
    'OSB':  [(464, 140), 18],
    'CR11': [(380, 266), 18],
    'OSB1': [(447, 240), 18],
    'NB':   [(493, 240), 18],
    'NB1':  [(540, 240), 18],
    'CR21': [(740, 340), 18],
    'C':    [(670, 750), 18],
    'DCR':  [(780, 490), 18],
    'CR24': [(690, 480), 18],
    'E':    [(595, 455), 30],
    'D':    [(720, 560), 30],
    'CR23': [(480, 415), 18],
    'NB2':  [(500, 320), 30],
    'AB':   [(455, 485), 18],
    'HB1':  [(300, 460), 30],
    'ADB':  [(325, 295), 18],
    'HB':   [(240, 420), 18],
    'HP':   [(220, 475), 18],
    'B':    [(435, 585), 18],
    'HG':   [(225, 545), 18],
    'CR27': [(275, 695), 18],
    'SB':   [(395, 780), 18],
    'ADB1': [(390, 445), 18],
}

In [None]:
# Pixel centres of the object markers drawn for each building, keyed by the
# index of that building's object counter inside a state tuple.
_ITEM_MARKERS = {
    2: ((470, 330), (465, 350), (470, 310), (475, 290)),  # Nano objects
    3: ((260, 430), (280, 435), (300, 435), (320, 440)),  # Helmy objects
    4: ((445, 540), (465, 520), (485, 500), (505, 480)),  # Academic objects
    5: ((420, 180), (430, 195), (440, 210), (455, 225)),  # One stop shop objects
}


def _draw_state(image, state, d):
    '''Draw the agent and the object markers of one state onto image (in place) and return it.'''
    radius = 10
    # The agent's circle changes colour depending on whether it carries an object.
    color = (255, 0, 0) if state[1] == False else (0, 200, 0)
    image = cv2.circle(image, d[state[0]][0], radius, color, d[state[0]][1])

    for slot, positions in _ITEM_MARKERS.items():
        amount = state[slot]
        last = len(positions)
        for rank, position in enumerate(positions, start=1):
            # Marker k is shown once the building holds at least k objects;
            # the last marker is shown only for exactly `last` objects.
            visible = (amount == rank) if rank == last else (amount >= rank)
            if visible:
                image = cv2.circle(image, position, 5, (0, 200, 0), 7)
    return image


def make_video(filename, dsds, d, background_path=r'/content/Whats PM.jpeg',
               frames_per_state=10, fps=25):
    '''
    This function renders a sequence of agent states on the map image and saves it as a video
    inputs: 1. filename: file name of the video wanted to be created
            2. dsds: sequence of states (cross_road, carrying, NB, HB, AB, OSB)
               to draw, one after another
            3. d: dictionary mapping a crossroad name to [centre, thickness]
               of the circle drawn for the agent at that crossroad
            4. background_path: image file every frame is drawn on
            5. frames_per_state: number of identical frames written per state
            6. fps: frame rate of the video
    output: there is no output, but a video created
    raises: FileNotFoundError if the background image cannot be read
    '''
    # Read the background once; every state is drawn on its own copy because
    # cv2.circle draws in place.
    background = cv2.imread(background_path)
    if background is None:
        raise FileNotFoundError(f"could not read background image: {background_path}")

    # getting images dimentions
    height, width = background.shape[:2]
    size = (width, height)

    # setting the function of the video
    out = cv2.VideoWriter(filename, cv2.VideoWriter_fourcc(*'mp4v'), fps, size)
    try:
        # Frames are written as they are produced instead of being collected
        # in a list first, so memory use does not grow with the episode length.
        for state in dsds:
            frame = _draw_state(background.copy(), state, d)
            for _ in range(frames_per_state):
                out.write(frame)
    finally:
        # creating the video
        out.release()

## Exploring Q-Learning Simulation:

In [19]:
# Learning part: 10 episodes in which each state's least-tried action is
# chosen; the Q-table is dumped to 'intermediate_q.csv' after the 10th episode.
q = {}
n = {}
q_dict, n_dict, all_states = simulate(
    lambda: ZC_Agent(map_dict, max_reward=500, inter_reward=50, discount=0.3, limit=3000),
    Int_check=True,
    intermediate_q_num=10,
    n_iterations=10,
    q=q,
    n=n,
    # The score depends only on the visit count, not on the Q-value.
    f=lambda q_value, visits: 1 / (visits + 1),
)
print("Learned Q values:", q)


Learned Q values: {(('MG', False, 4, 4, 0, 0), 'R5'): -0.29608411774635335, (('CR1', False, 4, 4, 0, 0), 'R5'): -1.1172963994789125, (('CR1', False, 4, 4, 0, 0), 'R1'): 2.594965209960937, (('ADB', False, 4, 4, 0, 0), 'R25'): -0.849921875, (('CR11', False, 4, 4, 0, 0), 'R49'): -1.16259375, (('CR9', False, 4, 4, 0, 0), 'R48'): -1.10625, (('CR7', False, 4, 4, 0, 0), 'R48'): -1.03625, (('CR9', False, 4, 4, 0, 0), 'R49'): -0.963125, (('CR11', False, 4, 4, 0, 0), 'R26'): -0.20625000000000002, (('OSB', False, 4, 4, 0, 0), 'R44'): -0.8625, (('CR7', False, 4, 4, 0, 0), 'R44'): -0.3875, (('OSB', False, 4, 4, 0, 0), 'R50'): -0.75, (('CR21', False, 4, 4, 0, 0), 'R29'): -0.5, (('NB', False, 4, 4, 0, 0), 'take'): 44.2, (('NB', True, 3, 4, 0, 0), 'R27'): 3.0, (('OSB', True, 3, 4, 0, 0), 'drop'): 44.2375, (('OSB', False, 3, 4, 0, 1), 'R44'): -0.825, (('CR7', False, 3, 4, 0, 1), 'R48'): -0.825, (('CR9', False, 3, 4, 0, 1), 'R48'): -0.825, (('CR7', False, 3, 4, 0, 1), 'R44'): -0.2625, (('OSB', False, 3,

In [20]:
filename = 'after_Exploring_.mp4'
# Render the states returned by the simulate call above (those visited in its
# last episode) on the map image and save them as a video.
make_video(filename, all_states,d)

## Random Q-Learning:

In [21]:

# Learning part: 10 episodes with actions picked at random. No q/n are passed,
# so simulate starts from fresh, empty tables.
q_dict, n_dict, all_states = simulate(
    lambda: ZC_Agent(map_dict, max_reward=500, inter_reward=50, discount=0.3, limit=3000),
    Int_check=True,
    intermediate_q_num=10,
    n_iterations=10,
    # Every candidate action gets a random score.
    f=lambda q_value, visits: random(),
)



In [22]:
filename = 'after_Random_learning.mp4'
# Render the states returned by the simulate call above (those visited in its
# last episode) on the map image and save them as a video.
make_video(filename, all_states,d)

## Greedy Q-Learning:

In [32]:
# Learning part: 100 greedy episodes that keep updating the existing q and n
# tables, always taking the action with the highest Q-value.
q_dict, n_dict, all_states = simulate(
    lambda: ZC_Agent(map_dict, max_reward=500, inter_reward=50, discount=0.3, limit=3000),
    Int_check=True,
    intermediate_q_num=10,
    n_iterations=100,
    q=q,
    n=n,
    f=lambda q_value, visits: q_value,
)
print("Learned Q values:", q_dict)





Learned Q values: {(('MG', False, 4, 4, 0, 0), 'R5'): 0.08461538005574343, (('CR1', False, 4, 4, 0, 0), 'R5'): -1.1172963994789125, (('CR1', False, 4, 4, 0, 0), 'R1'): 3.615384600185812, (('ADB', False, 4, 4, 0, 0), 'R25'): -0.849921875, (('CR11', False, 4, 4, 0, 0), 'R49'): -1.16259375, (('CR9', False, 4, 4, 0, 0), 'R48'): -1.10625, (('CR7', False, 4, 4, 0, 0), 'R48'): -1.03625, (('CR9', False, 4, 4, 0, 0), 'R49'): -0.963125, (('CR11', False, 4, 4, 0, 0), 'R26'): -0.20625000000000002, (('OSB', False, 4, 4, 0, 0), 'R44'): -0.8625, (('CR7', False, 4, 4, 0, 0), 'R44'): -0.3875, (('OSB', False, 4, 4, 0, 0), 'R50'): -0.75, (('CR21', False, 4, 4, 0, 0), 'R29'): -0.5, (('NB', False, 4, 4, 0, 0), 'take'): 44.2, (('NB', True, 3, 4, 0, 0), 'R27'): 3.0, (('OSB', True, 3, 4, 0, 0), 'drop'): 44.2375, (('OSB', False, 3, 4, 0, 1), 'R44'): -0.825, (('CR7', False, 3, 4, 0, 1), 'R48'): -0.825, (('CR9', False, 3, 4, 0, 1), 'R48'): -0.825, (('CR7', False, 3, 4, 0, 1), 'R44'): -0.2625, (('OSB', False, 3, 

In [31]:
# Testing part: a single greedy episode on the learned tables (q and n are
# still updated while it runs).
q_dict, n_dict, all_states = simulate(
    lambda: ZC_Agent(map_dict, max_reward=500, inter_reward=50, discount=0.3, limit=3000),
    n_iterations=1,
    q=q,
    n=n,
    verbose=True,
    f=lambda q_value, visits: q_value,
)
# length of all states the agent visits
print("Visited states by the agent:", len(all_states))


Visited states by the agent: 36


In [33]:
filename = 'after_Greedy_learning.mp4'
# Render the states currently held in all_states (those visited in the last
# episode of the most recent simulate call) and save them as a video.
make_video(filename, all_states,d)