In [5]:
# 1. 导入依赖和设置环境
import json
import pandas as pd
import random
import time
import requests
import os
import jieba
import re
from typing import Dict, List, Any, Tuple
from datetime import datetime
from collections import Counter

# Import OpenHowNet; if the import fails, install it with pip and import again.
try:
    import OpenHowNet
    print("✅ OpenHowNet已导入")
except ImportError:
    print("⚠️ 正在安装OpenHowNet...")
    import subprocess
    import sys
    # Run pip through sys.executable so the package is installed for the
    # interpreter that is executing this code; check_call raises if pip fails.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "OpenHowNet"])
    import OpenHowNet
    print("✅ OpenHowNet安装并导入成功")

# Startup banner describing the experiment goal and the intended
# part-of-speech distribution.
print("🚀 中文Taboo实验环境初始化完成")
print("📋 实验目标: 使用OpenHowNet构建100个中文词汇的Taboo数据集")
print("🎯 词性分布: 名词、动词、形容词、副词各25个")


ModuleNotFoundError: No module named 'jieba'

In [None]:
# 2. Initialise OpenHowNet and the Chinese text-processing tools
print("🔧 正在初始化OpenHowNet和中文处理工具...")

# Create the OpenHowNet dictionary instance.
# NOTE(review): the primary path builds the dictionary without init_sim=True,
# while later code calls calculate_word_similarity on it — confirm whether
# similarity support needs to be enabled here.
try:
    hownet_dict = OpenHowNet.HowNetDict()
    print("✅ OpenHowNet词典加载成功")
    print(f"📚 词典包含词汇数量: {len(hownet_dict)} 个概念")
except Exception as e:
    print(f"❌ OpenHowNet初始化失败: {e}")
    print("🔄 尝试重新下载HowNet数据...")
    # NOTE(review): the message above announces a re-download, but the only
    # action taken is constructing the dictionary again with init_sim=True;
    # no explicit download call is made here — confirm this is intended.
    hownet_dict = OpenHowNet.HowNetDict(init_sim=True)

# Configure jieba word segmentation
jieba.setLogLevel(20)  # reduce jieba's log output (20 is logging.INFO)
print("✅ jieba分词工具已配置")

# Seed the global random generator
random.seed(42)
print("🎲 随机种子已设置为42，确保实验可复现")


In [None]:
# 3. 中文词汇数据集构建工具函数

def get_pos_mapping():
    """Return the lookup table from HowNet part-of-speech tags to standard tags.

    Every standard tag ('noun', 'verb', 'adj', 'adv') maps to itself and is
    also reachable through its abbreviated aliases.
    """
    alias_groups = (
        ('noun', ('N', 'noun')),       # nouns
        ('verb', ('V', 'verb')),       # verbs
        ('adj', ('A', 'adj', 'a')),    # adjectives
        ('adv', ('D', 'adv', 'd')),    # adverbs
    )
    mapping = {}
    for standard_tag, aliases in alias_groups:
        for alias in aliases:
            mapping[alias] = standard_tag
    return mapping

def is_valid_chinese_word(word: str) -> bool:
    """Decide whether *word* is acceptable as a Chinese vocabulary entry.

    A word is accepted when it is non-empty, contains at least one character
    in the range U+4E00..U+9FFF, is between one and six characters long, and
    contains none of a fixed set of punctuation marks.
    """
    if not word:
        return False

    # Must contain at least one Chinese character.
    if re.search(r'[\u4e00-\u9fff]+', word) is None:
        return False

    # Reject words outside the 1..6 character range.
    if not 1 <= len(word) <= 6:
        return False

    # Reject words containing any of these punctuation marks.
    forbidden_marks = ('·', '—', '…', '〈', '〉', '《', '》', '「', '」')
    for mark in forbidden_marks:
        if mark in word:
            return False

    return True

def extract_similar_words_from_hownet(target_word: str, target_pos: str, hownet_dict, max_count: int = 10) -> List[str]:
    """Collect taboo-word candidates related to *target_word* from HowNet.

    Three best-effort sources are tried in order; a failure in one source
    does not prevent the others from running:

    1. synonyms returned by ``hownet_dict.get_synonyms``;
    2. the five most similar words (similarity > 0.3) among a random sample
       of up to 1000 valid words from ``hownet_dict.get_vocab()``;
    3. words of two or more characters obtained by segmenting each sense
       definition with jieba.

    Candidates are kept in first-seen order, so the ``max_count`` truncation
    always prefers earlier sources and yields the same words for the same
    inputs and random state. (Truncating a plain ``set`` of strings would
    depend on per-process string hashing.)

    Args:
        target_word: The word to find related words for.
        target_pos: Part of speech of the target; accepted for interface
            compatibility and not used by the lookup.
        hownet_dict: Dictionary object providing ``get_senses``,
            ``get_synonyms``, ``get_vocab`` and ``calculate_word_similarity``.
        max_count: Maximum number of candidates to return.

    Returns:
        Up to ``max_count`` distinct candidate words, never containing
        ``target_word``. Empty when the target has no senses or the lookup
        fails.
    """
    # A dict is used as an insertion-ordered set.
    candidates = {}

    try:
        word_senses = hownet_dict.get_senses(target_word)
        if not word_senses:
            return []

        # Source 1: synonyms.
        try:
            for syn_group in hownet_dict.get_synonyms(target_word):
                for word in syn_group:
                    if is_valid_chinese_word(word) and word != target_word:
                        candidates.setdefault(word, None)
        except Exception:
            # Best effort: this source is optional. Only Exception is caught
            # so that Ctrl-C / SystemExit still stop a long run.
            pass

        # Source 2: semantic similarity against a random vocabulary sample.
        try:
            all_words = list(hownet_dict.get_vocab())
            chinese_words = [w for w in all_words if is_valid_chinese_word(w)]

            # Sampling bounds the number of similarity computations.
            sample_size = min(1000, len(chinese_words))
            sampled_words = random.sample(chinese_words, sample_size)

            word_similarities = []
            for word in sampled_words:
                if word == target_word:
                    continue
                try:
                    similarity = hownet_dict.calculate_word_similarity(target_word, word)
                except Exception:
                    continue
                if similarity > 0.3:  # similarity threshold
                    word_similarities.append((word, similarity))

            # Highest similarity first; keep the top five.
            word_similarities.sort(key=lambda pair: pair[1], reverse=True)
            for word, _ in word_similarities[:5]:
                candidates.setdefault(word, None)
        except Exception:
            pass

        # Source 3: keywords from the sense definitions.
        # NOTE(review): assumes each sense is dict-like with a 'def' key —
        # confirm against the installed OpenHowNet version.
        try:
            for sense in word_senses:
                for word in jieba.lcut(sense.get('def', '')):
                    if is_valid_chinese_word(word) and word != target_word and len(word) >= 2:
                        candidates.setdefault(word, None)
        except Exception:
            pass

    except Exception as e:
        print(f"⚠️ 提取 {target_word} 的相似词时出错: {e}")

    # Every stored candidate has already passed is_valid_chinese_word.
    return list(candidates)[:max_count]

