# 使用 LangChain、OpenAI、Qdrant 實作 RAG 系統

## 🔧 架構總覽

```
使用者提問
     ↓
LangChain 啟動流程控制
     ↓
Qdrant 負責向量搜尋
     ↓
OpenAI 負責生成回應
```

* LangChain：協調各元件並組合 Chain
* Qdrant：向量資料庫，快速檢索語意相近段落
* OpenAI：用 GPT 模型進行文字生成

---

## 📁 文件來源：k8s.io

* 來源：Kubernetes 官方網站 [https://k8s.io](https://k8s.io)
* 類型：Markdown 文件、YAML 範例
* 處理流程：

  * ~網頁爬蟲抓取內容~ 直接使用 https://github.com/kubernetes/website 原始碼內容
  * 清洗與分段切片（Chunking），
  * 建立 embedding 向量

---

## 🔍 向量化流程：使用 OpenAI Embedding API

* **模型選擇**：`text-embedding-3-large`
* **切片方式**：

  * 每段文件控制在一定 token 長度
  * 保留原始網址 / metadata
* **語意向量後格式**：

  * 向量
  * 原文片段
  * 來源與分類資訊

---

## 🧠 向量儲存：Qdrant

* 開源向量資料庫，支援：

  * 相似度搜尋（Cosine、Dot、Euclidean）
  * 高速查詢（HNSW）
  * Metadata filter
* 專案設定：

  * 每筆文件片段存為一筆向量點
  * 使用 metadata 儲存原始來源與語言等資訊

---

## 🧪 使用情境展示

**範例問題**：「如何設定 Kubernetes 的 Ingress？」

RAG 回應流程：

1. 使用者提問
2. 檢索 k8s.io 中與 Ingress 相關段落
3. 將片段與問題輸入 LLM
4. 回傳包含原文與解釋的答案

---

## 📈 實作效益

* **精準性提升**：限定知識來源減少幻覺
* **延展性強**：可替換為內部文件或多語內容
* **彈性調整**：可自訂檢索策略與 prompt 範本

---

## 🧩 延伸應用場景

* 企業內部知識庫搜尋系統
* 文件問答機器人（DevOps 文件、自助客服）
* API 文件智慧查詢


In [1]:
!pip install langchain_qdrant qdrant_client langchain-openai langchain_openai langchain_community unstructured markdown



In [2]:
# Test that your OpenAI API key is correctly set as an environment variable
# Note. if you run this notebook locally, you will need to reload your terminal and the notebook for the env variables to be live.
import os

# Note. alternatively you can set a temporary env variable like this:
#os.environ["OPENAI_API_KEY"] = ""

os.environ["AZURE_OPENAI_API_KEY"]=""
os.environ["AZURE_OPENAI_ENDPOINT"]=""
os.environ["OPENAI_API_VERSION"]="2024-12-01-preview"
os.environ["OPENAI_MODEL"]="gpt-4.1-mini"
os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"]="gpt-4.1-mini"
#os.environ["OPENAI_MODEL"]="text-embedding-3-small"

if os.getenv("AZURE_OPENAI_API_KEY") is not None:
    print("AZURE_OPENAI_API_KEY is ready")
else:
    print("AZURE_OPENAI_API_KEY environment variable not found")

AZURE_OPENAI_API_KEY is ready


### 整合 Azure OpenAI 與 Qdrant，建立 Retrieval-Augmented Generation (RAG) 系統的基礎元件。

1. **語言模型初始化**：使用 `AzureChatOpenAI` 建立對話型語言模型，透過環境變數設定 Azure OpenAI 的 endpoint、deployment 名稱與 API 版本。

2. **Embedding 模型設定**：使用 `AzureOpenAIEmbeddings`，選擇 `text-embedding-3-large` 模型，負責將文字轉換成向量，供後續語意檢索使用。

3. **Qdrant 向量資料庫初始化**：

   * 建立 `QdrantClient`，連接本地或遠端 Qdrant。
   * 檢查是否已有名為 `k8s-official-doc` 的 collection，若無則以 3072 維向量與 cosine 距離建立。
   * 使用 `QdrantVectorStore` 將 Qdrant 包裝成可供 LangChain 使用的向量儲存與檢索介面。

整體流程為將文字資料轉成向量儲存在 Qdrant 中，並可搭配語言模型進行語意查詢與問答生成。


In [3]:
from langchain_openai import AzureOpenAIEmbeddings
from langchain_openai import AzureChatOpenAI

llm = AzureChatOpenAI(
    temperature="0.0",
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
    openai_api_version=os.environ["OPENAI_API_VERSION"],
    #rate_limiter=InMemoryRateLimiter(
    #    requests_per_second=60,
    #    check_every_n_seconds=1,
    #)
)

openai_embeddings = AzureOpenAIEmbeddings(
    model='text-embedding-3-large',
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    openai_api_version=os.environ["OPENAI_API_VERSION"],
)

from langchain_qdrant import QdrantVectorStore
from qdrant_client import QdrantClient
from qdrant_client.models import Distance
from qdrant_client.models import VectorParams

# initialize the qdrant client
client = QdrantClient(
    host="qdrant",
    prefer_grpc=True,
)

collection_name = "k8s-official-doc"

if not client.collection_exists(collection_name):
    client.create_collection(
       collection_name=collection_name,
       vectors_config=VectorParams(
           size=3072,
           distance=Distance.COSINE
        ),
    )
# create the vector store
vectordb = QdrantVectorStore(
    client=client,
    collection_name=collection_name,
    embedding=openai_embeddings,
)

## 建立一個具「對話上下文記憶能力」的問答系統，使用 LangChain 結合向量檢索與 LLM 回答邏輯。

1. **Condense Prompt**：定義一段 system prompt，負責根據歷史對話與新問題，將可能依賴上下文的問題轉換為獨立可理解的查詢。

2. **QA Prompt**：建立一段 system prompt，規範模型回答格式（Question / Context / Answer），且明確限制僅根據 context 回答，使用繁體中文。

3. **向量檢索設定**：

   * 使用 `vectordb` 向量資料庫建立 retriever，採用 **MMR (Max Marginal Relevance)** 檢索策略，提升多樣性與相關性。
   * 檢索數量為 4 筆。

4. **組合為對話式 QA Chain**：

   * `create_history_aware_retriever`：結合 LLM 與 retriever，將對話歷史轉換為具檢索意義的查詢。
   * `create_stuff_documents_chain`：將檢索到的文件與問題輸入 LLM，生成回答。
   * `create_retrieval_chain`：整合檢索與回答兩段流程為一個可執行 Chain。

這使系統能根據對話上下文準確查找資訊，產出格式化的回答。


In [4]:
from langchain.chains import create_history_aware_retriever
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate

chat_history = []

condense_question_system_template = """
    給定一段對話歷史和最新的用戶問題，用戶問題可能參考對話歷史中的內容，
    重新構造一個獨立的問題，該問題可以在沒有對話歷史的情況下理解。
    不要直接回答問題，只有在需要時重新構造問題，否則原樣返回。
"""

condense_question_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", condense_question_system_template),
        ("placeholder", "{chat_history}"),
        ("human", "{input}"),
    ]
)

