In [7]:
import random
import numpy as np
from deap import base, creator, tools, algorithms
from collections import Counter
# ----------------------------
# Configuration
# ----------------------------
NUM_TRUCKS = 3
NUM_TASKS = 30
DEPOT = 0

# ----------------------------
# Define Truck
# ----------------------------
class Truck:
    def __init__(self, id, capacity_weight, capacity_volume):
        self.id = id
        self.capacity_weight = capacity_weight
        self.capacity_volume = capacity_volume

trucks = [Truck(i, 100, 100) for i in range(NUM_TRUCKS)]

# ----------------------------
# Define Task
# ----------------------------
class Task:
    def __init__(self, id, pickup, delivery, weight, volume, time_window):
        self.id = id
        self.pickup = pickup
        self.delivery = delivery
        self.weight = weight
        self.volume = volume
        self.time_window = time_window 

# ----------------------------
# Generate Tasks
# ----------------------------
nodes = list(range(1, NUM_TASKS * 2 + 1))
coordinates = {n: (random.randint(0, 40), random.randint(0, 40)) for n in nodes + [DEPOT]}
time_windows = {}  # Only for pickup locations
tasks = []
for i in range(NUM_TASKS):
    a = random.randint(0, len(nodes)-1)
    b = random.randint(0, len(nodes)-1)
    while a == b:
        b = random.randint(0, len(nodes))    
    pickup = nodes[a]
    delivery = nodes[b]
    weight = random.randint(1, 20)
    volume = random.randint(1, 20)
    earliest = random.randint(0, 50)
    latest = earliest + random.randint(10, 50)
    time_window = (earliest, latest)
    tasks.append(Task(i, pickup, delivery, weight, volume, time_window))
    time_windows[pickup] = time_window
# ----------------------------
# Distance Function
# ----------------------------
def distance(a, b):
    xa, ya = coordinates[a]
    xb, yb = coordinates[b]
    return ((xa - xb) ** 2 + (ya - yb) ** 2) ** 0.5

def travel_time(a, b):
    return distance(a, b)/20
# ----------------------------
# Build Route for Assigned Tasks
# ----------------------------
def build_route(task_list):
    if not task_list:
        return [DEPOT, DEPOT], 0.0

    task_ids = [t.id for t in task_list]
    # -----------------------------------------
    # Inner GA Representation: [TID, TID, ..., TID] each appears twice
    # -----------------------------------------
    def valid_sequence(seq):
        seen = []
        for tid in seq:
            seen = seen + [tid]
        for nodes, c in Counter(seen).items():
            if c != 2:
                return False     
        return True
        
    def init_sequence():
        seq = []
        for tid in task_ids:
            seq += [tid, tid]
        random.shuffle(seq)
        return creator.Individual(seq)    

    def mutate_seq(ind):
        if len(ind) <= 3:
            return ind,
        for _ in range(3):
            a, b = sorted(random.sample(range(1,len(ind)-1), 2))
            ind[a], ind[b] = ind[b], ind[a]
            if valid_sequence(ind):
                return ind,
            ind[a], ind[b] = ind[b], ind[a]  # revert if invalid
        return ind,

    def crossover_seq(p1, p2):
        # Helper: extract task IDs in pickup order
        def extract_pickup_order(seq):
            seen = set()
            order = []
            for tid in seq:
                if tid not in seen:
                    order.append(tid)
                    seen.add(tid)
            return order    
        # Get task orders from parents
        p1_order = extract_pickup_order(p1[1:len(p1)])
        p2_order = extract_pickup_order(p2[1:len(p2)])
        if len(p1_order) < 2 or len(p2_order) < 2:
            return p1, p2 
        size = min(len(p1_order),len(p2_order))
    
        # Random slice from parent 1
        a, b = sorted(random.sample(range(size), 2))
        mid = p1_order[a:b]
        
        # Child 1: start with mid from p1, fill from p2
        child1_order = mid + [tid for tid in (p2_order[0:a]+p2_order[b:len(p2_order)]) if tid not in mid]
        
        # Child 2: start with mid from p2, fill from p1
        mid2 = p2_order[a:b]
          
        child2_order = mid2 + [tid for tid in (p1_order[0:a] +p1_order[b:len(p1_order)]) if tid not in mid2]
       
        # Build full pickup-delivery sequences from the original parents
        child1_seq = []
        child2_seq = []
    
        for tid in child1_order:
            child1_seq.extend([tid, tid])
    
        for tid in child2_order:
            child2_seq.extend([tid, tid])
       
        
        # Return as DEAP individuals
        return creator.Individual(child1_seq), creator.Individual(child2_seq)

    def eval_sequence(seq):
        route = [DEPOT]
        total_cost = 0
        current_node = DEPOT
        current_time = 0
        load_weight = 0
        load_volume = 0
        visited = set()
        time_penalty = 0

        for tid in seq:
            task = next(t for t in task_list if t.id == tid)
            node = task.pickup if tid not in visited else task.delivery
            load_weight += task.weight if tid not in visited else -task.weight
            load_volume += task.volume if tid not in visited else -task.volume
            visited.add(tid)

            if load_weight > 100 or load_volume > 100:
                total_cost += 10000  # capacity penalty

            total_cost += distance(current_node, node)
            route.append(node)
            current_node = node

        total_cost += distance(current_node, DEPOT)
        route.append(DEPOT)
        total_cost += 1000000 if not valid_sequence(seq) else 0
        if len(seq) <len(task_ids)*2:
            total_cost += 1000000
        return (total_cost,)

    # Register inner GA
    creator.create("InnerFitness", base.Fitness, weights=(-1.0,))
    creator.create("Individual", list, fitness=creator.InnerFitness)

    inner_toolbox = base.Toolbox()
    inner_toolbox.register("individual", init_sequence)
    inner_toolbox.register("population", tools.initRepeat, list, inner_toolbox.individual)
    inner_toolbox.register("mate", crossover_seq)
    inner_toolbox.register("mutate", mutate_seq)
    inner_toolbox.register("select", tools.selTournament, tournsize=3)
    inner_toolbox.register("evaluate", eval_sequence)

    pop = inner_toolbox.population(n=15)
    algorithms.eaSimple(pop, inner_toolbox, cxpb=0.6, mutpb=0.3, ngen=20, verbose=False)

    best = tools.selBest(pop, 1)[0]
    route = [DEPOT]
    current_node = DEPOT
    visited = set()
    for tid in best:
        task = next(t for t in task_list if t.id == tid)
        node = task.pickup if tid not in visited else task.delivery
        visited.add(tid)
        route.append(node)
        current_node = node
    route.append(DEPOT)
    return route, eval_sequence(best)[0], best
   
