In [None]:
import random
import copy
from typing import List, Tuple
import math

# ---------------------------
# Environment : calcul Cmax
# ---------------------------
class Environment:
    def __init__(self, data: dict):

        self.data = data
        self.machines = list(data.keys())
        self.nb_jobs = len(next(iter(data.values())))
        self.nb_machines = len(self.machines)

    def cmax(self, sequence: List[int]) -> int:
        """
        Calcul du makespan (flow-shop / permutation schedule)
        sequence : permutation de [0..nb_jobs-1]
        """
        n = len(sequence)
        m = self.nb_machines

        # completion_times[i][j] = temps d'achèvement machine i sur job j (1-indexed en tableau)
        # On ajoute une ligne/colonne 0 pour faciliter max(...)
        completion = [[0] * (n + 1) for _ in range(m + 1)]

        for j in range(1, n + 1):
            job = sequence[j - 1]
            for i in range(1, m + 1):
                time_on_machine = self.data[self.machines[i - 1]][job]
                # max(temps où la machine i a fini le job précédent, temps où le job a été fini sur la machine i-1)
                completion[i][j] = max(completion[i][j - 1], completion[i - 1][j]) + time_on_machine

        return completion[m][n]

    def schedule_table(self, sequence: List[int]) -> List[List[int]]:
        """
        Retourne la table des complétions (utile pour debug / affichage).
        """
        n = len(sequence)
        m = self.nb_machines
        completion = [[0] * (n + 1) for _ in range(m + 1)]
        for j in range(1, n + 1):
            job = sequence[j - 1]
            for i in range(1, m + 1):
                time_on_machine = self.data[self.machines[i - 1]][job]
                completion[i][j] = max(completion[i][j - 1], completion[i - 1][j]) + time_on_machine
        return completion

# ---------------------------
# Base agent + 3 agents
# ---------------------------
class BaseAgent:
    def __init__(self, name: str, env: Environment):
        self.name = name
        self.env = env

    def optimize(self, solution: List[int]) -> Tuple[List[int], int]:
        """
        Prend une solution (permutation) en entrée, renvoie (nouvelle_solution, cmax).
        À implémenter dans les sous-classes.
        """
        raise NotImplementedError

# Agent Liste : heuristique de type "list scheduling"
class AgentListe(BaseAgent):
    def __init__(self, name: str, env: Environment):
        super().__init__(name, env)

    def optimize(self, solution: List[int]) -> Tuple[List[int], int]:
        """
        Ici on applique une heuristique simple basée sur la somme des temps
        : on trie les jobs par temps total (SPT sur la somme des machines).
        On retourne la meilleure des deux: ordre trié ou la solution entrée (on compare).
        """
        total_times = [sum(self.env.data[m][j] for m in self.env.machines) for j in range(self.env.nb_jobs)]
        sorted_jobs = sorted(range(self.env.nb_jobs), key=lambda j: total_times[j])
        c1 = self.env.cmax(solution)
        c2 = self.env.cmax(sorted_jobs)
        if c2 < c1:
            return sorted_jobs, c2
        else:
            return solution, c1

# Agent Johnson (pour 2 machines) : application de Johnson en utilisant M1 et M2
class AgentJohnson(BaseAgent):
    def __init__(self, name: str, env: Environment):
        super().__init__(name, env)
        # On suppose M1 et M2 existent; si pas, on tombera en exception naturellement
        if env.nb_machines < 2:
            raise ValueError("Johnson requires at least two machines in this implementation.")

    def optimize(self, solution: List[int]) -> Tuple[List[int], int]:
        """
        Applique l'algorithme de Johnson pour 2 machines en considérant M1 et M2.
        - Si plus de 2 machines, on applique Johnson sur M1 vs M2 (simple, comme demandé).
        """
        m1 = self.env.machines[0]
        m2 = self.env.machines[1]
        jobs = list(range(self.env.nb_jobs))
        left = []
        right = []
        # Johnson: si time_on_m1 <= time_on_m2 -> place à gauche (début), sinon à droite (fin) en ordre inverse
        remaining = jobs.copy()
        while remaining:
            # trouver job ayant min(min(m1,m2))
            j_min = min(remaining, key=lambda j: min(self.env.data[m1][j], self.env.data[m2][j]))
            if self.env.data[m1][j_min] <= self.env.data[m2][j_min]:
                left.append(j_min)
            else:
                right.insert(0, j_min)
            remaining.remove(j_min)
        sequence = left + right
        # comparer avec solution d'entrée et retourner la meilleure
        c_in = self.env.cmax(solution)
        c_john = self.env.cmax(sequence)
        if c_john < c_in:
            return sequence, c_john
        else:
            return solution, c_in

# Agent Voisinage : recherche locale (swap 2-opt)
class AgentVoisinage(BaseAgent):
    def __init__(self, name: str, env: Environment, max_iter: int = 200):
        super().__init__(name, env)
        self.max_iter = max_iter

    def optimize(self, solution: List[int]) -> Tuple[List[int], int]:
        """
        Simple hill-climbing : essais de swaps pair-à-pair, accepte le swap si amélioration.
        On répète jusqu'à convergence ou max_iter tests.
        """
        best_seq = solution.copy()
        best_val = self.env.cmax(best_seq)

        improved = True
        iter_count = 0
        # On effectue un parcours systématique des swaps (i<j)
        while improved and iter_count < self.max_iter:
            improved = False
            iter_count += 1
            for i in range(len(best_seq) - 1):
                for j in range(i + 1, len(best_seq)):
                    new_seq = best_seq.copy()
                    new_seq[i], new_seq[j] = new_seq[j], new_seq[i]
                    new_val = self.env.cmax(new_seq)
                    if new_val < best_val:
                        best_seq = new_seq
                        best_val = new_val
                        improved = True
                        # break pour mode first-improvement (plus rapide)
                        break
                if improved:
                    break
        return best_seq, best_val

