In [1]:
import torch
import math
import gc
import numpy as np
import polars as pl
import itertools
import torch.nn.functional as F
from typing import Literal
from dataclasses import dataclass
import warnings

In [2]:
warnings.filterwarnings("ignore")

In [3]:
class LabelledTensor:
    """
    Classe enveloppe pour un tenseur PyTorch avec des étiquettes explicites pour chaque dimension.

    Attributs
    ---------
    tensor : torch.Tensor
        Le tenseur brut.
    dim_labels : list[str]
        Liste ordonnée des noms des dimensions.
    index_to_label : dict[str, list[str]]
        Dictionnaire associant à chaque nom de dimension la liste de ses étiquettes.
    """

    def __init__(self, tensor: torch.Tensor, dim_labels: list[str], index_to_label: dict[str, list[str]]):
        """
        Initialise un objet LabelledTensor.

        Paramètres
        ----------
        tensor : torch.Tensor
            Le tenseur PyTorch à encapsuler.
        dim_labels : list[str]
            Les noms des dimensions du tenseur.
        index_to_label : dict[str, list[str]]
            Les étiquettes associées à chaque dimension.
        """
        self.tensor = tensor
        self.dim_labels = dim_labels
        self.index_to_label = index_to_label

    def to(self, device):
        """
        Déplace le tenseur vers le périphérique spécifié (CPU ou GPU).

        Paramètre
        ---------
        device : str
            'cpu' ou 'cuda' (ou tout autre périphérique reconnu par PyTorch).
        
        Retourne
        --------
        self : LabelledTensor
            L'objet lui-même après déplacement.
        """
        self.tensor = self.tensor.to(device)
        return self

    def __repr__(self):
        """
        Représentation courte de l'objet pour un affichage rapide.
        """
        return f"LabelledTensor(shape={self.tensor.shape}, dims={self.dim_labels})"

    
    def display(self, max_elements: int = 100, filters: dict[str, list[str]] = None) -> pl.DataFrame:
        """
        Affiche une vue lisible du tenseur sous forme d'un DataFrame Polars avec les étiquettes,
        en permettant un filtrage rapide par labels.

        Paramètres
        ----------
        max_elements : int, par défaut 100
            Nombre maximal d'éléments à afficher.
        filters : dict[str, list[str]], optional
            Dictionnaire {nom_dimension: [liste de labels à afficher]}.
            Permet de restreindre l'affichage à certaines étiquettes.

        Retour
        ------
        pl.DataFrame
            Un DataFrame Polars avec les étiquettes et les valeurs du tenseur.
        """
        flat_indices = torch.nonzero(self.tensor, as_tuple=False)

        # Si des filtres sont fournis
        if filters:
            mask = torch.ones(flat_indices.size(0), dtype=torch.bool)

            for dim_name, accepted_labels in filters.items():
                if dim_name not in self.dim_labels:
                    raise ValueError(f"Dimension '{dim_name}' inconnue.")

                dim_idx = self.dim_labels.index(dim_name)
                label_to_index = self.index_to_label[dim_name]

                try:
                    accepted_set = set(label_to_index.index(label) for label in accepted_labels)
                except ValueError as e:
                    raise ValueError(f"Un des labels fournis dans '{dim_name}' est invalide.") from e

                # Utilisation de torch.isin pour un filtrage rapide
                dim_values = flat_indices[:, dim_idx]
                accepted_tensor = torch.tensor(list(accepted_set), device=dim_values.device)
                mask &= torch.isin(dim_values, accepted_tensor)

            flat_indices = flat_indices[mask]

        # Limiter l'affichage
        if flat_indices.size(0) > max_elements:
            print(f"[INFO] Affichage des {max_elements} premiers éléments sur {flat_indices.size(0)} non nuls.")
            flat_indices = flat_indices[:max_elements]

        values = self.tensor[tuple(flat_indices.T)].cpu().tolist()

        # Récupération rapide des étiquettes
        records = []
        for i in range(flat_indices.size(0)):
            labels = [
                self.index_to_label[dim][flat_indices[i, j].item()]
                for j, dim in enumerate(self.dim_labels)
            ]
            records.append((*labels, values[i]))

        columns = self.dim_labels + ["valeur"]
        return pl.DataFrame(records, schema=columns)
    

    def to_dataframe(
        self,
        index_dim: str,
        column_dim: str | None = None,
        index_name: str | None = None,
        value_name: str = "valeur",
        fixed_dims: dict[str, str] = None,
    ) -> pl.DataFrame:
        """
        Convertit un LabelledTensor (1D, 2D ou 3D) en DataFrame Polars.

        Si 3D, nécessite de fixer les dimensions supplémentaires avec `fixed_dims`.

        Paramètres
        ----------
        index_dim : str
            Nom de la dimension pour les lignes.
        column_dim : str, optional
            Nom de la dimension pour les colonnes (requis pour 2D ou 3D).
        index_name : str, optional
            Nom personnalisé de la colonne d'index.
        value_name : str
            Nom de la colonne des valeurs (pour tenseurs 1D).
        fixed_dims : dict[str, str]
            Pour tenseurs 3D : dictionnaire {nom_dim: label_valeur} pour fixer la dimension restante.

        Retour
        ------
        pl.DataFrame
        """
        if index_dim not in self.dim_labels:
            raise ValueError(f"{index_dim} n'est pas une dimension valide.")

        if self.tensor.ndim == 1:
            if column_dim is not None:
                raise ValueError("column_dim doit être None pour les tenseurs 1D.")
            labels = self.index_to_label[index_dim]
            values = self.tensor.cpu().numpy().tolist()
            return pl.DataFrame({
                index_name or index_dim: labels,
                value_name: values
            })

        if self.tensor.ndim == 2:
            if column_dim is None:
                raise ValueError("column_dim est requis pour les tenseurs 2D.")
            if column_dim not in self.dim_labels:
                raise ValueError(f"{column_dim} n'est pas une dimension valide.")
            row_idx = self.dim_labels.index(index_dim)
            col_idx = self.dim_labels.index(column_dim)

            tensor = self.tensor.permute(row_idx, col_idx) if (row_idx, col_idx) != (0, 1) else self.tensor
            row_labels = self.index_to_label[index_dim]
            col_labels = self.index_to_label[column_dim]
            values_np = tensor.cpu().numpy()

            df_dict = {index_name or index_dim: row_labels}
            for j, col in enumerate(col_labels):
                df_dict[col] = values_np[:, j]

            return pl.DataFrame(df_dict)

        if self.tensor.ndim == 3:
            if fixed_dims is None or len(fixed_dims) != 1:
                raise ValueError("Pour les tenseurs 3D, il faut fixer une dimension avec `fixed_dims={dim: label}`.")

            fixed_dim_name, fixed_label = next(iter(fixed_dims.items()))

            if fixed_dim_name not in self.dim_labels:
                raise ValueError(f"Dimension '{fixed_dim_name}' non trouvée dans {self.dim_labels}.")

            dim_indices = {dim: self.dim_labels.index(dim) for dim in self.dim_labels}
            fixed_idx = self.index_to_label[fixed_dim_name].index(fixed_label)

            # Réorganiser pour amener la dimension fixée en dernière position, puis découper (ou extraire)
            perm = [dim_indices[index_dim], dim_indices[column_dim], dim_indices[fixed_dim_name]]
            tensor_perm = self.tensor.permute(perm)  # [i, j, t]
            tensor_2d = tensor_perm[:, :, fixed_idx]  # fix t

            # Préparer le DataFrame
            row_labels = self.index_to_label[index_dim]
            col_labels = self.index_to_label[column_dim]
            values_np = tensor_2d.cpu().numpy()

            df_dict = {index_name or index_dim: row_labels}
            for j, col in enumerate(col_labels):
                df_dict[col] = values_np[:, j]

            return pl.DataFrame(df_dict)

        raise ValueError("Seuls les tenseurs 1D, 2D ou 3D sont pris en charge.")
    

    def _align_and_apply(self, other, op):
        """
        Aligne deux LabelledTensors sur leurs dimensions et labels, 
        puis applique une opération élément par élément.

        Paramètres
        ----------
        other : LabelledTensor
            Le deuxième tenseur à utiliser dans l'opération.
        op : function
            Une fonction PyTorch à appliquer, comme torch.add, torch.mul, etc.

        Retour
        ------
        LabelledTensor
            Le résultat de l'opération entre les deux LabelledTensors, 
            avec conservation des métadonnées.

        Exceptions
        ----------
        TypeError : si `other` n'est pas un LabelledTensor.
        ValueError : si les dimensions ou les étiquettes ne correspondent pas parfaitement.
        """
        # Vérifie que l'autre objet est un LabelledTensor
        if not isinstance(other, LabelledTensor):
            raise TypeError("Opérations valides uniquement entre deux LabelledTensors.")

        # Vérifie que l'ordre et les noms des dimensions correspondent
        if self.dim_labels != other.dim_labels:
            raise ValueError("Les dimensions doivent être dans le même ordre.")

        # Vérifie que les étiquettes des dimensions correspondent une à une
        for dim in self.dim_labels:
            if self.index_to_label[dim] != other.index_to_label[dim]:
                raise ValueError(f"Les labels de la dimension '{dim}' ne correspondent pas.")

        # Applique l'opération élément par élément entre les tenseurs bruts
        result_tensor = op(self.tensor, other.tensor)

        # Retourne un nouveau LabelledTensor avec les mêmes métadonnées
        return LabelledTensor(result_tensor, self.dim_labels, self.index_to_label.copy())


    def __truediv__(self, other):
        """
        Division élément par élément entre deux LabelledTensors alignés.

        Paramètres
        ----------
        other : LabelledTensor

        Retour
        ------
        LabelledTensor
        """
        return self._align_and_apply(other, torch.div)


    def __mul__(self, other):
        """
        Multiplication élément par élément entre deux LabelledTensors alignés.

        Paramètres
        ----------
        other : LabelledTensor

        Retour
        ------
        LabelledTensor
        """
        return self._align_and_apply(other, torch.mul)


    def __add__(self, other):
        """
        Addition élément par élément entre deux LabelledTensors alignés.

        Paramètres
        ----------
        other : LabelledTensor

        Retour
        ------
        LabelledTensor
        """
        return self._align_and_apply(other, torch.add)


    def __sub__(self, other):
        """
        Soustraction élément par élément entre deux LabelledTensors alignés.

        Paramètres
        ----------
        other : LabelledTensor

        Retour
        ------
        LabelledTensor
        """
        return self._align_and_apply(other, torch.sub)

