# explore_bis

Notebook de démonstration pour la brique *encoder* désormais packagée dans `hdc_project.encoder`.
Nous importons directement les modules M0–M8 depuis `src/` et rejouons les vérifications clés
présentes dans `explore.ipynb`.

## Initialisation du chemin `src`

On s'assure que `src/` est visible dans `sys.path` afin d'importer le package.

In [1]:
from pathlib import Path
import sys

ROOT = Path.cwd().parent
SRC = ROOT / "src"
if str(SRC) not in sys.path:
    sys.path.insert(0, str(SRC))
print(f"src path registered: {SRC}")

src path registered: /Users/aymenmejri/Desktop/MyCode/experiments/hdc_v2/hdc_project/src


## Imports des modules packagés

Les modules numérotés `M0` à `M7` et le pipeline `M8` sont exposés via `hdc_project.encoder`.

In [2]:
import numpy as np

from hdc_project.encoder import m0, m1, m2, m3, m4, m5, m6, m7, pipeline
import math
from typing import Optional, Sequence, Tuple, Any, Dict

import time
import heapq

## Vérification statistique de la clé Rademacher (M0 & M1)

On vérifie que deux clés indépendantes ont une similarité proche de 0 et que la probabilité de 
queues est conforme à la borne de Hoeffding.

In [3]:
D = 2048
n_pairs = 800
rng = np.random.default_rng(42)

sims = []
for _ in range(n_pairs):
    J = m0.M0_NewKey(int(rng.integers(0, 2**31 - 1)), D)
    Jp = m0.M0_NewKey(int(rng.integers(0, 2**31 - 1)), D)
    sims.append(m1.M1_sim(J, Jp))

sims = np.asarray(sims)
mean_sim = float(sims.mean())
tail_prob = float((np.abs(sims) > 0.1).mean())
hoeff = 2.0 * np.exp(-D * 0.1**2 / 2.0)

print(f"mean(sim) = {mean_sim:.4f}")
print(f"P(|sim| > 0.1) = {tail_prob:.4e} (bound={hoeff:.4e})")

mean(sim) = 0.0013
P(|sim| > 0.1) = 0.0000e+00 (bound=7.1426e-05)


## Encodage de phrases avec le pipeline `M8_ENC`

On utilise le lexique `M4`, la permutation `M2` et le pipeline `M8` pour encoder quelques phrases,
puis on calcule les métriques de validation décrites dans le notebook original.

In [4]:
D = 4096
n = 3
rng = np.random.default_rng(123)

sentences = [
    "hyperdimensional computing is fun",
    "vector symbolic architectures are powerful",
    "encoding words into hyperspace"
]

Lex = m4.M4_LexEN_new(seed=1, D=D)
pi = rng.permutation(D).astype(np.int64)
encoded = pipeline.encode_corpus_ENC(sentences, Lex, pi, D, n, seg_seed0=999)

E_list = [entry["E_seq"] for entry in encoded]
H_list = [entry["H"] for entry in encoded]

print(f"Nombre de segments encodés: {len(encoded)}")
print(f"Signature shape: {H_list[0].shape}, dtype: {H_list[0].dtype}")
print(f"Accumulateur dtype: {encoded[0]['S'].dtype}")

Nombre de segments encodés: 3
Signature shape: (4096,), dtype: int8
Accumulateur dtype: int16


## Métriques de similarité et courbes de majorité

Nous reproduisons les indicateurs `intra/inter` ainsi que les courbes d'erreur de majorité.

In [5]:
s_intra, s_inter = pipeline.intra_inter_ngram_sims(E_list, D)
inter_seg = pipeline.inter_segment_similarity(H_list)

print(f"Similarité intra n-gram (moyenne): {s_intra:.4f}")
print(f"Similarité inter n-gram |.|: {s_inter:.4f}")
print(f"Similarité inter segments |.|: {inter_seg:.4f}")

maj_curves = pipeline.majority_error_curve(E_list, pi, D, eta_list=(0.0, 0.05))
print("Majority error curve @eta=0.0:", maj_curves[0.0][:3])

maj_repeat = pipeline.majority_curve_repeated_vector(E_list, pi, D, eta_list=(0.0, 0.05), trials_per_m=500)
print("Repeated-vector curve @eta=0.05:", maj_repeat[0.05][:3])

Similarité intra n-gram (moyenne): 0.0008
Similarité inter n-gram |.|: 0.0117
Similarité inter segments |.|: 0.0173
Majority error curve @eta=0.0: [(4, 0.09375), (5, 0.0)]
Repeated-vector curve @eta=0.05: [(4, 0.012), (5, 0.0)]


## Encodage fin : comparaison strict vs unbiased

`M8_ENC` permet de choisir entre majorité strict (`strict`) et majority sans biais (`unbiased`).

In [6]:
tokens = "time flies like an arrow".split()

E_uni, X_uni, Xb_uni, S_uni, H_uni = pipeline.M8_ENC(
    tokens, pi, n=2, LexEN=Lex, D=D,
    majority_mode="unbiased", return_bound=True
)
E_strict, X_strict, S_strict, H_strict = pipeline.M8_ENC(
    tokens, pi, n=2, LexEN=Lex, D=D,
    majority_mode="strict", return_bound=False
)

print(f"#E (unbiased) = {len(E_uni)}, #X = {len(X_uni)}, #Xb = {len(Xb_uni)}")
print(f"H_unbiased unique values: {sorted(set(H_uni.tolist()))}")
print(f"H_strict unique values: {sorted(set(H_strict.tolist()))}")

#E (unbiased) = 5, #X = 5, #Xb = 5
H_unbiased unique values: [-1, 1]
H_strict unique values: [-1, 1]


# MEM . Part 2 : 

In [7]:
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, Tuple
import numpy as np

HD = np.int8   # vecteurs HD en ±1 stockés en int8

@dataclass(frozen=True)
class ENFRPair:
    Z_en: np.ndarray  # shape=(D,), dtype=int8, valeurs {-1,+1}
    Z_fr: np.ndarray  # shape=(D,), dtype=int8, valeurs {-1,+1}

def check_pair(pair: ENFRPair) -> None:
    """Valide qu'une paire (Z_en, Z_fr) respecte le contrat : 
       même shape, dtype=int8, valeurs dans {-1,+1}."""
    z_en, z_fr = pair.Z_en, pair.Z_fr
    if z_en.dtype != HD or z_fr.dtype != HD or z_en.shape != z_fr.shape:
        raise ValueError("Shapes/Types incohérents : attendu (D,), dtype=int8 pour EN et FR.")
    # Vérification stricte des valeurs (optionnelle, coûteuse si gros corpus)
    # if not (np.all((z_en == 1)|(z_en == -1)) and np.all((z_fr == 1)|(z_fr == -1))):
    #     raise ValueError("Valeurs attendues : {-1,+1}.")

In [8]:
def bind_tranche(X: np.ndarray, G: np.ndarray) -> np.ndarray:
    """Applique le binding de tranche :
       - X et G : vecteurs ±1 en int8
       - retourne X ⊗ G en int8
       - garantit isométrie & involutivité
    """
    if X.dtype != np.int8 or G.dtype != np.int8:
        raise ValueError("X,G doivent être en int8 (±1).")
    if X.shape != G.shape:
        raise ValueError("X et G doivent avoir la même shape.")
    # int16 transitoire pour éviter débordement intermédiaire
    return (X.astype(np.int16) * G.astype(np.int16)).astype(np.int8, copy=False)

In [None]:
@dataclass
class MemComponents:
    mem: MemBank
    lsh: SignLSH
    Gmem: np.ndarray
    meta: dict

def _lsh_bucket(lsh: SignLSH, z_mem: np.ndarray, B: int) -> int:
    """Bucketisation robuste vers [0..B-1].
       Préfère lsh.bucket_unbiased(z,B) si disponible, sinon fallback modulo."""
    if hasattr(lsh, "bucket_unbiased"):
        return int(lsh.bucket_unbiased(z_mem, B))
    code = int(lsh.code(z_mem))
    return code % int(B)

def train_one_pass_MEM(components: MemComponents,
                       pairs_en_fr: Iterable[Tuple[np.ndarray, np.ndarray]]) -> None:
    """Chaîne MM6 complète: bind→LSH→bucket→update (cf. MM6, MM5, MM3, MM4)."""
    mem, lsh, Gmem = components.mem, components.lsh, components.Gmem
    D = int(Gmem.shape[0])
    if Gmem.dtype != np.int8:
        raise ValueError("Gmem doit être en int8 (±1).")
    for Z_en, Z_fr in pairs_en_fr:
        Z_en = Z_en.astype(np.int8, copy=False)
        Z_fr = Z_fr.astype(np.int8, copy=False)
        if Z_en.shape != (D,) or Z_fr.shape != (D,):
            raise ValueError("Z_en, Z_fr doivent avoir shape=(D,).")
        Z_en_mem = to_mem_tranche(Z_en, Gmem)              # MM5
        c = _lsh_bucket(lsh, Z_en_mem, mem.B)              # *** FIX: bucketisation ***
        mem.add(c, Z_fr)    

In [10]:
@dataclass(frozen=True)
class SignLSH:
    idx_bits: np.ndarray  # (k,) indices uniques [0..D-1], dtype=int64

    @property
    def k(self) -> int:
        return int(self.idx_bits.shape[0])

    @staticmethod
    def with_k_bits(D: int, k: int, seed: Optional[int] = None) -> "SignLSH":
        assert 1 <= k <= D
        g = np.random.default_rng(seed)
        bits = g.choice(D, size=int(k), replace=False)
        return SignLSH(idx_bits=bits.astype(np.int64, copy=False))

    def code(self, z_mem: np.ndarray) -> int:
        z = z_mem if z_mem.dtype == np.int8 else z_mem.astype(np.int8, copy=False)
        b = (z[self.idx_bits] > 0).astype(np.uint8, copy=False)
        c = 0
        for bit in b: c = (c << 1) | int(bit)
        return int(c)  # c in [0, 2^k)

    def bucket_mod(self, z_mem: np.ndarray, B: int) -> int:
        """Bucketisation simple: modulo (léger biais si 2^k % B != 0)."""
        return self.code(z_mem) % int(B)

    def bucket_unbiased(self, z_mem: np.ndarray, B: int) -> int:
        """Réduction quasi-sans biais: floor(code * B / 2^k)."""
        code = self.code(z_mem)
        return int((code * int(B)) >> self.k)

