현진: 호기심
원준: 플래그/폴리시
나: 예측 기반 오차/오류





에피소드 기반 보상은 나중에

In [None]:
# 에러 기반

def error():

In [None]:
from typing import Dict, Optional

Node = str

class SimpleTree:
    def __init__(self, parent: Dict[Node, Optional[Node]], depth: Dict[Node, int]):
        self.parent = parent          # 각 노드 -> 부모
        self.depth  = depth           # 각 노드 깊이(루트=0)

    def lca(self, u: Node, v: Node) -> Node:
        du, dv = self.depth[u], self.depth[v]
        # 깊이를 맞춘다
        while du > dv:
            u = self.parent[u]; du -= 1
        while dv > du:
            v = self.parent[v]; dv -= 1
        # 함께 올리며 만나는 지점 = LCA
        while u != v:
            u = self.parent[u]
            v = self.parent[v]
        return u

    def dist(self, u: Node, v: Node) -> int:
        a = self.lca(u, v)
        return (self.depth[u] - self.depth[a]) + (self.depth[v] - self.depth[a])

def reward_inverse_distance(d: int) -> float:
    """가중 없이 r = 1/(1+d)"""
    return 1.0 / (1 + d)

def reward_with_case_weights(d: int, weights: Dict[int, float]) -> float:
    """
    d가 (0,1,2,3) 등 소수 케이스만 나온다면 if/매핑으로 처리.
    테이블에 없으면 기본 역수형으로 폴백.
    """
    if d in weights:
        return weights[d]
    return 1.0 / (1 + d)

# ---- 사용 예시 ----
# parent/depth은 한 번만 준비해두면 됨.
# 예: 루트 R(0) - 프로그램 P(1) - 명령어 C(2) - 리프 L(3)
parent = {
    "R": None,
    "git": "R",
    "git:status": "git",
    "git:commit": "git",
    "L_status_v1": "git:status",
    "L_status_v2": "git:status",
    "L_commit_simple": "git:commit",
}
depth = {"R":0, "git":1, "git:status":2, "git:commit":2,
         "L_status_v1":3, "L_status_v2":3, "L_commit_simple":3}

tree = SimpleTree(parent, depth)

# 두 리프 거리/보상
u, v = "L_status_v1", "L_status_v2"
d = tree.dist(u, v)                 # 같은 명령어 하위 형제 → d=2
r = reward_inverse_distance(d)      # r = 1/(1+2) = 0.333...

# 케이스별 가중(if 테이블)
case_weights = {0: 1.0, 1: 0.7, 2: 0.4, 3: 0.2}  # 예시값
r_w = reward_with_case_weights(d, case_weights)  # d가 0~3 사이면 해당 값 사용




In [3]:
# phylo_reward_demo.py
from typing import Dict, Optional, Tuple
import math
import time

Node = str

# -----------------------------
# 간단 트리: LCA/거리/보상
# -----------------------------
class SimpleTree:
    def __init__(self, parent: Dict[Node, Optional[Node]], depth: Dict[Node, int]):
        self.parent = parent
        self.depth = depth

    def _align_depth(self, u: Node, v: Node) -> Tuple[Node, Node]:
        du, dv = self.depth[u], self.depth[v]
        while du > dv:
            u = self.parent[u]; du -= 1
        while dv > du:
            v = self.parent[v]; dv -= 1
        return u, v

    def lca(self, u: Node, v: Node) -> Node:
        u, v = self._align_depth(u, v)
        path = []
        while u != v:
            path.append((u, v))
            u = self.parent[u]
            v = self.parent[v]
        return u

    def dist(self, u: Node, v: Node) -> int:
        a = self.lca(u, v)
        return (self.depth[u] - self.depth[a]) + (self.depth[v] - self.depth[a])

def reward_inverse_distance(d: int) -> float:
    return 1.0 / (1 + d)

def reward_with_case_weights(d: int, weights: Dict[int, float]) -> float:
    return weights.get(d, 1.0 / (1 + d))