In [4]:
def create_symmetric_matrix(df: pl.DataFrame, device="cpu") -> LabelledTensor:
    """
    Construit une matrice symétrique des temps de parcours à partir d'un DataFrame triangle supérieur.

    Paramètres :
    ------------
    df : pl.DataFrame
        Doit contenir les colonnes "Idloc_start", "Idloc_end", "temps_parcours".
    device : str
        'cpu' ou 'cuda' selon l'appareil souhaité.

    Retour :
    --------
    LabelledTensor
        Matrice [i, j] symétrique, avec labels.
    """
    # Étape 1 : identifiants uniques ordonnés
    unique_locs = pl.concat([df["Idloc_start"], df["Idloc_end"]]).unique().sort()
    # print(unique_locs)
    idx_to_id = unique_locs.to_list()
    id_to_idx = {idloc: idx for idx, idloc in enumerate(idx_to_id)}
    n = len(idx_to_id)

    # Étape 2 : conversion en indices numpy 
    i_idx = df["Idloc_start"].to_numpy()
    j_idx = df["Idloc_end"].to_numpy()
    values = df["temps_parcours"].to_numpy()

    # Étape 3 : mapping des ID vers index
    i_indices = np.vectorize(id_to_idx.get)(i_idx)
    j_indices = np.vectorize(id_to_idx.get)(j_idx)

    # Étape 4 : remplissage de la matrice via vecteurs
    T = torch.full((n, n), float("inf"), dtype=torch.float32, device=device)
    indices = torch.tensor(np.stack([i_indices, j_indices]), device=device)
    distances = torch.tensor(values, dtype=torch.float32, device=device)

    # Triangle supérieur
    T[indices[0], indices[1]] = distances
    # Symétrie
    T[indices[1], indices[0]] = distances
    # Diagonale
    T.fill_diagonal_(0.0)

    return LabelledTensor(T, ["i", "j"], {"i": idx_to_id, "j": idx_to_id})