system_prompt = """
    你是一個問答任務的助手。根據 Context 的資料內容回答問題。
    不要回答 Context 以外的資訊
    如果 Context 的資料沒有相關內容，就基於 Context 的內容做解釋。
    使用繁體中文回答。
    回答的格式為
        Question：問題
        Context: Context
        Answer: 答案

    Context:
    {context}
"""

qa_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        ("placeholder", "{chat_history}"),
        ("human", "{input}"),
    ]
)

vb_retriever = vectordb.as_retriever(
    #search_type="similarity",
    search_type="mmr",
    search_kwargs={
        "k": 4,
        #"score_threshold": 0.5,
    }
)

# Create a history-aware retriever from the OpenAI model and the retriever
convo_qa_chain = create_retrieval_chain(
    create_history_aware_retriever(
        llm,
        vb_retriever,
        condense_question_prompt
    ),
    create_stuff_documents_chain(
        llm,
        qa_prompt
    )
)

## 建立一個具備對話記憶功能的問答流程，用於向 LangChain QA Chain 發送查詢並取得回答。

1. **`generate_prompt(question)`**：依據是否有過往對話，產生不同的提示：

   * 若無歷史紀錄，僅回傳單一問題。
   * 若有歷史紀錄，取最近三輪 QA，整理為 context，並加入新問題，強化語境理解。

