In [20]:
# runtime/integrated_pipeline.py
"""
统一的阶段2+阶段3集成入口
将护栏层(阶段2)与生成层(阶段3)完整连接
"""

from __future__ import annotations
import json
from typing import Dict, Any, List, Optional
from pathlib import Path
import sys, csv, time

# 导入项目模块
try:
    from runtime.controller import run_once
    print("成功导入runtime.controller")
    from provider.generator import Generator
    print("成功导入provider.generator")
    from provider.oocChecker import OOCChecker
    print("成功导入provider.oocChecker")
    from provider.memory_store import MemoryStore
    print("成功导入provider.memory_store")
    from provider.memory_summarizer import MemorySummarizer
    print("成功导入provider.memory_summarizer")
    from provider.qwen import QwenProvider
    print("成功导入provider.qwen")
    from runtime.compile_data import compile_all, CACHE_DIR, CACHE_FILE
    print("成功导入所有模块")
except ImportError:
    # 回退到相对导入
    try:
        from .controller import run_once
        from qwen import QwenProvider
        from generator import Generator
        from oocChecker import OOCChecker
        from memory_store import MemoryStore
        from memory_summarizer import MemorySummarizer
        from compile_data import compile_all, CACHE_DIR, CACHE_FILE
    except ImportError:
        print("警告：无法导入所有模块，请检查项目结构")

# 每次都重新加载以上导入的模块，确保使用最新代码
%load_ext autoreload
%autoreload 2

成功导入runtime.controller
成功导入provider.generator
成功导入provider.oocChecker
成功导入provider.memory_store
成功导入provider.memory_summarizer
成功导入provider.qwen
成功导入所有模块
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [21]:
# runtime/integrated_pipeline.py
class RobustGenerator(Generator):
    """Generator subclass that always hands back usable results.

    Each override catches failures from the base ``Generator`` and degrades to
    canned English fallbacks instead of raising, so later pipeline steps always
    receive well-formed candidate / ``{"final": {...}}`` dictionaries.
    """

    @staticmethod
    def _draft_text(envelope: Any) -> str:
        """Return ``envelope["draft"]["text"]`` when it is a string, else ``""``.

        Tolerates non-dict envelopes or drafts and ``None`` text, any of which
        would otherwise raise while inspecting a candidate.
        """
        if not isinstance(envelope, dict):
            return ""
        draft = envelope.get("draft")
        if not isinstance(draft, dict):
            return ""
        text = draft.get("text")
        return text if isinstance(text, str) else ""

    @staticmethod
    def _draft_as_final(text: str, emotion: str, reason: str) -> Dict[str, Any]:
        """Wrap un-rewritten text in the standard ``{"final": {...}}`` envelope."""
        return {
            "final": {
                "text": text,
                "emotion": emotion,
                "audit": {"rewritten": False, "reason": reason}
            }
        }

    def generate_candidates(self, ctx: str, persona: str, n: int = 2) -> List[Dict[str, Any]]:
        """Generate up to ``n`` candidates and keep only those with non-blank text.

        A malformed candidate (missing draft, ``None`` text, ...) is dropped on
        its own rather than discarding the whole batch. Returns
        ``_get_fallback_candidates()`` when the base generator raises or no
        candidate survives the filter.
        """
        try:
            candidates = super().generate_candidates(ctx, persona, n)
            valid_candidates = [
                candidate for candidate in (candidates or [])
                if self._draft_text(candidate).strip()
            ]
        except Exception as e:
            print(f"候选生成失败: {e}")
            return self._get_fallback_candidates()

        return valid_candidates or self._get_fallback_candidates()

    def _get_fallback_candidates(self) -> List[Dict[str, Any]]:
        """Return a single canned, neutral candidate (a fresh list on every call)."""
        return [{
            "draft": {
                "text": "I'm here to help. What would you like to know?",
                "meta": {
                    "self_report": "I'm ready to assist.",
                    "sentiment": "neutral"
                }
            }
        }]

    def align_with_post_infer(self, draft_envelope: Dict[str, Any], post_infer_emotion: str, target_emotion: Optional[str] = None) -> Dict[str, Any]:
        """Align the draft's emotion, always returning a ``{"final": {...}}`` dict.

        Args:
            draft_envelope: Candidate of the form ``{"draft": {"text": ...}}``.
            post_infer_emotion: Emotion inferred from the draft content.
            target_emotion: Desired emotion; falls back to ``post_infer_emotion``
                and then ``"neutral"``.

        Returns:
            The base generator's result when it has both ``text`` and
            ``emotion``. A missing emotion is filled in while any rewritten text
            is kept. A missing text falls back to the original draft text. A
            draft without text yields ``_get_fallback_alignment``.
        """
        fallback_emotion = target_emotion or post_infer_emotion or "neutral"
        draft_text = self._draft_text(draft_envelope)
        if not draft_text.strip():
            return self._get_fallback_alignment(target_emotion or post_infer_emotion)

        try:
            result = super().align_with_post_infer(draft_envelope, post_infer_emotion, target_emotion)
        except Exception as e:
            print(f"情绪对齐失败: {e}")
            return self._draft_as_final(draft_text, fallback_emotion, "alignment_error")

        if not isinstance(result, dict):
            print(f"情绪对齐失败: unexpected result type {type(result).__name__}")
            return self._draft_as_final(draft_text, fallback_emotion, "alignment_error")

        final_data = result.get("final")
        if not isinstance(final_data, dict) or not final_data.get("text"):
            # No usable aligned text: fall back to the original draft.
            result["final"] = self._draft_as_final(draft_text, fallback_emotion, "alignment_failed")["final"]
        elif not final_data.get("emotion"):
            # Keep the aligned (possibly rewritten) text; only the label is missing.
            final_data["emotion"] = fallback_emotion

        return result

    def _get_fallback_alignment(self, emotion: str) -> Dict[str, Any]:
        """Return a canned alignment result carrying ``emotion`` (default "neutral")."""
        return {
            "final": {
                "text": "I understand your concern. Let me think about how best to respond.",
                "emotion": emotion or "neutral",
                "audit": {"rewritten": False, "reason": "fallback"}
            }
        }
"""
统一的阶段2+阶段3集成入口
将护栏层(阶段2)与生成层(阶段3)完整连接
"""
import re
import json