def _mix32(x: int) -> int:
    """Mix 32-bit (xorshift-like) pour mieux répartir les codes avant modulo."""
    x ^= (x << 13) & 0xFFFFFFFF
    x ^= (x >> 17)
    x ^= (x << 5)  & 0xFFFFFFFF
    return x & 0xFFFFFFFF

def code_to_bucket(code: int, B: int) -> int:
    """Mappe un code entier vers un bucket [0..B-1] via mix léger + modulo."""
    if B <= 0:
        raise ValueError("B doit être > 0")
    return _mix32(code) % int(B)

In [11]:
def bind_tranche_batch(X: np.ndarray, G: np.ndarray) -> np.ndarray:
    """Binding de tranche compatible batch.
    
    Args:
        X : (D,) ou (n,D) int8 ±1
        G : (D,) int8 ±1
    Returns:
        Y : même shape que X, en int8, tel que Y = X ⊗ G (broadcast sur l'axe n)
    
    Invariants:
        - types contrôlés (int8)
        - cast transitoire en int16 pour éviter overflow
        - retour en int8, sans copies inutiles
    """
    if G.dtype != np.int8:
        raise ValueError("G doit être en int8 (±1).")
    if X.dtype != np.int8:
        raise ValueError("X doit être en int8 (±1).")
    if G.ndim != 1:
        raise ValueError("G doit avoir shape (D,), pas de dimension batch.")
    if X.ndim == 1:
        if X.shape != G.shape:
            raise ValueError("X et G doivent avoir la même shape (D,) en 1D.")
        return (X.astype(np.int16) * G.astype(np.int16)).astype(np.int8, copy=False)
    elif X.ndim == 2:
        n, D = X.shape
        if G.shape != (D,):
            raise ValueError("Pour X (n,D), G doit avoir shape (D,).")
        return (X.astype(np.int16) * G.astype(np.int16)[None, :]).astype(np.int8, copy=False)
    else:
        raise ValueError("X doit être (D,) ou (n,D).")

In [12]:
def _rand_pm1(n: int, D: int, seed: int) -> np.ndarray:
    """Génère n vecteurs ±1 (int8) de dimension D, 
       reproductibles via un seed."""
    g = np.random.default_rng(seed)
    B = g.integers(0, 2, size=(n, D), dtype=np.int8)
    return (B << 1) - 1   # map {0,1} -> {-1,+1}

def test_isometrie_involutivite(D=8192, trials=100, seed=0):
    """Vérifie que :
       - le binding préserve les produits scalaires (isométrie)
       - le binding répété rend l’original (involutivité)
       Version compatible avec bind_tranche_batch (1D).
    """
    g = np.random.default_rng(seed)
    for t in range(trials):
        X = _rand_pm1(1, D, seed+2*t)[0]   # (D,)
        Y = _rand_pm1(1, D, seed+2*t+1)[0] # (D,)
        G = _rand_pm1(1, D, seed+3*t)[0]   # (D,)

        # produit scalaire de référence
        dot0 = int((X.astype(np.int32) * Y.astype(np.int32)).sum())

        # après binding (utilisation 1D de la version batch-safe)
        Xg = bind_tranche_batch(X, G)      # (D,)
        Yg = bind_tranche_batch(Y, G)      # (D,)
        dot1 = int((Xg.astype(np.int32) * Yg.astype(np.int32)).sum())
        assert dot0 == dot1, "Isométrie violée"

        # involutivité : (X ⊗ G) ⊗ G == X
        assert np.array_equal(bind_tranche_batch(Xg, G), X), "Involutivité violée"
    return True


def test_etancheite_inter_tranches(D=16384, n=4000, eps=0.05, seed=7):
    """Teste l’indépendance statistique de deux bindings 
       avec clés de tranche différentes (étanchéité), version batch.
    """
    X  = _rand_pm1(n, D, seed)            # (n,D)
    G  = _rand_pm1(1, D, seed+1)[0]       # (D,)
    Gp = _rand_pm1(1, D, seed+2)[0]       # (D,)

    # Binding batch-safe (vectorisé)
    Xg  = bind_tranche_batch(X,  G)       # (n,D)
    Xgp = bind_tranche_batch(X, Gp)       # (n,D)

    # sim_i = <Xg_i, Xgp_i>/D
    sims = ((Xg.astype(np.int32) * Xgp.astype(np.int32)).sum(axis=1) / D).astype(np.float64)
    mean, tail = float(sims.mean()), float((np.abs(sims) > eps).mean())
    bound = 2.0 * math.exp(- D * eps * eps / 2.0)

    assert abs(mean) < 1e-2 + 1e-3, "Centrage inter-tranches anormal"
    assert tail <= bound + 1e-6,    "Queue empirique > borne de Hoeffding"
    return {"mean": mean, "tail": tail, "bound": bound}


def test_indexer_post_binding(D=16384, B=1000, k=24, seed=11):
    """Vérifie que l’indexeur Sign-LSH appliqué après binding
       garde un taux de collisions acceptable (<0.5%).
    """
    slsh = SignLSH.with_k_bits(D, k, seed)    # indexeur aléatoire k-bits
    Z  = _rand_pm1(B, D, seed+1)              # (B,D)
    G  = _rand_pm1(1, D, seed+2)[0]           # (D,)

    # Binding batch-safe pour tout le lot
    Zg = bind_tranche_batch(Z, G)             # (B,D)

    # Codes LSH puis estimation du taux de collisions
    codes = np.array([slsh.code(zg) for zg in Zg], dtype=np.int64)
    uniq = np.unique(codes).size
    coll = 1.0 - uniq / B
    assert coll <= 0.005 + 1e-6, f"Collisions élevées: {coll:.3%}"
    return {"collisions": float(coll), "uniq": int(uniq)}

In [13]:
test_isometrie_involutivite()
test_etancheite_inter_tranches()
test_indexer_post_binding()

{'collisions': 0.0, 'uniq': 1000}

## MM3. 

In [14]:
@dataclass
class MultiSignLSH:
    """T indexeurs signe-LSH indépendants; fusion XOR des codes k-bits."""
    tables: List[SignLSH]

    @staticmethod
    def build(D: int, k: int, T: int, seed: Optional[int] = None) -> "MultiSignLSH":
        g = np.random.default_rng(seed)
        # seed dérivés pour assurer indépendance
        tables = [SignLSH.with_k_bits(D, k, int(g.integers(0, 2**31-1))) for _ in range(T)]
        return MultiSignLSH(tables=tables)

    def code(self, z_mem: np.ndarray) -> int:
        c = 0
        for t in self.tables:
            c ^= t.code(z_mem)  # fusion XOR
        return int(c)

    def bucket(self, z_mem: np.ndarray, B: int) -> int:
        return code_to_bucket(self.code(z_mem), B)

In [15]:
def _rand_pm1(n: int, D: int, seed: int) -> np.ndarray:
    g = np.random.default_rng(seed)
    B = g.integers(0, 2, size=(n, D), dtype=np.int8)
    return (B << 1) - 1  # {-1,+1}

# --- (T1) Uniformité empirique des codes (sans bucket) ---
def test_code_uniformity(D=16384, k=24, N=10000, seed=0):
    slsh = SignLSH.with_k_bits(D, k, seed)
    Z = _rand_pm1(N, D, seed+1)
    codes = np.array([slsh.code(z) for z in Z], dtype=np.int64)
    # Mesure rudimentaire : taux de remplissage vs 2^k (si N << 2^k, on s'attend à peu de collisions)
    uniq = np.unique(codes).size
    fill = uniq / min(N, 2**k)  # fraction de cases distinctes observées
    return {"uniq": int(uniq), "fill_fraction": float(fill)}

# --- (T2) Collisions vs k (bucket sur B) ---
def test_bucket_collisions(D=16384, B=1000, k_list=(16,24,32), N=20000, seed=1):
    g = np.random.default_rng(seed)
    Z = _rand_pm1(N, D, seed+1)
    results = {}
    for k in k_list:
        slsh = SignLSH.with_k_bits(D, k, int(g.integers(0, 2**31-1)))
        buckets = np.array([code_to_bucket(slsh.code(z), B) for z in Z], dtype=np.int32)
        # Collisions = 1 - (nb de buckets distincts / min(N,B))
        uniq = np.unique(buckets).size
        coll_frac = 1.0 - (uniq / min(N, B))
        results[int(k)] = {"uniq_buckets": int(uniq), "collision_fraction": float(coll_frac)}
    return results

# --- (T3) Invariance sous binding (indexer après binding) ---
def test_invariance_binding(D=16384, k=24, N=2000, seed=2):
    slsh = SignLSH.with_k_bits(D, k, seed)
    Z  = _rand_pm1(N, D, seed+1)
    G  = _rand_pm1(1, D, seed+2)[0]
    # codes sur Z⊗G et sur Z puis (option) mix (la distribution globale doit rester comparable)
    from copy import deepcopy
    Zg = (Z.astype(np.int16) * G.astype(np.int16)).astype(np.int8, copy=False)
    codes_Z  = np.array([slsh.code(z)  for z in Z],  dtype=np.int64)
    codes_Zg = np.array([slsh.code(zg) for zg in Zg], dtype=np.int64)
    # On mesure le taux d'accord bit-à-bit (Hamming) moyen entre codes_Z et codes_Zg :
    # (statistique indicative; attendre ~0.5 si indices choisis aléatoirement et G Rademacher)
    # Ici on vérifie surtout l'absence de biais massif (mêmes distributions marginales).
    agree = np.mean(codes_Z == codes_Zg)
    return {"match_fraction": float(agree)}

