# 安裝套件

In [None]:
!pip install langchain langchain-openai langchain-community chromadb openai

In [None]:
# 導入核心模組
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.schema import Document
import os
import logging

# 設定環境
os.environ["OPENAI_API_KEY"] = "your-openai-api-key-here"

# 啟用詳細日誌以觀察 MultiQueryRetriever 的工作過程
logging.basicConfig()
logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)

print("環境設定完成，準備實作進階檢索策略...")


### 建立知識庫

In [None]:
# 重建知識庫（基於之前章節的內容）
def setup_enhanced_vector_store():
    """建立增強版的向量知識庫"""
    
    embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
    
    # 建立更豐富的文件集，包含不同風格的表達
    enhanced_documents = [
        Document(
            page_content="""
機器學習演算法可以分為三大類別：

1. 監督式學習（Supervised Learning）
監督式學習使用標記的訓練資料來學習輸入和輸出之間的映射關係。
- 分類演算法：邏輯迴歸、支援向量機（SVM）、隨機森林、梯度提升
- 回歸演算法：線性回歸、多項式回歸、Ridge回歸、Lasso回歸

2. 無監督式學習（Unsupervised Learning）
從無標記的資料中發現隱藏的模式和結構。
- 聚類演算法：K-means、階層聚類、DBSCAN
- 降維技術：主成分分析（PCA）、t-SNE、UMAP

3. 強化學習（Reinforcement Learning）
通過與環境的互動學習最優的行為策略。
- Q-learning、Policy Gradient、Actor-Critic方法
            """,
            metadata={
                "source": "ml_algorithms_comprehensive.md",
                "category": "機器學習",
                "type": "算法分類",
                "difficulty": "中級"
            }
        ),
        Document(
            page_content="""
深度學習演算法優化技術：

1. 優化器選擇
- SGD（隨機梯度下降）：基礎但有效的優化方法
- Adam：自適應學習率，適合大多數情況
- RMSprop：處理非平穩目標函數效果好
- AdaGrad：適合稀疏資料的優化

2. 正則化技術
- Dropout：隨機關閉部分神經元，防止過擬合
- Batch Normalization：標準化輸入，加速訓練
- L1/L2正則化：限制權重大小，提高泛化能力

3. 學習率調度
- 學習率衰減：逐漸降低學習率
- 週期性學習率：在訓練過程中循環調整
- 自適應學習率：根據訓練進展自動調整
            """,
            metadata={
                "source": "deep_learning_optimization.md",
                "category": "深度學習",
                "type": "優化技術",
                "difficulty": "進階"
            }
        ),
        Document(
            page_content="""
AI 系統性能評估指標：

1. 分類任務評估
- 準確率（Accuracy）：正確預測的比例
- 精確率（Precision）：預測為正的樣本中實際為正的比例
- 召回率（Recall）：實際為正的樣本中被正確預測的比例  
- F1分數：精確率和召回率的調和平均數

2. 回歸任務評估
- 均方誤差（MSE）：預測值與真實值差的平方的平均
- 平均絕對誤差（MAE）：預測值與真實值差的絕對值的平均
- R²決定係數：模型解釋變異的比例

3. 模型泛化能力評估
- 交叉驗證：將資料分為多個子集進行驗證
- 學習曲線：觀察訓練和驗證誤差隨樣本數量的變化
- 驗證曲線：觀察模型性能隨超參數變化的趨勢
            """,
            metadata={
                "source": "ai_evaluation_metrics.md",
                "category": "模型評估",
                "type": "評估指標",
                "difficulty": "中級"
            }
        ),
        Document(
            page_content="""
資料科學專案生命週期：

1. 問題定義階段
明確業務目標，將業務問題轉化為資料科學問題。
關鍵活動：需求分析、可行性評估、成功指標定義

2. 資料獲取與探索
收集相關資料，進行初步的探索性資料分析。
關鍵活動：資料來源識別、資料品質評估、初步統計分析

3. 資料預處理
清理資料，處理缺失值，進行特徵工程。
關鍵活動：資料清理、特徵選擇、資料轉換

4. 建模與驗證  
選擇合適的演算法，訓練模型，評估性能。
關鍵活動：模型選擇、超參數調優、性能評估

5. 部署與監控
將模型部署到生產環境，持續監控性能。
關鍵活動：模型部署、性能監控、模型更新
            """,
            metadata={
                "source": "data_science_lifecycle.md",
                "category": "資料科學",
                "type": "專案管理",
                "difficulty": "專業"
            }
        )
    ]
    
    vector_store = Chroma.from_documents(
        documents=enhanced_documents,
        embedding=embeddings,
        persist_directory="./enhanced_chroma_db",
        collection_name="advanced_ai_kb"
    )
    
    print(f"增強版向量知識庫建立完成，包含 {len(enhanced_documents)} 個文件")
    return vector_store