class RobustQwenProvider(QwenProvider):
    """Qwen provider with tolerant JSON extraction and a guaranteed fallback.

    ``generate`` never returns ``None``. If every attempt raises or yields
    unparseable text, a schema-shaped fallback from ``_get_fallback_response``
    is returned.
    """

    # Shared decoder used to pull a JSON value out of surrounding prose.
    _JSON_DECODER = json.JSONDecoder()

    def _extract_json_from_text(self, text: str):
        """Best-effort parse of a JSON value from raw model output.

        Tries, in order:
        1. the whole text;
        2. the first decodable ``{...}`` object starting at any ``{``;
        3. the first decodable ``[...]`` array starting at any ``[``.

        Unlike a flat regex, ``raw_decode`` handles nested objects and arrays.

        Returns:
            The decoded value, or ``None`` if nothing decodes.
        """
        if not text:
            return None

        try:
            return json.loads(text)
        except json.JSONDecodeError:
            pass

        # Objects are preferred over arrays, matching the original search order.
        for opener in ("{", "["):
            start = text.find(opener)
            while start != -1:
                try:
                    value, _end = self._JSON_DECODER.raw_decode(text, start)
                    return value
                except json.JSONDecodeError:
                    start = text.find(opener, start + 1)

        return None

    def generate(self, prompt: str, schema=None, retries=3):
        """Call the base ``generate`` up to ``retries`` times, parsing JSON from text.

        Non-string results from the base provider are returned as-is. A string
        result is returned once it parses to a truthy JSON value; otherwise the
        call is retried.

        Returns:
            The first usable result, or ``_get_fallback_response(schema)`` when
            all attempts fail (including ``retries <= 0``).
        """
        for attempt in range(retries):
            try:
                result = super().generate(prompt, schema, retries=1)
            except Exception as e:
                print(f"Qwen生成尝试 {attempt+1} 失败: {e}")
                continue

            if not isinstance(result, str):
                return result

            parsed = self._extract_json_from_text(result)
            if parsed:
                return parsed
            print(f"第{attempt+1}次尝试: 无法解析JSON，重试...")

        # Every attempt raised or produced unparseable text: never return None.
        return self._get_fallback_response(schema)

    def _get_fallback_response(self, schema):
        """Return a canned response shaped like ``schema`` (a list of keys)."""
        if schema == ["reply", "emotion"]:
            return {"reply": "I'm here to assist you.", "emotion": "neutral"}
        elif schema == ["self_report", "sentiment"]:
            return {"self_report": "I'm ready to help.", "sentiment": "neutral"}
        elif isinstance(schema, list):
            return {key: "" for key in schema}
        else:
            return {"text": "I'm available to help."}

class FixedMemoryStore(MemoryStore):
    """MemoryStore variant that makes sure both memory CSV files have headers.

    NOTE(review): ``MemoryStore.__init__`` is not called here. Confirm that the
    base class needs no initialization of its own.
    """

    # Header rows written into new (or zero-byte) memory files.
    _LONGTERM_FIELDS = ("player_id", "npc_id", "fact", "timestamp")
    _SHORTTERM_FIELDS = ("speaker", "text", "emotion", "timestamp")

    def __init__(self, longterm_file="longterm_memory.csv", shortterm_file="shortterm_memory.csv"):
        """Set the memory file paths and create any missing headers.

        Args:
            longterm_file: Path of the long-term facts CSV (relative to the CWD
                by default).
            shortterm_file: Path of the short-term events CSV (relative to the
                CWD by default).
        """
        self.longterm_file = Path(longterm_file)
        self.shortterm_file = Path(shortterm_file)
        self._ensure_files_exist()

    @staticmethod
    def _needs_header(path: Path) -> bool:
        """Return True when *path* does not exist or is zero bytes long."""
        try:
            return path.stat().st_size == 0
        except FileNotFoundError:
            return True

    def _ensure_files_exist(self):
        """Write the header row into each memory file that is missing or empty.

        Each file is handled independently, so a failure on one does not stop
        the other from being created. I/O errors are printed, not raised.
        """
        targets = (
            (self.longterm_file, self._LONGTERM_FIELDS, "长期记忆文件"),
            (self.shortterm_file, self._SHORTTERM_FIELDS, "短期记忆文件"),
        )
        for path, fieldnames, label in targets:
            try:
                if not self._needs_header(path):
                    continue
                with open(path, "w", newline="", encoding="utf-8") as f:
                    csv.writer(f).writerow(fieldnames)
            except OSError as e:
                print(f"✗ 初始化记忆文件失败: {e}")
                continue
            print(f"✓ 创建{label}")

