In [None]:
import heapq

class GreedyRobotCargoGA:
    def __init__(self, grid_size, robot_positions, cargo_positions, cargo_destinations, time_steps):
            self.grid_size = grid_size
            self.initial_robot_positions = robot_positions
            self.initial_cargo_positions = cargo_positions
            self.cargo_destinations = cargo_destinations
            self.time_steps = time_steps
            self.blocked_positions = set()  # 一般的阻擋位置
            self.locked_positions = {}   # 貨物鎖死後的阻擋位置
            self.robot_paths = []
            self.available_cargoes = set(range(len(cargo_positions)))
            self.robot_status = [0] * len(robot_positions)  # 跟踪每個機器人的任務完成時間
            self.robot_tasks = {}  # 儲存每個機器人正在處理的貨物
            self.cargo_positions = cargo_positions[:]  # 複製以允許修改

    def block_path(self, path, cargo_id=None):
        for i in range(len(path) - 1):
            x1, y1, t1 = path[i]
            x2, y2, t2 = path[i + 1]


            self.blocked_positions.add((x1, y1, t1))
            self.blocked_positions.add((x2, y2, t2))
            
            # 封鎖雙向路徑，無論有無貨物
            self.blocked_positions.add(((x1, y1), (x2, y2), t1))
            self.blocked_positions.add(((x2, y2), (x1, y1), t1))
            if (x1, y1, t1) == (x2, y2, t2):
                # 上 (y1-1)
                self.blocked_positions.add(((x1, y1), (x1, y1-1), t1))
                self.blocked_positions.add(((x1, y1-1), (x1, y1), t1))

                # 下 (y1+1)
                self.blocked_positions.add(((x1, y1), (x1, y1+1), t1))
                self.blocked_positions.add(((x1, y1+1), (x1, y1), t1))

                # 左 (x1-1)
                self.blocked_positions.add(((x1, y1), (x1-1, y1), t1))
                self.blocked_positions.add(((x1-1, y1), (x1, y1), t1))

                # 右 (x1+1)
                self.blocked_positions.add(((x1, y1), (x1+1, y1), t1))
                self.blocked_positions.add(((x1+1, y1), (x1, y1), t1))


    def is_valid(self, x, y, t, prev_x=None, prev_y=None):
        # 確認座標是否在網格內
        if not (0 <= x < self.grid_size and 0 <= y < self.grid_size):
            return False

        # 檢查兩點之間的路徑是否被封鎖

        if prev_x is not None and prev_y is not None:
            if (((prev_x, prev_y), (x, y), t - 1) in self.blocked_positions or 
                ((x, y), (prev_x, prev_y), t - 1) in self.blocked_positions):
                return False
        if prev_x is not None and prev_y is not None:
            if (((prev_x, prev_y), (x, y), t) in self.blocked_positions or 
                ((x, y), (prev_x, prev_y), t) in self.blocked_positions):
                return False
        if prev_x is not None and prev_y is not None:
            if (((prev_x, prev_y), (x, y), t+1) in self.blocked_positions or 
                ((x, y), (prev_x, prev_y), t+1) in self.blocked_positions):
                return False


        # 檢查該點是否在時間t時被阻擋
        if (x, y, t) in self.blocked_positions:
            return False

        # 檢查該位置是否為已鎖定的貨物目的地
        if (x, y,t) in self.locked_positions:
            return False
        
        for robot_idx, cargo_idx in self.robot_tasks.items():
            cargo_pos = self.cargo_positions[cargo_idx]
            # 檢查該貨物是否在機器人運輸路徑上
            if cargo_pos == (x, y) and self.robot_status[robot_idx] > t:
                return False  # 如果有貨物在這裡且它正在被搬運，則這個位置是障礙物


        # 確認該時間點是否有其他機器人在此位置
        for robot_idx, path in self.robot_paths:
            for (rx, ry, rt) in path:
                if rt == t and (rx, ry) == (x, y):
                    return False  # 該位置被其他機器人佔據

        return True
    def neighbors(self, x, y, t):
        for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
            nx, ny = x + dx, y + dy
            if self.is_valid(nx, ny, t + 1, x, y):
                yield nx, ny, t + 1

    def shortest_path(self, start, goal, start_time=0):
        queue = [(0, start[0], start[1], start_time, [])]  # (cost, x, y, time, path)
        visited = set()
        while queue:
            cost, x, y, t, path = heapq.heappop(queue)
            if (x, y, t) in visited:
                continue
            visited.add((x, y, t))
            path = path + [(x, y, t)]
            if (x, y) == goal:
                return cost, path
            for nx, ny, nt in self.neighbors(x, y, t):
                heapq.heappush(queue, (cost + 1, nx, ny, nt, path))
        return float('inf'), []
    
    def is_locked(self, x, y, current_time):
        """檢查位置是否已經鎖定，僅當當前時間超過鎖定時間時才真正鎖定"""
        if (x, y) in self.locked_positions:
            lock_time = self.locked_positions[(x, y)]
            if current_time >= lock_time:

                return True
            else:

                return False
        return False  # 如果位置不在鎖定列表中，則視為未鎖定

    def lock_position(self, x, y, lock_time):
        """在指定時間後鎖定已運送完成的貨物位置，防止其他機器人干擾"""
        self.locked_positions[(x, y)] = lock_time  # 位置和鎖定時間


    def find_nearest_empty_position(self, start_pos):
        min_dist = float('inf')
        nearest_pos = None
        for x in range(self.grid_size):
            for y in range(self.grid_size):
                pos = (x, y)
                if pos not in self.cargo_positions and (x, y) not in self.blocked_positions:
                    dist, _ = self.shortest_path(start_pos, pos)
                    if dist < min_dist:
                        min_dist = dist
                        nearest_pos = pos
        return nearest_pos

    def move_blocking_cargo(self, robot_idx, start_pos, cargo_id):
        # 獲取目標貨物的目的地
        target_cargo_pos = self.cargo_positions[cargo_id]
        target_cargo_dest = self.cargo_destinations[cargo_id]

        # 獲取到達目的地的路徑
        current_time = self.robot_status[robot_idx]
        
        path_to_target = self.shortest_path(start_pos, target_cargo_pos,current_time)[1]
        total_path = [] 
        path_to_dest = self.shortest_path(target_cargo_pos, target_cargo_dest,current_time)[1]
        total_path.extend([(x, y, t) for (x, y, t) in path_to_target])
        current_time += len(path_to_target)
        block = False        # 檢查路徑上的障礙物
        for x, y, t in path_to_dest:
            
            if (x, y) in self.cargo_positions and (x, y) != self.cargo_positions[cargo_id]:

                block = True
                blocking_cargo_idx = self.cargo_positions.index((x, y))  # 獲取阻擋的貨物索引
                path_to_block = self.shortest_path(target_cargo_pos, (x, y),current_time)[1]

                total_path.extend([(x, y, t) for (x, y, t) in path_to_block])
                current_time += len(path_to_block)
                # 尋找空位
                empty_pos = self.find_nearest_empty_position((x, y))
                if empty_pos:
                    # 計算從阻擋貨物位置到空位的路徑
                    path_to_empty = self.shortest_path((x, y), empty_pos,current_time)[1]
                    path_to_empty = path_to_empty[1:]
                    #self.block_path(path_to_empty)
                    total_path.extend([(x, y, t-1) for (x, y, t) in path_to_empty])
                    current_time += len(path_to_empty)
                    

                    # 更新貨物位置
                    self.cargo_positions[blocking_cargo_idx] = empty_pos
                    self.update_cargo_positions()
                    
                    # 更新回到原來貨物位置的路徑
                    path_back_to_cargo = self.shortest_path(empty_pos,(x, y),current_time)[1]
                    path_back_to_cargo = path_back_to_cargo[1:]
                    #self.block_path(path_back_to_cargo)

                    total_path.extend([(x, y, t-1) for (x, y, t) in path_back_to_cargo])
                    current_time += len(path_back_to_cargo)
                    place = (x,y) 
        if block and empty_pos:
            path_back_to_cargo = self.shortest_path(place,target_cargo_pos,current_time)[1]
            path_back_to_cargo = path_back_to_cargo[1:]
            #self.block_path(path_back_to_cargo)

            
            total_path.extend([(x, y, t-1) for (x, y, t) in path_back_to_cargo])
            current_time += len(path_back_to_cargo) 
        # 在最終路徑上進行搬移貨物的操作
        path_to_dest = self.shortest_path(target_cargo_pos, target_cargo_dest,current_time)[1]
        path_to_dest = path_to_dest[1:]
        total_path.extend([(x, y, t-1) for (x, y, t) in path_to_dest])
        current_time += len(path_to_dest)
        # 更新機器人的狀態

        self.robot_status[robot_idx] += len(total_path)

        self.block_path(total_path)

        return total_path

    def allocate_tasks(self, assignments):
        time = 0  # 初始化時間
        
        task_assignment = []  # 紀錄任務分配和路徑
        available_robots = list(range(len(self.initial_robot_positions)))  # 可用的機器人索引列表
        self.available_cargoes = {cargo_idx for cargo_idx in self.available_cargoes 
                                      if self.cargo_destinations[cargo_idx] != (float('inf'), float('inf'))}

        while time < self.time_steps and assignments:
            # 使用副本迭代以避免修改問題
            for (robot_idx, cargo_idx) in assignments[:]:  
                # 檢查機器人是否可以執行任務且當前時間允許
                if time >= self.robot_status[robot_idx]:
                    cargo_pos = self.cargo_positions[cargo_idx]
                    cargo_dest = self.cargo_destinations[cargo_idx]

                    # 移動阻擋的貨物並計算路徑
                    blocking_path = self.move_blocking_cargo(robot_idx, self.initial_robot_positions[robot_idx], cargo_idx)

                    # 將機器人路徑與阻擋路徑合併
                    robot_path = blocking_path

                    # 如果機器人路徑穿過貨物位置或目標位置，跳過此任務
                    if self.path_passes_through(robot_path, robot_idx, cargo_pos, cargo_dest):
                        continue
                    # 更新貨物和機器人狀態
                    self.available_cargoes.remove(cargo_idx)  # 移除完成的貨物
                    assignments.remove((robot_idx, cargo_idx))  # 移除已分配的任務
                    available_robots.remove(robot_idx)  # 標記機器人為不可用直到任務完成
                    if robot_path:
                        robot_pos = robot_path[-1]  # 機器人的最終位置
                        self.initial_robot_positions[robot_idx] = cargo_dest
                        self.cargo_positions[cargo_idx] = robot_pos
                        self.lock_position(cargo_dest[0], cargo_dest[1], robot_path[-1][2])  # 鎖定目的地
                        self.robot_paths.append((f"R{robot_idx + 1}", robot_path))

                        # 紀錄任務分配和路徑
                        task_assignment.append((robot_idx, cargo_idx, robot_path))  # 將機器人與貨物和路徑對應
                       
                        break  # 任務分配完成，退出循環

            # 增加時間步
            time += 1

            # 將完成任務的機器人標記為可用
            for robot_idx in range(len(self.robot_status)):
                if time >= self.robot_status[robot_idx] and robot_idx not in available_robots:
                    available_robots.append(robot_idx)  # 標記機器人為可用
        
        return task_assignment  # 返回 (robot_idx, cargo_idx, path) 列表




    def path_passes_through(self, path, current_robot_idx,cargo_pos,cargo_dest):
        """檢查路徑是否經過其他機器人的目標位置"""
        # 將可用貨物集合轉換為列表
        available_cargos_list = list(self.available_cargoes)


        # 檢查當前路徑的所有位置
        path_positions = set((x, y) for x, y, t in path)

        # 檢查是否有其他機器人的路徑會經過當前機器人的目標位置
        for robot_name, robot_path in self.robot_paths:
            if robot_name != f"R{current_robot_idx + 1}":  # 忽略當前機器人的路徑
                for (x, y, t) in robot_path:
                    if (x, y) == cargo_dest and t >= self.robot_status[current_robot_idx]-1:  # 確保時間大於當前時間
                        current_time = self.robot_status[current_robot_idx]
                        return True  # 檢測到經過其他機器人的目標位置
         
        

        return False  # 沒有經過


    def update_cargo_positions(self):
        new_positions = list(self.cargo_positions)

        for robot_idx, cargo_idx in self.robot_tasks.items():
            robot_pos = self.initial_robot_positions[robot_idx]

            if cargo_idx in self.cargo_positions:
                if self.cargo_positions[cargo_idx] == robot_pos:
                    new_positions[cargo_idx] = robot_pos

        self.cargo_positions = new_positions
        
    def print_grids(self):
        max_time = max((t for _, path in self.robot_paths for (_, _, t) in path), default=0)
        for t in range(max_time + 1):
            grid = [["." for _ in range(self.grid_size)] for _ in range(self.grid_size)]
            for (robot_idx, path) in self.robot_paths:
                for (y, x, time) in path:
                    if time == t:
                        grid[y][x] = robot_idx
            print(f"Time {t}:")
            for row in grid:
                print(" ".join(row))
            print()



