[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/0x-yuan/clintrial-nlp/blob/main/atomic_agents_baseline.ipynb)

# Atomic Agents 框架基線 - 臨床試驗 NLP

## 概述

本notebook展示如何使用簡單的多代理思考模式進行臨床試驗自然語言推理(NLI)。我們模擬「原子代理」概念，其中每個代理都是一個專門的思考功能，最終協作做出決策。

## 📚 學習目標
- 理解多代理思考的基本概念
- 建立專門的分析代理思考模式
- 實作簡單但有效的協作推理
- 評估系統效能

### 🏗️ 代理思考架構
我們實作4個專門的思考代理：
1. **醫療專家思考**: 分析醫學概念和臨床意義
2. **數值分析思考**: 處理統計數據和計算
3. **邏輯檢查思考**: 驗證推理邏輯
4. **決策綜合思考**: 整合所有分析做出最終判斷

> 💡 **核心概念**: "原子"指每個代理專注於單一、明確的思考任務，最終組合成完整的分析系統。

In [None]:
# 🔧 環境設置 - 一鍵安裝所需套件
!pip install -q google-generativeai python-dotenv pandas tqdm gdown

print("✅ 所有套件安裝完成！")

In [None]:
# 📥 從 Google Drive 下載訓練資料
import os
import gdown
import zipfile
import shutil

# Google Drive zip 檔案 ID
file_id = "15GA5XI39DDxQ5QkIZXsFbApx1yEvCpcR"
zip_url = f"https://drive.google.com/uc?id={file_id}"
zip_filename = "clinicaltrial-nlp.zip"

if not os.path.exists("training_data"):
    print("📥 從 Google Drive 下載 clinicaltrial-nlp.zip...")
    try:
        gdown.download(zip_url, zip_filename, quiet=False)
        
        print("📦 正在解壓縮檔案...")
        with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall(".")
        
        if os.path.exists("clintrial-nlp/training_data"):
            shutil.move("clintrial-nlp/training_data", "training_data")
            if os.path.exists("clintrial-nlp"):
                shutil.rmtree("clintrial-nlp")
        
        os.remove(zip_filename)
        print("✅ 訓練資料下載並解壓縮完成！")
        
    except Exception as e:
        print(f"❌ 下載失敗: {e}")
        print("請手動下載: https://drive.google.com/file/d/15GA5XI39DDxQ5QkIZXsFbApx1yEvCpcR/view")
else:
    print("✅ 訓練資料已存在，跳過下載")

# 檢查下載的資料
if os.path.exists("training_data/CT json"):
    ct_files = len([f for f in os.listdir("training_data/CT json") if f.endswith('.json')])
    print(f"📄 找到 {ct_files} 個臨床試驗JSON檔案")

In [None]:
# 🧪 準備測試資料集
import json

def create_test_data_if_needed():
    if not os.path.exists("test.json"):
        try:
            with open("training_data/train.json", "r", encoding="utf-8") as f:
                train_data = json.load(f)
            test_data = dict(list(train_data.items())[:100])
            with open("test.json", "w", encoding="utf-8") as f:
                json.dump(test_data, f, indent=2, ensure_ascii=False)
            print(f"✅ 已創建測試資料集，包含 {len(test_data)} 個樣本")
        except Exception as e:
            print(f"❌ 創建測試資料失敗: {e}")
    else:
        print("✅ test.json 已存在")

create_test_data_if_needed()

In [None]:
# 載入環境變數和必要函式庫
from dotenv import load_dotenv
import os
import json
import pandas as pd
from tqdm import tqdm
import google.generativeai as genai
import time
import warnings
warnings.filterwarnings('ignore')

load_dotenv()
print("✅ 環境變數載入完成")

## 模型配置

配置Google Gemini模型進行推理：

In [None]:
# 配置 Google Gemini 模型
from google.colab import userdata
api_key = os.getenv("GEMINI_API_KEY") or userdata.get("GOOGLE_API_KEY")

if not api_key:
    print("⚠️ 請設定 GOOGLE_API_KEY 環境變數")
    print("可以在 Colab 左側面板的 'Secrets' 中設定")
    raise ValueError("缺少 API 金鑰")
else:
    print(f"✅ 找到 API 金鑰: {api_key[:8]}...{api_key[-4:]}")

genai.configure(api_key=api_key)

# 測試 API 連接
try:
    test_model = genai.GenerativeModel("gemini-2.5-flash")
    test_response = test_model.generate_content("Hello, respond with 'API test successful'")
    print(f"✅ API 連接測試成功: {test_response.text[:50]}...")
