In [6]:
!pip install brian2

Collecting brian2
  Downloading brian2-2.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.2 kB)
Downloading brian2-2.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m21.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: brian2
Successfully installed brian2-2.9.0


In [7]:
from brian2 import *
from collections import defaultdict
from pathlib import Path
from random import randrange, seed as rseed
from struct import unpack
import numpy as np
from tqdm import tqdm
import itertools
from tqdm.notebook import tqdm


# 파라ㅏ미터 값 축소
N_TRAIN = 3_000
N_OBSERVE = 500
N_TEST   = 300

SEED = 42
MNIST_PATH = Path('../mnist')   # MNIST IDX 파일 4개 있는 곳임
DATA_PATH  = Path('data')

N_SAVE_POINTS = 50

# 건드리지 말라고 되어 있는 부분이긴 한ㄷ;
# 빠르게 테스트 해야되니까 뉴런 개수만 좀 줄이기

N_INP = 784
N_NEURONS = 200
V_EXC_REST = -65 * mV
V_INH_REST = -60 * mV
INTENSITY = 2

# 시뮬 시간도 줄이기
ACTIVE_T = 200 * ms             # 입력 자극 시간 줄이기 (350ms -> 200ms)
REST_T   = 100 * ms             # 휴지 시간 줄이기 (150ms -> 100ms)
defaultclock.dt = 1 * ms        # 해상도 완화

# 측면 억제(고정 가중치)
W_EXC_INH = 10.4
W_INH_EXC = 17.0

# (선택) 수동 모드 실행을 원하면 아래 값을 'train'/'observe'/'test'/'plot' 중 택1
MODE = None  # None이면 자동 러너 사용


In [8]:
def save_npy(arr, path):
    arr = np.array(arr)
    print('%-9s %-15s => %-30s' % ('Saving', arr.shape, path))
    np.save(path, arr)

def load_npy(path):
    arr = np.load(path)
    print('%-9s %-30s => %-15s' % ('Loading', path, arr.shape))
    return arr

def read_mnist(training: bool):
    tag = 'train' if training else 't10k'
    img_file = MNIST_PATH / f'{tag}-images-idx3-ubyte'
    lbl_file = MNIST_PATH / f'{tag}-labels-idx1-ubyte'

    if img_file.exists() and lbl_file.exists():
        # ---- IDX 원본에서 직접 읽기 ----
        with open(img_file, 'rb') as images, open(lbl_file, 'rb') as labels:
            images.read(4)                              # magic
            n_images = unpack('>I', images.read(4))[0]
            n_rows   = unpack('>I', images.read(4))[0]
            n_cols   = unpack('>I', images.read(4))[0]
            labels.read(4)                              # magic
            x = np.frombuffer(images.read(), dtype=np.uint8)
            x = x.reshape(n_images, -1).astype(np.float32) / 8.0
            y = np.frombuffer(labels.read(), dtype=np.uint8)
        return x, y

    # ---- 대체 경로: Keras MNIST 사용 ----
    print(f"MNIST로드 (MNIST_PATH={MNIST_PATH})")
    try:
        try:
            from tensorflow.keras.datasets import mnist as keras_mnist  # TF가 있으면 여기서
        except Exception:
            from keras.datasets import mnist as keras_mnist             # 또는 standalone keras
    except Exception as e:
        raise FileNotFoundError(
        ) from e

    (X_train, y_train), (X_test, y_test) = keras_mnist.load_data()
    x, y = (X_train, y_train) if training else (X_test, y_test)
    x = x.reshape(x.shape[0], -1).astype(np.float32) / 8.0
    y = y.astype(np.uint8)
    return x, y


In [9]:
# 뉴런 막전위, 흥분성/억제성 전류, 시냅스 전도, 타이머 포함