2. **`ask_question(question)`**：

   * 呼叫 `generate_prompt` 產生 prompt。
   * 使用 `convo_qa_chain.invoke()` 執行整合後的 QA Chain，傳入使用者輸入與對話歷史。
   * 回傳模型產出的回答內容（`result["answer"]`）。

強化 RAG 系統對上下文的理解與回應品質，並維持對話連貫性。


In [5]:
def generate_prompt(question):
    if not chat_history:
        prompt = f"""
        問題：{question}
        """
    else:
        print("Chat with previous history")
        context_entries = [f"Question: {q}\nAnswer: {a}" for q, a in chat_history[-3:]]
        context = "\n\n".join(context_entries)
        prompt = f"""
            使用最近對話提供的內容，以簡潔且具信息性的方式回答新問題。
            最近對話的內容：{context}
            新問題：{question}
            答案：
        """

    return prompt

def ask_question(question):
    prompt = generate_prompt(question)
    # Invoke the chain with the question and the chat history
    result = convo_qa_chain.invoke(
        {
            "input": prompt,
            "chat_history": chat_history
        }
    )

    # Return the bot's answer
    return result["answer"]

### 目前 RAG Qdrant 還沒有資料，送出測試問題做對照組

In [6]:
print(ask_question("如何擴展 Service IP 範圍?"))

Question：如何擴展 Service IP 範圍?  
Context: （無相關資料）  
Answer: 由於 Context 中沒有提供關於如何擴展 Service IP 範圍的相關資訊，無法直接回答此問題。建議參考相關系統或平台的官方文件，了解如何調整或擴展 Service IP 範圍的設定。


### 使用 LangChain 的 `DirectoryLoader` 從 Kubernetes 官方網站的本地 Markdown 文件載入資料，目的是建立向量化文件集作為 RAG 系統知識來源。

1. **文件來源**：目錄路徑為 `website/content/en/docs/tasks/network/`，對應 Kubernetes 文件中關於 Network Tasks 的部分。

2. **載入器設定**：

   * 使用 `DirectoryLoader` 搭配 `TextLoader`，讀取所有 `.md` 檔案。
   * 啟用 `recursive=True`，可遞迴載入子目錄內文件。
   * `glob="./**/*.md"` 過濾所有 Markdown 檔。

3. **文件儲存**：

   * 載入後的所有文件被存入 `documents` 清單中。
   * `documents[0]` 表示載入的第一筆文件，內容為一個 LangChain 的 `Document` 物件，內含原始文字與 metadata。

後續向量化與檢索系統建構的第一階段：**文件載入與準備**。


In [7]:
documents = []

from langchain_community.document_loaders import DirectoryLoader
from langchain_community.document_loaders import TextLoader
#from langchain_community.document_loaders import UnstructuredMarkdownLoader

# https://kubernetes.io/docs/tasks/extend-kubectl/kubectl-plugins/

loader = DirectoryLoader(
    "website/content/en/docs/tasks/network/",
    #"website/content/zh-cn/docs/tasks/network/", # 如果可以選，要用英文，還是簡體中文呢？
    glob="./**/*.md",
    show_progress=True,
    recursive=True,
    loader_cls=TextLoader,
    #loader_cls=UnstructuredMarkdownLoader,
    #loader_kwargs={"mode":"single"},
    #loader_kwargs={"mode":"elements"},
)

documents = loader.load()
documents[0]

100%|██████████| 5/5 [00:00<00:00, 1611.21it/s]