print("✅ 中文词汇处理工具函数已定义")


In [None]:
# 4. 构建中文Taboo数据集
print("🏗️ 开始构建中文Taboo数据集...")

def build_chinese_taboo_dataset(hownet_dict, target_count_per_pos: int = 25) -> List[Dict[str, Any]]:
    """Build the Chinese Taboo dataset from a HowNet dictionary.

    The vocabulary from ``hownet_dict.get_vocab()`` is filtered with
    ``is_valid_chinese_word`` and grouped by the part of speech of each
    word's first sense. For each of 'noun', 'verb', 'adj' and 'adv', up to
    ``target_count_per_pos`` words are drawn with ``random.sample`` and each
    drawn word receives five taboo words.

    Args:
        hownet_dict: Dictionary object providing ``get_vocab`` and
            ``get_senses``.
        target_count_per_pos: Number of target words to select per part of
            speech; fewer are selected when not enough candidates exist.

    Returns:
        A list of entry dicts with the keys 'target', 'part_of_speech',
        'taboo', 'category', 'senses' and 'metadata'.
    """
    
    pos_mapping = get_pos_mapping()
    target_pos_list = ['noun', 'verb', 'adj', 'adv']
    dataset = []
    
    print(f"📊 目标: 每个词性 {target_count_per_pos} 个词，总计 {target_count_per_pos * 4} 个词")
    
    # Fetch the HowNet vocabulary and keep only valid Chinese words
    all_vocab = list(hownet_dict.get_vocab())
    chinese_vocab = [word for word in all_vocab if is_valid_chinese_word(word)]
    print(f"📚 HowNet中文词汇总数: {len(chinese_vocab)} 个")
    
    # Collect candidate words grouped by part of speech
    words_by_pos = {pos: [] for pos in target_pos_list}
    
    print("🔍 正在分析词汇词性...")
    progress_count = 0
    
    for word in chinese_vocab:
        progress_count += 1
        # Report progress every 1000 words
        if progress_count % 1000 == 0:
            print(f"   已处理 {progress_count}/{len(chinese_vocab)} 个词汇")
        
        try:
            # Look up the senses of the word
            senses = hownet_dict.get_senses(word)
            if not senses:
                continue
            
            # Use the part of speech of the first sense
            # NOTE(review): assumes each sense is dict-like with a 'pos' key —
            # confirm against the installed OpenHowNet version; if .get is
            # unavailable the exception below skips every word silently.
            primary_sense = senses[0]
            pos_info = primary_sense.get('pos', '')
            
            # Map to a standard part-of-speech tag; unmapped tags are dropped
            standard_pos = pos_mapping.get(pos_info, None)
            if standard_pos and standard_pos in target_pos_list:
                words_by_pos[standard_pos].append({
                    'word': word,
                    'senses': senses,
                    'primary_pos': standard_pos
                })
        
        except Exception:
            # Any failure while inspecting a word skips that word
            continue
    
    print("\n📈 词性分布统计:")
    for pos, words in words_by_pos.items():
        print(f"   {pos}: {len(words)} 个候选词")
    
    # Randomly pick the requested number of words for each part of speech
    print("\n🎯 开始选择目标词汇并生成禁用词...")
    
    for pos in target_pos_list:
        available_words = words_by_pos[pos]
        if len(available_words) < target_count_per_pos:
            print(f"⚠️ {pos} 词性可用词汇不足 ({len(available_words)} < {target_count_per_pos})")
            selected_count = len(available_words)
        else:
            selected_count = target_count_per_pos
        
        # Random selection (uses the global random state)
        selected_words = random.sample(available_words, selected_count)
        print(f"\n🔄 正在处理 {pos} 类词汇 ({selected_count} 个)...")
        
        for i, word_info in enumerate(selected_words):
            target_word = word_info['word']
            senses = word_info['senses']
            
            print(f"   处理 {i+1}/{selected_count}: {target_word}")
            
            # Generate taboo-word candidates
            taboo_words = extract_similar_words_from_hownet(
                target_word, pos, hownet_dict, max_count=8
            )
            
            # If there are fewer than five, top up from the definitions
            if len(taboo_words) < 5:
                # Segment the definitions with jieba to find more words
                for sense in senses[:2]:  # only the first two senses
                    definition = sense.get('def', '')
                    def_words = jieba.lcut(definition)
                    for def_word in def_words:
                        if (is_valid_chinese_word(def_word) and 
                            def_word != target_word and 
                            len(def_word) >= 2 and 
                            def_word not in taboo_words):
                            taboo_words.append(def_word)
                            # This break leaves only the inner loop; any
                            # surplus added by the next sense is trimmed below
                            if len(taboo_words) >= 5:
                                break
            
            # Cap at five taboo words, then pad up to five if still short
            taboo_words = taboo_words[:5]  # limit to five
            if len(taboo_words) < 5:
                # Still short: pad with generic words
                generic_taboos = ['东西', '事物', '物品', '概念', '内容']
                for generic in generic_taboos:
                    if generic not in taboo_words and generic != target_word:
                        taboo_words.append(generic)
                        if len(taboo_words) >= 5:
                            break
            
            # Assemble the dataset entry
            entry = {
                'target': target_word,
                'part_of_speech': pos,
                'taboo': taboo_words[:5],  # at most five taboo words
                'category': 'chinese_general',
                'senses': senses,
                'metadata': {
                    'sense_count': len(senses),
                    'taboo_count': len(taboo_words[:5]),
                    'source': 'openhownet'
                }
            }
            
            dataset.append(entry)
    
    return dataset

