# Projet SMA Santé : Optimisation Collaborative & Apprentissage par Renforcement

Ce notebook implémente une solution complète pour le problème d'ordonnancement de patients en milieu hospitalier, en respectant les exigences suivantes :

1. **Modélisation hypercubique** : Axe temps discrétisé (5 min), relâchement de la contrainte des boxes.
2. **Stratégies Hybrides & SMA** : Agents métaheuristiques collaboratifs.
3. **Diversité de l'EMP** : Espace Mémoire Partagé avec contrôle de diversité basé sur une distance matricielle.
4. **Modes de collaboration** : Amis (partage de solutions) et Ennemis (compétition/perturbation).
5. **Auto-adaptation** : Processus de Décision Markovien (PDM) pour le choix des voisinages.
6. **Voisinages** : Implémentation des 5 mouvements (A, B, C, D, E).
7. **Q-Learning** : Apprentissage de la meilleure stratégie de recherche.
8. **Tableaux de résultats** : Génération des benchmarks correspondant aux slides 25 et 26.

## 1. Environnement & Modélisation Hypercubique

Définition des structures de données (Tâche, Solution) et de l'environnement de simulation.
- **Temps** : Discrétisé en slots de 5 minutes.
- **Solution** : Dictionnaire `(Patient, Opération) -> (Ressource, Temps)`.

In [None]:
from typing import Dict, List, Tuple, NamedTuple
import copy
import random

# Global configuration: the time axis is discretized into 5-minute slots.
SLOT_DURATION = 5  

# One care operation of a patient's pathway, stored as an immutable record.
#   id         - task identifier
#   patient_id - patient the operation belongs to
#   op_order   - order of the operation within the patient's pathway (j)
#   skill_req  - required skill (doctor/resource)
#   duration   - duration in minutes
Task = NamedTuple(
    "Task",
    [
        ("id", int),
        ("patient_id", int),
        ("op_order", int),
        ("skill_req", int),
        ("duration", int),
    ],
)

class Solution:
    """
    Candidate schedule in the hypercube model.

    Attributes:
        schedule: mapping (patient, operation) -> (staff_id, start_slot);
            empty on construction.
        fitness: score of the schedule; +infinity until it is evaluated.
        is_valid: validity flag; False on construction.
    """

    def __init__(self):
        empty_schedule: Dict[Tuple[int, int], Tuple[int, int]] = dict()
        self.schedule = empty_schedule
        self.fitness: float = float("inf")
        self.is_valid: bool = False

