In [1]:
import os
import json
import time
from tqdm import tqdm
from typing import List
from langchain.docstore.document import Document
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from tenacity import retry, stop_after_attempt, wait_random_exponential
import tiktoken
from pydantic import SecretStr
from chromadb.config import Settings

# Placeholder alias; nothing in the visible notebook references it.
RateLimitError = Exception

# ====== CONFIG ======
SOURCE_JSON = "parsed_course_data.json"  # parsed course records to embed
CHROMA_DIR = "persist/chroma_data"  # directory that holds the persisted Chroma collection
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]  # raises KeyError when the variable is unset

EMBEDDING_MODEL_NAME = "text-embedding-ada-002"
MAX_TOKENS_PER_BATCH = 250000  # token budget for a single add_texts call
BATCH_DELAY_SECONDS = 1.5  # pause between consecutive batches
# =====================

# Initialisation.
# The model name is passed explicitly so that the embedder and the tokenizer used
# for batch sizing are always driven by the same constant.
# Option 1: wrap the key in SecretStr.
embedding_model = OpenAIEmbeddings(model=EMBEDDING_MODEL_NAME, api_key=SecretStr(OPENAI_API_KEY))
# Option 2: rely on the environment variable (no extra setup needed).
# embedding_model = OpenAIEmbeddings(model=EMBEDDING_MODEL_NAME)
tokenizer = tiktoken.encoding_for_model(EMBEDDING_MODEL_NAME)

# Load the course data.
with open(SOURCE_JSON, "r", encoding="utf-8") as f:
    raw_courses = json.load(f)


# ✅ 時間欄位轉 metadata（含字串與布林標記）
def build_time_metadata(time_slots):
    """Flatten a course's time slots into metadata fields.

    Args:
        time_slots: A list of slot identifiers (strings). Any non-list value is
            treated as "no slots".

    Returns:
        A dict whose ``"time_slots"`` entry is the comma-joined slot string
        (empty when there are no slots), followed by one ``"ts_<slot>": True``
        flag per slot.
    """
    # Guard clause: anything that is not a list yields only the empty summary field.
    if not isinstance(time_slots, list):
        return {"time_slots": ""}

    result = {"time_slots": ",".join(time_slots)}
    for slot in time_slots:
        result[f"ts_{slot}"] = True
    return result


# Build the Document list: one Document per raw course record.
def _course_to_document(c):
    """Convert one raw course record into a Document (text body plus metadata)."""
    content = f"課程名稱：{c['title']}\n課程介紹：{c['description']}\n授課老師：{c.get('instructor','')}\n課程網址：{c.get('course_url','')}"
    metadata = {
        "course_id": c["course_id"],
        "title": c["title"],
        "instructor": c.get("instructor", ""),
        "course_url": c.get("course_url", ""),
    }
    # Time-slot fields are appended after the fixed fields.
    metadata.update(build_time_metadata(c.get("time_slots", [])))
    return Document(page_content=content, metadata=metadata)


documents = []
for course in raw_courses:
    documents.append(_course_to_document(course))


# ✅ 計算 token 數
def count_tokens(text: str) -> int:
    """Return the number of tokens the module-level ``tokenizer`` produces for ``text``."""
    token_ids = tokenizer.encode(text)
    return len(token_ids)


# ✅ 按 token 數分批
def batch_by_token_limit(docs: List[Document], max_tokens: int, token_counter=None):
    """Yield consecutive groups of documents that fit within a token budget.

    Documents are kept in their original order. A batch is closed as soon as
    adding the next document would push its total above ``max_tokens``.

    Args:
        docs: Documents to group; only ``page_content`` is measured.
        max_tokens: Maximum combined token count per batch.
        token_counter: Optional callable ``str -> int`` used to measure a
            document. Defaults to this notebook's ``count_tokens``; pass a
            different counter to batch for another tokenizer.

    Yields:
        Non-empty lists of documents. A single document that alone exceeds
        ``max_tokens`` is still yielded, as a batch of its own.
    """
    if token_counter is None:
        # Looked up at iteration time so the default follows the notebook's tokenizer.
        token_counter = count_tokens

    batch, total = [], 0
    for doc in docs:
        tokens = token_counter(doc.page_content)
        # Only flush a non-empty batch, otherwise an oversize document would
        # produce an empty batch in front of it.
        if batch and total + tokens > max_tokens:
            yield batch
            batch, total = [], 0
        batch.append(doc)
        total += tokens
    if batch:
        yield batch