# Build the dataset: 25 target words for each of the four parts of speech.
chinese_dataset = build_chinese_taboo_dataset(hownet_dict, target_count_per_pos=25)
print(f"\n✅ 中文Taboo数据集构建完成！")
print(f"📊 总词汇数: {len(chinese_dataset)} 个")


In [None]:
# 5. Dataset statistics
# Prints the part-of-speech distribution, taboo-word and sense count
# summaries of chinese_dataset, followed by up to five random sample entries.
# Leaves total_words, pos_counts, taboo_counts, sense_counts and sample_items
# defined for later cells.
print("📊 中文Taboo数据集统计分析:")
print("=" * 50)

# Basic statistics
total_words = len(chinese_dataset)
print(f"📝 总词汇数: {total_words}")

# Tally part-of-speech frequencies and per-entry taboo / sense counts
pos_counts = {}
taboo_counts = []
sense_counts = []

for item in chinese_dataset:
    pos = item.get('part_of_speech', 'unknown')
    pos_counts[pos] = pos_counts.get(pos, 0) + 1
    taboo_counts.append(len(item.get('taboo', [])))
    sense_counts.append(len(item.get('senses', [])))

print(f"\n🏷️ 词性分布:")
for pos, count in sorted(pos_counts.items(), key=lambda x: x[1], reverse=True):
    percentage = count / total_words * 100
    print(f"   {pos}: {count} 个 ({percentage:.1f}%)")

# An empty dataset would otherwise raise ZeroDivisionError on the averages
# and ValueError on min()/max(); report zeros instead.
avg_taboo_count = sum(taboo_counts) / len(taboo_counts) if taboo_counts else 0.0
avg_sense_count = sum(sense_counts) / len(sense_counts) if sense_counts else 0.0

print(f"\n🚫 禁用词统计:")
print(f"   平均数量: {avg_taboo_count:.1f}")
print(f"   范围: {min(taboo_counts, default=0)} - {max(taboo_counts, default=0)}")

print(f"\n💭 义项统计:")
print(f"   平均数量: {avg_sense_count:.1f}")
print(f"   范围: {min(sense_counts, default=0)} - {max(sense_counts, default=0)}")

# Show a few sample entries
print(f"\n📋 数据样本 (随机5个):")
sample_items = random.sample(chinese_dataset, min(5, len(chinese_dataset)))
for i, item in enumerate(sample_items, 1):
    print(f"\n   样本 {i}:")
    print(f"     目标词: {item['target']}")
    print(f"     词性: {item['part_of_speech']}")
    print(f"     禁用词: {item['taboo']}")
    if item.get('senses'):
        # Only the first 50 characters of the first sense's definition are shown
        definition = item['senses'][0].get('def', '无定义')
        print(f"     定义: {definition[:50]}...")

print(f"\n✅ 统计分析完成")


In [None]:
# 6. Save the Chinese dataset
# Writes three UTF-8 JSON files under data/: the full dataset, a simplified
# copy without sense information, and a summary report. Relies on
# chinese_dataset, pos_counts, taboo_counts, sense_counts and sample_items
# defined by earlier cells.
print("💾 保存中文Taboo数据集...")

# Create the data directory when it does not exist yet
data_dir = "data"
if not os.path.exists(data_dir):
    os.makedirs(data_dir)
    print(f"📁 创建数据目录: {data_dir}")

# Save the full dataset
chinese_dataset_path = os.path.join(data_dir, "chinese_dataset.json")
with open(chinese_dataset_path, 'w', encoding='utf-8') as f:
    json.dump(chinese_dataset, f, ensure_ascii=False, indent=2)
print(f"✅ 完整数据集已保存: {chinese_dataset_path}")