# ----------------------------
# Evaluation
# ----------------------------
def evaluate(individual):
    total_cost = 0
    for truck_id in range(NUM_TRUCKS):
        assigned_tasks = [tasks[i] for i, t_id in enumerate(individual) if t_id == truck_id]
        _, cost, _ = build_route(assigned_tasks)
        total_cost += cost
    return (total_cost,)

# ----------------------------
# Mutation Function
# ----------------------------
def mutate_individual(individual):
    # Create a mapping: truck_id → list of task indices
    truck_tasks = {t: [] for t in range(NUM_TRUCKS)}
    for i, truck_id in enumerate(individual):
        truck_tasks[truck_id].append(i)

    # Randomly pick two *different* trucks
    t1, t2 = random.sample(range(NUM_TRUCKS), 2)

    # Make sure both trucks have at least one task to swap
    if not truck_tasks[t1] or not truck_tasks[t2]:
        return individual,

    # Pick a task index from each truck
    task1 = random.choice(truck_tasks[t1])
    task2 = random.choice(truck_tasks[t2])

    # Swap their assignments
    individual[task1], individual[task2] = individual[task2], individual[task1]

    return individual,

# ----------------------------
# DEAP Setup
# ----------------------------
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)

toolbox = base.Toolbox()

# Each gene is a truck assignment for task i
toolbox.register("individual", tools.initRepeat, creator.Individual,
                 lambda: random.randint(0, NUM_TRUCKS - 1), n=NUM_TASKS)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

toolbox.register("evaluate", evaluate)
toolbox.register("mate", tools.cxUniform, indpb=0.3)  # built-in crossover
toolbox.register("mutate", mutate_individual)
toolbox.register("select", tools.selTournament, tournsize=3)

# ----------------------------
# Run GA
# ----------------------------
pop = toolbox.population(n=20)
algorithms.eaSimple(pop, toolbox, cxpb=0.6, mutpb=0.3, ngen=50, verbose=False)

# ----------------------------
# Show Best Solution
# ----------------------------
best = tools.selBest(pop, k=1)[0]
for t in range(NUM_TRUCKS):
    assigned = [tasks[i] for i, tid in enumerate(best) if tid == t]
    route, cost, tsk = build_route(assigned)
    print(f"Truck {t}: {route} | Cost: {cost:.2f} | Tasks {tsk}")

Truck 0: [0, 34, 49, 49, 31, 43, 37, 36, 32, 7, 57, 0] | Cost: 215.52 | Tasks [11, 11, 16, 14, 26, 26, 16, 14, 29, 29]
Truck 1: [0, 59, 20, 19, 39, 10, 51, 51, 15, 42, 40, 10, 50, 38, 52, 2, 55, 39, 43, 37, 9, 16, 34, 29, 26, 55, 41, 13, 56, 22, 32, 0] | Cost: 645.54 | Tasks [21, 21, 8, 8, 2, 2, 0, 0, 13, 9, 9, 13, 20, 20, 5, 5, 6, 6, 15, 15, 12, 12, 17, 17, 25, 25, 19, 19, 18, 18]
Truck 2: [0, 56, 42, 19, 12, 6, 41, 7, 21, 48, 7, 6, 56, 5, 48, 2, 19, 3, 3, 34, 3, 0] | Cost: 358.68 | Tasks [3, 3, 24, 24, 23, 23, 7, 1, 4, 4, 27, 27, 22, 22, 10, 10, 7, 1, 28, 28]
