【版本】

v15 系列 - Colab 穩定版啟動器

【重要提示】

1.  **首次執行準備**：
    *   請務必先在 Colab 的「程式碼片段」中搜尋 `drive`，插入「掛接 Google 雲端硬碟」的程式碼儲存格，並執行一次以完成授權。這是讀取雲端硬碟中設定檔和 ZIP 檔案的必要步驟。
    *   **建議之後將此儲存格移至筆記本頂端並保留**，方便每次重新連線 Colab 時快速重新掛載雲端硬碟。
2.  **設定檔路徑**：
    *   系統預期 `project_config.json` 檔案位於您的 Google 雲端硬碟根目錄下的 `Config/DataPipeline` 資料夾中。請確保檔案存在於正確路徑： `My Drive/Config/DataPipeline/project_config.json`。
3.  **ZIP 檔案位置**：
    *   根據 `project_config.json` 中的設定，ZIP 檔案預期位於 Google 雲端硬碟的特定路徑（通常是 `My Drive/DataPipeline_Project_File/Input`）。請確保您的 ZIP 檔案已上傳至對應位置。
4.  **輸出與日誌**：
    *   執行完成後，所有輸出（資料庫檔案、日誌、未處理成功的檔案等）將儲存在 `project_config.json` 中定義的 `project_folder_name` 所對應的雲端硬碟路徑下（例如 `My Drive/DataPipeline_Project_File/Workspace`）。
5.  **Colab 環境限制**：
    *   Colab 提供的是臨時環境，檔案系統在執行階段結束後可能會被清除（雲端硬碟掛載的檔案除外）。長時間執行的任務請注意 Colab 的閒置斷線機制。
    *   如果遇到「無法定位套件」或 `ModuleNotFoundError`，請取消註解並執行第一個程式碼儲存格中的套件安裝指令。
6.  **手動調整參數**：
    *   如果不想使用 `project_config.json`，或者需要臨時調整參數，可以直接修改第二個程式碼儲存格中 `PipelineOrchestrator` 的初始化參數。

In [None]:
# @title 安裝必要的套件 (如果尚未安裝)
# !pip install duckdb pandas pyarrow psutil

In [None]:
# @title 主要執行區塊：載入設定、初始化並執行管線
import json
import os
from google.colab import drive

# --- 掛載 Google Drive ---
try:
    drive.mount('/content/drive')
    print("Google Drive 已成功掛載。")
    google_drive_mount_point = "/content/drive/My Drive"
except Exception as e:
    print(f"Google Drive 掛載失敗: {e}")
    google_drive_mount_point = None # 若掛載失敗，則設定為 None

# --- 設定檔路徑 ---
# 預設 Google Drive 中的設定檔路徑
CONFIG_FILE_PATH_DRIVE = os.path.join(google_drive_mount_point, "Config/DataPipeline/project_config.json") if google_drive_mount_point else None

# --- Colab 本地備用設定檔路徑 (如果 Drive 中找不到) ---
# 為了方便在沒有 Drive 掛載或 Drive 中沒有設定檔時也能基本運行，可以提供一個 Colab 本地的預設路徑
# 例如，使用者可以手動上傳 project_config.json 到 Colab 的 /content/ 目錄下
CONFIG_FILE_PATH_LOCAL_FALLBACK = "/content/project_config.json"

# --- 決定最終使用的設定檔路徑 ---
final_config_path = None
if CONFIG_FILE_PATH_DRIVE and os.path.exists(CONFIG_FILE_PATH_DRIVE):
    final_config_path = CONFIG_FILE_PATH_DRIVE
    print(f"將使用 Google Drive 中的設定檔: {final_config_path}")
elif os.path.exists(CONFIG_FILE_PATH_LOCAL_FALLBACK):
    final_config_path = CONFIG_FILE_PATH_LOCAL_FALLBACK
    print(f"將使用 Colab 本地備用設定檔: {final_config_path}")
else:
    print(f"錯誤：在 Google Drive ({CONFIG_FILE_PATH_DRIVE}) 或本地備用路徑 ({CONFIG_FILE_PATH_LOCAL_FALLBACK}) 中均未找到 project_config.json。")
    print("請檢查檔案路徑，或在下方直接填寫參數。")

