[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/nihalgeorge01/DLSS_RL/blob/main/DLSS_RL_Assignment.ipynb)

# Black Hole
### The game is played on a 10 × 10 grid:
>   
\_ \_ \_ \_ \_ \_ \_ \_ \_ H  
\_ \_ \_ \_ \_ \_ \_ H \_ \_  
\_ \_ \_ \_ \_ H \_ \_ \_ \_  
\_ \_ \_ H \_ \_ \_ \_ \_ \_  
\_ H \_ \_ \_ \_ \_ \_ \_ \_  
\_ \_ \_ \_ \_ \_ \_ \_ \_ H  
\_ \_ \_ \_ \_ \_ \_ H \_ \_  
\_ \_ \_ \_ \_ H \_ \_ \_ \_  
\_ \_ \_ H \_ \_ \_ \_ \_ \_  
G H \_ \_ \_ \_ \_ \_ \_ \_  

\_ : Safe path  
H : Black Hole, avoid falling  
G: Goal, target to reach  

Every hole moves one step to the left on each timestep, wrapping around to the right edge of its row.  
Your goal is to reach G.  
You fall into the hole only if your position coincides with the hole.  
The episode ends when you reach G or fall in H.  
If you reach G, you win.  
If you reach H, you lose.  
If you reach G at a time when it coincides with H, you lose.  
## Reward Scheme 
- +1 if you reach G while no hole covers it  
- -1 if you fall into H (this includes reaching G while a hole covers it)  
- 0 otherwise.


# **DO NOT EDIT THE CELL BELOW**