class SchedulingEnvironment:
    """
    Problem instance for the patient scheduling model.

    Holds the task list parsed from ``data`` and provides slot conversion,
    construction of an initial solution, scoring and copying of solutions.

    ``data`` is read as ``data[patient_id][op_order] = [(skill, duration), ...]``;
    only the first ``(skill, duration)`` pair of each step is used.
    """

    def __init__(self, data: Dict, skills: List[int], num_patients: int):
        self.data = data
        self.skills = skills
        self.num_patients = num_patients
        self.tasks: List[Task] = []
        self.tasks_map: Dict[Tuple[int, int], Task] = {}
        self._parse_data()

    def _parse_data(self):
        """Fill ``tasks`` and ``tasks_map`` from ``data``, numbering tasks from 0."""
        tid = 0
        for pid, ops in self.data.items():
            for order, task_list in ops.items():
                if task_list:
                    # Simplified model: one main task per step is assumed.
                    skill, dur = task_list[0]
                    t = Task(tid, pid, order, skill, dur)
                    self.tasks.append(t)
                    self.tasks_map[(pid, order)] = t
                    tid += 1

    def duration_to_slots(self, minutes: int) -> int:
        """Convert a duration in minutes to a number of slots, rounding up."""
        return (minutes + SLOT_DURATION - 1) // SLOT_DURATION

    def build_initial_solution(self) -> Solution:
        """
        Build a random solution in which no staff member and no patient is
        ever booked twice at the same time and each patient's operations
        follow their order.

        Returns:
            The evaluated solution (``fitness`` and ``is_valid`` are set).
        """
        sol = Solution()
        # First free slot of each resource (staff -> slot).
        staff_availability = {s: 0 for s in self.skills}
        # First free slot of each patient (patient -> slot).
        patient_availability = {p: 0 for p in range(1, self.num_patients + 1)}

        # Shuffle for initial diversity; the stable sort then restores the
        # operation order while keeping the random order among equal ranks.
        all_tasks = list(self.tasks)
        random.shuffle(all_tasks)
        all_tasks.sort(key=lambda t: t.op_order)

        for t in all_tasks:
            # Simplified allocation: the required skill is used as the staff id.
            staff_id = t.skill_req
            duration_slots = self.duration_to_slots(t.duration)

            # Start once both the patient and the staff member are free.
            # ``.get`` tolerates patient ids outside 1..num_patients, which
            # previously raised KeyError.
            start_time = max(
                patient_availability.get(t.patient_id, 0),
                staff_availability.get(staff_id, 0),
            )
            sol.schedule[(t.patient_id, t.op_order)] = (staff_id, start_time)

            finish_time = start_time + duration_slots
            patient_availability[t.patient_id] = finish_time
            staff_availability[staff_id] = finish_time

        self.evaluate(sol)
        return sol

    def evaluate(self, solution: Solution) -> float:
        """
        Score ``solution`` in place and return its makespan (Cmax) in slots.

        ``fitness`` is always the raw makespan of the scheduled tasks that are
        known to this environment. ``is_valid`` is True only when the schedule
        satisfies the constraints that ``build_initial_solution`` enforces:
        no two tasks overlap on the same staff member, and each patient's
        operations run in increasing order without overlapping.

        An empty schedule gets ``fitness = inf`` and ``is_valid = False``.
        """
        if not solution.schedule:
            solution.fitness = float('inf')
            solution.is_valid = False
            return float('inf')

        max_slot = 0
        staff_busy: Dict[int, List[Tuple[int, int]]] = {}
        patient_ops: Dict[int, List[Tuple[int, int, int]]] = {}
        for (pid, op), (staff, start) in solution.schedule.items():
            task = self.tasks_map.get((pid, op))
            if task is None:
                # Entries that do not match a known task are ignored.
                continue
            end = start + self.duration_to_slots(task.duration)
            max_slot = max(max_slot, end)
            staff_busy.setdefault(staff, []).append((start, end))
            patient_ops.setdefault(pid, []).append((op, start, end))

        solution.fitness = max_slot
        solution.is_valid = self._is_feasible(staff_busy, patient_ops)
        return max_slot

    @staticmethod
    def _is_feasible(staff_busy, patient_ops) -> bool:
        """Return True when no staff interval overlaps and patient order holds."""
        for intervals in staff_busy.values():
            busy_until = 0
            for start, end in sorted(intervals):
                if start < busy_until:
                    return False
                busy_until = max(busy_until, end)

        for ops in patient_ops.values():
            free_from = 0
            # Sorted by operation order: each operation must start after the
            # previous one of the same patient has finished.
            for _, start, end in sorted(ops):
                if start < free_from:
                    return False
                free_from = max(free_from, end)
        return True

    def copy_solution(self, solution: Solution) -> Solution:
        """Return an independent copy of ``solution`` (schedule dict is copied)."""
        new_sol = Solution()
        new_sol.schedule = solution.schedule.copy()
        new_sol.fitness = solution.fitness
        new_sol.is_valid = solution.is_valid
        return new_sol

## 2. Voisinages (Neighborhoods)

Implémentation des 5 opérateurs de voisinage décrits dans le document :
- **A** : Réassignation à un autre personnel.
- **B** : Réassignation de tâches successives.
- **C** : Insertion (Shift) dans le planning du même personnel.
- **D** : Échange (Swap) entre deux personnels différents.
- **E** : Échange (Swap) au sein du même personnel.

