In [2]:
# Environment
import os
import random

# Export the flag before torch is imported so it is already present in the
# environment whenever torch's compile machinery reads it.
os.environ["TORCH_COMPILE_DISABLE"] = "1"  # disable torch.compile

import numpy as np
import torch

# Seed every RNG used by the notebook so runs are repeatable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Pick the compute device and a matching dtype in a single decision chain.
if torch.cuda.is_available():
    # Allow TF32 kernels for matmul and cuDNN when running on CUDA.
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    device = torch.device("cuda")
    torch_dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
elif getattr(torch.backends, "mps", None) and torch.backends.mps.is_available():
    device = torch.device("mps")
    torch_dtype = torch.float16
else:
    device = torch.device("cpu")
    torch_dtype = torch.float32

print(f"Device: {device}, dtype: {torch_dtype}")

Device: cuda, dtype: torch.bfloat16


In [3]:
# Load the model and tokenizer

from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

MODEL_ID = "openai/gpt-oss-20b"
# NOTE(review): with trust_remote_code=True below, consider pinning REVISION to a
# specific commit so the code fetched from the hub cannot change between runs.
REVISION = None

_tok = AutoTokenizer.from_pretrained(MODEL_ID, revision=REVISION, trust_remote_code=True, use_fast=True)
# Generation needs a pad token; fall back to EOS when the tokenizer defines none.
if _tok.pad_token is None:
    _tok.pad_token = _tok.eos_token

model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    revision=REVISION,
    trust_remote_code=True,
    dtype=torch_dtype,  # `torch_dtype=` is deprecated in the installed transformers
    low_cpu_mem_usage=False,
    device_map=None,
)
model.to(device)
model.eval()

# Hand the pipeline the same torch.device the model was moved to, so the
# CUDA / MPS / CPU choice made in the environment cell is honoured everywhere.
_gen = pipeline("text-generation", model=model, tokenizer=_tok, device=device)

print("Model & tokenizer ready.")

  from .autonotebook import tqdm as notebook_tqdm
`torch_dtype` is deprecated! Use `dtype` instead!
Fetching 41 files: 100%|██████████| 41/41 [00:00<?, ?it/s]
Fetching 41 files: 100%|██████████| 41/41 [00:00<?, ?it/s]?it/s]
Loading checkpoint shards: 100%|██████████| 3/3 [00:03<00:00,  1.04s/it]
Device set to use cuda:0


Model & tokenizer ready.


In [4]:
# LLM smoke test: one short chat turn with greedy decoding.
TEST_PROMPT = [{"role": "user", "content": "안녕! 테스트야."}]

# Generation settings shared by the call below.
GEN_KW = {
    "max_new_tokens": 40,
    "do_sample": False,
    "eos_token_id": _tok.eos_token_id,
    "pad_token_id": _tok.pad_token_id,
    "return_full_text": False,
}

out = _gen(TEST_PROMPT, **GEN_KW)
print(out)


The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


[{'generated_text': 'analysisThe user says "안녕! 테스트야." which means "Hi! This is a test." They likely want a friendly response. The instruction: "You are ChatGPT, a'}]


In [None]:
# -*- coding: utf-8 -*-
# LLM 텍스트 -> 신경전달물질 4축 스코어 추정 (OSS 모델용 견고 파서 포함)

import re
import json
from typing import Dict, List, Tuple