In [5]:
def create_population_tensor(
    df_pop: pl.DataFrame, 
    idloc_order: list[str], 
    device: str = "cpu", 
    normalization: str = "none"
) -> LabelledTensor:
    """
    Crée un vecteur de population ordonné selon idloc, avec option de normalisation.

    Paramètres
    ----------
    df_pop : pl.DataFrame
        Contient les colonnes "Idloc" et "taille_population".
    idloc_order : list[str]
        Ordre des localités à respecter.
    device : str, default="cpu"
        Appareil cible (ex. "cpu" ou "cuda").
    normalization : str, default="none"
        Méthode de normalisation à appliquer :
            - "none"   : pas de normalisation.
            - "minmax" : (x - min) / (max - min), borné entre 0 et 1.
            - "zscore" : (x - mean) / std, puis rendu positif (shifté par +|min| si nécessaire).

    Retour
    ------
    LabelledTensor
        Vecteur [i] des tailles de population (normalisées ou non).
    """
    # Extraction des valeurs de population selon l'ordre fourni
    raw_values = [
        df_pop.filter(pl.col("Idloc") == loc)["taille_population"][0]
        for loc in idloc_order
    ]
    pop = torch.tensor(raw_values, dtype=torch.float32, device=device)

    # Application d'une normalisation si demandée
    if normalization == "minmax":
        min_val = pop.min()
        max_val = pop.max()
        if max_val > min_val:
            pop = (pop - min_val) / (max_val - min_val)
        else:
            pop = torch.zeros_like(pop)  # évite division par 0

    elif normalization == "zscore":
        mean = pop.mean()
        std = pop.std()
        if std > 0:
            pop = (pop - mean) / std
        else:
            pop = pop - mean  # pas de std → centré uniquement
        # Décalage pour garantir positivité
        min_val = pop.min()
        if min_val < 0:
            pop = pop + (-min_val)

    elif normalization != "none":
        raise ValueError(f"Unknown normalization method: {normalization}")

    return LabelledTensor(pop, ["i"], {"i": idloc_order})

In [6]:
def create_infrastructure_tensor(df: pl.DataFrame, device="cpu") -> LabelledTensor:
    """
    Construit un tenseur D[i, t] représentant les infrastructures disponibles.

    Les noms des colonnes encodent le secteur (k) dans leurs deux premiers caractères.

    Paramètres :
    ------------
    df : pl.DataFrame
        Doit contenir la colonne "idloc" et des colonnes de sous-secteurs.
    device : str
        Appareil.

    Retour :
    --------
    LabelledTensor
        Tenseur [i, t] : localité × sous-secteur.
    """
    idlocs = df["idloc"].to_list()
    df_data = df.drop("idloc")
    sous_secteurs = df_data.columns

    D = torch.tensor(
        df_data.to_numpy(),
        dtype=torch.float32,
        device=device
    )

    return LabelledTensor(D, ["i", "t"], {
        "i": idlocs,
        "t": sous_secteurs
    })