In [None]:
class NeighborhoodManager:
    """
    Applies one of the five neighborhood moves (A-E) to a copy of a solution.

    Every move starts from one randomly chosen scheduled task and edits the
    copied schedule; the input solution is never modified.
    """

    def __init__(self, env: SchedulingEnvironment):
        self.env = env
        # Names of the available neighborhoods.
        self.moves = ['A', 'B', 'C', 'D', 'E']

    def apply_move(self, solution: Solution, move_type: str) -> Solution:
        """
        Return a re-evaluated neighbour of ``solution`` built with ``move_type``.

        An empty schedule is returned as a plain copy without re-evaluation.
        An unknown ``move_type`` leaves the copied schedule unchanged.
        """
        neighbour = self.env.copy_solution(solution)
        plan = neighbour.schedule
        task_keys = list(plan.keys())
        if not task_keys:
            return neighbour

        # Random source task (i, j) shared by all moves.
        source = random.choice(task_keys)
        source_staff, source_start = plan[source]

        handlers = {
            'A': self._reassign_staff,
            'B': self._reassign_successive,
            'C': self._shift_start,
            'D': self._swap_across_staff,
            'E': self._swap_within_staff,
        }
        handler = handlers.get(move_type)
        if handler is not None:
            handler(plan, source, source_staff, source_start)

        self.env.evaluate(neighbour)
        return neighbour

    def _reassign_staff(self, plan, key, staff, start):
        """Move A: give the task to a different staff member, same start."""
        others = [s for s in self.env.skills if s != staff]
        if others:
            plan[key] = (random.choice(others), start)

    def _reassign_successive(self, plan, key, staff, start):
        """Move B: give the task and its successor to one other staff member."""
        successor = (key[0], key[1] + 1)
        if successor not in plan:
            return
        others = [s for s in self.env.skills if s != staff]
        if not others:
            return
        chosen = random.choice(others)
        plan[key] = (chosen, start)
        plan[successor] = (chosen, plan[successor][1])

    def _shift_start(self, plan, key, staff, start):
        """Move C: shift the start by up to 6 slots (+/- 30 min), floor at 0."""
        offset = random.randint(-6, 6)
        plan[key] = (staff, max(0, start + offset))

    def _swap_across_staff(self, plan, key, staff, start):
        """Move D: exchange staff with a task held by a different staff member."""
        partners = [k for k, entry in plan.items() if entry[0] != staff]
        if partners:
            partner = random.choice(partners)
            partner_staff, partner_start = plan[partner]
            plan[key] = (partner_staff, start)
            plan[partner] = (staff, partner_start)

    def _swap_within_staff(self, plan, key, staff, start):
        """Move E: exchange start slots with another task of the same staff."""
        partners = [
            k for k, entry in plan.items() if entry[0] == staff and k != key
        ]
        if partners:
            partner = random.choice(partners)
            _, partner_start = plan[partner]
            plan[key] = (staff, partner_start)
            plan[partner] = (staff, start)

## 3. Mémoire Partagée (EMP) & Diversité

Gestion de l'espace mémoire commun pour la collaboration.
- Calcul de la distance matricielle entre solutions.
- **Algorithme 6** : Contrôle d'insertion basé sur le seuil de diversité.

In [None]:
class SharedMemory:
    """
    Shared memory space (EMP): a bounded pool of solutions kept sorted by
    ascending fitness, with a diversity check on insertion.
    """

    def __init__(self, max_size=20, min_dist=2, diversity_threshold=0.5):
        self.solutions: List[Solution] = []
        self.max_size = max_size
        self.min_dist = min_dist
        self.dt = diversity_threshold

    def calculate_distance(self, sol1: Solution, sol2: Solution) -> int:
        """
        Return the number of schedule keys whose assignment differs between
        the two solutions (a key present in only one of them counts).
        """
        first, second = sol1.schedule, sol2.schedule
        return sum(
            1
            for key in first.keys() | second.keys()
            if first.get(key) != second.get(key)
        )

    def try_insert(self, cs: Solution) -> bool:
        """
        Algorithm 6: insert ``cs`` while controlling the pool's diversity.

        Exact duplicates are rejected. While the pool has room, ``cs`` is added
        only if the share of stored solutions at distance >= ``min_dist``
        reaches the diversity threshold. Once the pool is full, ``cs`` replaces
        the worst stored solution when it has a strictly better fitness.

        Returns:
            True if ``cs`` was stored, False otherwise.
        """
        pool = self.solutions
        pool_size = len(pool)

        distances = [self.calculate_distance(cs, member) for member in pool]
        if 0 in distances:
            # Exact duplicate of a stored solution.
            return False

        distinct = sum(1 for d in distances if d >= self.min_dist)
        ratio = distinct / pool_size if pool_size > 0 else 1.0

        if ratio >= self.dt and pool_size < self.max_size:
            pool.append(cs)
        else:
            if pool_size < self.max_size:
                return False
            worst = self._get_worst_idx()
            if worst == -1 or not (cs.fitness < pool[worst].fitness):
                return False
            pool.pop(worst)
            pool.append(cs)

        pool.sort(key=lambda member: member.fitness)
        return True

    def _get_worst_idx(self):
        """Return the index of the first highest-fitness solution, or -1 if empty."""
        if not self.solutions:
            return -1
        worst = 0
        for idx, member in enumerate(self.solutions):
            if member.fitness > self.solutions[worst].fitness:
                worst = idx
        return worst

    def get_best(self):
        """Return the first (lowest-fitness) stored solution, or None if empty."""
        return self.solutions[0] if self.solutions else None