# ✅ 安全封裝的 add_texts，內建 retry
@retry(wait=wait_random_exponential(min=2, max=10), stop=stop_after_attempt(5))
def safe_add_texts(vectordb, texts, metadatas, ids=None):
    """Write one batch to the vector store, retrying on failure.

    The tenacity decorator retries on any exception, stopping after 5 attempts,
    with a randomised exponential wait configured with ``min=2, max=10``.

    Args:
        vectordb: Vector store exposing ``add_texts``.
        texts: Text bodies to embed and store.
        metadatas: Metadata dicts, one per text.
        ids: Optional ids, one per text. When omitted the call is identical to
            ``add_texts(texts=..., metadatas=...)``.
            NOTE(review): supplying stable ids should make retries and cell
            re-runs overwrite rather than append — confirm against the
            installed langchain_chroma version.
    """
    # Forward ``ids`` only when given, so the default call is unchanged.
    extra = {} if ids is None else {"ids": ids}
    vectordb.add_texts(texts=texts, metadatas=metadatas, **extra)


# Initialise the Chroma vector store with its persist_directory.
vectordb = Chroma(persist_directory=CHROMA_DIR, embedding_function=embedding_model, client_settings=Settings(is_persistent=True))

# Embed and write the documents batch by batch.
batches = list(batch_by_token_limit(documents, MAX_TOKENS_PER_BATCH))
last_batch_index = len(batches) - 1
for i, batch in enumerate(tqdm(batches, desc="Embedding batches")):
    texts = [d.page_content for d in batch]
    metadatas = [d.metadata for d in batch]
    safe_add_texts(vectordb, texts, metadatas)
    # Pause only between batches; nothing follows the last one, so skip the wait.
    if i < last_batch_index:
        time.sleep(BATCH_DELAY_SECONDS)

# Data is saved automatically; calling persist() is not needed.
print(f"✅ 完成：總共處理 {len(documents)} 筆課程，結果已儲存於 {CHROMA_DIR}")

Embedding batches: 100%|██████████| 8/8 [01:18<00:00,  9.85s/it]

✅ 完成：總共處理 1300 筆課程，結果已儲存於 persist/chroma_data





In [2]:
import zipfile
import os