Document(metadata={'source': 'website/content/en/docs/tasks/network/extend-service-ip-ranges.md'}, page_content='---\nreviewers:\n- thockin\n- dwinship\nmin-kubernetes-server-version: v1.29\ntitle: Extend Service IP Ranges\ncontent_type: task\n---\n\n<!-- overview -->\n{{< feature-state feature_gate_name="MultiCIDRServiceAllocator" >}}\n\nThis document shares how to extend the existing Service IP range assigned to a cluster.\n\n\n## {{% heading "prerequisites" %}}\n\n{{< include "task-tutorial-prereqs.md" >}}\n\n{{< version-check >}}\n\n{{< note >}}\nWhile you can use this feature with an earlier version, the feature is only GA and officially supported since v1.33.\n{{< /note >}}\n\n<!-- steps -->\n\n## Extend Service IP Ranges\n\nKubernetes clusters with kube-apiservers that have enabled the `MultiCIDRServiceAllocator`\n[feature gate](/docs/reference/command-line-tools-reference/feature-gates/) and have the\n`networking.k8s.io/v1beta1` API group active, will create a ServiceCIDR objec

### 使用 `MarkdownHeaderTextSplitter` 將載入的 Markdown 文件依標題層級切分成多個語意段落，方便後續向量化與語意檢索。

1. **切分依據**：

   * 僅使用 `#`（H1）與 `##`（H2）當作分段依據。
   * 每個分段會包含該標題與其對應內容。

2. **切分流程**：

   * 對每個 `Document` 執行 `split_text`，回傳多個段落（section）。
   * 每個段落會保留原始文件的 `metadata`，並加入：

     * Header metadata（如 H1/H2 內容）
     * 分段編號（例如 `"split": "2/5"`）

3. **彙總結果**：

   * 所有切分後段落收集到 `all_sections` 清單中。
   * 印出總共切分了多少段落，便於檢查與後續處理。

這步驟能提升 RAG 系統的檢索粒度與回答精準度，避免將過多無關段落送入 LLM。


In [8]:
from langchain.text_splitter import MarkdownHeaderTextSplitter

headers_to_split_on = [
    ("#", "Header 1"), 
    ("##", "Header 2"), 
    #("###", "Header 3"), 
    #("####", "Header 4")
]

text_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=headers_to_split_on,
    #strip_headers=False # keep headers in the text
)

all_sections = []

for doc in documents:
    sections = text_splitter.split_text(doc.page_content)

    for i in range(len(sections)):
        # keep metadata from the original document
        metadata = dict(doc.metadata)
        metadata.update(sections[i].metadata)
        metadata.update({"split": f"{i+1}/{len(sections)}"})
        sections[i].metadata = metadata
    #for section in sections:
    #    # keep metadata from the original document
    #    metadata = dict(doc.metadata)
    #    metadata.update(section.metadata)
    #    section.metadata = metadata

    all_sections.extend(sections)

print(f"=====Split {len(all_sections)} sections")
#print(f"=====First section: {all_sections[0]}")
#print(f"=====Last section: {all_sections[-1]}")

=====Split 18 sections


### 將先前切分好的 Markdown 文件段落上傳至 Qdrant 向量資料庫，並以穩定的唯一 ID 管理每筆向量資料。

1. **`add_document(document)` 函式**：

   * 根據每段的來源路徑 (`source`) 與分段編號 (`split`) 建立 SHA-256 雜湊，作為向量的唯一 ID。
   * 將該段內容透過 `vectordb.add_documents()` 新增至向量資料庫，避免重複寫入。

2. **逐段上傳**：

   * 遍歷 `all_sections` 中的每段文件，呼叫 `add_document()` 執行語意向量與儲存。
   * 每次上傳會印出進度資訊：包含來源檔名與分段編號。

3. **資料驗證**：

   * 最後使用 `client.get_collection()` 查詢向量資料庫的 collection 狀態，顯示目前向量數量（`points_count`）。

這步驟實現了文件段落的語意向量與唯一性管理，是構建穩定、可擴展的向量知識庫的關鍵。


In [9]:
import hashlib

def add_document(document):
    # hash the source (url) to get a unique id
    print('Embedding progress:' + document.metadata['source'] + document.metadata['split'])
    hash = hashlib.sha256(
        str(document.metadata['source'] + document.metadata['split']).encode('utf-8')
    ).hexdigest()[::2]
    
    vectordb.add_documents(
        documents=[document],
        #ids=[str(uuid4())]
        ids=[hash]
    )

for section in all_sections:
    add_document(section)

collection_info = client.get_collection(collection_name=collection_name)
collection_info.points_count

