In [12]:
import numpy as np 
import cv2 
import matplotlib.pyplot as plt
import random

In [13]:
import gymnasium as gym

In [14]:
class Point(object):
    """Base class for anything placed on the canvas.

    A point stores an integer position ``(x, y)`` together with the
    rectangle ``[x_min, x_max] x [y_min, y_max]`` it is allowed to occupy.
    Subclasses are expected to set ``icon_w`` / ``icon_h`` to the size of
    their icon.

    Args:
        name: Free-form identifier stored on the instance.
        x_max, x_min: Upper / lower bound for ``x``.
        y_max, y_min: Upper / lower bound for ``y``.
    """

    # Fallback icon size. ``set_position`` subtracts these from the upper
    # bounds; without a default a plain ``Point`` (or a subclass that forgot
    # to set them) raised AttributeError. Instance attributes assigned by
    # subclasses override these regardless of assignment order.
    icon_w = 0
    icon_h = 0

    def __init__(self, name, x_max, x_min, y_max, y_min):
        self.x = 0
        self.y = 0
        self.x_min = x_min
        self.x_max = x_max
        self.y_min = y_min
        self.y_max = y_max
        self.name = name

    def set_position(self, x, y):
        """Place the point at ``(x, y)``, clamped so the icon stays in bounds.

        The upper bound is reduced by the icon size, i.e. ``x`` ends up in
        ``[x_min, x_max - icon_w]`` and ``y`` in ``[y_min, y_max - icon_h]``.
        """
        self.x = self.clamp(x, self.x_min, self.x_max - self.icon_w)
        self.y = self.clamp(y, self.y_min, self.y_max - self.icon_h)

    def get_position(self):
        """Return the current position as an ``(x, y)`` tuple."""
        return (self.x, self.y)

    def move(self, del_x, del_y):
        """Shift the point by ``(del_x, del_y)`` and clamp it to the bounds.

        NOTE(review): unlike ``set_position`` this clamps to ``x_max`` /
        ``y_max`` without subtracting the icon size. Callers that move an
        element must therefore pass already-reduced upper bounds to
        ``__init__`` if the icon has to stay fully inside the area.
        """
        self.x = self.clamp(self.x + del_x, self.x_min, self.x_max)
        self.y = self.clamp(self.y + del_y, self.y_min, self.y_max)

    def clamp(self, n, minn, maxn):
        """Return ``n`` limited to ``[minn, maxn]`` (``minn`` wins if they cross)."""
        return max(min(maxn, n), minn)

In [15]:
class Agent(Point):
    """The controllable element: a 64x64 icon filled with 0.5.

    The upper bounds handed to ``Point`` are reduced by the icon side (64)
    before they are stored.
    """

    def __init__(self, name, x_max, x_min, y_max, y_min):
        side = 64
        super().__init__(name, x_max - side, x_min, y_max - side, y_min)
        self.icon_w = side
        self.icon_h = side
        # Three-channel float icon, every value 0.5.
        self.icon = np.full((side, side, 3), 0.5)

In [16]:
class Drum(Point):
    """A 64x64 element (icon value 0.2) that remembers whether it was visited."""

    def __init__(self, name, x_max, x_min, y_max, y_min):
        super().__init__(name, x_max, x_min, y_max, y_min)
        side = 64
        self.icon_w = side
        self.icon_h = side
        # Three-channel float icon, every value 0.2.
        self.icon = np.full((side, side, 3), 0.2)
        # Flipped to True by set_visited(); never reset by this class.
        self.visited = False

    def set_visited(self):
        """Mark this drum as visited."""
        self.visited = True

In [17]:
class Obstacle(Point):
    """A 64x64 element whose icon is filled with 0.7."""

    def __init__(self, name, x_max, x_min, y_max, y_min):
        super().__init__(name, x_max, x_min, y_max, y_min)
        side = 64
        self.icon_w = side
        self.icon_h = side
        # Three-channel float icon, every value 0.7.
        self.icon = np.full((side, side, 3), 0.7)