In [1]:
import random
import sys
import copy
class BlackHole:
  """10 x 10 grid world: reach the goal 'G' while dodging drifting holes 'H'.

  Every row holds one hole. The hole of row ``r`` sits in column
  ``(h - 2*r) % 10``, where ``h`` is a phase counter that drops by one on
  every step (wrapping from 0 back to 9), so all holes drift one cell to the
  left per timestep. The goal is fixed at the bottom-left cell (row 9,
  column 0); when ``h == 8`` the bottom-row hole coincides with it.

  States are integers in 1..1000 encoded as
  ``(9 - h) * 100 + y * 10 + x + 1`` where ``y`` is the player's row and
  ``x`` the player's column. Actions are the integers 1..5
  (Up, Right, Down, Left, Stay).
  """

  class action:
    """Action space: the integers 1..5."""

    def __init__(self):
      self.total_actions = 5
      self.dtype = type(self.total_actions)
      self.__out = sys.stdout

    def random_action(self):
      """Return a uniformly random action id in 1..total_actions."""
      return random.randint(1, self.total_actions)

    def show_actions(self):
      """Write the id -> direction legend to the output stream."""
      actions = "1->Up, 2->Right, 3->Down, 4->Left 5->Stay"
      self.__out.write(actions)

  class observation:
    """Observation space: integer state ids (see the BlackHole docstring)."""

    def __init__(self):
      self.total_observations = 1000
      self.dtype = type(self.total_observations)
      # Sampling pool: 1..999 without the multiples of 100, i.e. without the
      # states whose player position is the bottom-right cell (y=9, x=9).
      self.__lst = [x for x in range(1, 1000) if x % 100 != 0]

    def random(self):
      """Return a random state id drawn from the sampling pool."""
      return random.sample(self.__lst, 1)[0]

  def __init__(self):
    """Create the environment; no state exists until reset()/set_state()."""
    self.observation_space = self.observation()
    # Board for h == 9; regenerated by __adjust_h whenever h changes.
    self.__map = ['_________H',
                  '_______H__',
                  '_____H____',
                  '___H______',
                  '_H________',
                  '_________H',
                  '_______H__',
                  '_____H____',
                  '___H______',
                  'GH________']
    self.action_space = self.action()
    self.__x = None        # player column
    self.__y = None        # player row
    self.__state = None    # encoded state id
    self.__out = sys.stdout
    self.__action = None   # last action taken (shown by show())
    self.__action_dict = {1: 'Up', 2: 'Right', 3: 'Down', 4: 'Left', 5: 'Stay'}
    self.__done = False
    self.__h = None        # hole phase counter, 0..9

  def reset(self):
    """Start a new episode and return the initial state id.

    The player spawns in the top row at a random column in 0..4 and the
    holes return to their initial layout (h == 9).
    """
    self.__y = 0
    self.__h = 9
    self.__adjust_h()
    self.__x = random.randint(0, 4)
    self.current_state()
    self.__action = None
    self.__done = False
    return self.__state

  def __adjust_h(self):
    """Rebuild the board so every hole matches the current phase ``h``."""
    width = 10
    for row in range(len(self.__map)):
      cells = ['_'] * width
      cells[(self.__h - 2 * row) % width] = 'H'
      if row == 9:
        # The goal is drawn last: when the bottom-row hole lands on column 0
        # (h == 8) the cell still reads 'G'; callers detect the overlap by
        # checking h == 8.
        cells[0] = 'G'
      self.__map[row] = ''.join(cells)

  def current_state(self):
    """Return the encoded state id, or None before reset()/set_state()."""
    if self.__y is not None:
      self.__state = (9 - self.__h) * 100 + self.__y * 10 + self.__x + 1
    return self.__state

  def take_step(self, action):
    """Apply ``action``, advance the holes one step, and report the outcome.

    Moves that would leave the grid keep the player in place (the holes
    still advance).

    Returns:
      ``(state, reward, done)`` on a valid step. Reward is 1.0 for reaching
      G while no hole covers it, -1.0 for landing on a hole (or on G while
      a hole covers it), and 0.0 otherwise.
      ``None`` (after writing a message) when the action is not one of 1..5
      or when the episode has already ended.
    """
    if self.__done:
      self.__out.write("\n\033[38;5;11mWARN: You are calling 'step()' even though this environment has already returned done = True. You should always call 'reset()' once you receive 'done = True\033[0;0m")
      return

    reward = 0.0
    if action == 1:
      if self.__y - 1 >= 0:
        self.__y -= 1
    elif action == 3:
      if self.__y + 1 <= 9:
        self.__y += 1
    elif action == 2:
      if self.__x + 1 <= 9:
        self.__x += 1
    elif action == 4:
      if self.__x - 1 >= 0:
        self.__x -= 1
    elif action == 5:
      pass
    else:
      self.__out.write("Enter a valid action.")
      return
    self.__action = action

    # Holes drift left: decrement the phase, wrapping from 0 back to 9.
    self.__h = self.__h - 1 + (self.__h - 1 < 0) * 10
    self.__adjust_h()
    self.current_state()

    cell = self.__map[self.__y][self.__x]
    if cell == 'G':
      # At h == 8 the bottom-row hole sits on the goal, so arriving loses.
      reward = -1.0 if self.__h == 8 else 1.0
      self.__done = True
    elif cell == 'H':
      reward = -1.0
      self.__done = True
    return self.__state, reward, self.__done

  def show(self):
    """Write the last action, the coloured board and any end message."""
    if self.__state is None:
      self.__out.write('NONE')
      return

    grid = list(self.__map)  # rows are immutable strings; a shallow copy suffices
    val = grid[self.__y][self.__x]
    # Mark the player's cell with a placeholder so the colouring passes
    # below cannot touch it, then swap in the highlighted original symbol.
    grid[self.__y] = grid[self.__y][:self.__x] + 'P' + grid[self.__y][self.__x + 1:]
    grid = self.__add_colour_h(grid)
    grid[-1] = grid[-1].replace('G', "\033[38;5;12mG\033[0;0m")
    grid[self.__y] = grid[self.__y].replace('P', f'\033[48;5;9m{val}\033[0;0m')
    if self.__action is not None:
      self.__out.write('\n' + self.__action_dict[self.__action])
    self.__out.write("\n+----------+")
    for line in grid:
      self.__out.write('\n|' + line + '|')
    self.__out.write("\n+----------+")
    if val == 'H':
      self.__out.write("\nTRY AGAIN.......You fell in Black Hole!!!")
    if val == 'G':
      if self.__h != 8:
        self.__out.write("\nGG!!")
      else:
        self.__out.write("\nYou reached but fell in Black Hole")
    self.__out.write("\n")

  def __add_colour_h(self, grid):
    """Wrap holes (and the goal while a hole covers it) in colour codes.

    Modifies ``grid`` in place and also returns it.
    """
    for i in range(len(grid)):
      grid[i] = grid[i].replace('H', '\033[48;5;16mH\033[0;0m')
    if self.__h == 8:
      grid[9] = grid[9].replace('G', '\033[48;5;16mG\033[0;0m')
    return grid

  def set_state(self, state):
    """Force the environment into the state with id ``state`` (1..1000).

    An out-of-range id writes a message and leaves the environment
    unchanged. The episode is flagged as finished unless the player's cell
    is a safe '_' cell.
    """
    if state > 1000 or state < 1:
      self.__out.write("Enter a valid state.")
      return
    self.__state = state
    self.__h = 9 - (state - 1) // 100
    self.__y = ((state - 1) % 100) // 10
    self.__x = ((state - 1) % 100) % 10
    self.__adjust_h()
    self.__done = self.__map[self.__y][self.__x] != '_'
    self.__action = None