# ---------------------------
# Opérateurs génétiques
# ---------------------------
def tournament_selection(pop: List[Tuple[List[int], int]], k: int = 3) -> List[int]:
    """
    pop : list of (sequence, cmax). Retourne une sequence (le gagnant du tournoi).
    """
    competitors = random.sample(pop, k)
    winner = min(competitors, key=lambda x: x[1])
    return winner[0]

def order_crossover(parent1: List[int], parent2: List[int]) -> List[int]:
    """
    Order crossover (OX) : retourne un enfant (permutation).
    """
    n = len(parent1)
    a, b = sorted(random.sample(range(n), 2))
    child = [-1] * n
    # copier le segment a..b
    child[a:b+1] = parent1[a:b+1]
    # remplir le reste dans l'ordre parent2
    cur = (b + 1) % n
    p2_idx = (b + 1) % n
    while -1 in child:
        if parent2[p2_idx] not in child:
            child[cur] = parent2[p2_idx]
            cur = (cur + 1) % n
        p2_idx = (p2_idx + 1) % n
    return child

def mutate_swap(seq: List[int], mutation_rate: float = 0.05) -> List[int]:
    """
    Mutation par swap aléatoire avec probabilité mutation_rate par paire (approximatif).
    """
    new_seq = seq.copy()
    n = len(seq)
    for i in range(n):
        if random.random() < mutation_rate:
            j = random.randrange(n)
            new_seq[i], new_seq[j] = new_seq[j], new_seq[i]
    return new_seq

# ---------------------------
# Processus SMA + GA principal
# ---------------------------
def run_sma_ga(env: Environment,
               population_size: int = 20,
               generations: int = 20,
               seed: int = 0,
               verbose: bool = True) -> Tuple[List[int], int, List[int]]:
    """
    Implémente le flux demandé :
    - Génère population_size solutions aléatoires
    - Pour chaque solution, exécute les 3 agents -> collecte 3*population_size solutions
    - Garde les population_size meilleurs (parents)
    - Croise via GA pour produire population_size enfants
    - Répète for generations
    Retour: (best_sequence, best_cmax, history_best_by_gen)
    """
    random.seed(seed)

    # Instanciation des agents
    agent_liste = AgentListe("Liste", env)
    agent_johnson = AgentJohnson("Johnson", env)
    agent_voisinage = AgentVoisinage("Voisinage", env, max_iter=300)

    # 1) population initiale : permutations aléatoires
    population = []
    for _ in range(population_size):
        seq = list(range(env.nb_jobs))
        random.shuffle(seq)
        population.append(seq)

    best_overall_seq = None
    best_overall_val = math.inf
    history_best = []

    for gen in range(1, generations + 1):
        # pour chaque solution de la population, appliquer les 3 agents
        offspring_candidates: List[Tuple[List[int], int]] = []
        for sol in population:
            # on donne une copie à chaque agent
            s1, v1 = agent_liste.optimize(sol.copy())
            s2, v2 = agent_johnson.optimize(sol.copy())
            s3, v3 = agent_voisinage.optimize(sol.copy())

            offspring_candidates.append((s1, v1))
            offspring_candidates.append((s2, v2))
            offspring_candidates.append((s3, v3))

        # Trier par cmax ascendant et garder les population_size meilleurs (parents)
        offspring_candidates.sort(key=lambda x: x[1])
        parents = offspring_candidates[:population_size]

        # Mettre à jour meilleur global
        if parents[0][1] < best_overall_val:
            best_overall_val = parents[0][1]
            best_overall_seq = parents[0][0].copy()

        history_best.append(best_overall_val)

        if verbose:
            print(f"Gen {gen:02d} | Best gen: {parents[0][1]} | Best overall: {best_overall_val}")

        # GA : croisement pour produire population_size enfants
        new_population = []
        # On utilise sélection par tournoi + OX + mutation
        while len(new_population) < population_size:
            p1 = tournament_selection(parents, k=3)
            p2 = tournament_selection(parents, k=3)
            # éviter p1 == p2 exactement (peut arriver)
            if p1 == p2:
                # shuffle small perturbation
                p2 = p2.copy()
                random.shuffle(p2)
            child = order_crossover(p1, p2)
            child = mutate_swap(child, mutation_rate=0.07)
            new_population.append(child)

        # préparer la génération suivante
        population = new_population

    # fin des générations
    return best_overall_seq, best_overall_val, history_best

# ---------------------------
# Exemple usage (main)
# ---------------------------

# === données ===
data = {
    'M1': [6, 3, 10, 14, 5, 9, 7, 11, 2, 3],
    'M2': [1, 5, 4, 6, 10, 6, 9, 8, 6, 1],
    'M3': [5, 8, 1, 3, 6, 10, 12, 9, 6, 7]
}

env = Environment(data)

best_seq, best_val, history = run_sma_ga(env,
                                       population_size=20,
                                       generations=20,
                                       seed=42,
                                       verbose=True)

print("\n=== Résultat final ===")
print("Meilleure séquence (job indices):", best_seq)
print("Cmax :", best_val)
# pour affichage plus lisible, on peut afficher la séquence 1-based
print("Séquence (1-based jobs) :", [j+1 for j in best_seq])