In [None]:
import random
import time
import copy
import numpy as np

class PSOStyleGA:
    def __init__(self, population_size, num_robots, num_cargos, grid_size, robot_positions, cargo_positions, cargo_destinations, time_steps, generations, inertia=0.5, c1=1.5, c2=1.5):
        self.population_size = population_size
        self.num_robots = num_robots
        self.num_cargos = num_cargos
        self.grid_size = grid_size
        self.robot_positions = robot_positions
        self.cargo_positions = cargo_positions
        self.cargo_destinations = cargo_destinations
        self.time_steps = time_steps
        self.generations = generations
        self.inertia = inertia
        self.c1 = c1
        self.c2 = c2

    def initialize_population(self):
        # 隨機初始化每個粒子的權重和速度
        population = [np.random.rand(self.num_robots, self.num_cargos) for _ in range(self.population_size)]
        velocities = [np.random.rand(self.num_robots, self.num_cargos) * 0.1 for _ in range(self.population_size)]
        return population, velocities

    def allocate_tasks_based_on_weights(self, weights):
        # 同之前的方法，分配每個機器人基於權重的貨物任務
        available_cargoes = [
            idx for idx in range(len(self.cargo_positions))
            if self.cargo_destinations[idx] != (float('inf'), float('inf'))
        ]

        assignments = []
        robot_positions_copy = copy.deepcopy(self.robot_positions)
        cargo_positions_copy = copy.deepcopy(self.cargo_positions)
        cargo_destinations_copy = copy.deepcopy(self.cargo_destinations)
        while available_cargoes:
            for robot_idx in range(len(self.robot_positions)):
                cargo_preferences = sorted(
                    available_cargoes, 
                    key=lambda c: weights[robot_idx][c], 
                    reverse=True
                )
                for cargo_idx in cargo_preferences:
                    if cargo_idx in available_cargoes:
                        assignments.append((robot_idx, cargo_idx))
                        available_cargoes.remove(cargo_idx)
                        break
                if not available_cargoes:
                    break
        return assignments

    def fitness(self, weights):
        assignments = self.allocate_tasks_based_on_weights(weights)
        simulation = GreedyRobotCargoGA(self.grid_size, self.robot_positions, self.cargo_positions, self.cargo_destinations, self.time_steps)
        task_result = simulation.allocate_tasks(assignments)
        if not task_result:
            return float('inf')
        
        robot_times = {}
        for robot_idx, cargo_idx, path in task_result:
            path_length = len(path)
            robot_times[robot_idx] = robot_times.get(robot_idx, 0) + path_length
        
        return max(robot_times.values(), default=0)

    def run(self):
        start_time = time.time()
        population, velocities = self.initialize_population()
        best=1000
        personal_best_positions = population.copy()
        personal_best_scores = [self.fitness(weights) for weights in population]
        global_best_position = personal_best_positions[np.argmin(personal_best_scores)]
        global_best_score = min(personal_best_scores)

        for generation in range(self.generations):
            for i in range(self.population_size):
                # 更新速度
                fitnesses = [self.fitness(weights) for weights in population]
                velocities[i] = (self.inertia * velocities[i] + 
                                 self.c1 * random.random() * (personal_best_positions[i] - population[i]) + 
                                 self.c2 * random.random() * (global_best_position - population[i]))

                # 更新位置
                population[i] = population[i] + velocities[i]
                
                
                
                population[i] = np.clip(population[i], 0, 1)
                # 計算新的適應度
                
                current_score = self.fitness(population[i])

                # 更新個體最佳位置
                if current_score < personal_best_scores[i]:
                    personal_best_positions[i] = population[i]
                    personal_best_scores[i] = current_score

                    # 更新全局最佳位置
                    if current_score < global_best_score:
                        global_best_position = population[i]
                        global_best_score = current_score
                best_fitness = min(fitnesses)
                if best_fitness < best:
                    best = best_fitness


        end_time = time.time()
        execution_time = end_time - start_time


        return {"runtime": f"{execution_time:.2f}", "obj": best_fitness}