# -----------------------------
# 로거(깔끔한 콘솔 출력)
# -----------------------------
def log(section: str, msg: str = ""):
    ts = time.strftime("%H:%M:%S")
    print(f"[{ts}] {section:<10} | {msg}")

def assert_eq(name: str, got, expect):
    ok = got == expect
    status = "PASS" if ok else "FAIL"
    log("TEST", f"{name:<28} -> {status} (got={got!r}, expect={expect!r})")
    return ok

def assert_close(name: str, got: float, expect: float, tol: float = 1e-9):
    ok = abs(got - expect) <= tol * max(1.0, abs(expect))
    status = "PASS" if ok else "FAIL"
    log("TEST", f"{name:<28} -> {status} (got={got:.12f}, expect={expect:.12f})")
    return ok

# -----------------------------
# 데모 트리(루트→프로그램→명령어→리프)
# 깊이: R(0) -> P(1) -> C(2) -> L(3)
# -----------------------------
def build_demo_tree() -> SimpleTree:
    parent = {
        "R": None,
        "git": "R",
        "git:status": "git",
        "git:commit": "git",
        # 리프
        "L_status_v1": "git:status",
        "L_status_v2": "git:status",
        "L_commit_simple": "git:commit",
    }
    depth = {
        "R": 0,
        "git": 1,
        "git:status": 2,
        "git:commit": 2,
        "L_status_v1": 3,
        "L_status_v2": 3,
        "L_commit_simple": 3,
    }
    return SimpleTree(parent, depth)

# -----------------------------
# 시나리오 실행 & 로깅
# -----------------------------
def run_demo():
    log("START", "Phylo reward demo (no pytest)")
    tree = build_demo_tree()

    # 거리 테스트
    log("CASE", "거리 d 테스트")
    d00 = tree.dist("L_status_v1", "L_status_v1")  # 0
    d02 = tree.dist("L_status_v1", "L_status_v2")  # 2 (형제)
    d04 = tree.dist("L_status_v1", "L_commit_simple")  # 4 (사촌, LCA=git)

    assert_eq("dist(same leaf)", d00, 0)
    assert_eq("dist(sibling leaves)", d02, 2)
    assert_eq("dist(cousin leaves)", d04, 4)

    # LCA 테스트
    log("CASE", "LCA 테스트")
    a1 = tree.lca("L_status_v1", "L_status_v2")
    a2 = tree.lca("L_status_v1", "L_commit_simple")
    assert_eq("lca(status siblings)", a1, "git:status")
    assert_eq("lca(status vs commit)", a2, "git")

    # 보상: 역수형
    log("CASE", "보상 r=1/(1+d) 테스트")
    r0 = reward_inverse_distance(d00)  # 1.0
    r2 = reward_inverse_distance(d02)  # 1/3
    r4 = reward_inverse_distance(d04)  # 1/5
    assert_close("reward(d=0)", r0, 1.0)
    assert_close("reward(d=2)", r2, 1.0/3.0)
    assert_close("reward(d=4)", r4, 1.0/5.0)

    # 보상: 케이스별 상수(weight 테이블)
    log("CASE", "케이스별 상수 보상(테이블)")
    weights = {0: 1.0, 2: 0.40, 3: 0.22}  # 예시: d=2는 0.40으로 덮어쓰기
    rw2 = reward_with_case_weights(d02, weights)
    rw4 = reward_with_case_weights(d04, weights)  # 테이블에 없으니 역수형 폴백(0.2)
    assert_close("case-weight(d=2)", rw2, 0.40)
    assert_close("case-weight(d=4)", rw4, 1.0/5.0)

    # 요약 출력
    log("SUMMARY", f"d=0 -> r={r0:.3f} | d=2 -> r={r2:.3f} (or {rw2:.3f} by table) | d=4 -> r={r4:.3f}")

    log("DONE", "All checks complete.")

