A. System 1 Semantic Coding Model

1) Action-Concept **Relevance**

In [10]:
# ============================================
# System 1 SNC: Action–Concept Relevance (Eq. 1)
# p(X|A=a; t) = Π_c p(X_c | A=a; t),  X_c ∈ {True, False}
# ============================================

import numpy as np

class ActionConceptRelevanceModel:
    """
    Implements Eq. (1) in the paper:
        p(X|A=a; t) = Π_{c∈C} p(X_c | A=a; t)
    where X_c is Bernoulli (True/False).

    Internally we store:
        p_true[t, a, c] = P(X_c = True | A=a; t)
    """
    def __init__(self, num_tasks: int, num_actions: int, num_concepts: int, seed: int = 0):
        self.T = num_tasks
        self.A = num_actions
        self.C = num_concepts
        self.rng = np.random.default_rng(seed)

        # p_true[t, a, c] ∈ (0,1)
        # You can replace this initialization with learned logits from a neural model.
        self.p_true = None

    def random_init_beta(self, alpha: float = 0.7, beta: float = 0.7):
        """
        Randomly initialize P(X_c=True|a;t) using a Beta distribution.
        Smaller alpha/beta -> more extreme probabilities near 0 or 1.
        """
        self.p_true = self.rng.beta(alpha, beta, size=(self.T, self.A, self.C)).astype(np.float32)
        self._clip_inplace()
        return self.p_true

    def set_p_true(self, p_true: np.ndarray):
        """
        Set p_true explicitly. Expected shape: (T, A, C)
        """
        assert p_true.shape == (self.T, self.A, self.C)
        self.p_true = p_true.astype(np.float32)
        self._clip_inplace()

    def _clip_inplace(self, eps: float = 1e-8):
        # avoid log(0)
        self.p_true = np.clip(self.p_true, eps, 1.0 - eps)

    def sample_X(self, a: int, t: int, n: int = 1) -> np.ndarray:
        """
        Sample X = (X_1,...,X_C) for a fixed action a and task t.
        Returns boolean array of shape (n, C).
        """
        assert self.p_true is not None, "Call random_init_beta() or set_p_true() first."
        p = self.p_true[t, a]  # (C,)
        # Bernoulli sampling
        u = self.rng.random(size=(n, self.C))
        X = (u < p)
        return X

    def log_prob_X(self, X: np.ndarray, a: int, t: int) -> np.ndarray:
        """
        Compute log p(X | A=a; t) under conditional independence.
        X can be shape (C,) or (n, C). Returns shape () or (n,).
        """
        assert self.p_true is not None, "Call random_init_beta() or set_p_true() first."
        p = self.p_true[t, a]  # (C,)
        X = np.asarray(X)
        if X.ndim == 1:
            X = X[None, :]  # (1, C)

        # log p = Σ_c [ x_c log p_c + (1-x_c) log (1-p_c) ]
        x = X.astype(np.float32)
        logp = (x * np.log(p) + (1.0 - x) * np.log(1.0 - p)).sum(axis=1)
        return logp if logp.shape[0] > 1 else logp[0]

    def prob_X(self, X: np.ndarray, a: int, t: int) -> np.ndarray:
        """
        Compute p(X | A=a; t) (may underflow for large C).
        Prefer log_prob_X for stability.
        """
        return np.exp(self.log_prob_X(X, a, t))


# -----------------------------
# Demo / sanity check
# -----------------------------
if __name__ == "__main__":
    num_tasks = 3
    num_actions = 5
    num_concepts = 8

    model = ActionConceptRelevanceModel(num_tasks, num_actions, num_concepts, seed=42)
    model.random_init_beta(alpha=0.6, beta=0.6)

    t = 1
    a = 2
    X_samples = model.sample_X(a=a, t=t, n=4)
    print("Sampled X (rows are samples, cols are concepts):\n", X_samples.astype(int))
    print("\nlog p(X|a,t) for each sample:\n", model.log_prob_X(X_samples, a=a, t=t))

    # Factorization sanity: show that joint log-prob equals sum of per-concept terms
    X0 = X_samples[0].astype(np.float32)
    p = model.p_true[t, a]
    per_c = X0 * np.log(p) + (1 - X0) * np.log(1 - p)
    print("\nCheck factorization (difference should be ~0):",
          float(model.log_prob_X(X0, a=a, t=t) - per_c.sum()))


Sampled X (rows are samples, cols are concepts):
 [[1 0 0 0 0 1 1 0]
 [1 0 0 0 0 1 0 1]
 [1 0 0 1 0 1 1 0]
 [1 0 0 0 0 1 1 1]]

log p(X|a,t) for each sample:
 [-1.1806356 -2.80388   -2.6009328 -2.1326973]

Check factorization (difference should be ~0): 0.0


2) Action-Concept Model

In [11]:
# =========================================================
# System 1 SNC: (1) Action–Concept Relevance  ->  (2) A2C  ->  (3) C2A
# Colab-ready, numpy-only implementation
# =========================================================

import numpy as np

def logsumexp(x, axis=None, keepdims=False):
    x = np.asarray(x)
    m = np.max(x, axis=axis, keepdims=True)
    y = m + np.log(np.sum(np.exp(x - m), axis=axis, keepdims=True))
    return y if keepdims else np.squeeze(y, axis=axis)

