In [1]:
import torch
class LatencyEncoder:
    def __init__(self, time: int = 100):
        self.time = time  # Общее число временных шагов

    def __call__(self, image: torch.Tensor) -> torch.Tensor:
        """
        image: Tensor [1, 28, 28] или [28, 28], значения от 0 до 1 или до 255
        return: spike_tensor [time, 1, 784]
        """
        if image.ndim == 3:
            image = image.squeeze()

        if image.max() > 1:
            image = image / 255.0

        spike_tensor = torch.zeros((self.time, 1, 784))

        for i in range(28):
            for j in range(28):
                pixel = image[i, j].item()
                if pixel > 0:
                    spike_time = int((1.0 - pixel) * (self.time - 1))
                    spike_tensor[spike_time, 0, i * 28 + j] = 1.0

        return spike_tensor.view(self.time, 1, 28, 28)


In [16]:
def set_stdp_nu(conn, nu_plus, nu_minus):
    dev = conn.w.device
    conn.update_rule.nu = (torch.tensor(nu_plus, device=dev),
                           torch.tensor(nu_minus, device=dev))

In [31]:
_rate_ema = None
def adapt_thresholds_ema(layer, spike_counts, T, target=1.5, alpha=0.9, k=0.02):
    global _rate_ema
    with torch.no_grad():
        rate = spike_counts / max(1, T)
        if _rate_ema is None: _rate_ema = rate.clone()
        _rate_ema = alpha * _rate_ema + (1 - alpha) * rate
        vt = layer.v_thresh if hasattr(layer,"v_thresh") else layer.thresh
        vt += k * (_rate_ema - target)
        vt.clamp_(0.15, 1.2)
        if hasattr(layer, "v_thresh"): layer.v_thresh = vt
        else: layer.thresh = vt

In [39]:
# --- безопасное присваивание в буферы/атрибуты модуля ---
def _set_param(module, name, value, prefer_scalar=False):
    """
    Ставит value в module.<name>, учитывая, что это может быть зарегистрированный буфер (Tensor).
    prefer_scalar=True -> делает 0-D тензор (для refrac).
    """
    if not hasattr(module, name):
        return False
    cur = getattr(module, name)

    # Если это Tensor-буфер
    if isinstance(cur, torch.Tensor):
        if prefer_scalar:
            # Нам нужен 0-D, иначе masked_fill_ упадёт (для refrac)
            val = torch.tensor(float(value), device=cur.device)
            setattr(module, name, val)  # заменить буфер на 0-D тензор
            return True

        # Иначе заполняем по форме текущего буфера
        if torch.is_tensor(value):
            if value.numel() == 1 and cur.numel() > 1:
                cur.data.fill_(float(value))
            else:
                if value.shape != cur.shape:
                    value = value.view_as(cur)
                cur.data.copy_(value.to(cur.device, dtype=cur.dtype))
        else:
            cur.data.fill_(float(value))
        return True

    # Не буфер — обычный атрибут: ставим как есть
    setattr(module, name, value if not prefer_scalar else float(value))
    return True


def tune_lif_params(lif_layer, n_hidden, vt_mean=0.35, vt_jitter=0.02, tau_val=50.0, refrac_val=2.0):
    with torch.no_grad():
        # Пороги: вектор с лёгким разбросом
        vt = (vt_mean + vt_jitter * torch.randn(n_hidden)).clamp(0.05, 2.0)
        if not _set_param(lif_layer, "v_thresh", vt):
            _set_param(lif_layer, "thresh", vt)  # thresh здесь именно Tensor, не float!

        # Мембранная константа (имя варьируется по версиям)
        if not _set_param(lif_layer, "tau_m", torch.full((n_hidden,), tau_val)):
            _set_param(lif_layer, "tau",   torch.full((n_hidden,), tau_val))

        # ВАЖНО: рефрактерка должна быть СКАЛЯРОМ (0-D Tensor или float)
        _set_param(lif_layer, "refrac", refrac_val, prefer_scalar=True)

        # reset — скаляр
        if not _set_param(lif_layer, "v_reset", 0.0, prefer_scalar=True):
            _set_param(lif_layer, "reset",  0.0, prefer_scalar=True)


# 🧠 Spiking Neural Network (SNN) на базе BindsNET с обучением через STDP

В этом эксперименте реализована простая биологически правдоподобная спайковая нейросеть (SNN) для обработки изображений MNIST.