# Build the simplified dataset (for quick tests): only the four core fields
simplified_dataset = [
    {
        'target': item['target'],
        'part_of_speech': item['part_of_speech'],
        'taboo': item['taboo'],
        'category': item['category']
    }
    for item in chinese_dataset
]

simplified_path = os.path.join(data_dir, "chinese_dataset_simple.json")
with open(simplified_path, 'w', encoding='utf-8') as f:
    json.dump(simplified_dataset, f, ensure_ascii=False, indent=2)
print(f"✅ 简化数据集已保存: {simplified_path}")

# Build the dataset report. The averages fall back to 0.0 for an empty
# dataset so that the report is still written instead of raising
# ZeroDivisionError after the first two files were already saved.
report = {
    'dataset_info': {
        'total_words': len(chinese_dataset),
        'creation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'source': 'OpenHowNet',
        'language': 'Chinese',
        'pos_distribution': pos_counts,
        'avg_taboo_count': sum(taboo_counts) / len(taboo_counts) if taboo_counts else 0.0,
        'avg_sense_count': sum(sense_counts) / len(sense_counts) if sense_counts else 0.0
    },
    'sample_data': sample_items
}

report_path = os.path.join(data_dir, "chinese_dataset_report.json")
with open(report_path, 'w', encoding='utf-8') as f:
    json.dump(report, f, ensure_ascii=False, indent=2)
print(f"✅ 数据集报告已保存: {report_path}")

print(f"\n🎉 中文Taboo数据集构建完成！")
print(f"📁 数据文件位置:")
print(f"   完整版: {chinese_dataset_path}")
print(f"   简化版: {simplified_path}")
print(f"   报告: {report_path}")


In [None]:
# 7. API客户端设置（支持中文模型）
print("🔧 设置中文Taboo实验API客户端...")

def load_api_keys(keys_path: str = "api_keys.json") -> Dict[str, str]:
    """Read the UTF-8 JSON file at *keys_path* and return its parsed content."""
    with open(keys_path, mode='r', encoding='utf-8') as key_file:
        raw_text = key_file.read()
    return json.loads(raw_text)

class ChineseTabooClient:
    """API client used by the Chinese Taboo game.

    Posts chat-completion requests to the OpenRouter endpoint stored in
    ``base_url``, authenticating with a bearer token.
    """
    def __init__(self, api_key: str):
        """Store the API key and prepare the endpoint URL and request headers."""
        self.api_key = api_key
        self.base_url = "https://openrouter.ai/api/v1/chat/completions"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    
    def call_model(self, model: str, messages: List[Dict[str, str]], temperature: float = 0.3,
                   max_tokens: int = 2000, timeout: float = 30) -> str:
        """Send *messages* to *model* and return the stripped reply text.

        Args:
            model: Model identifier sent in the request payload.
            messages: Chat messages, each a dict with 'role' and 'content'.
            temperature: Sampling temperature sent in the payload.
            max_tokens: Completion token limit sent in the payload.
            timeout: Timeout in seconds passed to ``requests.post``.

        Returns:
            The content of the first choice's message, whitespace-stripped.

        Raises:
            requests.HTTPError: If the response status indicates an error.
            KeyError, IndexError: If the response JSON lacks the expected
                ``choices[0].message.content`` structure.
            ValueError: If the message content is ``None``.
        """
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        
        response = requests.post(self.base_url, headers=self.headers, json=payload, timeout=timeout)
        response.raise_for_status()
        result = response.json()
        content = result['choices'][0]['message']['content']
        if content is None:
            # Fail with a readable message instead of an AttributeError
            # raised by calling .strip() on None.
            raise ValueError("模型返回内容为空")
        
        return content.strip()

# Initialise the API client; on any failure chinese_client is set to None so
# that later cells can detect the missing client.
try:
    api_keys = load_api_keys()
    chinese_client = ChineseTabooClient(api_keys["OPENROUTER_API_KEY"])
    print("✅ 中文Taboo API客户端初始化成功")
except Exception as e:
    print(f"❌ API客户端初始化失败: {e}")
    chinese_client = None

# Test models that support Chinese
CHINESE_TEST_MODELS = [
    "openai/gpt-4o",  # GPT-4o, supports Chinese
    "google/gemini-2.5-flash",  # Gemini, supports Chinese
    "deepseek/deepseek-chat-v3-0324",  # DeepSeek Chinese model
    "anthropic/claude-sonnet-4",  # Claude, supports Chinese
    "moonshotai/kimi-k2",  # API path for kimi-k2

]

print(f"🤖 中文实验模型列表 ({len(CHINESE_TEST_MODELS)} 个):")
for i, model in enumerate(CHINESE_TEST_MODELS, 1):
    print(f"   {i}. {model}")

print(f"\n💡 选择较少模型进行测试以节省成本和时间")


In [None]:
# 8. 中文Taboo游戏核心逻辑和工具函数
print("🎮 定义中文Taboo游戏核心逻辑...")

def safe_chinese_text_cleanup(text: str, max_length: int = 300) -> str:
    """Strip disallowed characters from *text* and cap its length.

    Characters outside the allow-list of the pattern below (Chinese
    characters, word characters, whitespace and common ASCII punctuation) are
    removed. When the cleaned text is longer than ``max_length`` it is cut to
    ``max_length`` characters and "..." is appended. Falsy input yields "".
    """
    if not text:
        return ""

    stripped = re.sub(r'[^\u4e00-\u9fff\w\s\.,!?;:"\'()[\]{}\-]', '', str(text))
    if len(stripped) <= max_length:
        return stripped
    return stripped[:max_length] + "..."