# 建立增強版知識庫
enhanced_vector_store = setup_enhanced_vector_store()


### 實作並且比較多種策略

In [None]:
# 實作並比較基礎檢索與多查詢檢索
def compare_retrieval_strategies():
    """比較基礎檢索與多查詢檢索的效果"""
    
    # 設定語言模型
    llm = ChatOpenAI(temperature=0.1, model_name="gpt-3.5-turbo")
    
    # 1. 基礎檢索器
    basic_retriever = enhanced_vector_store.as_retriever(
        search_kwargs={"k": 4}
    )
    
    # 2. 多查詢檢索器
    multi_query_retriever = MultiQueryRetriever.from_llm(
        retriever=basic_retriever,
        llm=llm
    )
    
    # 測試查詢
    test_query = "如何選擇合適的機器學習演算法？"
    
    print(f"=== 檢索策略比較測試 ===")
    print(f"測試查詢：{test_query}")
    print("=" * 60)
    
    # 基礎檢索結果
    print("\n【基礎檢索結果】")
    basic_results = basic_retriever.get_relevant_documents(test_query)
    
    for i, doc in enumerate(basic_results):
        print(f"\n{i+1}. [{doc.metadata['category']}] {doc.metadata['source']}")
        print(f"   內容：{doc.page_content[:100]}...")
    
    print(f"\n基礎檢索共找到 {len(basic_results)} 個文件")
    
    # 多查詢檢索結果  
    print("\n【多查詢檢索結果】")
    print("（注意觀察日誌中的查詢重寫過程）")
    
    multi_results = multi_query_retriever.get_relevant_documents(test_query)
    
    for i, doc in enumerate(multi_results):
        print(f"\n{i+1}. [{doc.metadata['category']}] {doc.metadata['source']}")
        print(f"   內容：{doc.page_content[:100]}...")
    
    print(f"\n多查詢檢索共找到 {len(multi_results)} 個文件")
    
    # 分析結果差異
    basic_sources = set(doc.metadata['source'] for doc in basic_results)
    multi_sources = set(doc.metadata['source'] for doc in multi_results)
    
    only_in_multi = multi_sources - basic_sources
    only_in_basic = basic_sources - multi_sources
    
    print(f"\n=== 結果分析 ===")
    print(f"僅在多查詢中找到的文件：{list(only_in_multi)}")
    print(f"僅在基礎檢索中找到的文件：{list(only_in_basic)}")
    print(f"多查詢檢索的額外發現率：{len(only_in_multi)/len(basic_sources)*100:.1f}%")
    
    return basic_results, multi_results


In [None]:
# 執行比較測試
basic_results, multi_results = compare_retrieval_strategies()

### 自訂查詢生成策略

In [None]:
# 自訂更精確的查詢生成提示
def create_custom_multi_query_retriever():
    """創建自訂的多查詢檢索器"""
    
    # 自訂查詢生成提示模板
    query_prompt = PromptTemplate(
        input_variables=["question"],
        template="""你是一個AI助理，擅長將使用者的問題重寫成多個不同角度的搜尋查詢。

原始問題：{question}

請生成5個不同角度的搜尋查詢，每個查詢應該：
1. 使用不同的詞彙和表達方式
2. 關注問題的不同面向
3. 包含相關的技術術語和概念
4. 保持查詢的具體性和可搜尋性

重要：只輸出查詢，每行一個，不要包含編號或其他文字。

範例：
如果原始問題是「什麼是深度學習？」，你可以生成：
深度學習的定義和基本概念
神經網路與深度學習的關係  
深度學習與傳統機器學習的區別
深度學習的主要架構和模型
深度學習在人工智慧中的作用

現在請為上述問題生成查詢："""
    )
    
    llm = ChatOpenAI(temperature=0.3, model_name="gpt-3.5-turbo")
    
    # 創建基礎檢索器
    base_retriever = enhanced_vector_store.as_retriever(search_kwargs={"k": 3})
    
    # 創建自訂多查詢檢索器
    custom_retriever = MultiQueryRetriever.from_llm(
        retriever=base_retriever,
        llm=llm,
        prompt=query_prompt
    )
    
    return custom_retriever



