In [1]:
import jax
import jax.numpy as jnp
from jax import lax
from typing import Union
jax.config.update("jax_enable_x64", True)

In [None]:
import jax_dataclasses as jdc

# 1️⃣ Words


In [3]:
import jax
import jax.numpy as jnp
from jax import lax
from typing import Union


# -------------------------------------------------
# Longueur d’un mot (nombre de lettres)
# -------------------------------------------------
@jax.jit
def word_len(word: Union[int, jax.Array]) -> jax.Array:
    """
    Retourne la longueur d’un mot représenté comme un entier.
    Exemples :
        0   -> 0
        1   -> 1
        12  -> 2
        231 -> 3
    """
    return jnp.where(
        word == 0,
        0,
        jnp.floor(jnp.log10(word) + 1e-10) + 1
    ).astype(int)


# -------------------------------------------------
# Nombre total de mots de longueur <= trunc
# -------------------------------------------------
@jax.jit
def number_of_words_up_to_trunc(trunc: Union[int, jax.Array], dim: int) -> jax.Array:
    """
    Calcule le nombre total de mots sur un alphabet de taille `dim`
    dont la longueur est <= trunc.

    Formellement :
        1 + dim + dim^2 + ... + dim^trunc
    """
    return jnp.maximum(
        (dim ** (trunc + 1) - 1) // (dim - 1),
        trunc + 1
    )


# -------------------------------------------------
# Longueur du mot associé à un index
# -------------------------------------------------
@jax.jit
def index_to_word_len(index: Union[int, jax.Array], dim: int) -> jax.Array:
    """
    Donne la longueur du mot correspondant à un index donné.
    """
    return jnp.where(
        dim == 1,
        index,
        jnp.log2(index * (dim - 1) + 1) / jnp.log2(dim) + 1e-10
    ).astype(int)


# -------------------------------------------------
# Index -> mot (reconstruction explicite)
# -------------------------------------------------
@jax.jit
def index_to_word(index: int, dim: int) -> jnp.int64:
    """
    Reconstruit le mot (entier) correspondant à un index.
    """
    index = jnp.asarray(index, dtype=jnp.int64)
    dim = jnp.asarray(dim, dtype=jnp.int64)

    length = jnp.where(
        dim == 1,
        index,
        jnp.floor(jnp.log2(index * (dim - 1) + 1) / jnp.log2(dim) + 1e-10)
    ).astype(jnp.int64)

    index = jnp.where(
        dim == 1,
        0,
        index - (dim ** length - 1) // (dim - 1)
    )

    def body_fun(i, state):
        word, remainder = state
        power = dim ** (length - 1 - i)
        digit = remainder // power
        remainder = remainder % power
        word = 10 * word + (digit + 1)
        return word, remainder

    word, _ = lax.fori_loop(
        lower=0,
        upper=length,
        body_fun=body_fun,
        init_val=(jnp.int64(0), index)
    )

    return word


# -------------------------------------------------
# Somme des lambdas associées à un mot (via index)
# -------------------------------------------------
@jax.jit
def index_to_lam_sum(index: int, dim: int, lam: jax.Array) -> jax.Array:
    """
    Calcule la somme des coefficients lambda correspondant
    aux lettres du mot associé à `index`.
    """
    index = jnp.asarray(index, dtype=jnp.int64)
    dim = jnp.asarray(dim, dtype=jnp.int64)

    length = jnp.where(
        dim == 1,
        index,
        jnp.floor(jnp.log2(index * (dim - 1) + 1) / jnp.log2(dim) + 1e-10)
    ).astype(jnp.int64)

    index = jnp.where(
        dim == 1,
        0,
        index - (dim ** length - 1) // (dim - 1)
    )

    def body_fun(i, state):
        acc, remainder = state
        power = dim ** (length - 1 - i)
        digit = remainder // power
        remainder = remainder % power
        acc = acc + lam[digit]
        return acc, remainder

    acc, _ = lax.fori_loop(
        lower=0,
        upper=length,
        body_fun=body_fun,
        init_val=(0.0, index)
    )

    return acc


# -------------------------------------------------
# Mot -> nombre en base dim
# -------------------------------------------------
@jax.jit
def word_to_base_dim_number(word: int, dim: int) -> int:
    """
    Convertit un mot (ex: 231) en un nombre en base `dim`.
    """
    def cond(state):
        w, _, _ = state
        return w > 0

    def body(state):
        w, acc, p = state
        acc += ((w % 10) - 1) * (dim ** p)
        w //= 10
        p += 1
        return w, acc, p

    _, result, _ = lax.while_loop(cond, body, (word, 0, 0))
    return result


# -------------------------------------------------
# Mot -> index global
# -------------------------------------------------
@jax.jit
def word_to_index(word: int, dim: int) -> int:
    """
    Associe à un mot son index unique dans l’algèbre tensorielle.
    """
    return (
        number_of_words_up_to_trunc(word_len(word) - 1, dim)
        + word_to_base_dim_number(word, dim)
    )


# Vectorisations utiles
word_to_base_dim_number_vect = jax.jit(jax.vmap(word_to_base_dim_number, in_axes=(0, None)))
word_to_index_vect = jax.jit(jax.vmap(word_to_index, in_axes=(0, None)))
index_to_word_vect = jax.jit(jax.vmap(index_to_word, in_axes=(0, None)))
index_to_lam_sum_vect = jax.jit(jax.vmap(index_to_lam_sum, in_axes=(0, None, None)))


# 2️⃣ TensorSequence

In [4]:
from __future__ import annotations

import jax
import jax.numpy as jnp
import jax_dataclasses as jdc
import numpy as np
import matplotlib.pyplot as plt
from typing import Union, Tuple


@jdc.pytree_dataclass