except Exception as e:
    print(f"❌ API 連接測試失敗: {e}")
    raise

# 創建 Gemini 模型實例
model = genai.GenerativeModel(
    model_name="gemini-2.5-flash",
    generation_config=genai.types.GenerationConfig(
        temperature=0.1,
        max_output_tokens=4096,
        top_p=1,
        top_k=1
    )
)

print(f"✅ Google Gemini模型配置完成")

## 資料工具函式

建立用於載入和處理臨床試驗資料的工具函式：

In [None]:
def load_clinical_trial(trial_id: str) -> dict:
    """載入臨床試驗資料"""
    try:
        file_path = os.path.join("training_data", "CT json", f"{trial_id}.json")
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data
    except FileNotFoundError:
        return {"error": f"找不到臨床試驗 {trial_id}"}
    except Exception as e:
        return {"error": f"載入 {trial_id} 時發生錯誤: {str(e)}"}

def extract_trial_section(trial_data: dict, section_id: str = None) -> str:
    """提取試驗相關區段"""
    if "error" in trial_data:
        return f"錯誤: {trial_data['error']}"
    
    sections = {
        "Eligibility": trial_data.get("Eligibility", []),
        "Intervention": trial_data.get("Intervention", []),
        "Results": trial_data.get("Results", []),
        "Adverse Events": trial_data.get("Adverse_Events", [])
    }
    
    if section_id and section_id in sections:
        section_data = sections[section_id]
        if isinstance(section_data, list):
            return "\n".join(str(item) for item in section_data)
        return str(section_data)
    
    # 返回所有區段
    result = []
    for section_name, section_data in sections.items():
        if section_data:
            result.append(f"{section_name}:")
            if isinstance(section_data, list):
                for item in section_data[:3]:  # 限制長度
                    result.append(f"  {item}")
                if len(section_data) > 3:
                    result.append(f"  ... ({len(section_data)-3} more items)")
            else:
                result.append(f"  {section_data}")
    
    return "\n".join(result)

# 測試工具函式
sample_trial = load_clinical_trial("NCT00066573")
print(f"✅ 資料工具函式準備就緒。範例試驗: {sample_trial.get('Clinical Trial ID', '錯誤')}")

## 原子代理思考模式

定義四個專門的思考代理，每個都有特定的分析角色：

In [None]:
def medical_expert_thinking(statement: str, trial_evidence: str) -> str:
    """醫療專家思考代理"""
    prompt = f"""你是一位醫療專家，專精臨床試驗分析。你的任務是從醫學角度分析以下陳述的準確性。

分析重點：
- 醫學術語的準確性
- 臨床相關性和意義
- 醫學概念的合理性
- 與試驗證據的醫學一致性

陳述: "{statement}"

試驗證據:
{trial_evidence}

請提供你的醫學分析，最後以「醫學評估: [支持/反駁/不明確]」結尾。"""
    
    try:
        response = model.generate_content(prompt)
        return response.text
    except Exception as e:
        return f"醫學分析錯誤: {str(e)}"

def numerical_analyst_thinking(statement: str, trial_evidence: str) -> str:
    """數值分析思考代理"""
    prompt = f"""你是一位數值分析專家，專精統計數據和計算驗證。你的任務是分析陳述中的數值準確性。

分析重點：
- 提取所有數值、百分比、統計數據
- 驗證計算和數值關係
- 檢查統計的合理性
- 與試驗數據的數值一致性

陳述: "{statement}"

試驗證據:
{trial_evidence}

請提供你的數值分析，最後以「數值評估: [準確/不準確/部分準確]」結尾。"""
    
    try:
        response = model.generate_content(prompt)
        return response.text
    except Exception as e:
        return f"數值分析錯誤: {str(e)}"

def logic_checker_thinking(statement: str, trial_evidence: str) -> str:
    """邏輯檢查思考代理"""
    prompt = f"""你是一位邏輯分析專家，專精推理邏輯和一致性檢查。你的任務是驗證陳述的邏輯合理性。

分析重點：
- 邏輯結構和推理鏈
- 因果關係的合理性
- 內在邏輯一致性
- 推論的有效性

陳述: "{statement}"

試驗證據:
{trial_evidence}

請提供你的邏輯分析，最後以「邏輯評估: [合理/不合理/有疑慮]」結尾。"""
    
    try:
        response = model.generate_content(prompt)
        return response.text
    except Exception as e:
        return f"邏輯分析錯誤: {str(e)}"

