In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import CubicSpline
from copy import deepcopy
from matplotlib.animation import FuncAnimation
from matplotlib.patches import Circle
import matplotlib
matplotlib.use('TkAgg')

In [2]:
class Obstacle:
    def __init__(self,center,radius):
        self.center=center
        self.radius=radius
    def in_collision(self,point,robot_radius):
        return np.linalg.norm(point - self.center) <= self.radius + robot_radius

In [3]:
class Environment:
    def __init__(self, width=100, height=100,robot_radius=0,obstacles=[],start=None , goal=None):
        self.width=width
        self.height=height
        self.robot_radius=robot_radius
        self.obstacles=obstacles
        self.start=start
        self.goal=goal
        
    def add_obstacle(self, obstacle):
        self.obstacles.append(obstacle)
        
    def in_collision(self,point):
        for obstacle in self.obstacles:
            if obstacle.in_collision(point,self.robot_radius):
                return True
        return False
    
    def path_in_collision(self,path):
        for point in path:
            if self.in_collision(point):
                return True
            return False
        
    def in_environment(self, point):
        min_x = 0 + self.robot_radius
        max_x = self.width - self.robot_radius
        min_y = 0 + self.robot_radius
        max_y = self.height - self.robot_radius
        return (min_x <= point[0] <= max_x) and (min_y <= point[1] <= max_y)
    
    def clip_point(self, point):
        min_x = 0 + self.robot_radius
        max_x = self.width - self.robot_radius
        min_y = 0 + self.robot_radius
        max_y = self.height - self.robot_radius
        return np.array([np.clip(point[0], min_x, max_x), np.clip(point[1], min_y, max_y)])
    
    def clip_path(self, path):
        clipped_path = []
        for point in path:
            clipped_path.append(self.clip_point(point))
        return np.array(clipped_path)
    
    def path_length(self,path):
        length=0
        for i in range(1,len(path)):
            length+=np.linalg.norm(path[i]-path[i-1])
        return length
    
    def count_violations(self, path):
        """Count the number of violations in a given path and provide details of each type."""
        violations = 0
        details = {
            'start_violation': False,
            'goal_violation': False,
            'environment_violation': False,
            'environment_violation_count': 0,
            'collision_violation': False,
            'collision_violation_count': 0,
        }
        
        if not self.in_start(path[0]):
            violations+=1
            details['start_violation']=True
        if not self.in_goal(path[-1]):
            violations += 1
            details['goal_violation'] = True
        for point in path:
            if not self.in_environment(point):
                violations += 1
                details['environment_violation_count'] += 1
            if self.in_collision(point):
                violations += 1
                details['collision_violation_count'] += 1

        details['environment_violation'] = details['environment_violation_count'] > 0
        details['collision_violation'] = details['collision_violation_count'] > 0

        return violations, details
    def in_start(self, point):
        """Check if a point is within the start region considering the robot's radius."""
        return np.linalg.norm(np.array(point) - np.array(self.start)) <= self.robot_radius

    def in_goal(self, point):
        """Check if a point is within the goal region considering the robot's radius."""
        return np.linalg.norm(np.array(point) - np.array(self.goal)) <= self.robot_radius

In [4]:
class SplinePath:
    def __init__(self,environment,control_points=[],resolution=100):
        self.environment = environment
        self.control_points = control_points
        self.resolution = resolution
        
    @staticmethod
    def random(environment, num_control_points=10, resolution=100):
        """Generate a SplinePath with random control points within the environment."""
        control_points = np.random.rand(num_control_points, 2) * np.array([environment.width, environment.height])
        return SplinePath(environment, control_points, resolution)
    
    @staticmethod
    def from_list(environment, xy, resolution=100, normalized=False):
        """Create a SplinePath from a list of coordinates, optionally normalized to the environment size."""
        control_points = np.array(xy).reshape(-1, 2)
        if normalized:
            control_points[:, 0] *= environment.width
            control_points[:, 1] *= environment.height
        return SplinePath(environment, control_points, resolution)

    def get_path(self):
        """Compute the cubic spline path through the control points, adding start and goal."""
        start = self.environment.start
        goal = self.environment.goal
        points = np.vstack((start, self.control_points, goal))
        t = np.linspace(0, 1, len(points))
        cs = CubicSpline(t, points, bc_type='clamped')
        tt = np.linspace(0, 1, self.resolution)
        path = cs(tt)
        path = self.environment.clip_path(path)
        return path