# --- (T4) Robustesse au bruit (flips coordonnés à taux q) ---
def test_noise_stability(D=16384, k=24, q=0.01, N=2000, seed=3):
    g  = np.random.default_rng(seed)
    slsh = SignLSH.with_k_bits(D, k, seed+1)
    Z  = _rand_pm1(N, D, seed+2)
    # génère Z' en flipant chaque coordonnée avec prob q
    flips = g.random((N, D)) < q
    Zp = Z.copy()
    Zp[flips] = -Zp[flips]
    codes_Z  = np.array([slsh.code(z)  for z in Z],  dtype=np.int64)
    codes_Zp = np.array([slsh.code(zp) for zp in Zp], dtype=np.int64)
    # Taux d'accord des codes k-bits sous bruit
    agree = np.mean(codes_Z == codes_Zp)
    # Référence théorique ~ (1 - q)^k sous indépendance (approx.)
    ref = (1.0 - q) ** k
    return {"empirical_agreement": float(agree), "theoretical_ref": float(ref)}

In [16]:
test_code_uniformity()
test_bucket_collisions()
test_invariance_binding()
test_noise_stability()

{'empirical_agreement': 0.772, 'theoretical_ref': 0.7856781408072188}

## MM4.

In [17]:
class MemBank:
    """Banque associative HD (tranche G_MEM).
    
    - M : accumulateurs int32 (somme des payloads)
    - H : prototypes seuillés en ±1 (majorité coordonnée)
    - n : compte d'exemples par classe (m_c)
    """

    def __init__(self, B: int, D: int, thresh: bool = True) -> None:
        assert B > 0 and D > 0
        self.B, self.D, self.thresh = int(B), int(D), bool(thresh)
        self.M = np.zeros((B, D), dtype=np.int32)
        self.H = np.zeros((B, D), dtype=np.int8)
        self.n = np.zeros((B,),   dtype=np.int32)

    @staticmethod
    def _check_pm1(x: np.ndarray, D: int) -> None:
        if x.dtype != np.int8 or x.shape != (D,):
            raise ValueError("HD attendu: shape=(D,), dtype=int8")
        # Optionnel: vérifier que les valeurs sont bien ±1
        # if not np.all((x == 1) | (x == -1)): raise ValueError("valeurs {-1,+1} attendues")

    def add(self, c: int, Z_fr: np.ndarray) -> None:
        """One-pass: M_c += Z_fr; n_c += 1; (option) H_c <- sign(M_c)."""
        if not (0 <= c < self.B): raise IndexError("classe hors bornes")
        MemBank._check_pm1(Z_fr, self.D)
        self.M[c, :] += Z_fr.astype(np.int32, copy=False)
        self.n[c] += 1
        if self.thresh:
            self.H[c, :] = np.where(self.M[c, :] >= 0, 1, -1).astype(np.int8, copy=False)

    def seal(self, c: int) -> None:
        """Scellement explicite: H_c <- sign(M_c)."""
        self.H[c, :] = np.where(self.M[c, :] >= 0, 1, -1).astype(np.int8, copy=False)

    # ----- Utilitaires pédagogiques (LLN & majorité) -----
    def empirical_mean(self, c: int) -> np.ndarray:
        """Retourne m_c^{-1} M_c en float64 (nan si m_c=0)."""
        if self.n[c] == 0:
            return np.full((self.D,), np.nan, dtype=np.float64)
        return (self.M[c, :].astype(np.float64) / float(self.n[c]))

    def sign_error_rate(self, c: int, mu_true: np.ndarray) -> float:
        """Pour une vérité de référence mu_true (±biais par coordonnée),
           retourne la fraction de bits où sign(M_c) ≠ sign(mu_true)."""
        if mu_true.shape != (self.D,):
            raise ValueError("mu_true: shape=(D,) requis")
        Hc = np.where(self.M[c, :] >= 0, 1, -1).astype(np.int8, copy=False)
        ref = np.where(mu_true >= 0, 1, -1).astype(np.int8, copy=False)
        return float(np.mean(Hc != ref))

    def inf_norm_error(self, c: int, mu_true: np.ndarray) -> float:
        """Retourne || m_c^{-1} M_c - mu_true ||_∞ en float64."""
        return float(np.max(np.abs(self.empirical_mean(c) - mu_true)))

In [18]:
def sample_fr_payloads(mu: np.ndarray, m: int, seed: int) -> np.ndarray:
    """Échantillonne m vecteurs Z_fr ∈ {-1,+1}^D avec E[Z_fr]=mu.
    
    Args:
        mu   : cible moyenne (float64, shape=(D,), valeurs dans [-1,1])
        m    : nombre d'échantillons
        seed : graine pour reproductibilité
    Returns:
        Z    : array (m, D) en int8 ±1
    """
    if mu.ndim != 1: raise ValueError("mu doit être 1D, shape=(D,)")
    D = mu.shape[0]
    p = (1.0 + mu) / 2.0
    p = np.clip(p, 0.0, 1.0)
    g = np.random.default_rng(seed)
    U = g.random(size=(m, D))
    Z = np.where(U < p, 1, -1).astype(np.int8, copy=False)
    return Z

In [19]:
def test_lln_coordinate(D=4096, m_list=(8,16,32,64), bias=0.2, seed=0):
    """Vérifie la décroissance ~ m^{-1/2} de ||m^{-1}M - mu||_∞."""
    g = np.random.default_rng(seed)
    mu = np.full((D,), float(bias), dtype=np.float64)  # même biais sur toutes les coordonnées
    mem = MemBank(B=1, D=D, thresh=True)
    errs = []
    for m in m_list:
        Z = sample_fr_payloads(mu, m, seed + m)  # m échantillons biaisés
        mem.M[0, :] = Z.astype(np.int32, copy=False).sum(axis=0)
        mem.n[0] = m
        mem.seal(0)
        errs.append(mem.inf_norm_error(0, mu))
    # Vérification monotone (indicative)
    mono = all(errs[i] >= errs[i+1] - 1e-9 for i in range(len(errs)-1))
    return {"m_list": list(m_list), "inf_norm_errors": [float(e) for e in errs], "monotone": bool(mono)}

def test_majority_sign_error(D=4096, m=32, bias=0.2, seed=1):
    """Compare l'erreur de signe empirique à la borne exp(-m*bias^2/2)."""
    mu = np.full((D,), float(bias), dtype=np.float64)
    Z = sample_fr_payloads(mu, m, seed)
    M = Z.astype(np.int32).sum(axis=0)
    H = np.where(M >= 0, 1, -1).astype(np.int8, copy=False)
    ref = np.where(mu >= 0, 1, -1).astype(np.int8, copy=False)
    err_emp = float(np.mean(H != ref))
    bound   = math.exp(- m * (bias**2) / 2.0)  # borne coordonnée
    return {"m": int(m), "bias": float(bias), "err_emp": err_emp, "hoeffding_bound": bound}

def test_tie_policy(D=4096, seed=2):
    """Teste la convention sign(0)=+1 et son effet sur des moyennes ~0."""
    mu = np.zeros((D,), dtype=np.float64)
    Z  = sample_fr_payloads(mu, m=100, seed=seed)   # symétrique, E=0
    M  = Z.astype(np.int32).sum(axis=0)
    H  = np.where(M >= 0, 1, -1).astype(np.int8, copy=False)  # sign(0)=+1
    frac_plus = float(np.mean(H == 1))
    # Attendu: ≈ 0.5, léger excès dû à la convention sign(0)=+1.
    return {"frac_plus": frac_plus}

def test_seal_strategies_equivalence(D=4096, m=64, bias=0.1, seed=3):
    """Compare scellage en-ligne vs scellage final (équivalence attendue)."""
    mu = np.full((D,), float(bias), dtype=np.float64)
    Z  = sample_fr_payloads(mu, m, seed)
    # 1) Scellage en ligne
    mem1 = MemBank(B=1, D=D, thresh=True)
    for s in range(m):
        mem1.add(0, Z[s])
    H1 = mem1.H[0].copy()

    # 2) Scellage final
    mem2 = MemBank(B=1, D=D, thresh=False)
    for s in range(m):
        mem2.add(0, Z[s])
    mem2.seal(0)
    H2 = mem2.H[0].copy()
    return {"equal": bool(np.array_equal(H1, H2))}

In [20]:
test_lln_coordinate()
test_majority_sign_error()
test_tie_policy()
test_seal_strategies_equivalence()

{'equal': True}

## MM5 . 

In [21]:
def to_mem_tranche(X: np.ndarray, Gmem: np.ndarray) -> np.ndarray:
    """Projection en tranche MEM: X ⊗ Gmem (int8 sûr, involutif)."""
    if X.dtype != np.int8 or Gmem.dtype != np.int8 or X.shape != Gmem.shape:
        raise ValueError("X,Gmem: int8 ±1, même shape requis")
    return (X.astype(np.int16) * Gmem.astype(np.int16)).astype(np.int8, copy=False)

# Unbinding = binding avec la même clé (involutif)
from_mem_tranche = to_mem_tranche  # alias intentionnel

def to_mem_tranche_batch(X: np.ndarray, Gmem: np.ndarray) -> np.ndarray:
    """Batch: X shape=(N,D), Gmem shape=(D,) -> retour (N,D)."""
    if X.dtype != np.int8 or Gmem.dtype != np.int8: 
        raise ValueError("X, Gmem doivent être en int8")
    if X.ndim != 2 or X.shape[1] != Gmem.shape[0]:
        raise ValueError("X: (N,D), Gmem: (D,)")
    return (X.astype(np.int16) * Gmem.astype(np.int16)).astype(np.int8, copy=False)