In [None]:
# 測試自訂查詢生成器
def test_custom_query_generation():
    """測試自訂查詢生成效果"""
    
    custom_retriever = create_custom_multi_query_retriever()
    
    test_queries = [
        "模型訓練時如何避免過擬合？",
        "評估機器學習模型性能的方法",
        "資料預處理包括哪些步驟？"
    ]
    
    print("=== 自訂查詢生成測試 ===")
    
    for i, query in enumerate(test_queries, 1):
        print(f"\n測試 {i}: {query}")
        print("-" * 50)
        
        results = custom_retriever.get_relevant_documents(query)
        
        print(f"檢索到 {len(results)} 個相關文件：")
        
        # 分析結果的多樣性
        categories = [doc.metadata['category'] for doc in results]
        category_counts = {}
        for cat in categories:
            category_counts[cat] = category_counts.get(cat, 0) + 1
        
        print(f"涵蓋類別：{dict(category_counts)}")
        
        # 顯示前2個結果
        for j, doc in enumerate(results[:2]):
            print(f"\n  結果 {j+1}: [{doc.metadata['category']}]")
            print(f"  來源：{doc.metadata['source']}")
            print(f"  內容：{doc.page_content[:80]}...")


In [None]:
test_custom_query_generation()

---

### 實作 HyDE 策略

In [None]:
# 實作 HyDE 策略
class HyDERetriever:
    """假設性文件嵌入檢索器"""
    
    def __init__(self, vector_store, llm):
        self.vector_store = vector_store
        self.llm = llm
        self.hyde_prompt = PromptTemplate(
            input_variables=["question"],
            template="""請根據以下問題寫一個詳細、準確的答案。即使你不確定某些細節，也請寫出一個合理的、專業的回答。

問題：{question}

請寫出一個包含相關技術細節、具體例子和實用建議的完整答案："""
        )
    
    def get_relevant_documents(self, query, k=4):
        """使用 HyDE 策略檢索文件"""
        
        print(f"=== HyDE 檢索過程 ===")
        print(f"原始問題：{query}")
        
        # 步驟1：生成假設性文件
        hyde_chain = self.hyde_prompt | self.llm
        hypothetical_doc = hyde_chain.invoke({"question": query})
        
        if hasattr(hypothetical_doc, 'content'):
            hypothetical_content = hypothetical_doc.content
        else:
            hypothetical_content = str(hypothetical_doc)
        
        print(f"\n生成的假設性文件（前200字符）：")
        print(f"{hypothetical_content[:200]}...")
        
        # 步驟2：使用假設性文件進行檢索
        results = self.vector_store.similarity_search(hypothetical_content, k=k)
        
        print(f"\n基於假設性文件檢索到 {len(results)} 個相關文件")
        
        return results
    
    def compare_with_direct_search(self, query, k=4):
        """比較 HyDE 與直接搜尋的結果"""
        
        print(f"=== HyDE vs 直接搜尋比較 ===")
        print(f"查詢：{query}")
        print("=" * 60)
        
        # 直接搜尋
        direct_results = self.vector_store.similarity_search(query, k=k)
        
        print(f"\n【直接搜尋結果】")
        for i, doc in enumerate(direct_results):
            print(f"{i+1}. [{doc.metadata['category']}] {doc.metadata['source']}")
        
        # HyDE 搜尋
        hyde_results = self.get_relevant_documents(query, k=k)
        
        print(f"\n【HyDE 搜尋結果】")
        for i, doc in enumerate(hyde_results):
            print(f"{i+1}. [{doc.metadata['category']}] {doc.metadata['source']}")
        
        # 分析差異
        direct_sources = set(doc.metadata['source'] for doc in direct_results)
        hyde_sources = set(doc.metadata['source'] for doc in hyde_results)
        
        overlap = len(direct_sources & hyde_sources)
        total_unique = len(direct_sources | hyde_sources)
        
        print(f"\n【結果分析】")
        print(f"重疊文件數：{overlap}")
        print(f"總唯一文件數：{total_unique}")
        print(f"結果相似度：{overlap/k*100:.1f}%")
        print(f"HyDE 獨有發現：{list(hyde_sources - direct_sources)}")
        
        return direct_results, hyde_results


### 測試 HyDE 檢索器

In [None]:
# 測試 HyDE 檢索器
llm = ChatOpenAI(temperature=0.1, model_name="gpt-3.5-turbo")
hyde_retriever = HyDERetriever(enhanced_vector_store, llm)

# 比較不同類型的查詢
test_queries = [
    "什麼情況下應該使用決策樹演算法？",
    "如何診斷模型的過擬合問題？",
    "資料科學專案失敗的常見原因"
]

for query in test_queries:
    print("\n" + "="*80)
    direct_results, hyde_results = hyde_retriever.compare_with_direct_search(query)
    print("\n")


---

## 安裝評估套件

In [None]:
!pip install ragas langchain-openai

導入套件

In [None]:
# 導入評估模組
from ragas import evaluate
from ragas.metrics import (
    answer_relevancy,
    faithfulness, 
    context_recall,
    context_precision,
    context_relevancy
)
from datasets import Dataset
import pandas as pd