In [7]:
def compute_infra_tensor(infra_tensor: LabelledTensor, prefix: str | list[str] = None) -> LabelledTensor:
    """
    Sélectionne les colonnes t du tenseur S[i, t] correspondant à un ou plusieurs préfixes.
    Le tenseur S[i, t] représente les infrastructures par type (t) et par localité (i).

    Paramètres
    ----------
    infra_tensor: LabelledTensor [i, t]
        Tenseur contenant les infrastructures par type.
    prefix : str ou list[str], optionnel
        Préfixe ou liste de préfixes à filtrer sur l'axe t.
        Si None, toutes les colonnes sont conservées.

    Retour
    ------
    LabelledTensor [i, t]
        Tenseur filtré.
    """
    tensor = infra_tensor.tensor
    labels_t = infra_tensor.index_to_label["t"]
    prefix = [prefix] if isinstance(prefix, str) else prefix

    if prefix is not None:
        keep = [i for i, label in enumerate(labels_t) if any(label.startswith(p) for p in prefix)]
        tensor = tensor[:, keep]
        labels_t = [labels_t[i] for i in keep]

    return LabelledTensor(tensor.clone(), infra_tensor.dim_labels, {
        "i": infra_tensor.index_to_label["i"],
        "t": labels_t
    })


In [8]:
def compute_theoretical_infra_tensor(
    infra_tensor: LabelledTensor,
    population_tensor: LabelledTensor,
    normalization_rules: dict[str, float] = None,
    default_ratio: float = 1.0
) -> LabelledTensor:
    """
    Calcule le nombre théorique d'infrastructures nécessaires par localité et type.

    Pour chaque type t, on suppose qu'une infrastructure est nécessaire pour `normalization_rules[t]` habitants.

    Paramètres
    ----------
    infra_tensor : LabelledTensor [i, t]
        Nombre d'infrastructures observées par localité (i) et secteur (t).
    population : LabelledTensor [i]
        Population par localité.
    normalization_rules : dict[str, float], optionnel
        Règles de normalisation par préfixe de t (ex : {'S': 3000}).
    default_ratio : float, optionnel
        Ratio par défaut si aucune règle ne correspond.

    Retour
    ------
    LabelledTensor [i, t]
        Nombre d'infrastructures théoriques.
    """
    device = infra_tensor.tensor.device
    pop = population_tensor.tensor.view(-1, 1).to(dtype=torch.float32, device=device)

    labels_t = infra_tensor.index_to_label["t"]

    if normalization_rules is None:
        factors = torch.full((1, len(labels_t)), default_ratio, dtype=torch.float32, device=device)
    else:
        factors = []
        for t in labels_t:
            factor = next((v for k, v in normalization_rules.items() if t.startswith(k)), default_ratio)
            factors.append(factor)
        factors = torch.tensor(factors, dtype=torch.float32, device=device).view(1, -1)

    S_theoretical = pop / factors  # [i, t]

    return LabelledTensor(S_theoretical, ["i", "t"], {
        "i": infra_tensor.index_to_label["i"],
        "t": labels_t
    })


In [9]:
def compute_distance_tensor(distance_tensor: LabelledTensor, infra_tensor: LabelledTensor, device="cpu") -> LabelledTensor:
    """
    Construit un tenseur D[i, j, t] des distances inter localités [i, j], ne conservant 
    les distances que vers les localités j possédant une infrastructure de type t.

    Paramètres
    ----------
    distance_tensor : LabelledTensor [i, j]
        Distances entre localités i et j.
    infra_tensor : LabelledTensor [j, t]
        Disponibilité des infrastructures à chaque localité j.
    device : str
        Périphérique ('cpu' ou 'cuda').

    Retour
    ------
    LabelledTensor [i, j, t]
        Distances pondérées.
    """
    dist = distance_tensor.tensor.to(device)
    infra_mask = (infra_tensor.tensor.to(device) > 0).float()

    dist_3d = dist.unsqueeze(-1)          # [i, j, 1]
    mask_3d = infra_mask.unsqueeze(0)     # [1, j, t]

    D_tensor = dist_3d * mask_3d          # [i, j, t]

    return LabelledTensor(D_tensor, ["i", "j", "t"], {
        "i": distance_tensor.index_to_label["i"],
        "j": distance_tensor.index_to_label["j"],
        "t": infra_tensor.index_to_label["t"]
    })


In [10]:
def compute_max_distance(distance_tensor: LabelledTensor) -> LabelledTensor:
    """
    Calcule Dmax[t] = max_ij D[i, j, t], soit la distance maximale observée par type.

    Paramètres
    ----------
    distance_tensor : LabelledTensor [i, j, t]

    Retour
    ------
    LabelledTensor [t]
    """
    distance_max = distance_tensor.tensor.amax(dim=(0, 1))
    return LabelledTensor(distance_max, ["t"], 
                          {"t": distance_tensor.index_to_label["t"]})


In [11]:
def normalize_distance_tensor(distance_tensor: LabelledTensor, distance_max: LabelledTensor) -> LabelledTensor:
    """
    Normalise chaque distance_tensor[i, j, t] par la distance maximale distance_max[t].

    Paramètres
    ----------
    distance_tensor : LabelledTensor [i, j, t]
    distance_max : LabelledTensor [t]

    Retour
    ------
    LabelledTensor [i, j, t]
    """
    D_norm = distance_tensor.tensor / distance_max.tensor.view(1, 1, -1)
    return LabelledTensor(D_norm, distance_tensor.dim_labels, 
                          distance_tensor.index_to_label)