def simhd(U: np.ndarray, V: np.ndarray) -> float:
    """Similarité HD normalisée: <U,V>/D en float64 (cohérent MM7)."""
    if U.shape != V.shape or U.dtype != np.int8 or V.dtype != np.int8:
        raise ValueError("U,V: même shape et dtype=int8 requis")
    D = U.shape[0]
    return float((U.astype(np.int32) @ V.astype(np.int32)) / float(D))

In [22]:
def _rand_pm1(n: int, D: int, seed: int) -> np.ndarray:
    g = np.random.default_rng(seed)
    B = g.integers(0, 2, size=(n, D), dtype=np.int8)
    return (B << 1) - 1

def test_isometry_involution(D=8192, trials=128, seed=0) -> bool:
    for t in range(trials):
        X = _rand_pm1(1, D, seed+3*t)[0]
        Y = _rand_pm1(1, D, seed+3*t+1)[0]
        G = _rand_pm1(1, D, seed+3*t+2)[0]
        dot0 = int((X.astype(np.int32) * Y.astype(np.int32)).sum())
        Xg, Yg = to_mem_tranche(X, G), to_mem_tranche(Y, G)
        dot1 = int((Xg.astype(np.int32) * Yg.astype(np.int32)).sum())
        assert dot0 == dot1, "Isométrie violée"
        assert np.array_equal(to_mem_tranche(Xg, G), X), "Involutivité violée"
    return True

def test_seal_gram_isometry(D=4096, N=256, seed=1) -> float:
    """Isométrie niveau Gram: ||G - G'||_max ≤ 5e-3 (critère EM3)."""
    X = _rand_pm1(N, D, seed)
    G = _rand_pm1(1, D, seed+1)[0]
    Xg = to_mem_tranche_batch(X, G)
    # Gram normalisés
    G0 = (X.astype(np.int32) @ X.T.astype(np.int32)) / float(D)
    G1 = (Xg.astype(np.int32) @ Xg.T.astype(np.int32)) / float(D)
    err_max = float(np.max(np.abs(G0 - G1)))
    return err_max  # attendu ≤ 5e-3

def test_inter_tranche_leakage(D=16384, n=4000, eps=0.05, seed=2):
    """Étanchéité: mean ~ 0, tail ≤ Hoeffding."""
    X  = _rand_pm1(n, D, seed)
    G  = _rand_pm1(1, D, seed+1)[0]
    Gp = _rand_pm1(1, D, seed+2)[0]
    Xg  = to_mem_tranche_batch(X,  G)
    Xgp = to_mem_tranche_batch(X, Gp)
    sims = ((Xg.astype(np.int32) * Xgp.astype(np.int32)).sum(axis=1)/D).astype(np.float64)
    mean = float(sims.mean())
    tail = float((np.abs(sims) > eps).mean())
    bound = 2.0 * math.exp(- D * eps * eps / 2.0)
    return {"mean": mean, "tail": tail, "bound": bound}

def test_permutation_equivariance(D=8192, trials=64, seed=3) -> bool:
    g = np.random.default_rng(seed)
    for t in range(trials):
        X = _rand_pm1(1, D, seed+5*t)[0]
        G = _rand_pm1(1, D, seed+5*t+1)[0]
        # permutation aléatoire
        pi = g.permutation(D).astype(np.int64)
        Xp, Gp = X[pi], G[pi]
        left  = to_mem_tranche(Xp, Gp)           # Π(X) ⊗ Π(G)
        right = to_mem_tranche(X, G)[pi]         # Π(X ⊗ G)
        assert np.array_equal(left, right), "Équivariance à Π violée"
    return True

In [23]:
test_isometry_involution()
test_seal_gram_isometry()
test_inter_tranche_leakage()
test_permutation_equivariance()

True

In [24]:
def sanity_mm5_mm3_mm7(D=8192, k=24, seed=7):
    # clés/payloads jouets
    Z_en = _rand_pm1(1, D, seed)[0]
    R    = Z_en.copy()                   # requête identique (cas « propre »)
    G    = _rand_pm1(1, D, seed+1)[0]

    # indexation post-binding (MM3)
    slsh = SignLSH.with_k_bits(D, k, seed+2)
    code_en_mem = slsh.code(to_mem_tranche(Z_en, G))

    # cohérence requête (MM7)
    R_mem = to_mem_tranche(R, G)
    code_req_mem = slsh.code(R_mem)

    # Les codes sont égaux dans ce cas propre (mêmes bits aux mêmes positions)
    return int(code_en_mem == code_req_mem)

In [25]:
sanity_mm5_mm3_mm7()

1

## MM6 . 

In [26]:
def mem_train_one_pass(mem: MemBank,
                       lsh: SignLSH,
                       pairs_en_fr: Iterable[Tuple[np.ndarray, np.ndarray]],
                       Gmem: np.ndarray) -> None:
    D = int(Gmem.shape[0])
    if Gmem.dtype != np.int8:
        raise ValueError("Gmem doit être en int8 (±1).")

    for Z_en, Z_fr in pairs_en_fr:
        Z_en = Z_en.astype(np.int8, copy=False)
        Z_fr = Z_fr.astype(np.int8, copy=False)
        if Z_en.shape != (D,) or Z_fr.shape != (D,):
            raise ValueError("Z_en et Z_fr doivent avoir shape=(D,) identique à Gmem.")

        Z_en_mem = to_mem_tranche(Z_en, Gmem)      # binding (MM5)
        # --- IMPORTANT: bucketisation du code k-bits vers [0..B-1]
        c = lsh.bucket_unbiased(Z_en_mem, mem.B)   # ou .bucket_mod(...)

        mem.add(c, Z_fr)

def mem_train_one_pass_batch(mem: MemBank,
                             lsh: SignLSH,
                             Z_en_batch: np.ndarray,   # (N,D) int8 ±1
                             Z_fr_batch: np.ndarray,   # (N,D) int8 ±1
                             Gmem: np.ndarray) -> None:
    if Z_en_batch.dtype != np.int8 or Z_fr_batch.dtype != np.int8:
        raise ValueError("batches en int8 requis")
    if Z_en_batch.shape != Z_fr_batch.shape or Z_en_batch.shape[1] != Gmem.shape[0]:
        raise ValueError("Shapes incohérents pour batch et Gmem")
    # 1) Binding vectorisé (N,D)
    Z_en_mem = (Z_en_batch.astype(np.int16) * Gmem.astype(np.int16)).astype(np.int8, copy=False)
    # 2) Indexation + 3) Updates
    for i in range(Z_en_mem.shape[0]):
        c = lsh.bucket_unbiased(Z_en_mem[i], mem.B)   # ou .bucket_mod(...)
        mem.add(c, Z_fr_batch[i])

In [27]:
def test_order_invariance(D=4096, N=200, B=128, k=24, seed=0) -> bool:
    """Même banque finale si on permute l'ordre des paires."""
    G   = _rand_pm1(1, D, seed+1)[0]
    lsh = SignLSH.with_k_bits(D, k, seed+2)
    mem1 = MemBank(B=B, D=D, thresh=True)
    mem2 = MemBank(B=B, D=D, thresh=True)
    # génère N paires
    Z_en = _rand_pm1(N, D, seed+3)
    Z_fr = _rand_pm1(N, D, seed+4)
    pairs = list(zip(Z_en, Z_fr))
    # ordre 1
    mem_train_one_pass(mem1, lsh, pairs, G)
    # ordre 2 (mélangé)
    rng = np.random.default_rng(seed+5)
    pairs_perm = [pairs[i] for i in rng.permutation(N)]
    mem_train_one_pass(mem2, lsh, pairs_perm, G)
    # égalité stricte des banques
    return bool(np.array_equal(mem1.M, mem2.M) and np.array_equal(mem1.H, mem2.H) and np.array_equal(mem1.n, mem2.n))

def test_collision_accumulation(D=4096, B=8, k=8, seed=7) -> bool:
    """Force quelques collisions et vérifie l'addition sur la même ligne."""
    G   = _rand_pm1(1, D, seed+1)[0]
    lsh = SignLSH.with_k_bits(D, k, seed+2)  # petit k -> plus de collisions
    mem = MemBank(B=B, D=D, thresh=False)
    # Deux clés différentes mais même bucket
    z1, z2 = _rand_pm1(1, D, seed+3)[0], _rand_pm1(1, D, seed+4)[0]
    c1, c2 = lsh.code(to_mem_tranche(z1, G)) % B, lsh.code(to_mem_tranche(z2, G)) % B
    # on répète jusqu'à collision (sécurisé)
    tries = 0
    while c1 != c2 and tries < 1000:
        z2 = _rand_pm1(1, D, seed+4+tries)[0]
        c2 = lsh.code(to_mem_tranche(z2, G)) % B
        tries += 1
    # payloads FR
    fr1, fr2 = _rand_pm1(1, D, seed+8)[0], _rand_pm1(1, D, seed+9)[0]
    mem.add(c1, fr1); mem.add(c2, fr2)
    # vérifie addition sur M[c1]
    return bool(np.array_equal(mem.M[c1], fr1.astype(np.int32)+fr2.astype(np.int32)))

def test_determinism(D=4096, N=300, B=128, k=24, seed=11) -> bool:
    """Même résultat à seeds identiques et données identiques."""
    G   = _rand_pm1(1, D, seed+1)[0]
    lsh = SignLSH.with_k_bits(D, k, seed+2)
    memA = MemBank(B=B, D=D, thresh=True)
    memB = MemBank(B=B, D=D, thresh=True)
    Z_en = _rand_pm1(N, D, seed+3)
    Z_fr = _rand_pm1(N, D, seed+4)
    pairs = list(zip(Z_en, Z_fr))
    mem_train_one_pass(memA, lsh, pairs, G); mem_train_one_pass(memB, lsh, pairs, G)
    return bool(np.array_equal(memA.M, memB.M) and np.array_equal(memA.H, memB.H))

