### encoding.py

In [7]:
import random
import numpy as np

def generate_individual(num_tasks, num_robots):
    tasks = list(range(num_tasks))
    random.shuffle(tasks)

    # Evenly distribute tasks (some robots may have 1 more/less)
    splits = np.array_split(tasks, num_robots)
    return [list(s) for s in splits]

# def generate_individual(num_tasks, num_robots):
#     tasks = list(range(num_tasks))
#     random.shuffle(tasks)

#     # Generate random split sizes that sum to num_tasks
#     split_sizes = [0] * num_robots
#     for _ in range(num_tasks):
#         split_sizes[random.randint(0, num_robots - 1)] += 1

#     # Distribute tasks accordingly
#     individual = []
#     index = 0
#     for size in split_sizes:
#         individual.append(tasks[index:index+size])
#         index += size

#     return individual


def flatten_individual(individual):
    """Flatten robot-wise task list with -1 as separator"""
    flat = []
    for robot_tasks in individual:
        flat += robot_tasks + [-1]
    return flat[:-1]  # remove last separator

def unflatten_individual(flattened):
    """Convert flattened form back to list-of-lists"""
    robots = []
    temp = []
    for t in flattened:
        if t == -1:
            robots.append(temp)
            temp = []
        else:
            temp.append(t)
    if temp:
        robots.append(temp)
    return robots


#### Test Code

In [8]:
if __name__ == "__main__":
    num_tasks = 20
    num_robots = 6

    individual = generate_individual(num_tasks, num_robots)
    print("Generated Individual:", individual)

    flat = flatten_individual(individual)
    print("Flattened:", flat)

    restored = unflatten_individual(flat)
    print("Restored:", restored)

    assert sorted([t for r in individual for t in r]) == list(range(num_tasks)), "Missing tasks"
    assert individual == restored, "Unflatten failed!"


Generated Individual: [[9, 16, 0, 14], [7, 3, 12, 11], [5, 6, 18], [13, 10, 15], [17, 4, 8], [1, 19, 2]]
Flattened: [9, 16, 0, 14, -1, 7, 3, 12, 11, -1, 5, 6, 18, -1, 13, 10, 15, -1, 17, 4, 8, -1, 1, 19, 2]
Restored: [[9, 16, 0, 14], [7, 3, 12, 11], [5, 6, 18], [13, 10, 15], [17, 4, 8], [1, 19, 2]]


### evaluation.py

In [9]:
def evaluate_individual(individual, drl_planner):
    """
    Args:
        individual: list of task sequences (one per robot)
        drl_planner: function(robot_id, task_seq) -> path_length

    Returns:
        f1 = total path length (sum of all robot paths)
        f2 = max path length (time taken to complete all tasks)
    """
    path_lengths = []

    for robot_id, task_seq in enumerate(individual):
        length = drl_planner(robot_id, task_seq)
        path_lengths.append(length)

    f1 = sum(path_lengths)
    f2 = max(path_lengths)
    return f1, f2

def fake_drl_planner(robot_id, task_seq):
    # Dummy logic: each task adds 10 units of path
    return len(task_seq) * 10


In [10]:

if __name__ == "__main__":
    # from encoding import generate_individual

    ind = generate_individual(num_tasks=20, num_robots=6)
    print("Individual:", ind)

    f1, f2 = evaluate_individual(ind, fake_drl_planner)
    print("Total Path Length (f1):", f1)
    print("Time Taken (f2):", f2)


Individual: [[13, 9, 18, 16], [3, 11, 5, 4], [14, 12, 7], [10, 6, 19], [15, 17, 8], [2, 0, 1]]
Total Path Length (f1): 200
Time Taken (f2): 40


### clustering.py

In [11]:
from sklearn.cluster import KMeans
import numpy as np

def cluster_population(flattened_population, num_clusters):
    """
    Args:
        flattened_population: list of flattened task sequences (1D list with -1 separators)
        num_clusters: int, number of clusters for k-means

    Returns:
        labels: list of cluster IDs for each individual
    """
    # Pad with zeros to equal length for k-means input
    max_len = max(len(ind) for ind in flattened_population)
    padded = [ind + [0] * (max_len - len(ind)) for ind in flattened_population]

    X = np.array(padded)
    kmeans = KMeans(n_clusters=num_clusters, n_init='auto', random_state=42)
    labels = kmeans.fit_predict(X)
    return labels