## 📌 Архитектура сети:
- **Входной слой (`Input`, 784 нейрона)** — по одному нейрону на каждый пиксель изображения 28×28.
- **Poisson-кодировщик** — преобразует яркость пикселей в вероятностные временные спайки.
- **Полносвязный слой (`Connection`)** — соединяет вход с выходом (матрица весов 784 × 100).
- **Выходной слой (`LIF`, 100 нейронов)** — Leaky Integrate-and-Fire нейроны с утечкой и порогом.
- **STDP (Spike-Timing Dependent Plasticity)** — обучение без градиентов; веса усиливаются, если вход активен до выходного спайка.

## 🔬 Что делается:
1. Загружается одно изображение MNIST.
2. Кодируется в Poisson-спайковый поток.
3. Пропускается через сеть:
   - `Input` получает входные спайки,
   - `LIF` нейроны активируются в зависимости от весов.
4. Сохраняются:
   - Спайковая активность `LIF`-нейронов до и после подачи входа.
   - Сумма спайков на входе (`Input`) — показывает, какие пиксели активны.
   - Веса одного выбранного `LIF`-нейрона — до и после STDP.

## 📈 Визуализация:
- График: сравнение спайковой активности нейронов до/после + входные спайки.
- График: изменение весов, ведущих к нейрону `LIF[42]` — видно, как STDP усиливает значимые связи.

## 🎯 Цель:
Показать, как SNN:
- преобразует изображение в поток спайков,
- активирует только специфичные нейроны,
- адаптирует веса на основе временных шаблонов (STDP), без использования обратного распространения ошибки.


In [40]:
# ====== SETUP (Colab/Local) ======
# !pip -q install bindsnet==0.2.8 torchvision==0.18.1 torch==2.3.1 --extra-index-url https://download.pytorch.org/whl/cu121

import os, itertools, random, csv, time as _ptime
from dataclasses import dataclass, asdict

import torch
import numpy as np
import matplotlib.pyplot as plt

# ====== Utils ======
def set_seed(seed=42):
    random.seed(seed); np.random.seed(seed); torch.manual_seed(seed)

def to_2d(s):  # [T,N] or [T,1,N] -> [T,N]
    return s[:,0,:] if (s.dim()==3 and s.size(1)==1) else s

# ====== Config ======
@dataclass
class Cfg:
    # core
    time:   int = 200
    n_hidden: int = 100
    nu_plus:  float = 0.02
    nu_minus: float = -0.02

    # inhibition / WTA
    inhib_strength: float = 0.3
    inh_decay: float = 0.9
    top_k: int = 0                    # 0 = WTA off for diagnostics
    enable_inhibition_at_start: bool = False

    # encoder
    encoder: str = "latency"         # start with Poisson to "ignite" spikes

    # homeostasis
    target_spikes: float = 2.0
    eta_up: float = 1.0
    eta_down: float = 0.5
    thresh_min: float = 0.2
    thresh_max: float = 2.0
    thresh_init: float = 0.5          # v_thresh initial (BindsNET positive scale)

    # weights
    w_clip_min: float = 0.0
    w_clip_max: float = 1.5
    w_col_target_norm: float = 20.0
    w_init_lo: float = 0.8
    w_init_hi: float = 1.2
    wmin:float = 0.0
    wmax:float = 2.0
    # loop
    N: int = 200
    log_every: int = 50
    seed: int = 42

# ====== Helpers: WTA, norm, thresholds, plots, metrics ======
def apply_wta(s, top_k=1):
    s2 = to_2d(s)
    sb = s2.sum(0).float().squeeze()
    if sb.sum() == 0:
        return False, None
    vals, idxs = torch.topk(sb, k=min(top_k, sb.numel()))
    s.zero_()
    for j in idxs.tolist():
        if s.dim()==3:
            s[:,0,j] = True
        else:
            s[:,j] = True
    return True, idxs.tolist()

def weight_soft_bound_and_colnorm(conn_w, w_clip_min, w_clip_max, target_norm):
    with torch.no_grad():
        w = conn_w.data
        w.clamp_(w_clip_min, w_clip_max)
        col_norm = w.norm(p=1, dim=0, keepdim=True) + 1e-6
        w.mul_(target_norm / col_norm)

def adapt_thresholds(layer, spike_counts, cfg: Cfg):
    with torch.no_grad():
        vt = layer.v_thresh if hasattr(layer, "v_thresh") else layer.thresh
        vt -= 0.05 * (spike_counts < 1.0).float()        # if silent -> lower threshold
        vt += 0.02 * (spike_counts > 3.0).float()        # if too active -> raise
        vt.clamp_(cfg.thresh_min, cfg.thresh_max)
        if hasattr(layer, "v_thresh"): layer.v_thresh = vt
        else: layer.thresh = vt