class System1_SNC:
    """
    Implements:
      (1) p(X|A=a;t) = Π_c p(X_c | A=a;t), with Bernoulli X_c
          store p_true[t,a,c] = P(X_c=TRUE | a,t)

      (2) A2C:  p(C=c | A=a; t) = p_xc|A(TRUE | a,t) / Σ_{c'} p_xc'|A(TRUE | a,t)

      (3) C2A via Bayes:
          p(A=a | C=c; t) = p(C=c | A=a;t) p_A(a) / Σ_{a'} p(C=c | A=a';t) p_A(a')
    """
    def __init__(self, num_tasks: int, num_actions: int, num_concepts: int, seed: int = 0):
        self.T = num_tasks
        self.A = num_actions
        self.C = num_concepts
        self.rng = np.random.default_rng(seed)

        # p_true[t,a,c] = P(X_c=TRUE | a,t)
        self.p_true = None

    # ---------- (1) Action–Concept Relevance ----------
    def random_init_beta(self, alpha: float = 0.7, beta: float = 0.7, eps: float = 1e-8):
        self.p_true = self.rng.beta(alpha, beta, size=(self.T, self.A, self.C)).astype(np.float32)
        self.p_true = np.clip(self.p_true, eps, 1.0 - eps)
        return self.p_true

    def set_p_true(self, p_true: np.ndarray, eps: float = 1e-8):
        assert p_true.shape == (self.T, self.A, self.C)
        self.p_true = np.clip(p_true.astype(np.float32), eps, 1.0 - eps)

    def sample_X(self, a: int, t: int, n: int = 1) -> np.ndarray:
        """Sample X vector(s) under (1). Returns bool array (n,C)."""
        assert self.p_true is not None
        p = self.p_true[t, a]  # (C,)
        u = self.rng.random(size=(n, self.C))
        return (u < p)

    def log_prob_X(self, X: np.ndarray, a: int, t: int) -> np.ndarray:
        """Compute log p(X|a,t) under conditional independence (1)."""
        assert self.p_true is not None
        p = self.p_true[t, a]  # (C,)
        X = np.asarray(X)
        if X.ndim == 1:
            X = X[None, :]
        x = X.astype(np.float32)
        logp = (x * np.log(p) + (1.0 - x) * np.log(1.0 - p)).sum(axis=1)
        return logp if logp.shape[0] > 1 else logp[0]

    # ---------- (2) Action-to-Concept (A2C) ----------
    def a2c_probs(self, a: int, t: int) -> np.ndarray:
        """
        Eq (2): p(C=c | A=a; t) by normalizing p_true[t,a,c] over concepts.
        Returns (C,) probabilities summing to 1.
        """
        assert self.p_true is not None
        w = self.p_true[t, a].astype(np.float64)  # (C,)
        s = np.sum(w)
        # If s is (numerically) zero, fall back to uniform (shouldn't happen due to clipping)
        if s <= 0:
            return np.ones(self.C, dtype=np.float64) / self.C
        return (w / s).astype(np.float64)

    def a2c_log_probs(self, a: int, t: int) -> np.ndarray:
        """Log-space version of Eq (2) for stability."""
        assert self.p_true is not None
        logw = np.log(self.p_true[t, a].astype(np.float64))  # (C,)
        return logw - logsumexp(logw, axis=0)

    def sample_C(self, a: int, t: int, n: int = 1) -> np.ndarray:
        """Sample concept index C ~ p(C|A=a;t) from Eq (2). Returns (n,) int."""
        pC = self.a2c_probs(a, t)
        return self.rng.choice(self.C, size=n, p=pC)

    # ---------- (3) Concept-to-Action (C2A) via Bayes ----------
    def c2a_posterior(self, c: int, t: int, pA: np.ndarray = None) -> np.ndarray:
        """
        Eq (3): p(A=a | C=c; t) ∝ p(C=c | A=a; t) * p_A(a)
        Returns (A,) posterior over actions.
        """
        assert self.p_true is not None
        if pA is None:
            pA = np.ones(self.A, dtype=np.float64) / self.A
        else:
            pA = np.asarray(pA, dtype=np.float64)
            assert pA.shape == (self.A,)
            # normalize prior just in case
            pA = pA / np.sum(pA)

        # compute p(C=c | A=a;t) for all actions a
        # using Eq (2): normalize p_true[t,a,:] across concepts
        w = self.p_true[t].astype(np.float64)          # shape (A, C)
        denom = np.sum(w, axis=1, keepdims=True)       # shape (A, 1)
        pC_given_A = (w / denom)[:, c]                 # shape (A,)

        unnorm = pC_given_A * pA                       # shape (A,)
        Z = np.sum(unnorm)
        if Z <= 0:
            return np.ones(self.A, dtype=np.float64) / self.A
        return unnorm / Z

    def c2a_log_posterior(self, c: int, t: int, pA: np.ndarray = None) -> np.ndarray:
        """Log-space Bayes posterior (recommended if A is large)."""
        assert self.p_true is not None
        if pA is None:
            logpA = -np.log(self.A) * np.ones(self.A, dtype=np.float64)
        else:
            pA = np.asarray(pA, dtype=np.float64)
            pA = pA / np.sum(pA)
            logpA = np.log(np.clip(pA, 1e-300, 1.0))

        # log p(C=c | A=a;t)
        log_pC_given_A = np.zeros(self.A, dtype=np.float64)
        for a in range(self.A):
            log_pC_given_A[a] = self.a2c_log_probs(a, t)[c]

        log_post_unnorm = log_pC_given_A + logpA
        return log_post_unnorm - logsumexp(log_post_unnorm, axis=0)

    def infer_action_map(self, c: int, t: int, pA: np.ndarray = None) -> int:
        """MAP action: argmax_a p(A=a | C=c; t)."""
        post = self.c2a_posterior(c, t, pA=pA)
        return int(np.argmax(post))