def decision_synthesizer_thinking(statement: str, medical_analysis: str, numerical_analysis: str, logic_analysis: str) -> str:
    """決策綜合思考代理"""
    prompt = f"""你是決策綜合專家，負責整合所有專家分析並做出最終判斷。

任務: 判斷陳述是「蘊含」(Entailment)還是「矛盾」(Contradiction)
- 蘊含: 陳述被試驗證據直接支持
- 矛盾: 陳述被試驗證據反駁

原始陳述: "{statement}"

醫學專家分析:
{medical_analysis}

數值分析專家分析:
{numerical_analysis}

邏輯檢查專家分析:
{logic_analysis}

請綜合以上分析，提供簡要推理，然後以「最終決策: [Entailment/Contradiction]」結尾。"""
    
    try:
        response = model.generate_content(prompt)
        return response.text
    except Exception as e:
        return f"決策綜合錯誤: {str(e)}"

print("✅ 四個原子思考代理定義完成")

## 原子代理分析管道

創建協調所有思考代理的分析管道：

In [None]:
def atomic_agents_pipeline(statement: str, primary_id: str, secondary_id: str = None, 
                          section_id: str = None, verbose: bool = False) -> str:
    """運行完整的原子代理分析管道"""
    
    try:
        # 載入試驗資料
        primary_data = load_clinical_trial(primary_id)
        primary_evidence = extract_trial_section(primary_data, section_id)
        
        trial_context = f"主要試驗 ({primary_id}):\n{primary_evidence}"
        
        if secondary_id:
            secondary_data = load_clinical_trial(secondary_id)
            secondary_evidence = extract_trial_section(secondary_data, section_id)
            trial_context += f"\n\n次要試驗 ({secondary_id}):\n{secondary_evidence}"
        
        if verbose:
            print(f"📄 分析陳述: {statement[:100]}...")
            print(f"🏥 主要試驗: {primary_id}")
            if secondary_id:
                print(f"🏥 次要試驗: {secondary_id}")
        
        # 步驟1: 醫學專家思考
        medical_analysis = medical_expert_thinking(statement, trial_context)
        if verbose:
            print("🩺 醫學專家: 完成分析")
        
        # 步驟2: 數值分析思考
        numerical_analysis = numerical_analyst_thinking(statement, trial_context)
        if verbose:
            print("🔢 數值分析: 完成分析")
        
        # 步驟3: 邏輯檢查思考
        logic_analysis = logic_checker_thinking(statement, trial_context)
        if verbose:
            print("🧠 邏輯檢查: 完成分析")
        
        # 步驟4: 決策綜合
        final_analysis = decision_synthesizer_thinking(
            statement, medical_analysis, numerical_analysis, logic_analysis
        )
        
        # 提取最終決策
        if "最終決策: Entailment" in final_analysis:
            decision = "Entailment"
        elif "最終決策: Contradiction" in final_analysis:
            decision = "Contradiction"
        else:
            # 備用解析
            if "entailment" in final_analysis.lower() and "contradiction" not in final_analysis.lower():
                decision = "Entailment"
            else:
                decision = "Contradiction"
        
        if verbose:
            print(f"⚖️ 最終決策: {decision}")
            print("-" * 50)
        
        return decision
        
    except Exception as e:
        if verbose:
            print(f"❌ 管道錯誤: {e}")
        return "Contradiction"  # 保守的備用方案

print("✅ 原子代理分析管道準備就緒")

## 測試範例

測試我們的原子代理系統：

In [None]:
# 測試範例
test_statement = "there is a 13.2% difference between the results from the two the primary trial cohorts"
test_primary_id = "NCT00066573"

print(f"測試原子代理系統:")
print(f"陳述: '{test_statement}'")
print(f"主要試驗: {test_primary_id}")
print("\n" + "="*80)

# 執行分析
start_time = time.time()
result = atomic_agents_pipeline(
    statement=test_statement,
    primary_id=test_primary_id,
    section_id="Results",
    verbose=True
)
end_time = time.time()

print(f"\n🎯 原子代理結果: {result}")
print(f"⏱️ 執行時間: {end_time - start_time:.2f} 秒")
print("="*80)

## 在訓練資料上評估

在訓練資料樣本上評估我們的系統：

In [None]:
# 載入訓練資料
with open("training_data/train.json", "r", encoding="utf-8") as f:
    train_data = json.load(f)
print(f"載入 {len(train_data)} 個訓練範例")

# 在樣本上評估
sample_size = 20
examples = list(train_data.items())[:sample_size]

print(f"\n在 {len(examples)} 個範例上評估原子代理系統...")

results = []
correct = 0
total_time = 0