def spiking_metrics_window(lif_s, winners=None):
    s = to_2d(lif_s).to(torch.bool)
    T, N = s.shape
    per_n = s.sum(0)
    tot = int(per_n.sum())
    active = int((per_n > 0).sum())
    if tot > 0:
        p = (per_n / tot).float().cpu().numpy()
        HHI = float((p**2).sum())
        ps = np.sort(p)
        Gini = float((np.cumsum(ps).sum()/ps.sum() - (len(ps)+1)/2)/len(ps))
    else:
        HHI, Gini = 1.0, 1.0
    uniq_winners = len(set(winners)) if winners else 0
    return dict(T=T, N=N, total_spikes=tot, active=active, HHI=HHI, Gini=Gini, uniq_winners=uniq_winners)

class SNNMeter:
    def __init__(self): self.reset()
    def reset(self):
        self.samples=0; self.S_out=0; self.S_in=0; self.SynOps=0; self.V_updates=0
        self.usage_counts = {}
    def log_sample(self, lif_s, in_s, n_hidden, T, winners=None):
        lif2 = to_2d(lif_s);  in2 = to_2d(in_s)
        s_out = int(lif2.sum().item())
        s_in  = int(in2.sum().item())
        self.S_out += s_out; self.S_in += s_in
        self.SynOps += s_in * n_hidden
        self.V_updates += n_hidden * T
        self.samples += 1
        if winners:
            for j in winners:
                self.usage_counts[j] = self.usage_counts.get(j,0)+1
    def report(self, a=1.0, b=0.05, c=0.005):
        s = max(1, self.samples)
        HHI_win = 0.0
        if self.usage_counts:
            tot = sum(self.usage_counts.values())
            ps = np.array([v/tot for v in self.usage_counts.values()], dtype=float)
            HHI_win = float((ps**2).sum())
        return {
            "spikes_per_sample": self.S_out/s,
            "synops_per_sample": self.SynOps/s,
            "v_updates_per_sample": self.V_updates/s,
            "energy_proxy_per_sample": (a*self.S_out + b*self.SynOps + c*self.V_updates)/s,
            "winners_unique": len(self.usage_counts),
            "winner_HHI": HHI_win,
        }

# ====== Build Net & Encoder ======
from bindsnet.network import Network
from bindsnet.network.nodes import Input, LIFNodes
from bindsnet.network.topology import Connection
from bindsnet.learning import PostPre
from torchvision import transforms
from bindsnet.datasets import MNIST
from bindsnet.network.monitors import Monitor

def build_net(cfg: Cfg):
    net = Network()

    input_layer = Input(n=784, traces=True)
    lif_layer   = LIFNodes(n=cfg.n_hidden, traces=True)
    tune_lif_params(lif_layer, cfg.n_hidden, vt_mean=0.35, vt_jitter=0.02, tau_val=50.0, refrac_val=2.0)
            
    net.add_layer(input_layer, name='Input')
    net.add_layer(lif_layer,   name='LIF')
    

    connection = Connection(source=input_layer, target=lif_layer)
    connection.update_rule = PostPre(connection=connection,
                                 nu=(torch.tensor(cfg.nu_plus),
                                     torch.tensor(cfg.nu_minus)))
    net.add_connection(connection, source='Input', target='LIF')

    # Lateral inhibition (created, but optionally disabled at start)
    W_inh = torch.full((cfg.n_hidden, cfg.n_hidden), -cfg.inhib_strength)
    W_inh.fill_diagonal_(0.0)
    recurrent_inh = Connection(source=lif_layer, target=lif_layer, w=W_inh.clone())
    net.add_connection(recurrent_inh, source='LIF', target='LIF')

    # Weights init (stronger to ignite)
    with torch.no_grad():
        connection.w.data.uniform_(cfg.w_init_lo, cfg.w_init_hi)

    # Thresholds: use v_thresh if available
    th0 = torch.full((cfg.n_hidden,), cfg.thresh_init)
    if hasattr(lif_layer, "v_thresh"): lif_layer.v_thresh = th0.clone()
    else: lif_layer.thresh = th0.clone()

    # Optionally disable inhibition at start (for diagnostics)
    if not cfg.enable_inhibition_at_start:
        with torch.no_grad():
            recurrent_inh.w.zero_()

    return net, input_layer, lif_layer, connection, recurrent_inh, W_inh