# =========================================================
# Quick demo (runs in Colab)
# =========================================================
if __name__ == "__main__":
    T, A, C = 2, 5, 8
    sys1 = System1_SNC(num_tasks=T, num_actions=A, num_concepts=C, seed=123)
    sys1.random_init_beta(alpha=0.6, beta=0.6)

    t = 0
    a_true = 3

    # Speaker: pick concept using A2C (Eq 2)
    c_sent = sys1.sample_C(a_true, t, n=1)[0]
    print(f"[Speaker] task t={t}, intended action a={a_true} -> sent concept c={c_sent}")

    # Listener: infer action using C2A posterior (Eq 3)
    # Example prior: slightly favors action 0
    pA = np.array([0.40, 0.15, 0.15, 0.15, 0.15], dtype=np.float64)
    post = sys1.c2a_posterior(c_sent, t, pA=pA)
    a_hat = int(np.argmax(post))
    print(f"[Listener] posterior p(A|c,t) = {np.round(post, 4)}")
    print(f"[Listener] MAP action = {a_hat}")

    # (Optional) also sample an X vector under Eq (1) for the same a,t
    X = sys1.sample_X(a_true, t, n=1)[0]
    print(f"[Eq(1) sample] X (as 0/1 over concepts): {X.astype(int)}")
    print(f"[Eq(1) log p(X|a,t)] {float(sys1.log_prob_X(X, a_true, t)):.4f}")

[Speaker] task t=0, intended action a=3 -> sent concept c=1
[Listener] posterior p(A|c,t) = [0.267  0.3176 0.0884 0.2843 0.0428]
[Listener] MAP action = 1
[Eq(1) sample] X (as 0/1 over concepts): [1 1 0 0 0 0 1 0]
[Eq(1) log p(X|a,t)] -2.5596


3) Concept-Symbol Model

In [12]:
import numpy as np
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict, Any

# =========================================================
# System 1 core: (1)(2)(3) 중 (1)과 (2)까지만 SR 만들 때 사용
# (3) C2A는 listener 쪽이라 Shannon 이후에 붙이면 됨.
# =========================================================
class System1_SNC:
    """
    Stores p_true[t,a,c] = P(X_c=TRUE | A=a; t)

    (2) A2C:
      p(C=c | A=a; t) = p_true[t,a,c] / sum_{c'} p_true[t,a,c']
    """
    def __init__(self, num_tasks: int, num_actions: int, num_concepts: int, seed: int = 0):
        self.T = num_tasks
        self.A = num_actions
        self.C = num_concepts
        self.rng = np.random.default_rng(seed)
        self.p_true = None  # (T, A, C)

    def random_init_beta(self, alpha: float = 0.7, beta: float = 0.7, eps: float = 1e-8):
        self.p_true = self.rng.beta(alpha, beta, size=(self.T, self.A, self.C)).astype(np.float64)
        self.p_true = np.clip(self.p_true, eps, 1.0 - eps)
        return self.p_true

    def a2c_probs(self, a: int, t: int) -> np.ndarray:
        """
        Eq (2): normalize p_true[t,a,:] across concepts to get p(C|A=a;t).
        """
        assert self.p_true is not None, "Initialize p_true first."
        w = self.p_true[t, a]  # (C,)
        return w / np.sum(w)

    def sample_concepts(self, a: int, t: int, k: int = 1, replace: bool = False) -> np.ndarray:
        """
        Sample k concepts from A2C distribution. (Paper uses single concept; k=1 is default.)
        """
        p = self.a2c_probs(a, t)
        idx = self.rng.choice(self.C, size=k, replace=replace, p=p)
        return idx

    def topk_concepts(self, a: int, t: int, k: int = 1) -> np.ndarray:
        """
        Deterministic alternative: pick top-k concepts by A2C probability.
        """
        p = self.a2c_probs(a, t)
        return np.argsort(-p)[:k]


# =========================================================
# (3) Concept-Symbol Model: 중앙 코드북, deterministic one-to-one
# =========================================================
@dataclass(frozen=True)
class CentralCodebookC2S:
    """
    Deterministic one-to-one mapping s: C -> S
    We implement it as two dicts: c2s and s2c.
    """
    c2s: Dict[int, Any]   # concept_id -> symbol (token)
    s2c: Dict[Any, int]   # symbol -> concept_id

    @staticmethod
    def make_identity(num_concepts: int):
        # symbol을 정수로 쓰는 가장 간단한 코드북: s(c)=c
        c2s = {c: c for c in range(num_concepts)}
        s2c = {c: c for c in range(num_concepts)}
        return CentralCodebookC2S(c2s=c2s, s2c=s2c)

    @staticmethod
    def make_tokenized(num_concepts: int, prefix: str = "S"):
        # symbol을 문자열 토큰으로: s(c) = "S_0007" 같은 형태
        c2s = {c: f"{prefix}_{c:04d}" for c in range(num_concepts)}
        s2c = {v: k for k, v in c2s.items()}
        return CentralCodebookC2S(c2s=c2s, s2c=s2c)

    def encode_concepts_to_symbols(self, concepts: np.ndarray) -> List[Any]:
        return [self.c2s[int(c)] for c in concepts]

    def decode_symbols_to_concepts(self, symbols: List[Any]) -> np.ndarray:
        return np.array([self.s2c[s] for s in symbols], dtype=int)