# Environment methods and attributes

In [2]:
env = BlackHole() # Create the environment; it has no state until reset() or set_state() is called

In [3]:
print(env.observation_space.total_observations) # Size of the observation space (1000 state ids)
print(env.observation_space.random()) # A randomly sampled state id from the observation space

1000
998


In [4]:
print(env.action_space.total_actions) # Number of available actions (5)
print(env.action_space.random_action()) # A random action id between 1 and 5
env.action_space.show_actions() # Writes the action-id -> direction legend to stdout

5
3
1->Up, 2->Right, 3->Down, 4->Left 5->Stay

In [5]:
print(env.current_state()) # Prints None: no state exists before reset() or set_state()

None


In [6]:
env.reset() # Start a new episode: the player spawns in the top row, in one of the first five columns
env.show() # Draw the board with the player's cell highlighted
print(env.current_state()) # Encoded id of the state that was just drawn


+----------+
|____[48;5;9m_[0;0m____[48;5;16mH[0;0m|
|_______[48;5;16mH[0;0m__|
|_____[48;5;16mH[0;0m____|
|___[48;5;16mH[0;0m______|
|_[48;5;16mH[0;0m________|
|_________[48;5;16mH[0;0m|
|_______[48;5;16mH[0;0m__|
|_____[48;5;16mH[0;0m____|
|___[48;5;16mH[0;0m______|
|[38;5;12mG[0;0m[48;5;16mH[0;0m________|
+----------+
5


In [7]:
env.set_state(env.observation_space.random()) # Force the environment into a randomly sampled state id
env.show() # Draw the board for that state
print(env.current_state()) # Prints the state id that was just set


+----------+
|___[48;5;16mH[0;0m______|
|_[48;5;16mH[0;0m________|
|_________[48;5;16mH[0;0m|
|_______[48;5;16mH[0;0m__|
|_____[48;5;16mH[0;0m____|
|___[48;5;16mH[0;0m[48;5;9m_[0;0m_____|
|_[48;5;16mH[0;0m________|
|_________[48;5;16mH[0;0m|
|_______[48;5;16mH[0;0m__|
|[38;5;12mG[0;0m____[48;5;16mH[0;0m____|
+----------+
655


### `env.take_step()` returns exactly three values: `state`, `reward`, and `done` (whether the episode has ended). It returns `None` instead if the action is invalid or the episode is already over.

In [8]:
from IPython.display import clear_output
from time import sleep
# Play one episode with uniformly random actions, redrawing the board
# about once per second.
env.reset()
done = False
while not done:
  # Draw the current frame, then advance the game by one random move.
  env.show()
  clear_output(wait=True)
  sleep(1.0)
  action = env.action_space.random_action()
  state,reward,done = env.take_step(action)
# Draw the terminal frame too, so the final board (and its message) is shown.
env.show()
clear_output(wait=True)
sleep(1.0)


Right
+----------+
|__[48;5;9mH[0;0m_______|
|[48;5;16mH[0;0m_________|
|________[48;5;16mH[0;0m_|
|______[48;5;16mH[0;0m___|
|____[48;5;16mH[0;0m_____|
|__[48;5;16mH[0;0m_______|
|[48;5;16mH[0;0m_________|
|________[48;5;16mH[0;0m_|
|______[48;5;16mH[0;0m___|
|[38;5;12mG[0;0m___[48;5;16mH[0;0m_____|
+----------+
TRY AGAIN.......You fell in Black Hole!!!


## **Task**
### Now that you are familiar with the BlackHole environment, implement Q-learning on this custom environment. Remember that you are not allowed to make any changes to the BlackHole class.

In [9]:
from tqdm import tqdm

In [10]:
import numpy as np
import random
# State ids run from 1 to total_observations and are used directly as row
# indices, so allocate one extra row (row 0 is simply never visited).
# With exactly total_observations rows, state 1000 (player in the
# bottom-right cell with h == 0) would index past the end of the table.
q_table = np.zeros([env.observation_space.total_observations + 1, env.action_space.total_actions])
# (alpha, gamma, episodes, epsilon) = (0.6, 0.9, 10, 0.5)


In [None]:
# Q-learning hyperparameters. The epsilon given here is only a placeholder:
# the schedule inside the loop overwrites it on the very first episode.
(alpha, gamma, episodes, epsilon) = (0.9, 0.9, 10000, 1)
total_epochs = 0   # environment steps accumulated over all episodes
render = True      # set to False to skip the per-step display and 0.5 s pause

for i in tqdm(range(episodes)):

  # Step-wise epsilon decay: explore more early on, exploit more later.
  #   first 10 % of episodes : epsilon = 0.7
  #   10 % - 50 %            : epsilon = 0.5
  #   50 % - 60 %            : epsilon = 0.4
  #   60 % - 90 %            : epsilon = 0.2
  #   last 10 %              : no branch matches, so epsilon stays at 0.2
  if i <= (episodes*0.1):
    epsilon = 1 - 0.3
  elif i <= (episodes*0.5):
    epsilon = 1 - 0.5
  elif i <= (episodes*0.6):
    epsilon = 1 - 0.6
  elif i <= (episodes*0.9):
    epsilon = 1 - 0.8

  state = env.reset()
  epochs = 0

  done = False
  while not done:
    # Epsilon-greedy selection: explore with probability epsilon, otherwise
    # take the best known action (action ids are 1-based, columns 0-based).
    rand = random.uniform(0,1)
    if rand < epsilon:
      action = env.action_space.random_action()
    else:
      action = np.argmax(q_table[state])+1
    new_state,reward,done = env.take_step(action)

    # Reward shaping to break a loop: the agent used to linger in the top
    # row, stepping down only to dodge a hole and then going straight back
    # up. Moving up is therefore penalised and moving down is rewarded.
    if action == 1:        # Up
      reward -= 0.1
    if action == 3:        # Down
      reward += 0.5

    # Q-learning update:
    #   target = reward + gamma * max_a Q(s', a)
    #   Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * target
    oldq = q_table[state, action-1]
    new_state_max = np.max(q_table[new_state])
    newq = (1 - alpha) * oldq + alpha * (reward + gamma * new_state_max)
    q_table[state, action-1] = newq
    state = new_state
    epochs+=1

    if render:
      print(f"Episode:{i}, Epoch: {epochs}")
      print(action,reward)
      env.show()
      clear_output(wait=True)
      sleep(0.5)

  total_epochs += epochs
    

Episode:108, Epoch: 6
4 0.0

Left
+----------+
|___[48;5;16mH[0;0m______|
|_[48;5;16mH[0;0m[48;5;9m_[0;0m_______|
|_________[48;5;16mH[0;0m|
|_______[48;5;16mH[0;0m__|
|_____[48;5;16mH[0;0m____|
|___[48;5;16mH[0;0m______|
|_[48;5;16mH[0;0m________|
|_________[48;5;16mH[0;0m|
|_______[48;5;16mH[0;0m__|
|[38;5;12mG[0;0m____[48;5;16mH[0;0m____|
+----------+