# 導入之前建立的 RAG 系統
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate

print("評估框架模組載入完成")


### 實作評估框架

In [None]:
# 建立綜合評估類別
class RAGEvaluationFramework:
    """RAG 系統綜合評估框架"""
    
    def __init__(self, vector_store, llm):
        self.vector_store = vector_store
        self.llm = llm
        self.evaluation_results = {}
        
        # 設定 RAG 鏈
        self.retriever = vector_store.as_retriever(search_kwargs={"k": 3})
        
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=self.retriever,
            return_source_documents=True
        )
    
    def create_evaluation_dataset(self):
        """創建評估資料集"""
        
        # 設計涵蓋不同難度和類型的測試問題
        evaluation_questions = [
            {
                "question": "什麼是監督式學習？它與無監督式學習有什麼區別？",
                "expected_topics": ["監督式學習", "無監督式學習", "標記資料", "分類", "回歸", "聚類"],
                "difficulty": "基礎",
                "type": "概念解釋"
            },
            {
                "question": "在深度學習中，如何防止模型過擬合？",
                "expected_topics": ["過擬合", "正則化", "Dropout", "Batch Normalization", "驗證集"],
                "difficulty": "中級",
                "type": "問題解決"
            },
            {
                "question": "資料科學專案的生命週期包括哪些階段？",
                "expected_topics": ["問題定義", "資料獲取", "資料預處理", "建模", "部署", "監控"],
                "difficulty": "中級", 
                "type": "流程描述"
            },
            {
                "question": "如何評估機器學習模型的性能？有哪些常用的評估指標？",
                "expected_topics": ["準確率", "精確率", "召回率", "F1分數", "ROC", "AUC", "交叉驗證"],
                "difficulty": "中級",
                "type": "技術細節"
            },
            {
                "question": "Adam 優化器相比 SGD 有什麼優勢？",
                "expected_topics": ["Adam", "SGD", "自適應學習率", "動量", "偏差修正"],
                "difficulty": "進階",
                "type": "比較分析"
            }
        ]
        
        print("=== 建立評估資料集 ===")
        
        dataset_records = []
        
        for i, item in enumerate(evaluation_questions, 1):
            question = item["question"]
            print(f"\n處理問題 {i}: {question}")
            
            # 獲取 RAG 系統的回答
            result = self.qa_chain({"query": question})
            
            # 提取相關資訊
            answer = result["result"]
            source_docs = result["source_documents"]
            contexts = [doc.page_content for doc in source_docs]
            
            # 創建記錄
            record = {
                "question": question,
                "answer": answer,
                "contexts": contexts,
                "ground_truth": f"這是一個關於{item['type']}的{item['difficulty']}問題，應該涵蓋：{', '.join(item['expected_topics'])}",
                "metadata": item
            }
            
            dataset_records.append(record)
            
            print(f"  答案長度：{len(answer)} 字符")
            print(f"  上下文數量：{len(contexts)}")
        
        # 轉換為 Dataset 格式
        dataset = Dataset.from_list(dataset_records)
        
        print(f"\n評估資料集建立完成，包含 {len(dataset)} 個測試案例")
        return dataset, dataset_records
    
    def run_comprehensive_evaluation(self, dataset):
        """執行綜合評估"""
        
        print("\n=== 開始綜合評估 ===")
        print("這可能需要幾分鐘時間，因為需要調用 LLM 進行評估...")
        
        # 定義評估指標
        metrics = [
            faithfulness,      # 忠實性
            answer_relevancy,   # 答案相關性
            context_precision,  # 上下文精確度
            context_recall,     # 上下文召回率
            context_relevancy   # 上下文相關性
        ]
        
        try:
            # 執行評估
            results = evaluate(
                dataset=dataset,
                metrics=metrics,
                llm=self.llm,
                embeddings=OpenAIEmbeddings()
            )
            
            self.evaluation_results = results
            
            print("評估完成！")
            return results
            
        except Exception as e:
            print(f"評估過程中出現錯誤：{str(e)}")
            print("這可能是由於 API 限制或網路問題。讓我們使用簡化的評估方法...")
            return self.simplified_evaluation(dataset)
    
    def simplified_evaluation(self, dataset_records):
        """簡化的評估方法（當完整評估失敗時使用）"""
        
        print("\n=== 執行簡化評估 ===")
        
        evaluation_results = {
            "answer_lengths": [],
            "context_counts": [],
            "topic_coverage": [],
            "question_types": [],
            "difficulties": []
        }
        
        for record in dataset_records:
            # 分析答案長度
            answer_length = len(record["answer"])
            evaluation_results["answer_lengths"].append(answer_length)
            
            # 分析上下文數量
            context_count = len(record["contexts"])
            evaluation_results["context_counts"].append(context_count)
            
            # 分析主題覆蓋度（簡單的關鍵字匹配）
            expected_topics = record["metadata"]["expected_topics"]
            answer_lower = record["answer"].lower()
            covered_topics = sum(1 for topic in expected_topics if topic.lower() in answer_lower)
            coverage_rate = covered_topics / len(expected_topics)
            evaluation_results["topic_coverage"].append(coverage_rate)
            
            # 記錄問題類型和難度
            evaluation_results["question_types"].append(record["metadata"]["type"])
            evaluation_results["difficulties"].append(record["metadata"]["difficulty"])
        
        return evaluation_results
    
    def generate_evaluation_report(self, results):
        """生成評估報告"""
        
        print("\n=== RAG 系統評估報告 ===")
        
        if isinstance(results, dict) and "answer_lengths" in results:
            # 簡化評估結果
            print("\n【基本統計】")
            print(f"平均答案長度：{sum(results['answer_lengths'])/len(results['answer_lengths']):.0f} 字符")
            print(f"平均上下文數量：{sum(results['context_counts'])/len(results['context_counts']):.1f}")
            print(f"平均主題覆蓋率：{sum(results['topic_coverage'])/len(results['topic_coverage']):.1%}")
            
            # 按難度分析
            difficulties = results['difficulties']
            topic_coverage = results['topic_coverage']
            
            print(f"\n【按難度分析】")
            for difficulty in set(difficulties):
                indices = [i for i, d in enumerate(difficulties) if d == difficulty]
                avg_coverage = sum(topic_coverage[i] for i in indices) / len(indices)
                print(f"{difficulty}問題平均覆蓋率：{avg_coverage:.1%}")
        
        else:
            # 完整評估結果
            print("\n【核心評估指標】")
            for metric_name, score in results.items():
                if isinstance(score, (int, float)):
                    print(f"{metric_name}: {score:.3f}")
        
        print(f"\n【評估建議】")
        
        # 根據結果提供改進建議
        if isinstance(results, dict) and "topic_coverage" in results:
            avg_coverage = sum(results['topic_coverage']) / len(results['topic_coverage'])
            if avg_coverage < 0.6:
                print("- 主題覆蓋率偏低，建議改進檢索策略或擴充知識庫")
            elif avg_coverage > 0.8:
                print("- 主題覆蓋率良好，系統能夠找到相關資訊")
        
        print("- 定期更新評估資料集以反映使用者需求變化")
        print("- 考慮加入使用者回饋機制以補充自動評估")
        print("- 監控不同類型問題的表現差異")