# =========================================================
# Semantic Encoder (System 1 기준): action -> concept(s) -> symbol(s) = SR
# Shannon 코딩 단계 "직전" 출력 포맷까지
# =========================================================
@dataclass
class SemanticRepresentation:
    """
    SR: the semantic representation of action a, i.e., a set/list of symbols
    obtained via A2C and C2S.
    """
    task_t: int
    action_a: int
    concepts: List[int]
    symbols: List[Any]
    a2c_probs: Optional[np.ndarray] = None  # (C,) optional for analysis/debug


class System1SemanticEncoder:
    def __init__(self, sys1: System1_SNC, codebook: CentralCodebookC2S):
        self.sys1 = sys1
        self.codebook = codebook

    def encode(self, a: int, t: int, k: int = 1, mode: str = "sample") -> SemanticRepresentation:
        """
        mode:
          - "sample": sample k concepts from A2C
          - "topk"  : pick top-k concepts deterministically
        output:
          SR.symbols -> 이걸 다음 단계 Shannon(bit encoder)의 입력으로 넘기면 됨
        """
        if mode == "sample":
            concepts = self.sys1.sample_concepts(a=a, t=t, k=k, replace=False)
        elif mode == "topk":
            concepts = self.sys1.topk_concepts(a=a, t=t, k=k)
        else:
            raise ValueError("mode must be one of ['sample', 'topk'].")

        symbols = self.codebook.encode_concepts_to_symbols(concepts)
        p = self.sys1.a2c_probs(a, t)

        return SemanticRepresentation(
            task_t=t,
            action_a=a,
            concepts=[int(x) for x in concepts],
            symbols=symbols,
            a2c_probs=p
        )


# =========================================================
# Demo: Shannon 코딩 '직전'까지 확인
# =========================================================
if __name__ == "__main__":
    # toy sizes
    T, A, C = 2, 5, 12

    # 1) System 1 relevance model 준비 (p_true)
    sys1 = System1_SNC(num_tasks=T, num_actions=A, num_concepts=C, seed=123)
    sys1.random_init_beta(alpha=0.6, beta=0.6)

    # 2) 중앙 코드북(C2S): 1:1 deterministic
    #   - 정수 심볼로 할지, 문자열 토큰으로 할지 선택
    codebook = CentralCodebookC2S.make_tokenized(num_concepts=C, prefix="SYM")

    # 3) Semantic encoder: action -> SR(symbols)  (Shannon 직전)
    sem_enc = System1SemanticEncoder(sys1=sys1, codebook=codebook)

    t = 0
    a = 3

    # paper는 single concept이 기본이지만, k>1로 "symbol set"도 만들 수 있게 해둠
    sr = sem_enc.encode(a=a, t=t, k=3, mode="sample")

    print("=== Semantic Encoding (System 1) ===")
    print(f"task t={sr.task_t}, action a={sr.action_a}")
    print(f"chosen concepts: {sr.concepts}")
    print(f"SR symbols (Shannon input): {sr.symbols}")

    # Shannon coding 단계는 여기서부터!
    print("\n--- Next step placeholder (NOT implemented here) ---")
    print("bitstream = shannon_bit_encoder.encode(sr.symbols)")
    print("rx_bits   = noisy_channel(bitstream)")
    print("rx_symbols= shannon_bit_decoder.decode(rx_bits)")


=== Semantic Encoding (System 1) ===
task t=0, action a=3
chosen concepts: [3, 11, 5]
SR symbols (Shannon input): ['SYM_0003', 'SYM_0011', 'SYM_0005']

--- Next step placeholder (NOT implemented here) ---
bitstream = shannon_bit_encoder.encode(sr.symbols)
rx_bits   = noisy_channel(bitstream)
rx_symbols= shannon_bit_decoder.decode(rx_bits)


B. Shannon Coding under System 1 Semantic Coding

In [15]:
import numpy as np
from dataclasses import dataclass
from typing import Optional

# =========================================================
# Shannon coding under System 1 semantic coding
# Theorem 1: Expected SR bit-length bounds (Eq. 4,5)
#
# p_Xc(TRUE) = Σ_a p_Xc|A(TRUE|a; t) p_A(a)
# q_c = p_Xc(TRUE) / Σ_{c'} p_Xc'(TRUE)
#
# Lower bound:
#   L_S1 ≥ - Σ_c p_Xc(TRUE) log2( q_c )
#
# Upper bound:
#   L_S1 ≤ - Σ_c p_Xc(TRUE) floor_like? (paper uses [·] as integer length)
# We implement the common Shannon code upper bound with ceil:
#   l_c = ceil(-log2 q_c)
#   L_S1 ≤ Σ_c p_Xc(TRUE) l_c
# =========================================================

@dataclass
class ExpectedSRBitLength:
    t: int
    pA: np.ndarray          # (A,)
    p_x_true: np.ndarray    # (C,)  p_Xc(TRUE)
    q: np.ndarray           # (C,)  normalized frequency q_c
    denom: float            # Σ_c p_Xc(TRUE)
    L_lower: float          # Eq (4)
    L_upper: float          # Eq (5) (Shannon length version)
    entropy_q: float        # H(q) in bits
    avg_shannon_len: float  # Σ_c q_c ceil(-log2 q_c)