In [12]:
def apply_exponential_decay(distance_tensor: LabelledTensor) -> LabelledTensor:
    """
    Applique une fonction de décroissance sur D[i, j, t] :
        f(x) = (exp(-0.5 * x) - exp(-0.5)) / (1 - exp(-0.5))

    Paramètre
    ---------
    distance_tensor : LabelledTensor [i, j, t]

    Retour
    ------
    LabelledTensor [i, j, t]
        Tenseur transformé.
    """
    x = distance_tensor.tensor
    base = math.exp(-0.5)
    denom = 1.0 - base

    result = x.clone()
    result.mul_(-0.5).exp_().sub_(base).div_(denom)

    return LabelledTensor(result, distance_tensor.dim_labels,
                          distance_tensor.index_to_label)


In [13]:
def restrict_infra(distance_tensor: LabelledTensor, infra_tensor: LabelledTensor, restrict: bool = True) -> LabelledTensor:
    """
    Met à zéro les distances D[i, j, t] là où l'infrastructure [j, t] est absente.

    Paramètres
    ----------
    distance_tensor : LabelledTensor [i, j, t]
    infra_tensor : LabelledTensor [j, t]
    restrict : bool
        Si False, retourne D inchangé.

    Retour
    ------
    LabelledTensor [i, j, t]
    """
    if not restrict:
        return distance_tensor

    mask = (infra_tensor.tensor > 0).float().unsqueeze(0)  # [1, j, t]
    D_masked = distance_tensor.tensor * mask

    return LabelledTensor(D_masked, distance_tensor.dim_labels, 
                          distance_tensor.index_to_label)


In [14]:
def compute_weighted_sum(
    population_tensor: LabelledTensor,
    distance_tensor: LabelledTensor,
    apply_inverse: bool = True,
    transform: str = None,
    a: float = 1.0,
    b: float = 0.0,
    eps: float = 1e-8
) -> LabelledTensor:
    """
    Calcule SOMME[j, t] = ∑_i P[i] * D[i, j, t], avec transformations possibles sur P.

    Paramètres
    ----------
    population_tensor : LabelledTensor [i]
        Poids de chaque localité i (peut être transformé).
    distance_tensor : LabelledTensor [i, j, t]
        Tenseur à pondérer.
    apply_inverse : bool, default=True
        Si True, retourne 1 / SOMME[j, t], avec 0 si somme == 0.
    transform : Optional[str]
        Transformation à appliquer sur la population :
        - "log"        : ln(x + eps)
        - "linear"     : a * x + b
        - "logaffine"  : a * ln(x + eps) + b
        - None         : pas de transformation
    a : float
        Coefficient multiplicatif dans la transformation.
    b : float
        Terme constant dans la transformation.
    eps : float
        Valeur pour éviter log(0).

    Retour
    ------
    LabelledTensor [j, t]
        Tenseur pondéré (ou son inverse sécurisé).
    """
    pop = population_tensor.tensor

    # Transformation de la population si précisé
    if transform == "log":
        pop = torch.log(pop + eps)
    elif transform == "linear":
        pop = a * pop + b
    elif transform == "logaffine":
        pop = a * torch.log(pop + eps) + b

    # Calcul pondéré
    weighted = pop.view(-1, 1, 1) * distance_tensor.tensor  # [i, j, t]
    summed = weighted.sum(dim=0)  # [j, t]

    # Inversion sécurisée
    if apply_inverse:
        summed = torch.where(summed != 0, 1.0 / summed, torch.zeros_like(summed))

    return LabelledTensor(summed, ["j", "t"], {
        "j": distance_tensor.index_to_label["j"],
        "t": distance_tensor.index_to_label["t"]
    })


In [15]:
def compute_weighted_product(infra_tensor: LabelledTensor,
                             distance_tensor: LabelledTensor) -> LabelledTensor:
    """
    Calcule PRODUIT[i, j, t] = S[j, t] * D[i, j, t]².

    Pondération appliquée colonne par colonne (j), pour chaque type t.

    Paramètres
    ----------
    S : LabelledTensor [j, t]
    D : LabelledTensor [i, j, t]

    Retour
    ------
    LabelledTensor [i, j, t]
    """
    D_squared = distance_tensor.tensor.square()
    S_exp = infra_tensor.tensor.unsqueeze(0)  # [1, j, t]
    produit = D_squared * S_exp

    return LabelledTensor(produit, distance_tensor.dim_labels,
                          distance_tensor.index_to_label)


In [16]:
def compute_access_index(
    produit: LabelledTensor,
    inv_sum: LabelledTensor
) -> LabelledTensor:
    """
    Calcule ACCESS[i, t] = ∑_j PRODUIT[i, j, t] * INV_SUM[j, t]

    Paramètres
    ----------
    produit : LabelledTensor [i, j, t]
        Tenseur produit pondéré.
    inv_sum : LabelledTensor [j, t]
        Inverse sécurisé des pondérations (1 / SOMME[j, t]).

    Retour
    ------
    LabelledTensor [i, t]
    """
    weighted = produit.tensor * inv_sum.tensor.unsqueeze(0)
    access = weighted.sum(dim=1)

    return LabelledTensor(access, ["i", "t"], {
        "i": produit.index_to_label["i"],
        "t": produit.index_to_label["t"]
    })