### 執行綜合評估

In [None]:
llm = ChatOpenAI(temperature=0.1, model_name="gpt-3.5-turbo")
eval_framework = RAGEvaluationFramework(enhanced_vector_store, llm)

### 建立評估資料集

In [None]:
dataset, dataset_records = eval_framework.create_evaluation_dataset()

### 執行評估

In [None]:
evaluation_results = eval_framework.run_comprehensive_evaluation(dataset)

### 產生報告

In [None]:
eval_framework.generate_evaluation_report(evaluation_results)

---

### 實作 A/B 測試

# 實作 A/B 測試框架
class RAGABTestFramework:
    """RAG 系統 A/B 測試框架"""
    
    def __init__(self, system_a, system_b, system_a_name="System A", system_b_name="System B"):
        self.system_a = system_a
        self.system_b = system_b
        self.system_a_name = system_a_name
        self.system_b_name = system_b_name
        
    def run_comparative_test(self, test_questions):
        """執行比較測試"""
        
        print(f"=== A/B 測試：{self.system_a_name} vs {self.system_b_name} ===")
        
        results = {
            "questions": [],
            "system_a_answers": [],
            "system_b_answers": [],
            "system_a_response_times": [],
            "system_b_response_times": [],
            "system_a_context_counts": [],
            "system_b_context_counts": []
        }
        
        import time
        
        for i, question in enumerate(test_questions, 1):
            print(f"\n測試 {i}/{len(test_questions)}: {question[:50]}...")
            
            results["questions"].append(question)
            
            # 測試系統 A
            start_time = time.time()
            result_a = self.system_a({"query": question})
            time_a = time.time() - start_time
            
            results["system_a_answers"].append(result_a["result"])
            results["system_a_response_times"].append(time_a)
            results["system_a_context_counts"].append(len(result_a["source_documents"]))
            
            # 測試系統 B  
            start_time = time.time()
            result_b = self.system_b({"query": question})
            time_b = time.time() - start_time
            
            results["system_b_answers"].append(result_b["result"])
            results["system_b_response_times"].append(time_b)
            results["system_b_context_counts"].append(len(result_b["source_documents"]))
            
            print(f"  {self.system_a_name} 回應時間: {time_a:.2f}s")
            print(f"  {self.system_b_name} 回應時間: {time_b:.2f}s")
        
        return results
    
    def analyze_ab_results(self, results):
        """分析 A/B 測試結果"""
        
        print(f"\n=== A/B 測試分析報告 ===")
        
        # 回應時間比較
        avg_time_a = sum(results["system_a_response_times"]) / len(results["system_a_response_times"])
        avg_time_b = sum(results["system_b_response_times"]) / len(results["system_b_response_times"])
        
        print(f"\n【性能比較】")
        print(f"{self.system_a_name} 平均回應時間: {avg_time_a:.2f}秒")
        print(f"{self.system_b_name} 平均回應時間: {avg_time_b:.2f}秒")
        
        if avg_time_a < avg_time_b:
            improvement = (avg_time_b - avg_time_a) / avg_time_b * 100
            print(f"✅ {self.system_a_name} 速度較快，提升 {improvement:.1f}%")
        else:
            improvement = (avg_time_a - avg_time_b) / avg_time_a * 100
            print(f"✅ {self.system_b_name} 速度較快，提升 {improvement:.1f}%")
        
        # 答案長度比較
        avg_length_a = sum(len(answer) for answer in results["system_a_answers"]) / len(results["system_a_answers"])
        avg_length_b = sum(len(answer) for answer in results["system_b_answers"]) / len(results["system_b_answers"])
        
        print(f"\n【答案品質指標】")
        print(f"{self.system_a_name} 平均答案長度: {avg_length_a:.0f} 字符")
        print(f"{self.system_b_name} 平均答案長度: {avg_length_b:.0f} 字符")
        
        # 上下文使用比較
        avg_context_a = sum(results["system_a_context_counts"]) / len(results["system_a_context_counts"])
        avg_context_b = sum(results["system_b_context_counts"]) / len(results["system_b_context_counts"])
        
        print(f"{self.system_a_name} 平均使用上下文: {avg_context_a:.1f} 個文件")
        print(f"{self.system_b_name} 平均使用上下文: {avg_context_b:.1f} 個文件")
        
        return {
            "performance_winner": self.system_a_name if avg_time_a < avg_time_b else self.system_b_name,
            "avg_response_times": {self.system_a_name: avg_time_a, self.system_b_name: avg_time_b},
            "avg_answer_lengths": {self.system_a_name: avg_length_a, self.system_b_name: avg_length_b},
            "avg_context_usage": {self.system_a_name: avg_context_a, self.system_b_name: avg_context_b}
        }