def build_network(training: bool):
    # 공통 뉴런 방정식
    eqs = '''
    dv/dt = (v_rest - v + i_exc + i_inh) / tau_mem  : volt (unless refractory)
    i_exc = ge * -v                         : volt
    i_inh = gi * (v_inh_base - v)           : volt
    dge/dt = -ge/(1 * ms)                   : 1
    dgi/dt = -gi/(2 * ms)                   : 1
    dtimer/dt = 1                           : second
    '''
    reset = f'v = {V_EXC_REST!r}; timer = 0 * ms'

    # 흥분성 뉴런: 학습 시 theta 동적, 평가 시 저장된 theta 사용
    if training:
        exc_eqs = eqs + '\n        dtheta/dt = -theta / (1e7 * ms) : volt\n'
        arr_theta = np.ones(N_NEURONS) * 20 * mV
        reset += '; theta += 0.05 * mV'
    else:
        exc_eqs = eqs + '\n        theta : volt\n'
        arr_theta = load_npy(DATA_PATH / 'theta.npy') * volt

    exc_eqs = Equations(exc_eqs, tau_mem=100*ms, v_rest=V_EXC_REST, v_inh_base=-100*mV)
    ng_exc = NeuronGroup(
        N_NEURONS, exc_eqs,
        threshold='v > (theta - 72 * mV) and (timer > 50 * ms)',
        refractory=5*ms, reset=reset, method='euler', name='exc'
    )
    ng_exc.v = V_EXC_REST
    ng_exc.theta = arr_theta

    # 억제성 뉴런
    inh_eqs = Equations(eqs, tau_mem=10*ms, v_rest=V_INH_REST, v_inh_base=-85*mV)
    ng_inh = NeuronGroup(
        N_NEURONS, inh_eqs,
        threshold='v > -40 * mV', refractory=2*ms, reset='v = -45 * mV',
        method='euler', name='inh'
    )
    ng_inh.v = V_INH_REST

    # 측면 억제: exc→inh (1:1), inh→exc (자기 제외 전결합)
    syns_exc_inh = Synapses(ng_exc, ng_inh, on_pre=f'ge_post += {W_EXC_INH}')
    syns_exc_inh.connect(j='i')
    syns_inh_exc = Synapses(ng_inh, ng_exc, on_pre=f'gi_post += {W_INH_EXC}')
    syns_inh_exc.connect('i != j')

    # 입력층(포아송 발화)
    pg_inp = PoissonGroup(N_INP, 0*Hz, name='inp')

    # 입력 -> 흥분성: 학습 시 STDP, 아니면 저장된 w 사용
    model = 'w : 1'
    on_pre = 'ge_post += w'
    on_post = ''
    if training:
        on_pre  += '; pre = 1.; w = clip(w - 0.0001 * post1, 0, 1.0)'
        on_post += 'post2bef = post2; w = clip(w + 0.01 * pre * post2bef, 0, 1.0); post1=1.; post2=1.'
        model   += '''
        post2bef : 1
        dpre/dt   = -pre/(20*ms)  : 1 (event-driven)
        dpost1/dt = -post1/(20*ms): 1 (event-driven)
        dpost2/dt = -post2/(40*ms): 1 (event-driven)
        '''
        weights = (np.random.random(N_INP * N_NEURONS) + 0.01) * 0.3
    else:
        weights = load_npy(DATA_PATH / 'weights.npy')

    syns_inp_exc = Synapses(pg_inp, ng_exc, model=model, on_pre=on_pre, on_post=on_post, name='inp_exc')
    syns_inp_exc.connect(True)
    syns_inp_exc.delay = 'rand() * 10 * ms'
    syns_inp_exc.w = weights

    # 모니터 + 네트워크 구성
    exc_mon = SpikeMonitor(ng_exc, name='sp_exc')
    net = Network([pg_inp, ng_exc, ng_inh, syns_inp_exc, syns_exc_inh, syns_inh_exc, exc_mon])
    net.run(0*ms)  # 초기화
    return net



In [10]:
def show_sample(net, sample, intensity):
    # sample(784) -> 포아송 발화율로 200ms 제시, 100ms 휴지
    exc_mon = net['sp_exc']
    prev = exc_mon.count[:]
    net['inp'].rates = sample * intensity * Hz
    net.run(ACTIVE_T)
    nextc = exc_mon.count[:]
    net['inp'].rates = 0 * Hz
    net.run(REST_T)
    pat = nextc - prev
    if np.sum(pat) < 5:  # 발화가 너무 적으면 강도 높여서 재시도 해보깅
        return show_sample(net, sample, intensity + 1)
    return pat

def predict(groups, rates):
    # 각 클래스 그룹의 평균 발화율을 비교해 argmax
    return np.argmax([rates[grp].mean() for grp in groups])


In [11]:
def normalize_plastic_weights(syns):
    # 열 합(각 뉴런으로 들어오는 총합)을 78로 정규화 -> 발산 방지/경쟁 안정화
    conns = np.reshape(syns.w, (N_INP, N_NEURONS))
    col_sums = np.sum(conns, axis=0)
    factors = 78. / col_sums
    conns *= factors
    syns.w = conns.reshape(-1)