In [17]:
def load_matrix(path_dt: str,
                path_infra: str,
                path_pop: str,
                device: str = "cuda") -> tuple[LabelledTensor, LabelledTensor, LabelledTensor]:
    """
    Charge et construit les matrices LabelledTensor nécessaires au calcul d'accessibilité :
    - Matrice des distances symétrique [i, j]
    - Tenseur des infrastructures [i, t]
    - Vecteur de population [i]

    Paramètres :
    ------------
    path_dt : str
        Chemin vers le fichier Parquet contenant les temps de parcours (triangle supérieur).
    path_infra : str
        Chemin vers le fichier Parquet contenant les infrastructures.
    path_pop : str
        Chemin vers le fichier Parquet contenant les populations.
    device : str
        Périphérique ("cpu" ou "cuda").

    Retour :
    --------
    tuple[LabelledTensor, LabelledTensor, LabelledTensor]
        distance_tensor  : Matrice des temps de parcours [i, j]
        infra_tensor     : Tenseur des infrastructures [i, t]
        population_tensor: Vecteur des populations [i]
    """
    # Lecture des fichiers parquet
    distance = pl.read_parquet(path_dt)
    infrastructures = pl.read_parquet(path_infra)
    population = pl.read_parquet(path_pop)

    # Construction des matrices
    distance_tensor = create_symmetric_matrix(distance, device=device)
    infra_tensor = create_infrastructure_tensor(infrastructures, device=device)
    population_tensor = create_population_tensor(population, population["Idloc"].to_list(),
                                                 device=device)

    return distance_tensor, infra_tensor, population_tensor

In [18]:
path_dt = r"C:\Users\e_koffie\Documents\IAI_Project\SIMULATIONS\Data\dt_matrix_ligne_VF.parquet"
path_infra = r"C:\Users\e_koffie\Documents\IAI_Project\SIMULATIONS\Data\infrastructure_matrix_VF.parquet"
path_pop = r"C:\Users\e_koffie\Documents\IAI_Project\SIMULATIONS\Data\population_matrix.parquet"

In [19]:
distance_tensor, infra_tensor, population_tensor = load_matrix(path_dt, path_infra, 
                                                               path_pop, device="cuda")

infra_tensor = compute_infra_tensor(infra_tensor=infra_tensor, 
                         prefix=["06"])
infra_tensor_th = compute_theoretical_infra_tensor(infra_tensor=infra_tensor,
                                        population_tensor=population_tensor,
                                        default_ratio=2000)

In [20]:
def compute_access_formula(
    distance_tensor: LabelledTensor,
    population_tensor: LabelledTensor,
    infra_tensor: LabelledTensor,
    device: str = "cpu"
) -> LabelledTensor:
    """
    Calcule l'indice final d'accessibilité pour chaque zone et chaque type d'infrastructure.

    Ce calcul suit les étapes suivantes :
    1. Génère D[i, j, t] à partir des distances et de la présence d'infrastructure.
    2. Normalise D par type t.
    3. Applique une fonction de décroissance exponentielle.
    4. Restreint les distances aux seules zones équipées.
    5. Calcule un dénominateur : somme pondérée des distances selon la population.
    6. Calcule un numérateur : produit pondéré des distances au carré selon l'offre.
    7. Calcule l'indice final ACCESS[i, t].

    Paramètres
    ----------
    distance_tensor : LabelledTensor [i, j]
        Matrice des distances entre localités.
    population_tensor : LabelledTensor [i]
        Population par localité.
    infra_tensor : LabelledTensor [j, t]
        Présence d'infrastructures par localité et type.

    Retour
    ------
    LabelledTensor [i, t]
        Indice d'ccessibilité final.
    """

    # Étape 1 — Construction de D[i, j, t]
    print("Construction du tenseur de distance pondérée D[i, j, t]...")
    distance_tensor = compute_distance_tensor(
        distance_tensor=distance_tensor,
        infra_tensor=infra_tensor,
        device=device
    )

    # Étape 2 — Distance maximale par type
    print("Calcul des distances maximales par type d'infrastructure...")
    distance_max = compute_max_distance(distance_tensor)

    # Étape 3 — Normalisation par Dmax[t]
    print("Normalisation des distances...")
    distance_tensor = normalize_distance_tensor(
        distance_tensor=distance_tensor,
        distance_max=distance_max
    )

    # Étape 4 — Application de la décroissance exponentielle
    print("Application de la fonction de décroissance exponentielle...")
    distance_tensor = apply_exponential_decay(distance_tensor)

    # Étape 5 — Masquage des zones sans infrastructure
    print("Restriction aux infrastructures réellement présentes...")
    distance_tensor = restrict_infra(
        distance_tensor=distance_tensor,
        infra_tensor=infra_tensor,
        restrict=True
    )

    # Étape 6 — Calcul du dénominateur : somme pondérée (avec inversion)
    print("Calcul du dénominateur (somme pondérée inversée)...")
    denominator = compute_weighted_sum(
        population_tensor=population_tensor,
        distance_tensor=distance_tensor,
        apply_inverse=True,
        transform="linear",
        a= 1/100000,
        b=0.0
    )

    # Étape 7 — Calcul du numérateur : produit pondéré (avec carré de D)
    print("Calcul du numérateur (produit pondéré)...")
    numerator = compute_weighted_product(
        infra_tensor=infra_tensor,
        distance_tensor=distance_tensor
    )

    # Libération explicite de la mémoire des tenseurs intermédiaires
    del distance_tensor
    del distance_max
    gc.collect()  # Nettoyage mémoire Python

    # Étape 8 — Division pour obtenir l'indice d'accessibilité
    print("Calcul final de l'indice d'accessibilité...")
    index = compute_access_index(numerator, denominator)

    return index