def test_complexity_trend(D_list=(2048,4096,8192), N=2000, B=64, k=24, seed=21) -> dict:
    """Vérifie que la latence par update croît ~ linéairement avec D (tendance)."""
    res = {}
    for D in D_list:
        G   = _rand_pm1(1, D, seed+1)[0]
        lsh = SignLSH.with_k_bits(D, k, seed+2)
        mem = MemBank(B=B, D=D, thresh=True)
        Z_en = _rand_pm1(N, D, seed+3)
        Z_fr = _rand_pm1(N, D, seed+4)
        t0 = time.perf_counter()
        mem_train_one_pass(mem, lsh, list(zip(Z_en, Z_fr)), G)
        ms = 1000.0 * (time.perf_counter() - t0) / N
        res[int(D)] = float(ms)
    return res

In [28]:
test_order_invariance()
test_collision_accumulation()
test_determinism()
test_complexity_trend()

{2048: 0.008137229538988322,
 4096: 0.00944877095753327,
 8192: 0.0133548749727197}

## MM7 . 

In [29]:
def mem_scores(mem: MemBank,
               R_mem: np.ndarray,
               use_thresh: bool = True) -> np.ndarray:
    """Scores normalisés s_c = <R_mem, Proto_c>/D.

    Args:
        mem       : MemBank (M accumulateurs int32, H seuillés int8).
        R_mem     : requête bindée (np.int8, ±1), shape=(D,)
        use_thresh: True -> H (±1); False -> sign(M) (±1)

    Returns:
        np.ndarray (float64, shape=(B,)) : scores ∈ [-1,1]
    """
    if R_mem.dtype != np.int8 or R_mem.shape != (mem.D,):
        raise ValueError("R_mem doit être int8 ±1, shape=(D,)")

    T = mem.H if use_thresh else np.where(mem.M >= 0, 1, -1).astype(np.int8, copy=False)
    dots = T.astype(np.int32, copy=False) @ R_mem.astype(np.int32, copy=False)
    return (dots / float(mem.D)).astype(np.float64, copy=False)

def mem_argmax(scores: np.ndarray) -> int:
    return int(np.argmax(np.asarray(scores)))

def mem_payload(mem: MemBank, c_star: int) -> np.ndarray:
    out = mem.H[c_star, :].view()
    out.setflags(write=False)  # lecture seule
    return out

In [30]:
def topk_indices(scores: np.ndarray, k: int) -> np.ndarray:
    """Retourne les indices des k meilleurs scores (sans copier inutilement)."""
    if k <= 0: return np.empty((0,), dtype=np.int64)
    k = int(min(k, scores.shape[0]))
    # argpartition (O(B)) puis tri local (O(k log k))
    part = np.argpartition(scores, -k)[-k:]
    return part[np.argsort(scores[part])[::-1]]

def margin_top1(scores: np.ndarray) -> float:
    """Marge s_(1) - s_(2) (0 si B<2)."""
    B = scores.shape[0]
    if B < 2: return float(scores.max()) if B == 1 else 0.0
    idx = topk_indices(scores, 2)
    return float(scores[idx[0]] - scores[idx[1]])

def argmax_tie_break(scores: np.ndarray, seed: int = 0) -> int:
    """Argmax avec tie-break déterministe par bruit infinitésimal."""
    g = np.random.default_rng(seed)
    eps = g.uniform(low=0.0, high=1e-9, size=scores.shape).astype(scores.dtype)
    return int(np.argmax(scores + eps))

In [31]:
def mem_scores_chunked(mem: MemBank,
                       R_mem: np.ndarray,
                       chunk: int = 4096,
                       use_thresh: bool = True) -> np.ndarray:
    """Calcule s_c par blocs (limite la RAM temporaire), retourne (B,)."""
    if R_mem.dtype != np.int8 or R_mem.shape != (mem.D,):
        raise ValueError("R_mem doit être int8 ±1, shape=(D,)")

    T = mem.H if use_thresh else np.where(mem.M >= 0, 1, -1).astype(np.int8, copy=False)
    R32 = R_mem.astype(np.int32, copy=False)
    B = mem.B
    out = np.empty((B,), dtype=np.float64)
    for start in range(0, B, chunk):
        end = min(B, start + chunk)
        dots = T[start:end, :].astype(np.int32, copy=False) @ R32
        out[start:end] = dots / float(mem.D)
    return out

def mem_topk_stream(mem: MemBank, R_mem: np.ndarray, k: int = 5, use_thresh: bool = True):
    """Retourne (indices, scores) du top-k via un heap min (streaming)."""
    if k <= 0: return np.empty((0,), dtype=np.int64), np.empty((0,), dtype=np.float64)
    T = mem.H if use_thresh else np.where(mem.M >= 0, 1, -1).astype(np.int8, copy=False)
    R32 = R_mem.astype(np.int32, copy=False)
    heap = []  # contient (score, idx), min-heap
    for c in range(mem.B):
        sc = float((T[c, :].astype(np.int32, copy=False) @ R32) / float(mem.D))
        if len(heap) < k: heapq.heappush(heap, (sc, c))
        elif sc > heap[0][0]: heapq.heapreplace(heap, (sc, c))
    heap.sort(reverse=True)
    scores = np.array([s for (s, _) in heap], dtype=np.float64)
    idx    = np.array([i for (_, i) in heap], dtype=np.int64)
    return idx, scores

In [32]:
def test_order_preserved_H_vs_signM(B=16, D=4096, m=24, noise=0.01, seed=10):
    """Le top-1 est identique pour H et sign(M) quand la marge est suffisante."""
    g = np.random.default_rng(seed)
    mem = MemBank(B=B, D=D, thresh=True)
    G   = _rand_pm1(1, D, seed+1)[0]
    FR  = _rand_pm1(B, D, seed+2)

    # alimente la banque
    for c in range(B):
        for _ in range(m):
            z = FR[c].copy()
            flip = g.random(D) < noise
            z[flip] = -z[flip]
            mem.add(c, z)

    # requête au hasard
    c_true = int(g.integers(0, B))
    R_mem  = to_mem_tranche(FR[c_true], G)
    sH = mem_scores(mem, R_mem, use_thresh=True)
    sM = mem_scores(mem, R_mem, use_thresh=False)

    return {"same_top1": bool(mem_argmax(sH) == mem_argmax(sM)),
            "margin": float(margin_top1(sH))}

def test_off_tranche_noise(D=16384, B=512, eps=0.05, seed=20):
    """Scores contre prototypes d'une autre tranche : moyenne ~ 0, queue ≤ Hoeffding."""
    memA = MemBank(B=B, D=D, thresh=True)
    memB = MemBank(B=B, D=D, thresh=True)
    GA   = _rand_pm1(1, D, seed+1)[0]
    GB   = _rand_pm1(1, D, seed+2)[0]
    # prototypes jouets (symétriques)
    ZA = _rand_pm1(B, D, seed+3)
    ZB = _rand_pm1(B, D, seed+4)
    for c in range(B):
        memA.add(c, ZA[c]); memB.add(c, ZB[c])

    # requête en tranche A, scorée contre prototypes de B
    R  = _rand_pm1(1, D, seed+5)[0]
    RA = to_mem_tranche(R, GA)
    s  = mem_scores(memB, RA, use_thresh=True)  # mauvais G -> bruit
    mean = float(s.mean())
    tail = float((np.abs(s) > eps).mean())
    bound = 2.0 * math.exp(- D * eps * eps / 2.0)
    return {"mean": mean, "tail": tail, "bound": bound}

def test_topk_chunk_vs_full(B=4096, D=2048, k=10, seed=30):
    """Comparaison chunké vs plein et top-k streaming vs tri complet."""
    mem  = MemBank(B=B, D=D, thresh=True)
    G    = _rand_pm1(1, D, seed+1)[0]
    FR   = _rand_pm1(B, D, seed+2)
    for c in range(B): mem.add(c, FR[c])
    Rm   = to_mem_tranche(_rand_pm1(1, D, seed+3)[0], G)

    s_full = mem_scores(mem, Rm, use_thresh=True)
    s_chunk = mem_scores_chunked(mem, Rm, chunk=512, use_thresh=True)
    idx_full = topk_indices(s_full, k)
    idx_stream, s_stream = mem_topk_stream(mem, Rm, k=k, use_thresh=True)

    return {"max_abs_diff": float(np.max(np.abs(s_full - s_chunk))),
            "topk_equal": bool(np.array_equal(np.sort(idx_full), np.sort(idx_stream)))}

In [33]:
test_order_preserved_H_vs_signM()
test_off_tranche_noise()
test_topk_chunk_vs_full()


{'max_abs_diff': 0.0, 'topk_equal': True}

In [34]:
def estimate_margin(scores: np.ndarray) -> float:
    """Retourne la marge empirique (s_(1)-s_(2)) d'un vecteur de scores."""
    return margin_top1(scores)

def required_dimension(B: int, delta: float, margin: float) -> int:
    """Calcule D minimal pour risque global ≲ delta, marge donnée."""
    if margin <= 0: return int(1e9)  # inatteignable sans marge
    return int(math.ceil((2.0 / (margin * margin)) * math.log((2.0 * B) / max(delta, 1e-12))))

## MM8 . 

In [35]:
def apply_perm_power(x: np.ndarray, pi: np.ndarray, power: int) -> np.ndarray:
    """Applique Π^power à un vecteur HD (±1).
    
    Args:
        x     : (D,) int8 ±1
        pi    : (D,) int64 permutation de base Π ( indices 0..D-1 )
        power : entier (peut être négatif) ; Π^{-1} = permutation inverse

    Returns:
        y = Π^{power} x, vue sans copie si power==0
    """
    if x.dtype != np.int8: raise ValueError("x doit être int8 ±1")
    if pi.dtype != np.int64 or pi.shape != (x.shape[0],):
        raise ValueError("pi doit être une permutation int64 de shape (D,)")

    if power == 0:
        return x  # évite copie
    D = x.shape[0]
    # exponentiation par répétition (power petit dans la pratique : ±W, ±γ_j)
    if power > 0:
        idx = pi.copy()
        for _ in range(power - 1):
            idx = idx[pi]  # composition de permutations
        return x[idx]
    else:
        # power < 0 : utiliser l'inverse
        inv = np.empty_like(pi)
        inv[pi] = np.arange(D, dtype=np.int64)
        p = -power
        idx = inv.copy()
        for _ in range(p - 1):
            idx = idx[inv]
        return x[idx]