## 4. Q-Learning & Auto-Adaptation

Implémentation du **Processus de Décision Markovien (PDM ; MDP en anglais)** pour l'auto-adaptation des agents.
- **États** : Amélioration (0), Stagnation (1), Dégradation (2).
- **Actions** : Choix du voisinage (A, B, C, D, E).
- **Récompense** : Différence de fitness (fitness avant le mouvement − fitness après), positive lorsque le makespan diminue.
- **Mise à jour Q-Table** : Formule standard du Q-Learning.

In [None]:
import numpy as np

class QLearningModel:
    """
    Tabular Q-learning over three search states (0 = improvement,
    1 = stagnation, 2 = degradation) and a caller-supplied list of actions.
    """

    def __init__(self, actions, alpha=0.1, gamma=0.9, epsilon=0.1):
        self.actions = actions
        self.q_table = {}
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.states = [0, 1, 2]

        # Every (state, action) value starts at zero.
        self.q_table.update(
            (state, dict.fromkeys(actions, 0.0)) for state in self.states
        )

    def get_state(self, current_fit, prev_fit):
        """Classify the change from ``prev_fit`` to ``current_fit`` as 0, 1 or 2."""
        if prev_fit is None or current_fit == prev_fit:
            return 1  # No reference yet, or stagnation
        return 0 if current_fit < prev_fit else 2  # Improvement / degradation

    def select_action(self, state):
        """Algorithm 4 (SelectAction): epsilon-greedy choice for ``state``."""
        explore = random.random() < self.epsilon
        if explore:
            return random.choice(self.actions)

        # Exploitation: shuffling first makes ties break at random.
        candidates = list(self.q_table[state].items())
        random.shuffle(candidates)
        best_action, _ = max(candidates, key=lambda pair: pair[1])
        return best_action

    def update(self, state, action, reward, next_state):
        """Apply the standard Q-learning update to ``(state, action)``."""
        old_q = self.q_table[state][action]
        best_next_q = max(self.q_table[next_state].values())
        td_error = reward + self.gamma * best_next_q - old_q
        self.q_table[state][action] = old_q + self.alpha * td_error

## 5. Agents & Système Multi-Agents (SMA)

Définition des agents métaheuristiques (AG, Tabou, RS) et de leurs modes d'interaction :
- **Amis** : Échange complet de solutions via l'EMP.
- **Ennemis** : Accès limité au critère (fitness), stimulation par perturbation.