In [None]:
import os
import pandas as pd
import re
import time

def parse_files(start_file, dest_file):
    with open(start_file, 'r') as start, open(dest_file, 'r') as dest:
        start_lines = start.readlines()
        dest_lines = dest.readlines()

    # 讀取機器人數量，假設第二行包含機器人數量
    num_robots = int(start_lines[1].strip())  # 從第二行讀取機器人數量
    
    # 偵測機器人位置，機器人位置從第三行開始
    robot_positions = []
    for i in range(2, 2 + num_robots):  # 從第三行開始讀取機器人位置
        y, x = map(int, start_lines[i].strip().split(','))
        robot_positions.append((y, x))

    # 解析起始和目標網格
    start_grid = [list(map(int, line.strip().split(','))) for line in start_lines[2 + num_robots:]]  # 跳過機器人數量和位置
    dest_grid = [list(map(int, line.strip().split(','))) for line in dest_lines[1:]]
    
    cargo_positions = {}
    cargo_destinations = {}
    obstacles = []

    # 解析起始網格的貨物與障礙物
    for y, row in enumerate(start_grid):
        for x, cell in enumerate(row):
            if cell > 0:  # 貨物
                cargo_positions[cell] = (y, x)
            elif cell == -1:  # 障礙物
                obstacles.append((y, x))

    # 解析目標網格的貨物
    for y, row in enumerate(dest_grid):
        for x, cell in enumerate(row):
            if cell > 0:  # 貨物
                cargo_destinations[cell] = (y, x)

    # 排序貨物位置和目的地
    sorted_cargo_positions = [cargo_positions[i] for i in sorted(cargo_positions)]
    sorted_cargo_destinations = [cargo_destinations[i] for i in sorted(cargo_destinations)]

    # 將障礙物加入到貨物位置中
    sorted_cargo_positions.extend(obstacles)
    sorted_cargo_destinations.extend([(float('inf'), float('inf'))] * len(obstacles))

    grid_size = len(start_grid)

    print("機器人位置:", robot_positions)
    print("貨物位置:", sorted_cargo_positions)
    print("貨物目的地:", sorted_cargo_destinations)
    
    return grid_size, robot_positions, sorted_cargo_positions, sorted_cargo_destinations