def extract_chinese_clue_text(response: str) -> str:
    """Pull the clue text out of a hinter response.

    Returns "FORMAT_ERROR" when the response carries the
    FORMAT_ERROR_EXCEEDED marker, the stripped text following a '[线索]' or
    '[CLUE]' tag (or, failing that, a '线索:' / 'Clue:' label) when one is
    present, and "INVALID_FORMAT" otherwise.
    """
    if "FORMAT_ERROR_EXCEEDED" in response:
        return "FORMAT_ERROR"

    # Bracketed tags; the Chinese tag takes precedence over the English one.
    has_tag = '[线索]' in response or '[CLUE]' in response.upper()
    if has_tag:
        tagged = (
            re.search(r'\[线索\]\s*(.+)', response, re.DOTALL)
            or re.search(r'\[CLUE\]\s*(.+)', response, re.IGNORECASE | re.DOTALL)
        )
        if tagged:
            return tagged.group(1).strip()

    # Fallback labels, again Chinese first.
    for label in ('线索:', 'Clue:'):
        if label in response:
            return response.split(label)[1].strip()

    return "INVALID_FORMAT"

def extract_chinese_guess_word(response: str) -> str:
    """Pull the guessed Chinese word out of a guesser response.

    The first run of Chinese characters following a '[猜测]' or '[GUESS]' tag
    is returned; when no tag yields a word, a '猜测:' / 'Guess:' label is
    tried the same way.

    Returns:
        The guessed word, "FORMAT_ERROR" when the response carries the
        FORMAT_ERROR_EXCEEDED marker, or "INVALID_FORMAT" when no Chinese
        word can be extracted.
    """
    if "FORMAT_ERROR_EXCEEDED" in response:
        return "FORMAT_ERROR"

    # The module-level ``re`` is used throughout. A function-local
    # ``import re`` inside only one branch would make ``re`` an unbound local
    # on the fallback path below.

    # Bracketed tags; the Chinese tag takes precedence over the English one.
    if '[猜测]' in response or '[GUESS]' in response.upper():
        match = re.search(r'\[猜测\]\s*(.+)', response)
        if not match:
            match = re.search(r'\[GUESS\]\s*(.+)', response, re.IGNORECASE)

        if match:
            guess_part = match.group(1).strip()
            # Take the first run of Chinese characters
            chinese_words = re.findall(r'[\u4e00-\u9fff]+', guess_part)
            if chinese_words:
                return chinese_words[0]

    # Fallback labels
    if '猜测:' in response or 'Guess:' in response:
        if '猜测:' in response:
            guess_part = response.split('猜测:')[1].strip()
        else:
            guess_part = response.split('Guess:')[1].strip()

        chinese_words = re.findall(r'[\u4e00-\u9fff]+', guess_part)
        if chinese_words:
            return chinese_words[0]

    return "INVALID_FORMAT"

def check_chinese_taboo_violation(hint: str, taboo_words: List[str]) -> bool:
    """Report whether *hint* uses any of the *taboo_words*.

    Both the hint and each taboo word are reduced to their Chinese characters
    before comparison. A violation is reported when

    * the reduced taboo word occurs in the reduced hint, or
    * the taboo word has three or more Chinese characters and its first two
      characters occur in the reduced hint.

    A taboo word without any Chinese character is compared
    case-insensitively against the raw hint instead; an empty taboo word is
    ignored. (Reducing such a word yields "", which is a substring of every
    hint and would flag every hint as a violation.)

    Args:
        hint: The clue text to check.
        taboo_words: Words the hint must not use.

    Returns:
        True when a violation is found, otherwise False.
    """
    hint_lower = hint.lower()
    hint_cleaned = re.sub(r'[^\u4e00-\u9fff]', '', hint_lower)

    for taboo in taboo_words:
        taboo_cleaned = re.sub(r'[^\u4e00-\u9fff]', '', taboo.lower())

        if not taboo_cleaned:
            # No Chinese characters: fall back to a raw substring check.
            raw_taboo = taboo.strip().lower()
            if raw_taboo and raw_taboo in hint_lower:
                return True
            continue

        # Full match
        if taboo_cleaned in hint_cleaned:
            return True

        # Partial match for longer words: the first two characters are
        # treated as the core of the word.
        if len(taboo_cleaned) >= 3 and taboo_cleaned[:2] in hint_cleaned:
            return True

    return False

