In [None]:
# ----- DATA LOADING -----
import pandas as pd
import json
import os

def load_csv_data(file_path):
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"CSV file not found: {file_path}")
    df = pd.read_csv(file_path, low_memory=False)
    df['time'] = pd.date_range('2016-01-01 05:00', periods=len(df), freq='min')
    df.set_index('time', inplace=True)
    df['month'] = df.index.strftime('%B')
    df['hour']  = df.index.hour
    return df

def load_json_data(file_path):
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"JSON file not found: {file_path}")
    with open(file_path, 'r') as f:
        return json.load(f)

def load_devices(json_path="devices_with_w.json", csv_path="HomeC.csv"):
    """
    Loads for each device:
      - w       : cost/comfort weight
      - lambda  : comfort on/off
      - power   : max kW observed in the CSV
    """
    devices = load_json_data(json_path)
    df = load_csv_data(csv_path)

    for name, params in devices.items():
        col = next(c for c in df.columns if name in c)
        params['power'] = df[col].max()

    return devices

In [None]:
# ----- DATA PROCESSING -----


# --- New Constants ---
SLOT_DURATION_MIN = 15
SLOTS_PER_HOUR = 60 // SLOT_DURATION_MIN  # THIS SHOULD BE 4 FOR 15-MIN SLOTS
SLOTS_PER_DAY = 24 * SLOTS_PER_HOUR      # 96
# ---------------------

def extract_time_params(df_day, devices, usage_threshold=0.05):
    """
    MODIFIED FOR SINGLE-DAY OPTIMIZATION:
    Analyzes a single day's data (df_day) to find which devices were active,
    their total runtime (LOT), and their original start time on that day.
    Historical comfort windows (alpha, beta) and median start (m_slot) are no longer calculated.
    """
    df_day = df_day.copy()
    # The 'slot' column is calculated in the calling function (generate_planning)
    if 'slot' not in df_day.columns:
         df_day['slot'] = df_day.index.hour * SLOTS_PER_HOUR + df_day.index.minute // SLOT_DURATION_MIN

    daily_tasks = {}
    # The data is sampled at 1-minute intervals.
    samp_period_seconds = 60

    for dev, params in devices.items():
        col = next((c for c in df_day.columns if dev in c), None)
        if not col:
            continue

        # Use the day's maximum usage to set a dynamic threshold
        max_power_on_day = df_day[col].max()
        if max_power_on_day == 0:
            continue  # Device was not used at all on this day.

        thresh = usage_threshold * max_power_on_day
        is_on = df_day[col] > thresh

        # Calculate total duration (Length Of Task) in seconds for the day
        lot_s = float(is_on.sum() * samp_period_seconds)

        if lot_s > 0:
            # Find the timestamp of the first "on" minute and get its corresponding slot
            first_on_index = is_on.idxmax()
            actual_start_slot = df_day.loc[first_on_index, 'slot']

            # The new 'params' for a device are its original params + LOT and actual start slot
            daily_tasks[dev] = {
                **params,
                'LOT': lot_s,
                'actual_start_slot': int(actual_start_slot),
            }

    return daily_tasks

In [None]:
# ---- Import the data ----

# --- Global Data Loading (Load once when module is imported) ---
_df = load_csv_data("/content/drive/MyDrive/PFE/main_program/HomeC.csv")
_devices = load_devices("/content/drive/MyDrive/PFE/main_program/devices_with_w.json", "/content/drive/MyDrive/PFE/main_program/HomeC.csv")

In [None]:
# ----- SOLVER INTERFACE -----

from typing import Dict, Any, Sequence, Tuple, List

class Solver:
    def run(self,
            devices_to_schedule: List[str], # Renamed for clarity to distinguish from self.params
            seed: int = None,
            max_iter: int = 100
    ) -> Tuple[Dict[str, int], List[float]]: # Now returns best schedule AND fitness history
        """Return a tuple: (a dict mapping device→start_hour, a list of fitness values over iterations)."""
        raise NotImplementedError

In [None]:
# ----- CSA SOLVER -----

import numpy as np
from typing import Callable # Added for repair_function typing

class CsaSolver(Solver):
    def __init__(
        self,
        fitness,
        params: dict, # This `params` dictionary contains M, LOT, P, W, L, valid_slots
        repair_function: Callable = None, # ADDED: Repair function for constraints
    ):
        self.fitness = fitness
        self.params = params
        # ADDED: Store repair function, defaulting to an identity function if None
        self.repair_function = repair_function if repair_function else lambda s: s
        # Extract device names once from any of the parameter dictionaries
        self.devices = list(self.params['P'].keys())

    # Corrected 'run' signature: it now expects 'devices' (list of names) not 'params'
    def run(self, devices_to_schedule: list, seed: int = None, max_iter: int = 100):  # Updated default max_iter
        # delegate directly to run_csa, passing the list of device names
        return self.run_csa(devices_to_schedule, seed=seed, max_iter=max_iter)

    def run_csa(self, devices: list,
                # This 'devices' parameter is the list of device names (e.g., ["Dishwasher [kW]", ...])
                n_crows: int = 150, P_aw: float = 0.3, seed: int = None, FL: int = 1,
                max_iter: int = 100):  # max_iter added here

        if seed is not None:
            np.random.seed(seed)

        population = []
        memories = []
        for _ in range(n_crows):
            # MODIFIED: For immovable devices (W=0), lock to original start time (M), not historical alpha.
            η = {d: (self.params['M'][d] if self.params['W'][d] == 0 else int(
                np.random.choice(self.params['valid_hours'][d]))) for d in devices}
            population.append(η.copy())
            memories.append(η.copy())

        best_mem = min(memories, key=self.fitness)
        best_fit = self.fitness(best_mem)
        fitness_history = [best_fit]  # Initialize fitness history

        for _ in range(max_iter):
            for k in range(n_crows):
                j = np.random.choice([i for i in range(n_crows) if i != k])
                if np.random.rand() > P_aw:
                    # η_new = {
                    #     d: memories[j][d] if np.random.rand() < FL else population[k][d]
                    #     for d in devices
                    # }
                    η_new = {d: memories[j][d] for d in devices}
                else:
                    η_new = {d: int(np.random.choice(self.params['valid_hours'][d])) for d in devices}

                # MODIFIED: enforce immovable (w=0) → lock at original start time (M)
                for d in η_new:
                    if self.params['W'][d] == 0:
                        η_new[d] = self.params['M'][d]

                # ADDED: Repair the new schedule to satisfy hard constraints (like picLimit)
                η_new = self.repair_function(η_new)

                if self.fitness(η_new) < self.fitness(memories[k]):
                    memories[k] = η_new.copy()
                    population[k] = η_new.copy()
                    f_new = self.fitness(η_new)
                    if f_new < best_fit:
                        best_fit = f_new
                        best_mem = η_new.copy()
            fitness_history.append(best_fit)  # Append best fitness of current iteration

        return best_mem, fitness_history  # Return schedule AND history

In [None]:
# ----- GA SOLVER -----

import random
from typing import Callable  # Added for repair_function typing


def _crossover(p1: dict, p2: dict):
    """Single-point crossover over the list of devices."""
    keys = list(p1.keys())
    pt = random.randint(1, len(keys) - 1)
    c1, c2 = {}, {}
    for i, d in enumerate(keys):
        if i < pt:
            c1[d], c2[d] = p1[d], p2[d]
        else:
            c1[d], c2[d] = p2[d], p1[d]
    return c1, c2