In [12]:
if __name__ == "__main__":
    # from encoding import generate_individual, flatten_individual

    num_tasks = 20
    num_robots = 6
    population_size = 10
    population = [generate_individual(num_tasks, num_robots) for _ in range(population_size)]
    flat_population = [flatten_individual(ind) for ind in population]

    cluster_labels = cluster_population(flat_population, num_clusters=3)

    for i, (ind, label) in enumerate(zip(population, cluster_labels)):
        print(f"Individual {i}: Cluster {label} => {ind}")


Individual 0: Cluster 1 => [[15, 0, 4, 12], [2, 6, 18, 8], [1, 11, 3], [5, 13, 9], [17, 16, 19], [14, 10, 7]]
Individual 1: Cluster 2 => [[4, 0, 16, 12], [14, 7, 8, 13], [15, 2, 11], [3, 17, 6], [10, 18, 9], [1, 5, 19]]
Individual 2: Cluster 0 => [[15, 18, 0, 19], [6, 9, 17, 4], [2, 13, 14], [5, 7, 12], [11, 10, 3], [16, 1, 8]]
Individual 3: Cluster 0 => [[0, 7, 5, 8], [9, 3, 19, 18], [6, 15, 10], [16, 1, 12], [11, 14, 2], [17, 13, 4]]
Individual 4: Cluster 2 => [[13, 2, 14, 18], [17, 16, 6, 19], [9, 15, 4], [10, 5, 8], [11, 1, 3], [0, 7, 12]]
Individual 5: Cluster 1 => [[13, 14, 7, 9], [5, 12, 2, 1], [19, 15, 8], [11, 16, 17], [10, 4, 6], [0, 18, 3]]
Individual 6: Cluster 2 => [[1, 10, 12, 8], [9, 13, 5, 7], [2, 0, 18], [19, 17, 15], [6, 4, 3], [14, 11, 16]]
Individual 7: Cluster 2 => [[5, 12, 3, 15], [2, 17, 4, 7], [14, 1, 9], [6, 8, 18], [16, 11, 19], [10, 0, 13]]
Individual 8: Cluster 2 => [[9, 8, 15, 17], [3, 4, 7, 16], [12, 1, 13], [2, 18, 0], [19, 14, 6], [5, 11, 10]]
Individual

### cscd.py

In [13]:
import numpy as np
from collections import defaultdict

def normalize_vectors(vectors):
    """Normalize vectors to [0, 1] range dimension-wise."""
    array = np.array(vectors, dtype=np.float32)
    min_vals = np.min(array, axis=0)
    max_vals = np.max(array, axis=0)
    denom = np.where(max_vals - min_vals == 0, 1, max_vals - min_vals)
    return (array - min_vals) / denom


#### Decision Space Crowding Distance 𝐶𝐷𝑥


In [14]:
def compute_decision_space_crowding(flattened_population, cluster_labels):
    cluster_map = defaultdict(list)
    for i, label in enumerate(cluster_labels):
        cluster_map[label].append(i)

    crowding_x = np.zeros(len(flattened_population))

    for cluster_id, indices in cluster_map.items():
        if len(indices) <= 2:
            # Boundary or singleton cluster → assign high crowding
            for idx in indices:
                crowding_x[idx] = float('inf')
            continue

        cluster_vectors = [flattened_population[i] for i in indices]
        max_len = max(len(v) for v in cluster_vectors)
        padded = [v + [0] * (max_len - len(v)) for v in cluster_vectors]
        norm_vectors = normalize_vectors(padded)

        # Calculate crowding for each dimension
        cluster_crowding = np.zeros(len(indices))
        for d in range(norm_vectors.shape[1]):
            sorted_idx = np.argsort(norm_vectors[:, d])
            cluster_crowding[sorted_idx[0]] = float('inf')
            cluster_crowding[sorted_idx[-1]] = float('inf')
            for j in range(1, len(indices) - 1):
                prev_val = norm_vectors[sorted_idx[j - 1], d]
                next_val = norm_vectors[sorted_idx[j + 1], d]
                cluster_crowding[sorted_idx[j]] += (next_val - prev_val)

        for i, idx in enumerate(indices):
            crowding_x[idx] = cluster_crowding[i]

    return crowding_x.tolist()


#### Objective Space Crowding Distance 𝐶𝐷𝑓