In [18]:
class Destination(Point):
    """A 64x64 element whose icon is filled with 0.9."""

    def __init__(self, name, x_max, x_min, y_max, y_min):
        super().__init__(name, x_max, x_min, y_max, y_min)
        side = 64
        self.icon_w = side
        self.icon_h = side
        # Three-channel float icon, every value 0.9.
        self.icon = np.full((side, side, 3), 0.9)

In [19]:
class RobotEnv(gym.Env):
    def __init__(self,space_size):
        """Set up spaces, the blank canvas and the roaming limits.

        Args:
            space_size: Shape of the canvas / observation. Index 0 is used
                for the vertical limits and index 1 for the horizontal one.
        """
        super().__init__()

        # Observation: one value in [0, 1] per canvas cell.
        self.observation_shape = space_size
        shape = self.observation_shape
        self.observation_space = gym.spaces.Box(
            low=np.zeros(shape),
            high=np.ones(shape),
            dtype=np.float16,
        )

        # Five discrete actions, numbered 0..4.
        self.action_space = gym.spaces.Discrete(5)

        # Canvas starts out filled with ones.
        self.canvas = np.ones(shape) * 1

        # Elements currently present in the environment.
        self.elements = []

        # Roaming area: the middle 80% vertically, the full width horizontally.
        rows, cols = shape[0], shape[1]
        self.y_min = int(rows * 0.1)
        self.y_max = int(rows * 0.9)
        self.x_min = 0
        self.x_max = cols

        # Step budget for one episode.
        self.max_no_of_steps = 5000

    def draw_elements_on_canvas(self):
        """Repaint the canvas: blank it, paste every element icon, add the HUD text.

        Reads ``self.elements``, ``self.visited_drum`` and ``self.ep_return``
        and replaces ``self.canvas``.
        """
        # Start from a blank canvas filled with ones.
        self.canvas = np.ones(self.observation_shape) * 1

        # Paste every element's icon with its (x, y) as the top-left corner.
        for elem in self.elements:
            # numpy image arrays are indexed (row, col, channel): the row
            # extent must come from shape[0] and the column extent from
            # shape[1]. The previous code had the two swapped, which only
            # worked because every icon is square and would fail to
            # broadcast for any other icon shape.
            # NOTE: the element classes build icons as (icon_w, icon_h, 3);
            # that is indistinguishable from (icon_h, icon_w, 3) while the
            # icons stay square.
            icon_rows, icon_cols = elem.icon.shape[:2]
            x, y = elem.x, elem.y
            self.canvas[y:y + icon_rows, x:x + icon_cols] = elem.icon

        text = 'Visited Drums:  {} | Rewards: {}'.format(self.visited_drum, self.ep_return)

        # Put the info on canvas
        self.canvas = cv2.putText(self.canvas, text, (10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,0,0), 1, cv2.LINE_AA)

    def has_collided(self, elem1, elem2):
        """Return True when the bounding boxes of two elements overlap or touch.

        Each element's box starts at ``get_position()`` and spans
        ``icon_w`` x ``icon_h``. Two boxes collide on an axis when the
        distance between their centres is at most half the sum of their
        sizes on that axis.

        The previous implementation compared the top-left corners instead of
        the centres, which is only equivalent when both elements have the
        same size. Everything is kept in doubled units so integer positions
        never go through a float division; for equal-sized icons the result
        is unchanged.
        """
        elem1_x, elem1_y = elem1.get_position()
        elem2_x, elem2_y = elem2.get_position()

        # 2 * centre = 2 * top_left + size
        double_dx = abs((2 * elem1_x + elem1.icon_w) - (2 * elem2_x + elem2.icon_w))
        double_dy = abs((2 * elem1_y + elem1.icon_h) - (2 * elem2_y + elem2.icon_h))

        x_col = double_dx <= (elem1.icon_w + elem2.icon_w)
        y_col = double_dy <= (elem1.icon_h + elem2.icon_h)

        return x_col and y_col

    def collide(self, obj, obj_list):
        """Return True if ``obj`` collides with anything in ``obj_list`` or ``self.elements``.

        Candidates from ``obj_list`` are checked first, then the elements
        already registered on the environment; the scan stops at the first hit.
        """
        candidates = obj_list + self.elements
        return any(self.has_collided(obj, other) for other in candidates)

    def generate_objects(self, no_of_objs, obj_type, max_attempts=1000):
        """Create ``no_of_objs`` randomly placed, non-colliding objects.

        Args:
            no_of_objs: Number of objects to create.
            obj_type: ``"drum"`` creates ``Drum`` objects; any other value
                creates ``Obstacle`` objects.
            max_attempts: Maximum number of consecutive rejected placements
                tolerated for a single object before giving up.

        Returns:
            list: The created objects, named ``drum_<i>`` / ``obstacle_<i>``
            with ``i`` starting at 1.

        Raises:
            RuntimeError: If one object could not be placed within
                ``max_attempts`` tries (previously this looped forever).
        """
        # observation_shape is (rows, cols, ...): index 0 is the vertical
        # extent and index 1 the horizontal one, matching how __init__
        # derives y_min/y_max and x_max. The sampling ranges used to be
        # swapped (x drawn from the height, y from the width).
        canvas_h = self.observation_shape[0]
        canvas_w = self.observation_shape[1]

        if obj_type == "drum":
            obj_cls, prefix = Drum, "drum_"
        else:
            obj_cls, prefix = Obstacle, "obstacle_"

        obj_list = []
        attempts = 0
        while len(obj_list) < no_of_objs:
            if attempts >= max_attempts:
                raise RuntimeError(
                    "Could not place {}{} without a collision after {} attempts".format(
                        prefix, len(obj_list) + 1, max_attempts))
            attempts += 1

            obj = obj_cls(prefix + str(len(obj_list) + 1),
                          self.x_max, self.x_min, self.y_max, self.y_min)

            # Draw a candidate position; set_position clamps it to the
            # permissible area.
            x = random.randrange(int(canvas_w * 0.10), int(canvas_w * 0.90))
            y = random.randrange(int(canvas_h * 0.20), int(canvas_h * 0.90))
            obj.set_position(x, y)

            # Keep the object only if it touches nothing placed so far.
            if self.collide(obj, obj_list):
                continue
            obj_list.append(obj)
            attempts = 0

        return obj_list

    def get_action_meanings(self):
        """Return a human-readable label for every action id.

        The labels follow what ``step`` actually does on the canvas, where
        ``y`` grows downward: 0 moves +y, 1 moves -y, 2 moves +x, 3 moves -x.
        The previous labels (0 = Right, 1 = Left, 2 = Down, 3 = Up)
        contradicted both ``step`` and the key bindings in ``render``.
        """
        return {0: "Down", 1: "Up", 2: "Right", 3: "Left", 4: "Do Nothing"}

    # def _get_obs(self):
    #     return {"elements":self.elements}

    # def _get_info(self):
    #     return {}
        

    def reset(self, *, seed=None, options=None):
        """Start a new episode and return the first observation.

        Args:
            seed: Optional seed for the layout. When omitted, the fixed seed
                40 is used, so the layout is the same on every reset.
            options: Accepted for compatibility with the gymnasium ``reset``
                signature; not used.

        Returns:
            tuple: ``(canvas, info)`` where ``info`` is an empty dict.
        """
        # Let gymnasium seed its own RNG (self.np_random) when a seed is given.
        super().reset(seed=seed)

        # Reset the episodic return shown on the canvas.
        self.ep_return = 0

        # Number of drums visited so far.
        self.visited_drum = 0

        # Reset the step budget for the agent.
        self.max_no_of_steps = 5000

        # Object placement uses the global `random` module, so seed that one.
        random.seed(40 if seed is None else seed)

        # Agent starts in the top-left corner of the permissible area.
        self.agent = Agent("agent", self.x_max, self.x_min, self.y_max, self.y_min)
        self.agent.set_position(self.x_min, self.y_min)

        # Destination is requested at the bottom-right corner; set_position
        # clamps it so the icon stays inside the area.
        self.destination = Destination("destination", self.x_max, self.x_min, self.y_max, self.y_min)
        self.destination.set_position(self.x_max, self.y_max)

        # The agent and destination must be registered first so generated
        # objects are rejected when they overlap either of them.
        self.elements = [self.agent, self.destination]

        # Obstacles (generate_objects always returns a list).
        no_of_obstacles = random.randrange(2,5)
        self.obstacles = self.generate_objects(no_of_obstacles, "obstacles")
        print("No of obstacles: "+str(no_of_obstacles))
        self.elements.extend(self.obstacles)

        # Drums.
        no_of_drums = random.randrange(4,10)
        self.drums = self.generate_objects(no_of_drums, "drum")
        print("Total no of drums: "+str(no_of_drums))
        self.total_drum = no_of_drums
        self.elements.extend(self.drums)

        # Repaint the canvas (draw_elements_on_canvas blanks it first).
        self.draw_elements_on_canvas()

        # Return the observation and an empty info dict.
        return self.canvas, {}

    def get_action_meaning(self):
        """Return a human-readable label for every action id.

        The labels follow what ``step`` actually does on the canvas, where
        ``y`` grows downward: 0 moves +y, 1 moves -y, 2 moves +x, 3 moves -x.
        The previous labels (0 = Right, 1 = Left, 2 = Down, 3 = Up)
        contradicted both ``step`` and the key bindings in ``render``.
        """
        return {0: "Down", 1: "Up", 2: "Right", 3: "Left", 4: "Do Nothing"}

    def step(self, action):
        """Apply one action, resolve collisions and repaint the canvas.

        Args:
            action: Action id in ``self.action_space``. 0 moves the agent by
                +5 in y, 1 by -5 in y, 2 by +5 in x, 3 by -5 in x, 4 does
                not move it.

        Returns:
            tuple: ``(canvas, reward, done, False, info)``. ``done`` becomes
            True when the agent hits an obstacle (reward -10), reaches the
            destination (+10 if every drum was visited, otherwise -20) or
            the step budget is used up. Each drum touched for the first
            time adds 0.1. ``info`` is an empty dict.

        Raises:
            AssertionError: If ``action`` is not in the action space.
        """
        # Assert that it is a valid action
        assert self.action_space.contains(action), "Invalid Action"

        # Flag that marks the termination of an episode
        done = False

        # Reward for executing a step.
        reward = 0

        # (del_x, del_y) applied to the agent for each action id.
        moves = {0: (0, 5), 1: (0, -5), 2: (5, 0), 3: (-5, 0), 4: (0, 0)}
        self.agent.move(*moves[action])

        for elem in self.elements:
            if isinstance(elem, Obstacle):
                # Hitting an obstacle is penalised and ends the episode.
                if self.has_collided(self.agent, elem):
                    done = True
                    reward = -10
                    self._remove_agent_from_elements()
                    print("Agent collide with Obstacle")
                    break

            elif isinstance(elem, Drum):
                # First contact with a drum is rewarded and recorded.
                if self.has_collided(self.agent, elem) and not elem.visited:
                    elem.set_visited()
                    self.visited_drum += 1
                    # Accumulate: two drums can be touched in the same step,
                    # and plain assignment used to drop one of the rewards.
                    reward += 0.1
                    print("Agent scanned Drum")

            elif isinstance(elem, Destination):
                # Reaching the destination ends the episode; the sign of the
                # reward depends on whether all drums were visited.
                if self.has_collided(self.agent, elem):
                    done = True
                    if self.visited_drum == self.total_drum:
                        reward = +10
                    else:
                        reward = -20
                    self._remove_agent_from_elements()
                    print("Agent reached the destination")
                    break

        # Use up one step of the budget.
        self.max_no_of_steps -= 1

        # Keep the return shown on the canvas in sync with the rewards
        # handed out (it used to stay at 0 forever). Rounded so float noise
        # such as 0.30000000000000004 does not leak into the HUD text.
        self.ep_return = round(self.ep_return + reward, 10)

        # Draw elements on the canvas
        self.draw_elements_on_canvas()

        # Budget exhausted: the episode is over. `<=` also covers calls made
        # after the budget already ran out.
        if self.max_no_of_steps <= 0:
            done = True

        # info must be a dict in the gymnasium step API.
        return self.canvas, reward, done, False, {}

    def _remove_agent_from_elements(self):
        """Stop drawing the agent; safe to call when it was already removed."""
        if self.agent in self.elements:
            self.elements.remove(self.agent)

    def display_and_handle_keys(self,img_copy):
        """Show ``img_copy`` in the ``'image'`` window and return the pressed key.

        Blocks in ``cv2.waitKey(0)``; only the low byte of the key code is
        returned.
        """
        cv2.imshow('image', img_copy)
        return cv2.waitKey(0) & 0xFF

    def render(self, mode = "human"):
        """Render the canvas.

        In ``"human"`` mode the canvas is shown, one key press is awaited and
        translated: ``q`` -> -1, ``a`` -> 3, ``d`` -> 2, ``w`` -> 1,
        ``s`` -> 0, any other key -> 4. In ``"rgb_array"`` mode the canvas
        itself is returned.
        """
        assert mode in ["human", "rgb_array"], 'Invalid mode, must be either "human" or "rgb_array"'

        if mode == "human":
            # Key code -> value handed back to the caller.
            key_bindings = {
                ord('q'): -1,
                ord('a'): 3,
                ord('d'): 2,
                ord('w'): 1,
                ord('s'): 0,
            }
            pressed = self.display_and_handle_keys(self.canvas)
            return key_bindings.get(pressed, 4)

        elif mode == "rgb_array":
            return self.canvas

    def agent_get_position(self):
        """Return the agent's current ``(x, y)`` position."""
        position = self.agent.get_position()
        return position

    def create_window(self):
        """Create the OpenCV window named ``'image'``."""
        window_name = 'image'
        cv2.namedWindow(window_name)
        
    def close(self):
        """Destroy every OpenCV window."""
        cv2.destroyAllWindows()
        return None