class LLM:
    # ====== 설정 ======
    CHEATSHEET = """
Definitions (short):
- Dopamine: reward/novelty/goal pursuit; approach & motivation.
- Serotonin: calm/satiety/stability; pro-social, contentment.
- Norepinephrine: alertness/vigilance/stress mobilization.
- Melatonin: circadian sleep drive; drowsiness/night-time cues.

Scoring rubric:
- Map cues in text to each axis independently in [0,1].
- Strong evidence -> ≥0.7, weak/ambiguous -> 0.3~0.6, absent -> ≤0.2.
- Do NOT infer medical claims; this is a heuristic proxy from language only.
""".strip()

    EXAMPLES: List[Tuple[str, Dict[str, float]]] = [
        ('Just aced the exam! Can’t wait to start a new project.',
         {"dopamine":0.82,"serotonin":0.55,"norepinephrine":0.38,"melatonin":0.10}),
        ('Breathing slowed, grateful after dinner with friends.',
         {"dopamine":0.35,"serotonin":0.78,"norepinephrine":0.22,"melatonin":0.18}),
        ('It’s 2am; eyes heavy; will sleep soon.',
         {"dopamine":0.20,"serotonin":0.32,"norepinephrine":0.18,"melatonin":0.85}),
    ]

    REQUIRED_KEYS = ("dopamine", "serotonin", "norepinephrine", "melatonin")

    def __init__(self):
        # 출력 마커 + 정확히 4키만
        self.EMO_SYSTEM = (
            "You are EmotionScore. Respond with JSON ONLY, wrapped by <json>...</json>.\n"
            "Output exactly these four keys as floats in [0,1]: "
            "dopamine, serotonin, norepinephrine, melatonin.\n"
            "No explanation. No extra keys. No text outside JSON."
        )

    # ====== 유틸 ======
    @staticmethod
    def _clip01(x):
        try:
            return max(0.0, min(1.0, float(x)))
        except:
            return 0.0

    @staticmethod
    def _coerce_float01(v):
        """문자열/퍼센트/과학표기 등 → [0,1] float 변환"""
        try:
            if isinstance(v, (int, float)):
                return max(0.0, min(1.0, float(v)))
            if isinstance(v, str):
                s = v.strip().lower()
                if s.endswith("%"):  # '70%' -> 0.7
                    num = float(s[:-1].strip())
                    return max(0.0, min(1.0, num/100.0))
                return max(0.0, min(1.0, float(s)))
        except:
            pass
        return 0.0

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        # ```json ... ``` 제거
        if text.startswith("```"):
            lines = text.splitlines()
            if len(lines) >= 2 and lines[0].startswith("```"):
                for i in range(1, len(lines)):
                    if lines[i].startswith("```"):
                        return "\n".join(lines[1:i])
        return text

    def _extract_json_text(self, raw: str) -> str | None:
        if not raw:
            return None
        m = re.search(r"<json>\s*(\{.*?\})\s*</json>", raw, re.S | re.I)
        if m:
            return m.group(1)
        cleaned = self._strip_code_fences(raw)
        m2 = re.search(r"\{.*\}", cleaned, re.S)
        return m2.group(0) if m2 else None

    def _safe_parse(self, json_text: str | None) -> Dict[str, float]:
        if not json_text:
            return {k: 0.0 for k in self.REQUIRED_KEYS}
        try:
            data = json.loads(json_text)
            if not isinstance(data, dict):
                raise ValueError("not a dict")
        except:
            return {k: 0.0 for k in self.REQUIRED_KEYS}

        fixed = {}
        for k in self.REQUIRED_KEYS:
            fixed[k] = self._coerce_float01(data.get(k, 0.0))
        return fixed

    @staticmethod
    def _smooth01(vals: List[float], tau: float = 0.12) -> List[float]:
        """극단값 완화(선택적)"""
        return [min(1.0, max(0.0, v*(1-tau) + 0.5*tau)) for v in vals]

    # ====== 프롬프트 생성 ======
    def _build_prompt(self, IPT: str) -> str:
        fewshot = "\n".join(
            f'Text: "{t}"\n<json>{json.dumps(ex, separators=(",",":"))}</json>'
            for t, ex in self.EXAMPLES
        )
        return (
            f"{self.EMO_SYSTEM}\n"
            f"{self.CHEATSHEET}\n"
            f"{fewshot}\n"
            f'Text: "{IPT}"\n'
            f"JSON:"
        )

    # ====== 공개 API ======
    def IPT2NTL(self, IPT: str, log_raw: bool = True) -> List[int]:
        """
        입력 텍스트 -> 4축 점수 산출 -> 상위 2개 인덱스 반환 (0:dopamine, 1:serotonin, 2:norepinephrine, 3:melatonin)
        외부 의존: _gen(prompt, ...), _tok.eos_token_id / pad_token_id
        """
        prompt = self._build_prompt(IPT)

        out = _gen(
            prompt,
            max_new_tokens=120,
            do_sample=False,
            return_full_text=False,
            eos_token_id=_tok.eos_token_id,
            pad_token_id=_tok.pad_token_id
        )

        raw = (out[0].get("generated_text", "") if out and isinstance(out, list) else "") or ""
        if log_raw:
            print("-----[OSS raw output]-----")
            print(raw)
            print("-----[/OSS raw output]----")

        json_text = self._extract_json_text(raw)
        data = self._safe_parse(json_text)
        if log_raw:
            print(f"[LOG][json] {data}")

        vals = [self._clip01(data[k]) for k in self.REQUIRED_KEYS]
        vals = self._smooth01(vals, tau=0.12)  # 선택적 스무딩
        if log_raw:
            print(f"[LOG][vals] {vals}")

        # 동점 안정 정렬(인덱스 보조키)
        NTL = sorted(range(4), key=lambda i: (-vals[i], i))[:2]
        return NTL

    def IPT2SCORES(self, IPT: str, log_raw: bool = False) -> Dict[str, float]:
        """
        점수 딕셔너리 그대로 반환이 필요할 때 사용.
        """
        prompt = self._build_prompt(IPT)
        out = _gen(
            prompt,
            max_new_tokens=120,
            do_sample=False,
            return_full_text=False,
            eos_token_id=_tok.eos_token_id,
            pad_token_id=_tok.pad_token_id
        )
        raw = (out[0].get("generated_text", "") if out and isinstance(out, list) else "") or ""
        if log_raw:
            print("-----[OSS raw output]-----")
            print(raw)
            print("-----[/OSS raw output]----")

        json_text = self._extract_json_text(raw)
        data = self._safe_parse(json_text)
        vals = [self._clip01(data[k]) for k in self.REQUIRED_KEYS]
        vals = self._smooth01(vals, tau=0.12)
        return {k: vals[i] for i, k in enumerate(self.REQUIRED_KEYS)}