In [5]:
def pathplaningcost(sol):
    path = sol.get_path()
    length = sol.environment.path_length(path)
    _, details = sol.environment.count_violations(path)
    cost = length
    
    START_VIOLATION_PENALTY = 1
    GOAL_VIOLATION_PENALTY = 1
    ENV_VIOLATION_PENALTY = 0.2
    COLLISION_VIOLATION = 1
    
    if details['start_violation']:
        cost += START_VIOLATION_PENALTY
    if details['goal_violation']:
        cost += GOAL_VIOLATION_PENALTY
    cost += details['environment_violation_count'] * ENV_VIOLATION_PENALTY
    cost += details['collision_violation_count'] * COLLISION_VIOLATION
    
    details['sol'] = sol  
    details['path'] = path
    details['length'] = length
    details['cost'] = cost

    return cost, details


def EnvCostFunction(environment, num_control_points, resolution):
    """Generate a cost function suitable for optimization given an environment, number of control points, and resolution."""
    def CostFunction(xy):
        sol = SplinePath.from_list(environment, xy, resolution, normalized=True)
        return pathplaningcost(sol)
    return CostFunction

In [6]:
def plot_path(sol, ax=None, **kwargs):
    """Plot the path derived from the solution on a matplotlib axis."""
    if ax is None:
        ax = plt.gca()  
    path = sol.get_path()
    path_line, = ax.plot(path[:,0], path[:,1], **kwargs)  
    return path_line

def update_path(sol, path_line):
    """Update an existing path line with a new path."""
    path = sol.get_path()
    path_line.set_xdata(path[:,0])  
    path_line.set_ydata(path[:,1])  
    fig = plt.gcf()  
    fig.canvas.draw()  
    fig.canvas.flush_events()
    


In [7]:
def plot_environment(environment, ax=None, obstacles_style={}, start_style={}, goal_style={}):
    """Plot the environment with obstacles, start, and goal points."""
    if ax is None:
        ax = plt.gca()  
    ax.set_aspect('equal', adjustable='box') 
    for obstacle in environment.obstacles:
        circle = plt.Circle(obstacle.center, obstacle.radius, **obstacles_style)
        ax.add_patch(circle)
    start = environment.start
    goal = environment.goal
    ax.plot(start[0], start[1], 's', **start_style)  
    ax.plot(goal[0], goal[1], 's', **goal_style)
    ax.set_xlim([0, environment.width])
    ax.set_ylim([0, environment.height])
    ax.grid(True)
    ax.set_xlabel('X Position')
    ax.set_ylabel('Y Position')

PSO

In [8]:
def create_solution(xy, environment):
    """
    Generate a spline path based on control points determined by a PSO solution.

    Args:
    xy (numpy.ndarray): The flattened array of control points' x and y coordinates.
    environment (Environment): The environment object containing start, goal, and bounds.

    Returns:
    SplinePath: An instance representing the cubic spline path through the control points.
    """
    
    num_control_points = len(xy) // 2

    
    control_points = np.array(xy).reshape(-1, 2)
    control_points[:, 0] *= environment.width  
    control_points[:, 1] *= environment.height  

    
    return SplinePath(environment, control_points, resolution=100)