### 建立兩個不同配置的 RAG 系統進行比較

In [None]:
# 建立兩個不同配置的 RAG 系統進行比較
def create_comparison_systems():
    """創建用於比較的 RAG 系統"""
    
    llm = ChatOpenAI(temperature=0.1, model_name="gpt-3.5-turbo")
    
    # 系統 A：基礎配置
    retriever_a = enhanced_vector_store.as_retriever(search_kwargs={"k": 3})
    system_a = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever_a,
        return_source_documents=True
    )
    
    # 系統 B：多查詢檢索配置
    multi_query_retriever = MultiQueryRetriever.from_llm(
        retriever=enhanced_vector_store.as_retriever(search_kwargs={"k": 2}),
        llm=llm
    )
    system_b = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=multi_query_retriever,
        return_source_documents=True
    )
    
    return system_a, system_b

### 執行 A/B 測試

In [None]:
system_a, system_b = create_comparison_systems()

ab_test = RAGABTestFramework(
    system_a, 
    system_b, 
    "基礎檢索", 
    "多查詢檢索"
)

test_questions = [
    "如何選擇合適的機器學習演算法？",
    "深度學習模型訓練中的常見問題有哪些？",
    "資料科學專案的評估指標包括什麼？"
]

ab_results = ab_test.run_comparative_test(test_questions)
analysis = ab_test.analyze_ab_results(ab_results)

---

## 部署架構設計考量