In [22]:
class IntegratedPipeline:
    def __init__(self, api_key: Optional[str] = None, auto_compile: bool = True):
        """Build the integrated phase-2 (guardrail) + phase-3 (generation) pipeline.

        Args:
            api_key: Qwen API key. When omitted, the ``QWEN_API_KEY``
                environment variable is used. If that is also unset, the literal
                placeholder ``"QWEN_API_KEY"`` is passed for backward
                compatibility.
            auto_compile: Compile the data cache when ``CACHE_FILE`` is missing.
        """
        import os  # local import keeps this block self-contained

        # Make sure the cache directory exists.
        CACHE_DIR.mkdir(parents=True, exist_ok=True)

        # Compile the data files on first run.
        if auto_compile and not CACHE_FILE.exists():
            print("编译数据文件...")
            self._compile_data()

        # Load compiled data ({} when unavailable).
        self.compiled_data = self._load_compiled_data()

        # Read the key from the environment instead of sending the variable
        # *name* as the key; the literal is only a last resort.
        resolved_key = api_key or os.environ.get("QWEN_API_KEY") or "QWEN_API_KEY"
        self.provider = RobustQwenProvider(apikey=resolved_key)

        # Phase-3 components.
        self.generator = RobustGenerator(self.provider)
        self.ooc_checker = OOCChecker(self.provider)

        # Memory storage and summarization.
        self.memory_store = FixedMemoryStore()
        self.memory_summarizer = MemorySummarizer(self.provider, self.ooc_checker)
        self._setup_npc_names()
        self._initialize_memory_files()

        # Conversation state.
        self.conversation_history = []

        # Fall back to the simplified flow when no compiled data file exists.
        self.data_available = CACHE_FILE.exists()
        if not self.data_available:
            print("警告：数据文件不可用，将使用简化模式")

    def _setup_npc_names(self):
        """设置NPC名称映射"""
        npc_names = {}
        if hasattr(self, 'compiled_data') and self.compiled_data:
            for npc in self.compiled_data.get("npc", []):
                npc_id = npc.get("npc_id")
                name = npc.get("name")
                if npc_id and name:
                    npc_names[npc_id] = name
        
        # 添加默认的guard_01
        if "guard_01" not in npc_names:
            npc_names["guard_01"] = "City Guard"
            
        self.memory_store.set_npc_names(npc_names)

    def _load_compiled_data(self) -> Dict[str, Any]:
        """Read the compiled JSON cache stored at ``CACHE_FILE``.

        Returns:
            The decoded data, or ``{}`` when the file is missing or cannot be
            read or decoded. Failures are printed, not raised.
        """
        try:
            if not CACHE_FILE.exists():
                print("✗ 编译数据文件不存在")
                return {}

            with open(CACHE_FILE, 'r', encoding='utf-8') as fh:
                compiled = json.load(fh)

            npc_count = len(compiled.get('npc', []))
            lore_count = len(compiled.get('lore_public', []))
            print(f"✓ 加载编译数据: {npc_count} 个NPC, {lore_count} 条公共知识")
            return compiled
        except Exception as exc:
            print(f"✗ 加载编译数据失败: {exc}")
            return {}
    
    def _initialize_memory_files(self):
        """初始化记忆文件"""
        try:
            # 确保长期记忆文件有正确的列头
            if hasattr(self.memory_store, 'longterm_file'):
                longterm_file = self.memory_store.longterm_file
            else:
                longterm_file = Path("longterm_memory.csv")
                self.memory_store.longterm_file = longterm_file
            
            if not longterm_file.exists():
                # 创建文件并写入列头
                with open(longterm_file, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.DictWriter(f, fieldnames=["player_id", "npc_id", "fact", "timestamp"])
                    writer.writeheader()
                print("创建了长期记忆文件")
        except Exception as e:
            print(f"初始化记忆文件失败: {e}")
    
    def _ensure_valid_final_output(self, final_output: Dict[str, Any], best_candidate: Dict[str, Any], phase2_result: Dict[str, Any]) -> Dict[str, Any]:
        """确保最终输出有效 - 重新添加这个方法"""
        final_data = final_output.get("final", {})
        
        # 检查文本是否有效
        if not final_data.get("text") or final_data.get("text") is None:
            print("警告：最终输出文本为空，使用候选文本")
            draft_text = best_candidate.get("draft", {}).get("text", "I'm not sure how to respond.")
            final_data["text"] = draft_text
        
        # 检查情绪是否有效
        if not final_data.get("emotion") or final_data.get("emotion") is None:
            print("警告：最终输出情绪为空，使用默认情绪")
            final_emotion = phase2_result.get("emotion", {}).get("final", "neutral")
            final_data["emotion"] = final_emotion
        
        # 确保audit字段存在
        if "audit" not in final_data:
            final_data["audit"] = {"rewritten": False, "reason": "auto_fixed"}
        
        return final_output
    
    def _ensure_consistent_output_structure(self, output_data: Dict[str, Any]) -> Dict[str, Any]:
        """确保输出数据结构一致"""
        
        # 如果已经是标准结构，直接返回
        if "final" in output_data:
            return output_data
        
        # 如果是平铺结构，转换为标准结构
        if "text" in output_data:
            return {
                "final": {
                    "text": output_data.get("text", ""),
                    "emotion": output_data.get("emotion", "neutral"),
                    "audit": output_data.get("audit", {"rewritten": False, "reason": "auto_fixed"}),
                    "response_type": output_data.get("response_type", "unknown"),
                    "ooc_risk": output_data.get("ooc_risk", 0.0),
                    "ooc_reasons": output_data.get("ooc_reasons", [])
                }
            }
        
        # 默认回退结构
        return {
            "final": {
                "text": "I'm having trouble responding right now.",
                "emotion": "neutral",
                "audit": {"rewritten": False, "reason": "fallback"}
            }
        }
    
    def _run_phase3(self, phase2_result: Dict[str, Any], user_text: str, npc_id: str, player_id: str) -> Dict[str, Any]:
        """Run the phase-3 generation layer on top of a phase-2 result.

        Steps, in order:
        1. build the generation context;
        2. generate 2 candidates and rank them;
        3. align emotion;
        4. run the OOC check;
        5. repair and normalize the output;
        6. update memory.

        On success ``phase2_result`` is mutated with ``final_output``,
        ``response_type="generated"`` and ``generation_metadata``, then
        returned. If no candidates are produced or any step raises, the result
        of ``self._handle_generation_failure`` is returned instead.
        """
        try:
            # Guarantee the fields later steps read.
            for key, default in (("emotion", {}), ("allow", True)):
                phase2_result.setdefault(key, default)

            print("构建生成上下文...")
            generation_ctx = self._build_generation_context(phase2_result, user_text, npc_id, player_id)

            print("生成候选回复...")
            npc_persona = self._get_npc_persona(npc_id)
            drafts = self.generator.generate_candidates(ctx=user_text, persona=npc_persona, n=2)
            if not drafts:
                return self._handle_generation_failure(phase2_result, "无候选回复生成")

            print("重排候选...")
            chosen = self.generator.rank(drafts, npc_persona, user_text)

            print("情绪对齐检查...")
            aligned = self._apply_emotion_alignment(chosen, phase2_result)

            print("OOC检查...")
            checked = self._apply_ooc_check(aligned, generation_ctx)
            repaired = self._ensure_valid_final_output(checked, chosen, phase2_result)
            normalized = self._ensure_consistent_output_structure(repaired)

            print("记忆处理...")
            self._update_memory_systems(normalized, user_text, player_id, npc_id)

            phase2_result.update(
                final_output=normalized,
                response_type="generated",
                generation_metadata={
                    "candidates_generated": len(drafts),
                    "emotion_aligned": "post" in phase2_result.get("emotion", {}),
                    "ooc_checked": True
                },
            )
            return phase2_result
        except Exception as e:
            print(f"阶段3处理失败: {e}")
            import traceback
            traceback.print_exc()
            return self._handle_generation_failure(phase2_result, f"阶段3错误: {str(e)}")
    
    def _build_generation_context(self, phase2_result: Dict[str, Any], user_text: str, npc_id: str, player_id: str) -> Dict[str, Any]:
        """构建生成上下文"""
        
        # 获取检索到的证据
        evidence = phase2_result.get("retriever", {}).get("evidence", [])
        evidence_text = self._format_evidence(evidence)
        
        # 获取情绪信息
        emotion_info = phase2_result.get("emotion", {})
        pre_emotion = emotion_info.get("pre", {}).get("emotion_hint", "neutral")
        post_emotion = emotion_info.get("post", {}).get("emotion_from_content")
        final_emotion = emotion_info.get("final", "neutral")
        style_hooks = emotion_info.get("style", {})
        
        # 获取槽位信息
        slot = phase2_result.get("slot", "small_talk")
        
        # 安全地获取记忆
        memory_facts = []
        try:
            memory_facts = self.memory_store.retrieve_longterm(player_id, npc_id, top_k=3)
        except Exception as e:
            print(f"记忆检索失败: {e}")
            memory_facts = []
        
        context = {
            "user_text": user_text,
            "npc_id": npc_id,
            "player_id": player_id,
            "slot": slot,
            "evidence": evidence,
            "evidence_text": evidence_text,
            "pre_emotion": pre_emotion,
            "post_emotion": post_emotion,
            "target_emotion": final_emotion,
            "style_hooks": style_hooks,
            "memory_facts": memory_facts,
            "conversation_history": self.conversation_history[-5:] if self.conversation_history else []
        }
        
        return context
 
    
    def _compile_data(self):
        """编译数据文件"""
        try:
            from runtime.compile_data import main as compile_main
            compile_main()
            print("数据编译完成")
            self.data_available = True
        except Exception as e:
            print(f"数据编译失败: {e}")
            self.data_available = False
    
    def process_query(self, user_text: str, npc_id: str = "SV001", player_id: str = "player1") -> Dict[str, Any]:
        """Run one user turn through the full pipeline and return the result dict.

        Flow:
        - Without compiled data, ``_process_without_data`` handles the turn.
        - Otherwise phase 2 (guardrails) runs first, then the emotion report.
        - If phase 2 denies the query, a refusal is produced.
        - Otherwise phase 3 generates the reply.

        The refusal and generated turns are both appended to conversation
        history. The emotion report is diagnostic only: if building or printing
        it fails, the error is printed and processing continues.
        """
        print(f"\n=== 处理查询: {user_text} ===")
        print(f"NPC: {npc_id}, Player: {player_id}")

        # Simplified flow when no compiled data file is available.
        if not self.data_available:
            return self._process_without_data(user_text, npc_id, player_id)

        # Phase 2: guardrail layer.
        print("阶段2：护栏层处理中...")
        phase2_result = self._run_phase2(user_text, npc_id)

        # Emotion report (diagnostic only; never allowed to abort the turn).
        emotion_report = None
        try:
            emotion_report = get_emotion_report(phase2_result, npc_id, self.compiled_data)
            print_emotion_report(emotion_report)
        except Exception as e:
            print(f"情绪报告生成失败: {e}")
        phase2_result["emotion_report"] = emotion_report

        # Guardrail refusal: record it in history like the other paths do.
        if not phase2_result.get("allow", True):
            print("阶段2：查询被护栏拦截")
            refusal_result = self._handle_phase2_refusal(phase2_result, npc_id)
            self._update_conversation_history(user_text, refusal_result, player_id, npc_id)
            return refusal_result

        # Phase 3: generation layer.
        print("阶段3：生成层处理中...")
        final_result = self._run_phase3(phase2_result, user_text, npc_id, player_id)

        print("更新对话历史...")
        self._update_conversation_history(user_text, final_result, player_id, npc_id)

        return final_result
    
    def _apply_emotion_alignment(self, candidate: Dict[str, Any], phase2_result: Dict[str, Any]) -> Dict[str, Any]:
        """应用情绪对齐"""
        
        emotion_info = phase2_result.get("emotion", {})
        post_emotion = emotion_info.get("post", {}).get("emotion_from_content")
        target_emotion = emotion_info.get("final", "neutral")
        
        # 检查是否需要情绪对齐
        needs_alignment = (
            post_emotion and 
            post_emotion != target_emotion and
            candidate.get("draft", {}).get("text")  # 确保有文本可以对齐
        )
        
        if needs_alignment:
            print(f"情绪对齐: {post_emotion} -> {target_emotion}")
            try:
                aligned = self.generator.align_with_post_infer(
                    candidate,
                    post_emotion,
                    target_emotion
                )
                return aligned
            except Exception as e:
                print(f"情绪对齐失败: {e}")
                # 返回原始候选
                draft = candidate.get("draft", {})
                return {
                    "final": {
                        "text": draft.get("text", ""),
                        "emotion": target_emotion,
                        "audit": {"rewritten": False, "reason": "alignment_failed"}
                    }
                }
        else:
            # 直接使用候选，包装成统一格式
            draft = candidate.get("draft", {})
            return {
                "final": {
                    "text": draft.get("text", ""),
                    "emotion": target_emotion,
                    "audit": {"rewritten": False, "reason": "no_alignment_needed"}
                }
            }
    def _process_without_data(self, user_text: str, npc_id: str, player_id: str) -> Dict[str, Any]:
        """Answer a query without compiled data, skipping the phase-2 guardrails.

        Generates and ranks 2 candidates for the NPC persona. The reply text and
        the ``meta.sentiment`` emotion come from the best candidate, with canned
        replies when nothing is generated or generation raises. The turn is
        appended to conversation history and a ``generated_simple`` result is
        returned.
        """
        print("使用简化模式（无数据文件）")

        npc_persona = self._get_npc_persona(npc_id)

        try:
            options = self.generator.generate_candidates(ctx=user_text, persona=npc_persona, n=2)
            if not options:
                reply, mood = "I'm having trouble understanding. Could you rephrase?", "neutral"
            else:
                chosen = self.generator.rank(options, npc_persona, user_text)
                chosen_draft = chosen.get("draft", {})
                reply = chosen_draft.get("text", "I'm not sure how to respond.")
                mood = chosen_draft.get("meta", {}).get("sentiment", "neutral")
        except Exception as e:
            print(f"生成失败: {e}")
            reply, mood = "I'm experiencing technical difficulties. Please try again later.", "neutral"

        outcome = dict(
            allow=True,
            final_output={
                "text": reply,
                "emotion": mood,
                "response_type": "generated_simple"
            },
            response_type="generated_simple",
            data_status="no_data_file",
            npc_id=npc_id,
            user_text=user_text,
        )

        self._update_conversation_history(user_text, outcome, player_id, npc_id)
        return outcome
    
    def _run_phase2(self, user_text: str, npc_id: str) -> Dict[str, Any]:
        """Run the phase-2 guardrail controller (``run_once``) and normalize its result.

        Guarantees the ``allow``, ``emotion`` and ``filters`` keys.

        - A filter denial (``filters["allow"]`` falsy) forces ``allow=False`` and
          records ``deny_reason`` / ``deny_details``.
        - Otherwise an explicit top-level ``allow: False`` returned by
          ``run_once`` is preserved rather than overwritten.

        If the controller raises, the error is printed and a permissive result
        (``allow=True``) is returned so phase 3 can still answer.
        NOTE(review): this deliberately fails open on guardrail errors.
        """
        try:
            phase2_result = run_once(user_text, npc_id)

            # Normalize the result shape.
            if not isinstance(phase2_result, dict):
                phase2_result = {}
            phase2_result.setdefault("allow", True)
            phase2_result.setdefault("emotion", {})
            phase2_result.setdefault("filters", {})

            filters_result = phase2_result["filters"]
            if isinstance(filters_result, dict) and not filters_result.get("allow", True):
                deny_info = filters_result.get("deny")
                phase2_result["allow"] = False
                phase2_result["deny_reason"] = (
                    deny_info.get("reason", "filter_rejection")
                    if isinstance(deny_info, dict) else "filter_rejection"
                )
                phase2_result["deny_details"] = filters_result.get("hits", {})
            else:
                # Filters did not deny: keep an explicit deny from run_once itself.
                phase2_result["allow"] = phase2_result.get("allow") is not False

            return phase2_result

        except Exception as e:
            print(f"阶段2处理错误: {e}")
            # Permissive default so the flow is not blocked.
            return {
                "allow": True,
                "error": f"阶段2处理失败: {str(e)}",
                "emotion": {"final": "neutral"},
                "filters": {"allow": True}
            }
    
    def _handle_phase2_refusal(self, phase2_result: Dict[str, Any], npc_id: str) -> Dict[str, Any]:
        """Produce an in-character refusal for a query denied in phase 2.

        Calls ``self.generator.refusal_response`` with the deny reason and
        details, the NPC persona and the tone guidelines. Canned English text is
        used when the generator raises, returns a non-dict, or returns an empty
        reply or emotion.

        Returns:
            ``phase2_result``, mutated with ``final_output`` and
            ``response_type="refusal"``.
        """
        deny_reason = phase2_result.get("deny_reason", "unknown")
        deny_details = phase2_result.get("deny_details", {})

        print(f"生成拒答，原因: {deny_reason}")

        persona = self._get_npc_persona(npc_id)
        tone_guidelines = self._get_tone_guidelines(phase2_result)

        reply_text = "I cannot discuss that topic."
        emotion = "neutral"
        try:
            refusal_response = self.generator.refusal_response(
                {
                    "reason": deny_reason,
                    "details": deny_details
                },
                persona,
                tone_guidelines
            )
        except Exception as e:
            print(f"拒答生成失败: {e}")
        else:
            if isinstance(refusal_response, dict):
                # `or` also covers empty / None values, not just missing keys.
                reply_text = refusal_response.get("reply") or "I cannot answer that."
                emotion = refusal_response.get("emotion") or "neutral"
            else:
                print(f"拒答生成失败: unexpected response type {type(refusal_response).__name__}")

        phase2_result["final_output"] = {
            "text": reply_text,
            "emotion": emotion,
            "response_type": "refusal",
            "deny_reason": deny_reason,
            "deny_details": deny_details
        }
        phase2_result["response_type"] = "refusal"

        return phase2_result

    
    def _apply_emotion_alignment(self, candidate: Dict[str, Any], phase2_result: Dict[str, Any]) -> Dict[str, Any]:
        """应用情绪对齐"""
        
        emotion_info = phase2_result.get("emotion", {})
        post_emotion = emotion_info.get("post", {}).get("emotion_from_content")
        target_emotion = emotion_info.get("final", "neutral")
        
        # 如果有post_infer情绪且与当前情绪不一致，进行对齐
        if post_emotion and post_emotion != target_emotion:
            print(f"情绪对齐: {post_emotion} -> {target_emotion}")
            try:
                aligned = self.generator.align_with_post_infer(
                    candidate,
                    post_emotion,
                    target_emotion
                )
                return aligned
            except Exception as e:
                print(f"情绪对齐失败: {e}")
                # 返回原始候选
                draft = candidate.get("draft", {})
                return {
                    "final": {
                        "text": draft.get("text", ""),
                        "emotion": target_emotion,
                        "audit": {"rewritten": False, "reason": "alignment_failed"}
                    }
                }
        else:
            # 直接使用候选，包装成统一格式
            draft = candidate.get("draft", {})
            return {
                "final": {
                    "text": draft.get("text", ""),
                    "emotion": target_emotion,
                    "audit": {"rewritten": False, "reason": "no_alignment_needed"}
                }
            }
    
    def _apply_ooc_check(self, response: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]:
        """应用OOC检查"""
        
        final_data = response.get("final", {})
        text = final_data.get("text", "")
        emotion = final_data.get("emotion", "neutral")
        
        if not text:
            return response
        
        # 构建OOC检查上下文
        ooc_context = f"NPC: {context['npc_id']}, Emotion: {emotion}. User: {context['user_text']}"
        
        # 执行OOC检查
        try:
            ooc_result = self.ooc_checker.judge_ooc(ooc_context, final_data)
            
            # 如果OOC风险高，使用降级回复
            ooc_risk = ooc_result.get("ooc_risk", 0.0)
            if ooc_risk > 0.7:
                print(f"高OOC风险 ({ooc_risk:.2f})，使用安全回复")
                safe_response = {
                    "text": "I need to be careful with my words. Let's talk about something else.",
                    "emotion": "neutral",
                    "ooc_risk": ooc_risk,
                    "ooc_reasons": ooc_result.get("reasons", [])
                }
                response["final"] = safe_response
            else:
                # 添加OOC信息到响应中
                final_data["ooc_risk"] = ooc_risk
                final_data["ooc_reasons"] = ooc_result.get("reasons", [])
                
        except Exception as e:
            print(f"OOC检查失败: {e}")
            # 继续使用原始响应
        
        return response
    
    def _update_memory_systems(self, response: Dict[str, Any], user_text: str, player_id: str, npc_id: str):
        """更新记忆系统 - 使用实际名称版本"""
        
        final_data = response.get("final", {})
        text = final_data.get("text", "")
        emotion = final_data.get("emotion", "neutral")
        
        if not text:
            print("记忆更新跳过：无响应文本")
            return
        
        try:
            # 记录玩家事件 - 传递玩家ID信息
            player_event = {
                "speaker": "player", 
                "text": user_text,
                "emotion": "unknown",
                "player_id": player_id,  # 添加玩家ID
                "npc_id": None  # 玩家事件没有NPC ID
            }
            self.memory_store.append_event(player_event)
            
            # 记录NPC事件 - 传递NPC ID信息
            npc_event = {
                "speaker": "NPC",
                "text": text, 
                "emotion": emotion,
                "player_id": None,  # NPC事件没有玩家ID
                "npc_id": npc_id   # 添加NPC ID
            }
            self.memory_store.append_event(npc_event)
            
            print(f"✓ 已记录短期事件: {self._get_npc_name(player_id) if player_id.startswith('npc_') else 'Player'}1条, {self._get_npc_name(npc_id)}1条")
            
            # 获取近期事件并生成长期记忆
            try:
                recent_events = self.memory_store.get_short_window(k=5)
                print(f"获取到 {len(recent_events)} 个近期事件")
                
                # 将字典格式的事件转换为字符串格式，供 MemorySummarizer 使用
                formatted_events = []
                for event in recent_events:
                    speaker = event.get('speaker', 'Unknown')
                    event_text = event.get('text', '')
                    event_emotion = event.get('emotion', 'neutral')
                    formatted_events.append(f"{speaker}: {event_text} (emotion: {event_emotion})")
                
                print(f"格式化后的事件: {formatted_events}")
                
                # 放宽条件：有2个以上事件就尝试生成摘要
                if len(formatted_events) >= 2:
                    print("尝试生成记忆摘要...")
                    try:
                        facts = self.memory_summarizer.summarize(formatted_events)
                        print(f"记忆摘要器返回: {facts}")
                        
                        if facts and len(facts) > 0:
                            # 提取事实文本
                            fact_texts = []
                            for fact in facts:
                                fact_text = fact.get('fact', '')
                                if fact_text and fact_text.strip():
                                    fact_texts.append(fact_text.strip())
                            
                            if fact_texts:
                                # 安全地写入长期记忆
                                try:
                                    self.memory_store.write_longterm(player_id, npc_id, fact_texts)
                                    print(f"✓ 成功写入 {len(fact_texts)} 条长期记忆")
                                    for i, fact in enumerate(fact_texts, 1):
                                        print(f"   {i}. {fact}")
                                except Exception as e:
                                    print(f"✗ 写入长期记忆失败: {e}")
                            else:
                                print("✗ 没有有效的事实文本可写入")
                        else:
                            print("✗ 记忆摘要器返回空结果")
                            
                    except Exception as e:
                        print(f"✗ 记忆摘要生成失败: {e}")
                        import traceback
                        traceback.print_exc()
                else:
                    print(f"事件数量不足 ({len(recent_events)})，跳过长期记忆生成")
                    
            except Exception as e:
                print(f"✗ 获取近期事件失败: {e}")
                
        except Exception as e:
            print(f"✗ 记忆系统更新失败: {e}")
    
    def _update_conversation_history(self, user_text: str, result: Dict[str, Any], player_id: str, npc_id: str):
        """Append one turn to ``self.conversation_history``, keeping only the last 50 turns.

        The NPC reply is read from ``result["final_output"]``. Both the flat
        ``{"text": ..., "emotion": ...}`` shape and the nested
        ``{"final": {"text": ..., "emotion": ...}}`` shape are accepted, so a
        nested reply is recorded instead of an empty string. A missing or
        non-dict ``final_output`` records an empty reply with emotion "neutral".

        Args:
            user_text: the incoming message this turn answered.
            result: the pipeline result dict for the turn.
            player_id: id of the speaking player.
            npc_id: id of the replying NPC.
        """
        final_output = result.get("final_output") or {}
        if not isinstance(final_output, dict):
            final_output = {}
        # Results may wrap the reply as {"final": {...}}; unwrap it when present.
        nested = final_output.get("final")
        reply = nested if isinstance(nested, dict) else final_output

        turn = {
            "player_id": player_id,
            "npc_id": npc_id,
            "user_text": user_text,
            "npc_response": reply.get("text", ""),
            "emotion": reply.get("emotion", "neutral"),
            "response_type": result.get("response_type", "unknown"),
            "timestamp": self._get_current_timestamp(),
        }
        self.conversation_history.append(turn)

        # Bound the in-memory history to the 50 most recent turns.
        if len(self.conversation_history) > 50:
            self.conversation_history = self.conversation_history[-50:]
    
    def _get_npc_persona(self, npc_id: str) -> str:
        """Return the persona description used to prompt NPC *npc_id*.

        Looks the NPC up in ``self.compiled_data["npc"]`` and builds a detailed
        persona from the first matching record. Falls back to the short built-in
        persona when no compiled data is loaded, when the NPC is not listed, or
        when any error is raised along the way.
        """
        try:
            compiled = getattr(self, 'compiled_data', None)
            if not compiled:
                print("警告：没有编译数据，使用简化角色描述")
                return self._get_simple_persona(npc_id)

            match = next(
                (entry for entry in compiled.get("npc", []) if entry.get("npc_id") == npc_id),
                None,
            )
            if match is not None:
                return self._build_detailed_persona(match)

            print(f"警告：未找到NPC '{npc_id}' 的配置")
            return self._get_simple_persona(npc_id)

        except Exception as e:
            print(f"获取NPC角色描述失败: {e}")
            return self._get_simple_persona(npc_id)

    def _build_detailed_persona(self, npc_data: dict) -> str:
        """Build a persona description from one compiled NPC record.

        ``name`` and ``role`` are required; a missing one raises ``KeyError``,
        which ``_get_npc_persona`` catches to fall back to the simple persona.
        Every other field (baseline emotion, emotion range, speaking style,
        allowed tags, taboo topics, style/emotion map) is optional and is
        skipped when absent or empty. List/tuple values are rendered as
        comma-separated text rather than as a Python list repr.

        Returns:
            The fragments joined with ". " and terminated by ".".
        """
        def _fmt(value) -> str:
            # Join sequences so tags read naturally in the prompt.
            if isinstance(value, (list, tuple)):
                return ", ".join(str(v) for v in value)
            return str(value)

        # Basic identity (required).
        parts = [f"{npc_data['name']}, a {npc_data['role']}"]

        # Optional traits, in the order they appear in the prompt.
        optional_fields = (
            ("baseline_emotion", "baseline emotion is {}"),
            ("emotion_range", "emotional range includes: {}"),
            ("speaking_style", "speaking style: {}"),
            ("allowed_tags", "comfortable discussing: {}"),
            ("taboo_topics", "avoids topics about: {}"),
            ("style_emotion_map", "emotional expressions: {}"),
        )
        for key, template in optional_fields:
            value = npc_data.get(key)
            if value:
                parts.append(template.format(_fmt(value)))

        return ". ".join(parts) + "."

    def _get_simple_persona(self, npc_id: str) -> str:
        """Return a short built-in persona for *npc_id*.

        Used as the fallback when compiled data is unavailable. Unknown ids
        receive the generic "default" persona.
        """
        fallback_personas = {
            "SV001": "Shane, a farmhand. Usually serious and blunt. Avoids personal topics.",
            "SV002": "Sam, a musician. Usually cheerful and playful. Loves music and town gossip.",
            "SV003": "Linus, a hermit. Usually calm and philosophical. Enjoys nature and quiet.",
            "guard_01": "A vigilant city guard, formal and cautious.",
            "default": "A helpful NPC in this world.",
        }
        if npc_id in fallback_personas:
            return fallback_personas[npc_id]
        return fallback_personas["default"]
    
    def _get_tone_guidelines(self, phase2_result: Dict[str, Any]) -> str:
        """Return ``phase2_result["emotion"]["style"]["tone"]``, or "neutral" if any level is missing."""
        style_block = phase2_result.get("emotion", {}).get("style", {})
        return style_block.get("tone", "neutral")
    
    def _format_evidence(self, evidence: List[Dict[str, Any]], max_items: int = 3) -> str:
        """Render up to *max_items* evidence entries as a single "; "-separated string.

        Each inspected entry contributes ``"<entity>: <fact>"``, or just
        ``"<fact>"`` when it has no entity. Entries without a fact, and non-dict
        entries, are skipped.

        Returns:
            The joined facts, or "No specific information available." when
            *evidence* is empty or none of the inspected entries has a fact.
            Previously the latter case returned an empty string.
        """
        no_info = "No specific information available."
        if not evidence:
            return no_info

        facts = []
        for item in evidence[:max_items]:
            if not isinstance(item, dict):
                continue
            fact = item.get("fact", "")
            if not fact:
                continue
            entity = item.get("entity", "")
            facts.append(f"{entity}: {fact}" if entity else fact)

        return "; ".join(facts) if facts else no_info
    
    def _handle_generation_failure(self, phase2_result: Dict[str, Any], reason: str) -> Dict[str, Any]:
        """Attach a canned fallback reply to *phase2_result* after generation failed.

        Mutates *phase2_result* in place: sets ``final_output`` to a neutral
        apology that carries *reason*, sets ``response_type`` to "error", and
        sets ``generation_metadata`` to ``{"error": reason}``.

        Returns:
            The same dict object that was passed in.
        """
        phase2_result.update(
            final_output={
                "text": "I'm having trouble responding right now. Could you try again?",
                "emotion": "neutral",
                "error": reason,
            },
            response_type="error",
            generation_metadata={"error": reason},
        )
        return phase2_result
    
    def _get_current_timestamp(self) -> str:
        """Return the current local time as a naive ISO-8601 string (no timezone offset)."""
        import datetime as _dt
        return _dt.datetime.now().isoformat()
    
    def get_conversation_summary(self, player_id: Optional[str] = None, npc_id: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return recorded conversation turns, optionally filtered.

        Each of *player_id* / *npc_id* that is given (truthy) restricts the
        result to turns carrying that value. Passing only one of them now
        filters by that one; previously the filter was silently ignored and
        the whole history was returned.

        Returns:
            ``self.conversation_history`` itself when no filter is given,
            otherwise a new list of the matching turns.
        """
        if not player_id and not npc_id:
            return self.conversation_history
        return [
            turn for turn in self.conversation_history
            if (not player_id or turn.get("player_id") == player_id)
            and (not npc_id or turn.get("npc_id") == npc_id)
        ]
    
    def recompile_data(self):
        """Announce and re-run data compilation via ``self._compile_data()``; returns None."""
        banner = "重新编译数据文件..."
        print(banner)
        self._compile_data()

    # Helper on IntegratedPipeline: resolve an NPC id to its display name.
    def _get_npc_name(self, npc_id: str) -> str:
        """Return the display name of *npc_id* from ``self.compiled_data["npc"]``.

        Falls back to *npc_id* itself when no compiled data is loaded, the NPC
        is not listed, its ``name`` is missing or empty, or the compiled data
        is malformed (non-dict entries, non-iterable NPC list).
        """
        compiled = getattr(self, 'compiled_data', None)
        if not compiled:
            return npc_id
        try:
            for npc in compiled.get("npc", []):
                if npc.get("npc_id") == npc_id:
                    return npc.get("name") or npc_id
        except (AttributeError, TypeError):
            # Malformed compiled data: use the raw id instead of crashing the
            # caller. The original bare ``except:`` also swallowed
            # KeyboardInterrupt/SystemExit.
            pass
        return npc_id

def get_emotion_report(phase2_result: Dict[str, Any], npc_id: Optional[str] = None, compiled_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build an emotion-analysis report from a phase-2 (guardrail) result.

    Reads ``phase2_result["emotion"]`` with sub-keys ``pre`` (``emotion_hint``),
    ``post`` (``emotion_from_content``, ``confidence``) and ``final``.

    Raw scores, before normalisation to sum 1:
    * With *compiled_data* and *npc_id*: every emotion in the NPC's
      ``emotion_range`` (comma-separated string or list) starts at 0.1, and
      its ``baseline_emotion`` gets 0.3. An unknown NPC or an empty range
      yields just ``neutral``. Blank entries such as a trailing comma are
      ignored.
    * Otherwise: a fixed default table, with ``neutral`` as the baseline.
    * The content-detected (post) emotion is raised to 0.4 if it is already
      in the table, or added at 0.2 if it is not.

    Returns:
        A dict with ``pre_emotion``, ``post_emotion``, ``final_emotion``,
        normalised ``emotion_scores``, ``confidence`` (default 0.8), an
        ``analysis`` string, ``npc_baseline`` and ``npc_emotion_range`` (the
        list of scored emotions).
    """
    # ``or {}`` also covers keys that are present but set to None.
    emotion_info = phase2_result.get("emotion") or {}
    pre_emotion = emotion_info.get("pre") or {}
    post_emotion = emotion_info.get("post") or {}
    final_emotion = emotion_info.get("final", "neutral")

    if compiled_data and npc_id:
        # Single lookup of the NPC record (first match wins).
        npc_record = next(
            (npc for npc in compiled_data.get("npc", []) if npc.get("npc_id") == npc_id),
            None,
        )
        npc_emotion_range = ["neutral"]
        baseline_emotion = None
        if npc_record is not None:
            raw_range = npc_record.get("emotion_range", "")
            if isinstance(raw_range, str):
                raw_range = raw_range.split(",")
            parsed_range = [str(e).strip() for e in (raw_range or []) if str(e).strip()]
            if parsed_range:
                npc_emotion_range = parsed_range
            baseline_emotion = npc_record.get("baseline_emotion", "neutral")

        emotion_scores = {emotion: 0.1 for emotion in npc_emotion_range}
        if baseline_emotion and baseline_emotion in emotion_scores:
            emotion_scores[baseline_emotion] = 0.3
    else:
        # Fallback emotion table when no NPC-specific data is available.
        emotion_scores = {
            "neutral": 0.3, "happy": 0.1, "sad": 0.1, "angry": 0.1,
            "fearful": 0.1, "surprised": 0.1, "serious": 0.2
        }
        baseline_emotion = "neutral"

    post_emotion_type = post_emotion.get("emotion_from_content")
    if post_emotion_type:
        # In-range content emotions get the strongest weight; out-of-range ones are added lower.
        emotion_scores[post_emotion_type] = 0.4 if post_emotion_type in emotion_scores else 0.2

    # Every raw score is positive, so the uniform branch is purely defensive.
    total_score = sum(emotion_scores.values())
    if total_score > 0:
        normalized_scores = {k: v / total_score for k, v in emotion_scores.items()}
    else:
        normalized_scores = {k: 1.0 / len(emotion_scores) for k in emotion_scores}

    analysis = f"情绪从 {pre_emotion.get('emotion_hint', 'unknown')} 调整为 {final_emotion}"
    if post_emotion_type:
        analysis += f" (检测到内容情绪: {post_emotion_type})"

    return {
        "pre_emotion": pre_emotion.get("emotion_hint", "unknown"),
        "post_emotion": post_emotion.get("emotion_from_content", "unknown"),
        "final_emotion": final_emotion,
        "emotion_scores": normalized_scores,
        "confidence": post_emotion.get("confidence", 0.8) if post_emotion else 0.8,
        "analysis": analysis,
        "npc_baseline": baseline_emotion,
        "npc_emotion_range": list(emotion_scores.keys())
    }



def print_emotion_report(report: Dict[str, Any]):
    """Pretty-print an emotion report (as built by ``get_emotion_report``) to stdout.

    Prints the pre/post/final emotions, the confidence (2 decimals) and the
    analysis text, then one 30-column bar per entry in ``emotion_scores``.
    """
    rule = "=" * 60
    bar_width = 30

    print("\n" + rule)
    print("📊 情绪分析报告")
    print(rule)

    for label, key in (("前推断情绪", "pre_emotion"), ("后推断情绪", "post_emotion"), ("最终情绪", "final_emotion")):
        print(f"{label}: {report[key]}")
    print(f"置信度: {report['confidence']:.2f}")
    print(f"分析: {report['analysis']}")

    print("\n情绪分数分布:")
    print("-" * 40)
    for name, value in report['emotion_scores'].items():
        filled = int(value * bar_width)
        print(f"{name:12} [{'█' * filled}{' ' * (bar_width - filled)}] {value:.3f}")

    print(rule)

In [None]:
# Demo script: two NPCs holding a continuous conversation.
def create_pipeline(api_key: Optional[str] = None, auto_compile: bool = True) -> IntegratedPipeline:
    """Construct a new ``IntegratedPipeline``, forwarding both arguments positionally."""
    pipeline = IntegratedPipeline(api_key, auto_compile)
    return pipeline

def _print_conversation_summary(pipeline, conversation_history: List[Dict[str, str]]) -> None:
    """Print the recorded turns: the opening line first, then the numbered replies."""
    print(f"\n{'=' * 60}")
    print("📊 对话总结")
    print("=" * 60)
    for i, turn in enumerate(conversation_history):
        print("\n初始发言:" if i == 0 else f"\n第{i}轮回复:")
        print(f"  {pipeline._get_npc_name(turn['speaker'])} ({turn['emotion']}): {turn['message']}")


def demo_npc_conversation(
    npc1_id: str = "SV001",
    npc2_id: str = "SV002",
    initial_message: str = "Hello, how are you feeling today?",
    max_rounds: int = 6,
):
    """Run a scripted conversation in which two NPCs reply to each other.

    *npc1_id* opens with *initial_message*. On each round the current listener
    answers the latest message through ``pipeline.process_query``, using
    ``"npc_<speaker id>"`` as the player id, and then the roles swap. The loop
    stops early when ``_should_end_conversation`` returns True, and otherwise
    after *max_rounds* replies. A conversation summary and the memory-file
    status are printed at the end.

    The defaults reproduce the original demo: Shane (SV001) and Sam (SV002),
    at most 6 rounds.
    """
    pipeline = create_pipeline()

    # Register NPC display names with the memory store before the first turn.
    npc_names = {
        "SV001": "Shane",
        "SV002": "Sam",
        "SV003": "Linus",
        "guard_01": "City Guard",
    }
    pipeline.memory_store.set_npc_names(npc_names)

    print("🎭 开始NPC连续对话演示")
    print("=" * 60)

    initial_emotion = "neutral"  # emotion attached to the scripted opening line
    print(f"\n💬 {pipeline._get_npc_name(npc1_id)} ({initial_emotion}): '{initial_message}'")

    conversation_history = [{
        "speaker": npc1_id,
        "listener": npc2_id,
        "message": initial_message,
        "emotion": initial_emotion,
    }]

    speaker, listener = npc1_id, npc2_id
    for round_index in range(max_rounds):
        last_message = conversation_history[-1]["message"]

        print(f"💬 {pipeline._get_npc_name(speaker)} 对 {pipeline._get_npc_name(listener)} 说:")
        print(f"   '{last_message}'")

        # The listener answers; the speaker's id is passed as a pseudo player id.
        result = pipeline.process_query(last_message, listener, f"npc_{speaker}")
        response = _extract_response(result)

        # Record the reply: the replier is the speaker of this new turn.
        conversation_history.append({
            "speaker": listener,
            "listener": speaker,
            "message": response["text"],
            "emotion": response["emotion"],
        })
        print(f"💬 {pipeline._get_npc_name(listener)} ({response['emotion']}): '{response['text']}'")

        if _should_end_conversation(response["text"], response["emotion"]):
            print("\n💭 对话自然结束")
            break

        speaker, listener = listener, speaker
        # Light separator between rounds, omitted after the final round.
        if round_index + 1 < max_rounds:
            print("   ...")

    _print_conversation_summary(pipeline, conversation_history)

    print(f"\n{'=' * 60}")
    print("🧠 记忆系统状态")
    print("=" * 60)
    _show_memory_status(pipeline)

def _should_end_conversation(text: str, emotion: str) -> bool:
    """Decide whether the NPC conversation should stop after this reply.

    Returns True when the lower-cased *text* contains any farewell phrase
    (English or Chinese), or when *emotion* is exactly one of "angry",
    "annoyed" or "frustrated".

    NOTE: this is a plain substring match, so e.g. "nice to see you" also
    counts as a farewell.
    """
    farewell_markers = (
        "goodbye", "bye", "see you", "farewell", "take care",
        "我得走了", "再见", "下次聊", "该走了",
    )
    lowered = text.lower()
    said_farewell = any(marker in lowered for marker in farewell_markers)
    turned_hostile = emotion in ("angry", "annoyed", "frustrated")
    return said_farewell or turned_hostile

def _extract_response(result: Dict[str, Any]) -> Dict[str, str]:
    """Pull the reply text and emotion out of a ``process_query`` result.

    Two ``final_output`` shapes are supported:
    * nested: ``{"final": {"text": ..., "emotion": ...}}``
    * flat:   ``{"text": ..., "emotion": ...}``

    A non-dict ``"final"`` value no longer raises; the flat shape is tried
    instead. Missing or None values fall back to "No response generated"
    and "neutral".

    Returns:
        ``{"text": str, "emotion": str}``.
    """
    default_text = "No response generated"
    default_emotion = "neutral"

    final_output = result.get("final_output", {})
    if not isinstance(final_output, dict):
        return {"text": default_text, "emotion": default_emotion}

    nested = final_output.get("final")
    if isinstance(nested, dict):
        reply = nested
    elif "text" in final_output:
        reply = final_output
    else:
        return {"text": default_text, "emotion": default_emotion}

    text = reply.get("text")
    emotion = reply.get("emotion")
    return {
        "text": default_text if text is None else text,
        "emotion": default_emotion if emotion is None else emotion,
    }

def _show_memory_status(pipeline):
    """显示记忆系统状态"""
    try:
        # 检查短期记忆
        shortterm_file = Path("shortterm_memory.csv")
        if shortterm_file.exists():
            with open(shortterm_file, 'r', encoding='utf-8') as f:
                reader = csv.reader(f)
                rows = list(reader)
                print(f"短期记忆记录数: {len(rows) - 1}")
        
        # 检查长期记忆
        longterm_file = Path("longterm_memory.csv")
        if longterm_file.exists():
            with open(longterm_file, 'r', encoding='utf-8') as f:
                reader = csv.reader(f)
                rows = list(reader)
                print(f"长期记忆记录数: {len(rows) - 1}")
                if len(rows) > 1:
                    print("长期记忆内容:")
                    for i, row in enumerate(rows[1:], 1):
                        if len(row) >= 3:  # 确保有足够的数据
                            print(f"  {i}. {row[2]} (情绪: {row[3] if len(row) > 3 else 'unknown'})")
    except Exception as e:
        print(f"检查记忆状态失败: {e}")

# Run the two-NPC demo. In a notebook cell ``__name__`` is also "__main__",
# so the demo runs whenever this cell executes (see the captured output below).
if __name__ == "__main__":
    demo_npc_conversation()

✓ 加载编译数据: 3 个NPC, 14 条公共知识
api from env
🎭 开始NPC连续对话演示

💬 Shane (neutral): 'Hello, how are you feeling today?'
💬 Shane 对 Sam 说:
   'Hello, how are you feeling today?'

=== 处理查询: Hello, how are you feeling today? ===
NPC: SV002, Player: npc_SV001
阶段2：护栏层处理中...

📊 情绪分析报告
前推断情绪: serious
后推断情绪: neutral
最终情绪: serious
置信度: 1.00
分析: 情绪从 serious 调整为 serious (检测到内容情绪: neutral)

情绪分数分布:
----------------------------------------
neutral      [█████████████                 ] 0.444
cheerful     [██████████                    ] 0.333
friendly     [███                           ] 0.111
serious      [███                           ] 0.111
阶段3：生成层处理中...
构建生成上下文...
生成候选回复...

QwenProvider.generate attempt 1
Parsed type: <class 'list'>
[ERROR] Missing key: reply
[WARN] Failed to parse JSON output, retrying...
Exiting generate after retries.

QwenProvider.generate attempt 1
Parsed type: <class 'dict'>
Parsed JSON: {'self_report': 'I feel upbeat and friendly after that exchange.', 'sentiment': 'positive'}

Qw

In [None]:
# # 自己作为user来和NPC交互的演示
# def create_pipeline(api_key: Optional[str] = None, auto_compile: bool = True) -> IntegratedPipeline:
#     """创建集成管道实例"""
#     return IntegratedPipeline(api_key, auto_compile)

# #实现两个player ID和NPC ID的交互演示
# def demo_integrated_pipeline():
#     """演示集成管道的使用"""
    
#     # 创建管道
#     pipeline = create_pipeline()
    
#     # 测试轮数
#     num_rounds = 3
#     start_texts = ["Hello,how are you feeling today?"]
#     test_cases = [
#         "What's new in the market today?",
#         "How are you feeling today?",
#         "Who is the king?",
#     ]
    
#     for user_text in test_cases:
#         print(f"\n{'='*50}")
#         print(f"测试: {user_text}")
#         print(f"{'='*50}")
        
#         # 确保传递 player_id
#         result = pipeline.process_query(user_text, "SV001", "Shane")
        
#         # 安全地显示结果摘要
#         final_output = result.get("final_output", {})
#         print("final_output结构:", list(final_output.keys()) if isinstance(final_output, dict) else "不是字典")
        
#         # 统一处理不同的输出结构
#         reply_text = "No response generated"
#         reply_emotion = "neutral"
        
#         if isinstance(final_output, dict):
#             if "final" in final_output:
#                 reply_data = final_output["final"]
#                 reply_text = reply_data.get('text', 'No text available')
#                 reply_emotion = reply_data.get('emotion', 'neutral')
#             elif "text" in final_output:
#                 # 处理平铺结构
#                 reply_text = final_output.get('text', 'No text available')
#                 reply_emotion = final_output.get('emotion', 'neutral')
        
#         print(f"\n{'='*50}")
#         print(f"回复: {reply_text}")
#         print(f"情绪: {reply_emotion}")
#         print(f"类型: {result.get('response_type')}")
#         print(f"允许: {result.get('allow')}")
        
#         # 显示情绪报告摘要
#         emotion_report = result.get("emotion_report", {})
#         if emotion_report:
#             print(f"\n情绪报告摘要:")
#             print(f"  最终情绪: {emotion_report.get('final_emotion', 'unknown')}")
#             scores = emotion_report.get('emotion_scores', {})
#             if scores:
#                 top_emotions = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:3]
#                 print(f"  主要情绪: {', '.join([f'{e}({s:.2f})' for e, s in top_emotions])}")
        
#         print(f"{'='*50}\n")
        
#         if result.get("response_type") == "refusal":
#             print(f"拒答原因: {result.get('deny_reason')}")
        
#         if result.get("data_status") == "no_data_file":
#             print("状态: 使用简化模式（无数据文件）")

# # 运行演示
# if __name__ == "__main__":
#     demo_integrated_pipeline()

In [25]:
# def fix_all_issues():
#     """修复所有已知问题"""
#     print("开始修复所有问题...")
    
#     # 修复记忆文件
#     try:
#         longterm_file = Path("longterm_memory.csv")
#         if longterm_file.exists():
#             # 重新创建文件确保正确的格式
#             longterm_file.unlink()
        
#         with open(longterm_file, 'w', newline='', encoding='utf-8') as f:
#             writer = csv.DictWriter(f, fieldnames=["player_id", "npc_id", "fact", "timestamp"])
#             writer.writeheader()
#         print("✓ 记忆文件已修复")
#     except Exception as e:
#         print(f"✗ 记忆文件修复失败: {e}")
    
#     print("修复完成")

# # 在运行演示前调用修复
# fix_all_issues()