In [20]:
# Build the environment on a (600, 800, 3) canvas: RobotEnv uses index 0 for
# the vertical limits and index 1 for the horizontal limit.
env = RobotEnv(space_size=(600, 800, 3))

In [21]:
# Start an episode, show the first frame and wait for one key press.
obs, info = env.reset()
env.create_window()
action = env.render(mode="human")

# render() returns -1 when 'q' was pressed: close the window in that case.
if action == -1:
    env.close()

No of obstacles: 3
Total no of drums: 5


In [64]:
# Destroy any OpenCV window that is still open.
env.close()

In [17]:
# Draw one random action id from the action space and show it.
a = env.action_space.sample()
print(a)

4


In [11]:
# Play `episodes` keyboard-driven episodes. render() shows the current frame,
# waits for a key and returns the action to apply next (-1 means quit).
episodes = 1
for episode in range(1, episodes + 1):
    score = 0
    step = 0
    env.create_window()
    try:
        obs, info = env.reset()
        done = False
        action = env.render(mode="human")
        if action == -1:
            # Quit before the first step: skip the episode summary.
            break
        while not done:
            obs, reward, terminated, truncated, info = env.step(action)
            # Stop on either flag; the truncated flag used to be ignored.
            done = terminated or truncated
            action = env.render(mode="human")
            score += reward
            step += 1
            if action == -1:
                break
            print(score)
    finally:
        # Always tear the window down, even if reset/step/render raised.
        env.close()
    print('Epiosde:{} Score:{}'.format(episode,score))

No of obstacles: 3
Total no of drums: 5
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
Agent scanned Drum
0.1
0.1
0.1
0.1
0.1
0.1
0.1
0.1
0.1
0.1
0.1
0.1
0.1
0.1
Agent scanned Drum
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
0.2
Agent reached the destination
-19.8
Epiosde:1 Score:-19.8