In [None]:
class MetaheuristicAgent:
    """
    Search agent that improves its own solution with neighborhood moves.

    ``strategy_type`` selects the move pool when learning is off ('AG' uses
    A/B, 'Tabu' uses C/D, anything else uses C/E). Only 'RS' may accept a
    worsening move (with probability 0.1). When ``use_learning`` is True the
    move is chosen by a ``QLearningModel`` instead.
    """

    def __init__(self, id, env, strategy_type, use_learning=True):
        self.id = id
        self.env = env
        self.strategy_type = strategy_type
        self.use_learning = use_learning

        self.current_solution = None
        self.best_solution = None

        self.nm = NeighborhoodManager(env)
        self.brain = QLearningModel(actions=self.nm.moves) if use_learning else None
        self.last_fitness = None
        # Q-learning state produced by the previous move; 1 (stagnation)
        # until a first move has been made.
        self.last_state = 1

    def initialize(self):
        """Build a fresh initial solution and reset the search bookkeeping."""
        self.current_solution = self.env.build_initial_solution()
        self.best_solution = self.env.copy_solution(self.current_solution)
        self.last_fitness = self.current_solution.fitness
        self.last_state = 1

    def step(self, emp: SharedMemory, collaboration_mode: str):
        """
        Run one search iteration and return the agent's best fitness so far.

        Args:
            emp: shared memory used to read the best shared solution and to
                publish new personal bests.
            collaboration_mode: 'FRIENDS' (may adopt the best shared solution),
                'ENEMIES' (perturbs itself when a shared solution is better);
                any other value disables collaboration.
        """
        # 1. Collaboration
        best_shared = emp.get_best()
        shared_is_better = (
            best_shared is not None
            and best_shared.fitness < self.current_solution.fitness
        )
        if collaboration_mode == 'FRIENDS':
            if shared_is_better and random.random() < 0.2:
                self.current_solution = self.env.copy_solution(best_shared)
        elif collaboration_mode == 'ENEMIES':
            if shared_is_better:
                # Major perturbation when an 'enemy' is doing better.
                self.current_solution = self.nm.apply_move(self.current_solution, 'A')

        # 2. Action choice (self-adaptation)
        state = self.last_state
        if self.use_learning:
            # The state is the outcome of the previous move, i.e. the
            # ``next_state`` used in the previous Q-update. Comparing the
            # current fitness with ``last_fitness`` was almost always a tie,
            # which pinned the agent to the stagnation state.
            action = self.brain.select_action(state)
        elif self.strategy_type == 'AG':
            action = random.choice(['A', 'B'])
        elif self.strategy_type == 'Tabu':
            action = random.choice(['C', 'D'])
        else:
            action = random.choice(['C', 'E'])

        # 3. Apply the move
        prev_fit = self.current_solution.fitness
        new_sol = self.nm.apply_move(self.current_solution, action)

        # 4. Acceptance
        accept = False
        if new_sol.fitness <= prev_fit:
            accept = True
        elif self.strategy_type == 'RS' and random.random() < 0.1:
            accept = True

        if accept:
            self.current_solution = new_sol
            # Only a solution flagged valid may become the reported best or
            # be shared with other agents.
            if new_sol.is_valid and new_sol.fitness < self.best_solution.fitness:
                self.best_solution = self.env.copy_solution(new_sol)
                emp.try_insert(self.env.copy_solution(self.best_solution))

        # 5. Q-learning update
        if self.use_learning:
            reward = prev_fit - new_sol.fitness
            next_state = self.brain.get_state(new_sol.fitness, prev_fit)
            self.brain.update(state, action, reward, next_state)
            self.last_state = next_state

        self.last_fitness = self.current_solution.fitness
        return self.best_solution.fitness

class MultiAgentSystem:
    """
    Group of metaheuristic agents sharing one ``SharedMemory``.

    Args:
        env: scheduling environment handed to every agent.
        agents_config: iterable of dicts with keys 'id', 'type' and 'learning'.
        mode: collaboration mode passed to each agent's ``step``.
    """

    def __init__(self, env, agents_config, mode='FRIENDS'):
        self.env = env
        self.emp = SharedMemory()
        self.mode = mode
        self.agents = [
            MetaheuristicAgent(conf['id'], env, conf['type'], conf['learning'])
            for conf in agents_config
        ]

    def run(self, iterations=50):
        """
        Initialize all agents, run ``iterations`` rounds and return the best
        fitness reached by any agent.

        With ``iterations == 0`` the best initial fitness is returned, and
        with no agents ``float('inf')``; both cases previously raised
        ValueError from ``min`` on an empty list.
        """
        for agent in self.agents:
            agent.initialize()

        # Start from the initial solutions so the result is defined even
        # when no step is executed.
        best = min(
            (agent.best_solution.fitness for agent in self.agents),
            default=float('inf'),
        )
        for _ in range(iterations):
            for agent in self.agents:
                best = min(best, agent.step(self.emp, self.mode))

        return best

