# Drone Gym Environment and DQN Demo
## DM996 Group C, University of Strathclyde, 25/03/2021

This environment is compatible with OpenAI Gym and allows you to control a drone in a randomly generated city environment (or a custom one - example included). The drone has a destination and must avoid obstacles such as buildings or other drones.

 The following cells should be uncommented if using Google Colab:

In [2]:
# pip install Box2D

In [3]:
# !pip install gym pyvirtualdisplay > /dev/null 2>&1
# !apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1

In [4]:
# from pyvirtualdisplay import Display
# display = Display(visible=False, size=(1000, 900))
# display.start()

In [5]:
# !echo $DISPLAY

In [6]:
# # general libraris
# import numpy as np
# import math
# import random
# import time

# # box2D import libraries
# import Box2D
# from Box2D.b2 import fixtureDef
# from Box2D.b2 import polygonShape
# from Box2D.b2 import circleShape
# from Box2D.b2 import contactListener

# # OpenAI Gym import
# import gym
# from gym import error, spaces, utils
# from gym.utils import seeding

# # pyglet used for interacting with viewer window
# import pyglet
# #from pyglet.gl import *
# #import pyglet.gl

# # gym wrapper for creating environment visualiser
# #from gym.envs.classic_control import rendering
# import rendering

# from gym import spaces

# import matplotlib.pyplot as plt

# import pandas as pd


Comment this cell if using Google Colab:

In [1]:
# general libraris
import numpy as np
import math
import random
import time

# box2D import libraries
import Box2D
from Box2D.b2 import fixtureDef
from Box2D.b2 import polygonShape
from Box2D.b2 import circleShape
from Box2D.b2 import contactListener

# OpenAI Gym import
import gym
from gym import error, spaces, utils
from gym.utils import seeding

# pyglet used for interacting with viewer window
import pyglet

# gym wrapper for creating environment visualiser
from gym.envs.classic_control import rendering

from gym import spaces

import matplotlib.pyplot as plt

import pandas as pd


In [2]:
# size of observable area from drone - square
OBSRV_WIDTH = 85
OBSRV_HEIGHT = OBSRV_WIDTH
OBSRV_AREA = OBSRV_WIDTH * OBSRV_HEIGHT

# define default shape for a drone
DRONE_SHAPE = [(0,0),(5,0),(5,5),(0,5)]

REPOSITION_ROAD_WINDOW_SIZE = 11 # FOR INITIAL DRONE/DESTINATION RE-POSITIONING - drone needs a REPOSITION_ROAD_WINDOW_SIZExREPOSITION_ROAD_WINDOW_SIZE window of clearance from any buildings

# define frames per second (fps) values
FPS = 1

# viewer window size
FRAME_WIDTH = 640
FRAME_HEIGHT = 480

class ContactDetector(contactListener):
    def __init__(self, env):
        contactListener.__init__(self)
        self.env = env

    def BeginContact(self, contact):
        if self.env.drones[0].dynamic_body == contact.fixtureA.body or self.env.drones[0].dynamic_body == contact.fixtureB.body:
            self.env.drone_contact = True


class Drone():
    def __init__(self):
        # drone initial position as 2D coordinates
        self.drone_position = None
        self.drone_position_prev = None
        # current drone position as 2D coordinates
        self.drone_position_init = None
        # observations (2D image) for drone
        self.drone_observation = None
        # distance observation (float) of drone from destination
        self.drone_distance_observation = None
        
        # drone object in box2d
        self.dynamic_body = None
        
    def drone_step(self, action):
        # in this function, we update the position of the drone in box2D
        # apply a force to drone to update the position
        
        # using discrete action space  
        discrete_actions = {0:(0,0), 1:(0,1), 2:(0.707,0.707), 3:(1,0), 4:(0.707,-0.707), 5:(0,-1),
                            6:(-0.707,-0.707), 7:(-1,0), 8:(-0.707,0.707)}
        
        horizontal_force = discrete_actions[action][0]
        vertical_force = discrete_actions[action][1]
        
        if(action == 0):
            self.dynamic_body.LinearVelocity = (0,0)
        
        self.dynamic_body.ApplyForceToCenter((horizontal_force,vertical_force),True)
        
    def update_position(self):
        # get position for this drone and save
        self.drone_position_prev = (self.drone_position[0],self.drone_position[1])
        self.drone_position = (self.dynamic_body.position[0],self.dynamic_body.position[1])
        
        self.drone_position = (round(self.drone_position[0]),round(self.drone_position[1]))
        
        self.dynamic_body_position = self.drone_position
        