def robust_chinese_api_call(client, model: str, base_prompt: str, expected_prefix: str, max_retries: int = 3):
    """Call the model until its reply starts with *expected_prefix*.

    Up to ``max_retries`` attempts are made. From the second attempt on, a
    format reminder quoting the previous malformed reply is appended to
    ``base_prompt``. Exceptions raised while calling the model count as
    failed attempts.

    Returns:
        A dict with the keys 'success', 'response', 'attempts', 'error' and
        'failed_outputs'. After exhausting the attempts on a malformed reply,
        'response' starts with "FORMAT_ERROR_EXCEEDED: "; after an exception
        on the last attempt, 'response' is None.
    """
    failed_outputs = []
    attempt = 0

    while attempt < max_retries:
        attempt += 1
        is_last_attempt = attempt == max_retries
        try:
            prompt = base_prompt
            if attempt > 1:
                # Quote the latest malformed reply, if there was one.
                prev_output = failed_outputs[-1] if failed_outputs else "未知"
                prompt = base_prompt + f"""

⚠️ 格式错误 ⚠️
您之前的回复是: "{prev_output}"

必需格式:
- 您必须以 '{expected_prefix}' 开头（包括方括号）
- 不要在 {expected_prefix} 前添加任何文字

请使用正确格式重试:"""

            response = client.call_model(model, [{"role": "user", "content": prompt}])

            # Accept the prefix either verbatim or ignoring case.
            reply_head = response.strip()
            prefix_ok = (reply_head.startswith(expected_prefix)
                         or reply_head.upper().startswith(expected_prefix.upper()))
            if prefix_ok:
                return {
                    'success': True,
                    'response': response,
                    'attempts': attempt,
                    'error': None,
                    'failed_outputs': failed_outputs
                }

            # Malformed reply: remember a sanitised copy for the next reminder.
            safe_response = safe_chinese_text_cleanup(response, max_length=150)
            failed_outputs.append(safe_response)

            if is_last_attempt:
                all_failed = " | ".join(failed_outputs)
                return {
                    'success': False,
                    'response': f"FORMAT_ERROR_EXCEEDED: {safe_response}",
                    'attempts': attempt,
                    'error': f"尝试 {max_retries} 次后失败。期望格式 '{expected_prefix}'。所有失败输出: {all_failed}",
                    'failed_outputs': failed_outputs
                }
            time.sleep(0.5)

        except Exception as e:
            safe_error = safe_chinese_text_cleanup(str(e), max_length=150)
            error_msg = f"API错误 (尝试 {attempt}/{max_retries}): {safe_error}"

            if is_last_attempt:
                return {
                    'success': False,
                    'response': None,
                    'attempts': attempt,
                    'error': error_msg,
                    'failed_outputs': failed_outputs
                }
            time.sleep(1.0)

    # Reached only when no attempt was made at all (max_retries < 1).
    return {
        'success': False,
        'response': None,
        'attempts': max_retries,
        'error': "超出最大重试次数",
        'failed_outputs': failed_outputs
    }

print("✅ 中文Taboo游戏核心逻辑已定义")


