In [60]:
import numpy as np
from scipy.spatial import ConvexHull
from matplotlib.path import Path
import time

# Fitness Function

In [61]:
def fitness_repel(p_i, j, Dmin=2):
    """
    计算指定个体的COLLISION AVOIDANCE
    Input:
        p_i: 子种群，Np*2维
        j: 指定个体索引
    """
    distances = np.linalg.norm(p_i - p_i[j], axis=1)
    distances[j] = np.inf # 排除自身
    nnd = np.min(distances)
    return np.exp(2 * (Dmin - nnd)) if nnd < Dmin else 1
    
def fitness_closure(firsts, idx, p_ij, prey):
    """
    计算CLOSURE
    Input:
        firsts: 每个子种群首元素，Ns*2维
        idx：指定子中群索引
        p_ij: 第i个子种群第j个个体位置，2维
        prey: 猎物位置，2维
    """
    firsts[idx] = p_ij # 替换为虚拟robot
    try:
        hull = ConvexHull(firsts)
    except Exception as e:
        return 0
    hull_vertices = firsts[hull.vertices]
    hull_path = Path(hull_vertices)
    if hull_path.contains_point(prey, radius=-1e-10): # 判断是否在凸包内部
        return 1
    def is_point_on_segment(point, p1, p2):
        v1 = point - p1
        v2 = point - p2
        if np.cross(np.append(v1, [0]), np.append(v2, [0]))[-1] == 0:  # 点在直线上
            if v1[0] * v2[0] <= 0 and v1[1] * v2[1] <= 0:  # 点在线段上
                return True
        return False
    for i in range(len(hull_vertices)):
        p1 = hull_vertices[i]
        p2 = hull_vertices[(i+1) % len(hull_vertices)]
        if is_point_on_segment(prey, p1, p2): # 判断是否凸包边界上
            return 0.5
    return 0

def fitness_expanse(firsts, idx, p_ij, prey):
    """
    计算SWARM EXPANSE
    Input:
        firsts: 每个子种群首元素，Ns*2维
        p_ij: 第i个子种群第j个个体位置，2维
        prey: 猎物位置，2维
    """
    firsts[idx] = p_ij # 替换为虚拟robot
    return np.sum(np.linalg.norm(firsts - prey, axis=1)) / len(firsts)

def fitness_uniformity(firsts, idx, p_ij, prey):
    """
    计算UNIFORMITY
    Input:
        firsts: 每个子种群首元素，Ns*2维
        p_ij: 第i个子种群第j个个体位置，2维
        prey: 猎物位置，2维
    """ 
    firsts[idx] = p_ij # 替换为虚拟robot
    mask11 = (firsts[:, 0] <= prey[0]) & (firsts[:, 1] > prey[1])
    mask12 = (firsts[:, 0] > prey[0]) & (firsts[:, 1] > prey[1])
    mask21 = (firsts[:, 0] <= prey[0]) & (firsts[:, 1] <= prey[1])
    mask22 = (firsts[:, 0] > prey[0]) & (firsts[:, 1] <= prey[1])
    std = np.std(np.array([sum(mask11), sum(mask12), sum(mask21), sum(mask22)]))
    return std


# Iteration

In [None]:
def ccpso(Ns=20, Np=10, left_range=-100, right_range=100, vicinity = 2, Tv=10, time_lim=1000):
    
    start_time = time.time()
    p_robots = np.random.randint(left_range, right_range, (Ns, Np, 2)) # 初始种群
    p_individual = np.zeros((Ns, Np, 2), dtype=np.int32) # 历史最优位置
    p_group = np.zeros((Ns, 2), dtype=np.int32) # 组内最优位置
    v_robots = np.zeros((Ns, Np, 2)) # 速度
    f_robots = np.zeros((Ns, Np)) # 适应度
    f_individual = np.full((Ns, Np), np.inf) # 历史最优适应度
    f_group = np.full(Ns, np.inf) # 组内最优适应度
    
    prey = np.array([0, 0]) # 猎物位置
    
    SN = np.array([[1, 0], [1, 1], [0, 1], [-1, 1], [-1, 0], [-1, -1], [0, -1], [1, -1]])
    norm_factor = np.array([1, np.sqrt(2), 1, np.sqrt(2), 1, np.sqrt(2), 1, np.sqrt(2)])
    bound_angle = np.arctan2(SN[:, 1], SN[:, 0])
    
    # pso超参数
    w = 1
    c1 = 2
    c2 = 2
    
    def nnd(v):
        dot_products = np.dot(SN, v) / norm_factor
        index = np.argmax(dot_products)
        return SN[index]
        
    
    def evaluate(idx):
        for j in range(1, Np):
            r1 = np.random.rand() * c1
            r2 = np.random.rand() * c2
            v = w * v_robots[idx, j] + r1 * (p_individual[idx, j] - p_robots[idx, j]) + r2 * (p_group[idx] - p_robots[idx, j])
            v_robots[idx] = nnd(v)
            
            p_ij = p_robots[idx, j] + v_robots[idx, j]
            if np.linalg.norm(p_ij - p_robots[idx, 0]) > vicinity:
                real_angle = np.arctan2(p_ij[1] - p_robots[idx, 0, 1], p_ij[0] - p_robots[idx, 0, 0])
                v = np.argmin(np.abs(bound_angle - real_angle))
                p_robots[idx, j] = p_robots[idx, 0] + vicinity * SN[v]
            else:
                p_robots[idx, j] = p_ij
            
            f_repel = fitness_repel(p_robots[idx], j)
            f_closure = fitness_closure(p_robots[:, 0], idx, p_robots[idx, j], prey)
            f_expanse = fitness_expanse(p_robots[:, 0], idx, p_robots[idx, j], prey)
            f_uniformity = fitness_uniformity(p_robots[:, 0], idx, p_robots[idx, j], prey)
            f_robots[idx, j] = f_repel * (f_closure + f_expanse + f_uniformity)
            if f_robots[idx, j] < f_individual[idx, j]:
                f_individual[idx, j] = f_robots[idx, j]
                p_individual[idx, j] = p_robots[idx, j]
            if f_robots[idx, j] < f_group[idx]:
                f_group[idx] = f_robots[idx, j]
                p_group[idx] = p_robots[idx, j]
             
            
        
    p_robots = np.array([[[1., 2.], [3., 4.]], [[1., 2.], [5., 6.]], [[2., 1.], [3., 4.]]])
    
    while time.time() - start_time < time_lim:
        for i in range(Ns):
            # 评估每个虚拟robot
            evaluate(i)
            
            # 虚拟robot重叠时重置
            unique_cnt = np.unique(p_robots[i, 1:], axis=0).shape[0]
            if unique_cnt < Tv:
                p_robots[i, 1:] = np.random.randint(left_range, right_range, (Np - 1, 2))
                evaluate(i)
                
            # TODO: 更新真实robot速度、位置，计算适应度
            
            # TODO: 真实robot的fitness为subpopulation中最小时重置
            if ...:
                p_robots[i, 1:] = np.random.randint(left_range, right_range, (Np - 1, 2))
                evaluate(i)
            #  TODO: 死锁时添加随机扰动
            elif ...:
                p_robots[i, 0] += np.random.normal(loc=0, scale=10, size=2).astype(np.int32)
                break
        
        # TODO: 局部最优时添加随机扰动
        if ...:
            p_robots[:, 0] += np.random.normal(loc=0, scale=10, size=(Ns, 2)).astype(np.int32)
           
        
ccpso(3, 2, time_lim=0.5)