class GaSolver(Solver):
    def __init__(
            self,
            fitness,
            params: dict,
            pop_size: int = 50,
            generations: int = 100,
            crossover_rate: float = 0.8,
            mutation_rate: float = 0.1,
            repair_function: Callable = None,  # ADDED: Repair function for constraints
    ):
        # fitness: a callable mapping schedule→cost (lower is better)
        # params: dict with keys 'M', 'valid_hours', 'W', etc.
        self.fitness = fitness
        self.params = params
        self.pop_size = pop_size
        self.generations = generations
        self.crossover_rate = crossover_rate
        self.mutation_rate = mutation_rate
        # ADDED: Store repair function, defaulting to an identity function if None
        self.repair_function = repair_function if repair_function else lambda s: s

    def run(self, devices: list, seed: int = None, max_iter: int = 100):  # Updated default max_iter
        # max_iter is used as generations in GA context
        self.generations = max_iter
        return self._run_ga(devices, seed=seed)

    def _run_ga(self, devices: list, seed: int = None):
        if seed is not None:
            random.seed(seed)

        # 1. Initialize population of schedules
        population = []
        for _ in range(self.pop_size):
            indiv = {}
            for d in devices:
                if self.params['W'][d] == 0:
                    # MODIFIED: immovable device locked at its original start time (M), not historical alpha
                    indiv[d] = self.params['M'][d]
                else:
                    indiv[d] = int(random.choice(self.params['valid_hours'][d]))
            population.append(indiv.copy())

        # 2. Find initial best
        best = min(population, key=self.fitness)
        best_score = self.fitness(best)
        fitness_history = [best_score]  # Initialize fitness history

        # 3. Evolve for given generations
        for _ in range(self.generations):
            new_pop = []

            # pair off to produce pop_size children
            for _ in range(self.pop_size // 2):
                p1 = self._tournament_selection(population)
                p2 = self._tournament_selection(population)

                # crossover
                if random.random() < self.crossover_rate:
                    c1, c2 = _crossover(p1, p2)
                else:
                    c1, c2 = p1.copy(), p2.copy()

                # mutate
                self._mutate(c1)
                self._mutate(c2)

                # ADDED: Repair children to satisfy hard constraints (like picLimit)
                c1 = self.repair_function(c1)
                c2 = self.repair_function(c2)

                new_pop.extend([c1, c2])

            # if odd, carry the best over
            if len(new_pop) < self.pop_size:
                new_pop.append(best.copy())

            population = new_pop[: self.pop_size]

            # update best individual
            for indiv in population:
                score = self.fitness(indiv)
                if score < best_score:
                    best_score, best = score, indiv.copy()
            fitness_history.append(best_score)  # Append best fitness of current generation

        return best, fitness_history  # Return schedule AND history

    def _tournament_selection(self, population: list, k: int = 3):
        """Pick k random schedules and return the one with lowest cost."""
        contenders = random.sample(population, k)
        return min(contenders, key=self.fitness)

    def _mutate(self, indiv: dict):
        """For each device, with mutation_rate prob, reassign a valid hour."""
        for d in indiv:
            if self.params['W'][d] != 0 and random.random() < self.mutation_rate:
                indiv[d] = int(random.choice(self.params['valid_hours'][d]))

In [None]:
# ----- SOLVER FACTORY -----
def SolverFactory(name: str, **kwargs):
    name = name.lower()
    if name == 'csa':
        return CsaSolver(**kwargs)
    elif name == 'ga':
      # pull out GA hyperparams (or use defaults)
        return GaSolver(**kwargs)
    else:
        raise ValueError(f"Unknown solver: {name}")

In [None]:
# ----- MAIN LOGIC -----
from typing import Callable
from typing import Optional, Dict, Any, Tuple, List

import numpy as np
import pandas as pd
from pydantic import BaseModel


# -------------------------------------------------------------


def build_price_profile():
    """Build a price profile for the day with hourly variations."""
    prices_hourly = (
            [1.2050] * 6  # 00:00–05:59 → Night
            + [2.1645] * 11  # 06:00–16:59 → Full
            + [8.1147] * 4  # 17:00–20:59 → Peak
            + [2.1645] * 1  # 21:00–21:59 → Full
            + [1.2050] * 2  # 22:00–23:59 → Night
    )
    prices_slotted = []
    for p_h in prices_hourly:
        prices_slotted.extend([p_h] * SLOTS_PER_HOUR)
    return pd.Series(prices_slotted, index=range(SLOTS_PER_DAY))


def build_load_profile(df_day: pd.DataFrame, devices: Dict[str, Any]) -> Tuple[pd.Series, Dict[str, pd.Series]]:
    """Build baseline and device-specific load profiles from daily data."""
    df_day = df_day.copy()  # Operate on a copy to avoid SettingWithCopyWarning
    df_day['slot'] = df_day.index.hour * SLOTS_PER_HOUR + df_day.index.minute // SLOT_DURATION_MIN

    # Get total usage and generation
    # =================================================================
    total_usage = df_day["use [kW]"]
    generation = df_day.get("gen [kW]", 0) # Use .get for safety if column is missing
    # =================================================================

    smart_device_cols = [c for c in df_day.columns if any(d in c for d in devices)]
    smart_total = df_day[smart_device_cols].sum(axis=1)

    # MODIFIED: Correct baseline calculation
    # Baseline is what the grid sees (usage) PLUS what was generated, MINUS smart devices.
    # This represents the "other" non-schedulable consumption.
    # =================================================================
    baseline = total_usage + generation - smart_total
    # =================================================================

    # --- FIX 1: Ensure baseline is never negative ---
    # This is still a good safety measure in case of other data errors.
    baseline = np.maximum(0, baseline)
    # -----------------------------------------------

    baseline_profile_kW = baseline.groupby(df_day['slot']).mean()
    baseline_profile_kW = baseline_profile_kW.reindex(range(SLOTS_PER_DAY), fill_value=0)

    device_profiles_kW = {}
    for dev_name in devices.keys():
        col = next((c for c in smart_device_cols if dev_name in c), None)
        if col:
            dev_profile = df_day[col].groupby(df_day['slot']).mean()
            device_profiles_kW[dev_name] = dev_profile.reindex(range(SLOTS_PER_DAY), fill_value=0)
        else:
            device_profiles_kW[dev_name] = pd.Series([0.0] * SLOTS_PER_DAY, index=range(SLOTS_PER_DAY))
    return baseline_profile_kW, device_profiles_kW


def calculate_device_power_for_solver(device_profiles: Dict[str, pd.Series],
                                      effective_devices: Dict[str, Any],
                                      params: Dict[str, Dict[str, Any]]) -> Dict[str, float]:
    """
    Calculate effective power values for each device to use in the solver.
    This ensures energy conservation between default and optimized simulations.
    """
    P_for_solver = {}
    for device_name, current_params in effective_devices.items():
        # 1. Calculate the actual total energy (kWh) consumed by this device on the target day
        actual_total_energy_kwh_on_target_day = sum(
            device_profiles[device_name][s] * (SLOT_DURATION_MIN / 60.0)
            for s in range(SLOTS_PER_DAY)
        )

        # 2. Get the duration (LOT_s) for this device from the daily task parameters
        duration_s = params[device_name]['LOT']

        # 3. Calculate the 'effective' power (P) for the solver.
        # This P is such that (P * duration_s / 3600) == actual_total_energy_kwh
        if duration_s > 0:
            P_for_solver[device_name] = (actual_total_energy_kwh_on_target_day * 3600.0) / duration_s
        else:
            P_for_solver[device_name] = 0.0  # If the device has no duration, its power is 0

        # Update the effective_devices dict's 'power' attribute for external consistency if needed
        current_params['power'] = P_for_solver[device_name]

    return P_for_solver


def create_fitness_function(P_for_solver: Dict[str, float], LOT_s: Dict[str, float],
                            W: Dict[str, float], L: Dict[str, float], M: Dict[str, float],
                            price_profile: pd.Series) -> Callable:
    """Create and return a fitness function for the optimization solver."""

    def fitness(n_: Dict[str, int]):  # Explicitly type n_ as Dict[str, int]
        total_cost = 0.0
        for d, start_slot in n_.items():
            power_kw = P_for_solver[d]
            duration_s = LOT_s[d]

            # Cost calculation logic
            num_slots_for_LOT = int(np.ceil(duration_s / (SLOT_DURATION_MIN * 60.0)))
            device_energy_cost_d = 0.0
            for offset in range(num_slots_for_LOT):
                current_slot = (start_slot + offset) % SLOTS_PER_DAY
                seconds_in_current_slot = min(duration_s - offset * (SLOT_DURATION_MIN * 60.0),
                                              SLOT_DURATION_MIN * 60.0)
                energy_kwh_in_slot = power_kw * (seconds_in_current_slot / 3600.0)
                cost_in_slot = energy_kwh_in_slot * price_profile[current_slot]
                device_energy_cost_d += cost_in_slot

            total_cost += W[d] * device_energy_cost_d

        # MODIFIED: Comfort penalty is disabled by setting L=0 for all devices. This term will be zero.
        comfort = sum(L[d] * W[d] * (n_[d] - M[d]) ** 2 for d in n_)
        return total_cost + comfort

    return fitness


def calculate_actual_consumption_from_profiles(baseline_profile: pd.Series,
                                               device_profiles: Dict[str, pd.Series]) -> Dict[int, float]:
    """Calculate actual consumption from baseline and device profiles."""
    consumption = baseline_profile.reindex(range(SLOTS_PER_DAY), fill_value=0).to_dict()
    for dev_name, dev_profile in device_profiles.items():
        for slot, value in dev_profile.items():
            consumption[slot] += value
    # No need to round here, let plot_total_consumption_results handle it if necessary
    return {s: kW for s, kW in consumption.items()}


def calculate_actual_cost_from_profiles(baseline_profile: pd.Series,
                                        device_profiles: Dict[str, pd.Series],
                                        price_profile: pd.Series) -> float:
    """Calculate actual cost from baseline and device profiles."""
    cost = 0.0
    cost += sum(
        baseline_profile[s] * price_profile[s] * (SLOT_DURATION_MIN / 60.0)
        for s in range(SLOTS_PER_DAY)
    )
    for dev_name, dev_profile in device_profiles.items():
        for slot, value in dev_profile.items():
            cost += value * price_profile[slot] * (SLOT_DURATION_MIN / 60.0)
    return cost


def simulate_consumption(schedule: Dict[str, int], baseline_profile: pd.Series,
                         lot_seconds_map: Dict[str, float],
                         effective_power_map: Dict[str, float]) -> Dict[int, float]:
    """Simulate total consumption based on a given schedule."""
    consumption = baseline_profile.reindex(range(SLOTS_PER_DAY), fill_value=0).to_dict()
    slot_duration_seconds = SLOT_DURATION_MIN * 60.0

    for d, start_slot in schedule.items():
        power_kw = effective_power_map[d]
        duration_s = lot_seconds_map[d]

        remaining_duration_s = duration_s
        current_slot_offset = 0

        while remaining_duration_s > 0:
            current_slot_index = (start_slot + current_slot_offset) % SLOTS_PER_DAY
            seconds_in_this_slot = min(remaining_duration_s, slot_duration_seconds)
            power_contribution_to_slot = power_kw * (seconds_in_this_slot / slot_duration_seconds)
            consumption[current_slot_index] += power_contribution_to_slot

            remaining_duration_s -= seconds_in_this_slot
            current_slot_offset += 1
    # No need to round here, let plot_total_consumption_results handle it if necessary
    return {s: kW for s, kW in consumption.items()}


def simulate_cost(schedule: Dict[str, int], baseline_profile: pd.Series,
                  price_profile: pd.Series,
                  lot_seconds_map: Dict[str, float],
                  effective_power_map: Dict[str, float]) -> float:
    """Simulate total cost based on a given schedule."""
    total_scheduled_device_cost = 0.0
    slot_duration_seconds = SLOT_DURATION_MIN * 60.0

    for d, start_slot in schedule.items():
        power_kw = effective_power_map[d]
        duration_s = lot_seconds_map[d]

        remaining_duration_s = duration_s
        current_slot_offset = 0

        while remaining_duration_s > 0:
            current_slot_index = (start_slot + current_slot_offset) % SLOTS_PER_DAY
            seconds_in_this_slot = min(remaining_duration_s, slot_duration_seconds)

            energy_kwh_in_slot = power_kw * (seconds_in_this_slot / 3600.0)
            cost_in_slot = energy_kwh_in_slot * price_profile[current_slot_index]
            total_scheduled_device_cost += cost_in_slot

            remaining_duration_s -= seconds_in_this_slot
            current_slot_offset += 1

    baseline_cost = sum(
        baseline_profile[s] * price_profile[s] * (SLOT_DURATION_MIN / 60.0)
        for s in range(SLOTS_PER_DAY)
    )
    return total_scheduled_device_cost + baseline_cost


def simulate_individual_device_consumption(schedule: Dict[str, int],
                                           lot_seconds_map: Dict[str, float],
                                           effective_power_map: Dict[str, float]) -> Dict[str, Dict[int, float]]:
    """
    Simulate consumption for each device based on a given schedule, showing the
    effective power (P_for_solver) when the device is active.
    """
    device_consumptions = {dev: {s: 0.0 for s in range(SLOTS_PER_DAY)} for dev in schedule.keys()}
    slot_duration_seconds = SLOT_DURATION_MIN * 60.0

    for d, start_slot in schedule.items():
        power_kw = effective_power_map[d]
        duration_s = lot_seconds_map[d]

        remaining_duration_s = duration_s
        current_slot_offset = 0

        while remaining_duration_s > 0:
            current_slot_index = (start_slot + current_slot_offset) % SLOTS_PER_DAY

            device_consumptions[d][current_slot_index] = power_kw

            seconds_in_this_slot = min(remaining_duration_s, slot_duration_seconds)
            remaining_duration_s -= seconds_in_this_slot
            current_slot_offset += 1

    # No need to round here, let plotting function handle it if necessary
    return {dev: {s: kW for s, kW in profile.items()} for dev, profile in device_consumptions.items()}


# This function is no longer needed as valid slots are the full day.
# def get_valid_slots(α: Dict[str, int], β: Dict[str, int], devices: Dict[str, Any]) -> Dict[str, List[int]]:
#    ...


# --- Request/Response models (no changes needed) ---
class DeviceParams(BaseModel):
    w: float
    lambda_: float


class PlanningRequest(BaseModel):
    date: str
    custom_params: Optional[Dict[str, DeviceParams]] = None
    algorithm: Optional[str] = 'csa'
    start_hour: Optional[int] = 0


class PlanningResponse(BaseModel):
    slot_duration_min: int = SLOT_DURATION_MIN
    devices: Dict[str, DeviceParams]
    default_planning: Dict[str, int]
    optimized_planning: Dict[str, int]
    default_cost: float
    optimized_cost: float
    default_consumption_real: Dict[int, float]
    default_consumption: Dict[int, float]
    optimized_consumption: Dict[int, float]
    fitness_history: List[float]  # New field
    device_parameters: Dict[str, Any]  # New field for plotting device ranges


def generate_planning(date: str = "2016-01-05", start_hour: int = 0, algorithm: str = 'csa', max_iter: int = 100,
                      picLimit: Optional[float] = None) -> Dict[str, Any]:
    print(f"Calculating planning data for {date} with {algorithm.upper()}...")

    # Access global dataframes
    global _df, _devices
    df = _df
    devices = _devices

    target = pd.to_datetime(date).date()
    df_day = df[df.index.date == target].copy()
    if df_day.empty:
        raise ValueError(f"No data for date {target}")

    df_day['slot'] = df_day.index.hour * SLOTS_PER_HOUR + df_day.index.minute // SLOT_DURATION_MIN

    # MODIFIED: Identify active devices and their tasks for the specific day.
    # The 'start_hour' parameter is no longer used for filtering.
    daily_tasks = extract_time_params(df_day, devices)
    devices_to_schedule = list(daily_tasks.keys())
    effective_devices = {dev: devices[dev] for dev in devices_to_schedule}

    # Use 'daily_tasks' as the new 'params' object
    params = daily_tasks

    price_profile = build_price_profile()
    baseline_load_profile, device_load_profiles = build_load_profile(df_day, devices)

    # ADD THIS DEBUG BLOCK
    # =================================================================
    if picLimit is not None:
        print("\n--- DEBUG: Baseline Load Analysis ---")
        peak_baseline_kw = baseline_load_profile.max()
        peak_baseline_slot = baseline_load_profile.idxmax()
        print(f"The maximum baseline load is {peak_baseline_kw:.2f} kW at slot {peak_baseline_slot}.")
        if peak_baseline_kw > picLimit:
            print(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            print(
                f"!!! CRITICAL WARNING: The baseline itself ({peak_baseline_kw:.2f} kW) is higher than your picLimit ({picLimit} kW).")
            print(f"!!! The optimizer CANNOT fix this peak.")
            print(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
    # =================================================================

    P_for_solver = calculate_device_power_for_solver(device_load_profiles, effective_devices, params)

    # MODIFIED: The "default" schedule is the actual schedule from the target day.
    default_schedule_for_display = {d: p['actual_start_slot'] for d, p in params.items()}

    # MODIFIED: Set up parameters for a cost-only optimization over the full 24h day.
    LOT_s = {d: p['LOT'] for d, p in params.items()}
    W = {d: effective_devices[d]['w'] for d in effective_devices}
    L = {d: 0.0 for d in effective_devices}  # KEY CHANGE: Disables comfort penalty
    M = {d: p['actual_start_slot'] for d, p in params.items()}  # Used as anchor for immovable devices (W=0)

    # MODIFIED: The optimization window is the entire day for all active devices.
    # Historical comfort windows (alpha, beta) and forward-only constraints are removed.
    valid_slots = {d: list(range(SLOTS_PER_DAY)) for d in effective_devices}
    constrained_valid_slots = valid_slots  # Pass this to the repair function closure

    fitness = create_fitness_function(P_for_solver, LOT_s, W, L, M, price_profile)

    # --- NEW: Create a repair function for picLimit constraint ---
    repair_function = lambda s: s  # Default identity function
    if picLimit is not None:
        print(f"Enforcing peak consumption limit of {picLimit} kW.")

        def adjust_schedule_for_pic_limit(schedule: Dict[str, int]) -> Dict[str, int]:
            immovable_devices = {d: s for d, s in schedule.items() if W.get(d, 1) == 0}
            movable_schedule = {d: s for d, s in schedule.items() if W.get(d, 1) != 0}

            total_consumption = baseline_load_profile.copy().to_numpy()
            for device, start_slot in immovable_devices.items():
                power_kw = P_for_solver[device]
                duration_s = LOT_s[device]
                num_slots = int(np.ceil(duration_s / (SLOT_DURATION_MIN * 60.0)))
                if num_slots == 0: continue
                for offset in range(num_slots):
                    slot_idx = (start_slot + offset) % SLOTS_PER_DAY
                    total_consumption[slot_idx] += power_kw

            repaired_movable_schedule = {}
            devices_to_place = sorted(movable_schedule.keys(), key=lambda d: movable_schedule[d])

            for device in devices_to_place:
                power_kw = P_for_solver[device]
                duration_s = LOT_s[device]
                num_slots = int(np.ceil(duration_s / (SLOT_DURATION_MIN * 60.0)))

                if num_slots == 0:
                    repaired_movable_schedule[device] = movable_schedule[device]
                    continue

                # The repair function now searches within the full-day valid slots
                device_valid_slots = constrained_valid_slots[device]
                proposed_start_slot = movable_schedule[device]

                try:
                    start_index = device_valid_slots.index(proposed_start_slot)
                    slots_to_try = device_valid_slots[start_index:] + device_valid_slots[:start_index]
                except (ValueError, IndexError):
                    slots_to_try = device_valid_slots
                    if not slots_to_try:
                        repaired_movable_schedule[device] = proposed_start_slot
                        continue

                found_placement = False
                for try_slot in slots_to_try:
                    is_violation = False
                    for offset in range(num_slots):
                        slot_idx = (try_slot + offset) % SLOTS_PER_DAY
                        if total_consumption[slot_idx] + power_kw > picLimit:
                            is_violation = True
                            break
                    if not is_violation:
                        repaired_movable_schedule[device] = try_slot
                        for offset in range(num_slots):
                            slot_idx = (try_slot + offset) % SLOTS_PER_DAY
                            total_consumption[slot_idx] += power_kw
                        found_placement = True
                        break

                if not found_placement:
                    last_resort_slot = slots_to_try[-1] if slots_to_try else proposed_start_slot
                    repaired_movable_schedule[device] = last_resort_slot
                    for offset in range(num_slots):
                        slot_idx = (last_resort_slot + offset) % SLOTS_PER_DAY
                        total_consumption[slot_idx] += power_kw

            final_schedule = immovable_devices.copy()
            final_schedule.update(repaired_movable_schedule)
            return final_schedule

        repair_function = adjust_schedule_for_pic_limit

    # MODIFIED: Solver parameters are simplified for the new logic
    solver_params = {
        'LOT': LOT_s, 'P': P_for_solver, 'W': W, 'L': L, 'M': M,
        'valid_hours': valid_slots
    }

    if algorithm == 'csa':
        solver = CsaSolver(fitness=fitness, params=solver_params, repair_function=repair_function)
    elif algorithm == 'ga':
        solver = GaSolver(fitness=fitness, params=solver_params, repair_function=repair_function)
    else:
        raise ValueError(f"Unknown algorithm: {algorithm}")

    optimized_schedule, fitness_history = solver.run(devices_to_schedule, seed=42, max_iter=max_iter)

    default_consumption_real = df_day.groupby('slot')['use [kW]'].mean().reindex(range(SLOTS_PER_DAY),
                                                                                 fill_value=0).to_dict()
    # "Default Simulated" is now based on the actual schedule, so it's the same as "Real".
    default_consumption = calculate_actual_consumption_from_profiles(baseline_load_profile, device_load_profiles)
    default_cost = calculate_actual_cost_from_profiles(baseline_load_profile, device_load_profiles, price_profile)
    optimized_consumption = simulate_consumption(optimized_schedule, baseline_load_profile, LOT_s, P_for_solver)
    optimized_cost = simulate_cost(optimized_schedule, baseline_load_profile, price_profile, LOT_s, P_for_solver)
    default_individual_consumption = simulate_individual_device_consumption(
        default_schedule_for_display, LOT_s, P_for_solver)
    optimized_individual_consumption = simulate_individual_device_consumption(
        optimized_schedule, LOT_s, P_for_solver)

    # MODIFIED: Device parameters for plotting no longer include historical data.
    device_plot_params = {d: {'actual_start_slot': p['actual_start_slot'],
                              'LOT': LOT_s[d], 'power_for_solver': P_for_solver[d],
                              'w': W[d], 'lambda': L[d]}  # L[d] is 0
                          for d, p in params.items()}

    print("Optimized Consumption Preview (first 5 slots):", list(optimized_consumption.items())[:5])

    return {
        "slot_duration_min": SLOT_DURATION_MIN,
        "devices_info": {d: DeviceParams(w=v['w'], lambda_=v['lambda']) for d, v in effective_devices.items()},
        "default_planning": default_schedule_for_display,
        "optimized_planning": optimized_schedule,
        "default_cost": default_cost,
        "optimized_cost": optimized_cost,
        "default_consumption_real": default_consumption_real,
        "default_consumption": default_consumption,
        "optimized_consumption": optimized_consumption,
        "price_profile": price_profile.to_dict(),
        "fitness_history": fitness_history,
        "device_parameters": device_plot_params,
        "default_individual_consumption": default_individual_consumption,
        "optimized_individual_consumption": optimized_individual_consumption,
        "picLimit": picLimit,
        "baseline_load": baseline_load_profile.to_dict(),  # <--- ADD THIS LINE
    }

In [None]:
# ----- PLOTTING FUNCTION -----
import datetime
from typing import Dict, Any, List

import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np  # Needed for np.ceil

# Note: The 'date' variable and the SLOTS_PER_DAY/HOUR/MIN constants are assumed
# to be defined in the context where these functions are called.
# To make this script runnable standalone, you might need to define them, e.g.:
# SLOTS_PER_DAY = 96
# SLOTS_PER_HOUR = 4
# SLOT_DURATION_MIN = 15
date = "2016-01-05"  # Target date for the simulation


# --- Plotting Function (Original, slightly refactored) ---
def plot_total_consumption_results(results: Dict[str, Any]):
    slot_duration_min = results["slot_duration_min"]
    default_consumption_real = results["default_consumption_real"]
    default_consumption = results["default_consumption"]
    optimized_consumption = results["optimized_consumption"]
    price_profile = pd.Series(results["price_profile"])  # Convert back to Series for easy indexing

    # Create x-axis labels (time in HH:MM format)
    x_labels = []
    # Assuming SLOTS_PER_DAY and SLOTS_PER_HOUR are available in the global scope
    for i in range(results.get("SLOTS_PER_DAY", 96)):
        hour = i // results.get("SLOTS_PER_HOUR", 4)
        minute = (i % results.get("SLOTS_PER_HOUR", 4)) * slot_duration_min
        x_labels.append(f"{hour:02d}:{minute:02d}")

    fig = go.Figure()

    # Add shaded regions for price profile
    # Determine the max Y value for shading height
    max_consumption_value = max(
        max(default_consumption_real.values()),
        max(default_consumption.values()),
        max(optimized_consumption.values())
    ) * 1.1  # Add a little buffer

    # Add picLimit line if it exists
    picLimit = results.get("picLimit")
    if picLimit is not None:
        fig.add_hline(y=picLimit, line_dash="dash", line_color="firebrick",
                      annotation_text=f"Limite de Puissance ({picLimit} kW)",
                      annotation_position="bottom right")
        max_consumption_value = max(max_consumption_value, picLimit * 1.1)


    price_colors = {
        1.2050: "rgba(144, 238, 144, 0.2)",  # Light green
        2.1645: "rgba(255, 255, 0, 0.2)",  # Yellow
        8.1147: "rgba(255, 165, 0, 0.2)",  # Orange
        #0.22: "rgba(255, 99, 71, 0.2)"  # Tomato/Red
    }

    current_price = None
    start_slot_for_price = 0
    # Iterate through slots to find price changes and add shaded regions
    for i in range(results.get("SLOTS_PER_DAY", 96)):
        price_at_slot = price_profile.get(i)
        if current_price is None:
            current_price = price_at_slot
            start_slot_for_price = i
        elif price_at_slot != current_price:
            fig.add_shape(
                type="rect",
                x0=start_slot_for_price,
                x1=i,  # The shape ends before the current slot, as current slot has new price
                y0=0,
                y1=max_consumption_value,
                fillcolor=price_colors.get(current_price, "rgba(0,0,0,0.1)"),
                layer="below",
                line_width=0,
            )
            # Add annotation for the completed price segment
            fig.add_annotation(
                x=(start_slot_for_price + i) / 2,  # Horizontal center of the segment
                y=max_consumption_value * 0.5,  # Vertical center of the segment (adjust as needed)
                text=f"{current_price:.2f}",  # The price value for this segment
                showarrow=False,
                font=dict(
                    size=9,  # Small font size
                    color="rgba(0, 0, 0, 0.5)"  # Muted text color (e.g., semi-transparent black)
                ),
                textangle=0,  # Horizontal text
            )
            current_price = price_at_slot
            start_slot_for_price = i
    # Add the last segment
    if current_price is not None:
        fig.add_shape(
            type="rect",
            x0=start_slot_for_price,
            x1=results.get("SLOTS_PER_DAY", 96),
            y0=0,
            y1=max_consumption_value,
            fillcolor=price_colors.get(current_price, "rgba(0,0,0,0.1)"),
            layer="below",
            line_width=0,
        )

    # Add consumption traces
    fig.add_trace(go.Scatter(x=list(default_consumption_real.keys()), y=list(default_consumption_real.values()),
                             mode='lines', name='Consommation Réelle Historique', line=dict(color='lightgray', width=3)))
    fig.add_trace(go.Scatter(x=list(default_consumption.keys()), y=list(default_consumption.values()),
                             mode='lines', name='Consommation Simulée par Défaut',
                             line=dict(color='blue', width=2, dash='dot')))
    fig.add_trace(go.Scatter(x=list(optimized_consumption.keys()), y=list(optimized_consumption.values()),
                             mode='lines', name='Consommation Optimisée', line=dict(color='red', width=2)))

    # Update layout for better aesthetics
    fig.update_layout(
        title_text=f"Consommation d'Énergie Totale et Profil Tarifaire du {datetime.date.fromisoformat(results['target_date_str'])}",
        xaxis_title="Plage Horaire",
        yaxis_title="Consommation d'Énergie (kW)",
        hovermode="x unified",
        legend=dict(x=0.01, y=0.99, bgcolor='rgba(255,255,255,0.7)', bordercolor='rgba(0,0,0,0.2)'),
        margin=dict(l=40, r=40, t=80, b=40),
        xaxis=dict(
            tickmode='array',
            tickvals=list(range(0, results.get("SLOTS_PER_DAY", 96), results.get("SLOTS_PER_HOUR", 4) * 2)),  # Every 2 hours
            ticktext=[x_labels[i] for i in range(0, results.get("SLOTS_PER_DAY", 96), results.get("SLOTS_PER_HOUR", 4) * 2)],
            showgrid=True, gridwidth=1, gridcolor='rgba(0,0,0,0.05)'
        ),
        yaxis=dict(showgrid=True, gridwidth=1, gridcolor='rgba(0,0,0,0.05)', range=[0, max_consumption_value])
    )
    fig.show()


# --- New Plotting Function: Fitness Evolution ---
def plot_fitness_evolution(fitness_history: List[float], algorithm_name: str):
    fig = go.Figure(data=go.Scatter(y=fitness_history, mode='lines', line=dict(color='green', width=2)))
    fig.update_layout(
        title_text=f"Évolution du Fitness pour l'Algorithme {algorithm_name.upper()}",
        xaxis_title="Itération/Génération",
        yaxis_title="Valeur du Fitness (Coût)",
        hovermode="x unified",
        margin=dict(l=40, r=40, t=80, b=40),
        yaxis=dict(type='log')  # Fitness can vary greatly, log scale can help
    )
    fig.show()


# --- New Plotting Function: Individual Device Schedules (Modified) ---
def plot_individual_device_schedules(
        device_parameters: Dict[str, Any],
        default_individual_consumption: Dict[str, Dict[int, float]],  # Pass the actual consumption profiles
        optimized_individual_consumption: Dict[str, Dict[int, float]]  # Pass the actual consumption profiles
):
    # MODIFIED: Plot only active devices for the day
    active_devices = {k: v for k, v in device_parameters.items() if v.get('LOT', 0) > 0}
    num_devices = len(active_devices)
    if num_devices == 0:
        print("Aucun appareil actif à afficher pour ce jour.")
        return

    rows = int(np.ceil(num_devices / 2))  # Arrange in 2 columns

    # Assuming SLOT_DURATION_MIN is available in the global scope
    SLOT_DURATION_MIN = 15 # Provide a default if not globally available
    if 'device_parameters' in device_parameters and device_parameters:
        # A bit of a trick to get a global-like constant from the results dict if available
        # This part depends on how `generate_planning` is structured.
        # A better way would be to pass these constants into the plotting functions.
        pass

    fig = make_subplots(rows=rows, cols=2,
                        subplot_titles=list(active_devices.keys()),
                        vertical_spacing=0.08,
                        horizontal_spacing=0.05)

    x_labels = []  # HH:MM labels for axis ticks and hover
    for i in range(96): # Assuming 96 slots per day
        hour = i // 4
        minute = (i % 4) * 15
        x_labels.append(f"{hour:02d}:{minute:02d}")

    x_slot_indices = list(range(96))

    # MODIFIED: Update dummy traces for legend; remove historical ones
    fig.add_trace(go.Scatter(x=[None], y=[None], mode='lines',
                             marker=dict(symbol='star', size=10, color='purple'),
                             line=dict(color='purple', width=0),
                             name='Heure de Début Initiale', showlegend=True), row=1, col=1)
    fig.add_trace(go.Scatter(x=[None], y=[None], mode='lines',
                             line=dict(color="blue", width=2, dash='dot', shape='hv'),
                             name='Consommation Initiale', showlegend=True), row=1, col=1)
    fig.add_trace(go.Scatter(x=[None], y=[None], mode='lines',
                             line=dict(color="red", width=2, dash='solid', shape='hv'),
                             name='Consommation Optimisée', showlegend=True), row=1, col=1)

    row_idx, col_idx = 1, 1
    for i, (dev_name, params) in enumerate(active_devices.items()):
        # MODIFIED: Use new parameters; no alpha, beta, or m_slot
        actual_start_slot = params['actual_start_slot']
        dev_power = params['power_for_solver']

        max_y_val = max(dev_power * 1.2, 0.1)

        # REMOVED: Shading for historical operating window (alpha to beta) is no longer relevant.

        # MODIFIED: Add a marker for the original start time on the target day.
        fig.add_trace(go.Scatter(
            x=[actual_start_slot], y=[max_y_val * 0.9], mode='markers',
            marker=dict(symbol='star', size=10, color='purple'),
            name='Heure de Début Initiale',
            customdata=[x_labels[actual_start_slot]],
            hovertemplate=f"Début Initial : {dev_name}<br>Slot : %{{x}}<br>Heure : %{{customdata}}<extra></extra>",
            showlegend=False
        ), row=row_idx, col=col_idx)

        # Plot Original Daily Consumption (using stepped line)
        default_profile_data = default_individual_consumption.get(dev_name, {s: 0.0 for s in x_slot_indices})
        default_y = [default_profile_data.get(s, 0.0) for s in x_slot_indices]

        fig.add_trace(go.Scatter(
            x=x_slot_indices, y=default_y, mode='lines',
            line=dict(color="blue", width=2, dash='dot', shape='hv'),
            name='Consommation Initiale',
            customdata=x_labels,
            hovertemplate=f"Marche (Initial) : {dev_name}<br>Slot : %{{x}}<br>Heure : %{{customdata}}<br>Puissance : %{{y:.2f}} kW<extra></extra>",
            showlegend=False
        ), row=row_idx, col=col_idx)

        # Plot Optimized Simulated Consumption (using stepped line)
        optimized_profile_data = optimized_individual_consumption.get(dev_name, {s: 0.0 for s in x_slot_indices})
        optimized_y = [optimized_profile_data.get(s, 0.0) for s in x_slot_indices]

        fig.add_trace(go.Scatter(
            x=x_slot_indices, y=optimized_y, mode='lines',
            line=dict(color="red", width=2, dash='solid', shape='hv'),
            name='Consommation Optimisée',
            customdata=x_labels,
            hovertemplate=f"Marche (Optimisé) : {dev_name}<br>Slot : %{{x}}<br>Heure : %{{customdata}}<br>Puissance : %{{y:.2f}} kW<extra></extra>",
            showlegend=False
        ), row=row_idx, col=col_idx)

        fig.update_xaxes(
            title_text="Plage Horaire",
            tickmode='array',
            tickvals=list(range(0, 96, 4 * 2)),
            ticktext=[x_labels[k] for k in range(0, 96, 4 * 2)],
            showgrid=True, gridwidth=1, gridcolor='rgba(0,0,0,0.05)',
            row=row_idx, col=col_idx
        )
        fig.update_yaxes(title_text="Puissance (kW)", range=[0, max_y_val], row=row_idx, col=col_idx)

        col_idx += 1
        if col_idx > 2:
            col_idx = 1
            row_idx += 1

    fig.update_layout(
        title_text="Planification Individuelle des Appareils : Initiale vs. Optimisée",
        height=400 * rows,
        showlegend=True,
        hovermode="closest",
        margin=dict(l=40, r=40, t=80, b=40),
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1, bgcolor='rgba(255,255,255,0.7)'),
    )
    fig.show()


def main():
    try:
        # Note: This function call relies on `generate_planning` being available.
        # To run this script standalone, you'd need to mock this function call
        # or import it correctly.
        planning_results = generate_planning(
            date,
            algorithm='csa',
            max_iter=100,
            picLimit=5
        )
        planning_results['target_date_str'] = date
        # Add constants to results to make plotting functions more robust
        planning_results['SLOTS_PER_DAY'] = 96
        planning_results['SLOTS_PER_HOUR'] = 4

        print("\n--- Résultats de la Planification ---")
        print(f"Coût Initial : {planning_results['default_cost']:.2f} DA")
        print(f"Coût Optimisé : {planning_results['optimized_cost']:.2f} DA")
        print(f"Économies Réalisées : {planning_results['default_cost'] - planning_results['optimized_cost']:.2f} DA")
        print(
            f"Pourcentage d'Économie : {((planning_results['default_cost'] - planning_results['optimized_cost']) / planning_results['default_cost']) * 100:.2f}%")

        print("\n--- Énergie Totale Consommée (kWh) ---")
        slot_duration_hours = 15 / 60.0
        total_energy_real = sum(planning_results['default_consumption_real'].values()) * slot_duration_hours
        total_energy_default_sim = sum(planning_results['default_consumption'].values()) * slot_duration_hours
        total_energy_optimized_sim = sum(planning_results['optimized_consumption'].values()) * slot_duration_hours

        print(f"Énergie Totale Réelle sur la Journée : {total_energy_real:.2f} kWh")
        print(f"Énergie Totale Initiale Simulée : {total_energy_default_sim:.2f} kWh")
        print(f"Énergie Totale Optimisée Simulée : {total_energy_optimized_sim:.2f} kWh")

        print("\n--- Planifications ---")
        print("Planification Initiale (Appareil -> Slot de Début) :")
        for d, s in planning_results['default_planning'].items():
            print(f"  {d}: Slot {s} ({s // 4:02d}:{(s % 4) * 15:02d})")
        print("Planification Optimisée (Appareil -> Slot de Début) :")
        for d, s in planning_results['optimized_planning'].items():
            print(f"  {d}: Slot {s} ({s // 4:02d}:{(s % 4) * 15:02d})")

        # Plot the results (total consumption)
        plot_total_consumption_results(planning_results)

        # Plot fitness evolution
        plot_fitness_evolution(planning_results['fitness_history'],
                               'csa')

        # Plot individual device schedules
        plot_individual_device_schedules(
            planning_results['device_parameters'],
            planning_results['default_individual_consumption'],
            planning_results['optimized_individual_consumption']
        )

    except NameError as e:
        print(f"Erreur : une fonction comme 'generate_planning' n'est pas définie. Ce script est destiné à être utilisé avec le reste du code. Détail : {e}")
    except ValueError as e:
        print(f"Erreur : {e}")
    except FileNotFoundError as e:
        print(f"Erreur : {e}")

# Note: To run this file directly, you would need to either comment out the main() call
# or provide dummy data/functions for it to use.
# main()

In [None]:
# ----- START THE PROGRAM -----

main()

Calculating planning data for 2016-01-05 with CSA...

--- DEBUG: Baseline Load Analysis ---
The maximum baseline load is 3.11 kW at slot 10.
Enforcing peak consumption limit of 5 kW.
Optimized Consumption Preview (first 5 slots): [(0, np.float64(1.3078979158266886)), (1, np.float64(1.251887920093355)), (2, np.float64(1.170114577893355)), (3, np.float64(1.2215512628933551)), (4, np.float64(1.3694890489600218))]

--- Résultats de la Planification ---
Coût Initial : 64.61 DA
Coût Optimisé : 62.57 DA
Économies Réalisées : 2.05 DA
Pourcentage d'Économie : 3.17%

--- Énergie Totale Consommée (kWh) ---
Énergie Totale Réelle sur la Journée : 26.63 kWh
Énergie Totale Initiale Simulée : 29.10 kWh
Énergie Totale Optimisée Simulée : 29.10 kWh

--- Planifications ---
Planification Initiale (Appareil -> Slot de Début) :
  Dishwasher [kW]: Slot 18 (04:30)
  Microwave [kW]: Slot 0 (00:00)
  Garage door [kW]: Slot 52 (13:00)
  Furnace 1 [kW]: Slot 0 (00:00)
  Furnace 2 [kW]: Slot 0 (00:00)
  Home offic

In [None]:
import pandas as pd
import plotly.graph_objects as go
import os
from typing import Optional

def load_and_prepare_data(file_path="HomeC.csv"):
    """
    Charge le jeu de données HomeC.csv et prépare l'index temporel.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Erreur : Le fichier de données n'a pas été trouvé à {file_path}.")

    df = pd.read_csv(file_path, low_memory=False)
    df['time'] = pd.to_datetime(pd.date_range('2016-01-01 05:00', periods=len(df), freq='min'))
    df.set_index('time', inplace=True)
    return df

def plot_device_threshold_justification(
    df: pd.DataFrame,
    device_column_name: str,
    date_str: str,
    threshold_percentage: float = 10.0,
    zoom_y_max: Optional[float] = None,
    start_hour: Optional[int] = None, # NOUVEAU
    end_hour: Optional[int] = None    # NOUVEAU
):
    """
    Génère un graphique de la consommation pour une plage horaire et/ou un zoom vertical spécifié.
    """
    # Paramètres de taille de police
    TITLE_FONT_SIZE, AXIS_TITLE_FONT_SIZE, TICK_FONT_SIZE, LEGEND_FONT_SIZE, ANNOTATION_FONT_SIZE = 22, 18, 14, 16, 14

    # 1. Filtrer les données par date
    target_date = pd.to_datetime(date_str).date()
    df_day = df[df.index.date == target_date].copy()

    # ========================================================================
    # --- NOUVEAU : Filtrer par plage horaire si spécifié ---
    time_window_str = ""
    if start_hour is not None and end_hour is not None:
        if start_hour >= end_hour:
            print(f"Avertissement : L'heure de début ({start_hour}h) doit être inférieure à l'heure de fin ({end_hour}h). Affichage de la journée complète.")
        else:
            # Utilise between_time pour filtrer. 'end_hour-1:59' inclut toutes les minutes jusqu'à la fin de l'heure.
            df_day = df_day.between_time(f'{start_hour:02d}:00', f'{end_hour-1:02d}:59')
            time_window_str = f" (de {start_hour}h à {end_hour}h)"
    # ========================================================================

    if df_day.empty:
        print(f"Aucune donnée disponible pour la période sélectionnée pour '{device_column_name}' le {date_str}.")
        return

    device_series = df_day[device_column_name]
    max_power_on_day = device_series.max()

    if max_power_on_day == 0:
        print(f"L'appareil '{device_column_name}' n'a pas été utilisé pendant cette période.")
        return

    threshold_value = (threshold_percentage / 100.0) * df[df.index.date == target_date][device_column_name].max()

    # 2. Identifier les états "Marche"
    is_on = device_series > threshold_value
    was_on = is_on.shift(1).fillna(False)
    is_turn_off_point = (was_on) & (~is_on)
    red_line_condition = is_on | is_turn_off_point
    on_consumption = device_series.where(red_line_condition)

    # 3. Créer le graphique
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=device_series.index, y=device_series, mode='lines', name='Signal de puissance brut', line=dict(color='lightblue', width=2)))
    fig.add_trace(go.Scatter(x=on_consumption.index, y=on_consumption, mode='lines', name=f'État "Marche" détecté (> {threshold_value:.3f} kW)', line=dict(color='crimson', width=2.5)))
    fig.add_hline(y=threshold_value, line_dash="dash", line_color="black", annotation_text=f"Seuil ({threshold_percentage}%)", annotation_position="bottom right", annotation_font_size=ANNOTATION_FONT_SIZE)

    # 4. Configurer la mise en page
    title_text = f"Détection Marche/Arrêt pour '{device_column_name}' le {date_str}{time_window_str}"
    if zoom_y_max is not None:
        title_text = f"VUE ZOOMÉE (0 à {zoom_y_max} kW) : " + title_text

    fig.update_layout(
        title=dict(text=title_text, font=dict(size=TITLE_FONT_SIZE)),
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1, font=dict(size=LEGEND_FONT_SIZE)),
        xaxis=dict(title=dict(text="Heure de la journée", font=dict(size=AXIS_TITLE_FONT_SIZE)), tickfont=dict(size=TICK_FONT_SIZE), tickformat='%H:%M'),
        yaxis=dict(title=dict(text="Consommation d'énergie (kW)", font=dict(size=AXIS_TITLE_FONT_SIZE)), tickfont=dict(size=TICK_FONT_SIZE), range=[0, zoom_y_max] if zoom_y_max is not None else None),
        hovermode="x unified", template="plotly_white"
    )

    fig.show()

# --- Bloc d'exécution principal ---
if __name__ == "__main__":
    DEVICE_TO_PLOT = "Fridge [kW]"
    DATE_TO_PLOT = "2016-01-05"
    THRESHOLD_PERCENT = 5

    # NOUVEAU : Définir la plage horaire d'intérêt
    START_HOUR_OF_INTEREST = 0  # 8h du matin
    END_HOUR_OF_INTEREST = 12 # 18h (jusqu'à 17h59)

    try:
        full_df = load_and_prepare_data("/content/drive/MyDrive/PFE/main_program/HomeC.csv")

        # Graphique 1 : Journée complète, zoom vertical (pour voir le bruit de fond)
        print(f"--- Génération du graphique zoomé verticalement")
        plot_device_threshold_justification(
            df=full_df, device_column_name=DEVICE_TO_PLOT, date_str=DATE_TO_PLOT,
            threshold_percentage=THRESHOLD_PERCENT,
            zoom_y_max=0.3,
            start_hour=START_HOUR_OF_INTEREST,
            end_hour=END_HOUR_OF_INTEREST
        )

        # Graphique 2 : Plage horaire limitée (pour voir les détails des cycles)
        print(f"\n--- Génération du graphique pour la plage horaire ({START_HOUR_OF_INTEREST}h - {END_HOUR_OF_INTEREST}h) ---")
        plot_device_threshold_justification(
            df=full_df, device_column_name=DEVICE_TO_PLOT, date_str=DATE_TO_PLOT,
            threshold_percentage=THRESHOLD_PERCENT,
            start_hour=START_HOUR_OF_INTEREST,
            end_hour=END_HOUR_OF_INTEREST
        )

    except Exception as e:
        print(f"Une erreur est survenue : {e}")

--- Génération du graphique zoomé verticalement



Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`




--- Génération du graphique pour la plage horaire (0h - 12h) ---



Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`



In [None]:
import pandas as pd
import plotly.graph_objects as go
from typing import Dict

def create_device_profile(start_time: str, duration_minutes: int, power_kw: float, time_index: pd.DatetimeIndex) -> pd.Series:
    """Crée un profil de consommation simple pour un appareil."""
    profile = pd.Series(0.0, index=time_index)
    start = time_index.min().replace(hour=int(start_time.split(':')[0]), minute=int(start_time.split(':')[1]))
    end = start + pd.Timedelta(minutes=duration_minutes - 1)
    profile.loc[start:end] = power_kw
    return profile

def plot_pic_limit_scenario(df_scenario: pd.DataFrame, picLimit: float, title: str):
    """Génère un graphique pour un scénario de consommation avec une limite de puissance."""

    # --- Paramètres de police ---
    TITLE_FONT_SIZE, AXIS_TITLE_FONT_SIZE, TICK_FONT_SIZE, LEGEND_FONT_SIZE, ANNOTATION_FONT_SIZE = 22, 18, 14, 16, 14

    fig = go.Figure()

    # --- Traces pour chaque appareil (zones colorées) ---
    colors = ['rgba(0, 123, 255, 0.4)', 'rgba(255, 193, 7, 0.5)'] # Bleu et Jaune
    for i, device in enumerate(df_scenario.columns):
        if device != 'Total':
            fig.add_trace(go.Scatter(
                x=df_scenario.index, y=df_scenario[device],
                mode='lines',
                line=dict(width=0), # Pas de ligne de contour
                fill='tozeroy',     # Remplir jusqu'à l'axe des Y
                name=device,
                fillcolor=colors[i % len(colors)]
            ))

    # --- Trace pour la consommation totale (ligne principale) ---
    fig.add_trace(go.Scatter(
        x=df_scenario.index, y=df_scenario['Total'],
        mode='lines',
        line=dict(color='black', width=3),
        name='Consommation Totale'
    ))

    # --- Zone de violation (mise en évidence en rouge) ---
    # Crée une série qui ne contient des valeurs que lorsque le total dépasse la limite.
    violation_series = df_scenario['Total'].where(df_scenario['Total'] > picLimit)
    fig.add_trace(go.Scatter(
        x=df_scenario.index, y=violation_series,
        mode='lines',
        line=dict(width=0),
        fill='tozeroy',
        fillcolor='rgba(220, 53, 69, 0.6)', # Rouge semi-transparent
        name='Dépassement de la limite'
    ))

    # --- Ligne de la limite de puissance (picLimit) ---
    fig.add_hline(
        y=picLimit, line_dash="dash", line_color="firebrick",
        annotation_text=f"Limite de Puissance ({picLimit} kW)",
        annotation_position="bottom right",
        annotation_font_size=ANNOTATION_FONT_SIZE,
        annotation_font_color="firebrick"
    )

    # --- Mise en page du graphique ---
    fig.update_layout(
        title=dict(text=title, font=dict(size=TITLE_FONT_SIZE)),
        xaxis=dict(title=dict(text="Heure de la journée", font=dict(size=AXIS_TITLE_FONT_SIZE)), tickfont=dict(size=TICK_FONT_SIZE), tickformat='%H:%M'),
        yaxis=dict(title=dict(text="Consommation d'énergie (kW)", font=dict(size=AXIS_TITLE_FONT_SIZE)), tickfont=dict(size=TICK_FONT_SIZE)),
        legend=dict(font=dict(size=LEGEND_FONT_SIZE)),
        hovermode="x unified",
        template="plotly_white",
        yaxis_range=[0, (df_scenario['Total'].max() * 1.2)]
    )

    fig.show()


if __name__ == "__main__":
    # --- PARAMÈTRES DE LA SIMULATION ---
    PIC_LIMIT = 3.0  # Limite de puissance fixée à 3.0 kW

    # Définition des appareils
    APPAREILS: Dict[str, Dict] = {
        "Lave-vaisselle": {"power_kw": 2.2, "duration": 45},
        "Micro-ondes":    {"power_kw": 1.5, "duration": 5}
    }

    # Création de l'axe du temps (de 19h00 à 21h00)
    time_index = pd.date_range(start="2024-01-01 19:00", end="2024-01-01 21:00", freq="min")

    # --- SCÉNARIO 1 : VIOLATION DE LA LIMITE ---
    print("--- Génération du graphique 1 : Scénario de Dépassement ---")

    # Les deux appareils démarrent en même temps
    profil_lv_1 = create_device_profile("19:30", APPAREILS["Lave-vaisselle"]["duration"], APPAREILS["Lave-vaisselle"]["power_kw"], time_index)
    profil_mo_1 = create_device_profile("19:30", APPAREILS["Micro-ondes"]["duration"], APPAREILS["Micro-ondes"]["power_kw"], time_index)

    df_violation = pd.DataFrame({
        "Lave-vaisselle": profil_lv_1,
        "Micro-ondes": profil_mo_1
    })
    df_violation["Total"] = df_violation.sum(axis=1)

    plot_pic_limit_scenario(df_violation, PIC_LIMIT, "Scénario 1 : Dépassement de la Limite de Puissance")

    # --- SCÉNARIO 2 : RESPECT DE LA LIMITE (PLANIFICATION) ---
    print("\n--- Génération du graphique 2 : Scénario Planifié ---")

    # Les appareils sont décalés pour ne pas fonctionner en même temps
    profil_lv_2 = create_device_profile("19:30", APPAREILS["Lave-vaisselle"]["duration"], APPAREILS["Lave-vaisselle"]["power_kw"], time_index)
    profil_mo_2 = create_device_profile("20:20", APPAREILS["Micro-ondes"]["duration"], APPAREILS["Micro-ondes"]["power_kw"], time_index)

    df_planifie = pd.DataFrame({
        "Lave-vaisselle": profil_lv_2,
        "Micro-ondes": profil_mo_2
    })
    df_planifie["Total"] = df_planifie.sum(axis=1)

    plot_pic_limit_scenario(df_planifie, PIC_LIMIT, "Scénario 2 : Respect de la Limite grâce à la Planification")

--- Génération du graphique 1 : Scénario de Dépassement ---



--- Génération du graphique 2 : Scénario Planifié ---