In [16]:
# LLM test
_llm = LLM()
# IPT2NTL returns a single list holding the two top-scoring axis indices; it
# does not return a (result, explanation) pair, so keep the list as one value.
ntl = _llm.IPT2NTL("행복해", log_raw=False)

AttributeError: 'LLM' object has no attribute 'EXAMPLES'

In [2]:
# Emotion store
class emotion_storage():
    """Keeps the previous 4-axis NTL vector and the history of K values.

    ``NTL2NTLandK`` blends the previous NTL vector into a new one and derives
    the current K from the recorded history; ``NTL_K_hist`` records the final
    values of a step so the next step can use them.
    """

    def __init__(self):
        self.NTL_bef = [0] * 4  # NTL vector of the previous step (one slot per axis)
        self.K = []             # history of recorded K values

    def NTL2NTLandK(self, NTL):
        """Return ``(NTL_final, K)`` for the current step.

        NTL is indexed at positions 0..3, so it must hold at least four values.
        NTL_final[i] = NTL[i] + 0.1 * previous_NTL[i].
        K starts at -70 and is increased by one term per recorded history entry.
        """
        K = -70  # initial potential
        w = 0.1  # weight applied to the previous NTL (10%)

        # K update with time decay over the recorded history.
        for i, j in enumerate(self.K):
            try:
                K += i**(1/j)
            except (ZeroDivisionError, OverflowError):
                # The term is undefined for j == 0 and for i == 0 with a
                # negative j (0 raised to a negative power), which is what
                # happens for the very first recorded K of -70; it can also
                # overflow for tiny j. Skip such terms instead of crashing.
                # NOTE(review): confirm the intended decay formula.
                continue

        # Pad (or trim) the previous state to four axes so the indexing below
        # is always valid, whatever was stored as the previous vector.
        prev = list(self.NTL_bef)[:4]
        prev += [0] * (4 - len(prev))

        NTL_final = [NTL[i] + prev[i] * w for i in range(4)]
        return NTL_final, K

    def NTL_K_hist(self, NTL, K):
        """Record the final NTL and K of the current step."""
        # Store a copy so later mutation of the caller's list cannot silently
        # change the remembered state.
        self.NTL_bef = list(NTL)
        self.K.append(K)