class TensorSequence:
    


    """
    Représente un élément de l’algèbre tensorielle tronquée.

    Mathématiquement :
        T = somme_{|v| ≤ trunc} a_v · v

    où les mots v sont codés implicitement par les indices du tableau `array`.
    """


    array: jax.Array   # Tableau des coefficients (indexés par les mots)
    trunc: int         # Longueur maximale des mots
    dim: int           # Dimension de l’alphabet {1, …, dim}

    # ---------------------------------------------------------
    # Représentation texte (pour affichage)
    # ---------------------------------------------------------
    
    def __repr__(self):
        return str(self.array)

    def __str__(self):
        """
        Affiche la TensorSequence comme une somme formelle :
            1*Ø + 0.5*1 + 0.3*12 + ...
        """
        res = ""
        premier = True

        for i in range(len(self)):
            coef = self.array[i].squeeze()

            if not np.allclose(coef, 0):
                if not premier:
                    res += " + "
                if np.isreal(coef):
                    coef = coef.real
                res += f"{coef}*{index_to_word(i, self.dim)}"
                premier = False

        return res if res else "0"

    
    # ---------------------------------------------------------
    # Taille et vérité logique
    # ---------------------------------------------------------
    
    def __len__(self) -> int:
        """
        Nombre total de coefficients (incluant les zéros).
        """
        return self.array.shape[0]

    def __bool__(self) -> bool:
        """
        Renvoie True si au moins un coefficient est non nul.
        """
        return not np.allclose(self.array, 0)

    # ---------------------------------------------------------
    # Accès direct aux coefficients
    
    # ---------------------------------------------------------
    
    def __getitem__(self, key):
        """
        Accès direct au coefficient d’index `key`.
        """
        return self.array[key]

    def subsequence(self, key: Tuple):
        """
        Extrait une sous-séquence (utile pour les trajectoires).
        """
        return TensorSequence(
            array=self.array[(slice(None), *key)],
            trunc=self.trunc,
            dim=self.dim
        )

    # ---------------------------------------------------------
    # Opérations algébriques de base
    # ---------------------------------------------------------
    def __rmul__(self, c: Union[float, complex, jax.Array]) -> TensorSequence:
        """
        Multiplication scalaire à droite.
        """
        return TensorSequence(self.array * c, self.trunc, self.dim)

    def __mul__(self, c: Union[float, complex, jax.Array]) -> TensorSequence:
        """
        Multiplication scalaire à gauche.
        """
        return self.__rmul__(c)

    def __truediv__(self, c: Union[float, complex, jax.Array]) -> TensorSequence:
        """
        Division par un scalaire.
        """
        return self * (1 / c)

    def __add__(self, ts: TensorSequence) -> TensorSequence:
        """
        Addition de deux TensorSequence.
        """
        return TensorSequence(
            self.array + ts.array,
            trunc=jnp.maximum(self.trunc, ts.trunc),
            dim=self.dim
        )

    def __sub__(self, ts: TensorSequence) -> TensorSequence:
        """
        Soustraction de deux TensorSequence.
        """
        return TensorSequence(
            self.array - ts.array,
            trunc=jnp.maximum(self.trunc, ts.trunc),
            dim=self.dim
        )

    def __matmul__(self, ts: TensorSequence) -> Union[float, jax.Array]:
        """
        Produit scalaire :
            <T1, T2> = somme_v T1[v] * T2[v]
        """
        return jnp.einsum("i..., i... -> ...", self.array, ts.array)

    # ---------------------------------------------------------
    # Propriétés utiles
    # ---------------------------------------------------------
    @property
    def shape(self) -> Tuple[int, ...]:
        """
        Forme du tableau de coefficients.
        """
        return self.array.shape

    # ---------------------------------------------------------
    # Projection sur un mot
    # ---------------------------------------------------------
    @jax.jit
    def proj(self, word: int) -> TensorSequence:
        """
        Projection de la TensorSequence sur un mot donné.

        Mathématiquement :
            Proj_v(T)(u) = T(vu)
        """
        indices = jnp.arange(len(self))
        longueur_mot = word_len(word)
        index_mot = word_to_index(word, self.dim)

        # Masque des indices valides
        masque = (
            ((indices - index_mot) % self.dim ** longueur_mot == 0)
            & (indices >= index_mot)
        )

        longueurs = index_to_word_len(indices, self.dim)

        nouveaux_indices = jnp.where(
            self.dim == 1,
            indices - index_mot,
            (indices - self.dim ** longueurs + 1)
            // self.dim ** longueur_mot
            + self.dim ** (longueurs - longueur_mot) - 1
        )

        nouveaux_indices = jnp.where(masque, nouveaux_indices, len(self) + 1)

        nouvel_array = jnp.zeros_like(self.array)
        nouvel_array = nouvel_array.at[nouveaux_indices].set(
            jnp.where(
                jnp.einsum("i..., i -> i...", jnp.ones_like(self.array), masque),
                jnp.einsum("i..., i -> i...", self.array, masque),
                0
            )
        )

        return TensorSequence(nouvel_array, self.trunc, self.dim)

    # ---------------------------------------------------------
    # Outils internes
    # ---------------------------------------------------------
    @jax.jit
    def get_lengths_array(self) -> jax.Array:
        """
        Renvoie la longueur de chaque mot indexé.
        """
        return index_to_word_len(jnp.arange(len(self)), self.dim)

    @jax.jit
    def get_lambdas_sum_array(self, lam: jax.Array) -> jax.Array:
        """
        Renvoie la somme des lambda associée à chaque mot.
        """
        return index_to_lam_sum_vect(jnp.arange(len(self)), self.dim, lam)

    # ---------------------------------------------------------
    # Visualisation
    # ---------------------------------------------------------
    def plot(self, trunc: int = None, ax: plt.axis = None, **kwargs) -> None:
        """
        Affiche les coefficients de la TensorSequence.
        """
        if trunc is None:
            trunc = self.trunc

        n = number_of_words_up_to_trunc(trunc, self.dim)
        indices = np.arange(n)
        valeurs = np.zeros(n)
        valeurs[:min(n, len(self))] = self.array[:min(n, len(self))]

        if ax is None:
            _, ax = plt.subplots()

        ax.plot(valeurs, "o", **kwargs)
        ax.set_xticks(indices)
        ax.set_xticklabels(
            [str(index_to_word(i, self.dim)) for i in indices],
            rotation=-90
        )
        ax.grid(True)