def zip_chroma_data(folder_path, output_zip="chroma_data.zip"):
    """Compress every file under ``folder_path`` into a deflated zip archive.

    Entries are stored with paths relative to ``folder_path`` and written in
    sorted order so repeated runs produce the same entry order.

    Args:
        folder_path: Directory to archive (walked recursively).
        output_zip: Path of the archive to create; an existing file is overwritten.

    Raises:
        FileNotFoundError: If ``folder_path`` is not an existing directory.
            Checked before the archive is opened, so no empty zip is left behind.
    """
    if not os.path.isdir(folder_path):
        raise FileNotFoundError(f"Chroma folder not found: {folder_path}")

    archive_path = os.path.realpath(output_zip)
    with zipfile.ZipFile(output_zip, "w", zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(folder_path):
            dirs.sort()  # in-place sort makes os.walk descend in a stable order
            for file in sorted(files):
                filepath = os.path.join(root, file)
                # If the archive is being written inside the folder, do not zip it into itself.
                if os.path.realpath(filepath) == archive_path:
                    continue
                arcname = os.path.relpath(filepath, start=folder_path)
                zipf.write(filepath, arcname)


# Compress the persisted Chroma folder; change the path here if your Chroma directory differs.
zip_chroma_data(folder_path="persist/chroma_data")
print("✅ 已壓縮為 chroma_data.zip")

✅ 已壓縮為 chroma_data.zip


In [8]:
from azure.storage.blob import BlobServiceClient
import os
from dotenv import load_dotenv

# Load the storage credentials from .env / the environment.
load_dotenv()
account_name = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
account_key = os.getenv("AZURE_STORAGE_ACCOUNT_KEY")
container_name = os.getenv("AZURE_BLOB_CONTAINER")

# Fail early instead of formatting "None" into the connection string.
if not all([account_name, account_key, container_name]):
    raise ValueError("❌ 請確認 .env 中帳號資訊是否齊全")

connection_str = f"DefaultEndpointsProtocol=https;AccountName={account_name};AccountKey={account_key};EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connection_str)

try:
    container_client = blob_service_client.get_container_client(container_name)
    # Consume the listing before reporting success, so a failure while listing
    # is never preceded by a success message.
    blob_names = [blob.name for blob in container_client.list_blobs()]
except Exception as e:
    print("❌ 連線失敗，請檢查 account name/key/container name")
    print("錯誤訊息：", e)
else:
    print("✅ 成功連線，列出 container 中的 blob:")
    for blob_name in blob_names:
        print(" -", blob_name)

✅ 成功連線，列出 container 中的 blob:


In [9]:
from azure.storage.blob import BlobClient
from azure.storage.blob import ContentSettings

# Smoke test: upload a tiny text file to confirm write access to the container.
test_blob_name = "test-upload.txt"
test_blob_client = blob_service_client.get_blob_client(blob=test_blob_name, container=container_name)

# Create the local file that will be uploaded.
with open("test-upload.txt", "w", encoding="utf-8") as local_out:
    local_out.write("這是一個測試檔案")

# Upload it as plain text, replacing any blob left by a previous run.
with open("test-upload.txt", "rb") as payload:
    plain_text = ContentSettings(content_type="text/plain")
    test_blob_client.upload_blob(payload, overwrite=True, content_settings=plain_text)
    print("✅ 測試檔案已上傳")

✅ 測試檔案已上傳


In [4]:
import os
from dotenv import load_dotenv
from azure.storage.blob import BlobClient, ContentSettings
from tqdm import tqdm

# --------------- Constants ----------------
LOCAL_FILE = "chroma_data.zip"
BLOB_NAME = "course_vector.zip"
CONTENT_TYPE = "application/zip"
_MIB = 1024 * 1024
MAX_SINGLE_PUT_SIZE = 16 * _MIB  # 16MB
MAX_BLOCK_SIZE = 4 * _MIB  # 4MB
TIMEOUT = 600
MAX_CONCURRENCY = 4

# --------------- Load credentials ----------------
load_dotenv()
ACCOUNT_NAME = os.getenv("AZURE_STORAGE_ACCOUNT_NAME")
ACCOUNT_KEY = os.getenv("AZURE_STORAGE_ACCOUNT_KEY")
CONTAINER_NAME = os.getenv("AZURE_BLOB_CONTAINER")

# Every one of the three values must be present and non-empty.
if not (ACCOUNT_NAME and ACCOUNT_KEY and CONTAINER_NAME):
    raise ValueError("❌ 請確認 .env 中帳號資訊是否齊全")

# --------------- Build the connection string ----------------
AZURE_CONN_STR = f"DefaultEndpointsProtocol=https;AccountName={ACCOUNT_NAME};AccountKey={ACCOUNT_KEY};EndpointSuffix=core.windows.net"


# --------------- 📦 準備上傳檔案 ----------------
class TqdmUploadWrapper:
    """File-like wrapper that advances a tqdm progress bar as bytes are read.

    Attributes not defined here are delegated to the wrapped file object.
    """

    def __init__(self, file, total):
        """Wrap ``file`` and create a byte-scaled progress bar sized to ``total``."""
        self.file = file
        self.progress_bar = tqdm(total=total, unit="B", unit_scale=True, desc="📤 Uploading")

    def read(self, size=-1):
        """Read up to ``size`` bytes (everything remaining by default) and record progress.

        The default matches the standard file ``read()`` signature, so callers
        that invoke ``read()`` without an argument work too.
        """
        data = self.file.read(size)
        self.progress_bar.update(len(data))
        return data

    def __iter__(self):
        """Yield the remaining content in 1 MB chunks, updating the bar per chunk."""
        while True:
            chunk = self.read(1024 * 1024)  # Read in 1MB chunks
            if not chunk:
                break
            yield chunk

    def __getattr__(self, attr):
        """Delegate unknown attributes to the wrapped file."""
        # __getattr__ only runs when normal lookup fails. If ``file`` itself is
        # missing (instance not initialised), delegating would recurse forever.
        if attr in ("file", "progress_bar"):
            raise AttributeError(attr)
        return getattr(self.file, attr)


# --------------- Create the BlobClient and upload ----------------
file_size = os.path.getsize(LOCAL_FILE)

blob_client = BlobClient.from_connection_string(conn_str=AZURE_CONN_STR, container_name=CONTAINER_NAME, blob_name=BLOB_NAME, max_single_put_size=MAX_SINGLE_PUT_SIZE, max_block_size=MAX_BLOCK_SIZE)

with open(LOCAL_FILE, "rb") as f:
    wrapped = TqdmUploadWrapper(f, total=file_size)
    try:
        blob_client.upload_blob(data=wrapped, blob_type="BlockBlob", overwrite=True, content_settings=ContentSettings(content_type=CONTENT_TYPE), max_concurrency=MAX_CONCURRENCY, timeout=TIMEOUT)
    finally:
        # Close the bar even when the upload raises, so it is not left open in the output.
        wrapped.progress_bar.close()

print(f"\n✅ 上傳完成：{BLOB_NAME} 至 container {CONTAINER_NAME}")

📤 Uploading: 100%|██████████| 23.8M/23.8M [01:31<00:00, 261kB/s]


✅ 上傳完成：course_vector.zip 至 container course-data





In [6]:
import os
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from pydantic import SecretStr

# 1. Path of the local Chroma data folder
CHROMA_LOCAL_DIR = os.path.join("persist", "chroma_data")

# 2. Print the directory tree, indenting each folder by its depth below the root
print("🔍 CHROMA_LOCAL_DIR 內容：")
for root, dirs, files in os.walk(CHROMA_LOCAL_DIR):
    relative_part = root.replace(CHROMA_LOCAL_DIR, "")
    depth = relative_part.count(os.sep)
    pad = "  " * depth
    folder_name = os.path.basename(root)
    print(f"{pad}{folder_name}/")
    for file_name in files:
        print(f"{pad}  └─ {file_name}")

# 3. Create the embedding function (same mechanism as when the data was written)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
if not OPENAI_API_KEY:
    raise ValueError("請先設定環境變數 OPENAI_API_KEY")

api_secret = SecretStr(OPENAI_API_KEY)
embedding = OpenAIEmbeddings(api_key=api_secret)

# 4. Try to open the Chroma database and read it back
try:
    vectordb = Chroma(embedding_function=embedding, persist_directory=CHROMA_LOCAL_DIR)
    # Fetch everything (documents + metadata) to confirm the store loaded.
    # NOTE(review): `vectordb` and `documents` rebind names first assigned in the ingest cell.
    all_data = vectordb.get()
    documents = all_data.get("documents", [])
    metadatas = all_data.get("metadatas", [])

    print(f"\n✅ 總共在 Chroma 找到 {len(documents)} 筆文件")
    # Show the first 5 metadata records as a spot check
    first_five = metadatas[:5]
    for idx, meta in zip(range(1, 6), first_five):
        print(f"  {idx}. metadata: {meta}")

    # 5. Exercise similarity search with a sample query
    sample_query = "想選修跟 Python 資料分析有關的課程"
    hits = vectordb.similarity_search(sample_query, k=3)
    print(f"\n🔍 對 query '{sample_query}' 做相似度檢索，前 3 筆結果：")
    for i, hit in enumerate(hits, start=1):
        hit_title = hit.metadata.get("title")
        hit_slots = hit.metadata.get("time_slots")
        print(f"  [{i}] 課程名稱：{hit_title}, 時段：{hit_slots}")

except Exception as e:
    print("❌ 讀取 Chroma 資料庫失敗：", e)

🔍 CHROMA_LOCAL_DIR 內容：
chroma_data/
  └─ chroma.sqlite3
  8f32e79c-8252-4231-8783-bf4c51b313b8/
    └─ data_level0.bin
    └─ header.bin
    └─ index_metadata.pickle
    └─ length.bin
    └─ link_lists.bin

✅ 總共在 Chroma 找到 1300 筆文件
  1. metadata: {'time_slots': '1_3,1_4', 'title': '(大)法官如何思考？司法行為與司法政治上', 'ts_1_3': True, 'course_url': 'https://nol.ntu.edu.tw/nol/coursesearch/print_table.php?course_id=341%20U9340&class=&dpt_code=3410&ser_no=43630&semester=113-1&lang=CH', 'ts_1_4': True, 'course_id': 43630, 'instructor': '林建志'}
  2. metadata: {'ts_1_8': True, 'time_slots': '1_8,1_9,5_1702', 'course_id': 62222, 'instructor': '黃銘傑', 'ts_5_1702': True, 'course_url': 'https://nol.ntu.edu.tw/nol/coursesearch/print_table.php?course_id=A21%20M2490&class=&dpt_code=A410&ser_no=62222&semester=113-1&lang=CH', 'ts_1_9': True, 'title': '人工智慧、大數據與競爭法一'}
  3. metadata: {'title': '人口統計學', 'course_id': 71504, 'instructor': '蘇士詠', 'time_slots': '3_6,3_7,3_8', 'course_url': 'https://nol.ntu.edu.tw/nol/cours