In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers


# -------------------------------------------------------------------
# Bloc de base : un petit réseau feed-forward résiduel (TinyBlock)
# -------------------------------------------------------------------
class TinyBlock(layers.Layer):
    def __init__(self, d):
        super().__init__()
        # Normalisation couche par couche pour stabiliser les activations
        self.ln = layers.LayerNormalization()
        # Première couche dense : expansion dimensionnelle + non-linéarité GELU
        self.fc1 = layers.Dense(4 * d, activation="gelu")
        # Deuxième couche dense : réduction vers la dimension d'origine
        self.fc2 = layers.Dense(d)

    def call(self, u):
        # Normalisation de l'entrée
        h = self.ln(u)
        # Passage à travers les deux couches denses
        h = self.fc1(h)
        h = self.fc2(h)
        # Résiduel : on ajoute l'entrée initiale à la sortie du bloc
        return u + h


# -------------------------------------------------------------------
# Modèle principal TRM : architecture récurrente inspirée des Transformers
# -------------------------------------------------------------------
class TRM(keras.Model):
    def __init__(
        self,
        vocab_size,     # Taille du vocabulaire (pour l'embedding et la sortie)
        d=128,          # Dimension du vecteur d'état
        max_len=16,     # Longueur maximale des séquences
        n_rec=6,        # Nombre d'itérations internes (récurrence fine)
        T=3,            # Nombre d'étapes de propagation avant backprop
        Nsup=8          # Nombre de cycles supervisés (multi-passes)
    ):
        super().__init__()

        # Sauvegarde des hyperparamètres
        self.d = d
        self.n_rec = n_rec
        self.T = T
        self.Nsup = Nsup

        # -------------------------------------------------------------
        # Embeddings : pour encoder les tokens et leurs positions
        # -------------------------------------------------------------
        self.emb = layers.Embedding(vocab_size, d)
        self.pos = self.add_weight(
            shape=(1, max_len, d),
            initializer="random_normal",
            trainable=True,
            name="positional_embeddings"
        )

        # -------------------------------------------------------------
        # États initiaux appris (y0 et z0)
        # Ils représentent les états "mémoire" de départ (appris globalement)
        # -------------------------------------------------------------
        self.y0 = self.add_weight(
            shape=(1, max_len, d),
            initializer="zeros",
            trainable=True,
            name="y0"
        )
        self.z0 = self.add_weight(
            shape=(1, max_len, d),
            initializer="zeros",
            trainable=True,
            name="z0"
        )

        # -------------------------------------------------------------
        # Réseau interne partagé : deux TinyBlocks utilisés partout
        # -------------------------------------------------------------
        self.block1 = TinyBlock(d)
        self.block2 = TinyBlock(d)

        # -------------------------------------------------------------
        # Têtes de sortie :
        # - to_vocab : logits sur le vocabulaire (sortie principale)
        # - halt_head : probabilité de "stop" conditionnelle
        # -------------------------------------------------------------
        self.to_vocab = layers.Dense(vocab_size)
        self.halt_head = layers.Dense(1)

    # -------------------------------------------------------------
    # Petit réseau composé de deux TinyBlocks successifs
    # -------------------------------------------------------------
    def tiny_net(self, u):
        u = self.block1(u)
        u = self.block2(u)
        return u

    # -------------------------------------------------------------
    # Mise à jour des états internes :
    # z dépend de x, y et z
    # y dépend de y et z
    # -------------------------------------------------------------
    def update_z(self, x, y, z):
        u = x + y + z
        return self.tiny_net(u)

    def update_y(self, y, z):
        u = y + z
        return self.tiny_net(u)

    # -------------------------------------------------------------
    # Boucle d’entraînement / inférence principale
    # -------------------------------------------------------------
    def call(self, x_tokens, y_true=None, training=False):
        B = tf.shape(x_tokens)[0]  # Taille du batch
        L = tf.shape(x_tokens)[1]  # Longueur de séquence

        # Embedding des tokens + ajout de l'encodage positionnel
        x = self.emb(x_tokens) + self.pos[:, :L, :]

        # Initialisation des états y et z à partir des tenseurs appris
        y = tf.tile(self.y0[:, :L, :], [B, 1, 1])
        z = tf.tile(self.z0[:, :L, :], [B, 1, 1])

        losses = []  # Stocke les pertes à chaque passe supervisée

        # -----------------------------------------------------------------
        # Boucle principale : Nsup cycles de raffinement successif
        # -----------------------------------------------------------------
        for step in range(self.Nsup):

            # Chaque cycle comporte T étapes, la dernière reçoit un gradient
            for t in range(self.T):

                if t < self.T - 1:
                    # --- Étapes sans gradient ---
                    # Permet au modèle de "réfléchir" sans affecter les poids
                    for _ in range(self.n_rec):
                        z = tf.stop_gradient(self.update_z(x, y, z))
                    y = tf.stop_gradient(self.update_y(y, z))
                else:
                    # --- Dernière étape avec gradient ---
                    # C’est ici que la rétropropagation agit
                    for _ in range(self.n_rec):
                        z = self.update_z(x, y, z)
                    y = self.update_y(y, z)

            # Calcul des logits et de la probabilité d'arrêt
            logits = self.to_vocab(y)
            halt_p = tf.sigmoid(tf.reduce_mean(self.halt_head(y), axis=1))

            # -------------------------------------------------------------
            # Si sortie supervisée (entraînement)
            # Calcul de la perte cross-entropy + halt loss
            # -------------------------------------------------------------
            if y_true is not None:
                # Cross entropy sur le vocabulaire
                ce = tf.reduce_mean(
                    tf.keras.losses.sparse_categorical_crossentropy(
                        y_true, logits, from_logits=True
                    )
                )

                # Prédictions correctes ou non
                pred = tf.argmax(logits, axis=-1)
                correct = tf.reduce_all(tf.equal(pred, y_true), axis=1)
                correct = tf.cast(correct, tf.float32)
                correct = tf.expand_dims(correct, axis=1)

                # Binary cross entropy pour la tête "halt"
                bce = tf.reduce_mean(
                    tf.keras.losses.binary_crossentropy(correct, halt_p)
                )

                # Perte totale pour cette passe
                losses.append(ce + 0.5 * bce)

            # Détachement des états pour le cycle suivant
            y = tf.stop_gradient(y)
            z = tf.stop_gradient(z)

        # -------------------------------------------------------------
        # Sortie :
        # - Si pas de labels : logits + probabilité d'arrêt
        # - Sinon perte moyenne sur toutes les passes
        # -------------------------------------------------------------
        if y_true is None:
            return logits, halt_p

        return tf.add_n(losses) / len(losses)