# Factor.py

In [5]:
# ---------------------------------------------------------
# ZÉRO — élément nul de l’algèbre tensorielle
# ---------------------------------------------------------
def zero(trunc: int, dim: int) -> TensorSequence:
    """
    Crée la TensorSequence nulle.

    Mathématiquement :
        0 = somme_v 0 · v

    Tous les coefficients sont nuls.
    """
    n = number_of_words_up_to_trunc(trunc=trunc, dim=dim)
    array = jnp.zeros(n)
    return TensorSequence(array=array, trunc=trunc, dim=dim)


# ---------------------------------------------------------
# UNITÉ — élément neutre de l’algèbre tensorielle
# ---------------------------------------------------------
def unit(trunc: int, dim: int) -> TensorSequence:
    """
    Crée l’élément unité de l’algèbre tensorielle.

    Mathématiquement :
        1 = 1·Ø

    Le coefficient du mot vide (index 0) vaut 1,
    tous les autres valent 0.
    """
    n = number_of_words_up_to_trunc(trunc=trunc, dim=dim)
    array = jnp.zeros(n)
    array = array.at[0].set(1.0)
    return TensorSequence(array=array, trunc=trunc, dim=dim)


# ---------------------------------------------------------
# ZÉRO "LIKE" — même forme qu’une TensorSequence existante
# ---------------------------------------------------------
@jax.jit
def zero_like(ts: TensorSequence) -> TensorSequence:
    """
    Crée une TensorSequence nulle ayant exactement la même forme
    que `ts`.

    Utile dans les boucles JAX (tensor_prod, shuffle, etc.).
    """
    return TensorSequence(
        array=jnp.zeros_like(ts.array),
        trunc=ts.trunc,
        dim=ts.dim
    )


# ---------------------------------------------------------
# UNITÉ "LIKE" — même contexte qu’une TensorSequence existante
# ---------------------------------------------------------
@jax.jit
def unit_like(ts: TensorSequence) -> TensorSequence:
    """
    Crée l’élément unité dans le même espace tensoriel que `ts`.
    """
    array = jnp.zeros_like(ts.array)
    array = array.at[0].set(1.0)
    return TensorSequence(array=array, trunc=ts.trunc, dim=ts.dim)


# ---------------------------------------------------------
# MOT CANONIQUE — un seul mot avec coefficient 1
# ---------------------------------------------------------
def from_word(word: int, trunc: int, dim: int) -> TensorSequence:
    """
    Crée une TensorSequence correspondant à un mot unique.

    Exemple :
        from_word(12, trunc=3, dim=2)
        → 1·12
    """
    n = number_of_words_up_to_trunc(trunc=trunc, dim=dim)
    array = jnp.zeros(n)

    index = word_to_index(word=word, dim=dim)
    array = array.at[index].set(1.0)

    return TensorSequence(array=array, trunc=trunc, dim=dim)


# ---------------------------------------------------------
# DEPUIS UN DICTIONNAIRE {mot: coefficient}
# ---------------------------------------------------------
def from_dict(word_dict: dict, trunc: int, dim: int) -> TensorSequence:
    """
    Crée une TensorSequence à partir d’un dictionnaire :

        { mot (int) : coefficient }

    Exemple :
        {12: 0.5, 21: -0.2}
    """
    n = number_of_words_up_to_trunc(trunc=trunc, dim=dim)
    array = jnp.zeros(n)

    if not word_dict:
        return TensorSequence(array=array, trunc=trunc, dim=dim)

    for word, coef in word_dict.items():
        index = word_to_index(word=word, dim=dim)
        array = array.at[index].set(coef)

    return TensorSequence(array=array, trunc=trunc, dim=dim)


# ---------------------------------------------------------
# DEPUIS UN TABLEAU BRUT
# ---------------------------------------------------------
def from_array(array: jax.Array, trunc: int, dim: int) -> TensorSequence:
    """
    Crée une TensorSequence à partir d’un tableau de coefficients.

    ⚠️ Si le tableau est plus court que nécessaire,
    il est automatiquement complété par des zéros.
    """
    n = number_of_words_up_to_trunc(trunc=trunc, dim=dim)

    array_ts = jnp.zeros((n,) + array.shape[1:], dtype=array.dtype)
    array_ts = array_ts.at[:array.shape[0]].set(array)

    return TensorSequence(array=array_ts, trunc=trunc, dim=dim)


# Algebra_basis.py

In [6]:
"""
algebra_basis.py
================

Ce fichier définit une BASE CANONIQUE de l’algèbre tensorielle.

L’idée :
    - chaque mot correspond à un vecteur de base
    - ces vecteurs sont créés à la demande (lazy)
    - et mis en cache pour éviter les recalculs

Usage typique :
    basis = AlgebraBasis(dim=2, trunc=3)
    e_12 = basis[12]   # TensorSequence correspondant au mot 12
"""



class AlgebraBasis:
    """
    Base canonique paresseuse de l’algèbre tensorielle.

    basis[word] = TensorSequence correspondant au mot `word`
    """

    def __init__(self, dim: int, trunc: int):
        """
        Initialise la base.

        Paramètres
        ----------
        dim : int
            Dimension de l’alphabet {1, …, dim}
        trunc : int
            Longueur maximale des mots
        """
        self._cache = {}     # dictionnaire : word -> TensorSequence
        self._dim = dim
        self._trunc = trunc

    def __getitem__(self, word: int):
        """
        Accès à l’élément de base associé à `word`.

        Si l’élément n’existe pas encore :
            - il est créé avec from_word
            - stocké dans le cache
            - puis retourné
        """
        if word not in self._cache:
            self._cache[word] = from_word(
                word=word,
                trunc=self._trunc,
                dim=self._dim
            )
        return self._cache[word]

    @property
    def trunc(self):
        """
        Ordre de troncature de la base.
        """
        return self._trunc

    @property
    def dim(self):
        """
        Dimension de l’alphabet.
        """
        return self._dim