# --- 讀取設定檔 --- (如果找到設定檔)
config = {}
if final_config_path:
    try:
        with open(final_config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
        print(f"成功從 {final_config_path} 載入設定。")
    except Exception as e:
        print(f"從 {final_config_path} 讀取設定檔失敗: {e}")
        config = {} # 載入失敗則使用空設定
else:
    print("未找到設定檔，將使用下方直接定義的預設參數或手動填寫的參數。")

# --- 從 src 目錄導入 PipelineOrchestrator ---
try:
    # 假設此 notebook 位於 data_pipeline_v15/run_stable.ipynb
    # 則 src 目錄相對於此 notebook 的路徑是 ../src/
    # 為了讓 Python 直譯器能找到 src 下的模組，需要將 src 的父目錄 (即 data_pipeline_v15) 加入 sys.path
    import sys
    # current_notebook_dir = os.getcwd() # Colab 中 .ipynb 檔案的當前目錄通常是 /content
    # project_root = os.path.dirname(current_notebook_dir) # 這會是 / (根目錄)，不正確
    # 正確的做法是，如果我們知道 .ipynb 在 data_pipeline_v15 下，而套件在 data_pipeline_v15/src 下
    # 我們需要 data_pipeline_v15/src 這個路徑或者其父目錄 data_pipeline_v15 在 sys.path 中

    # 方案 A: 如果 Colab 工作目錄是 /content，且使用者已將整個 data_pipeline_v15 專案複製到 /content/data_pipeline_v15
    # 或者透過 Drive 掛載，專案位於 /content/drive/My Drive/path_to_project/data_pipeline_v15
    # 我們需要確定 src 相對於 notebook 的位置。此 notebook 設計為放在 data_pipeline_v15 目錄下。
    # 因此，src 目錄與此 notebook 在同一層級下的 src/

    # 首先嘗試添加包含 src 的目錄 (即 notebook 所在的 data_pipeline_v15 目錄)
    # Colab 中，os.getcwd() 通常是 /content。如果 .ipynb 檔案在 /content/data_pipeline_v15/run_stable.ipynb,
    # 則需要添加 /content/data_pipeline_v15 到 sys.path，以便導入 src.data_pipeline_v15
    # 或者更簡單，如果結構是 data_pipeline_v15/src/data_pipeline_v15 和 data_pipeline_v15/run_stable.ipynb
    # 且 Colab 的 CWD 是 data_pipeline_v15，則可以直接 from src.data_pipeline_v15 import ...

    # 假設使用者將 `data_pipeline_v15` 專案的根目錄 (包含 `src` 和 `run_stable.ipynb` 的目錄)
    # 同步到了 Google Drive 的 `Colab Notebooks/data_pipeline_v15`
    # 或者直接上傳/複製到了 Colab 的 `/content/data_pipeline_v15`
    # 我們需要將這個 `data_pipeline_v15` 目錄加入到 Python 的搜尋路徑中

    notebook_dir_name = "data_pipeline_v15" # 專案根目錄的名稱
    potential_project_root_drive = os.path.join(google_drive_mount_point, "Colab Notebooks", notebook_dir_name) if google_drive_mount_point else None
    potential_project_root_local = os.path.join("/content", notebook_dir_name)

    project_root_to_add = None
    if potential_project_root_drive and os.path.isdir(os.path.join(potential_project_root_drive, "src")):
        project_root_to_add = potential_project_root_drive
        print(f"找到專案 src 目錄於 Google Drive: {os.path.join(project_root_to_add, 'src')}")
    elif os.path.isdir(os.path.join(potential_project_root_local, "src")):
        project_root_to_add = potential_project_root_local
        print(f"找到專案 src 目錄於 Colab 本地: {os.path.join(project_root_to_add, 'src')}")
    else:
        # 如果專案根目錄不是 notebook_dir_name，例如使用者直接在 /content 或 Drive 根目錄下執行
        # 並且 src 也在那裡，可以嘗試將當前目錄 (如果 .ipynb 在專案根目錄下運行)
        # 或者 Drive 根目錄 (如果 src 在 Drive 根目錄的 src 資料夾)
        # 但最可靠的是要求使用者將整個專案資料夾 (如 data_pipeline_v15) 放到一個已知位置
        # 作為最後的嘗試，檢查當前 notebook 執行目錄是否直接包含 src
        current_notebook_parent_dir = os.getcwd() # 通常是 /content
        if os.path.isdir(os.path.join(current_notebook_parent_dir, "src")):
             project_root_to_add = current_notebook_parent_dir
             print(f"在 Notebook 當前目錄找到 src: {os.path.join(project_root_to_add, 'src')}")
        elif google_drive_mount_point and os.path.isdir(os.path.join(google_drive_mount_point, "src")):
             project_root_to_add = google_drive_mount_point # 例如 My Drive/src
             print(f"在 Google Drive 根目錄找到 src: {os.path.join(project_root_to_add, 'src')}")
        else:
            print("警告：無法自動定位 'src' 目錄。請確保 'src' 目錄與此筆記本位於正確的相對路徑下，")
            print(f"  或位於 {potential_project_root_drive} 或 {potential_project_root_local}。")
            print("  如果導入失敗，請手動調整 sys.path 或確保專案結構正確。")

    if project_root_to_add and project_root_to_add not in sys.path:
        sys.path.insert(0, project_root_to_add)
        print(f"已將 '{project_root_to_add}' 加入到 sys.path。")

    # 現在嘗試導入
    from src.data_pipeline_v15.pipeline_orchestrator import PipelineOrchestrator
    print("PipelineOrchestrator 導入成功。")

except ModuleNotFoundError as e:
    print(f"導入 PipelineOrchestrator 失敗: {e}")
    print("請檢查：")
    print("1. 'src' 資料夾是否與此 Colab Notebook 在同一專案目錄下 (例如 'data_pipeline_v15/src')。")
    print("2. 如果您將專案放在 Google Drive 中，是否已正確掛載 Drive 且路徑設定無誤。")
    print("3. 'src' 資料夾內是否包含 'data_pipeline_v15' 子資料夾，且其中有 'pipeline_orchestrator.py'。")
    print("4. 如果 Colab 環境中缺少必要的 sys.path 設定，請嘗試手動添加專案根目錄到 sys.path。例如：")
    print("   # import sys")
    print("   # sys.path.append('/content/drive/My Drive/path_to_your_project_root') # 替換成您的實際路徑")
    PipelineOrchestrator = None # 設為 None 以避免後續 NameError
except ImportError as e:
    print(f"導入 PipelineOrchestrator 時發生 ImportError: {e}")
    PipelineOrchestrator = None
except Exception as e:
    print(f"導入時發生未預期錯誤: {e}")
    PipelineOrchestrator = None


# --- 初始化 PipelineOrchestrator ---
if PipelineOrchestrator:
    try:
        # 參數優先級：直接在下方修改 > project_config.json > 預設值
        # 您可以在此處直接覆寫來自 config 檔案的參數，或提供缺失的參數
        # Colab 環境中，project_folder_name 通常會對應到 Google Drive 上的路徑
        # 例如: os.path.join(google_drive_mount_point, "DataPipeline_Project_File/Workspace")

        # 處理 project_folder_name 和 zip_files 的路徑，確保它們是 Drive 上的絕對路徑
        # config 中的路徑通常是相對於 Drive 根目錄的，例如 "DataPipeline_Project_File/Workspace"
        project_folder_name_drive = None
        if google_drive_mount_point and config.get("project_folder_name"):
            project_folder_name_drive = os.path.join(google_drive_mount_point, config.get("project_folder_name"))
            print(f"偵測到 project_folder_name，將轉換為 Drive 絕對路徑: {project_folder_name_drive}")
        else:
            # 如果 Drive 未掛載或 config 中沒有，則提供一個 Colab 本地的預設值
            project_folder_name_drive = "/content/data_pipeline_output" # Colab 本地臨時工作區
            print(f"警告: 未能從設定檔或 Drive 設定 project_folder_name，將使用 Colab 本地預設路徑: {project_folder_name_drive}")
            print("     執行結束後，此路徑下的檔案可能會被清除。建議設定 Google Drive 路徑。")

        # 處理 zip_files, 假設它是相對於 Drive 下 Input 資料夾的檔案名列表 (逗號分隔)
        # 或者是一個相對於 Drive 的完整路徑下的檔案 (如果包含斜線)
        # 這裡的 zip_files 參數預期是檔案名稱列表（無路徑），orchestrator 會在 input_dir 中尋找它們
        # input_dir 是 project_folder_name 下的 Input/
        # 所以這裡的 zip_files 參數可以直接從 config 讀取，不需要轉換成絕對路徑
        zip_files_list_str = config.get("zip_files_list_str", "") # 例如 "file1.zip,file2.zip"

        # --- 在此處手動調整參數 (如果需要) ---
        # 這些值將覆寫從 project_config.json 載入的值
        manual_params = {
            "project_folder_name": project_folder_name_drive, # 必要，工作區的根目錄 (雲端硬碟或本地)
            "database_name": config.get("database_name", "mydatabase.duckdb"),
            "log_name": config.get("log_name", "pipeline_run.log"),
            "zip_files": zip_files_list_str, # 要處理的 ZIP 檔案名稱，逗號分隔，留空表示處理 Input/zip/ 下所有 ZIP
            "run_mode": config.get("run_mode", "NORMAL"), # NORMAL 或 BACKFILL
            "conflict_strategy": config.get("conflict_strategy", "REPLACE"), # REPLACE 或 SKIP
            "schemas_config_path": config.get("schemas_config_path", "config/schemas.json"), # 相對於套件根目錄(src)
            "duckdb_memory_limit_gb": config.get("duckdb_memory_limit_gb", 4),
            "duckdb_threads": config.get("duckdb_threads", -1), # -1 自動偵測 CPU 核心數
            "micro_batch_size": config.get("micro_batch_size", 10),
            "hardware_monitor_interval": config.get("hardware_monitor_interval", 2),
            "recreate_workspace_on_run": config.get("recreate_workspace_on_run", True),
            "cleanup_workspace_on_finish": config.get("cleanup_workspace_on_finish", False)
        }
        # --- 手動調整結束 ---

        print("\n--- PipelineOrchestrator 初始化參數 ---")
        for key, value in manual_params.items():
            print(f"  {key}: {value}")
        print("---------------------------------------\n")

        # 確保 schemas_config_path 是相對於專案根目錄 (包含 src 的目錄)
        # PipelineOrchestrator 內部會將其視為相對於套件根目錄的路徑
        # 但如果 project_config.json 是從 Drive 載入的，它可能包含的是相對於 Drive 的路徑
        # PipelineOrchestrator 的 __init__ 中 _load_schemas 會處理：
        # package_root = Path(__file__).parent.parent.parent
        # abs_schemas_path = package_root / self.schemas_config_path
        # 這意味著 schemas_config_path 應該是相對於 data_pipeline_v15/src 的路徑
        # 例如，如果 schemas.json 在 data_pipeline_v15/config/schemas.json
        # 則 schemas_config_path 應該是 "../config/schemas.json" (相對於 src/data_pipeline_v15)
        # 或者，如果 orchestrator 的 _load_schemas 是基於 project_root (包含 src 的目錄)
        # 則 schemas_config_path 可以是 "config/schemas.json"
        # 目前的 Orchestrator 設計 (v15) 是相對於套件的父目錄 (即 src/data_pipeline_v15 的父目錄 data_pipeline_v15)
        # 所以 "config/schemas.json" 是正確的，如果 config 資料夾在 data_pipeline_v15/config

        # 檢查 schemas_config_path 是否需要基於 project_root_to_add 調整
        # Orchestrator 的 _load_schemas 使用 Path(__file__).parent.parent.parent / self.schemas_config_path
        # __file__ 是 pipeline_orchestrator.py 的路徑
        # parent.parent.parent 會是 project_root_to_add (如果 sys.path 設定正確且導入來源正確)
        # 因此，schemas_config_path "config/schemas.json" 應該能被正確解析為
        # project_root_to_add / "config/schemas.json"
        schemas_path_to_check = os.path.join(project_root_to_add if project_root_to_add else ".", manual_params["schemas_config_path"])
        if not os.path.exists(schemas_path_to_check):
            print(f"警告: schemas_config_path 指向的檔案 '{schemas_path_to_check}' 可能不存在。")
            print(f"       (使用的 project_root_to_add: {project_root_to_add})")
            print(f"       (使用的 schemas_config_path: {manual_params['schemas_config_path']})")
            print("       請確保 schemas.json 檔案位於相對於專案根目錄的正確位置 (例如: data_pipeline_v15/config/schemas.json)。")
        else:
            print(f"確認 schemas.json 預期路徑: {schemas_path_to_check}")

        orchestrator = PipelineOrchestrator(**manual_params)
        print("PipelineOrchestrator 初始化成功。")

        # --- 執行管線 ---
        print("\n--- 開始執行管線 ---")
        orchestrator.run()
        print("--- 管線執行完畢 ---")

    except TypeError as te:
        print(f"初始化 PipelineOrchestrator 失敗，可能是參數不符: {te}")
        print("請檢查 manual_params 中的參數是否與 PipelineOrchestrator 的 __init__ 方法匹配。")
    except FileNotFoundError as fnfe:
        print(f"執行管線時發生 FileNotFoundError: {fnfe}")
        print("請檢查設定檔中所有的路徑是否正確，特別是 project_folder_name 以及 schemas_config_path。")
    except Exception as e:
        import traceback
        print(f"執行管線時發生未預期錯誤: {e}")
        print("詳細錯誤追蹤：")
        traceback.print_exc()
else:
    print("PipelineOrchestrator 未能成功導入，無法執行管線。請檢查導入錯誤訊息。")

# --- 清理 (可選) ---
# if google_drive_mount_point:
# try:
# drive.flush_and_unmount()
# print("Google Drive 已卸載。")
# except Exception as e:
# print(f"卸載 Google Drive 時發生錯誤: {e}")
print("\n筆記本執行完畢。請檢查輸出日誌和檔案。")