def superpose_signed(vectors: Sequence[np.ndarray],
                     weights: Optional[Sequence[int]] = None) -> np.ndarray:
    """Somme pondérée en int16 des vecteurs ±1 puis seuillage en int8.
    
    Args:
        vectors : liste de (D,) en int8 ±1
        weights : liste d'entiers (mêmes longueur ou None -> tous =1)
    Returns:
        R : (D,) int8 ±1
    """
    if len(vectors) == 0: raise ValueError("au moins un vecteur requis")
    D = vectors[0].shape[0]
    acc = np.zeros((D,), dtype=np.int16)
    if weights is None: weights = [1] * len(vectors)
    for v, w in zip(vectors, weights):
        if v.dtype != np.int8 or v.shape != (D,):
            raise ValueError("vecteurs doivent être (D,) int8 ±1")
        acc += (w * v.astype(np.int16, copy=False)).astype(np.int16, copy=False)
    return np.where(acc >= 0, 1, -1).astype(np.int8, copy=False)

def build_query_from_context(H_window: Sequence[np.ndarray],
                             pi: np.ndarray,
                             w_left: int, w_right: int,
                             weights_ctx: Optional[Sequence[int]] = None,
                             targets_hist: Optional[Sequence[Tuple[np.ndarray, int, int]]] = None
                             ) -> np.ndarray:
    """Construit R à partir d'une fenêtre de spans ENC et d'un historique cible.

    Args:
        H_window    : liste [H^{(t0-w_left)}, ..., H^{(t0)}, ..., H^{(t0+w_right)}], chaque (D,) int8 ±1
        pi          : permutation Π (D,) int64
        w_left/right: taille de fenêtre à gauche/droite (cohérente avec H_window)
        weights_ctx : poids entiers pour chaque offset u ∈ [-w_left..w_right]
        targets_hist: liste optionnelle [(proto, beta, gamma), ...]
                      où 'proto'=(D,) int8 ±1 (ex: \hat M_{c_{t-j}}),
                      'beta' poids entier, 'gamma' décalage positionnel (appliquer Π^{gamma})

    Returns:
        R : (D,) int8 ±1
    """
    U = []
    W = []
    # (a) contexte source : aligne chaque H^{(t0+u)} par Π^{u}
    if weights_ctx is None:
        weights_ctx = [1] * (w_left + w_right + 1)
    assert len(H_window) == (w_left + w_right + 1)
    for u, H in zip(range(-w_left, w_right+1), H_window):
        U.append(apply_perm_power(H, pi, power=u))
        W.append(int(weights_ctx[u + w_left]))

    # (b) historique cible : prototypes précédents, re-positionnés
    if targets_hist is not None:
        for proto, beta, gamma in targets_hist:
            U.append(apply_perm_power(proto, pi, power=gamma))
            W.append(int(beta))

    # (c) superposition + seuillage
    R = superpose_signed(U, W)
    return R

  où 'proto'=(D,) int8 ±1 (ex: \hat M_{c_{t-j}}),


In [36]:
def build_query_mem(R: np.ndarray, Gmem: np.ndarray) -> np.ndarray:
    """R_mem = R ⊗ G_MEM (int8, isométrique, involutif)."""
    return to_mem_tranche(R, Gmem)

def infer_top1(mem: MemBank, R: np.ndarray, Gmem: np.ndarray, use_thresh: bool = True) -> Tuple[int, float]:
    """Chaîne complète: bind → scores → argmax."""
    R_mem = build_query_mem(R, Gmem)
    s = mem_scores(mem, R_mem, use_thresh=use_thresh)
    c_star = mem_argmax(s)
    return c_star, float(s[c_star])

In [37]:
def test_build_query_shapes_types(D: int = 4096, seed: int = 0) -> bool:
    pi   = np.random.default_rng(seed).permutation(D).astype(np.int64)
    H0   = _rand_pm1(1, D, seed+1)[0]
    R    = build_query_from_context([H0], pi, 0, 0)
    assert R.dtype == np.int8 and R.shape == (D,)
    return True

def test_query_isometry_involution(D: int = 8192, seed: int = 1) -> bool:
    pi   = np.random.default_rng(seed).permutation(D).astype(np.int64)
    H0   = _rand_pm1(1, D, seed+1)[0]
    R    = build_query_from_context([H0], pi, 0, 0)
    G    = _rand_pm1(1, D, seed+2)[0]
    Rm   = build_query_mem(R, G)
    # isométrie: <Rm, X> == <R, X⊗G>
    X    = _rand_pm1(1, D, seed+3)[0]
    dot1 = int((Rm.astype(np.int32) * X.astype(np.int32)).sum())
    dot2 = int((R.astype(np.int32)  * to_mem_tranche(X, G).astype(np.int32)).sum())
    assert dot1 == dot2
    # involutif
    assert np.array_equal(to_mem_tranche(Rm, G), R)

In [38]:
test_build_query_shapes_types()
test_query_isometry_involution()

In [39]:
def similarity_vs_shift(D: int = 8192, Wl: int = 2, Wr: int = 2, seed: int = 2, deltas=range(-6,7)):
    """Construit R d'une fenêtre et mesure simhd(R, Π^Δ R)."""
    g  = np.random.default_rng(seed)
    pi = g.permutation(D).astype(np.int64)
    Hs = [_rand_pm1(1, D, seed+10+u)[0] for u in range(-Wl, Wr+1)]
    R  = build_query_from_context(Hs, pi, Wl, Wr)  # w=1 par défaut
    sims = {}
    for Δ in deltas:
        Rp = apply_perm_power(R, pi, Δ)
        # simhd = <R, Π^Δ R> / D
        s = float((R.astype(np.int32) @ Rp.astype(np.int32)) / float(D))
        sims[int(Δ)] = s
    return sims  # décroissance attendue quand |Δ| ↑

def margin_drop_with_shift(B: int = 64, D: int = 8192, Wl: int = 1, Wr: int = 1, seed: int = 3, deltas=range(0,6)):
    """Impact du décalage sur la marge MAP en scorant contre une banque jouet."""
    g   = np.random.default_rng(seed)
    pi  = g.permutation(D).astype(np.int64)
    mem = MemBank(B=B, D=D, thresh=True)
    G   = _rand_pm1(1, D, seed+1)[0]
    # Prototypes FR (un seul échantillon pour illustrer)
    FR  = _rand_pm1(B, D, seed+2)
    for c in range(B): mem.add(c, FR[c])

    # Construit une fenêtre rangée autour de t0 pour la classe vraie
    Hs_true = [_rand_pm1(1, D, seed+10+u)[0] for u in range(-Wl, Wr+1)]
    R0 = build_query_from_context(Hs_true, pi, Wl, Wr)
    # c_true tirée au hasard
    c_true = int(g.integers(0, B))
    # Pour rendre R corrélé à FR[c_true], on injecte un mélange (+) biaisé
    R_mix = superpose_signed([R0, FR[c_true]], weights=[2, 1])

    margins = {}
    for Δ in deltas:
        # simule un décalage de la fenêtre non compensé
        R_shift = apply_perm_power(R_mix, pi, Δ)
        c_star, s_star = infer_top1(mem, R_shift, G, use_thresh=True)
        s_all = mem_scores(mem, build_query_mem(R_shift, G), use_thresh=True)
        margins[int(Δ)] = {"c_star": int(c_star), "margin": float(margin_top1(s_all))}
    return margins  # marge ↓ quand Δ ↑ (tendance)

In [40]:
def window_effect_study(D: int = 8192, seed: int = 4, deltas=range(0,6)):
    g  = np.random.default_rng(seed)
    pi = g.permutation(D).astype(np.int64)
    # deux fenêtres: étroite (1,1) et large (3,3)
    configs = [("narrow",(1,1), [1,1,1]), ("wide",(3,3), [1,1,1,1,1,1,1])]
    curves = {}
    for name,(Wl,Wr),weights in configs:
        Hs = [_rand_pm1(1, D, seed+10+u)[0] for u in range(-Wl, Wr+1)]
        R  = build_query_from_context(Hs, pi, Wl, Wr, weights_ctx=weights)
        sims = []
        for Δ in deltas:
            Rp = apply_perm_power(R, pi, Δ)
            s  = float((R.astype(np.int32) @ Rp.astype(np.int32)) / float(D))
            sims.append(s)
        curves[name] = sims
    return curves  # la fenêtre large décroit plus lentement

In [41]:
window_effect_study()

{'narrow': [1.0,
  0.025390625,
  -0.0068359375,
  0.0107421875,
  0.00439453125,
  -0.021484375],
 'wide': [1.0,
  0.00634765625,
  0.0146484375,
  0.0029296875,
  -0.009765625,
  -0.00927734375]}

## MM9 . 

In [42]:
@dataclass(frozen=True)
class MemConfig:
    B: int           # nombre de buckets/classes mémoire
    D: int           # dimension HD
    k: int           # nombre de bits LSH (MM3)
    seed_lsh: int    # seed pour LSH
    seed_gmem: int   # seed pour la tranche G_MEM
    thresh: bool = True  # seuillage en-ligne (MM4)

@dataclass(frozen=True)
class MemComponents:
    mem: MemBank
    lsh: SignLSH
    Gmem: np.ndarray  # (D,) int8 ±1
    meta: Dict[str, Any]