# Tensor_prod_word

In [7]:
"""
tensor_product.py
=================

Ce fichier implémente le PRODUIT TENSORIEL dans l’algèbre tensorielle
des mots.

Idée mathématique :
    (u) ⊗ (v) = (uv)   ← concaténation des mots

Si :
    T1 = ∑ a_u · u
    T2 = ∑ b_v · v

Alors :
    T1 ⊗ T2 = ∑ a_u b_v · (uv)
"""

import jax
import jax.numpy as jnp
from typing import Union



# ---------------------------------------------------------
# Produit tensoriel avec un MOT
# ---------------------------------------------------------
@jax.jit
def tensor_prod_word(ts: TensorSequence, word: int) -> TensorSequence:
    """
    Produit tensoriel d’une TensorSequence avec un mot.

    Mathématiquement :
        T ⊗ w = ∑_u a_u · (u w)

    où (u w) est la concaténation du mot u avec le mot w.
    """
    indices = jnp.arange(len(ts))

    # Longueur et position lexicographique du mot
    longueur_mot = word_len(word)
    mot_base = word_to_base_dim_number(word, dim=ts.dim)

    # Longueur des mots correspondant aux indices
    longueurs_indices = index_to_word_len(indices, dim=ts.dim)

    # Position locale des mots (sans les mots plus courts)
    indices_base = indices - number_of_words_up_to_trunc(
        longueurs_indices - 1, ts.dim
    )

    # Calcul du nouvel index correspondant au mot concaténé
    nouveaux_indices = (
        number_of_words_up_to_trunc(
            longueurs_indices + longueur_mot - 1, ts.dim
        )
        + ts.dim ** longueur_mot * indices_base
        + mot_base
    )

    # Création du nouveau tableau de coefficients
    nouvel_array = jnp.zeros_like(ts.array)
    nouvel_array = nouvel_array.at[nouveaux_indices].set(ts.array)

    return TensorSequence(
        array=nouvel_array,
        trunc=ts.trunc,
        dim=ts.dim
    )


# ---------------------------------------------------------
# Produit tensoriel interne avec un index
# ---------------------------------------------------------
@jax.jit
def _tensor_prod_index(
    ts: TensorSequence,
    index: int,
    coefficient: Union[float, jax.Array] = 1.0
) -> TensorSequence:
    """
    Produit tensoriel interne entre une TensorSequence et
    un mot représenté par son index.

    Utilisé dans tensor_prod(ts1, ts2).
    """
    indices = jnp.arange(len(ts))
    dim = ts.dim

    # Longueur du mot associé à l’index
    longueur_autre = index_to_word_len(jnp.array([index]), dim=dim)

    # Position locale du mot
    base_autre = index - number_of_words_up_to_trunc(longueur_autre - 1, dim)

    # Informations sur les mots de ts
    longueurs_indices = index_to_word_len(indices, dim=dim)
    indices_base = indices - number_of_words_up_to_trunc(
        longueurs_indices - 1, dim
    )

    # Nouveaux indices après concaténation
    nouveaux_indices = (
        number_of_words_up_to_trunc(
            longueurs_indices + longueur_autre - 1, dim
        )
        + dim ** longueur_autre * indices_base
        + base_autre
    )

    nouvel_array = jnp.zeros_like(ts.array)
    nouvel_array = nouvel_array.at[nouveaux_indices].set(
        ts.array * coefficient
    )

    return TensorSequence(
        array=nouvel_array,
        trunc=ts.trunc,
        dim=dim
    )


# ---------------------------------------------------------
# Produit tensoriel entre DEUX TensorSequence
# ---------------------------------------------------------
@jax.jit
def tensor_prod(ts1: TensorSequence, ts2: TensorSequence) -> TensorSequence:
    """
    Produit tensoriel de deux TensorSequence.

    Mathématiquement :
        (∑ a_u u) ⊗ (∑ b_v v) = ∑ a_u b_v (u v)
    """

    array2 = ts2.array

    def corps(i, acc):
        coef = array2[i]
        return jax.lax.cond(
            jnp.allclose(coef, 0),
            lambda: acc,
            lambda: acc + _tensor_prod_index(ts1, i, coef)
        )

    resultat = jax.lax.fori_loop(
        lower=0,
        upper=len(ts2),
        body_fun=corps,
        init_val=zero_like(ts1)
    )

    return resultat


# ---------------------------------------------------------
# Puissance tensorielle
# ---------------------------------------------------------
@jax.jit
def tensor_pow(ts: TensorSequence, p: int) -> TensorSequence:
    """
    Calcule la puissance tensorielle :

        ts ⊗ ts ⊗ ... ⊗ ts   (p fois)
    """
    def corps(i, acc):
        return tensor_prod(acc, ts)

    return jax.lax.fori_loop(
        lower=0,
        upper=p,
        body_fun=corps,
        init_val=unit_like(ts)
    )


# ---------------------------------------------------------
# Exponentielle tensorielle
# ---------------------------------------------------------
@jax.jit
def tensor_exp(ts: TensorSequence) -> TensorSequence:
    """
    Calcule l’exponentielle tensorielle :

        exp(ts) = ∑ (ts^⊗n) / n!

    utilisée dans la définition des signatures.
    """
    x = ts - ts[0] * unit_like(ts)

    def corps(n, carry):
        ts_puissance, factorielle, somme = carry
        ts_puissance = tensor_prod(ts_puissance, x)
        factorielle = factorielle * n
        somme = somme + ts_puissance / factorielle
        return ts_puissance, factorielle, somme

    init = (unit_like(ts), 1.0, unit_like(ts))
    _, _, resultat = jax.lax.fori_loop(
        lower=1,
        upper=ts.trunc + 1,
        body_fun=corps,
        init_val=init
    )

    return resultat * jnp.exp(ts[0])


# shuffle_table.py