In [21]:
def compute_final_access_index( 
    distance_tensor: LabelledTensor,
    population_tensor: LabelledTensor,
    infra_tensor: LabelledTensor,
    infra_tensor_th: LabelledTensor,
    device: str = "cpu",
    apply_minmax: bool = True
) -> LabelledTensor:
    """
    Calcule un indice d'accessibilité relatif ACCESS / ACCESS_TH, avec normalisation min-max optionnelle.

    Ajoute une étape pour évaluer si chaque ACCESS_TH[i, t] ≥ ACCESS[i, t].

    Paramètres
    ----------
    distance_tensor : LabelledTensor [i, j, t]
        Tenseur des distances pondérées.
    population_tensor : LabelledTensor [i]
        Population par localisation.
    infra_tensor : LabelledTensor [j, t]
        Offre réelle d'infrastructure.
    infra_tensor_th : LabelledTensor [j, t]
        Offre théorique d'infrastructure.
    device : str
        Appareil de calcul ("cpu" ou "cuda").
    apply_minmax : bool
        Si True, applique une normalisation min-max par colonne t.

    Retour
    ------
    LabelledTensor [i, t]
        Indice d'accessibilité normalisé (ou brut).
    """

    # Étape 1 : accessibilité réelle
    access_real = compute_access_formula(
        distance_tensor=distance_tensor,
        population_tensor=population_tensor,
        infra_tensor=infra_tensor,
        device=device
    )

    # Étape 2 : accessibilité théorique
    access_th = compute_access_formula(
        distance_tensor=distance_tensor,
        population_tensor=population_tensor,
        infra_tensor=infra_tensor_th,
        device=device
    )

    # Étape 3 : Évaluation ACCESS_TH ≥ ACCESS
    comparison_mask = access_th.tensor >= access_real.tensor
    count_true = comparison_mask.sum().item()
    total_elements = comparison_mask.numel()
    percentage = 100 * count_true / total_elements

    # Logs détaillés
    print(f"[INFO] Elements where access_th ≥ access_real: {count_true} / {total_elements} "
        f"({percentage:.2f}%)")

    # Étape 4 : ratio ACCESS / ACCESS_TH (sécurisé)
    ratio_tensor = access_real.tensor / access_th.tensor.clamp(min=1e-8)

    # Étape 5 : normalisation min-max par colonne t
    if apply_minmax:
        min_vals, _ = ratio_tensor.min(dim=0, keepdim=True)  # [1, t]
        max_vals, _ = ratio_tensor.max(dim=0, keepdim=True)  # [1, t]
        range_vals = (max_vals - min_vals).clamp(min=1e-8)
        normalized = (ratio_tensor - min_vals) / range_vals
    else:
        normalized = ratio_tensor

    return LabelledTensor(
        normalized,
        dim_labels=access_real.dim_labels,
        index_to_label=access_real.index_to_label
    )

In [24]:
index = compute_final_access_index(
    distance_tensor=distance_tensor,
    population_tensor=population_tensor,
    infra_tensor=infra_tensor,
    infra_tensor_th=infra_tensor_th,
    device="cuda",
    apply_minmax=False)

Construction du tenseur de distance pondérée D[i, j, t]...
Calcul des distances maximales par type d'infrastructure...
Normalisation des distances...
Application de la fonction de décroissance exponentielle...
Restriction aux infrastructures réellement présentes...
Calcul du dénominateur (somme pondérée inversée)...
Calcul du numérateur (produit pondéré)...
Calcul final de l'indice d'accessibilité...
Construction du tenseur de distance pondérée D[i, j, t]...
Calcul des distances maximales par type d'infrastructure...
Normalisation des distances...
Application de la fonction de décroissance exponentielle...
Restriction aux infrastructures réellement présentes...
Calcul du dénominateur (somme pondérée inversée)...
Calcul du numérateur (produit pondéré)...
Calcul final de l'indice d'accessibilité...
[INFO] Elements where access_th ≥ access_real: 112486 / 125940 (89.32%)


In [25]:
index.to_dataframe(
    index_dim="i",
    column_dim="t",
    index_name="Localité",
    value_name="Indice d'accessibilité",
)