In [3]:
# Neuron definition
K_remember = 100  # membrane potential above which the input is remembered

class neuron():
    """A node of the network that accumulates outputs and stimulates neighbours.

    Attributes set in ``__init__``:
      K_critical          threshold K must exceed for the neuron to fire
      code                unique node id
      type                'exciting', 'inhibiting' or 'modulating'
                          (stored only; not used by the logic in this class)
      connected_node      neighbouring neuron objects
      connected_node_code codes of those neighbours
      w_bef / w_curr      previous / current weight
      K_out / NTL_out     history of outputs produced when firing
      K_final / NTL_final aggregates over that history (sum / per-axis mean)
    """

    def __init__(self, K_critical, code, type):  # code: unique node id
        self.K_critical = K_critical  # threshold membrane potential for activation
        self.saved_IPT = [None]
        self.code = code
        self.connected_node = []
        self.connected_node_code = []
        self.w_bef = 0
        self.w_curr = 0
        self.K_out = []
        self.NTL_out = []
        self.IPT_curr = ''

        self.type = type  # types: exciting, inhibiting, modulating

        self.K_final = 0
        self.NTL_final = 0

    def do_work(self, NTL, K):
        """Process the incoming signal; currently a pass-through stub."""
        # When the weight is updated, update self.w_curr.
        # TODO: actual processing is not implemented yet.
        return NTL, K

    def save(self, IPT):
        """Remember ``IPT`` and mark it as the current input."""
        self.saved_IPT.append(IPT)
        self.IPT_curr = IPT

    def make_new_connection(self, node):
        """Connect this neuron and ``node`` in both directions; return node.code.

        Self-connections and already existing connections are ignored so a
        neighbour is never stimulated twice for one propagation step.
        """
        if node is self or node.code in self.connected_node_code:
            return node.code

        self.connected_node.append(node)
        node.connected_node.append(self)

        self.connected_node_code.append(node.code)
        node.connected_node_code.append(self.code)
        return node.code

    def main(self, NTL, K, IPT, node):
        """Handle one stimulation coming from ``node`` (or None for no sender).

        Steps: fire if K exceeds the threshold, remember IPT if K exceeds
        ``K_remember``, grow connections when the weight increased, refresh the
        aggregates, and - only if this neuron fired - stimulate its neighbours.
        """
        fired = K > self.K_critical
        if fired:
            NTL_out, K_out = self.do_work(NTL, K)
            self.NTL_out.append(NTL_out)
            # Outputs exist only when the neuron fired, so they are recorded
            # here; recording them unconditionally referenced unbound locals.
            if node is not None:
                self.K_out.append(K_out * getattr(node, "w_bef", 1.0))  # default 1.0
            else:
                self.K_out.append(K_out)

        if K > K_remember:  # condition for remembering the input
            self.save(IPT)
            print('[LOG] IPT Saved')

        # A larger weight adds that many new connections. `nodes` is the
        # notebook-level list of all neurons.
        # NOTE(review): w_bef is never synchronised with w_curr in this class,
        # so the same growth would repeat on every call - confirm where the
        # weight bookkeeping is supposed to happen.
        for _ in range(int(self.w_curr - self.w_bef)):
            candidate = next(
                (n for n in nodes
                 if n is not self and n.code not in self.connected_node_code),
                None,
            )
            if candidate is None:
                break  # nothing left to connect to; still refresh aggregates below
            self.make_new_connection(candidate)

        # Aggregate over everything this neuron has produced so far.
        self.K_final = sum(self.K_out) if self.K_out else 0
        if self.NTL_out:
            count = len(self.NTL_out)
            self.NTL_final = [
                sum(vec[axis] for vec in self.NTL_out) / count for axis in range(4)
            ]
        else:
            self.NTL_final = [0] * 4

        # Only a neuron that fired stimulates its neighbours. Connections are
        # bidirectional, so propagating unconditionally would bounce between
        # two neurons forever.
        # NOTE(review): propagation is recursive and not otherwise bounded.
        if fired:
            self.update_nodes()
        return

    def update_nodes(self):
        """Send the aggregated state to every connected neuron."""
        a_kept = self.saved_IPT.copy()
        if self.IPT_curr in a_kept:
            # Drop the first occurrence of the current input; other entries
            # (including the initial None) are kept.
            idx = a_kept.index(self.IPT_curr)
            IPT_out = a_kept[:idx] + a_kept[idx+1:]
        else:
            IPT_out = [x for x in a_kept if x is not None]

        if not IPT_out:
            return  # nothing to propagate

        for i in self.connected_node:
            i.main(self.NTL_final, self.K_final, IPT_out[-1], self)