if __name__ == "__main__":
    run_demo()


[18:09:36] START      | Phylo reward demo (no pytest)
[18:09:36] CASE       | 거리 d 테스트
[18:09:36] TEST       | dist(same leaf)              -> PASS (got=0, expect=0)
[18:09:36] TEST       | dist(sibling leaves)         -> PASS (got=2, expect=2)
[18:09:36] TEST       | dist(cousin leaves)          -> PASS (got=4, expect=4)
[18:09:36] CASE       | LCA 테스트
[18:09:36] TEST       | lca(status siblings)         -> PASS (got='git:status', expect='git:status')
[18:09:36] TEST       | lca(status vs commit)        -> PASS (got='git', expect='git')
[18:09:36] CASE       | 보상 r=1/(1+d) 테스트
[18:09:36] TEST       | reward(d=0)                  -> PASS (got=1.000000000000, expect=1.000000000000)
[18:09:36] TEST       | reward(d=2)                  -> PASS (got=0.333333333333, expect=0.333333333333)
[18:09:36] TEST       | reward(d=4)                  -> PASS (got=0.200000000000, expect=0.200000000000)
[18:09:36] CASE       | 케이스별 상수 보상(테이블)
[18:09:36] TEST       | case-weight(d=2)             -> PASS

In [None]:
#뭐가 어떻게 될지 몰라 일단 함수 기틀만 만들어 보았음.
'''우선, 호기심 보상 함수를 두가지 버전으로 만들어보앗는데,
첫번째로는 은세가 말했던 스텝이 진행될 수록 기틀이 잡히니까 호기심 보상을 자동적으로 감쇠시키는 버전,
두번째로는 후반에 가서도 호기심 보상을 감소시키지 않을 수 있게 최악의 상태일떄 음의 보상으로 패치하는 버전으로 만들었음.
'''


#자동 감쇠형 호기심 함수
def curiosity_reward_decay(step, base_reward=1.0, decay_rate=0.005):
    """
    호기심 보상이 스텝(step)에 따라 자연스럽게 감소하는 함수.
    예: 초반엔 높은 보상, 후반으로 갈수록 점점 감소.

    R_c(t) = base_reward * (1 - decay_rate * step)
    (단, 0보다 작으면 0으로 제한)
    """
    reward = max(base_reward * (1 - decay_rate * step), 0)
    return reward
# 최악의 경우에 음의 보상을 주는 호기심 함수
def curiosity_reward_with_penalty(is_redundant=False, is_error=False, step=0,
                                  base_reward=1.0, penalty_redundant=0.3, penalty_error=0.5,
                                  decay_rate=0.003):
    """
    스텝 수에 따른 호기심 보상 감소 + 잘못된 행동에 대한 음의 보상 적용.

    Parameters:
    - is_redundant: 이미 여러 번 시도된 동일한 행동 수행시 True ---
    - is_error: 시스템 오류를 유발하는 탐색시 True >>>
    - step: 현재 스텝 수 >>>
    - base_reward: 기본 호기심 보상 >>>
    - penalty_redundant: 반복 행동에 부여할 음의 보상 강도
    - penalty_error: 심각한 오류 행위에 부여할 음의 보상 강도
    - decay_rate: 스텝에 따른 감쇠 비율
    """
    reward = max(base_reward * (1 - decay_rate * step), 0)
    if is_redundant:
        reward -= penalty_redundant
    if is_error:
        reward -= penalty_error
    return max(reward, -1.0)



# 새로운 행동 -> 보상

# 시간 감쇠

# 연속 반복 페널티


In [None]:
import hashlib

