In [1]:
from hllset_swarm.trajectory import SwarmProgram
from hllset_swarm.io.env import Environment

__all__ = ["SwarmProgram", "Environment"]

Loading HLLSet kernel from: None


In [4]:
from hllset_swarm.hllset_wrapper import HllSet

h = HllSet(P=10)
print(h.count())

1.0


# Demo

Below is a single-file, zero-dependency Python script that:

- builds 3 dummy inverted indices (1-, 2-, 3-gram)
- creates a given HLLSet bit-vector
- trains a tiny TRM actuator (shared Wτ, Wρ) on fake “bad→good” pairs
- tests it until reconstruction error > threshold → triggers on-line tuning
- hot-swaps the lattice and resumes with lower error

Run it anywhere: python dummy_self_tune.py

In [1]:
#!/usr/bin/env python3
"""
Dummy self-tuning TRM actuator
- 3 inverted indices (1-,2-,3-gram)
- given HLLSet bit-vector
- train → test → trigger tune → hot-swap
No external deps – pure Python + built-in random
"""
import random, math, time, json, gzip, os
from typing import Dict, List, Set, Tuple
import torch

In [2]:
# ---------- 1.  CONFIG ----------
VOCAB_1G      = 5000          # dummy 1-grams
VOCAB_2G      = 3000          # dummy 2-grams
VOCAB_3G      = 2000          # dummy 3-grams
REGISTERS     = 1024          # P=10
HASH_WIDTH  = 32            # 32-bit hash
BEAM          = 5
TOLERANCE   = 0.05          # 5 % BER
TUNE_SAMPLES= 20            # mini-batch for tuning
MAX_PASS    = 8

In [3]:
# ---------- 2.  DUMMY INVERTED INDICES ----------
def hash32(x: str) -> int:
    return abs(hash(x)) & ((1 << HASH_WIDTH) - 1)

def build_index(vocab: List[str], name: str) -> Dict[str, dict]:
    idx = {}
    for tok in vocab:
        h = hash32(tok)
        reg = h % REGISTERS
        run = (h >> 10) & 31   # 0-31
        idx[tok] = {"hash": h, "reg": reg, "run": run}
    return idx

print("Building dummy indices…")
idx1 = build_index([f"1g{i:05d}" for i in range(VOCAB_1G)], "1g")
idx2 = build_index([f"2g{i:05d}" for i in range(VOCAB_2G)], "2g")
idx3 = build_index([f"3g{i:05d}" for i in range(VOCAB_3G)], "3g")
print(f"Indices ready: 1g={len(idx1)}, 2g={len(idx2)}, 3g={len(idx3)}")

Building dummy indices…
Indices ready: 1g=5000, 2g=3000, 3g=2000


In [4]:
# ---------- 3.  GIVEN HLLSet BIT-VECTOR ----------
def given_hllset() -> List[int]:
    """returns bit-vector 0/1 length REGISTERS"""
    # fake: 60 % density, mutually exclusive hashes
    bits = [0] * REGISTERS
    for tok in list(idx1.values())[:3000] + list(idx2.values())[:2000] + list(idx3.values())[:1000]:
        bits[tok["reg"]] |= 1 << tok["run"]
    return bits

ORIGINAL_BITS = given_hllset()
print(f"Given HLLSet: {sum(ORIGINAL_BITS)} bits set / {REGISTERS}")


Given HLLSet: 745796298825 bits set / 1024


In [5]:
# ---------- 4.  TOKEN ↔ BIT-VECTOR helpers ----------
def bits_to_tokens(bits: List[int], idx: Dict[str, dict]) -> List[str]:
    """naïve cover: return tokens whose (reg,run) is set"""
    out = []
    for tok, info in idx.items():
        if bits[info["reg"]] & (1 << info["run"]):
            out.append(tok)
    return out

def tokens_to_bits(tokens: List[str], idx: Dict[str, dict]) -> List[int]:
    bits = [0] * REGISTERS
    for tok in tokens:
        info = idx[tok]
        bits[info["reg"]] |= 1 << info["run"]
    return bits

In [6]:
# ---------- 5.  RECONSTRUCTION LOSS (Jaccard) ----------
def reconstruction_loss(pred_tokens: List[str], orig_bits: List[int]) -> float:
    pred_bits = tokens_to_bits(pred_tokens, {**idx1, **idx2, **idx3})
    intersect = sum(a & b for a, b in zip(orig_bits, pred_bits))
    union = sum(a | b for a, b in zip(orig_bits, pred_bits))
    return 1.0 - (intersect / max(union, 1))