In [8]:
"""
shuffle_table.py
================

Ce fichier construit la TABLE DE SHUFFLE.

La table de shuffle encode le produit de Chen (produit shuffle)
entre deux mots de l’algèbre tensorielle.

Idée mathématique :
    u ⧢ v = somme de tous les entrelacements possibles
            des lettres de u et v, en respectant l’ordre interne.

Exemple :
    1 ⧢ 2 = 12 + 21
"""

import numpy as np
import jax.numpy as jnp
import numba as nb
from typing import Tuple



# ---------------------------------------------------------
# Construction de la table de shuffle
# ---------------------------------------------------------
def get_shuffle_table(table_trunc: int, dim: int):
    """
    Construit la table de shuffle pour tous les mots
    de longueur ≤ table_trunc.

    La table est utilisée pour accélérer le produit shuffle
    entre deux TensorSequence.

    Retour :
        tableau numpy de forme (4, N) contenant :
        - index gauche
        - index droit
        - index résultat
        - multiplicité (nombre de shuffles identiques)
    """
    # Nombre total de mots
    n_mots = number_of_words_up_to_trunc(table_trunc, dim)

    blocs = []
    taille_totale = 0

    # Toutes les paires d’indices possibles (i, j)
    ij = np.array(
        np.meshgrid(np.arange(n_mots), np.arange(n_mots))
    ).T.reshape(-1, 2)

    # On garde uniquement les paires dont la longueur totale
    # ne dépasse pas la troncature
    longueurs = index_to_word_len(jnp.array(ij), dim=dim).sum(axis=1)
    ij = ij[longueurs <= table_trunc]

    # Pour chaque paire de mots
    for i, j in ij:
        mot_i = int(index_to_word(i, dim))
        mot_j = int(index_to_word(j, dim))

        # Calcul du shuffle au niveau des MOTS
        mots_res, comptes = shuffle_product_words(mot_i, mot_j)

        # Conversion des mots résultats en indices
        indices_res = word_to_index_vect(
            jnp.array(mots_res, dtype=jnp.int64),
            dim
        )

        # Bloc : [i, j, index_resultat, multiplicité]
        bloc = np.zeros((len(comptes), 4), dtype=int)
        bloc[:, 0] = i
        bloc[:, 1] = j
        bloc[:, 2] = indices_res
        bloc[:, 3] = comptes

        taille_totale += bloc.shape[0]
        blocs.append(bloc)

    # Concaténation finale
    table_shuffle = np.vstack(blocs)

    # Format attendu : (4, N)
    return table_shuffle.T


# ---------------------------------------------------------
# Shuffle de deux MOTS (niveau symbolique)
# ---------------------------------------------------------
@nb.jit(nopython=True)
def shuffle_product_words(word_1: int, word_2: int) -> Tuple:
    """
    Calcule le shuffle de deux mots (entiers).

    Retour :
        - tableau des mots résultants
        - tableau des multiplicités associées
    """
    # Cas du mot vide
    if word_1 == 0:
        return np.array([word_2], dtype=np.int64), np.ones(1, dtype=np.int64)
    if word_2 == 0:
        return np.array([word_1], dtype=np.int64), np.ones(1, dtype=np.int64)

    # Longueur des mots
    l1 = int(np.log10(word_1)) + 1
    l2 = int(np.log10(word_2)) + 1

    # Concaténation brute
    mot_concat = word_1 * 10**l2 + word_2

    # Extraction des lettres
    lettres = np.array([
        mot_concat // 10**k % 10
        for k in range(l1 + l2 - 1, -1, -1)
    ])

    # Indices de positions possibles
    indices_gauche = combinations(np.arange(l1 + l2), l1)
    indices_droite = combinations(np.arange(l1 + l2), l2)[::-1]

    # Construction des shuffles
    indices = np.zeros((indices_gauche.shape[0], l1 + l2), dtype=np.int64)
    indices[:, :l1] = indices_gauche
    indices[:, l1:] = indices_droite

    puissances = 10**(l1 + l2 - 1 - indices)
    shuffles = np.sum(puissances * lettres, axis=1)

    # Comptage des duplications
    shuffles_tries = np.sort(shuffles)
    shuffles_tries = np.append(shuffles_tries, -1)

    changements = np.where(np.diff(shuffles_tries) != 0)[0]
    comptes = np.zeros(changements.size, dtype=np.int64)
    comptes[0] = changements[0] + 1
    comptes[1:] = np.diff(changements)

    return shuffles_tries[changements], comptes

# shuffle_product.py

In [9]:
"""
shuffle_product.py
==================

Ce fichier implémente le PRODUIT SHUFFLE (ou produit de Chen)
entre deux TensorSequence.

Idée mathématique :
------------------
Si
    T1 = ∑ a_u · u
    T2 = ∑ b_v · v

alors
    T1 ⧢ T2 = ∑ a_u b_v · (u ⧢ v)

où (u ⧢ v) est la somme de tous les entrelacements possibles
des lettres de u et v, en respectant l’ordre interne de chaque mot.

⚠️ Le calcul combinatoire est coûteux :
→ on utilise une TABLE DE SHUFFLE pré-calculée (shuffle_table).
"""

import jax
import jax.numpy as jnp


# ---------------------------------------------------------
# Calcul du shuffle au niveau des TABLEAUX
# ---------------------------------------------------------
@jax.jit
def _shuffle_prod_array(
    array1: jax.Array,
    array2: jax.Array,
    shuffle_table: jax.Array,
):
    """
    Applique le produit shuffle au niveau des tableaux de coefficients.

    Paramètres
    ----------
    array1, array2 : jax.Array
        Tableaux de coefficients des deux TensorSequence.
    shuffle_table : jax.Array
        Table de shuffle (4, N) :
            - index gauche
            - index droit
            - index résultat
            - multiplicité

    Retour
    ------
    jax.Array
        Tableau de coefficients du produit shuffle.
    """
    index_gauche, index_droit, index_resultat, multiplicite = shuffle_table

    # Contribution de chaque shuffle :
    # multiplicite × coeff_gauche × coeff_droit
    contributions = (
        multiplicite
        * array1[index_gauche]
        * array2[index_droit]
    )

    # Accumulation dans le tableau résultat
    resultat = jnp.zeros_like(array1)
    resultat = resultat.at[index_resultat].add(contributions)

    return resultat