## 6. Génération des Résultats (Benchmark)

Génération automatique des données et exécution des scénarios pour produire les tableaux comparatifs :
- Sans Collaboration (Agents isolés vs SMA de base).
- Avec Collaboration (Comparaison Amis vs Ennemis, avec et sans apprentissage).

In [None]:
def generate_random_data(num_patients=5, max_ops=3, skills=None):
    """
    Generate a random instance as ``{patient_id: {op_order: [(skill, duration)]}}``.

    Patients are numbered from 1 to ``num_patients``. Each of the ``max_ops``
    steps is kept with probability of about 0.9, so a patient may have gaps
    in its operation order or no operation at all.

    Args:
        num_patients: number of patients to generate.
        max_ops: highest operation order considered per patient.
        skills: skills to draw from; defaults to ``[1, 2, 3, 4]``.

    Returns:
        The generated data dictionary; durations are drawn from 10 to 60 minutes.
    """
    # A None sentinel replaces the former shared mutable default list.
    if skills is None:
        skills = [1, 2, 3, 4]

    data = {}
    for i in range(1, num_patients + 1):
        data[i] = {}
        for j in range(1, max_ops + 1):
            if random.random() > 0.1:
                skill = random.choice(skills)
                duration = random.randint(10, 60)
                data[i][j] = [(skill, duration)]
    return data

def run_benchmark(num_patients=20, iterations=15):
    """
    Generate one random instance and print the two comparison tables as
    CSV-like lines: solo agents vs. full SMA (slide 25), then agent pairs in
    FRIENDS mode without learning vs. ENEMIES mode with learning (slide 26).
    """
    print(f"Génération des données pour {num_patients} patients...")
    skills = [1, 2, 3, 4]
    data = generate_random_data(num_patients=num_patients, max_ops=4, skills=skills)
    env = SchedulingEnvironment(data, skills, num_patients)

    def agent(agent_id, kind, learning):
        # Configuration dict expected by MultiAgentSystem.
        return {'id': agent_id, 'type': kind, 'learning': learning}

    def solve(team, mode):
        # Every scenario runs on the same instance and iteration budget.
        return MultiAgentSystem(env, team, mode=mode).run(iterations)

    roster = (('1', 'AG'), ('2', 'Tabu'), ('3', 'RS'))

    print("\n--- Tableau de comparaison SANS collaboration (Slide 25) ---")
    print('"Jour","Patients",,"AG_Solo","Tabou_Solo","RS_Solo","SMA_NoLearn","SMA_Learn"')

    # Solo agents use their strategy name as id.
    ag_solo, tabu_solo, rs_solo = [
        solve([agent(kind, kind, False)], 'ENEMIES') for _, kind in roster
    ]
    sma_no_learn = solve([agent(i, kind, False) for i, kind in roster], 'FRIENDS')
    sma_learn = solve([agent(i, kind, True) for i, kind in roster], 'FRIENDS')

    print(f'"J1","{num_patients}",,"{ag_solo}","{tabu_solo}","{rs_solo}","{sma_no_learn}","{sma_learn}"')

    print("\n\n--- Tableau de comparaison AVEC collaboration (Slide 26) ---")
    print('"SMA sans apprentissage (Amis)",,,,,,"SMA avec apprentissage (Ennemis)"')
    print('"AG_Tabou","AG_RS","Tabou_RS",,"AG_Tabou","AG_RS","Tabou_RS"')

    # Pairs in column order: AG+Tabu, AG+RS, Tabu+RS.
    duos = [
        (roster[0], roster[1]),
        (roster[0], roster[2]),
        (roster[1], roster[2]),
    ]
    res = []
    for learning, mode in ((False, 'FRIENDS'), (True, 'ENEMIES')):
        for duo in duos:
            team = [agent(i, kind, learning) for i, kind in duo]
            res.append(solve(team, mode))

    print(f'"{res[0]}","{res[1]}","{res[2]}",,"{res[3]}","{res[4]}","{res[5]}"')

In [None]:
# Launch the demonstration
if __name__ == "__main__":
    run_benchmark(num_patients=20, iterations=15)