Embedding progress:website/content/en/docs/tasks/network/extend-service-ip-ranges.md1/5
Embedding progress:website/content/en/docs/tasks/network/extend-service-ip-ranges.md2/5
Embedding progress:website/content/en/docs/tasks/network/extend-service-ip-ranges.md3/5
Embedding progress:website/content/en/docs/tasks/network/extend-service-ip-ranges.md4/5
Embedding progress:website/content/en/docs/tasks/network/extend-service-ip-ranges.md5/5
Embedding progress:website/content/en/docs/tasks/network/validate-dual-stack.md1/4
Embedding progress:website/content/en/docs/tasks/network/validate-dual-stack.md2/4
Embedding progress:website/content/en/docs/tasks/network/validate-dual-stack.md3/4
Embedding progress:website/content/en/docs/tasks/network/validate-dual-stack.md4/4
Embedding progress:website/content/en/docs/tasks/network/customize-hosts-file-for-pods.md1/4
Embedding progress:website/content/en/docs/tasks/network/customize-hosts-file-for-pods.md2/4
Embedding progress:website/content/en/docs

18

### 使用 RAG Qdrant 送出測試問題

In [10]:
print(ask_question("如何擴展 Service IP 範圍?"))

Question：如何擴展 Service IP 範圍?  
Context:  
本文件分享如何擴展分配給 Kubernetes 叢集的 Service IP 範圍。  
透過設定 kube-controller-manager 的 `--service-cluster-ip-range` 參數，可以配置多個 IP 範圍，並利用 MultiCIDRServiceAllocator 功能擴展 Service IP 範圍。  
此外，建立 Service 時可透過 `.spec.ipFamilyPolicy` 和 `.spec.ipFamilies` 來指定使用的 IP 家族（IPv4、IPv6 或雙棧），以便從不同的 IP 範圍中分配 Cluster IP。  

Answer:  
要擴展 Service IP 範圍，可以在 kube-controller-manager 中設定多個 `--service-cluster-ip-range`，啟用 MultiCIDRServiceAllocator 功能，讓 Kubernetes 支援多個 IP 範圍。建立 Service 時，透過設定 `.spec.ipFamilyPolicy`（如 SingleStack、PreferDualStack）和 `.spec.ipFamilies`（指定 IPv4 或 IPv6）來從不同的 IP 範圍分配 Cluster IP，達到擴展 Service IP 範圍的目的。


In [11]:
print(ask_question("如何使用 kubectl 擴展 Service IP?"))

Question：如何使用 kubectl 擴展 Service IP?  
Context: 根據文件內容，擴展 Service IP 範圍涉及設定多個 `service-cluster-ip-range`，並透過設定 Service 的 `.spec.ipFamilyPolicy` 和 `.spec.ipFamilies` 來指定使用 IPv4、IPv6 或雙棧 IP。使用 kubectl 則是用來建立和查看 Service 的設定。  
Answer:  
使用 kubectl 擴展 Service IP 的步驟如下：

1. 在 kube-controller-manager 啟動時，透過 `--service-cluster-ip-range` 參數設定多個 IP 範圍（例如同時設定 IPv4 和 IPv6 範圍），以啟用多個 Service IP 範圍。

2. 使用 kubectl 建立 Service 時，可以在 Service 的 YAML 中設定 `.spec.ipFamilyPolicy` 和 `.spec.ipFamilies`，例如：

   - 若不設定 `.spec.ipFamilyPolicy`，預設會使用第一個設定的 IP 範圍並設定為 `SingleStack`。

   - 若設定 `.spec.ipFamilies` 為 `[IPv6]`，Service 會從 IPv6 範圍分配 IP。

   - 若設定 `.spec.ipFamilyPolicy` 為 `PreferDualStack`，Service 會同時分配 IPv4 和 IPv6 的 IP。

3. 使用 `kubectl apply -f <service.yaml>` 建立或更新 Service。

4. 使用 `kubectl get svc <service-name> -o yaml` 或 `kubectl describe svc <service-name>` 查看 Service 的 IP 分配情況。

總結來說，kubectl 本身是用來建立和管理 Service 的工具，擴展 Service IP 範圍主要是透過 kube-controller-manager 的設定和 Service YAML 中的 IP 家族設定來達成，kubectl 則用

In [12]:
print(ask_question("如何使用 kubectl 擴展 Service IP? 只要回答 kubectl 的完整指令就好"))

Question：如何使用 kubectl 擴展 Service IP? 只要回答 kubectl 的完整指令就好  
Context: Context  
Answer:  
```shell
kubectl run nginx --image nginx
kubectl get pods --output=wide
kubectl exec nginx -- cat /etc/hosts
kubectl get svc my-service -o yaml
kubectl describe svc -l app.kubernetes.io/name=MyApp
kubectl get svc -l app.kubernetes.io/name=MyApp
```