In [7]:
# ---------- 6.  TINY TRM ACTUATOR ----------
class TRMActuator:
    def __init__(self, n: int):
        self.n = n
        self.Wτ = self._random_sparse()
        self.Wρ = self.Wτ * 0.3
        self.alpha, self.beta, self.gamma = 0.2, 0.15, 0.05
        self.eta = 0.02

    def _random_sparse(self, density=0.001):
        # import torch
        idx = torch.randint(0, n, (2, int(n * n * density)))
        val = torch.rand(idx.shape[1])
        return torch.sparse_coo_tensor(idx, val, (n, n)).coalesce()

    def step(self, z: torch.Tensor, teacher: torch.Tensor) -> torch.Tensor:
        # import torch
        cognitive = torch.sparse.mm(self.Wτ, z.unsqueeze(1)).squeeze()
        exclusion = torch.sparse.mm(self.Wρ, z.unsqueeze(1)).squeeze()
        z_new = z + self.alpha * cognitive - self.beta * exclusion + self.gamma * teacher
        return torch.clamp(z_new, 0.0, 1.0)

    def hebb_update(self, z: torch.Tensor, loss: float):
        # import torch
        outer = loss * torch.outer(z, z).flatten()
        idx = self.Wτ.coalesce().indices()
        delta = torch.sparse_coo_tensor(idx, outer, self.Wτ.shape)
        self.Wτ += delta
        self.Wρ += delta * 0.3
        self.Wτ = self.Wτ.coalesce()
        self.Wρ = self.Wρ.coalesce()

In [8]:
# ---------- 7.  BASE TRAINING (bad → good pairs) ----------
def base_training(actuator: TRMActuator, epochs: int = 3):
    print("Base training…")
    for epoch in range(epochs):
        # fake: create “bad” cover and “good” cover
        bad_bits = [random.randint(0, 1) for _ in range(REGISTERS)]
        good_bits = ORIGINAL_BITS
        z = torch.tensor(bad_bits, dtype=torch.float32)
        teacher = torch.tensor(good_bits, dtype=torch.float32)
        for pass_ in range(MAX_PASS):
            z = actuator.step(z, teacher)
        loss = reconstruction_loss(bits_to_tokens(z, {**idx1, **idx2, **idx3}), good_bits)
        actuator.hebb_update(z, loss)
        if epoch % 1 == 0:
            print(f"  epoch {epoch} loss={loss:.4f}")

In [9]:
# ---------- 8.  TEST LOOP (until threshold breach) ----------
def test_until_fail(actuator: TRMActuator, max_tests: int = 100):
    print("Testing (until fail)...")
    for step in range(max_tests):
        # generate noisy input
        noisy = [ORIGINAL_BITS[i] if random.random() < 0.95 else 1 - ORIGINAL_BITS[i] for i in range(REGISTERS)]
        z = torch.tensor(noisy, dtype=torch.float32)
        for pass_ in range(MAX_PASS):
            z = actuator.step(z, torch.zeros_like(z))  # no teacher during test
        pred_tokens = bits_to_tokens(z, {**idx1, **idx2, **idx3})
        loss = reconstruction_loss(pred_tokens, ORIGINAL_BITS)
        print(f"  test {step} loss={loss:.4f}", end="\r")
        if loss > TOLERANCE:
            print(f"\n>>> TOLERANCE BREACHED ({loss:.4f} > {TOLERANCE}) – triggering tune-up!")
            return step, loss
    print("\n>>> all tests passed – no tune-up needed")
    return -1, 0.0

In [10]:
# ---------- 9.  ON-LINE TUNING EPISODE ----------
def tune_up(actuator: TRMActuator, fail_step: int, fail_loss: float):
    print("On-line tuning episode…")
    # collect last 50 failing samples
    samples = []
    for _ in range(TUNE_SAMPLES):
        noisy = [ORIGINAL_BITS[i] if random.random() < 0.95 else 1 - ORIGINAL_BITS[i] for i in range(REGISTERS)]
        samples.append(noisy)
    # train on those 50
    for noisy in samples:
        z = torch.tensor(noisy, dtype=torch.float32)
        teacher = torch.tensor(ORIGINAL_BITS, dtype=torch.float32)
        for pass_ in range(MAX_PASS):
            z = actuator.step(z, teacher)
        loss = reconstruction_loss(bits_to_tokens(z, {**idx1, **idx2, **idx3}), ORIGINAL_BITS)
        actuator.hebb_update(z, loss)
    print(">>> tune-up complete – new lattice loaded")

In [12]:
# ---------- 10.  DEMO RUN ----------
# if __name__ == "__main__":
random.seed(42)
actuator = TRMActuator(REGISTERS)
base_training(actuator, epochs=3)
fail_step, fail_loss = test_until_fail(actuator, max_tests=20)
if fail_step >= 0:
    tune_up(actuator, fail_step, fail_loss)
# final test
final_loss = reconstruction_loss(bits_to_tokens(actuator.step(torch.tensor(ORIGINAL_BITS, dtype=torch.float32), torch.zeros_like(torch.tensor(ORIGINAL_BITS, dtype=torch.float32)), {**idx1, **idx2, **idx3}), ORIGINAL_BITS))
print(f"\nFinal loss after tune-up: {final_loss:.4f} (tolerance {TOLERANCE})")

NameError: name 'n' is not defined