def flag_reward(flag_str, method, known_flags, found_paths):
    """
    FLAG 보상 함수 (기틀 버전)
    -----------------------------------------------------
    플래그 문자열을 발견했을 때 보상을 계산하는 함수.

    매개변수 설명:
    - flag_str (str): 코드 속에 끼워져 있는 플래그 원문 문자열
    - method (str): 해당 플래그를 찾은 탐색 경로나 행동(명령) 이름
    - known_flags (dict): 사전 등록된 정답 플래그 해시 목록 {flag_name: sha256_hex}
    - found_paths (dict): 이미 찾은 플래그 및 경로 기록 {flag_name: [methods]}
    -----------------------------------------------------
    반환값:
    - reward (float): 계산된 최종 보상값 (단일 스칼라)
    """

    # --- 기본 보상 정책(수정 가능) ---
    BASE_REWARD = 5.0         # 올바른 플래그를 찾았을 때의 기본 보상
    NEW_PATH_BONUS = 2.0      # 같은 플래그를 '새로운 경로'로 찾았을 때 보너스

    reward = 0.0  # 최종 보상 초기화

    # 1) 플래그 해시 계산 (sha256 권장: 실행 간 일관성 확보)
    flag_hash = hashlib.sha256(flag_str.encode("utf-8")).hexdigest()

    # ================================================================
    # [PARSE_HOOK - START]

    #
    # 아래 변수들은 '존재하면 보상 가중에 반영'되도록 설계됨.
    parsed_reward = None   # 예: 문자열에서 읽은 가산 보상(float). 없으면 None 유지
    parsed_flag_id = None  # 예: 플래그를 특정하는 ID가 있다면 사용(str)
    parsed_tags = None     # 예: ['web','sql'] 같은 태그 리스트. 정책에 활용 가능
   
    # ================================================================

    # 2) known_flags와 해시 매칭
    matched_flag_name = None
    for flag_name, valid_hash in known_flags.items():
        if flag_hash == valid_hash:
            matched_flag_name = flag_name
            break

    # 3) 매칭 실패 시 보상 0 반환
    if matched_flag_name is None:
        return 0.0

    # 4) 경로 중복 여부 확인 및 보상 결정
    prior_methods = found_paths.get(matched_flag_name, [])

    if method not in prior_methods:
        # 새로운 경로에서 같은 플래그를 처음 발견
        reward = BASE_REWARD + NEW_PATH_BONUS
        found_paths.setdefault(matched_flag_name, []).append(method)
    else:
        # 동일 경로로 이미 발견한 적 있음 → 기본 보상만
        reward = BASE_REWARD

    # 5) (선택) 파싱된 보상 parsed_reward가 있으면 가산
    #    친구가 파싱 부분에서 parsed_reward를 채우면 자동 반영됨.
    if parsed_reward is not None:
        # 필요 시 클리핑/스케일 조정 가능 (예: max(0, min(parsed_reward, 10)))
        reward += float(parsed_reward)

    # 6) (선택) parsed_flag_id / parsed_tags 활용 훅
    #    여기서 정책적으로 특정 ID나 태그에 가중치를 줄 수도 있음.

    # 7) 최종 스칼라 보상 반환
    return float(reward)


In [None]:
def main():   
    error_reward()
    prophecy_reward()
    flag_reward()
    curiosity()
    return

In [None]:
# Policy (ε-탐욕 기반)
def policy_update(actions, Q, epsilon=0.1, alpha=0.1, last_action=None, reward=0):
    """
    간단한 ε-탐욕 정책 함수 (기틀 버전)
    - actions: 가능한 행동 리스트
    - Q: 행동 가치 테이블 (dict)
    - epsilon: 무작위 탐색 확률
    - alpha: 학습률
    - last_action: 직전 수행한 행동
    - reward: 직전 보상
    """
    import random

    # Q값 업데이트 (최근 보상 반영)
    if last_action is not None:
        old_value = Q.get(last_action, 0)
        Q[last_action] = old_value + alpha * (reward - old_value)

    # ε-탐욕 정책으로 다음 행동 선택
    if random.random() < epsilon:
        next_action = random.choice(actions)
    else:
        next_action = max(actions, key=lambda a: Q.get(a, 0))

    return next_action