In [None]:
# 生產級 RAG 系統架構示例
class ProductionRAGSystem:
    """生產級 RAG 系統架構"""
    
    def __init__(self, config):
        self.config = config
        self.cache = {}  # 簡單的記憶體快取
        self.request_count = 0
        self.error_count = 0
        
    def setup_production_components(self):
        """設定生產環境組件"""
        
        print("=== 生產環境組件設定 ===")
        
        # 1. 連接池管理
        self.setup_connection_pools()
        
        # 2. 快取策略
        self.setup_caching_strategy()
        
        # 3. 監控系統
        self.setup_monitoring()
        
        # 4. 安全機制
        self.setup_security()
        
        print("生產環境組件設定完成")
    
    def setup_connection_pools(self):
        """設定連接池"""
        print("配置資料庫連接池...")
        print("- 向量資料庫連接池：最大連接數 20")
        print("- LLM API 連接池：並發限制 10")
        print("- Redis 快取連接池：最大連接數 50")
    
    def setup_caching_strategy(self):
        """設定快取策略"""
        print("配置快取策略...")
        print("- 查詢結果快取：TTL 1小時")
        print("- 向量檢索快取：TTL 24小時")
        print("- 模型回應快取：TTL 30分鐘")
    
    def setup_monitoring(self):
        """設定監控系統"""
        print("配置監控系統...")
        print("- 回應時間監控：P95 < 2秒")
        print("- 錯誤率監控：< 1%")
        print("- 資源使用監控：CPU < 80%, Memory < 85%")
        print("- 自訂業務指標：查詢成功率、用戶滿意度")
    
    def setup_security(self):
        """設定安全機制"""
        print("配置安全機制...")
        print("- API 金鑰管理與輪換")
        print("- 請求速率限制：每用戶每分鐘最多 60 次")
        print("- 輸入驗證與清理")
        print("- 敏感資訊過濾")
    
    def process_query_with_fallback(self, query, user_id=None):
        """帶有容錯機制的查詢處理"""
        
        self.request_count += 1
        
        try:
            # 1. 輸入驗證
            if not self.validate_input(query):
                return {"error": "Invalid input", "fallback": True}
            
            # 2. 速率限制檢查
            if not self.check_rate_limit(user_id):
                return {"error": "Rate limit exceeded", "fallback": True}
            
            # 3. 快取檢查
            cache_key = f"query:{hash(query)}"
            if cache_key in self.cache:
                print(f"快取命中：{query[:30]}...")
                return self.cache[cache_key]
            
            # 4. 執行主要邏輯
            result = self.execute_rag_pipeline(query)
            
            # 5. 快取結果
            self.cache[cache_key] = result
            
            # 6. 記錄成功指標
            self.log_success_metrics(query, result)
            
            return result
            
        except Exception as e:
            self.error_count += 1
            print(f"查詢處理錯誤: {str(e)}")
            
            # 容錯機制：提供基本回答
            fallback_response = {
                "answer": "抱歉，我現在無法處理您的查詢。請稍後再試。",
                "error": str(e),
                "fallback": True
            }
            
            self.log_error_metrics(query, e)
            return fallback_response
    
    def validate_input(self, query):
        """輸入驗證"""
        if not query or len(query.strip()) == 0:
            return False
        if len(query) > 1000:  # 限制查詢長度
            return False
        
        # 檢查惡意內容（簡化示例）
        malicious_patterns = ["<script>", "DROP TABLE", "DELETE FROM"]
        for pattern in malicious_patterns:
            if pattern.lower() in query.lower():
                return False
        
        return True
    
    def check_rate_limit(self, user_id):
        """速率限制檢查"""
        if user_id is None:
            return True  # 匿名用戶暫時不限制
        
        # 簡化的速率限制邏輯
        # 實際實作應該使用 Redis 或專門的速率限制服務
        return True
    
    def execute_rag_pipeline(self, query):
        """執行 RAG 流程"""
        # 這裡是簡化的實作，實際應該調用完整的 RAG 系統
        import time
        time.sleep(0.1)  # 模擬處理時間
        
        return {
            "answer": f"基於知識庫的回答：{query}",
            "confidence": 0.85,
            "sources": ["doc1.pdf", "doc2.md"],
            "processing_time": 0.1
        }
    
    def log_success_metrics(self, query, result):
        """記錄成功指標"""
        # 實際實作應該發送到監控系統
        print(f"成功處理查詢，處理時間: {result.get('processing_time', 'N/A')}秒")
    
    def log_error_metrics(self, query, error):
        """記錄錯誤指標"""
        # 實際實作應該發送到錯誤追蹤系統
        print(f"查詢處理失敗: {str(error)}")
    
    def get_system_health(self):
        """獲取系統健康狀態"""
        if self.request_count == 0:
            error_rate = 0
        else:
            error_rate = self.error_count / self.request_count
        
        health_status = {
            "status": "healthy" if error_rate < 0.01 else "degraded",
            "total_requests": self.request_count,
            "error_count": self.error_count,
            "error_rate": f"{error_rate:.2%}",
            "cache_size": len(self.cache)
        }
        
        return health_status