def process_all_data(base_dir):
    results = []
    start_folder = os.path.join(base_dir, "start")
    end_folder = os.path.join(base_dir, "end")
    
    if not (os.path.exists(start_folder) and os.path.exists(end_folder)):
        return results

    for start_file in os.listdir(start_folder):
        if not start_file.endswith('.txt') or not start_file.startswith('graph_start_'):
            continue

        corresponding_end_file = start_file.replace("graph_start_", "graph_end_")
        start_file_path = os.path.join(start_folder, start_file)
        end_file_path = os.path.join(end_folder, corresponding_end_file)

        if not os.path.exists(end_file_path):
            print(f"End file not found for: {start_file}")
            continue
        print(start_file_path)
        grid_size, robot_positions, cargo_positions, cargo_destinations = parse_files(start_file_path, end_file_path)

        time_steps = 100
        time_steps = 100  
        population_size = 10
        generations = 50
        cargo_list = list(range(len(cargo_positions)))  # 貨物的索引列表
        num_robots = len(robot_positions)
        num_cargos = len(cargo_positions)

        ga = PSOStyleGA(population_size, num_robots, num_cargos, grid_size, robot_positions, cargo_positions, cargo_destinations, time_steps, generations)
        result = ga.run()

        file_name = os.path.basename(start_file)
        match = re.match(r"graph_start_(\d+x\d+)_tar(\d+)_sp(\d+)_(\d+).txt", file_name)

        if match:
            grid_size_str, tar_num, sp_num, test_num = match.groups()
            results.append({
                "n_tar": int(tar_num),
                "n_spa": int(sp_num),
                "id": int(test_num),
                "runtime": result["runtime"],
                "obj": result["obj"],
            })

    return results

def main():
    base_dir = r"C:\\Users\\mirac\\3x3"
    output_excel = r"C:\\Users\\mirac\\output.xlsx"
    tar_folders = ["tar1", "tar2", "tar3", "tar4", "tar5", "tar6", "tar8"]

    all_results = []

    for tar in tar_folders:
        folder_path = os.path.join(base_dir, tar)
        print(f"Processing folder: {folder_path}")
        results = process_all_data(folder_path)
        if results:
            all_results.extend(results)

    if all_results:
        df = pd.DataFrame(all_results)
        df.to_excel(output_excel, index=False)
        print(f"Results saved to {output_excel}")
    else:
        print("No results to save.")

if __name__ == "__main__":
    main()