# ---------------------------------------------------------
# Version vectorisée (pour tableaux 2D ou trajectoires)
# ---------------------------------------------------------
_shuffle_prod_array_vect = jax.jit(
    jax.vmap(
        _shuffle_prod_array,
        in_axes=(1, 1, None),
        out_axes=1
    )
)


# ---------------------------------------------------------
# Produit shuffle entre DEUX TensorSequence
# ---------------------------------------------------------
@jax.jit
def shuffle_prod(
    ts1: TensorSequence,
    ts2: TensorSequence,
    shuffle_table: jax.Array
) -> TensorSequence:
    """
    Calcule le produit shuffle de deux TensorSequence.

    Mathématiquement :
        ts1 ⧢ ts2 = ∑ a_u b_v (u ⧢ v)
    """
    array = _shuffle_prod_array(
        ts1.array,
        ts2.array,
        shuffle_table
    )

    return TensorSequence(
        array=array,
        trunc=ts1.trunc,
        dim=ts1.dim
    )


# ---------------------------------------------------------
# Produit shuffle pour TensorSequence 2D (trajectoires)
# ---------------------------------------------------------
@jax.jit
def shuffle_prod_2d(
    ts1: TensorSequence,
    ts2: TensorSequence,
    shuffle_table: jax.Array
) -> TensorSequence:
    """
    Version du produit shuffle pour des TensorSequence
    dont les coefficients dépendent du temps ou d’une autre dimension.

    Typiquement :
        ts.array.shape = (n_mots, n_temps)
    """
    array = _shuffle_prod_array_vect(
        ts1.array.reshape((len(ts1), -1)),
        ts2.array.reshape((len(ts2), -1)),
        shuffle_table
    ).reshape(ts1.array.shape)

    return TensorSequence(
        array=array,
        trunc=ts1.trunc,
        dim=ts1.dim
    )


# ---------------------------------------------------------
# Puissance shuffle
# ---------------------------------------------------------
@jax.jit
def shuffle_pow(
    ts: TensorSequence,
    p: int,
    shuffle_table: jax.Array
) -> TensorSequence:
    """
    Calcule la puissance shuffle :

        ts^{⧢ p} = ts ⧢ ts ⧢ ... ⧢ ts  (p fois)
    """
    def corps(i, acc):
        return shuffle_prod(acc, ts, shuffle_table)

    return jax.lax.fori_loop(
        lower=0,
        upper=p,
        body_fun=corps,
        init_val=unit_like(ts)
    )


# ---------------------------------------------------------
# Exponentielle shuffle
# ---------------------------------------------------------
@jax.jit
def shuffle_exp(
    ts: TensorSequence,
    shuffle_table: jax.Array
) -> TensorSequence:
    """
    Calcule l’exponentielle shuffle :

        exp⧢(ts) = ∑ ts^{⧢n} / n!

    Utilisée dans la théorie des signatures
    (équivalent du développement de Chen).
    """
    # On enlève la composante du mot vide
    x = ts - ts[0] * unit_like(ts)

    def corps(n, carry):
        ts_puissance, factorielle, somme = carry
        ts_puissance = shuffle_prod(ts_puissance, x, shuffle_table)
        factorielle = factorielle * n
        somme = somme + ts_puissance / factorielle
        return ts_puissance, factorielle, somme

    init = (unit_like(ts), 1.0, unit_like(ts))

    _, _, resultat = jax.lax.fori_loop(
        lower=1,
        upper=ts.trunc + 1,
        body_fun=corps,
        init_val=init
    )

    # facteur exp(coefficient du mot vide)
    return resultat * jnp.exp(ts[0])


# Operators

In [10]:
"""
operators.py
============

Ce fichier définit les OPÉRATEURS LINÉAIRES fondamentaux
agissant sur les TensorSequence.

Ces opérateurs sont utilisés pour :
- signatures à mémoire (FM-signatures)
- équations différentielles sur les signatures
- régularisation et résolvantes
"""

import jax
import jax.numpy as jnp


# ---------------------------------------------------------
# Opérateur G
# ---------------------------------------------------------
@jax.jit
def G(ts: TensorSequence, lam: jax.Array) -> TensorSequence:
    """
    Opérateur G : multiplication par la somme des λ du mot.

    Pour chaque mot v :
        (G ts)[v] = λ(v) · ts[v]

    où :
        λ(v) = somme des λ_i pour les lettres du mot v
    """
    lams = ts.get_lambdas_sum_array(lam)
    shape = (-1,) + (1,) * (ts.array.ndim - 1)

    return TensorSequence(
        array=ts.array * lams.reshape(shape),
        trunc=ts.trunc,
        dim=ts.dim
    )


# ---------------------------------------------------------
# Opérateur G^{-1} (pseudo-inverse)
# ---------------------------------------------------------
@jax.jit
def G_inv(ts: TensorSequence, lam: jax.Array) -> TensorSequence:
    """
    Pseudo-inverse de G.

    Pour chaque mot v :
        (G^{-1} ts)[v] = ts[v] / λ(v)   si λ(v) ≠ 0
                         0            sinon

    Le mot vide est toujours envoyé sur 0.
    """
    lams = ts.get_lambdas_sum_array(lam)
    shape = (-1,) + (1,) * (ts.array.ndim - 1)

    coeffs = jnp.where(lams != 0, 1.0 / lams, 0.0)

    return TensorSequence(
        array=ts.array * coeffs.reshape(shape),
        trunc=ts.trunc,
        dim=ts.dim
    )