In [None]:
# 9. 中文Taboo游戏主函数
def play_chinese_taboo_game(client, hinter_model, guesser_model, target_word, taboo_words, max_turns=5):
    """Play one Chinese Taboo game between a hinter model and a guesser model.

    Each turn the hinter is asked for a clue (reply must start with '[线索]')
    and the guesser for a guess (reply must start with '[猜测]'). The game
    ends when the guess equals ``target_word``, when a clue violates the
    taboo words, when either model call fails, or after ``max_turns`` turns.

    Args:
        client: API client passed through to ``robust_chinese_api_call``.
        hinter_model: Model identifier used for the hinter role.
        guesser_model: Model identifier used for the guesser role.
        target_word: The word the guesser has to find.
        taboo_words: Words the hinter must not use.
        max_turns: Maximum number of hint/guess turns.

    Returns:
        A dict with the keys 'success', 'turns', 'conversation',
        'final_guess', 'failure_reason', 'total_hinter_attempts',
        'total_guesser_attempts', 'format_errors', 'hinter_failed_outputs',
        'guesser_failed_outputs', 'all_hints' and 'all_guesses'. Failed model
        calls and taboo violations additionally carry 'error'; taboo
        violations also carry 'taboo_violation_turn' and
        'taboo_violation_hint'. 'failure_reason' is None on success,
        otherwise one of 'FORMAT_FAILURE', 'API_FAILURE', 'TABOO_VIOLATION'
        or 'MAX_TURNS_EXCEEDED'.
    """

    conversation_history = []
    taboo_list = '、'.join(taboo_words)

    # System prompt for the hinter
    hinter_system = f"""您正在玩中文Taboo游戏，担任提示者角色。

关键格式要求:
- 您必须以 '[线索]' 开头回复（包括方括号）
- 不要在 [线索] 前添加任何文字
- 示例: [线索] 这是一种有长鼻子的大型哺乳动物

游戏规则:
1. 绝对不能使用这些禁用词: {taboo_list}
2. 给出有用的线索帮助猜测者找到目标词
3. 可以看到对话历史，在之前线索基础上补充
4. 要有创意但严格遵循格式
5. 使用标准中文表达

目标词: {target_word}
禁用词（绝对禁止）: {taboo_list}"""

    # System prompt for the guesser
    guesser_system = """您正在玩中文Taboo游戏，担任猜测者角色。

关键格式要求:
- 您必须以 '[猜测]' 开头回复（包括方括号）
- 不要在 [猜测] 前添加任何文字
- 示例: [猜测] 大象

游戏规则:
1. 根据收到的所有线索进行最佳猜测
2. 可以看到对话历史
3. 在 [猜测] 后只给出一个中文词汇作为答案
4. 使用标准中文词汇"""

    # Running statistics
    total_hinter_attempts = 0
    total_guesser_attempts = 0
    format_errors = []
    hinter_failed_outputs = []
    guesser_failed_outputs = []
    # Reported as the final guess when no guess was ever made (max_turns < 1).
    guess = 'N/A'

    def _history_text():
        """Render the conversation so far, one numbered line per message."""
        # NOTE(review): the "第{i}轮" label counts messages, not game turns.
        return "\n".join([f"第{i}轮: {msg}" for i, msg in enumerate(conversation_history, 1)])

    def _build_result(success, turns, final_guess, failure_reason, **extra):
        """Assemble the result dict shared by every exit path."""
        result = {
            'success': success,
            'turns': turns,
            'conversation': conversation_history,
            'final_guess': final_guess,
            'failure_reason': failure_reason,
        }
        result.update(extra)
        result.update({
            'total_hinter_attempts': total_hinter_attempts,
            'total_guesser_attempts': total_guesser_attempts,
            'format_errors': format_errors,
            'hinter_failed_outputs': hinter_failed_outputs,
            'guesser_failed_outputs': guesser_failed_outputs,
            'all_hints': [msg for msg in conversation_history if msg.startswith('提示者:')],
            'all_guesses': [msg for msg in conversation_history if msg.startswith('猜测者:')],
        })
        return result

    def _call_failure(role_tag, role_name, call_result, turn):
        """Record a failed model call and build the matching failure result."""
        exceeded = "FORMAT_ERROR_EXCEEDED" in str(call_result.get('response', ''))
        error_type = "FORMAT_FAILURE" if exceeded else "API_FAILURE"
        format_errors.append(f"第{turn}轮 {role_name}: {call_result['error']}")
        return _build_result(
            False, turn, f"{role_tag}_{error_type}", error_type,
            error=f"{error_type}: {call_result['error']}",
        )

    for turn in range(1, max_turns + 1):
        # Build the hinter prompt
        if turn == 1:
            hinter_prompt = f"{hinter_system}\n\n请提供您的第一个线索:"
        else:
            history_text = _history_text()
            hinter_prompt = f"{hinter_system}\n\n对话历史:\n{history_text}\n\n猜测者还没有找到答案。请提供下一个线索:"

        # Hinter gives a clue
        hinter_result = robust_chinese_api_call(client, hinter_model, hinter_prompt, "[线索]", max_retries=3)
        total_hinter_attempts += hinter_result['attempts']
        hinter_failed_outputs.extend(hinter_result.get('failed_outputs') or [])

        if not hinter_result['success']:
            return _call_failure("HINTER", "提示者", hinter_result, turn)

        # Extract the clue and check it against the taboo words; a violating
        # clue ends the game before it is added to the conversation.
        hint_text = extract_chinese_clue_text(hinter_result['response'])
        if check_chinese_taboo_violation(hint_text, taboo_words):
            return _build_result(
                False, turn, '违反禁用词规则: 提示者违规', 'TABOO_VIOLATION',
                error=f'违反禁用词规则: 提示者在第{turn}轮违反规则，使用了禁用词: {hint_text}',
                taboo_violation_turn=turn,
                taboo_violation_hint=hint_text,
            )

        conversation_history.append(f"提示者: {hinter_result['response']}")

        # Build the guesser prompt
        history_text = _history_text()
        guesser_prompt = f"{guesser_system}\n\n对话历史:\n{history_text}\n\n您的猜测是什么?"

        # Guesser makes a guess
        guesser_result = robust_chinese_api_call(client, guesser_model, guesser_prompt, "[猜测]", max_retries=3)
        total_guesser_attempts += guesser_result['attempts']
        guesser_failed_outputs.extend(guesser_result.get('failed_outputs') or [])

        if not guesser_result['success']:
            return _call_failure("GUESSER", "猜测者", guesser_result, turn)

        conversation_history.append(f"猜测者: {guesser_result['response']}")
        guess = extract_chinese_guess_word(guesser_result['response'])

        # Success requires an exact match with the target word
        if guess == target_word:
            return _build_result(True, turn, guess, None)

        # Add feedback unless this was the last turn
        if turn < max_turns:
            conversation_history.append(f"系统: '{guess}' 不正确。请继续！")

    # Maximum number of turns reached without a correct guess
    return _build_result(False, max_turns, guess, 'MAX_TURNS_EXCEEDED')

print("✅ 中文Taboo游戏主函数已定义")


In [None]:
# 10. 执行中文Taboo测试实验
print("🧪 开始执行中文Taboo测试实验...")