def make_mem_pipeline(cfg: MemConfig) -> MemComponents:
    """Construit les composants MEM : banque, LSH, G_MEM ; journalise les seeds."""
    mem  = MemBank(B=cfg.B, D=cfg.D, thresh=cfg.thresh)
    lsh  = SignLSH.with_k_bits(cfg.D, cfg.k, seed=cfg.seed_lsh)
    # Gmem : Rademacher en ±1
    g    = np.random.default_rng(cfg.seed_gmem)
    Gmem = ((g.integers(0, 2, size=(cfg.D,), dtype=np.int8) << 1) - 1).astype(np.int8, copy=False)
    meta = {"B": cfg.B, "D": cfg.D, "k": cfg.k, "seed_lsh": cfg.seed_lsh, "seed_gmem": cfg.seed_gmem,
            "thresh": cfg.thresh, "numpy_version": np.__version__}
    return MemComponents(mem=mem, lsh=lsh, Gmem=Gmem, meta=meta)

In [43]:
def train_one_pass_MEM(components: MemComponents,
                       pairs_en_fr: Iterable[Tuple[np.ndarray, np.ndarray]]) -> None:
    """Chaîne MM6 complète: bind→LSH→update (cf. MM6, MM5, MM3, MM4)."""
    mem, lsh, Gmem = components.mem, components.lsh, components.Gmem
    for Z_en, Z_fr in pairs_en_fr:
        Z_en = Z_en.astype(np.int8, copy=False)
        Z_fr = Z_fr.astype(np.int8, copy=False)
        Z_en_mem = to_mem_tranche(Z_en, Gmem)  # MM5
        c = lsh.code(Z_en_mem)                 # MM3
        mem.add(c, Z_fr)                       # MM4

def infer_map_top1(components: MemComponents,
                   R: np.ndarray,
                   use_thresh: bool = True) -> Tuple[int, float]:
    """Chaîne MM7 complète: bind requête → scores → argmax (MAP)."""
    R_mem = build_query_mem(R.astype(np.int8, copy=False), components.Gmem)  # MM5
    scores = mem_scores(components.mem, R_mem, use_thresh=use_thresh)        # MM7
    c_star = mem_argmax(scores)
    return c_star, float(scores[c_star])

def infer_map_topk(components: MemComponents,
                   R: np.ndarray, k: int = 5,
                   use_thresh: bool = True) -> Tuple[np.ndarray, np.ndarray]:
    """Top-k via utilitaires MM7 (chunking/streaming possible)."""
    R_mem = build_query_mem(R.astype(np.int8, copy=False), components.Gmem)
    scores = mem_scores(components.mem, R_mem, use_thresh=use_thresh)
    idx = topk_indices(scores, k)
    return idx, scores[idx]

In [44]:
def make_aligned_pairs(B: int, D: int, m_per_class: int,
                       noise_fr: float, noise_en: float,
                       seed_proto: int, seed_stream: int) -> Dict[str, Any]:
    """Crée B prototypes FR et génère m_per_class paires EN/FR corrélées par classe.
    
    EN = Π P_c avec flips (noise_en).
    FR = P_c avec flips (noise_fr).
    """
    g  = np.random.default_rng(seed_proto)
    pi = g.permutation(D).astype(np.int64)          # Π (MM1)
    P  = _rand_pm1(B, D, seed_proto+1)              # prototypes FR
    # génère paires
    pairs = []
    g2 = np.random.default_rng(seed_stream)
    for c in range(B):
        for _ in range(m_per_class):
            z_fr = P[c].copy()
            flip_fr = g2.random(D) < noise_fr
            z_fr[flip_fr] = -z_fr[flip_fr]
            # clé EN corrélée : ΠP_c puis flips
            z_en = P[c][pi].copy()
            flip_en = g2.random(D) < noise_en
            z_en[flip_en] = -z_en[flip_en]
            pairs.append((z_en.astype(np.int8, copy=False),
                          z_fr.astype(np.int8, copy=False)))
    # requêtes propres (une par classe)
    R_clean = P.copy()  # requêtes « idéales » côté FR (compatibles avec MM7)
    return {"pairs": pairs, "P": P, "pi": pi, "R_clean": R_clean}

In [45]:
def accept_leakage(D: int = 16384, n: int = 4000, eps: float = 0.05, seed: int = 7) -> dict:
    """Étanchéité inter-tranches (MM5) : moyenne≈0, queue ≤ Hoeffding (version batch)."""
    X  = _rand_pm1(n, D, seed)           # (n,D) int8 ±1
    G  = _rand_pm1(1, D, seed+1)[0]      # (D,)  int8 ±1
    Gp = _rand_pm1(1, D, seed+2)[0]      # (D,)  int8 ±1

    # --- Correctif: binding batch-safe ---
    Xg  = bind_tranche_batch(X,  G)      # (n,D) int8
    Xgp = bind_tranche_batch(X, Gp)      # (n,D) int8

    # Similarités normalisées sim_i = <(X⊗G)_i, (X⊗G')_i>/D
    sims = ((Xg.astype(np.int32) * Xgp.astype(np.int32)).sum(axis=1) / D).astype(np.float64)
    mean = float(sims.mean())
    tail = float((np.abs(sims) > eps).mean())
    bound = 2.0 * math.exp(- D * eps * eps / 2.0)

    ok = (abs(mean) <= 1e-2 and tail <= bound + 1e-6)
    return {"mean": mean, "tail": tail, "bound": bound, "ok": bool(ok)}

def accept_collisions(D: int = 16384, B: int = 1000, k: int = 24, seed: int = 13) -> dict:
    """Taux de collisions LSH (MM3)."""
    lsh = SignLSH.with_k_bits(D, k, seed)
    X = _rand_pm1(B, D, seed+1)
    codes = np.array([lsh.code(x) for x in X], dtype=np.int64)
    uniq = np.unique(codes).size
    coll = 1.0 - uniq / B
    return {"collisions": float(coll), "uniq": int(uniq), "ok": (coll <= 0.005 + 1e-6)}

def accept_train_infer_precision(cfg: MemConfig,
                                 m_per_class: int = 32,
                                 noise_fr: float = 0.01,
                                 noise_en: float = 0.01,
                                 seed_proto: int = 100,
                                 seed_stream: int = 101,
                                 seed_infer: int = 102) -> dict:
    """Entraîne puis évalue précision@1 sur requêtes propres (une par classe)."""
    comp = make_mem_pipeline(cfg)
    data = make_aligned_pairs(cfg.B, cfg.D, m_per_class, noise_fr, noise_en, seed_proto, seed_stream)
    # train
    t0 = time.perf_counter()
    train_one_pass_MEM(comp, data["pairs"])
    train_ms = 1000.0 * (time.perf_counter() - t0)

    # inference (B requêtes propres)
    correct = 0
    scores_all = []
    for c in range(cfg.B):
        R = data["R_clean"][c]
        c_star, s_star = infer_map_top1(comp, R, use_thresh=True)
        correct += int(c_star == c)
        scores_all.append(s_star)
    prec1 = correct / float(cfg.B)
    margin_est = float(np.sort(scores_all)[-1] - np.sort(scores_all)[-2]) if cfg.B >= 2 else float(scores_all[0])

    return {"prec_at_1": float(prec1),
            "avg_score": float(np.mean(scores_all)),
            "train_ms_total": float(train_ms),
            "ok": (prec1 >= 0.995)}  # CA sur requêtes propres

def accept_determinism(cfg: MemConfig,
                       m_per_class: int = 8,
                       noise_fr: float = 0.01,
                       noise_en: float = 0.01,
                       seed_proto: int = 200,
                       seed_stream: int = 201) -> dict:
    """Deux runs identiques (mêmes seeds) -> mêmes M/H."""
    compA = make_mem_pipeline(cfg)
    compB = make_mem_pipeline(cfg)
    dataA = make_aligned_pairs(cfg.B, cfg.D, m_per_class, noise_fr, noise_en, seed_proto, seed_stream)
    dataB = make_aligned_pairs(cfg.B, cfg.D, m_per_class, noise_fr, noise_en, seed_proto, seed_stream)
    train_one_pass_MEM(compA, dataA["pairs"])
    train_one_pass_MEM(compB, dataB["pairs"])
    same = (np.array_equal(compA.mem.M, compB.mem.M) and
            np.array_equal(compA.mem.H, compB.mem.H) and
            np.array_equal(compA.mem.n, compB.mem.n))
    return {"deterministic": bool(same), "ok": bool(same)}

def accept_complexity_trend(B: int = 128, D_list=(2048, 4096, 8192), k: int = 24,
                            m_per_class: int = 8, seed: int = 300) -> dict:
    """Latence ∝ D (tendance)."""
    out = {}
    for D in D_list:
        cfg = MemConfig(B=B, D=D, k=k, seed_lsh=seed+1, seed_gmem=seed+2)
        comp = make_mem_pipeline(cfg)
        data = make_aligned_pairs(B, D, m_per_class, 0.01, 0.01, seed+3, seed+4)
        t0 = time.perf_counter()
        train_one_pass_MEM(comp, data["pairs"])
        ms_per_upd = 1000.0 * (time.perf_counter() - t0) / (B * m_per_class)
        out[int(D)] = float(ms_per_upd)
    return out

In [46]:
def sanity_payload_readonly_and_isometry(cfg: MemConfig, seed: int = 400) -> dict:
    comp = make_mem_pipeline(cfg)
    # banque jouet : H = FR
    FR = _rand_pm1(cfg.B, cfg.D, seed+1)
    for c in range(cfg.B): comp.mem.add(c, FR[c])
    # requête
    R   = _rand_pm1(1, cfg.D, seed+2)[0]
    Rm  = build_query_mem(R, comp.Gmem)
    s1  = mem_scores(comp.mem, Rm, use_thresh=True)
    # isométrie: <Rm, H> == <R, H⊗G>
    H_bind = (comp.mem.H.astype(np.int16) * comp.Gmem.astype(np.int16)).astype(np.int8, copy=False)
    s2  = (H_bind.astype(np.int32) @ R.astype(np.int32)) / float(cfg.D)
    iso_ok = bool(np.allclose(s1, s2.astype(np.float64), atol=0.0))
    # payload readonly
    p    = mem_payload(comp.mem, int(np.argmax(s1)))
    ro_ok = False
    try:
        p[0] = 0
    except ValueError:
        ro_ok = True
    return {"isometry_ok": iso_ok, "readonly_ok": ro_ok, "ok": (iso_ok and ro_ok)}