def PSO(problem, ax, **kwargs):
    
    max_iter = kwargs.get('max_iter', 100)
    pop_size = kwargs.get('pop_size', 100)
    c1 = kwargs.get('c1', 2)
    c2 = kwargs.get('c2', 1)
    w = kwargs.get('w', 0.8)
    wdamp = kwargs.get('wdamp', 1)
    environment = kwargs['environment']
    resetting = kwargs.get('resetting', 25)

    
    plot_environment(environment, ax=ax)
    path_lines = [ax.plot([], [], alpha=0.3)[0] for _ in range(pop_size)]
    
    
    gbest = {'position': None, 'cost': float('inf')}
    population = []
    for _ in range(pop_size):
        position = np.random.uniform(problem['var_min'], problem['var_max'], problem['num_var'])
        cost, _ = problem['cost_function'](position)  
        particle = {
            'position': position,
            'velocity': np.zeros(problem['num_var']),
            'cost': cost,
            'best': {
                'position': position.copy(),
                'cost': cost
            }
        }
        population.append(particle)
        if cost < gbest['cost']:
            gbest['position'] = position.copy()
            gbest['cost'] = cost

    
    for it in range(max_iter):
        for i in range(pop_size):
            r1, r2 = np.random.rand(), np.random.rand()
            population[i]['velocity'] = w * population[i]['velocity'] + \
                                        c1 * r1 * (population[i]['best']['position'] - population[i]['position']) + \
                                        c2 * r2 * (gbest['position'] - population[i]['position'])
            population[i]['position'] += population[i]['velocity']
            population[i]['position'] = np.clip(population[i]['position'], problem['var_min'], problem['var_max'])

            new_cost, _ = problem['cost_function'](population[i]['position'])
            if new_cost < gbest['cost']:
                gbest['position'] = population[i]['position'].copy()
                gbest['cost'] = new_cost
                path = create_solution(gbest['position'], environment).get_path()
                ax.clear()
                plot_environment(environment, ax=ax) 
                ax.plot(path[:, 0], path[:, 1], 'r-', label='Best Path')  
                ax.legend()
                ax.set_title(f'Iteration {it+1}: Best Cost = {gbest["cost"]:.2f}')
                plt.draw()
                plt.pause(0.1)

        # Reducing inertia weight
        w *= wdamp

        if resetting and (it + 1) % resetting == 0:
            for i in range(pop_size):
                population[i]['position'] = np.random.uniform(problem['var_min'], problem['var_max'], problem['num_var'])
                population[i]['velocity'] = np.zeros(problem['num_var'])
                # Reevaluate the cost for resetting
                new_cost, _ = problem['cost_function'](population[i]['position'])
                population[i]['cost'] = new_cost
                population[i]['best']['position'] = population[i]['position'].copy()
                population[i]['best']['cost'] = new_cost
    plt.ioff()
    return gbest, population



In [9]:
env_params = {
    'width': 100,
    'height': 100,
    'robot_radius': 1,
    'start': [5,5],
    'goal': [95,95],
}
env = Environment(**env_params)


obstacles = [
    {'center': [20, 40], 'radius': 5},
    {'center': [30, 30], 'radius': 9},
    {'center': [15, 15], 'radius': 5},
    {'center': [50, 50], 'radius': 15},
    {'center': [85, 85], 'radius': 5},
    {'center': [80, 80], 'radius': 7},
    {'center': [80, 60], 'radius': 10},
    {'center': [80, 20], 'radius': 10},
    
]
for obs in obstacles:
    env.add_obstacle(Obstacle(**obs))


num_control_points = 5
resolution = 100
cost_function = EnvCostFunction(env, num_control_points, resolution)

problem = {
    'num_var': 2 * num_control_points,
    'var_min': 0,
    'var_max': 1,
    'cost_function': cost_function,
}


pso_params = {
    'max_iter': 100,
    'pop_size': 50,
    'c1': 2,
    'c2': 1,
    'w': 0.8,
    'wdamp': 1,
    'resetting': 25,
}

fig, ax = plt.subplots(figsize=(10, 10))
plot_environment(env, ax=ax)
pso_params.update({'environment': env})
best_solution, population = PSO(problem, ax=ax, **pso_params)
plt.show()
best_solution, population = PSO(problem, **pso_params)