def run_chinese_test_experiment(client, models, dataset, num_test_words=3):
    """Run the Chinese Taboo test experiment over every ordered model pair.

    Up to ``num_test_words`` entries are sampled from ``dataset``; for each
    of them one game is played for every (hinter, guesser) combination of
    ``models``, including a model paired with itself.

    Returns:
        None when ``client`` is falsy, otherwise a list with one result dict
        per game. Games that raise are recorded with the failure reason
        'EXCEPTION'.
    """
    if not client:
        print("❌ API客户端未初始化，无法执行实验")
        return None

    print(f"\n🎯 测试配置:")
    print(f"   测试词汇数: {num_test_words}")
    print(f"   模型数量: {len(models)}")
    print(f"   总游戏数: {num_test_words * len(models) * len(models)}")

    # Draw the test words at random
    sample_size = min(num_test_words, len(dataset))
    test_words = random.sample(dataset, sample_size)
    print(f"\n📋 选择的测试词汇:")
    for position, entry in enumerate(test_words, 1):
        print(f"   {position}. {entry['target']} ({entry['part_of_speech']}) - 禁用词: {entry['taboo']}")

    # Console suffix shown for each known failure reason
    failure_labels = {
        'TABOO_VIOLATION': " (违反禁用词)",
        'FORMAT_FAILURE': " (格式错误)",
        'API_FAILURE': " (API失败)",
        'MAX_TURNS_EXCEEDED': " (轮数耗尽)",
    }
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    model_pairs = [(hinter, guesser) for hinter in models for guesser in models]

    all_results = []
    total_games = len(test_words) * len(models) * len(models)
    game_counter = 0

    print(f"\n🚀 开始执行实验...")

    for word_data in test_words:
        target_word = word_data['target']
        taboo_words = word_data['taboo']

        print(f"\n🎯 测试词汇: {target_word}")
        print(f"🚫 禁用词: {taboo_words}")

        for hinter_model, guesser_model in model_pairs:
            game_counter += 1
            hinter_name = hinter_model.split('/')[-1]
            guesser_name = guesser_model.split('/')[-1]
            pair_name = f"{hinter_name}→{guesser_name}"

            print(f"  🔄 游戏 {game_counter}/{total_games}: {pair_name}")

            start_time = time.time()

            try:
                # Play the game
                game_result = play_chinese_taboo_game(
                    client, hinter_model, guesser_model,
                    target_word, taboo_words, max_turns=5
                )

                duration = round(time.time() - start_time, 2)
                hinter_attempts = game_result.get('total_hinter_attempts', 0)
                guesser_attempts = game_result.get('total_guesser_attempts', 0)

                # Flatten the game outcome into one record
                result = {
                    'game_id': game_counter,
                    'target_word': target_word,
                    'part_of_speech': word_data['part_of_speech'],
                    'category': word_data['category'],
                    'taboo_words': '|'.join(taboo_words),
                    'hinter_model': hinter_model,
                    'guesser_model': guesser_model,
                    'success': game_result['success'],
                    'turns_used': game_result['turns'],
                    'final_guess': game_result['final_guess'],
                    'failure_reason': game_result.get('failure_reason', None),
                    'taboo_violation_turn': game_result.get('taboo_violation_turn', None),
                    'taboo_violation_hint': game_result.get('taboo_violation_hint', None),
                    'has_taboo_violation': game_result.get('failure_reason') == 'TABOO_VIOLATION',
                    'all_hints': ' | '.join(game_result['all_hints']),
                    'all_guesses': ' | '.join(game_result['all_guesses']),
                    'conversation': ' | '.join(game_result['conversation']),
                    'total_api_attempts': hinter_attempts + guesser_attempts,
                    'format_errors': ' | '.join(game_result.get('format_errors', [])),
                    'has_format_errors': len(game_result.get('format_errors', [])) > 0,
                    'duration_seconds': duration,
                    'timestamp': datetime.now().strftime(timestamp_format),
                    'language': 'chinese',
                    'dataset_source': 'openhownet'
                }

                if 'error' in game_result:
                    result['error'] = game_result['error']

                all_results.append(result)

                # Report the outcome on the console
                status = "✅ 成功" if game_result['success'] else "❌ 失败"
                reason = game_result.get('failure_reason')
                if not game_result['success'] and reason:
                    failure_info = failure_labels.get(reason, "")
                else:
                    failure_info = ""

                print(f"     {status}{failure_info} | {game_result['turns']}轮 | 最终猜测: {game_result['final_guess']}")

            except Exception as e:
                print(f"     ❌ 游戏执行异常: {e}")
                # Record a reduced entry for the failed game
                all_results.append({
                    'game_id': game_counter,
                    'target_word': target_word,
                    'hinter_model': hinter_model,
                    'guesser_model': guesser_model,
                    'success': False,
                    'failure_reason': 'EXCEPTION',
                    'error': str(e),
                    'timestamp': datetime.now().strftime(timestamp_format),
                    'language': 'chinese'
                })

            time.sleep(0.5)  # pause between games

    return all_results

# Run the test experiment (only when the API client was initialised), then
# print a summary, save the results as CSV and report per-model success rates.
if chinese_client:
    test_results = run_chinese_test_experiment(
        chinese_client, CHINESE_TEST_MODELS, chinese_dataset, num_test_words=3
    )
    
    # A None or empty result list is reported as a failed experiment
    if test_results:
        print(f"\n🎉 中文Taboo测试实验完成！")
        print(f"📊 总游戏数: {len(test_results)}")
        
        # Overall success rate
        successful_games = [r for r in test_results if r['success']]
        success_rate = len(successful_games) / len(test_results) * 100
        print(f"📈 成功率: {len(successful_games)}/{len(test_results)} ({success_rate:.1f}%)")
        
        # Save the results to a timestamped CSV file under results/
        test_results_path = f"results/chinese_test_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        os.makedirs("results", exist_ok=True)
        
        df_results = pd.DataFrame(test_results)
        df_results.to_csv(test_results_path, index=False, encoding='utf-8-sig')
        print(f"💾 测试结果已保存: {test_results_path}")
        
        # Per-model statistics, separately for the hinter and guesser roles
        print(f"\n📊 各模型表现:")
        for model in CHINESE_TEST_MODELS:
            model_name = model.split('/')[-1]
            model_as_hinter = [r for r in test_results if r['hinter_model'] == model]
            model_as_guesser = [r for r in test_results if r['guesser_model'] == model]
            
            hinter_success = len([r for r in model_as_hinter if r['success']])
            guesser_success = len([r for r in model_as_guesser if r['success']])
            
            print(f"   {model_name}:")
            # Skip a role's line when the model never played it, which also
            # avoids dividing by zero
            if len(model_as_hinter) > 0:
                print(f"     作为提示者: {hinter_success}/{len(model_as_hinter)} ({hinter_success/len(model_as_hinter)*100:.1f}%)")
            if len(model_as_guesser) > 0:
                print(f"     作为猜测者: {guesser_success}/{len(model_as_guesser)} ({guesser_success/len(model_as_guesser)*100:.1f}%)")
    else:
        print("❌ 测试实验失败")
else:
    print("❌ 无法执行测试实验：API客户端未初始化")