class DroneEnv(gym.Env):
    #metadata = {'render.modes': ['human']}
    def __init__(self):
        ### PARAMETERS ###
        # random city generator parameters
        self.ROAD_WIDTH_INIT = 15
        self.ROAD_WIDTH_TOLERANCE = 3
        self.BUILDING_WIDTH_INIT = 50
        self.BUILDING_WIDTH_TOLERANCE = 7
        self.BUILDING_HEIGHT_INIT = 50
        self.BUILDING_HEIGHT_TOLERANCE = 7
                
        # drone parameters
        self.NO_DRONES = 5
        
        self.DRONE_START_POSITION = (10,10)
        self.DRONE_DESTINATION = (400,300)
        
        self.OBSCURE_DRONE_OBSRV = 0
        
        self.STRATHY = 0

        ### ENVIRONMENT OBJECTS ###
        # list of building vertices
        self.buildings = []
        self.parks = []
        
        # list of destination coordinates
        self.destination_box = []
        
        # list of drone objects
        self.drones = []
                
        self.done = False
        self.drone_contact = False
        self.reward = 0.0
        self.prev_reward = 0.0
        self.step_reward = 0.0
        
        self.d250 = False
        self.d200 = False
        self.d150 = False
        self.d100 = False
        self.d50 = False
        
        ### BOX2D OBJECTS ###
        # box2d world
        self.world = Box2D.b2World((0, 0))
        # static bodies (buildings) in box2d world
        self.static_bodies = []
        
        self.world.contactListener = ContactDetector(self)
        
        ### VIEWER OBJECTS ###
        # define "Viewer" object and set width and height of window
        self.viewer = None
        self.viewer = rendering.Viewer(FRAME_WIDTH, FRAME_HEIGHT)
        self.drone_shape_viewer = None
        
        # set to zero when viewer has been closed and indicates we need to set up the buildings and initial drone positions
        self.render_init = 0
        
        # Action and observation spaces
        self.action_space = spaces.Box(
            np.array([-1.0, -1.0]), np.array([+1.0, +1.0]), dtype=np.float32
        )  # horiz force, vertical force

        # 0 - no force, 1 - 'north-east' force, 2 - 'north' force, ...
        self.action_space = spaces.Discrete(9)
        
        self.observation_space = spaces.Box(
            low=0, high=255, shape=(OBSRV_HEIGHT*OBSRV_WIDTH+1, 1), dtype=np.uint8
        )

        
    def initialise_environment(self):
        
        ### INITILIASE ENVIRONMENT ###
        self.create_buildings()
        self.render_init_city()
        self.create_destination()
        self.create_drones()
        self.render_init_drones()
        self.create_box2d_world()  
        self.render_init = 1
        
    def create_buildings(self):
        
        if(self.STRATHY == 1):
            # STRATHCLYDE BUILDING DEFINITIONS FOR 'FINAL DEMO'
            buildings_csv = pd.read_csv("Building_Data_CSV.csv")

            building = []

            building_names = buildings_csv['Building']

            x_coordinates = buildings_csv['X']
            y_coordinates = buildings_csv['Y']

            current_building = building_names[0]

            for i in range(len(x_coordinates)):
                vertex_index = 1
                if(building_names[i] == current_building):
                    building.append((float(x_coordinates[i]),float(y_coordinates[i])))
                else:
                    if(current_building == 'College Gardens' or current_building == 'Gardens'):
                        self.parks.append(building)
                    else:
                        self.buildings.append(building)

                    building = []
                    current_building = building_names[i]
                    building.append((float(x_coordinates[i]),float(y_coordinates[i])))
        else:             
            # RANDOM CITY GENERATION FOR TRAINING
            right_bound = 0
            top_bound = 0

            while(top_bound < FRAME_HEIGHT):
                top_bound_tracker = top_bound
                while(right_bound < FRAME_WIDTH):
                    road_width = self.ROAD_WIDTH_INIT + int(random.uniform(-self.ROAD_WIDTH_TOLERANCE,self.ROAD_WIDTH_TOLERANCE))
                    building_width = self.BUILDING_WIDTH_INIT + int(random.uniform(-30,30)) + int(random.uniform(-self.BUILDING_WIDTH_TOLERANCE,self.BUILDING_WIDTH_TOLERANCE))
                    building_height = self.BUILDING_HEIGHT_INIT + int(random.uniform(-30,30)) + int(random.uniform(-self.BUILDING_HEIGHT_TOLERANCE,self.BUILDING_HEIGHT_TOLERANCE))

                    bottom_left_coord = (right_bound+road_width,
                                         top_bound+road_width)

                    bottom_right_coord = (bottom_left_coord[0]+building_width,
                                          bottom_left_coord[1])

                    top_right_coord = (bottom_right_coord[0],
                                       bottom_right_coord[1]+building_height)

                    top_left_coord = (bottom_left_coord[0],
                                      top_right_coord[1])

                    building = [bottom_left_coord,bottom_right_coord,top_right_coord,top_left_coord]

                    # small chance of making the right bound smaller and hence potentially fusing the buildings or making a small alley the drone cant get down
                    if(random.uniform(0,1) > 0.1):
                        right_bound = bottom_right_coord[0]
                    else:
                        right_bound = bottom_right_coord[0] - road_width

                    if(right_bound < FRAME_WIDTH and top_right_coord[1] < FRAME_HEIGHT):
                        # small change to create a park
                        if(random.uniform(0,1) > 0.1):
                            self.buildings.append(building)
                        else:
                            self.parks.append(building)

                    # make sure we always have largest top bound to avoid buildings "fusing" together - with a small chance of not happening i.e. some fusing
                    if(top_left_coord[1] > top_bound_tracker and random.uniform(0,1) > 0.2):
                        top_bound_tracker = top_left_coord[1]

                # small chance of fusing the buildings vertically (or making them very close together)
                if(random.uniform(0,1) > 0.1):
                    top_bound = top_bound_tracker
                else:
                    top_bound = top_bound_tracker - self.ROAD_WIDTH_INIT

                right_bound = 0
            
    def create_destination(self):
        # get new position of destination on a road
        self.DRONE_DESTINATION = self.re_position_coord_on_road(self.DRONE_DESTINATION)

        drone_dest = ((self.DRONE_DESTINATION[0]-3,self.DRONE_DESTINATION[1]-3),
                    (self.DRONE_DESTINATION[0]+4,self.DRONE_DESTINATION[1]-3),
                    (self.DRONE_DESTINATION[0]+4,self.DRONE_DESTINATION[1]+4),
                    (self.DRONE_DESTINATION[0]-3,self.DRONE_DESTINATION[1]+4))
    
        self.destination_box.append(drone_dest)
        
        # place destination into viewer window
        self.viewer.add_geom(self.viewer.draw_polygon(self.destination_box[0],1,color=[0.5,0.2,0.9])) 
        
        self.viewer.render()
         
    def re_position_coord_on_road(self,START_POSITION_):
        # this function takes a starting coordinate for the drone, determines whether it is merged with a building
        # (or other drone) and returns the nearest coordinate where it is not colliding
        START_POSITION = (0,0)
        
        START_POSITION = (START_POSITION_[1],START_POSITION_[0])
        
        # get image of environment
        self.viewer.render()
        
        image_data_buffer = pyglet.image.get_buffer_manager().get_color_buffer().get_image_data().get_data()         
        image_data = np.frombuffer(image_data_buffer,dtype=np.uint8,count=-1,offset=0) # pixel data is in one big line of values
        pixel_data = image_data.reshape(len(image_data)//4,4) # get individual pixel data and store in elements in an array
        pixel_data = pixel_data.reshape(FRAME_HEIGHT,FRAME_WIDTH,4)
        
        city_image = pixel_data
        
        window_row_start = START_POSITION[0]-math.floor(REPOSITION_ROAD_WINDOW_SIZE/2)+2
        window_row_end = window_row_start+REPOSITION_ROAD_WINDOW_SIZE

        if(window_row_start < 0):
            window_row_start = 0
        elif(window_row_end > FRAME_HEIGHT):
            window_row_end = FRAME_HEIGHT  

        window_col_start = START_POSITION[1]-math.floor(REPOSITION_ROAD_WINDOW_SIZE/2)+2
        window_col_end = window_col_start+REPOSITION_ROAD_WINDOW_SIZE

        if(window_col_start < 0):
            window_col_start = 0
        elif(window_col_end > FRAME_WIDTH):
            window_col_end = FRAME_WIDTH

        city_image_section = city_image[window_row_start:window_row_end,window_col_start:window_col_end,:]

        reposition_indicator = 0

        # loop over rows
        for i in range(np.shape(city_image_section)[0]):
            # loop over columns
            for j in range(np.shape(city_image_section)[1]):
                if(city_image_section[i,j,0] != 166):
                    # i.e. a building - not the ground
                    reposition_indicator = 1
                    break
            if(reposition_indicator == 1):
                break
        
        if(reposition_indicator == 0):
            return START_POSITION_
        else:
            # perform an averaging convolution/correlation to find areas where the drone can spawn (outwith a building)
            # if the convoloved result has a pixel equal to the average (mean) of the ground pixels then we know there is a window at that point the drone can spawn at
            conv_row_start = math.floor(REPOSITION_ROAD_WINDOW_SIZE/2)
            conv_row_end = FRAME_HEIGHT - conv_row_start

            conv_col_start = math.floor(REPOSITION_ROAD_WINDOW_SIZE/2)
            conv_col_end = FRAME_WIDTH - conv_col_start

            convolved_result = np.zeros(shape=(FRAME_HEIGHT,FRAME_WIDTH))
            avg_factor = REPOSITION_ROAD_WINDOW_SIZE**2

            stride = 5

            # implementation of mean filter with kernel of size DRONE_WINDOW_SIZExDRONE_WINDOW_SIZE
            for i in range(FRAME_HEIGHT):
                for j in range(FRAME_WIDTH):
                    if(i >= conv_row_start and i < conv_row_end and j >= conv_col_start and j < conv_col_end):
                        if(i % stride == 0 and j % stride == 0):
                            window_row_start_idx = i - math.floor(REPOSITION_ROAD_WINDOW_SIZE/2)
                            window_col_start_idx = j - math.floor(REPOSITION_ROAD_WINDOW_SIZE/2)
                            # loop over kernel rows
                            for k1 in range(REPOSITION_ROAD_WINDOW_SIZE):
                                # loop over kernel columns
                                for k2 in range(REPOSITION_ROAD_WINDOW_SIZE):
                                    convolved_result[i,j] += city_image[window_row_start_idx+k1,window_col_start_idx+k2,0]

                            convolved_result[i,j] = convolved_result[i,j] / avg_factor

            start_coords = []
            # new coordinate will stay the same if an appriopriate position cannot be found
            new_coord = START_POSITION

            # find coordinates where drone is allowed to start
            for i in range(FRAME_HEIGHT):
                for j in range(FRAME_WIDTH):
                    if(convolved_result[i,j] == 166):
                        start_coords.append((i,j))

            # find closest index to originally requrested position
            if(len(start_coords) > 0):
                smallest_distance = math.sqrt((START_POSITION[0]-start_coords[0][0])**2 + (START_POSITION[1]-start_coords[0][1])**2)
            for i in range(len(start_coords)):
                if(i > 0):
                    distance = math.sqrt((START_POSITION[0]-start_coords[i][0])**2 + (START_POSITION[1]-start_coords[i][1])**2)
                    if(distance < smallest_distance):
                        smallest_distance = distance
                        new_coord = start_coords[i]
            
            #return drone_new_coord
            new_coord_return = (new_coord[1],new_coord[0])
            return new_coord_return
            
    def create_drones(self):
        ### create the first drone
        # create instance of Drone class
        drone = Drone()
        
        # first drone position set manually - check if the drone is starting in a building and if so, move it closeby out of building        
        drone.drone_position_init = self.re_position_coord_on_road(self.DRONE_START_POSITION)
        drone.drone_position = drone.drone_position_init
        drone.drone_position_prev = drone.drone_position
        drone.drone_observation = np.zeros(shape=(OBSRV_WIDTH,OBSRV_HEIGHT,4),dtype=np.uint8)
        # add drone to list
        self.drones.append(drone)
        
        # loop over remaining drones
        for i in range(self.NO_DRONES):
            if(i > 0):
                # create instance of Drone class
                drone = Drone()
                # define drone initial positions and observation windows of drones (square around)
                drone.drone_position_init = (random.uniform(0, FRAME_WIDTH-1),random.uniform(0, FRAME_HEIGHT-1))
                drone.drone_position = drone.drone_position_init
                drone.drone_position_prev = drone.drone_position
                drone.drone_observation = np.zeros(shape=(OBSRV_WIDTH,OBSRV_HEIGHT,4),dtype=np.uint8)
                # add drone to list
                self.drones.append(drone)
    
    def create_box2d_world(self):
        # create static bodies (buildings) in world
        for i in range(len(self.buildings)):
            self.static_bodies.append(self.world.CreateStaticBody(fixtures=fixtureDef(shape=polygonShape(vertices=self.buildings[i]))))
    
        # create dynamic bodies (drones) in world
        for i in range(len(self.drones)):
            self.drones[i].dynamic_body = self.world.CreateDynamicBody(
            position=self.drones[i].drone_position_init, # MAY HAVE TO CHANGE THIS INITIAL POSITION FROM PIXELS TO METRES
            angularDamping = 0.1,
            angle=0,
            fixtures=fixtureDef(shape=polygonShape(vertices=DRONE_SHAPE)))
    
    def step(self, action):
        # step_reward is the reward for the current action
        # reward is the cumulative reward
        # prev_reward is the cumulative reward at the previous time step
        step_reward = 0
        
        if action is not None:  # First step without action, called from reset()
            self.reward -= 0.2
            self.reward += 1/self.drones[0].drone_distance_observation
            if self.drone_contact == True:
                self.done = True
                self.reward -= 100

            if self.drones[0].drone_distance_observation < 250 and self.d250==False:
                self.reward += 30
                self.d250 = True
            
            if self.drones[0].drone_distance_observation < 200 and self.d200==False:
                self.reward += 35
                self.d200 = True
            
            if self.drones[0].drone_distance_observation < 150 and self.d150==False:
                self.reward += 40
                self.d150 = True
            
            if self.drones[0].drone_distance_observation < 100 and self.d100==False:
                self.reward += 45
                self.d100 = True
            
            if self.drones[0].drone_distance_observation < 50 and self.d50==False:
                self.reward += 50
                self.d50 = True

            if self.drones[0].drone_distance_observation < 10:
                self.reward += 500
                self.done = True
                
            # is the drone outwith window
            if(self.drones[0].drone_position[0] < 0 or self.drones[0].drone_position[0] > FRAME_WIDTH or
               self.drones[0].drone_position[1] < 0 or self.drones[0].drone_position[1] > FRAME_HEIGHT):
                self.reward -= 100
                self.done = True
            ###############################
                
            step_reward = self.reward - self.prev_reward
            self.prev_reward = self.reward
        
        
        # update drone positions based on action - ONLY BEING APPLIED TO ONE DRONE (FIRST IN LIST)
        self.drones[0].drone_step(action)
        
        # apply a random movement to the other drones - ***may want to change later if controlling all drones***
        for j in range(len(self.drones)-1):
            self.drones[j+1].drone_step(random.randint(0,8))
        
        # call "Step" function to update drone position in box2D
        self.world.Step(1/FPS,1,1)
        
        # update and round position to integer for all drones - after box2d world step
        for j in range(len(self.drones)):
            self.drones[j].update_position()
        
        # update viewer window
        self.render()
        
        # set state as observation window (per racecar env)
        state = self.drones[0].drone_observation
        state = self.process_states(state)
        
        # only return observation for first drone
        return state, self.reward, self.done, {}
                
    def reset(self):
        self.drone_contact = False
        self.done = False
        self.reward = 0.0
        self.prev_reward = 0.0
        self.step_reward = 0
        
        self.d250 = False
        self.d200 = False
        self.d150 = False
        self.d100 = False
        self.d50 = False

        for i in range(len(self.drones)):  
            self.world.DestroyBody(self.drones[i].dynamic_body)
            
            self.drones[i].dynamic_body = self.world.CreateDynamicBody(
            position=self.drones[i].drone_position_init, # MAY HAVE TO CHANGE THIS INITIAL POSITION FROM PIXELS TO METRES
            angularDamping = 0.1,
            angle=0,
            fixtures=fixtureDef(shape=polygonShape(vertices=DRONE_SHAPE)))
                
        # set state as observation window (per racecar env)
        state = self.drones[0].drone_observation
        state = self.process_states(state)
        
        return state
    
    def process_states(self,states):
        # flatten
        states = np.mean(states[:,:,0:3],2)
        states = np.dstack(states).T
                
        return states
        
    def get_observations(self):
        
        # only want first drone...
        for j in range(1):
            # get origin (i.e. bottom left coordinate) of observation window
            drone_centre = (self.drones[j].drone_position_prev[0]+2,self.drones[j].drone_position_prev[1]+2)
            observation_window_origin = (int(drone_centre[0] - math.floor(OBSRV_WIDTH/2)), int(drone_centre[1] - math.floor(OBSRV_HEIGHT/2)))

            # update drone observation (observation space) - a 2D array of 4D tuples - RGB values and transparency value
            image_data_buffer = pyglet.image.get_buffer_manager().get_color_buffer().get_region(observation_window_origin[0],observation_window_origin[1],OBSRV_WIDTH,OBSRV_HEIGHT).get_image_data().get_data() # get window centred on drone of size OBSRV_WIDTH X OBSRV_HEIGHT

            # save image of observation space to hard drive
            #pyglet.image.get_buffer_manager().get_color_buffer().get_region(observation_window_origin[0],observation_window_origin[1],OBSRV_WIDTH,OBSRV_HEIGHT).get_image_data().save("test" + str(i) + ".png")
        
            image_data = np.frombuffer(image_data_buffer,dtype=np.uint8,count=-1,offset=0) # pixel data is in one big line of values
            pixel_data = image_data.reshape(len(image_data)//4,4) # get individual pixel data and store in elements in an array
            pixel_data = pixel_data.reshape(OBSRV_WIDTH,OBSRV_HEIGHT,4) # reshape to a 2D array of pixel values surrounding the drone
        
            # observation space for each drone - a 13x13 grid of RGB pixel values
    
            if(self.OBSCURE_DRONE_OBSRV == 1):
                # place a black square over the drone (15x15)
                if(OBSRV_WIDTH < 15):
                    print("Warning: image observation all zeros!")
                    pixel_data[:,:,0:3] = 0                    
                else:
                    pixel_data[math.floor(OBSRV_WIDTH/2)-7:math.floor(OBSRV_WIDTH/2)+8,math.floor(OBSRV_HEIGHT/2)-7:math.floor(OBSRV_HEIGHT/2)+8,0:3] = 0
            
            self.drones[j].drone_observation = np.flip(pixel_data,0)
            
            # update distance observation with euclidean distance of drone from destination
            self.drones[j].drone_distance_observation = np.linalg.norm(np.array(self.drones[j].drone_position) - self.DRONE_DESTINATION)
        
    def render_init_city(self):
        # draw background
        self.viewer.add_geom(self.viewer.draw_polygon([(0,0),(FRAME_WIDTH,0),(FRAME_WIDTH,FRAME_HEIGHT),(0,FRAME_HEIGHT)],1,color=[0.65,0.65,0.65]))
        
        # place buildings into viewer window
        for i in range(len(self.buildings)):
            self.viewer.add_geom(self.viewer.draw_polygon(self.buildings[i],1,color=[0.85,0.85,0.85]))   
            
        # place parks into viewer
        for i in range(len(self.parks)):
            self.viewer.add_geom(self.viewer.draw_polygon(self.parks[i],1,color=[0.7,0.85,0.15]))           
    
        # update viewer window and wait before adding drones
        self.viewer.render()
        
        time.sleep(1)
        
    def render_init_drones(self):

        # place drone into viewer window
        drone_positions = []
        for i in range(len(self.drones)):
            drone_positions.append(self.drones[i].drone_position_init)

        # needs to consider the shape of the drone plus the current offset
        self.drone_shape_viewer = []
        for j in range(len(self.drones)): 
            self.drone_shape_viewer.append(DRONE_SHAPE.copy())

        # draw initial positions of drones
        for j in range(len(self.drones)):
            for k in range(len(DRONE_SHAPE)):
                self.drone_shape_viewer[j][k] = (DRONE_SHAPE[k][0] + drone_positions[j][0],DRONE_SHAPE[k][1] + drone_positions[j][1])    

        # add drones to viewer

        # draw drone
        drone_centre = (self.drones[0].drone_position[0]+2,self.drones[0].drone_position[1]+2)
        self.draw_drone(drone_centre,(255,0,0))
        
        # remaining drones - same colour
        for j in range(len(self.drones)-1):            
            drone_centre = (self.drones[j+1].drone_position[0]+2,self.drones[j+1].drone_position[1]+2)
            self.draw_drone(drone_centre,(0,0,255))
    
        # render viewer window and wait a second
        self.viewer.render()
        
        # get observation window for all drones
        self.get_observations()
    
        time.sleep(1)        
    
    def render(self,mode="human"):
        if(self.render_init == 0):
            self.viewer = None
            self.viewer = rendering.Viewer(FRAME_WIDTH, FRAME_HEIGHT)
            self.render_init_city()
            self.create_destination()
            self.render_init_drones()
            self.render_init = 1
        
        # in this loop, we update the position of the drones in the viewer window
        for i in range(len(self.drones)):
            # inner loop for individual drone geometries
            for j in range(5):
                del self.viewer.geoms[-1]
        
        for j in range(len(self.drones)):
            for k in range(len(DRONE_SHAPE)):
                self.drone_shape_viewer[j][k] = (DRONE_SHAPE[k][0] + self.drones[j].drone_position[0],DRONE_SHAPE[k][1] + self.drones[j].drone_position[1])
                        
            # add drone to viewer
            if(j == 0):            
                # draw drone
                drone_centre = (self.drones[j].drone_position[0]+2,self.drones[j].drone_position[1]+2)
                self.draw_drone(drone_centre,(255,0,0))
            else:              
                drone_centre = (self.drones[j].drone_position[0]+2,self.drones[j].drone_position[1]+2)
                self.draw_drone(drone_centre,(0,0,255))
            
        # render viewer window    
        self.viewer.render()       
        
        # get observation window for all drones
        self.get_observations()
        
    def draw_drone(self,drone_centre,drone_colour): 
        # main chassis
        chassis_bottom_left_coord = (drone_centre[0]-1,drone_centre[1]-3)
        chassis_bottom_right_coord = (drone_centre[0]+2,drone_centre[1]-3)
        chassis_top_right_coord = (drone_centre[0]+2,drone_centre[1]+4)
        chassis_top_left_coord = (drone_centre[0]-1,drone_centre[1]+4)

        self.viewer.add_geom(self.viewer.draw_polygon((chassis_bottom_left_coord,
                                                       chassis_bottom_right_coord,
                                                       chassis_top_right_coord,
                                                       chassis_top_left_coord,
                                                       chassis_bottom_left_coord),1,color=[drone_colour[0],drone_colour[1],drone_colour[2]]))
        
        # bottom left propeller
        circle_geom = rendering.make_circle(radius=3, filled=False)
        circle_geom.set_color(0,0,0)        
        for k in range(len(circle_geom.v)):
            circle_geom.v[k] = (circle_geom.v[k][0] + drone_centre[0]-4,circle_geom.v[k][1] + drone_centre[1]-4)
        self.viewer.add_geom(circle_geom)
                
        # bottom right propeller
        circle_geom = rendering.make_circle(radius=3, filled=False)
        circle_geom.set_color(0,0,0)        
        for k in range(len(circle_geom.v)):
            circle_geom.v[k] = (circle_geom.v[k][0] + drone_centre[0]+5,circle_geom.v[k][1] + drone_centre[1]-4)
        self.viewer.add_geom(circle_geom)
                
        # top left propeller
        circle_geom = rendering.make_circle(radius=3, filled=False)
        circle_geom.set_color(0,0,0)        
        for k in range(len(circle_geom.v)):
            circle_geom.v[k] = (circle_geom.v[k][0] + drone_centre[0]-4,circle_geom.v[k][1] + drone_centre[1]+5)
        self.viewer.add_geom(circle_geom)
                
        # top right propeller
        circle_geom = rendering.make_circle(radius=3, filled=False)
        circle_geom.set_color(0,0,0)        
        for k in range(len(circle_geom.v)):
            circle_geom.v[k] = (circle_geom.v[k][0] + drone_centre[0]+5,circle_geom.v[k][1] + drone_centre[1]+5)
        self.viewer.add_geom(circle_geom)
        
    def close(self):
        self.render_init = 0
        self.viewer.close()

Run the following few cells to test out the environment

In [3]:
import matplotlib.pyplot as plt
from IPython import display

In [4]:
def test_env(start,end):
    # instantiate environment
    drone_env = DroneEnv()

    # random city generator parameters - need to be selected carefully so buildings maintain normal shape compatible with box2d
    drone_env.ROAD_WIDTH_INIT = 15
    drone_env.ROAD_WIDTH_TOLERANCE = 3
    drone_env.BUILDING_WIDTH_INIT = 50
    drone_env.BUILDING_WIDTH_TOLERANCE = 7
    drone_env.BUILDING_HEIGHT_INIT = 50
    drone_env.BUILDING_HEIGHT_TOLERANCE = 7

    # set number of drones in environment (includes one we are controlling)
    drone_env.NO_DRONES = 10

    # set start and end positions of drone we are controlling
    drone_env.DRONE_START_POSITION = start
    drone_env.DRONE_END_POSITION = end
    # drone_env.DRONE_START_POSITION = (int(random.uniform(20,FRAME_WIDTH-21)),int(random.uniform(20,FRAME_HEIGHT-21)))
    # drone_env.DRONE_DESTINATION = (int(random.uniform(20,FRAME_WIDTH-21)),int(random.uniform(20,FRAME_HEIGHT-21)))

    # set to '1' to make drone a black box in the observation window - other drones will still be visible normally
    drone_env.OBSCURE_DRONE_OBSRV = 0

    # set to '1' to use the stratchlyde environment or '0' to use a randomly generated environment
    drone_env.STRATHY = 0

    # needs to be called after the parameters have been set
    drone_env.initialise_environment()

    episodes = 10
    for episode in range(1,episodes+1):
        drone_env.reset()
        done = False
        score = 0
        #for i in range(50):
        while not done:
            # action
            #horizontal_force = random.uniform(-0.4, 1)
            #vertical_force = random.uniform(-0.4, 1)
            #action = (horizontal_force,vertical_force)

            action = random.randint(0,8)
            #action = 0

            # step for reinforcemene learning algorithm
            observation, reward, done, _  = drone_env.step(action)
            #time.sleep(0.2)

            # print observation using matplotlib
            #plt.imshow(drone_env.drones[0].drone_observation)
            ##### comment to make simulation run faster #####
            #plt.show()
            #print(done)
            #################################################

        score = drone_env.reward
        #print('Episode:{} Score:{}'.format(episode,score))
        #print('Distance from destination:{}'.format(observation[1]))

    drone_env.close()

In [5]:
start_1 = (143,77)
end_1 = (491,338)

start_2 = (435,7)
end_2 = (244,337)

start_3 = (99,132)
end_3 = (458,283)

start_4 = (322,97)
end_4 = (51,206)

start_5 = (262,114)
end_5 = (427,283)

In [6]:
test_env(start_1,end_1)



# DQN demo with environment
credit: Nicholas Renotte https://github.com/nicknochnack/TensorflowKeras-ReinforcementLearning/blob/master/Deep%20Reinforcement%20Learning.ipynb

# 0. Install Dependencies

In [None]:
# !pip install tensorflow==2.3.0
# !pip install gym
# !pip install keras
# !pip install keras-rl2


# 1. Test Random Environment with OpenAI Gym

In [7]:
def create_env(start,end):
    # instantiate environment
    drone_env =  DroneEnv()

    # random city generator parameters - need to be selected carefully so buildings maintain normal shape compatible with box2d
    drone_env.ROAD_WIDTH_INIT = 15
    drone_env.ROAD_WIDTH_TOLERANCE = 3
    drone_env.BUILDING_WIDTH_INIT = 50
    drone_env.BUILDING_WIDTH_TOLERANCE = 7
    drone_env.BUILDING_HEIGHT_INIT = 50
    drone_env.BUILDING_HEIGHT_TOLERANCE = 7

    # set number of drones in environment (includes one we are controlling)
    drone_env.NO_DRONES = 10

    # set start and end positions of drone we are controlling
    drone_env.DRONE_START_POSITION = start
    drone_env.DRONE_DESTINATION = end
    # drone_env.DRONE_START_POSITION = (int(random.uniform(20,FRAME_WIDTH-21)),int(random.uniform(20,FRAME_HEIGHT-21)))
    # drone_env.DRONE_DESTINATION = (int(random.uniform(20,FRAME_WIDTH-21)),int(random.uniform(20,FRAME_HEIGHT-21)))

    # set to '1' to make drone a black box in the observation window - other drones will still be visible normally
    drone_env.OBSCURE_DRONE_OBSRV = 0

    # set to '1' to use the stratchlyde environment or '0' to use a randomly generated environment
    drone_env.STRATHY = 0

    # needs to be called after the parameters have been set
    drone_env.initialise_environment()

    # this is the dimensions of the observation window input to model
    states = (drone_env.observation_space.shape[0], drone_env.observation_space.shape[1])

    # this is the number of actions, ie 2, vertical and horizontal force
    #actions = drone_env.action_space.shape[0]
    actions = drone_env.action_space.n # for discrete action space

    drone_env.close()

    return drone_env

In [8]:
drone_env = create_env(start_1,end_1)

# this is the dimensions of the observation window input to model
states = (drone_env.observation_space.shape[0], drone_env.observation_space.shape[1])

# this is the number of actions, ie 2, vertical and horizontal force
#actions = drone_env.action_space.shape[0]
actions = drone_env.action_space.n # for discrete action space

In [9]:
# make sure distance between start and end is over 250 for reward to work
np.linalg.norm(np.array(drone_env.DRONE_START_POSITION) - drone_env.DRONE_DESTINATION)

432.40374651475906

# 2. Create a Deep Learning Model with Keras

In [10]:
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Permute
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import activations

In [16]:
import keras
import keras.optimizers as Kopt
from keras.layers import Convolution2D, MaxPooling3D, MaxPooling2D
from keras.layers import Dense, Flatten, Input
from keras.models import Model

In [17]:
actions

9

In [18]:
observation_shape = (1,OBSRV_WIDTH,OBSRV_HEIGHT,1)
observation_shape

(1, 85, 85, 1)

In [19]:
# LEARNING_RATE = 0.001

def build_model(actions=actions):
    brain_in = Input(shape=observation_shape, name='brain_in')
    x = brain_in
    
    x = Convolution2D(12, (7,7), strides=(1,1), activation='relu')(x)
    x = MaxPooling3D(pool_size=(1, 2, 2), strides=(1, 2, 2), padding='valid')(x)
    x = Convolution2D(24, (3,3), strides=(1,1), activation='relu')(x)
    x = MaxPooling3D(pool_size=(1, 2, 2), strides=(1, 2, 2), padding='valid')(x)
   
    x = Flatten(name='flattened')(x)
    x = Dense(128, activation='relu')(x)
    x = Dense(actions, activation="linear")(x)
        
    model = Model(inputs=brain_in, outputs=x)
        
    return model

In [20]:
model = build_model()

In [21]:
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
brain_in (InputLayer)        [(None, 1, 85, 85, 1)]    0         
_________________________________________________________________
conv2d (Conv2D)              (None, 1, 79, 79, 12)     600       
_________________________________________________________________
max_pooling3d (MaxPooling3D) (None, 1, 39, 39, 12)     0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 1, 37, 37, 24)     2616      
_________________________________________________________________
max_pooling3d_1 (MaxPooling3 (None, 1, 18, 18, 24)     0         
_________________________________________________________________
flattened (Flatten)          (None, 7776)              0         
_________________________________________________________________
dense (Dense)                (None, 128)               995456

# 3. Build Agent with Keras-RL

In [145]:
# pip install gym==0.12.1
# !pip install rl
# !pip install keras-rl2

In [22]:
from rl.agents import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.memory import SequentialMemory
import tensorflow as tf

In [147]:
# from google.colab import drive
# drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [23]:
limit = 1000

def build_agent(model, actions):
    policy = BoltzmannQPolicy()
    memory = SequentialMemory(limit=limit, window_length=1)
    dqn = DQNAgent(model=model, memory=memory, policy=policy, 
                 nb_actions=actions, nb_steps_warmup=10, target_model_update=1e-2)
    return dqn

In [24]:
del model
model = build_model()

In [25]:
dqn = build_agent(model, actions)
dqn.compile(Adam(lr=1e-3), metrics=['mae'])

In [26]:
tf.debugging.set_log_device_placement(True)
with tf.device('/GPU:0'):
    dqn.fit(drone_env, nb_steps=limit, visualize=True, verbose=1)
drone_env.close()

Training for 1000 steps ...
Interval 1 (0 steps performed)




   11/10000 [..............................] - ETA: 11:05 - reward: -1.1861



   23/10000 [..............................] - ETA: 1:50:12 - reward: -2.3721done, took 18.371 seconds


In [None]:
# model.save('/content/drive/MyDrive/CNN_model_test')

In [28]:
episodes = 5
scores = dqn.test(drone_env, nb_episodes=episodes, visualize=True)
print(np.mean(scores.history['episode_reward']))
drone_env.close()

Testing for 5 episodes ...
Episode 1: reward: -549.867, steps: 67
Episode 2: reward: -2046.550, steps: 140
Episode 3: reward: -6326.915, steps: 251
Episode 4: reward: -100.197, steps: 1
Episode 5: reward: -426.499, steps: 57
-1890.005526479649


In [11]:
drone_env.close()

NameError: name 'drone_env' is not defined