# Пре-процесс для произвольных изображений (PIL / np / tensor)
preprocess = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize((28, 28), antialias=True),
    transforms.ToTensor(),                 # -> [1,28,28] float in [0,1]
    # Никаких Normalize(mean,std) здесь — нам нужны «сырые» 0..1!
])

def make_encoder(encoder_type: str, T: int, rate_floor: float = 0.0):  # floor по умолчанию 0
    encoder_type = encoder_type.lower()
    assert encoder_type in ("poisson", "latency")

    def encode_poisson(img_tensor):
        rates = img_tensor.view(-1).clamp(0, 1)
        # ДЕЛАЕМ РЕДКИЙ ПОТОК, ЧТОБЫ НЕ ЗАЛИВАТЬ СЕТЬ
        RATE_SCALE = 0.2
        rates = rates * RATE_SCALE
        # небольшой floor (если хочешь совсем не ноль)
        # rates = torch.maximum(rates, torch.full_like(rates, 5e-3))
        rand = torch.rand((T, rates.numel()))
        spikes = (rand < rates).float().view(T, 1, 784)
        return spikes

    def encode_latency(img_tensor):
        x = img_tensor.squeeze(0).clamp(0, 1)
        spikes = torch.zeros((T, 1, 784), dtype=torch.float32)
        nz = (x > 0).nonzero(as_tuple=False)
        if nz.numel() == 0:
            return spikes
        for idx in nz:
            i, j = int(idx[0]), int(idx[1])
            p = float(x[i, j])
            t = int(round((1.0 - p) * (T - 1)))
            # маленький джиттер ±1 тик (в пределах окна)
            if T >= 3:
                t += int(torch.randint(-1, 2, (1,)).item())
                t = max(0, min(T-1, t))
            spikes[t, 0, i*28 + j] = 1.0
        return spikes

    return (encode_poisson if encoder_type == "poisson" else encode_latency), preprocess
        
def _to_2d(s):  # [T, B, N] -> [T, N]
    return s[:, 0, :] if s.dim()==3 else s
    
# ====== One Experiment ======
def run_experiment(cfg: Cfg, verbose=True):
    set_seed(cfg.seed)

    # Датасет (MNIST уже в [0,1] и [1,28,28])
    transform = transforms.Compose([transforms.ToTensor()])
    dataset = MNIST(root='./data', train=True, download=True, transform=transform)
    # Сеть + мониторы
    net, input_layer, lif_layer, connection, recurrent_inh, W_inh = build_net(cfg)
    with torch.no_grad():
        recurrent_inh.w.fill_(0.0)
        recurrent_inh.w -= 0.2 * (1 - torch.eye(cfg.n_hidden))  # мягкая ингибиция
    lif_mon = Monitor(lif_layer, state_vars=("s",), time=cfg.time)
    inp_mon = Monitor(input_layer, state_vars=("s",), time=cfg.time)
    net.add_monitor(lif_mon, name="lif_mon")
    net.add_monitor(inp_mon, name="inp_mon")

    # Энкодер
    ENCODER_TYPE = cfg.encoder
    T = cfg.time
    encoder, _ = make_encoder(ENCODER_TYPE, T)

    # --------- WARMUP (без STDP) ---------
    WARMUP = getattr(cfg, "warmup_N", 50)
    if WARMUP > 0:
        # на прогрев STDP выкл.
        set_stdp_nu(connection, 0.0, 0.0)
        for wi in range(min(WARMUP, len(dataset))):
            image = dataset[wi]["image"]
            spike_input = encoder(image)
            net.run(inputs={"Input": spike_input}, time=cfg.time)

            # адаптация порогов на прогреве (по желанию — полезно)
            lif_s_full = lif_mon.get("s")
            spike_counts = to_2d(lif_s_full).sum(0).float().squeeze()
            adapt_thresholds_ema(lif_layer, spike_counts, cfg.time, target=2.0)
            

            # очистка состояний между примерами
            net.reset_state_variables()
            lif_mon.reset_state_variables()
            inp_mon.reset_state_variables()

    # --------- ОСНОВНОЙ ЦИКЛ ---------
    meter = SNNMeter()
    # мягкий STDP после прогрева
    set_stdp_nu(connection, 2e-4, -1e-4)  # было 1e-3 / -5e-4
    

    for i in range(cfg.N):
        sample = dataset[i]
        image  = sample["image"]
        spike_input = encoder(image)
        inputs = {"Input": spike_input}

        net.run(inputs=inputs, time=cfg.time)

        # Полный растр за окно
        lif_s_full = lif_mon.get("s")   # [T,B,N]
        in_s_full  = inp_mon.get("s")   # [T,B,784]
        lif2 = _to_2d(lif_s_full); in2 = _to_2d(in_s_full)

        # Диагностика по чекпоинтам
        if i in (0, 50, 150):
            print("INPUT spikes sum (window):", int(in2.sum().item()))
            print("LIF   spikes sum (window):",   int(lif2.sum().item()))
            print("INPUT window sum:", int(in2.sum()))
            print("LIF   window sum:", int(lif2.sum()))
            vt = (lif_layer.v_thresh if hasattr(lif_layer,'v_thresh') else lif_layer.thresh)
            print("v_thresh mean±std:", float(vt.mean()), float(vt.std()))
            print("w[min,max]:", float(connection.w.min()), float(connection.w.max()))

        # WTA (если включён)
        winners = None
        if cfg.top_k and cfg.top_k > 0:
            ok, winners = apply_wta(lif_layer.s, top_k=cfg.top_k)
            if not ok:
                net.reset_state_variables(); lif_mon.reset_state_variables(); inp_mon.reset_state_variables()
                continue

        # Метрики окна
        m = spiking_metrics_window(lif_s_full, winners)
        if verbose and ((i+1) % cfg.log_every == 0 or i == 0):
            print(f"[{i+1}] total={m['total_spikes']} active={m['active']}/{m['N']} HHI={m['HHI']:.3f}")

        # Homeostasis по «сырым» спайкам (до WTA)
        spike_counts = to_2d(lif_s_full).sum(0).float()
        adapt_thresholds_ema(lif_layer, spike_counts, cfg.time, target=1.5)

        # Кламп весов (без агрессивной колоночной нормировки каждый шаг)
        with torch.no_grad():
            connection.w.clamp_(0.0, 1.0)

        # Энергетика / учёт
        meter.log_sample(lif_s_full, in_s_full, cfg.n_hidden, cfg.time, winners=winners)

        # Сброс состояний и мониторов
        net.reset_state_variables()
        lif_mon.reset_state_variables()
        inp_mon.reset_state_variables()

   
    rpt = meter.report()
    if getattr(meter, "samples", 0) == 0:
        print("!! meter: no samples logged — проверь порядок log_sample()/reset() и continue в цикле")
    out = {**asdict(cfg), **rpt}
    return out

