In [1]:
from enum import Enum
from typing import Dict, Any, List, Optional, Union
from pydantic import BaseModel, Field, field_validator, ValidationError
from core import utils


#### /validators/validator.py

In [2]:
from validators.validator import TokenValidator

### Run 예시

In [3]:
import os
import logging
from core import logging_utils, utils

log_levels = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL
}

log_level = log_levels.get(os.environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO)

logging_utils.setup_logging(
    log_dir="logs",
    log_level=log_level,
    max_bytes=10 * 1024 * 1024, 
    backup_count=5, 
    console_output=True 
)

logging_utils.cleanup_old_logs(log_dir="logs", days_to_keep=7)

17:50:50 - core.logging_utils - INFO - Logging initialized - logs directory: c:\Users\kakao\Desktop\AI_Agent\Semantic Layer\logs
17:50:50 - core.logging_utils - INFO - Log rotation configured: max 10.0MB per file, keeping 5 backups


In [4]:
resource_db = utils.load_resource_specs("./ResourceSpec/TokenSpec.yaml")

validator = TokenValidator(resource_db)

In [5]:
# --- 3. 검증 수행 (Runtime) ---

# Case A: 정상 토큰
valid_token = {"text": "Samsung Elec Risk Analysis...", "risk_score": 0.85}
validator.validate(valid_token, "RS_RISK_TOKEN_V1")

17:50:54 - Validator - INFO - 토큰 검증 시작 (Spec: RS_RISK_TOKEN_V1)


True

In [7]:
# Case B: 불량 토큰 (점수 초과)
invalid_token = {"text": "Short msg", "risk_score": 1.5} # Max 1.0 위반
validator.validate(invalid_token, "RS_RISK_TOKEN_V1")

17:51:36 - Validator - INFO - 토큰 검증 시작 (Spec: RS_RISK_TOKEN_V1)
17:51:36 - Validator - ERROR - 'risk_score' value 1.5 > max 1.0


SemanticError: 'risk_score' value 1.5 > max 1.0

In [48]:
# Case C: 불량 토큰 (필수값 누락)
missing_token = {"text": "No Score"}
validator.validate(missing_token, "RS_RISK_TOKEN_V1")

17:49:51 - validators.validator - INFO - [Validator] 토큰 검증 시작 (Spec: RS_RISK_TOKEN_V1)
17:49:51 - validators.validator - ERROR - Missing Required Field: 'risk_score'


True

In [None]:
import importlib
import logging
from dataclasses import dataclass
from typing import Any, Optional, Dict
from datetime import datetime

# 제공해주신 validators.py의 클래스와 예외 import 가정
# from validators import TokenValidator, SemanticError

@dataclass
class FiringResult:
    """단일 Transition 실행 결과 리포트"""
    task_id: str
    success: bool
    message: str
    new_token: Optional[Any] = None
    elapsed_ms: float = 0.0
    routes_triggered: int = 0

