In [25]:
import math
import numpy as np
from numba import jit
import pygame
from pygame import gfxdraw
import matplotlib.pyplot as plt


pygame 2.1.2 (SDL 2.0.16, Python 3.8.10)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [26]:
class Obstacle:
    def __init__(self, w,h, origin_x, origin_y):
        self.w = w
        self.h = h
        self.o_x = origin_x
        self.o_y = origin_y
        
        o_mat = np.array([[self.o_x,self.o_y,self.o_x,self.o_y],
                         [self.o_x,self.o_y,self.o_x,self.o_y],
                         [self.o_x,self.o_y,self.o_x,self.o_y],
                         [self.o_x,self.o_y,self.o_x,self.o_y]])
        
        size_mat = np.array([[0,0,0,self.h],
                            [0,self.h,self.w,self.h],
                            [self.w,self.h,self.w,0],
                            [self.w,0,0,0]])
        
        walls = o_mat+size_mat
        self.walls = np.reshape(walls,[4,2,2])

In [27]:
class Ray:
    def __init__(self, x, y, angle):
        self.x = x
        self.y = y
        self.dir = (math.cos(angle), math.sin(angle))

    def update(self, mx, my):
        self.x = mx
        self.y = my

    def check_intersect(self, wall):       
        x1 = wall[0][0]
        y1 = wall[0][1]
        x2 = wall[1][0]
        y2 = wall[1][1]
        
        x3 = self.x
        y3 = self.y
        x4 = self.x + self.dir[0]
        y4 = self.y + self.dir[1]
    
        # Using line-line intersection formula to get intersection point of ray and wall
        den= (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
        num = (x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)
        
        if den == 0:
            return 0
        
        t = num / den
        u = -((x1 - x2) * (y1 - y3) - (y1 - y2) * (x1 - x3)) / den

        if 1 > t > 0 and u > 0:
            x = x1 + t * (x2 - x1)
            y = y1 + t * (y2 - y1)
            intersect = np.array([x, y])
            dist = calc_dist(x,y,self.x,self.y)
            return dist, intersect  


In [28]:
@jit(nopython=True)
def circle_intersect(x1,y1,x2,y2,cx,cy,r):
    d = math.sqrt((cx-x1)**2+(cy-y1)**2)

    #Ray vector
    AB = np.array([x2-x1,y2-y1])
    
    # Vector to center of circle
    AC = np.array([cx-x1,cy-y1])

    AB_mag = math.sqrt(AB[0]**2+AB[1]**2)
    AC_mag = math.sqrt(AC[0]**2+AC[1]**2)
    
    dot_ABAC = AC[0]*AB[0]+AC[1]*AB[1]

    if AB_mag <= AC_mag or dot_ABAC<0:
        # Circle not in line of sight of ray
        return None
    else:
        #Projcetion of AC onto AB
        AD = dot_ABAC / np.sum(AB**2) * AB

        # Coordinats of D
        Dx,Dy = np.array([x1,y1])+AD

        #Distance from circle center to D
        CD_mag = math.sqrt((cx-Dx)**2+(cy-Dy)**2)
        
        if CD_mag>r:
            #No intersect
            return None
        else:

            #Distance from D to circle edge
            PD_mag = math.sqrt(r**2-CD_mag**2)
            
            # Distance from ray start to D
            AD_mag = math.sqrt(AD[0]**2+AD[1]**2)
            
            # If true, ray start is inside the circle
            if AD_mag > 0:
                AD_unit = AD/AD_mag
                AP = AD - AD_unit*PD_mag
            else:
                return None

            #Point of intercect on the circle
            P = np.array([x1,y1]) + AP
            AP_mag = math.sqrt(AP[0]**2+AP[1]**2)
            
            return AP_mag, P

In [182]:
class Agent:
    def __init__(self, start_pos, goal, agent_r, num_rays):
        self.radius = agent_r
        self.n_ray = num_rays
        self.max_speed = 5
        self.mass = 4.0
        self.force = 10.0
        
        self.x = start_pos[0]
        self.y = start_pos[1]
        self.speed_x = 0
        self.speed_y = 0
        self.goal = goal
        goal_dist = self.get_goal_dist()
        self.done = False
        
        rays = []
        for i in range(self.n_ray):
            angle = 2*math.pi/self.n_ray * i
            ray = Ray(self.x,self.y,angle)
            rays.append(ray)
        self.rays = rays
        self.dists = np.zeros(len(self.rays))
        self.intersects = np.zeros([len(self.rays),2])
        
        
    def update_speed_pos(self, action):
        # right+ left-
        self.speed_x += (int(action==2)-int(action==3)) * math.sqrt(self.force/self.mass)
        self.speed_x = np.clip(self.speed_x, -self.max_speed,self.max_speed)
        # up + down -
        self.speed_y += (int(action==0)-int(action==1)) * math.sqrt(self.force/self.mass)
        self.speed_y = np.clip(self.speed_y, -self.max_speed,self.max_speed)

        self.x += self.speed_x
        self.y += self.speed_y
        
        for ray in self.rays:
            ray.update(self.x,self.y)
    
    '''
    get_ray_info() needs improvement to speed up performance!
    
    '''
    def get_ray_info(self, obstacles, agents_pos):
        # Check intersect with objects
        for i,ray in enumerate(self.rays):
            shortest_d = np.inf
            for obsticle in obstacles:
                for wall in obsticle.walls:
                    ray_intersect = ray.check_intersect(wall)
                    if ray_intersect:
                        d, intersect = ray_intersect 
                        if shortest_d > d:
                            shortest_d = d
                            self.dists[i] = d
                            self.intersects[i] = intersect
                            
        
        #Check intersect with goal
        x1,y1 = [self.x,self.y]
        gx,gy = self.goal
        r = self.radius
        for i,intersect in enumerate(self.intersects):
            shortest_d = np.inf
            # Check intersect with agent goal
            x2,y2 = intersect
            dist = calc_dist(x1,y1,gx,gy)
            if not np.isclose(0,dist) and (dist + self.radius) < self.dists[i]:
                ray_intersect = circle_intersect(x1,y1,x2,y2,gx,gy,r)
                if ray_intersect:
                    d, intersect = ray_intersect
                    if shortest_d > d:
                        shortest_d = d
                        self.dists[i] = d
                        self.intersects[i] = intersect
     
            # Check intersect with other agents
            for agent_pos in agents_pos:
                cx,cy = agent_pos
                dist = calc_dist(x1,y1,cx,cy)
                if not np.isclose(0,dist) and (dist + self.radius) < self.dists[i]:
                    ray_intersect = circle_intersect(x1,y1,x2,y2,cx,cy,r)
                    if ray_intersect:
                        d, intersect = ray_intersect
                        if shortest_d > d:
                            shortest_d = d
                            self.dists[i] = d
                            self.intersects[i] = intersect
                            

                            
            
    def get_goal_dist(self):
        self.goal_dist = calc_dist(self.x,self.y,self.goal[0],self.goal[1])
        goal_vec = np.array([self.goal[0]-self.x,self.goal[1]-self.y])
        self.goal_angle = np.arccos(np.dot(goal_vec,np.array([0,1]))/self.goal_dist)
        return self.goal_dist

            
    def collision(self):
        self.dists = None
        self.done = True
        self.reward = -2
        
        
    def goal_reached(self):
        self.dists = None
        self.done = True
        self.reward = 2
        
        
    def step_reward(self,reward):
        self.reward = reward
        
    def agent_state(self):
        if type(self.dists)==type(None):
            return None
        else:
            return np.append(self.dists,[self.goal_dist,self.goal_angle])
        

In [30]:
@jit(nopython=True)
def calc_dist(x1,y1,x2,y2):
    dist = math.sqrt((x1-x2)**2+(y1-y2)**2)
    return dist

In [31]:
def get_random_start(n_points,screen_size):
    w,h = screen_size
    points = []
    while len(points) < n_points*2:
        x,y =[np.random.randint(20,w-20),np.random.randint(20,h-20)]

        too_close = 0
        for point in points:
            d = calc_dist(x,y,point[0],point[1])
            if d>60:
                too_close =+ 1

        if too_close <= 1:
            points.append([x,y])
    goal = points[0:n_points]
    start = points[n_points:]
    return start,goal

In [32]:
def find_agent_collisions(positions, radius):
    x = positions[:,0] 
    y = positions[:,1]

    x_mat = np.zeros([len(positions),len(positions)])
    y_mat = np.zeros([len(positions),len(positions)])

    x_mat[:] = np.array(x)
    y_mat[:] = np.array(y)

    dx = np.power(x_mat - x_mat.T,2)
    dy = np.power(y_mat - y_mat.T,2)

    d = np.sqrt(dx+dy)

    collisions_count = np.sum((d <= radius*2).astype('int'),1)
    collisions_array = collisions_count > 1 # 1 since distance to self will always be 0
    return collisions_array

In [33]:
def find_wall_collision(positions,obstacles,radius):
    x = positions[:,0]
    y = positions[:,1]
    r = radius
    wall_collisions = np.zeros([len(obstacles),len(positions)],dtype = 'int')

    for i, obstacle in enumerate(obstacles):
        obj_x = obstacle.o_x
        obj_y = obstacle.o_y
        obj_w = obstacle.w
        obj_h = obstacle.h

        obj_x_mat = np.zeros(len(positions))
        obj_y_mat = np.zeros(len(positions))
        obj_w_mat = np.zeros(len(positions))
        obj_h_mat = np.zeros(len(positions))

        obj_x_mat[:] = obj_x
        obj_y_mat[:] = obj_y
        obj_w_mat[:] = obj_w
        obj_h_mat[:] = obj_h

        if i == 0:
            left = x-r <= obj_x
            right = x+r >= obj_x+obj_w
            bottom = y-r <= obj_y
            top = y+r >= obj_y + obj_h

            collision_sum = np.array([left,right,bottom,top]).T.sum(1)
            wall_collisions[i] = collision_sum > 0 # If any statement is true, the agent is outside the border
        else:
            left = x+r >= obj_x
            right = x-r <= obj_x+obj_w
            bottom = y+r >= obj_y
            top = y-r <= obj_y + obj_h

            collision_sum = np.array([left,right,bottom,top]).T.sum(1)
            wall_collisions[i] = collision_sum == 4 # All 4 statements must be true for there to be a collision
    
    
    wall_collisions = np.sum(wall_collisions, 0) > 0
    return wall_collisions


In [180]:
class points():
    def __init__(self,n_agents,
                 obstacles = [],
                 screen_size = [600,400],
                 num_rays = 16):
        self.n_agents = n_agents
        self.screen_size = screen_size
        self.agent_radius = 10
        self.num_rays = num_rays
        self.obstacles = []
        self.obstacles.insert(0,Obstacle(self.screen_size[0],self.screen_size[1],0,0)) #border walls

        for obstacle in obstacles:
            self.obstacles.append(obstacle)
        
        self.screen = None
        self.clock = None
        self.isopen = True
        
    
    def step(self, actions):
        for i, agent in enumerate(self.agents):
            if agent.done:
                continue               
            # Update speed and position
            agent.update_speed_pos(actions[i])
            self.positions[i] = np.array([agent.x,agent.y])
        
        # ----- agent-agent collisions -----
        agent_collisions = find_agent_collisions(self.positions, self.agent_radius)
        wall_collisions = find_wall_collision(self.positions,self.obstacles,self.agent_radius)

        # Check all new positions for collisions
        for i, agent in enumerate(self.agents):
            if agent.done:
                continue
            
            if agent_collisions[i]:
                agent.collision()
                continue
            if wall_collisions[i]:
                agent.collision()
                continue
            
            # Get ray distances
            agent.get_ray_info(self.obstacles,self.positions)
            
            # Check if agent has reached the goal
            goal_distance = agent.get_goal_dist()
            if goal_distance <= agent.radius * 2:
                agent.goal_reached()
                continue
            
            # Reward for no collision or goal
            distance_reward = -(agent.goal_dist/calc_dist(0,0,self.screen_size[0],self.screen_size[1]))**0.7
            agent.step_reward(distance_reward)        
        

        for i, agent in enumerate(self.agents):
            self.states[i] = agent.agent_state()
            self.rewards[i] = agent.reward
            self.dones[i] = agent.done
        return self.states, actions, self.rewards, self.dones
        
        
    def reset(self,start_p = [],goal_p = []):        
        '''
        TODO: Add randomazation of obstacles
        '''
        if not start_p or not goal_p:
            self.start_p, self.goal_p = get_random_start(self.n_agents,self.screen_size)
        else:
            self.start_p, self.goal_p = [start_p,goal_p]
            
        agents = []
        self.positions = np.array(self.start_p,float)
                                  
        for start_p, goal_p in zip(self.start_p, self.goal_p):
            agents.append(Agent(start_p,goal_p,self.agent_radius,self.num_rays))
            
        self.rewards = np.zeros(len(self.start_p))
        self.dones = np.array([False]*len(self.goal_p))
            
        self.agents = agents
        states = []
        for agent in self.agents:
            agent.get_ray_info(self.obstacles,self.positions)
            states.append(agent.agent_state())
        self.states = np.array(states)
        
        
    def render(self):
        screen_w = self.screen_size[0]
        screen_h = self.screen_size[1]
        
        if self.screen is None:
            pygame.init()
            pygame.display.init()
            self.screen = pygame.display.set_mode((screen_w, screen_h))
        if self.clock is None:
            self.clock = pygame.time.Clock()
            
        self.surf = pygame.Surface((screen_w, screen_h))
        self.surf.fill((255, 255, 255))
        
        # Draw obsticles
        for i, obstacle in enumerate(self.obstacles):
            if not i == 0:
                corners = [obstacle.o_x,
                          obstacle.o_y,
                          obstacle.w,
                          obstacle.h]
                gfxdraw.rectangle(
                self.surf,
                corners,
                (0,0,0))
        
        # Draw agens 
        for i,agent in enumerate(self.agents):
            if agent.done:
                c = 255
            else:
                c = 0
            
            if not agent.done:
                # Goal
                gfxdraw.filled_circle(
                    self.surf,
                    int(agent.goal[0]),
                    int(agent.goal[1]),
                    int(self.agent_radius),
                    (0, 132, 0),
                )
                
                for intersect in agent.intersects:
                    gfxdraw.line(
                    self.surf,
                    int(agent.x),
                    int(agent.y),
                    int(intersect[0]),
                    int(intersect[1]),
                    (192,192,192))
                    
                # RL point
            gfxdraw.filled_circle(
                self.surf,
                int(agent.x),
                int(agent.y),
                int(self.agent_radius),
                (0+c, 0, 0),
            )

        self.surf = pygame.transform.flip(self.surf, False, True)
        self.screen.blit(self.surf, (0, 0))

        pygame.event.pump()
        self.clock.tick(30)
        pygame.display.flip()

    def get_states(self):
        return self.states
    
    def get_rewards(self):
        return self.rewards
    
    def get_dones(self):
        return self.dones
    
    def close(self):
        if self.screen is not None:
            pygame.display.quit()
            pygame.quit()
            self.isopen = False

    
    

In [192]:
# start_positions=[[526, 50], [460, 310], [277, 134]]
# goal_positions =[[236, 248], [294, 186], [195, 124]]
# obstacles = [Obstacle(100,200,0,0),
#             Obstacle(100,200,100,300),
#             Obstacle(100,200,300,200)]



# n_agents = 3
# env = points(n_agents,obstacles)
# env.reset(start_positions,goal_positions)

# for i in range(100):
#     env.step(np.random.randint(0,4,n_agents))
#     env.render()
#     if env.get_dones().all():
#         break
# env.close()