In [15]:
def compute_objective_space_crowding(objective_values, front_labels):
    front_map = defaultdict(list)
    for i, front in enumerate(front_labels):
        front_map[front].append(i)

    crowding_f = np.zeros(len(objective_values))

    for front_id, indices in front_map.items():
        if len(indices) <= 2:
            for idx in indices:
                crowding_f[idx] = float('inf')
            continue

        front_objs = [objective_values[i] for i in indices]
        norm_objs = normalize_vectors(front_objs)

        front_crowding = np.zeros(len(indices))
        for d in range(norm_objs.shape[1]):
            sorted_idx = np.argsort(norm_objs[:, d])
            front_crowding[sorted_idx[0]] = float('inf')
            front_crowding[sorted_idx[-1]] = float('inf')
            for j in range(1, len(indices) - 1):
                prev_val = norm_objs[sorted_idx[j - 1], d]
                next_val = norm_objs[sorted_idx[j + 1], d]
                front_crowding[sorted_idx[j]] += (next_val - prev_val)

        for i, idx in enumerate(indices):
            crowding_f[idx] = front_crowding[i]

    return crowding_f.tolist()


#### Combine into CSCD

In [16]:
def compute_cscd(crowding_x, crowding_f):
    finite_cx = [v for v in crowding_x if not np.isinf(v)]
    finite_cf = [v for v in crowding_f if not np.isinf(v)]

    avg_x = np.mean(finite_cx) if finite_cx else 0
    avg_f = np.mean(finite_cf) if finite_cf else 0

    cscd = []
    for cx, cf in zip(crowding_x, crowding_f):
        if cx > avg_x or cf > avg_f:
            cscd.append(max(cx, cf))
        else:
            cscd.append(min(cx, cf))
    return cscd


#### Full Interface: compute_cscd_scores

In [17]:
def compute_cscd_scores(flattened_population, objective_values, cluster_labels, front_labels):
    crowding_x = compute_decision_space_crowding(flattened_population, cluster_labels)
    crowding_f = compute_objective_space_crowding(objective_values, front_labels)
    cscd = compute_cscd(crowding_x, crowding_f)
    return cscd