In [48]:
accept_leakage()
accept_collisions()
accept_complexity_trend()

IndexError: classe hors bornes

## MM10 . 

In [None]:
def validate_types_shapes(mem: MemBank) -> Dict[str, Any]:
    ok = (mem.M.dtype == np.int32 and mem.H.dtype == np.int8 and mem.n.dtype == np.int32)
    return {"M_dtype": str(mem.M.dtype), "H_dtype": str(mem.H.dtype),
            "n_dtype": str(mem.n.dtype), "ok": bool(ok)}

def validate_tranche_isometry(G: np.ndarray, trials: int = 64, D: int = None, seed: int = 0) -> Dict[str, Any]:
    """Vérifie isométrie & involutivité (échantillonnage)."""
    if D is None: D = G.shape[0]
    g = np.random.default_rng(seed)
    ok_iso = True; ok_inv = True
    for t in range(trials):
        X = ((g.integers(0,2,size=D,dtype=np.int8)<<1)-1)
        Y = ((g.integers(0,2,size=D,dtype=np.int8)<<1)-1)
        Xg, Yg = to_mem_tranche(X, G), to_mem_tranche(Y, G)
        dot0 = int((X.astype(np.int32)*Y.astype(np.int32)).sum())
        dot1 = int((Xg.astype(np.int32)*Yg.astype(np.int32)).sum())
        ok_iso &= (dot0 == dot1)
        ok_inv &= np.array_equal(to_mem_tranche(Xg, G), X)
    return {"isometry_ok": bool(ok_iso), "involution_ok": bool(ok_inv),
            "ok": bool(ok_iso and ok_inv)}

def validate_lsh_collisions(lsh: SignLSH, D: int, B: int = 1000, seed: int = 13, tol: float = 0.005) -> Dict[str, Any]:
    X = ((np.random.default_rng(seed).integers(0,2,size=(B,D),dtype=np.int8)<<1)-1)
    codes = np.array([lsh.code(x) for x in X], dtype=np.int64)
    uniq = np.unique(codes).size
    coll = 1.0 - uniq / B
    return {"collisions": float(coll), "uniq": int(uniq), "ok": bool(coll <= tol)}

def monitor_lln_majority(mem: MemBank, classes: np.ndarray, step: int = 1) -> Dict[str, Any]:
    """Estime que le seuillage est stabilisé : H == sign(M) (sous thresh=True)."""
    stable = True; mism = 0; tot = 0
    for c in classes[::step]:
        signM = np.where(mem.M[c] >= 0, 1, -1).astype(np.int8, copy=False)
        diff  = (signM != mem.H[c])
        mism += int(diff.sum()); tot += diff.size
        stable &= not np.any(diff)
    rate = mism / float(max(1, tot))
    return {"majority_mismatch_rate": float(rate), "ok": bool(stable)}

In [None]:
def estimate_ram_bytes(mem: MemBank) -> int:
    return int(mem.M.nbytes + mem.H.nbytes + mem.n.nbytes)

def measure_train_latency(comp, pairs, warmup: int = 100) -> dict:
    mem, lsh, G = comp.mem, comp.lsh, comp.Gmem
    D = int(G.shape[0])
    items = list(pairs)
    w = min(warmup, len(items))
    # Warmup
    for i in range(w):
        Z_en, Z_fr = items[i]
        Z_en = Z_en.astype(np.int8, copy=False); Z_fr = Z_fr.astype(np.int8, copy=False)
        Z_en_mem = to_mem_tranche(Z_en, G)
        c = _lsh_bucket(lsh, Z_en_mem, mem.B)    # *** FIX ***
        mem.add(c, Z_fr)
    # Mesure
    t0, n = time.perf_counter(), 0
    for i in range(w, len(items)):
        Z_en, Z_fr = items[i]
        Z_en = Z_en.astype(np.int8, copy=False); Z_fr = Z_fr.astype(np.int8, copy=False)
        Z_en_mem = to_mem_tranche(Z_en, G)
        c = _lsh_bucket(lsh, Z_en_mem, mem.B)    # *** FIX ***
        mem.add(c, Z_fr); n += 1
    dt = time.perf_counter() - t0
    return {"count": int(n), "avg_ms_per_update": float(1000.0*dt/max(1,n))}

def measure_margins(comp: MemComponents, R_list) -> Dict[str, Any]:
    margins = []
    for R in R_list:
        Rm = build_query_mem(R, comp.Gmem)
        s  = mem_scores(comp.mem, Rm, use_thresh=True)
        margins.append(margin_top1(s))
    return {"margin_mean": float(np.mean(margins)), "margin_min": float(np.min(margins))}

def log_metadata(comp: MemComponents) -> Dict[str, Any]:
    md = dict(comp.meta)
    md["ram_bytes"] = estimate_ram_bytes(comp.mem)
    return md

In [None]:
def run_mm10_checklist(cfg: MemConfig,
                       synth_params: Tuple[int,int,float,float,int,int] = (32, 0.01, 0.01, 100, 101, 102),
                       eps_leak: float = 0.05) -> Dict[str, Any]:
    """Lance tous les contrôles MM10 et renvoie un rapport structuré.

    synth_params = (m_per_class, noise_fr, noise_en, seed_proto, seed_stream, seed_infer)
    CA par défaut :
      - étanchéité: |mean| ≤ 1e-2 & tail ≤ Hoeffding
      - collisions: ≤ 0.5% (k≥24, B≈1e3)
      - préc@1 (requêtes propres): ≥ 99.5%
      - LLN/majorité: mismatch_rate ~ 0
      - isométrie/involutivité: OK
      - perf: latence ∝ D (indicative)
    """
    m_per_class, noise_fr, noise_en, seed_proto, seed_stream, seed_infer = synth_params

    # 1) Construction pipeline + données synthétiques alignées
    comp = make_mem_pipeline(cfg)
    data = make_aligned_pairs(cfg.B, cfg.D, m_per_class, noise_fr, noise_en, seed_proto, seed_stream)

    # 2) Validateurs de base
    v_types = validate_types_shapes(comp.mem)
    v_tr    = validate_tranche_isometry(comp.Gmem, D=cfg.D)
    v_lsh   = validate_lsh_collisions(comp.lsh, cfg.D, B=min(cfg.B,1000))

    # 3) Entraînement one-pass
    train_one_pass_MEM(comp, data["pairs"])

    # 4) LLN/majorité (vérif rapide)
    classes = np.arange(cfg.B, dtype=np.int64)
    v_lln = monitor_lln_majority(comp.mem, classes, step=max(1, cfg.B//16))

    # 5) Étanchéité (inter-tranches)
    leak = accept_leakage(D=cfg.D, n=4000, eps=eps_leak)

    # 6) Précision@1 sur requêtes propres (e2e)
    e2e = accept_train_infer_precision(cfg, m_per_class, noise_fr, noise_en, seed_proto, seed_stream, seed_infer)

    # 7) Performance (latence moyenne / update)
    perf = measure_train_latency(comp, data["pairs"])

    # 8) Marges (sur les requêtes propres)
    marg = measure_margins(comp, data["R_clean"])

    # 9) Métadonnées / RAM
    meta = log_metadata(comp)

    # 10) Agrégation Go/No-Go
    ok = (v_types["ok"] and v_tr["ok"] and v_lsh["ok"] and v_lln["ok"] and leak["ok"] and e2e["ok"])
    report = {
        "ok": bool(ok),
        "types": v_types, "tranche": v_tr, "lsh": v_lsh, "lln_majority": v_lln,
        "leakage": leak, "e2e_prec1": e2e, "perf": perf, "margins": marg, "meta": meta
    }
    return report

In [None]:
@dataclass
class MemConfig:
    # --- hyperparamètres ---
    D: int              # dimension HD
    B: int              # capacité mémoire (nb de buckets/classes)
    k: int              # nombre de bits LSH (MM3)
    seed: int           # seed globale pour reproductibilité
    seed_lsh: int       # seed spécifique pour LSH
    seed_gmem: int      # seed spécifique pour tranche G_MEM
    thresh: bool = True # seuillage en-ligne (MM4)

    # --- objets instanciés ---
    mem: "MemBank" = None
    lsh: "SignLSH" = None
    Gmem: np.ndarray = None

def make_memconfig(D=4096, B=256, k=24, seed=42, thresh: bool = True) -> MemConfig:
    """Fabrique une configuration mémoire complète pour les tests MM10."""
    # dérive des seeds pour LSH et Gmem à partir du seed global
    seed_lsh = seed + 1
    seed_gmem = seed + 2

    # clé de tranche ±1 int8 (seed_gmem)
    rng = np.random.default_rng(seed_gmem)
    Gmem = rng.choice([-1, +1], size=D).astype(np.int8)

    # indexeur LSH (seed_lsh)
    lsh = SignLSH.with_k_bits(D, k, seed=seed_lsh)

    # banque mémoire
    mem = MemBank(B=B, D=D)

    return MemConfig(
        D=D, B=B, k=k, seed=seed,
        mem=mem, lsh=lsh, Gmem=Gmem,
        seed_lsh=seed_lsh, seed_gmem=seed_gmem,
        thresh=thresh,
    )

In [107]:
cfg = make_memconfig(D=4096, B=256, k=24, seed=123)
report = run_mm10_checklist(cfg)

NameError: name 'validate_types_shapes' is not defined