In [None]:
# 測試生產級系統
production_config = {
    "max_connections": 20,
    "cache_ttl": 3600,
    "rate_limit": 60
}

prod_system = ProductionRAGSystem(production_config)
prod_system.setup_production_components()

In [None]:
# 測試查詢處理
test_queries = [
    "什麼是機器學習？",
    "如何優化深度學習模型？",
    "",  # 無效輸入測試
    "正常的技術問題",
    "<script>alert('xss')</script>"  # 安全測試
]

print(f"\n=== 生產系統測試 ===")
for i, query in enumerate(test_queries, 1):
    print(f"\n測試 {i}: {query if query else '(空查詢)'}")
    result = prod_system.process_query_with_fallback(query, user_id="test_user")
    print(f"結果: {result.get('answer', result.get('error', 'Unknown'))[:50]}...")
    if result.get('fallback'):
        print("⚠️  使用了容錯機制")

In [None]:


# 檢查系統健康狀態
health = prod_system.get_system_health()
print(f"\n=== 系統健康狀態 ===")
for key, value in health.items():
    print(f"{key}: {value}")


### 性能優化策略

In [None]:
# 性能優化示例
class OptimizedRAGSystem:
    """性能優化的 RAG 系統"""
    
    def __init__(self):
        self.query_cache = {}
        self.embedding_cache = {}
        self.response_cache = {}
        
    def batch_embedding_optimization(self, texts):
        """批量嵌入優化"""
        print("=== 批量嵌入優化 ===")
        
        # 模擬批量處理比單個處理更高效
        import time
        
        # 單個處理時間
        start_time = time.time()
        individual_embeddings = []
        for text in texts:
            time.sleep(0.1)  # 模擬單個嵌入時間
            individual_embeddings.append(f"embedding_for_{text[:10]}")
        individual_time = time.time() - start_time
        
        # 批量處理時間
        start_time = time.time()
        time.sleep(0.3)  # 模擬批量嵌入時間
        batch_embeddings = [f"batch_embedding_for_{text[:10]}" for text in texts]
        batch_time = time.time() - start_time
        
        print(f"文本數量：{len(texts)}")
        print(f"單個處理時間：{individual_time:.2f}秒")
        print(f"批量處理時間：{batch_time:.2f}秒")
        print(f"性能提升：{(individual_time - batch_time) / individual_time:.1%}")
        
        return batch_embeddings
    
    def smart_caching_strategy(self, query):
        """智慧快取策略"""
        print(f"\n=== 智慧快取策略 ===")
        
        # 1. 查詢標準化
        normalized_query = self.normalize_query(query)
        print(f"原始查詢：{query}")
        print(f"標準化後：{normalized_query}")
        
        # 2. 語義相似查詢檢測
        similar_queries = self.find_similar_cached_queries(normalized_query)
        if similar_queries:
            print(f"找到相似的快取查詢：{similar_queries[0]}")
            return self.query_cache[similar_queries[0]]
        
        # 3. 執行新查詢並快取
        result = f"新查詢結果：{normalized_query}"
        self.query_cache[normalized_query] = result
        print(f"查詢結果已快取")
        
        return result
    
    def normalize_query(self, query):
        """查詢標準化"""
        # 簡化的標準化邏輯
        normalized = query.lower().strip()
        # 移除標點符號
        import string
        normalized = normalized.translate(str.maketrans('', '', string.punctuation))
        return normalized
    
    def find_similar_cached_queries(self, query):
        """找到相似的快取查詢"""
        # 簡化的相似度檢測
        similar_queries = []
        for cached_query in self.query_cache.keys():
            # 簡單的詞彙重疊檢測
            query_words = set(query.split())
            cached_words = set(cached_query.split())
            overlap = len(query_words & cached_words) / len(query_words | cached_words)
            if overlap > 0.7:  # 70% 相似度閾值
                similar_queries.append(cached_query)
        
        return similar_queries
    
    def demonstrate_optimization_impact(self):
        """展示優化效果"""
        print("=== 優化效果演示 ===")
        
        # 測試批量嵌入
        test_texts = [
            "機器學習基礎",
            "深度學習原理", 
            "資料科學方法",
            "AI 倫理問題",
            "自然語言處理"
        ]
        
        self.batch_embedding_optimization(test_texts)
        
        # 測試智慧快取
        test_queries = [
            "什麼是機器學習？",
            "機器學習是什麼？",  # 語義相似
            "什麼是深度學習？",
            "深度學習的定義",     # 語義相似
            "AI 倫理考量"
        ]
        
        for query in test_queries:
            result = self.smart_caching_strategy(query)
            print()


In [None]:
# 測試優化系統
optimized_system = OptimizedRAGSystem()
optimized_system.demonstrate_optimization_impact()