for i, (uuid, example) in enumerate(tqdm(examples, desc="原子代理處理")):
    try:
        statement = example.get("Statement")
        primary_id = example.get("Primary_id")
        secondary_id = example.get("Secondary_id")
        section_id = example.get("Section_id")
        expected = example.get("Label")
        
        if not statement or not primary_id:
            results.append({
                "uuid": uuid,
                "expected": expected,
                "predicted": "SKIPPED",
                "correct": False,
                "time": 0
            })
            continue
        
        # 獲取預測
        start_time = time.time()
        predicted = atomic_agents_pipeline(
            statement=statement,
            primary_id=primary_id,
            secondary_id=secondary_id,
            section_id=section_id,
            verbose=False
        )
        end_time = time.time()
        
        execution_time = end_time - start_time
        total_time += execution_time
        
        # 檢查正確性
        is_correct = (predicted.strip() == expected.strip())
        if is_correct:
            correct += 1
            
        results.append({
            "uuid": uuid,
            "statement": statement[:80] + "..." if len(statement) > 80 else statement,
            "expected": expected,
            "predicted": predicted,
            "correct": is_correct,
            "time": execution_time
        })
        
        status = "✅" if is_correct else "❌"
        print(f"範例 {i+1:2d}: {expected:12} -> {predicted:12} {status} ({execution_time:.1f}s)")
        
    except Exception as e:
        print(f"處理範例 {i+1} 時發生錯誤: {e}")
        results.append({
            "uuid": uuid,
            "expected": expected,
            "predicted": "ERROR",
            "correct": False,
            "time": 0
        })

# 計算準確率
accuracy = correct / len(examples) if examples else 0
avg_time = total_time / len(examples) if examples else 0

print(f"\n📊 原子代理系統結果:")
print(f"準確率: {accuracy:.2%} ({correct}/{len(examples)})")
print(f"平均執行時間: {avg_time:.2f} 秒/例")
print(f"總執行時間: {total_time:.2f} 秒")

## 產生提交檔案

使用我們的原子代理系統產生預測結果：

In [None]:
def generate_atomic_submission(test_file="test.json", output_file="atomic_agents_submission.json", sample_size=None):
    """使用原子代理系統產生提交檔案"""
    
    # 載入測試資料
    try:
        with open(test_file, "r", encoding="utf-8") as f:
            test_data = json.load(f)
    except:
        print(f"❌ 無法載入測試資料 {test_file}")
        return
    
    examples = list(test_data.items())
    if sample_size:
        examples = examples[:sample_size]
        
    print(f"🚀 為 {len(examples)} 個範例產生原子代理預測...")
    
    submission = {}
    
    for i, (uuid, example) in enumerate(tqdm(examples, desc="原子代理處理")):
        try:
            statement = example.get("Statement")
            primary_id = example.get("Primary_id")
            secondary_id = example.get("Secondary_id")
            section_id = example.get("Section_id")
            
            if not statement or not primary_id:
                submission[uuid] = {"Prediction": "Contradiction"}
                continue
                
            # 獲取預測
            prediction = atomic_agents_pipeline(
                statement=statement,
                primary_id=primary_id,
                secondary_id=secondary_id,
                section_id=section_id,
                verbose=False
            )
            
            submission[uuid] = {"Prediction": prediction}
            
        except Exception as e:
            print(f"處理 {uuid} 時發生錯誤: {e}")
            submission[uuid] = {"Prediction": "Contradiction"}
    
    # 儲存提交檔案
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(submission, f, indent=2)
    
    print(f"✅ 原子代理提交檔案已儲存至 {output_file}")
    return submission

# 產生小樣本提交
atomic_submission = generate_atomic_submission(
    test_file="test.json", 
    output_file="atomic_agents_submission.json",
    sample_size=10
)

print(f"為 {len(atomic_submission)} 個範例產生了預測")

## 結論

### 原子代理系統優勢：
1. **簡潔性**: 無複雜框架，純粹使用Gemini API
2. **專業分工**: 四個專門的思考代理各司其職
3. **透明性**: 每個思考步驟都清晰可見
4. **高效性**: 直接的API調用，無額外開銷
5. **可靠性**: 簡單架構，較少出錯點

### 思考代理架構：
- **醫療專家思考**: 專注於醫學準確性和臨床意義
- **數值分析思考**: 驗證統計數據和計算
- **邏輯檢查思考**: 確保推理邏輯的合理性
- **決策綜合思考**: 整合所有分析做出最終判斷

### 適用場景：
- 需要快速部署的臨床NLP分析
- 資源有限但要求準確性的應用
- 教學和演示用途
- 作為更複雜系統的基準

這個原子代理系統展示了如何用最簡單的方式實現有效的多代理思考，適合作為臨床試驗NLP分析的實用基準。