In [18]:
if __name__ == "__main__":
    # from encoding import generate_individual, flatten_individual

    pop_size = 10
    num_tasks = 15
    num_robots = 3

    population = [generate_individual(num_tasks, num_robots) for _ in range(pop_size)]
    flat = [flatten_individual(ind) for ind in population]
    objectives = [(i, 100 - i) for i in range(pop_size)]  # Dummy values
    clusters = [i % 3 for i in range(pop_size)]  # Dummy clusters
    fronts = [i // 3 for i in range(pop_size)]   # Dummy fronts

    cscd_scores = compute_cscd_scores(flat, objectives, clusters, fronts)
    for i, score in enumerate(cscd_scores):
        print(f"Ind {i}: CSCD = {score:.3f}")


Ind 0: CSCD = inf
Ind 1: CSCD = inf
Ind 2: CSCD = inf
Ind 3: CSCD = inf
Ind 4: CSCD = inf
Ind 5: CSCD = inf
Ind 6: CSCD = inf
Ind 7: CSCD = inf
Ind 8: CSCD = inf
Ind 9: CSCD = inf


### nsga2.py

In [19]:
# nsga2.py
def dominates(a, b):
    return all(x <= y for x, y in zip(a, b)) and any(x < y for x, y in zip(a, b))

def assign_fronts(objective_values):
    N = len(objective_values)
    fronts = [None] * N
    domination_counts = [0] * N
    dominated_sets = [[] for _ in range(N)]
    current_front = []

    for i in range(N):
        for j in range(N):
            if i == j:
                continue
            if dominates(objective_values[i], objective_values[j]):
                dominated_sets[i].append(j)
            elif dominates(objective_values[j], objective_values[i]):
                domination_counts[i] += 1
        if domination_counts[i] == 0:
            fronts[i] = 0
            current_front.append(i)

    front = 0
    while current_front:
        next_front = []
        for p in current_front:
            for q in dominated_sets[p]:
                domination_counts[q] -= 1
                if domination_counts[q] == 0:
                    fronts[q] = front + 1
                    next_front.append(q)
        front += 1
        current_front = next_front

    return fronts


### DBESM

In [20]:
import numpy as np
# from encoding import unflatten_individual, flatten_individual
import random

def euclidean_distance(a, b):
    max_len = max(len(a), len(b))
    a = a + [0] * (max_len - len(a))
    b = b + [0] * (max_len - len(b))
    return np.linalg.norm(np.array(a) - np.array(b))


In [21]:
def select_exemplar(index, flattened_pop, front_ranks, cscd_scores, F=0.5):
    current_rank = front_ranks[index]
    current_vector = flattened_pop[index]

    # Step 1: Find candidates from better fronts
    candidates = [i for i, r in enumerate(front_ranks) if r < current_rank]

    if not candidates:
        return index  # fallback: no better fronts

    # Step 2: Among those, pick top-CSCD elites (top 50%)
    cscd_vals = [cscd_scores[i] for i in candidates]
    threshold = np.percentile(cscd_vals, 50)
    elites = [i for i in candidates if cscd_scores[i] >= threshold]

    if not elites:
        elites = candidates  # fallback: use all candidates

    # Step 3: Select the closest elite in decision space
    distances = [euclidean_distance(current_vector, flattened_pop[i]) for i in elites]
    exemplar_idx = elites[np.argmin(distances)]
    return exemplar_idx


In [22]:
def generate_offspring(parent, exemplar, mutation_prob=0.3):
    """
    Args:
        parent, exemplar: both are list-of-lists task sequences (robot-wise)

    Returns:
        new_individual: mutated child
    """
    num_robots = len(parent)
    tasks = set(t for r in parent for t in r)
    
    # Flatten exemplar & parent
    ex_flat = [t for r in exemplar for t in r]
    ex_flat = [t for t in ex_flat if t in tasks]  # ensure same task set

    # Crossover: start from exemplar
    new_seq = ex_flat.copy()

    # Mutation: randomly swap some tasks
    if random.random() < mutation_prob:
        i, j = random.sample(range(len(new_seq)), 2)
        new_seq[i], new_seq[j] = new_seq[j], new_seq[i]

    # Redistribute to robots (uneven split)
    splits = [0] * num_robots
    for _ in new_seq:
        splits[random.randint(0, num_robots - 1)] += 1

    robot_seq = []
    idx = 0
    for s in splits:
        robot_seq.append(new_seq[idx:idx+s])
        idx += s

    return robot_seq



In [23]:
def dbesm_selection(population, flattened_pop, front_ranks, cscd_scores):
    """
    Returns: new population (offspring) using DBESM logic
    """
    new_pop = []

    for i in range(len(population)):
        exemplar_idx = select_exemplar(i, flattened_pop, front_ranks, cscd_scores)
        exemplar = unflatten_individual(flattened_pop[exemplar_idx])
        offspring = generate_offspring(population[i], exemplar)
        new_pop.append(offspring)

    return new_pop


In [30]:
if __name__ == "__main__":
    # from encoding import generate_individual, flatten_individual
    # from cscd import compute_cscd_scores
    # from clustering import cluster_population
    # from evaluation import evaluate_individual
    # from nsga2 import assign_fronts

    pop_size = 50
    num_tasks = 20
    num_robots = 4
    num_clusters = 3

    population = [generate_individual(num_tasks, num_robots) for _ in range(pop_size)]
    flattened = [flatten_individual(ind) for ind in population]

    def fake_drl(r, t): return len(t) * 10
    objectives = [evaluate_individual(ind, fake_drl) for ind in population]
    clusters = cluster_population(flattened, num_clusters)
    fronts = assign_fronts(objectives)
    cscd_scores = compute_cscd_scores(flattened, objectives, clusters, fronts)

    offspring = dbesm_selection(population, flattened, fronts, cscd_scores)

    print(len(offspring[0][0]))  # Should be same as population size
    # for i, child in enumerate(offspring):
    #     print(f"Child {i}: {child}")


3


### Main Loop

In [25]:
# from encoding import generate_individual, flatten_individual
# from evaluation import evaluate_individual
# from clustering import cluster_population
# from cscd import compute_cscd_scores
# from dbesm import dbesm_selection
# from nsga2 import assign_fronts
# import random

def initialize_population(pop_size, num_tasks, num_robots):
    return [generate_individual(num_tasks, num_robots) for _ in range(pop_size)]

def evaluate_population(population, drl_planner):
    return [evaluate_individual(ind, drl_planner) for ind in population]

def flatten_population(population):
    return [flatten_individual(ind) for ind in population]

def select_next_generation(pop, obj_vals, flat, fronts, cscd, N):
    """
    Select top-N individuals based on front + CSCD
    """
    combined = list(zip(pop, obj_vals, flat, fronts, cscd))
    combined.sort(key=lambda x: (x[3], -x[4]))  # sort by front asc, CSCD desc
    selected = combined[:N]
    return [x[0] for x in selected], [x[1] for x in selected]

def run_evolution(
    num_tasks=9,
    num_robots=3,
    pop_size=20,
    generations=50,
    num_clusters=5,
    drl_planner=lambda r, s: len(s) * 10  # dummy DRL
):
    # Step 1: Initialize
    population = initialize_population(pop_size, num_tasks, num_robots)

    for gen in range(generations):
        # print(f"\n--- Generation {gen} ---")

        # Step 2: Flatten
        flattened = flatten_population(population)

        # Step 3: Evaluate
        objective_values = evaluate_population(population, drl_planner)

        # Step 4: Assign fronts
        fronts = assign_fronts(objective_values)

        # Step 5: Clustering
        clusters = cluster_population(flattened, num_clusters)

        # Step 6: CSCD
        cscd_scores = compute_cscd_scores(flattened, objective_values, clusters, fronts)

        # Step 7: DBESM offspring generation
        offspring = dbesm_selection(population, flattened, fronts, cscd_scores)

        # Combine populations
        combined_population = population + offspring
        combined_flattened = flatten_population(combined_population)
        combined_objectives = evaluate_population(combined_population, drl_planner)

        # Recalculate fronts, clusters, CSCD for selection
        combined_fronts = assign_fronts(combined_objectives)
        combined_clusters = cluster_population(combined_flattened, num_clusters)
        combined_cscd = compute_cscd_scores(combined_flattened, combined_objectives, combined_clusters, combined_fronts)

        # Step 8: Select next generation
        population, objective_values = select_next_generation(
            combined_population,
            combined_objectives,
            combined_flattened,
            combined_fronts,
            combined_cscd,
            pop_size
        )

        # Optional: Print best
        best = min(objective_values, key=lambda x: (x[0], x[1]))
        # print(f"Best in gen {gen}: f1 = {best[0]}, f2 = {best[1]}")

    return population, objective_values


In [76]:
if __name__ == "__main__":
    final_pop, final_objs = run_evolution(
        num_tasks=12,
        num_robots=3,
        pop_size=20,
        generations=20,
        num_clusters=4,
        drl_planner=lambda r, s: len(s) * (r + 1) * 10  # Fake but variable
    )

    print("\nFinal Pareto Front (Objective Space):")
    for i, obj in enumerate(final_objs):
        print(f"Solution {i}: f1 = {obj[0]}, f2 = {obj[1]}")

    print("\nFinal Pareto Set (Decision Space):")
    for i, individual in enumerate(final_pop):
        print(f"Solution {i}:")
        for r, tasks in enumerate(individual):
            print(f"  Robot {r+1}: {tasks}")



  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)
  diff_b_a = subtract(b, a)