In [4]:
# Network definition
w_exciting = 0.33
w_inhibiting = 0.33
w_modulating = 0.34
node_count = 100

# 1) Normalise the shares (they may not sum to exactly 1).
weights = [
    ('exciting',  w_exciting),
    ('inhibiting', w_inhibiting),
    ('modulating', w_modulating),
]
total_w = sum(share for _, share in weights)
weights = [
    (kind, (share / total_w) if total_w else 0.0)
    for kind, share in weights
]

# 2) Floor allocation, then hand out the leftover nodes one at a time to the
#    types with the largest fractional parts.
raw = [node_count * share for _, share in weights]
base = list(map(int, raw))
remainder = node_count - sum(base)

fractions = [exact - floored for exact, floored in zip(raw, base)]
frac_idx = sorted(range(len(raw)), key=fractions.__getitem__, reverse=True)
for step in range(remainder):
    base[frac_idx[step]] += 1

# 3) Create the nodes, assigning each a unique, increasing code.
nodes = []
node_codes = []
code = 0
for (node_type, _share), quota in zip(weights, base):
    for _ in range(quota):
        nodes.append(neuron(+30, code, node_type))
        node_codes.append(code)
        code += 1

node_count = len(nodes)  # refresh with the number of nodes actually created


**FLOW**

In [5]:
# Instantiate the classes
# These assignments rebind the class names to instances (later cells call
# LLM.IPT2NTL / emotion_storage.NTL2NTLandK on them). The isinstance guards
# keep the cell re-runnable: without them a second run would try to call an
# instance and fail.
if isinstance(LLM, type):
    LLM = LLM()
if isinstance(emotion_storage, type):
    emotion_storage = emotion_storage()

In [6]:
# Data box: shared dictionary passed between the flow cells
data_box = {}

In [7]:
# INPUT: the text fed into the flow
data_box["IPT"] = "I hate you"

In [None]:
# Emotion store
# NTL2NTLandK indexes four axes, so feed it the full score vector (ordered as
# REQUIRED_KEYS) rather than the two top indices returned by IPT2NTL.
_scores = LLM.IPT2SCORES(data_box['IPT'])
data_box['NTL'] = [_scores[axis] for axis in LLM.REQUIRED_KEYS]
data_box['NTL'], data_box['K'] = emotion_storage.NTL2NTLandK(data_box['NTL'])

In [None]:
# Initial neuron stimulation
import random

class _DummyNode():
    """Placeholder sender for the first stimulation; it only carries a code."""
    def __init__(self):
        self.code = 0

# The class has its own name so re-running the cell does not try to call the
# instance created on a previous run.
dummy_node = _DummyNode()

# Pick any existing node uniformly. The previous index arithmetic could point
# past the end of `nodes` and could never select the first two nodes.
start_neuron = random.choice(nodes)
start_neuron.main(data_box['NTL'], data_box['K'], data_box['IPT'], dummy_node)