def expected_sr_bit_length_theorem1(
    sys1: System1_SNC,
    t: int,
    pA: Optional[np.ndarray] = None,
    eps: float = 1e-12
) -> ExpectedSRBitLength:
    """
    Computes Theorem 1 bounds using sys1.p_true[t,a,c] and action prior pA(a).

    This function is designed to plug directly after your SR generation code:
      - It uses the same sys1 object
      - It does NOT depend on Shannon encoder implementation (bit encoder), only on the distribution.
    """
    assert sys1.p_true is not None, "sys1.p_true must be initialized first."

    A = sys1.A
    C = sys1.C

    if pA is None:
        pA = np.ones(A, dtype=np.float64) / A
    else:
        pA = np.asarray(pA, dtype=np.float64)
        assert pA.shape == (A,)
        pA = pA / np.sum(pA)

    # p_x_true[c] = Σ_a p_true[t,a,c] * pA[a]
    p_x_true = (sys1.p_true[t] * pA[:, None]).sum(axis=0)  # shape (C,)

    denom = float(np.sum(p_x_true))  # Σ_c p_Xc(TRUE)
    if denom <= 0:
        # 극단적인 경우(거의 안나옴): 균일 분포로 fallback
        q = np.ones(C, dtype=np.float64) / C
        p_x_true = q.copy()
        denom = 1.0
    else:
        q = p_x_true / denom

    # Numerical safety
    q = np.clip(q, eps, 1.0)

    # Eq (4): L_lower = - Σ_c p_Xc(TRUE) log2(q_c)
    L_lower = float(-np.sum(p_x_true * np.log2(q)))

    # Eq (5): use Shannon integer lengths l_c = ceil(-log2 q_c)
    l_shannon = np.ceil(-np.log2(q))
    L_upper = float(np.sum(p_x_true * l_shannon))

    entropy_q = float(-np.sum(q * np.log2(q)))
    avg_shannon_len = float(np.sum(q * l_shannon))

    return ExpectedSRBitLength(
        t=t,
        pA=pA,
        p_x_true=p_x_true,
        q=q,
        denom=denom,
        L_lower=L_lower,
        L_upper=L_upper,
        entropy_q=entropy_q,
        avg_shannon_len=avg_shannon_len
    )


# =========================================================
# (Optional) SR "realized" bit-length proxy for ONE SR sample
# - This is NOT Theorem 1; it's a convenient measurement for a realized SR.
# - It uses q_c to assign ideal Shannon lengths per symbol and sums them.
# =========================================================
def realized_sr_shannon_length(sr: SemanticRepresentation, q: np.ndarray, codebook: CentralCodebookC2S) -> float:
    """
    Given one SR (list of symbols), compute sum of ideal Shannon lengths:
        length(sr) ≈ Σ_{symbol in SR} ceil(-log2 q_c(symbol))
    where c(symbol) is the concept corresponding to that symbol.
    """
    # map symbols -> concepts
    concepts = codebook.decode_symbols_to_concepts(sr.symbols)
    q_safe = np.clip(q, 1e-12, 1.0)
    lengths = np.ceil(-np.log2(q_safe[concepts]))
    return float(np.sum(lengths))


# =========================================================
# Demo that plugs into YOUR previous cell variables: sys1, codebook, sem_enc, sr
# =========================================================
# 아래는 "이전 셀"을 실행한 상태에서 그대로 실행하면 동작하게 만든 예시야.
try:
    # 예시 prior: uniform or you can set your own pA
    pA_demo = np.ones(sys1.A, dtype=np.float64) / sys1.A

    res = expected_sr_bit_length_theorem1(sys1=sys1, t=sr.task_t, pA=pA_demo)

    print("\n=== Theorem 1: Expected bit-length of SR (System 1 SNC) ===")
    print(f"task t = {res.t}")
    print(f"denom = Σ_c p_Xc(TRUE) = {res.denom:.6f}  (scale ~ expected #extracted concepts)")
    print(f"H(q) = {res.entropy_q:.6f} bits")
    print(f"Lower bound (Eq.4) L_S1 >= {res.L_lower:.6f} bits")
    print(f"Upper bound (Eq.5) L_S1 <= {res.L_upper:.6f} bits  (Shannon ceil lengths)")
    print(f"Avg Shannon len (normalized) = {res.avg_shannon_len:.6f} bits/symbol")

    # (선택) 방금 뽑은 sr 하나에 대해 '근사' 비트길이도 찍어보기
    realized_len = realized_sr_shannon_length(sr, res.q, codebook)
    print(f"\nRealized SR approx. Shannon length (for this SR sample) = {realized_len:.2f} bits")

    print("\n--- Next (bit encoder) connection point ---")
    print("bitstream = source_encoder.encode(sr.symbols)  # e.g., Huffman/Arithmetic using q")
    print("... then channel coding / noisy channel / decoding ...")

except NameError as e:
    print("You need to run the previous cell first (sys1, sr, codebook must exist).")
    print("Missing name:", e)