Final Pareto Front (Objective Space):
Solution 0: f1 = 190, f2 = 70
Solution 1: f1 = 170, f2 = 80
Solution 2: f1 = 190, f2 = 70
Solution 3: f1 = 170, f2 = 80
Solution 4: f1 = 190, f2 = 70
Solution 5: f1 = 190, f2 = 70
Solution 6: f1 = 170, f2 = 90
Solution 7: f1 = 180, f2 = 80
Solution 8: f1 = 170, f2 = 90
Solution 9: f1 = 180, f2 = 80
Solution 10: f1 = 180, f2 = 80
Solution 11: f1 = 180, f2 = 80
Solution 12: f1 = 200, f2 = 80
Solution 13: f1 = 200, f2 = 80
Solution 14: f1 = 200, f2 = 80
Solution 15: f1 = 190, f2 = 90
Solution 16: f1 = 190, f2 = 90
Solution 17: f1 = 190, f2 = 90
Solution 18: f1 = 200, f2 = 80
Solution 19: f1 = 200, f2 = 80

Final Pareto Set (Decision Space):
Solution 0:
  Robot 1: [11, 5, 8, 10, 7, 6, 9]
  Robot 2: [3, 0, 1]
  Robot 3: [2, 4]
Solution 1:
  Robot 1: [11, 6, 3, 2, 10, 0, 8, 7]
  Robot 2: [5, 4, 9]
  Robot 3: [1]
Solution 2:
  Robot 1: [11, 6, 3, 2, 10, 0, 8]
  Robot 2: [7, 5, 4]
  Robot 3: [9, 1]
Solution 3:
  Robot 1: [11, 5, 8, 10, 7, 6, 9, 3]
  Robot