def stats(net):
    tick = defaultclock.timestep[:]
    cnt  = np.sum(net['sp_exc'].count[:])
    w    = net['inp_exc'].w
    w_mu, w_std = np.mean(w), np.std(w)
    theta = net['exc'].theta / mV
    return [tick, cnt, w_mu, w_std, np.mean(theta), np.std(theta)]

def train():
    # STDP 학습
    X, Y = read_mnist(True)
    n_samples = X.shape[0]
    net = build_network(True)

    rows   = [stats(net) + [-1]]
    w_hist = [np.array(net['inp_exc'].w)]
    ratio  = max(N_TRAIN // N_SAVE_POINTS, 1)

    for i in tqdm(range(N_TRAIN), desc="Train", unit="img", leave=True):
        ix = i % n_samples
        normalize_plastic_weights(net['inp_exc'])
        show_sample(net, X[ix], INTENSITY)
        rows.append(stats(net) + [Y[ix]])

        if i % ratio == 0:
            w_hist.append(np.array(net['inp_exc'].w))

        if (i + 1) % 100 == 0:
            t = rows[-1]
            tqdm.write(f"[Train] step={i+1}  w_mu={t[2]:.4f}  theta_mu={t[4]:.2f}mV")

    save_npy(rows,                 DATA_PATH / 'train_stats.npy')
    save_npy(w_hist,               DATA_PATH / 'train_w_hist.npy')
    save_npy(net['inp_exc'].w,     DATA_PATH / 'weights.npy')
    save_npy(net['exc'].theta,     DATA_PATH / 'theta.npy')


def observe():
    # 클래스별 평균 반응 -> 뉴런별 최다 반응 클래스 매핑(assign.npy)
    X, Y = read_mnist(True)
    n_samples = X.shape[0]
    net = build_network(False)

    rows = [stats(net) + [-1]]
    responses = defaultdict(list)

    for i in tqdm(range(N_OBSERVE), desc="Observe", unit="img", leave=True):
        ix = i % n_samples
        exc = show_sample(net, X[ix], INTENSITY)
        rows.append(stats(net) + [Y[ix]])
        responses[Y[ix]].append(exc)

        if (i + 1) % 100 == 0:
            tqdm.write(f"[Observe] step={i+1}")

    res = np.zeros((10, N_NEURONS))
    for cls, vals in responses.items():
        res[cls] = np.array(vals).mean(axis=0)

    assign = np.argmax(res, axis=0)
    save_npy(assign, DATA_PATH / 'assign.npy')
    save_npy(rows,   DATA_PATH / 'observe_stats.npy')


def test():
    # assign.npy(뉴런 -> 클래스 매핑) 필요
    conf   = np.zeros((10, 10))
    assign = np.load(DATA_PATH / 'assign.npy')
    groups = [np.where(assign == i)[0] for i in range(10)]

    X, Y = read_mnist(False)
    net  = build_network(False)
    correct = 0

    for i in tqdm(range(N_TEST), desc="Test", unit="img", leave=True):
        ix = randrange(len(X))
        exc = show_sample(net, X[ix], INTENSITY)
        guess, real = predict(groups, exc), Y[ix]
        if guess == real:
            correct += 1
        conf[real, guess] += 1

        if (i + 1) % 10 == 0:
            tqdm.write(f"[Test] {i+1}/{N_TEST}  Acc={correct/(i+1):.3f}")

    final_acc = np.trace(conf) / np.sum(conf)
    print('Final Accuracy: %6.3f' % final_acc)

    conf = conf / conf.sum(axis=1, keepdims=True)
    save_npy(conf, DATA_PATH / 'confusion.npy')



In [12]:
def plot():
    conf = np.load(DATA_PATH / 'confusion.npy')
    import matplotlib.pyplot as plt

    plt.imshow(100*conf, interpolation='nearest', cmap=plt.cm.Blues)
    for i, j in itertools.product(range(conf.shape[0]), range(conf.shape[1])):
        if conf[i, j] == 0: continue
        plt.text(j, i, f"{round(100*conf[i,j])}%",
                 ha='center', va='center',
                 color='white' if conf[i, j] > 0.5 else 'black')
    plt.colorbar()
    plt.xticks(range(10)); plt.yticks(range(10))
    plt.xlabel('Predicted label'); plt.ylabel('True label')
    plt.show()


In [13]:
if __name__ == '__main__':
    seed(SEED); rseed(SEED)
    DATA_PATH.mkdir(parents=True, exist_ok=True)

    if MODE is None:
        need_train   = not (DATA_PATH / 'weights.npy').exists() or not (DATA_PATH / 'theta.npy').exists()
        need_observe = not (DATA_PATH / 'assign.npy').exists()

        if need_train:
            print('[Auto] train 실행')
            train()      # Train 0/3000
        if need_observe:
            print('[Auto] observe 실행')
            observe()    # Observe 0/500
        print('[Auto] test 실행')
        test()          # Test 0/300
    else:
        cmds = dict(train=train, observe=observe, test=test, plot=plot)
        print(f'[Manual] {MODE} 실행')
        cmds[MODE]()


[Auto] weights/theta 없음 -> train 실행
[read_mnist] IDX 파일이 없어 Keras MNIST로 대체 로드합니다. (MNIST_PATH=../mnist)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
[1m11490434/11490434[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


Train:   0%|          | 0/3000 [00:00<?, ?img/s]

[Train] step=100  w_mu=0.0995  theta_mu=20.43mV
[Train] step=200  w_mu=0.0995  theta_mu=20.66mV
[Train] step=300  w_mu=0.0995  theta_mu=20.89mV
[Train] step=400  w_mu=0.0995  theta_mu=21.11mV
[Train] step=500  w_mu=0.0995  theta_mu=21.32mV
[Train] step=600  w_mu=0.0995  theta_mu=21.56mV
[Train] step=700  w_mu=0.0995  theta_mu=21.82mV
[Train] step=800  w_mu=0.0995  theta_mu=22.11mV
[Train] step=900  w_mu=0.0995  theta_mu=22.39mV
[Train] step=1000  w_mu=0.0995  theta_mu=22.61mV
[Train] step=1100  w_mu=0.0995  theta_mu=22.90mV
[Train] step=1200  w_mu=0.0995  theta_mu=23.27mV
[Train] step=1300  w_mu=0.0995  theta_mu=23.90mV
[Train] step=1400  w_mu=0.0995  theta_mu=24.58mV
[Train] step=1500  w_mu=0.0995  theta_mu=24.79mV
[Train] step=1600  w_mu=0.0995  theta_mu=25.00mV
[Train] step=1700  w_mu=0.0995  theta_mu=25.17mV
[Train] step=1800  w_mu=0.0995  theta_mu=25.47mV
[Train] step=1900  w_mu=0.0995  theta_mu=25.78mV
[Train] step=2000  w_mu=0.0995  theta_mu=26.14mV
[Train] step=2100  w_mu=0.099

Observe:   0%|          | 0/500 [00:00<?, ?img/s]

[Observe] step=100
[Observe] step=200
[Observe] step=300
[Observe] step=400
[Observe] step=500
Saving    (200,)          => data/assign.npy               
Saving    (501, 7)        => data/observe_stats.npy        
[Auto] test 실행
[read_mnist] IDX 파일이 없어 Keras MNIST로 대체 로드합니다. (MNIST_PATH=../mnist)
Loading   data/theta.npy                 => (200,)         
Loading   data/weights.npy               => (156800,)      


Test:   0%|          | 0/300 [00:00<?, ?img/s]

[Test] 10/300  Acc=0.400
[Test] 20/300  Acc=0.450
[Test] 30/300  Acc=0.567
[Test] 40/300  Acc=0.625
[Test] 50/300  Acc=0.620
[Test] 60/300  Acc=0.650
[Test] 70/300  Acc=0.643
[Test] 80/300  Acc=0.662
[Test] 90/300  Acc=0.656
[Test] 100/300  Acc=0.670
[Test] 110/300  Acc=0.682
[Test] 120/300  Acc=0.692
[Test] 130/300  Acc=0.708
[Test] 140/300  Acc=0.707
[Test] 150/300  Acc=0.700
[Test] 160/300  Acc=0.681
[Test] 170/300  Acc=0.688
[Test] 180/300  Acc=0.667
[Test] 190/300  Acc=0.653
[Test] 200/300  Acc=0.645
[Test] 210/300  Acc=0.643
[Test] 220/300  Acc=0.641
[Test] 230/300  Acc=0.648
[Test] 240/300  Acc=0.646
[Test] 250/300  Acc=0.636
[Test] 260/300  Acc=0.631
[Test] 270/300  Acc=0.630
[Test] 280/300  Acc=0.632
[Test] 290/300  Acc=0.638
[Test] 300/300  Acc=0.633
Final Accuracy:  0.633
Saving    (10, 10)        => data/confusion.npy            