# ====== Grid Runner (compact) ======
def grid_run(base: Cfg):
    grid = {
        "inhib_strength": [0.3, 0.5],
        "top_k": [0, 3],
        "time": [200, 300],
        "use_latency": [False, True],
    }
    keys, vals = zip(*grid.items())
    results = []
    t0 = _ptime.time()
    for combo in itertools.product(*vals):
        cfg = Cfg(**{**asdict(base), **dict(zip(keys, combo))})
        print(">>> run:", {k: getattr(cfg,k) for k in keys})
        res = run_experiment(cfg, verbose=False)
        print({k: res[k] for k in ["spikes_per_sample","winners_unique","winner_HHI","energy_proxy_per_sample"]})
        results.append(res)

    os.makedirs("out", exist_ok=True)
    csv_path = os.path.join("out","snn_energy_accuracy_grid.csv")
    with open(csv_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=results[0].keys())
        writer.writeheader(); writer.writerows(results)
    print(f"Saved: {csv_path} | runs={len(results)} | elapsed={_ptime.time()-t0:.1f}s")

    # quick Pareto-ish
    best = (sorted(results, key=lambda r: (r["energy_proxy_per_sample"], r["winner_HHI"], -r["winners_unique"])))[:5]
    print("\nTop-5 Pareto-ish:")
    for r in best:
        print({k: r[k] for k in ["inhib_strength","top_k","time","use_latency",
                                 "spikes_per_sample","winners_unique","winner_HHI","energy_proxy_per_sample"]})


In [41]:
cfg = Cfg(
    time=300,
    n_hidden=100,
    encoder="latency",                 # сначала Poisson
    top_k=0,                           # WTA выкл. для диагностики
    enable_inhibition_at_start=False,  # ингибицию включим позже
    nu_plus = 0.002,
    nu_minus = -0.001
)
res = run_experiment(cfg, verbose=True)
print("\nSUMMARY:", {k: res[k] for k in ["spikes_per_sample","winners_unique","winner_HHI","energy_proxy_per_sample"]})


RuntimeError: shape '[]' is invalid for input of size 100