Localité,06,06001,060010001,060010002,060010003,060010004,06002,060020001,060020002,060020004,06003,060030001,060030002,060030004,060030005
str,f32,f32,f32,f32,f32,f32,f32,f32,f32,f32,f32,f32,f32,f32,f32
"""010020201001""",0.978221,0.943222,0.041562,0.738847,0.141172,0.021895,0.020847,0.018287,0.002406,0.000135,0.014027,0.000139,0.013352,0.000051,0.000493
"""010020201003""",0.980008,0.944971,0.041686,0.740592,0.141154,0.021564,0.0208,0.018244,0.002396,0.000135,0.014064,0.000135,0.013383,0.000048,0.000499
"""010020201004""",0.980557,0.94551,0.041639,0.741162,0.141363,0.021536,0.020815,0.018248,0.002412,0.000137,0.014104,0.000137,0.013425,0.000051,0.000499
"""010020201005""",0.980784,0.945736,0.04164,0.741412,0.141415,0.021544,0.020827,0.018257,0.002417,0.000137,0.014115,0.000137,0.013437,0.000051,0.000501
"""010020201007""",0.982851,0.947727,0.041787,0.743161,0.141583,0.021387,0.020865,0.018298,0.002413,0.000136,0.014139,0.000136,0.013457,0.00005,0.000506
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""330840498007""",1.020363,0.984201,0.042069,0.782238,0.142556,0.017126,0.020652,0.017774,0.002797,0.000082,0.015457,0.000098,0.014806,0.000076,0.000493
"""330840498008""",1.017735,0.981608,0.041988,0.780164,0.142469,0.017088,0.020724,0.017832,0.002812,0.000098,0.015445,0.000098,0.014798,0.000078,0.000499
"""330840498009""",1.006459,0.970501,0.041876,0.769085,0.140941,0.018386,0.02088,0.018013,0.002758,0.000108,0.014989,0.000108,0.014349,0.000065,0.000469
"""330840498010""",1.00654,0.970585,0.041878,0.769177,0.140972,0.018396,0.020887,0.018019,0.002761,0.000108,0.014994,0.000108,0.014354,0.000065,0.00047


#### **SIMULATIONS**

In [None]:
# Define the function to update the time-distance matrix
def update_mat_dist_temps(troncons, ancien_type, nouveau_type, data_type):
    # Load the old itinerary matrix for the old type
    path_old_matrix = os.path.join(ITINERAIRES_DIR, f"Itineraire_Matrix_{ancien_type.upper()}")
    Mat_Dist_Temps = pl.from_pandas(Mat_Dist_Temps_0)

    pass_var = "vehicule_lg"*(data_type=="en ligne") + "vehicule_p"*(data_type=="terrain") 

    itineraire_bin = pl.DataFrame()


    # Loop through the files in the directory
    for file in os.listdir(path_old_matrix):
        print(file)

        # Load the parquet file into a Polars DataFrame
        locals()["%s" % file] = pl.read_parquet(os.path.join(path_old_matrix, file))

        # Sort by "Idloc_start"
        locals()["%s" % file] = locals()["%s" % file].sort("Idloc_start")
        locals()["%s" % file] = locals()["%s" % file].with_columns([
                                    pl.col(col).fill_null("").alias(col)
                                    for col in locals()["%s" % file].columns
                                    ])

        # interm_matrix = pl.DataFrame()

        agregated_matrix = locals()["%s" % file].with_columns([
            pl.lit(0).alias(col) if col != "Idloc_start" else pl.col(col)
            for col in locals()["%s" % file].columns
        ])

        # Loop through each column of the current DataFrame
        for segment in troncons:

            long = route_loc[route_loc['gid']==int(segment)]['length'].values[0]
            coef_old = route_loc[route_loc['gid']==int(segment)][pass_var].values[0]
            coef_new = route_loc[route_loc['stat_voie']==nouveau_type.upper()][pass_var].values[0]
        
            time_saved = (coef_new - coef_old) * long

            print(f"time saved for {segment}: ", time_saved)

            interm_matrix = locals()["%s" % file].select([
                pl.col(column).map_elements(lambda x: binary(str(x), segment)).alias(column)
                if column != "Idloc_start" else pl.col(column) for column in 
                locals()["%s" % file].columns])

            interm_matrix[interm_matrix.columns[1:]] = interm_matrix[interm_matrix.columns[1:]]\
                                                       .select(pl.col("*").cast(pl.Int64))

            agregated_matrix[agregated_matrix.columns[1:]] += interm_matrix[interm_matrix.columns[1:]]\
                                                       .select(pl.all() * time_saved)
            
        # Concatenate the intermediate matrix with the global binary itinerary matrix
        itineraire_bin = pl.concat([itineraire_bin, agregated_matrix], 
                                   how="vertical")

    mdt_combined = Mat_Dist_Temps.join(itineraire_bin, on="Idloc_start", how="left")
    updated_columns = [(pl.col(col) + pl.col(f"{col}_right").fill_null(0)).alias(col) 
                       for col in Mat_Dist_Temps.columns if col != "Idloc_start"]
    
    mdt_combined = mdt_combined.with_columns(updated_columns)\
                               .select(Mat_Dist_Temps.columns)
    
    return mdt_combined.to_pandas()


def simulation_voie(secteur, troncons, ancien_type, nouveau_type, data_type="terrain"):

    Mat_Dist_Temps = update_mat_dist_temps(troncons, ancien_type, nouveau_type, data_type)
    print("Mat_Dist_Temps OK")
    print(Mat_Dist_Temps.shape)
    accessibilite = compute_accessibility_index(Mat_Dist_Temps, secteur, list_loc, data_type)
    print("Accessibilte OK")
    eloig = eloignement(Mat_Dist_Temps, secteur, list_loc, data_type)
    print("Eloignement OK")
    accessibilite = accessibilite.merge(list_loc[["idloc_iai","NomLoc","NomReg"]],
                                         on="idloc_iai",how="outer")\
                                  .sort_values(by=["idloc_iai"])
    print("Access format OK")
    eloig = eloig.merge(list_loc[["idloc_iai","NomLoc","NomReg"]],
                                        on="idloc_iai",how="outer")\
                                 .sort_values(by=["idloc_iai"])
    print("Eloignement format OK")

    return accessibilite, eloig