# ---------------------------------------------------------
# Opérateur de décote D
# ---------------------------------------------------------
@jax.jit
def D(ts: TensorSequence, dt: float, lam: jax.Array) -> TensorSequence:
    """
    Opérateur de décote exponentielle.

    Pour chaque mot v :
        (D ts)[v] = exp(-λ(v) · dt) · ts[v]

    Utilisé pour les signatures à mémoire finie.
    """
    lams = ts.get_lambdas_sum_array(lam)
    shape = (-1,) + (1,) * (ts.array.ndim - 1)

    facteur = jnp.exp(-lams * dt)

    return TensorSequence(
        array=ts.array * facteur.reshape(shape),
        trunc=ts.trunc,
        dim=ts.dim
    )


# ---------------------------------------------------------
# Décote dépendante du temps (vecteur dt)
# ---------------------------------------------------------
@jax.jit
def D_timedep(ts: TensorSequence, dt: jax.Array, lam: jax.Array) -> TensorSequence:
    """
    Version vectorisée de l’opérateur D pour un vecteur de temps.

    Utilisée pour des trajectoires de signatures.
    """
    lams = ts.get_lambdas_sum_array(lam)

    shape = (
        lams.size,
        dt.size
    ) + (1,) * (ts.array.ndim - 2)

    facteur = jnp.exp(-jnp.outer(lams, dt))

    return TensorSequence(
        array=ts.array * facteur.reshape(shape),
        trunc=ts.trunc,
        dim=ts.dim
    )


# ---------------------------------------------------------
# Résolvante de G : (Id + G)^{-1}
# ---------------------------------------------------------
@jax.jit
def G_resolvent(ts: TensorSequence, lam: jax.Array) -> TensorSequence:
    """
    Opérateur résolvante :

        (Id + G)^{-1}

    Pour chaque mot v :
        ts[v] / (1 + λ(v))
    """
    lams = ts.get_lambdas_sum_array(lam)
    shape = (-1,) + (1,) * (ts.array.ndim - 1)

    return TensorSequence(
        array=ts.array * (1.0 / (1.0 + lams)).reshape(shape),
        trunc=ts.trunc,
        dim=ts.dim
    )


# ---------------------------------------------------------
# Schéma semi-intégré
# ---------------------------------------------------------
@jax.jit
def semi_integrated_scheme(
    ts: TensorSequence,
    dt: float,
    lam: jax.Array
) -> TensorSequence:
    """
    Schéma numérique pour l’équation :

        ψ' = -G(ψ) + F

    Implémente :
        G^{-1}(Id - D)
    """
    lams = ts.get_lambdas_sum_array(lam)
    shape = (-1,) + (1,) * (ts.array.ndim - 1)

    coeffs = jnp.where(
        lams != 0,
        (1.0 - jnp.exp(-lams * dt)) / lams,
        dt
    )

    return TensorSequence(
        array=ts.array * coeffs.reshape(shape),
        trunc=ts.trunc,
        dim=ts.dim
    )




##  Opérateur de décote exponentielle pour signatures à mémoire (EFM).

@jax.jit
def Phi(ts: TensorSequence,  dt_alpha: float, lam: jax.Array) -> TensorSequence:
    """
    Opérateur de décote exponentielle pour signatures à mémoire (EFM).

    Pour chaque mot v :
        (Φ ts)[v] = exp( - λ(v) * Δα ) * ts[v]

    où :
        Δα = t^α - s^α
        λ(v) = somme des λ_i correspondant aux lettres du mot v
    """
    # λ(v) pour tous les mots
    lams = ts.get_lambdas_sum_array(lam)

    # reshape pour broadcast
    shape = (-1,) + (1,) * (ts.array.ndim - 1)

    facteur = jnp.exp(-lams * dt_alpha)

    return TensorSequence(
        array=ts.array * facteur.reshape(shape),
        trunc=ts.trunc,
        dim=ts.dim
    )




## Générateur mémoire G_α pour les EFM-signatures

@jax.jit
def G_alpha(ts: TensorSequence, t: float, alpha: float, lam: jax.Array) -> TensorSequence:
    """
    Générateur mémoire G_α pour les EFM-signatures.

    Pour chaque mot v :
        (G_α ts)[v] = α · t^{α-1} · λ(v) · ts[v]

    où :
        λ(v) = somme des λ_i associées aux lettres du mot v
    """
    lams = ts.get_lambdas_sum_array(lam)

    # broadcast
    shape = (-1,) + (1,) * (ts.array.ndim - 1)

    coef = alpha * (t ** (alpha - 1)) * lams

    return TensorSequence(
        array=ts.array * coef.reshape(shape),
        trunc=ts.trunc,
        dim=ts.dim
    )


# Path_signature.py

In [12]:
import numpy as np
from dataclasses import dataclass
from math import factorial
from scipy.special import hyp1f1  # 1F1 de Kummer

# =========================================================
# 0) Outils : taille signature
# =========================================================
def n_words_upto(trunc: int, dim: int) -> int:
    # 1 + d + d^2 + ... + d^trunc
    if dim == 1:
        return trunc + 1
    return (dim**(trunc + 1) - 1) // (dim - 1)

# =========================================================
# 1) Coefficient exact EFM : C(a,b)
# =========================================================
def C_efm(a: float, b: float, lam: float, alpha: float) -> float:
    """
    C(a,b) = exp(-lam*b^alpha) * ( b*1F1(1/a,1+1/a,lam*b^a) - a*1F1(1/a,1+1/a,lam*a^a) ) / (b-a)
    """
    if b <= a:
        return 0.0

    A = 1.0 / alpha
    B = 1.0 + 1.0 / alpha

    term_b = b * hyp1f1(A, B, lam * (b**alpha))
    term_a = a * hyp1f1(A, B, lam * (a**alpha))

    val = np.exp(-lam * (b**alpha)) * (term_b - term_a) / (b - a)

    # sécurité : parfois hyp1f1 renvoie complex (numériquement), on prend la partie réelle
    return float(np.real(val))