=== Theorem 1: Expected bit-length of SR (System 1 SNC) ===
task t = 0
denom = Σ_c p_Xc(TRUE) = 5.840479  (scale ~ expected #extracted concepts)
H(q) = 3.549122 bits
Lower bound (Eq.4) L_S1 >= 20.728574 bits
Upper bound (Eq.5) L_S1 <= 23.722610 bits  (Shannon ceil lengths)
Avg Shannon len (normalized) = 4.061758 bits/symbol

Realized SR approx. Shannon length (for this SR sample) = 12.00 bits

--- Next (bit encoder) connection point ---
bitstream = source_encoder.encode(sr.symbols)  # e.g., Huffman/Arithmetic using q
... then channel coding / noisy channel / decoding ...


add Shannon communication

In [19]:
# =========================================================
# System 1 SNC: Full end-to-end (simple) simulation with
# - Semantic encoder: action -> concept(s) -> symbol(s) (SR)
# - Source encoder: Huffman (lossless source coding) on symbols
# - Channel encoder: repetition code (simple channel coding)
# - Noisy channel: BSC (bit-flip) as a proxy for noise/fading
# - Channel decoder: majority vote
# - Source decoder: Huffman decode -> recovered symbols
# - Semantic decoder: symbols -> concepts -> action (C2A via Bayes)
#
# Plug-in requirement:
#   This cell assumes you have already run your previous cells and have:
#     - sys1 : System1_SNC
#     - codebook : CentralCodebookC2S
#     - sem_enc : System1SemanticEncoder
#   If not, it will create a tiny toy setup automatically.
# =========================================================

import numpy as np
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import heapq

# ---------------------------------------------------------
# 0) Listener side semantic decoder: implement Eq (3) C2A
# ---------------------------------------------------------
def c2a_posterior_from_sys1(sys1, c: int, t: int, pA: Optional[np.ndarray] = None, eps: float = 1e-12) -> np.ndarray:
    """
    p(A=a | C=c; t) ∝ p(C=c | A=a; t) pA(a)
    where p(C=c|A=a;t) is derived from Eq (2): normalize p_true[t,a,:] over concepts.
    """
    A = sys1.A
    C = sys1.C

    if pA is None:
        pA = np.ones(A, dtype=np.float64) / A
    else:
        pA = np.asarray(pA, dtype=np.float64)
        pA = pA / np.sum(pA)

    # p(C=c | A=a;t) for all actions a
    w = sys1.p_true[t].astype(np.float64)              # (A, C)
    denom = np.sum(w, axis=1, keepdims=True)           # (A,1)
    pC_given_A = (w / denom)[:, c]                     # (A,)

    unnorm = pC_given_A * pA
    Z = np.sum(unnorm)
    if Z <= eps:
        return np.ones(A, dtype=np.float64) / A
    return unnorm / Z

def infer_action_map(sys1, concept_list: List[int], t: int, pA: Optional[np.ndarray] = None) -> int:
    """
    If SR contains multiple concepts, combine evidence by multiplying posteriors
    (log-add in practice). Here we do a simple product in log-domain.
    """
    A = sys1.A
    logp = np.zeros(A, dtype=np.float64)

    # start from prior
    if pA is None:
        logp += -np.log(A)
    else:
        pA = np.asarray(pA, dtype=np.float64)
        pA = pA / np.sum(pA)
        logp += np.log(np.clip(pA, 1e-300, 1.0))

    for c in concept_list:
        post = c2a_posterior_from_sys1(sys1, c=c, t=t, pA=None)  # likelihood-like term
        logp += np.log(np.clip(post, 1e-300, 1.0))

    return int(np.argmax(logp))


# ---------------------------------------------------------
# 1) Huffman source coding (lossless)
# ---------------------------------------------------------
@dataclass
class HuffmanCode:
    code: Dict[Any, str]   # symbol -> bitstring
    trie: Dict             # decode trie

def _make_trie(code: Dict[Any, str]) -> Dict:
    root: Dict = {}
    for sym, bits in code.items():
        cur = root
        for b in bits:
            cur = cur.setdefault(b, {})
        cur["$"] = sym
    return root

def build_huffman_code(symbols: List[Any], probs: np.ndarray) -> HuffmanCode:
    """
    Build a prefix-free Huffman code for the given symbol alphabet and probs.
    """
    probs = np.asarray(probs, dtype=np.float64)
    probs = probs / np.sum(probs)

    # Priority queue items: (prob, unique_id, tree)
    # tree is either a symbol or (left,right)
    uid = 0
    pq = []
    for s, p in zip(symbols, probs):
        heapq.heappush(pq, (float(p), uid, s))
        uid += 1

    # Handle degenerate alphabet
    if len(pq) == 1:
        s = pq[0][2]
        code = {s: "0"}
        return HuffmanCode(code=code, trie=_make_trie(code))

    while len(pq) > 1:
        p1, _, t1 = heapq.heappop(pq)
        p2, _, t2 = heapq.heappop(pq)
        heapq.heappush(pq, (p1 + p2, uid, (t1, t2)))
        uid += 1

    _, _, tree = pq[0]

    # Traverse tree to assign codes
    code: Dict[Any, str] = {}
    def dfs(node, prefix):
        if not isinstance(node, tuple):
            code[node] = prefix if prefix != "" else "0"
            return
        left, right = node
        dfs(left, prefix + "0")
        dfs(right, prefix + "1")
    dfs(tree, "")

    return HuffmanCode(code=code, trie=_make_trie(code))

def huffman_encode(seq: List[Any], hc: HuffmanCode) -> str:
    return "".join(hc.code[s] for s in seq)

def huffman_decode(bitstring: str, hc: HuffmanCode) -> List[Any]:
    out = []
    cur = hc.trie
    for b in bitstring:
        if b not in cur:
            # decoding failure (due to bit errors)
            raise ValueError("Huffman decode failed (prefix mismatch).")
        cur = cur[b]
        if "$" in cur:
            out.append(cur["$"])
            cur = hc.trie
    # If we end not at root, leftover bits => failure
    if cur is not hc.trie and cur != hc.trie:
        # not necessarily fatal in all schemes, but treat as error here
        pass
    return out


# ---------------------------------------------------------
# 2) Simple channel coding: repetition code (n=rep)
# ---------------------------------------------------------
def repetition_encode(bits: str, rep: int = 3) -> str:
    assert rep % 2 == 1, "Use odd repetition for majority vote."
    return "".join(b * rep for b in bits)

def repetition_decode(rx_bits: str, rep: int = 3) -> str:
    assert len(rx_bits) % rep == 0
    out = []
    for i in range(0, len(rx_bits), rep):
        block = rx_bits[i:i+rep]
        ones = block.count("1")
        out.append("1" if ones > rep//2 else "0")
    return "".join(out)


# ---------------------------------------------------------
# 3) Noisy channel: BSC (bit-flip with prob p)
# ---------------------------------------------------------
def bsc_channel(bits: str, flip_p: float, rng: np.random.Generator) -> str:
    bits_arr = np.fromiter((1 if b == "1" else 0 for b in bits), dtype=np.int8)
    flips = rng.random(bits_arr.shape[0]) < flip_p
    bits_arr = bits_arr ^ flips.astype(np.int8)
    return "".join("1" if x else "0" for x in bits_arr)


# ---------------------------------------------------------
# 4) Glue: end-to-end one-shot simulation
# ---------------------------------------------------------
@dataclass
class E2EResult:
    a_true: int
    t: int
    sr_symbols: List[Any]
    sr_concepts: List[int]
    src_bits: str
    ch_bits: str
    rx_ch_bits: str
    dec_src_bits: str
    sr_hat_symbols: List[Any]
    sr_hat_concepts: List[int]
    a_hat: Optional[int]
    success: bool
    notes: str

def simulate_system1_e2e(
    sys1,
    sem_enc,
    codebook,
    t: int,
    a_true: int,
    k_concepts: int = 1,
    semantic_mode: str = "sample",
    pA: Optional[np.ndarray] = None,
    flip_p: float = 0.02,
    rep: int = 3,
    rng: Optional[np.random.Generator] = None
) -> E2EResult:
    """
    Full pipeline:
      action -> SR(symbols)
      source coding (Huffman)
      channel coding (repetition)
      noisy channel (BSC)
      channel decode
      source decode
      SR_hat -> action_hat
    """
    if rng is None:
        rng = np.random.default_rng(0)

    # ---- semantic encoder (System 1) ----
    sr = sem_enc.encode(a=a_true, t=t, k=k_concepts, mode=semantic_mode)

    # ---- build Huffman code from q distribution over symbols ----
    # Use A2C mixture distribution as a proxy for symbol frequency:
    #   q_c ∝ p_Xc(TRUE) = Σ_a p_true[t,a,c] pA(a)
    # Here we compute q over concepts then transfer to symbols via codebook.
    A = sys1.A
    C = sys1.C
    if pA is None:
        pA_use = np.ones(A, dtype=np.float64) / A
    else:
        pA_use = np.asarray(pA, dtype=np.float64)
        pA_use = pA_use / np.sum(pA_use)

    p_x_true = (sys1.p_true[t] * pA_use[:, None]).sum(axis=0)  # (C,)
    denom = np.sum(p_x_true)
    if denom <= 0:
        q = np.ones(C, dtype=np.float64) / C
    else:
        q = p_x_true / denom

    # alphabet: all symbols in codebook (must be shared / centralized)
    symbol_alphabet = [codebook.c2s[c] for c in range(C)]
    hc = build_huffman_code(symbol_alphabet, q)

    # ---- source encode SR symbols -> bits ----
    try:
        src_bits = huffman_encode(sr.symbols, hc)
    except KeyError as e:
        return E2EResult(
            a_true=a_true, t=t,
            sr_symbols=sr.symbols, sr_concepts=sr.concepts,
            src_bits="", ch_bits="", rx_ch_bits="", dec_src_bits="",
            sr_hat_symbols=[], sr_hat_concepts=[],
            a_hat=None, success=False,
            notes=f"Source encode failed: unknown symbol {e}"
        )

    # ---- channel encode ----
    ch_bits = repetition_encode(src_bits, rep=rep)

    # ---- noisy channel ----
    rx_ch_bits = bsc_channel(ch_bits, flip_p=flip_p, rng=rng)

    # ---- channel decode ----
    dec_src_bits = repetition_decode(rx_ch_bits, rep=rep)

    # ---- source decode -> symbols ----
    try:
        sr_hat_symbols = huffman_decode(dec_src_bits, hc)
    except Exception as e:
        return E2EResult(
            a_true=a_true, t=t,
            sr_symbols=sr.symbols, sr_concepts=sr.concepts,
            src_bits=src_bits, ch_bits=ch_bits, rx_ch_bits=rx_ch_bits, dec_src_bits=dec_src_bits,
            sr_hat_symbols=[], sr_hat_concepts=[],
            a_hat=None, success=False,
            notes=f"Huffman decode failed (likely due to residual bit errors): {e}"
        )

    # ---- S2C (deterministic inverse) ----
    try:
        sr_hat_concepts = codebook.decode_symbols_to_concepts(sr_hat_symbols).tolist()
    except Exception as e:
        return E2EResult(
            a_true=a_true, t=t,
            sr_symbols=sr.symbols, sr_concepts=sr.concepts,
            src_bits=src_bits, ch_bits=ch_bits, rx_ch_bits=rx_ch_bits, dec_src_bits=dec_src_bits,
            sr_hat_symbols=sr_hat_symbols, sr_hat_concepts=[],
            a_hat=None, success=False,
            notes=f"S2C failed: {e}"
        )

    # ---- semantic decode: concepts -> action_hat (MAP) ----
    a_hat = infer_action_map(sys1, concept_list=sr_hat_concepts, t=t, pA=pA_use)

    success = (a_hat == a_true)

    return E2EResult(
        a_true=a_true, t=t,
        sr_symbols=sr.symbols, sr_concepts=sr.concepts,
        src_bits=src_bits, ch_bits=ch_bits, rx_ch_bits=rx_ch_bits, dec_src_bits=dec_src_bits,
        sr_hat_symbols=sr_hat_symbols, sr_hat_concepts=sr_hat_concepts,
        a_hat=a_hat, success=success,
        notes="OK"
    )


# =========================================================
# If your previous objects aren't defined, create a toy setup.
# =========================================================
try:
    sys1
    codebook
    sem_enc
except NameError:
    print("Previous objects not found; creating toy sys1/codebook/sem_enc.")
    # Minimal toy setup using your class definitions (must be in scope)
    T, A, C = 2, 6, 12
    sys1 = System1_SNC(num_tasks=T, num_actions=A, num_concepts=C, seed=123)
    sys1.random_init_beta(alpha=0.6, beta=0.6)
    codebook = CentralCodebookC2S.make_tokenized(num_concepts=C, prefix="SYM")
    sem_enc = System1SemanticEncoder(sys1=sys1, codebook=codebook)

# =========================================================
# Run a small simulation sweep
# =========================================================
rng = np.random.default_rng(2026)

t = 0
pA = np.ones(sys1.A, dtype=np.float64) / sys1.A

flip_p = 0.02     # channel noise
rep = 3           # repetition code strength (odd)
k_concepts = 1    # paper default is 1; try 3 to see robustness change

num_trials = 30
hits = 0
decode_fail = 0

print("=== System 1 SNC end-to-end with channel (simple) ===")
print(f"flip_p={flip_p}, repetition={rep}, k_concepts={k_concepts}\n")

for i in range(num_trials):
    a_true = int(rng.integers(low=0, high=sys1.A))
    r = simulate_system1_e2e(
        sys1=sys1, sem_enc=sem_enc, codebook=codebook,
        t=t, a_true=a_true,
        k_concepts=k_concepts, semantic_mode="sample",
        pA=pA, flip_p=flip_p, rep=rep, rng=rng
    )
    if r.notes.startswith("Huffman decode failed"):
        decode_fail += 1
    if r.success:
        hits += 1

    if i < 3:
        print(f"[Trial {i}] a_true={r.a_true} -> a_hat={r.a_hat}, success={r.success}")
        print(f"  SR symbols: {r.sr_symbols}")
        print(f"  src_bits(len={len(r.src_bits)}): {r.src_bits[:80]}{'...' if len(r.src_bits)>80 else ''}")
        print(f"  ch_bits(len={len(r.ch_bits)}): {r.ch_bits[:80]}{'...' if len(r.ch_bits)>80 else ''}")
        print(f"  rx_bits(len={len(r.rx_ch_bits)}): {r.rx_ch_bits[:80]}{'...' if len(r.rx_ch_bits)>80 else ''}")
        print(f"  SR_hat symbols: {r.sr_hat_symbols}")
        print(f"  note: {r.notes}\n")

print(f"Accuracy (MAP action) = {hits}/{num_trials} = {hits/num_trials:.3f}")
print(f"Source decode failures = {decode_fail}/{num_trials} = {decode_fail/num_trials:.3f}")

print("\nTip: increase repetition (e.g., rep=5) or reduce flip_p to improve reliability, "
      "at the cost of longer bitstream (compromising minimality), matching the paper's narrative.")


=== System 1 SNC end-to-end with channel (simple) ===
flip_p=0.02, repetition=3, k_concepts=1

[Trial 0] a_true=4 -> a_hat=0, success=False
  SR symbols: ['SYM_0005']
  src_bits(len=4): 1100
  ch_bits(len=12): 111111000000
  rx_bits(len=12): 111111000000
  SR_hat symbols: ['SYM_0005']
  note: OK

[Trial 1] a_true=0 -> a_hat=4, success=False
  SR symbols: ['SYM_0008']
  src_bits(len=4): 1101
  ch_bits(len=12): 111111000111
  rx_bits(len=12): 111111000101
  SR_hat symbols: ['SYM_0008']
  note: OK

[Trial 2] a_true=4 -> a_hat=4, success=True
  SR symbols: ['SYM_0009']
  src_bits(len=3): 010
  ch_bits(len=9): 000111000
  rx_bits(len=9): 000111000
  SR_hat symbols: ['SYM_0009']
  note: OK

Accuracy (MAP action) = 14/30 = 0.467
Source decode failures = 0/30 = 0.000

Tip: increase repetition (e.g., rep=5) or reduce flip_p to improve reliability, at the cost of longer bitstream (compromising minimality), matching the paper's narrative.