class CSPNEngine:
    """
    [Core Execution Unit]
    TB-CSPN Transition Firing Engine
    
    책임:
    1. Guard Check (Semantic Guard)
    2. Runtime Validation (TokenValidator 사용)
    3. Action Execution (Python Function/LLM)
    4. Routing (Petri Net Token Propagation)
    """
    
    def __init__(self, token_validator):
        """
        :param token_validator: validators.py에 정의된 TokenValidator 인스턴스
        """
        self.tv = token_validator
        self.logger = logging.getLogger("CSPN_Engine")

    def run_step(self, process: Any) -> Optional[FiringResult]:
        """
        Atomic Operation: Fetch -> Validate -> Exec -> Validate -> Route
        """
        # 1. Fetch Job (Process Queue에서 병합 완료된 토큰 인출)
        if not process.token_queue:
            return None
        
        target_task_id, token = process.token_queue.popleft()
        task = process.tasks[target_task_id]

        start_time = datetime.now()

        # 2. Guard Check (토픽 가중치 평가)
        if not self._check_guards(task, token):
            return FiringResult(task.task_id, False, "Guard Condition Failed (Topic Weight Mismatch)")

        # 3. [Validator] Input Spec Validation
        # TokenValidator는 실패 시 SemanticError를 raise함
        try:
            self.tv.validate(token.content, task.input_spec_id)
        except Exception as e:
            # SemanticError, ValueError(Spec 없음) 등 모든 검증 에러 포착
            self.logger.warning(f"Task {task.task_id} Input Validation Failed: {e}")
            return FiringResult(task.task_id, False, f"Input Spec Fail: {str(e)}")

        # 4. Action Execution (Dynamic Loading & Run)
        try:
            func = self._resolve_function(task.target)
            
            # 실행: Token Content + Task Config 주입
            output_content = func(token.content, **task.config)

        except Exception as e:
            self.logger.error(f"Task {task.task_id} Execution Logic Failed: {e}", exc_info=True)
            return FiringResult(task.task_id, False, f"Runtime Execution Error: {str(e)}")

        # 5. [Validator] Output Spec Validation
        # 실행 결과물이 다음 파이프라인 규격을 준수하는지 확인
        try:
            self.tv.validate(output_content, task.output_spec_id)
        except Exception as e:
            self.logger.error(f"Task {task.task_id} Output Validation Failed: {e}")
            return FiringResult(task.task_id, False, f"Output Spec Fail: {str(e)}")

        # 6. Token Evolution (State Update)
        # 검증 통과한 데이터로 새 토큰 생성
        new_token = self._evolve_token(token, output_content, task)

        # 7. Routing (Petri Net Propagation)
        # Process에게 토큰 도착 알림 (Process가 병합/대기 로직 수행)
        routes_count = self._propagate_token(process, task, new_token)
        
        elapsed = (datetime.now() - start_time).total_seconds() * 1000
        return FiringResult(task.task_id, True, "Success", new_token, elapsed, routes_count)

    # -------------------------------------------------------------------------
    # Internal Logic
    # -------------------------------------------------------------------------

    def _check_guards(self, task: Any, token: Any) -> bool:
        """Topic 가중치 기반 실행 조건 평가"""
        if not task.guards:
            return True
        
        token_topics = getattr(token, 'topics', {})
        for guard in task.guards:
            score = token_topics.get(guard.target_topic_id, 0.0)
            if score < guard.min_relevance:
                self.logger.debug(f"Guard Fail: {guard.target_topic_id} ({score} < {guard.min_relevance})")
                return False
        return True

    def _resolve_function(self, target_path: str):
        """'module.path:func_name' 문자열을 실제 함수 객체로 변환"""
        try:
            module_path, func_name = target_path.split(":")
            module = importlib.import_module(module_path)
            return getattr(module, func_name)
        except (ValueError, ImportError, AttributeError) as e:
            raise ImportError(f"Function resolution failed for '{target_path}': {e}")

    def _evolve_token(self, old_token: Any, new_content: Dict, task: Any) -> Any:
        """이전 토큰 승계 및 이력 업데이트"""
        # Token 클래스 정의에 따라 model_copy 또는 생성자 사용
        # 예시: Pydantic model_copy
        new_history = getattr(old_token, 'history', []) + [task.task_id]
        
        return old_token.model_copy(update={
            "content": new_content,
            "history": new_history
        })

    def _propagate_token(self, process: Any, current_task: Any, token: Any) -> int:
        """
        [Routing Logic]
        다음 Task들을 찾아 'Process.arrive_token'을 호출함.
        """
        next_tasks = process.get_next_nodes(current_task.task_id)
        
        if not next_tasks:
            process.completed_tokens.append(token) # End of Chain
            return 0

        triggered = 0
        for next_task in next_tasks:
            # Look-ahead Guard Check: 갈 자격이 있는 경로인가?
            if self._check_guards(next_task, token):
                # [핵심] Process의 Place 로직 호출 (Petri Net 동기화 위임)
                process.arrive_token(
                    from_task_id=current_task.task_id,
                    to_task_id=next_task.task_id,
                    token=token
                )
                triggered += 1
            else:
                self.logger.info(f"Route Ignored: {current_task.task_id} -> {next_task.task_id}")
        
        return triggered