# 
# 2) Objet signature : stockage niveau par niveau
# 
@dataclass
class SigLevels:
    levels: list  # levels[n] shape (dim**n,)
    dim: int
    trunc: int

    @staticmethod
    def unit(trunc: int, dim: int) -> "SigLevels":
        levels = [np.array([1.0])]
        for n in range(1, trunc + 1):
            levels.append(np.zeros(dim**n, dtype=float))
        return SigLevels(levels=levels, dim=dim, trunc=trunc)

    def copy(self) -> "SigLevels":
        return SigLevels(levels=[lvl.copy() for lvl in self.levels], dim=self.dim, trunc=self.trunc)

    def to_flat(self) -> np.ndarray:
        return np.concatenate(self.levels, axis=0)

# 
# 3) exp^{⊗}(w) quand w est un vecteur niveau 1
# 
def tensor_exp_level1(w: np.ndarray, trunc: int) -> list:
    """
    exp^{⊗}(w) :
      level0 = 1
      leveln = w^{⊗n}/n!
    """
    w = np.asarray(w, float)
    dim = w.size

    levels = [np.array([1.0])]
    if trunc == 0:
        return levels

    levels.append(w.copy())  # niveau 1
    tp = w.copy()
    for n in range(2, trunc + 1):
        tp = np.kron(tp, w)           # w^{⊗n}
        levels.append(tp / factorial(n))
    return levels

# 
# 4) Produit tensoriel tronqué (Chen)
# 
def tensor_prod(a: SigLevels, b: SigLevels) -> SigLevels:
    """
    (a ⊗ b)_n = sum_{k=0..n} a_k ⊗ b_{n-k}
    """
    assert a.dim == b.dim and a.trunc == b.trunc
    dim, trunc = a.dim, a.trunc
    out = SigLevels.unit(trunc, dim)

    for n in range(0, trunc + 1):
        acc = np.zeros(dim**n, dtype=float)
        for k in range(0, n + 1):
            acc += np.kron(a.levels[k], b.levels[n - k])
        out.levels[n] = acc
    return out

# 
# 5) Phi EFM (décote) — cas "lam constant par lettre"
# 
def Phi_efm(sig: SigLevels, dt_alpha: float, lam_scalar: float) -> SigLevels:
    """
    (Phi sig)^v = exp(-lam(v) * dt_alpha) sig^v

    Ici on suppose lam constant par lettre (ou lam identique sur les dimensions),
    donc lam(v) = |v| * lam_scalar.
    """
    out = sig.copy()
    for n in range(sig.trunc + 1):
        out.levels[n] *= np.exp(-(n * lam_scalar) * dt_alpha)
    return out

# =========================================================
# 6) EFM signature cumulée d'une trajectoire
# =========================================================
def efm_signature_trajectory(
    x: np.ndarray,
    t: np.ndarray,
    trunc: int,
    lam,
    alpha: float,
    return_traj: bool = True
):
    """
    x : (N, dim)
    t : (N,)
    lam : scalaire OU vecteur (dim,) mais supposé identique sur toutes les dims pour C_efm
    return_traj : si True -> renvoie (Sig_final, Sig_traj_flat)
                  Sig_traj_flat shape = (n_features, N)
    """
    x = np.asarray(x, float)
    t = np.asarray(t, float)

    if x.ndim != 2:
        raise ValueError("x doit être de forme (N, dim).")
    if t.ndim != 1 or t.size != x.shape[0]:
        raise ValueError("t doit être de forme (N,) et compatible avec x.")
    if np.any(np.diff(t) < 0):
        raise ValueError("t doit être croissant.")

    N, dim = x.shape
    lam = np.asarray(lam, float).reshape(-1)

    if lam.size == 1:
        lam_scalar = float(lam[0])
    else:
        # on accepte un vecteur seulement s'il est constant (même valeur partout)
        if not np.allclose(lam, lam[0]):
            raise ValueError("Cette version 'const_lam' requiert lam identique sur toutes les dimensions.")
        lam_scalar = float(lam[0])

    # init
    S = SigLevels.unit(trunc, dim)
    n_feat = n_words_upto(trunc, dim)
    traj = np.zeros((n_feat, N), dtype=float)
    traj[:, 0] = S.to_flat()

    # boucle segments
    for k in range(1, N):
        a, b = t[k - 1], t[k]
        if b == a:
            traj[:, k] = S.to_flat()
            continue

        dt_alpha = (b**alpha) - (a**alpha)

        # 1) décote du passé
        S = Phi_efm(S, dt_alpha, lam_scalar)

        # 2) signature du segment (EFM)
        dX = x[k] - x[k - 1]
        v = dX 
        C = C_efm(a, b, lam_scalar, alpha)
        w = C * v

        seg_levels = tensor_exp_level1(w, trunc)
        Sig_k = SigLevels(levels=seg_levels, dim=dim, trunc=trunc)

        # 3) Chen
        S = tensor_prod(S, Sig_k)

        traj[:, k] = S.to_flat()

    if return_traj:
        return S, traj
    return S


# Calcul alpha- EFM

In [13]:
# =========================================================
# 7) Exemple OU -> EFM signature ordre trunc
# =========================================================
if __name__ == "__main__":
    # Exemple : suppose que tu as déjà ton OU (t, x)
    # Ici mini exemple:
    t = np.linspace(0.0, 2.0, 2001)
    x = np.zeros((t.size, 2))
    x[:, 0] = np.sin(t)
    x[:, 1] = np.cos(t)

    trunc = 3
    lam = 1.0
    alpha = 0.2

    Sig_final, Sig_traj = efm_signature_trajectory(x, t, trunc, lam, alpha, return_traj=True)

    print("Sig_traj shape =", Sig_traj.shape)  # (n_features, N)
    print("Signature finale (10 premiers coeffs) =", Sig_traj[:10, -1])

Sig_traj shape = (15, 2001)
Signature finale (10 premiers coeffs) = [ 1.          0.6603398  -1.257294    0.21802433 -0.78924402 -0.04099725
  0.79039411  0.04799005 -0.26293732  0.0047054 ]
