<a href="https://colab.research.google.com/github/alayuala/114-1PL.repo/blob/main/WEEK7_%E6%96%87%E5%AD%97%E8%B3%87%E6%96%99%E5%B0%8F%E5%88%86%E6%9E%90.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **文字資料小分析（報告草稿/問卷開放題）（作業四）**  
* 目標：從 Sheet 讀開放式回答 → 做詞數與關鍵字計數 → 輸出前 N 熱詞 → 回寫統計表。  
* AI 點子：請模型產出 5 句洞察摘要 + 一段 120 字結論。  


In [131]:
# 安裝必要的 Python 套件
# 這邊列出了我們這個專案會用到的所有套件
# gspread: 用來操作 Google Sheets
# gspread_dataframe: 方便 Pandas DataFrame 與 Google Sheets 之間的轉換
# google-auth, google-auth-oauthlib, google-auth-httplib2: Google 身份驗證相關
# gradio: 建立一個互動式的網頁介面
# pandas: 強大的資料處理和分析工具
# beautifulsoup4: 用於解析 HTML 網頁內容，方便抓取資料
# google-generativeai: 呼叫 Google 的生成式 AI 模型 (Gemini)
# python-dateutil: 處理日期和時間，特別是時區問題
!pip -q install gspread gspread_dataframe google-auth google-auth-oauthlib google-auth-httplib2 \
               gradio pandas beautifulsoup4 google-generativeai python-dateutil

請保留下面這個儲存格 (設定您的 Google Sheet 連結、工作表名稱和時區)。

In [132]:
# 設定您的 Google Sheet 連結、工作表名稱和時區
# 請將 SHEET_URL 替換成您自己的 Google Sheet 連結
SHEET_URL = "https://docs.google.com/spreadsheets/d/163AOmd5HnH9sxzYJo4BwZeE87UFr3k5XxuggcRNsZCs/edit?usp=sharing" # <-- 請確認這是您的 Google Sheet 網址
INPUT_WORKSHEET_NAME = "工作表1" # <-- 開放式回答資料所在的工作表名稱
OUTPUT_WORKSHEET_NAME = "分析結果" # <-- 文本分析結果將寫入的工作表名稱 (我會自動建立)
TIMEZONE = "Asia/Taipei" # 設定時區

In [133]:
# 確保指定的工作表 (worksheet) 在試算表中存在，如果不存在則創建一個新的
def ensure_worksheet(sh, title, header):
    print(f"嘗試確保工作表 '{title}' 存在...")
    try:
        ws = sh.worksheet(title) # 嘗試開啟現有的工作表
        print(f"工作表 '{title}' 已存在。")
    except gspread.WorksheetNotFound:
        print(f"工作表 '{title}' 不存在，嘗試創建...")
        try:
            # 如果找不到，則新增一個工作表，並設定初始的行列數
            ws = sh.add_worksheet(title=title, rows="1000", cols=str(len(header)+5))
            print(f"工作表 '{title}' 創建成功。")
            ws.update([header]) # 新增時寫入表頭
            print(f"已為工作表 '{title}' 寫入表頭。")
        except Exception as e:
            print(f"⚠️ 創建工作表 '{title}' 失敗: {e}")
            raise # 創建失敗時重新拋出異常
    # 再次檢查：若沒有表頭（工作表是空的或第一行不是預期的表頭），則清空並寫上表頭
    try:
        data = ws.get_all_values()
        if not data or (data and data[0] != header):
            print(f"工作表 '{title}' 沒有表頭或表頭不符，清空並重新寫入表頭。")
            ws.clear() # 清空工作表內容
            ws.update([header]) # 重新寫入表頭
            print(f"已為工作表 '{title}' 重新寫入表頭。")
    except Exception as e:
        print(f"⚠️ 檢查或更新工作表 '{title}' 表頭失敗: {e}")
        # 這裡選擇不中斷程式，但記錄錯誤

    print(f"確保工作表 '{title}' 過程結束。")
    return ws

# 讀取 DataFrame 的輔助函式
def read_df(ws, header):
    """從 Google Sheet 工作表讀取資料到 DataFrame，並確保欄位和型態正確"""
    df = get_as_dataframe(ws, evaluate_formulas=True, header=0)
    if df is None or df.empty:
        sheet_header = ws.get_all_values('A1:1')
        cols = sheet_header[0] if sheet_header and sheet_header[0] else header
        return pd.DataFrame(columns=cols)
    df = df.fillna("")
    for c in header:
        if c not in df.columns:
            df[c] = ""
    return df

# 將 DataFrame 寫入 Google Sheet 的輔助函式
def write_df(ws, df, header):
    """將 DataFrame 內容寫入 Google Sheet 工作表 (全量覆蓋)"""
    if df.empty:
        ws.clear()
        ws.update([header])
        return
    df_out = df.copy()
    for c in df_out.columns:
        df_out[c] = df_out[c].astype(str)
    ws.clear()
    ws.update([header] + df_out[header].values.tolist())

# 需要 tznow 函式來處理時間戳記
def tznow():
    from datetime import datetime as dt
    from dateutil.tz import gettz
    # 確保 TIMEZONE 變數已定義
    try:
        TIMEZONE
    except NameError:
        # 如果 TIMEZONE 未定義，給出警告並使用預設值
        print("⚠️ 時區變數 TIMEZONE 未定義，請執行設定儲存格。這裡將使用預設值 Asia/Taipei。")
        TIMEZONE_DEFAULT = "Asia/Taipei"
        return dt.now(gettz(TIMEZONE_DEFAULT))
    # 如果 TIMEZONE 已定義，則使用它
    return dt.now(gettz(TIMEZONE))

# 確保 PTT 文章資料、輸入資料和分析結果的工作表存在並有正確的表頭
# 使用前面定義的 ensure_worksheet 函式來處理
print("確保 PTT 文章工作表...")
ws_ptt_posts = ensure_worksheet(sh, "ptt_movie_posts", PTT_HEADER) # PTT 文章工作表
print("確保 輸入資料工作表...")
ws_input = ensure_worksheet(sh, INPUT_WORKSHEET_NAME, INPUT_HEADER) # 輸入資料工作表
print("確保 分析結果工作表...")
ws_analysis = ensure_worksheet(sh, OUTPUT_WORKSHEET_NAME, ANALYSIS_HEADER) # 分析結果工作表

print("所有必要工作表確保完成。")

確保 PTT 文章工作表...
嘗試確保工作表 'ptt_movie_posts' 存在...
工作表 'ptt_movie_posts' 不存在，嘗試創建...
工作表 'ptt_movie_posts' 創建成功。
已為工作表 'ptt_movie_posts' 寫入表頭。
確保工作表 'ptt_movie_posts' 過程結束。
確保 輸入資料工作表...
嘗試確保工作表 '工作表1' 存在...
工作表 '工作表1' 已存在。
工作表 '工作表1' 沒有表頭或表頭不符，清空並重新寫入表頭。
已為工作表 '工作表1' 重新寫入表頭。
確保工作表 '工作表1' 過程結束。
確保 分析結果工作表...
嘗試確保工作表 '分析結果' 存在...
工作表 '分析結果' 不存在，嘗試創建...
工作表 '分析結果' 創建成功。
已為工作表 '分析結果' 寫入表頭。
確保工作表 '分析結果' 過程結束。
所有必要工作表確保完成。


請保留下面這個儲存格 (定義資料表的欄位名稱和初始化 DataFrame)。

In [134]:
# 定義不同資料表的欄位名稱 (Header)
# PTT 文章資料的欄位
PTT_HEADER = [
    "post_id", "title", "url", "date", "author", "nrec", "created_at",
    "fetched_at", "content"
]
# 輸入資料的欄位 (您需要根據您的 Sheet 調整)
INPUT_HEADER = ["Timestamp", "開放式回答"] # <-- 請根據您的 Sheet 輸入工作表標頭調整這裡的欄位名稱
# 文本分析結果的欄位
ANALYSIS_HEADER = ["term", "freq", "df_count", "tfidf_mean", "examples"]

# 初始化空的 DataFrame，後續由 Gradio 讀取或爬取後填充
ptt_posts_df = pd.DataFrame(columns=PTT_HEADER)
input_responses_df = pd.DataFrame(columns=INPUT_HEADER)
terms_df = pd.DataFrame(columns=ANALYSIS_HEADER)

請保留下面這個儲存格 (進行 Google 身份驗證)。

In [135]:
# 進行 Google 身份驗證
# 這會在 Colab 環境中跳出一個視窗，引導您完成授權流程
# 授權後，程式碼才能存取您的 Google Drive 和 Google Sheets
from google.colab import auth
auth.authenticate_user()

# 使用您的 Google 憑證來授權 gspread
import gspread
from google.auth import default
creds, _ = default()

# 初始化 gspread 客戶端，之後就可以用 gc 來操作您的 Google Sheets
gc = gspread.authorize(creds)

請保留下面這個儲存格 (確保 Google Sheet 試算表存在)。

In [136]:
# 確保指定的 Google Sheet 存在，如果不存在則創建一個新的
def ensure_spreadsheet(url):
    try:
        # Try to open the spreadsheet using the URL
        sh = gc.open_by_url(url)
        print(f"✅ Successfully opened spreadsheet from URL: {url}")
    except gspread.SpreadsheetNotFound:
        print(f"⚠️ Spreadsheet not found at URL: {url}. Attempting to create a new one (this might not be the intended behavior if you expected to use an existing sheet).")
        # If not found by URL, try creating one. Note: creating by URL is not standard gspread.
        # A more typical approach is to create by name or rely on an existing sheet.
        # For this scenario, let's assume the user wants to use the sheet at the URL.
        # If it's not found, there might be an issue with the URL or permissions.
        # We'll re-raise the error or return None to indicate failure.
        raise # Re-raise the error to inform the user the sheet wasn't found

    except Exception as e:
        print(f"⚠️ Failed to open spreadsheet from URL {url}: {e}")
        raise # Re-raise any other exceptions

    return sh

# Use the provided SHEET_URL to ensure the spreadsheet exists and is opened
try:
    sh = ensure_spreadsheet(SHEET_URL)
    print(f"✅ Google Sheet '{SHEET_URL}' 確保完成，準備檢查工作表。")
except Exception as e:
    print(f"❌ 無法開啟或建立 Google Sheet: {e}")
    # Depending on the severity, you might want to exit or disable parts of the app
    sh = None # Set sh to None to prevent further errors if spreadsheet opening failed

✅ Successfully opened spreadsheet from URL: https://docs.google.com/spreadsheets/d/163AOmd5HnH9sxzYJo4BwZeE87UFr3k5XxuggcRNsZCs/edit?usp=sharing
✅ Google Sheet 'https://docs.google.com/spreadsheets/d/163AOmd5HnH9sxzYJo4BwZeE87UFr3k5XxuggcRNsZCs/edit?usp=sharing' 確保完成，準備檢查工作表。


請保留下面這個儲存格 (確保 Google Sheet 工作表存在並定義輔助函式 read_df, write_df, tznow)。

In [137]:
# 確保指定的工作表 (worksheet) 在試算表中存在，並檢查/補上表頭
def ensure_worksheet(sh, title, header):
    if sh is None:
        print(f"⚠️ 試算表物件為 None，無法確保工作表 '{title}'。請檢查試算表開啟步驟。")
        return None

    print(f"嘗試確保工作表 '{title}' 存在於試算表 '{sh.title}'...")
    try:
        ws = sh.worksheet(title) # 嘗試開啟現有的工作表
        print(f"工作表 '{title}' 已存在。")
    except gspread.WorksheetNotFound:
        print(f"工作表 '{title}' 不存在於試算表 '{sh.title}'，嘗試創建...")
        try:
            # 如果找不到，則新增一個工作表，並設定初始的行列數
            ws = sh.add_worksheet(title=title, rows="1000", cols=str(len(header)+5))
            print(f"工作表 '{title}' 創建成功。")
            # ws.update([header]) # 新增時寫入表頭 - Moved below to handle existing sheets without headers too
        except Exception as e:
            print(f"⚠️ 創建工作表 '{title}' 失敗: {e}")
            # Depending on the severity, you might want to exit or disable parts of the app
            return None # Return None if worksheet creation fails

    # 再次檢查：若沒有表頭（工作表是空的或第一行不是預期的表頭），則清空並寫上表頭
    try:
        data = ws.get_all_values('A1:1') # Only read the first row
        if not data or data[0] != header:
            print(f"工作表 '{title}' 沒有表頭或表頭不符，清空並重新寫入表頭。")
            ws.clear() # 清空工作表內容
            ws.update([header]) # 重新寫入表頭
            print(f"已為工作表 '{title}' 重新寫入表頭。")
        else:
            print(f"工作表 '{title}' 表頭檢查正常。")
    except Exception as e:
        print(f"⚠️ 檢查或更新工作表 '{title}' 表頭失敗: {e}")
        # 這裡選擇不中斷程式，但記錄錯誤

    print(f"確保工作表 '{title}' 過程結束。")
    return ws

# 讀取 DataFrame 的輔助函式
def read_df(ws, header):
    """從 Google Sheet 工作表讀取資料到 DataFrame，並確保欄位和型態正確"""
    if ws is None:
        print("⚠️ 工作表物件為 None，無法讀取 DataFrame。")
        cols = header # Use provided header for empty DataFrame
        return pd.DataFrame(columns=cols)

    try:
        df = get_as_dataframe(ws, evaluate_formulas=True, header=0)
        if df is None or df.empty:
            print(f"ℹ️ 工作表 '{ws.title}' 為空或無法讀取 DataFrame。")
            sheet_header = ws.get_all_values('A1:1')
            cols = sheet_header[0] if sheet_header and sheet_header[0] else header
            return pd.DataFrame(columns=cols)

        df = df.fillna("")
        # Ensure all header columns exist, add if missing
        for c in header:
            if c not in df.columns:
                print(f"⚠️ 工作表 '{ws.title}' 缺少欄位 '{c}'，已新增空欄位。")
                df[c] = ""
        # Optional: Reorder columns to match header
        df = df[header]

        print(f"✅ 從工作表 '{ws.title}' 讀取 {len(df)} 筆資料。")
        return df

    except Exception as e:
        print(f"⚠️ 從工作表 '{ws.title}' 讀取 DataFrame 失敗: {e}")
        cols = ws.get_all_values('A1:1')[0] if ws.get_all_values('A1:1') else header
        return pd.DataFrame(columns=cols)


# 將 DataFrame 寫入 Google Sheet 的輔助函式
def write_df(ws, df, header):
    """將 DataFrame 內容寫入 Google Sheet 工作表 (全量覆蓋)"""
    if ws is None:
        print("⚠️ 工作表物件為 None，無法寫入 DataFrame。")
        return

    print(f"嘗試將 {len(df)} 筆資料寫入工作表 '{ws.title}'...")
    try:
        if df.empty:
            ws.clear()
            ws.update([header])
            print(f"✅ 工作表 '{ws.title}' 已清空並寫入表頭。")
            return

        df_out = df.copy()
        # Ensure only header columns are written and in the correct order
        df_out = df_out.reindex(columns=header, fill_value="")
        for c in df_out.columns:
            df_out[c] = df_out[c].astype(str)

        ws.clear()
        ws.update([header] + df_out[header].values.tolist())
        print(f"✅ 已將 {len(df)} 筆資料寫入工作表 '{ws.title}'。")

    except Exception as e:
        print(f"⚠️ 將 DataFrame 寫入工作表 '{ws.title}' 失敗: {e}")


# 需要 tznow 函式來處理時間戳記
def tznow():
    from datetime import datetime as dt
    from dateutil.tz import gettz
    # 確保 TIMEZONE 變數已定義
    try:
        TIMEZONE
    except NameError:
        # 如果 TIMEZONE 未定義，給出警告並使用預設值
        print("⚠️ 時區變數 TIMEZONE 未定義，請執行設定儲存格。這裡將使用預設值 Asia/Taipei。")
        TIMEZONE_DEFAULT = "Asia/Taipei"
        return dt.now(gettz(TIMEZONE_DEFAULT))
    # 如果 TIMEZONE 已定義，則使用它
    return dt.now(gettz(TIMEZONE))

# 確保 PTT 文章資料、輸入資料和分析結果的工作表存在並有正確的表頭
# 使用前面定義的 ensure_worksheet 函式來處理
# Ensure the spreadsheet object 'sh' is available before calling ensure_worksheet
if 'sh' in globals() and sh is not None:
    print("確保 PTT 文章工作表...")
    ws_ptt_posts = ensure_worksheet(sh, "ptt_movie_posts", PTT_HEADER) # PTT 文章工作表
    print("確保 輸入資料工作表...")
    ws_input = ensure_worksheet(sh, INPUT_WORKSHEET_NAME, INPUT_HEADER) # 輸入資料工作表
    print("確保 分析結果工作表...")
    ws_analysis = ensure_worksheet(sh, OUTPUT_WORKSHEET_NAME, ANALYSIS_HEADER) # 分析結果工作表

    print("所有必要工作表確保完成。")
else:
    print("⚠️ 試算表物件 'sh' 未成功初始化，跳過工作表確保步驟。")

確保 PTT 文章工作表...
嘗試確保工作表 'ptt_movie_posts' 存在於試算表 'hw4'...
工作表 'ptt_movie_posts' 已存在。
工作表 'ptt_movie_posts' 表頭檢查正常。
確保工作表 'ptt_movie_posts' 過程結束。
確保 輸入資料工作表...
嘗試確保工作表 '工作表1' 存在於試算表 'hw4'...
工作表 '工作表1' 已存在。
工作表 '工作表1' 表頭檢查正常。
確保工作表 '工作表1' 過程結束。
確保 分析結果工作表...
嘗試確保工作表 '分析結果' 存在於試算表 'hw4'...
工作表 '分析結果' 已存在。
工作表 '分析結果' 表頭檢查正常。
確保工作表 '分析結果' 過程結束。
所有必要工作表確保完成。


請保留下面這個儲存格 (安裝必要的 Python 函式庫)。

In [138]:
# 安裝必要的 Python 套件
# 這邊列出了我們這個專案會用到的所有套件
# gspread: 用來操作 Google Sheets
# gspread_dataframe: 方便 Pandas DataFrame 與 Google Sheets 之間的轉換
# google-auth, google-auth-oauthlib, google-auth-httplib2: Google 身份驗證相關
# gradio: 建立一個互動式的網頁介面
# pandas: 強大的資料處理和分析工具
# beautifulsoup4: 用於解析 HTML 網頁內容，方便抓取資料
# google-generativeai: 呼叫 Google 的生成式 AI 模型 (Gemini)
# python-dateutil: 處理日期和時間，特別是時區問題
!pip -q install gspread gspread_dataframe google-auth google-auth-oauthlib google-auth-httplib2 \
               gradio pandas beautifulsoup4 google-generativeai python-dateutil

請保留下面這個儲存格 (匯入專案所需的函式庫)。

In [139]:
# 匯入專案所需的函式庫
# 包含了處理時間、資料科學工具、網頁操作、文本分析以及 Google 服務的函式庫

import os, time, uuid, re, json, datetime
from datetime import datetime as dt, timedelta
from dateutil.tz import gettz # 處理時區相關功能，確保時間正確

import pandas as pd # Pandas DataFrame 是資料處理的核心
import gradio as gr # Gradio 讓我們快速建立一個分享的介面
import requests # 用於發送 HTTP 請求，例如取得網頁原始碼
from bs4 import BeautifulSoup # BeautifulSoup 協助解析 HTML 結構

# 文本分析常用的工具
from collections import Counter, defaultdict # 計算詞頻、處理字典
import numpy as np # 數值計算的好幫手
from scipy.sparse import csr_matrix # 如果使用到 TF-IDF 或其他向量化方法時可能會需要

# 呼叫 Google 的生成式 AI 模型，例如 Gemini
import google.generativeai as genai

# 與 Google 服務 (特別是 Google Sheets) 互動的函式庫
from google.colab import auth # 在 Colab 環境中進行 Google 身份驗證
import gspread # 讀取和寫入 Google Sheets
from gspread_dataframe import set_with_dataframe, get_as_dataframe # 更方便地處理 DataFrame 與 Sheet 之間的資料傳輸
from google.auth.transport.requests import Request # 處理驗證過程中的 HTTP 請求
from google.oauth2 import service_account # 服務帳戶驗證 (如果您使用服務帳戶金鑰的話)
from google.auth import default # 使用預設的身份驗證憑證

請保留下面這個儲存格 (設定 Gemini API 金鑰並初始化模型)。

In [140]:
from google.colab import userdata

# 從 Colab Secrets 中獲取 API 金鑰
# 請將您的 Gemini API 金鑰儲存在 Colab 的 Secrets 中，名稱設定為 'GOOGLE_API_KEY'
# 如何設定 Secrets: 在 Colab 左側面板點擊「🔑」，新增一個 Secret，名稱為 GOOGLE_API_KEY，值貼上您的 API 金鑰
api_key = userdata.get('GOOGLE_API_KEY')

# 使用獲取的金鑰配置 genai
genai.configure(api_key=api_key)

# 初始化 Gemini 模型，這裡使用 'gemini-2.5-pro'
# 您可以根據需求更換模型名稱
model = genai.GenerativeModel('gemini-2.5-pro')

請保留下面這個儲存格 (定義核心功能函式：PTT 爬蟲、Sheet 讀取、文本分析、AI 生成)。

In [None]:
# ==============
# PTT 電影版爬蟲函式
# ==============

# (保留原有的 PTT 爬蟲輔助函式 _get_soup, _get_prev_index_url, _parse_nrec, _extract_post_list, _clean_ptt_content)
PTT_MOVIE_INDEX = "https://www.ptt.cc/bbs/movie/index.html" # PTT 電影版首頁 URL
PTT_COOKIES = {"over18": "1"} # 用於繞過 PTT 的滿 18 歲驗證 (會話狀態維持)

def _get_soup(url):
    headers = {"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}
    r = requests.get(url, timeout=15, headers=headers, cookies=PTT_COOKIES)
    r.raise_for_status()
    return BeautifulSoup(r.text, "html.parser")

def _get_prev_index_url(soup):
    btns = soup.select("div.btn-group-paging a.btn.wide")
    for a in btns:
        if "上頁" in a.get_text(strip=True):
            href = a.get("href")
            if href:
                from urllib.parse import urljoin
                return urljoin(PTT_MOVIE_INDEX, href)
    return None

def _parse_nrec(nrec_span):
    if not nrec_span: return 0
    txt = nrec_span.get_text(strip=True)
    if txt == "爆": return 100
    if txt.startswith("X"):
        try: return -int(txt[1:])
        except: return -10
    try: return int(txt)
    except: return 0

def _extract_post_list(soup):
    posts = []
    for r in soup.select("div.r-ent"):
        a = r.select_one("div.title a")
        if not a: continue
        title = a.get_text(strip=True)
        url = "https://www.ptt.cc" + a.get("href")
        author = r.select_one("div.author").get_text(strip=True)
        date = r.select_one("div.date").get_text(strip=True)
        nrec = _parse_nrec(r.select_one("div.nrec span"))
        posts.append({"title": title, "url": url, "author": author, "date": date, "nrec": nrec})
    return posts

def _clean_ptt_content(soup):
    for p in soup.select("div.push"): p.decompose()
    main = soup.select_one("#main-content")
    if not main: return "", ""
    metas = main.select("div.article-metaline, div.article-metaline-right")
    for m in metas: m.decompose()
    text = main.get_text("\n", strip=True)
    if "--" in text: text = text.split("--")[0].strip()
    title_tag = soup.select_one("span.article-meta-value")
    meta_title = title_tag.get_text(strip=True) if title_tag else ""
    return text, meta_title


# 主要爬蟲函式：從 PTT 電影版抓取文章
# 抓取結果會寫入 ws_ptt_posts 工作表
def crawl_ptt_movie(index_pages=3, min_push=0, keyword=""):
    """從最新 index.html 往前翻 index_pages 頁，抓滿足條件的文章並寫入 Sheet"""
    global ptt_posts_df, ws_ptt_posts # 宣告使用全域變數
    url = PTT_MOVIE_INDEX
    all_rows = []
    # 先從 Sheet 讀取現有的 PTT 文章，用於去重
    ptt_posts_df = read_df(ws_ptt_posts, PTT_HEADER) # 從 PTT 工作表讀取
    seen_urls = set(ptt_posts_df["url"].tolist()) if not ptt_posts_df.empty else set()

    for _ in range(int(index_pages)):
        try:
            soup = _get_soup(url)
        except Exception as e:
            print(f"⚠️ 抓取頁面失敗 {url}: {e}")
            break
        posts = _extract_post_list(soup)
        for p in posts:
            if p["nrec"] < int(min_push): continue
            if keyword and (keyword not in p["title"]): continue
            if p["url"] in seen_urls: continue

            try:
                art_soup = _get_soup(p["url"])
                content, meta_title = _clean_ptt_content(art_soup)
            except Exception as e:
                print(f"⚠️ 抓取文章內文失敗 {p['url']}: {e}")
                content, meta_title = "", ""

            final_title = p["title"] if p["title"] else (meta_title or "（無標題）")

            all_rows.append({
                "post_id": str(uuid.uuid4())[:8],
                "title": final_title[:200],
                "url": p["url"],
                "date": p["date"],
                "author": p["author"],
                "nrec": str(p["nrec"]),
                "created_at": tznow().isoformat(), # 需要 tznow 函式
                "fetched_at": tznow().isoformat(), # 需要 tznow 函式
                "content": content
            })
            seen_urls.add(p["url"])

        prev = _get_prev_index_url(soup)
        if not prev: break
        url = prev
        time.sleep(0.5)

    if all_rows:
        new_df = pd.DataFrame(all_rows, columns=PTT_HEADER)
        # 合併新抓取的資料到現有 DataFrame
        ptt_posts_df = pd.concat([ptt_posts_df, new_df], ignore_index=True)
        # 將更新後的 DataFrame 寫回 PTT 工作表
        write_df(ws_ptt_posts, ptt_posts_df, PTT_HEADER)
        return f"✅ 取得 {len(all_rows)} 篇文章（已寫入 Sheet 的 ptt_movie_posts 工作表）", ptt_posts_df
    else:
        return "ℹ️ 沒有新文章符合條件（或內容已在 Sheet 的 ptt_movie_posts 工作表）", ptt_posts_df


# ==============
# 文本分析（jieba + TF/IDF + bigram）函式
# ==============
import re
try:
    import jieba
except ImportError:
    jieba = None
    print("⚠️ 未安裝 jieba，中文斷詞將使用較簡單的空白字元分割。請考慮安裝 jieba 以獲得更好的斷詞效果。")

def _tokenize_zh(text):
    text = re.sub(r"[^\u4e00-\u9fffA-Za-z0-9,.!?:;\'\"()]+", " ", text)
    if not jieba:
        return [t.strip() for t in text.split() if len(t.strip()) > 1]
    tokens = [w.strip() for w in jieba.lcut(text) if len(w.strip()) > 1]
    return tokens

# 從 Google Sheet 讀取開放式回答資料的函式 (保留)
def read_open_ended_responses(worksheet_name=INPUT_WORKSHEET_NAME, column_name="開放式回答"):
    """從指定的 Google Sheet 工作表和欄位讀取文字資料"""
    global sh, ws_input # 使用前面已經授權並開啟的試算表物件和輸入工作表物件
    try:
        # 嘗試開啟指定的工作表，如果指定的是 INPUT_WORKSHEET_NAME，則直接使用 ws_input
        ws = sh.worksheet(worksheet_name) if worksheet_name != INPUT_WORKSHEET_NAME else ws_input
        df = get_as_dataframe(ws, evaluate_formulas=True, header=0)
        if df is None or df.empty:
            return pd.DataFrame(columns=[column_name]), f"ℹ️ 工作表 '{worksheet_name}' 沒有資料。"

        if column_name not in df.columns:
             return pd.DataFrame(columns=[column_name]), f"⚠️ 工作表 '{worksheet_name}' 中找不到 '{column_name}' 欄位。"
        df = df.fillna("")
        return df[[column_name]], f"✅ 從工作表 '{worksheet_name}' 讀取 {len(df)} 筆資料。"

    except gspread.WorksheetNotFound:
        return pd.DataFrame(columns=[column_name]), f"⚠️ 找不到工作表 '{worksheet_name}'。"
    except Exception as e:
        return pd.DataFrame(columns=[column_name]), f"⚠️ 讀取工作表失敗: {e}"


# 主要文本分析函式 - 修改為接受 DataFrame 作為輸入
def analyze_texts(df, text_column_name, topk=50, min_df=2):
    """從 DataFrame 中的指定文字欄位進行文本分析，結果寫入分析工作表"""
    global ws_analysis # 使用前面確保存在的分析結果工作表
    if df is None or df.empty or text_column_name not in df.columns:
        return "📭 沒有有效的文字資料進行分析。", pd.DataFrame(columns=ANALYSIS_HEADER), ""

    docs = df[text_column_name].tolist()
    docs = [d for d in docs if d and isinstance(d, str)]

    if not docs:
         return "📭 沒有有效的文字資料進行分析。", pd.DataFrame(columns=ANALYSIS_HEADER), ""

    # (保留原有的詞頻、文件頻率、TF-IDF、Bigram 計算邏輯)
    freq = Counter()
    df_cnt = defaultdict(int)
    token_docs = []
    for doc in docs:
        toks = _tokenize_zh(doc)
        token_docs.append(toks)
        freq.update(toks)
        for t in set(toks):
            df_cnt[t] += 1

    filtered_terms = [t for t in freq.keys() if df_cnt[t] >= int(min_df)]
    freq = Counter({t: freq[t] for t in filtered_terms})
    df_cnt = {t: df_cnt[t] for t in filtered_terms}

    tfidf_map = {}
    try:
        from sklearn.feature_extraction.text import TfidfVectorizer
        vec = TfidfVectorizer(tokenizer=_tokenize_zh, lowercase=False)
        X = vec.fit_transform(docs)
        terms = vec.get_feature_names_out()
        term_indices = [i for i, t in enumerate(terms) if t in filtered_terms]
        X = X[:, term_indices]
        terms = [terms[i] for i in term_indices]
        tfidf_mean = X.mean(axis=0).A1 if X.shape[0] > 0 and X.shape[1] > 0 else np.array([])
        tfidf_map = dict(zip(terms, tfidf_mean))
    except ImportError:
         print("⚠️ 未安裝 scikit-learn，無法計算 TF-IDF。")
    except Exception as e:
        print(f"⚠️ TF-IDF 計算失敗: {e}")

    from itertools import tee
    def pairwise(iterable):
        "s -> (s0,s1), (s1,s2), (s2, s3), ..."
        a, b = tee(iterable)
        next(b, None)
        return zip(a, b)
    bigram_freq = Counter()
    for toks in token_docs:
        bigram_freq.update([" ".join(bg) for bg in pairwise(toks)])

    candidates = list(freq.keys())
    candidates.sort(key=lambda t: (round(tfidf_map.get(t,0.0) if tfidf_map else 0.0, 6), freq.get(t,0)), reverse=True)
    top_terms = candidates[:int(topk)]

    examples = {}
    for term in top_terms:
        ex = ""
        for doc in docs:
            if term in doc:
                i = doc.find(term)
                s = max(0, i-15)
                e = min(len(doc), i+len(term)+15)
                ex = doc[s:e].replace("\n"," ")
                break
        examples[term] = ex

    rows = []
    for t in top_terms:
        rows.append({
            "term": t,
            "freq": str(freq.get(t,0)),
            "df_count": str(df_cnt.get(t,0)),
            "tfidf_mean": f"{tfidf_map.get(t,0.0) if tfidf_map else 0.0:.6f}",
            "examples": examples.get(t, "")
        })
    terms_df = pd.DataFrame(rows, columns=ANALYSIS_HEADER)

    # 將結果寫回 Google Sheet 的分析結果工作表
    write_df(ws_analysis, terms_df, ANALYSIS_HEADER)

    # 產生 Markdown 格式的摘要報告
    md_lines = []
    md_lines.append(f"### 關鍵詞 Top {len(top_terms)}（依 TF-IDF 平均值優先，次序再以詞頻）")
    for i, t in enumerate(top_terms, 1):
        tfidf_val = float(tfidf_map.get(t,0.0) if tfidf_map else 0.0)
        md_lines.append(f"{i}. **{t}** — tfidf≈{tfidf_val:.4f}；freq={freq.get(t,0)}；df={df_cnt.get(t,0)}")
    md_lines.append("\n### 常見雙詞搭配（前 20）")
    for i, (bg, c) in enumerate(bigram_freq.most_common(20), 1):
        md_lines.append(f"{i}. {bg} — {c}")

    return f"✅ 已完成文本分析，共處理 {len(docs)} 篇文檔；分析結果已寫入 Sheet 的 '{OUTPUT_WORKSHEET_NAME}' 工作表。", terms_df, "\n".join(md_lines)

# 讀取 DataFrame 的輔助函式 (從之前的 PTT 爬蟲程式碼搬移過來並修改)
def read_df(ws, header):
    """從 Google Sheet 工作表讀取資料到 DataFrame，並確保欄位和型態正確"""
    df = get_as_dataframe(ws, evaluate_formulas=True, header=0)
    if df is None or df.empty:
        sheet_header = ws.get_all_values('A1:1')
        cols = sheet_header[0] if sheet_header and sheet_header[0] else header
        return pd.DataFrame(columns=cols)
    df = df.fillna("")
    for c in header:
        if c not in df.columns:
            df[c] = ""
    return df

# 將 DataFrame 寫入 Google Sheet 的輔助函式 (從之前的 PTT 爬蟲程式碼搬移過來)
def write_df(ws, df, header):
    """將 DataFrame 內容寫入 Google Sheet 工作表 (全量覆蓋)"""
    if df.empty:
        ws.clear()
        ws.update([header])
        return
    df_out = df.copy()
    for c in df_out.columns:
        df_out[c] = df_out[c].astype(str)
    ws.clear()
    ws.update([header] + df_out[header].values.tolist())

# AI 生成洞察和結論的函式 (保留)
def generate_ai_output(terms_df):
    """使用 Gemini 模型生成洞察摘要和結論"""
    # global model # 模型已在前面儲存格初始化
    if terms_df is None or terms_df.empty:
        return "📭 沒有分析結果，無法生成 AI 摘要和結論。", "", ""

    input_text = "請根據以下文本分析結果，生成 5 句洞察摘要和一段約 120 字的結論：\n\n"
    input_text += "關鍵詞列表 (依重要性排序)：\n"
    for _, r in terms_df.head(20).iterrows():
        input_text += f"- 詞彙: {r['term']}, 頻率: {r['freq']}, 文件數: {r['df_count']}, TF-IDF: {float(r['tfidf_mean']):.4f}, 範例: {r['examples']}\n"

    input_text += "\n請注意：\n1. 洞察摘要請使用條列式。\n2. 結論請連貫成一段文字，約 120 字。\n3. 請用繁體中文。\n\n範例輸出格式：\n### 洞察摘要\n- 洞察 1\n- 洞察 2\n...\n### 結論\n這裡是一段約 120 字的結論。"

    try:
        # model = genai.GenerativeModel('gemini-2.5-pro') # 模型已在前面儲存格初始化
        resp = model.generate_content(input_text)
        ai_output = resp.text

        insights = []
        conclusion = ""
        if "### 洞察摘要" in ai_output and "### 結論" in ai_output:
            parts = ai_output.split("### 洞察摘要")
            if len(parts) > 1:
                insights_part_conc_part = parts[1].split("### 結論")
                if len(insights_part_conc_part) > 1:
                    insights_text = insights_part_conc_part[0].strip()
                    insights = [line.strip("- ").strip() for line in insights_text.split("\n") if line.strip().startswith("-")]
                    conclusion = insights_part_conc_part[1].strip()
                else:
                    insights_text = insights_part_conc_part[0].strip()
                    insights = [line.strip("- ").strip() for line in insights_text.split("\n") if line.strip().startswith("-")]
            elif "### 結論" in ai_output:
                 parts = ai_output.split("### 結論")
                 if len(parts) > 1:
                     conclusion = parts[1].strip()
        else:
             insights = [ai_output]
             conclusion = "（無法自動解析結論）"

        insights_md = "### 洞察摘要\n" + ("\n".join([f"- {i}" for i in insights]) if insights else "（無）")
        conclusion_md = "### 結論\n" + (conclusion if conclusion else "（無）")

        return "✅ 已成功生成 AI 摘要和結論。", insights_md, conclusion_md

    except Exception as e:
        return f"⚠️ 生成 AI 摘要和結論失敗: {e}", "### 洞察摘要\n（生成失敗）", "### 結論\n（生成失敗）"

# 確保 TIMEZONE 變數已定義，如果沒有則使用預設值 Asia/Taipei
try:
    TIMEZONE
except NameError:
    print("⚠️ 時區變數 TIMEZONE 未定義，這裡將使用預設值 Asia/Taipei。")
    TIMEZONE = "Asia/Taipei"

# 需要 tznow 函式才能讓 crawl_ptt_movie 和其他需要時間戳的函式運作
def tznow():
    from datetime import datetime as dt
    from dateutil.tz import gettz
    # 確保 TIMEZONE 變數已定義 (在此儲存格開頭已處理)
    return dt.now(gettz(TIMEZONE))

# 初始化空的 DataFrame，後續由 Gradio 讀取或爬取後填充
ptt_posts_df = pd.DataFrame(columns=PTT_HEADER)
input_responses_df = pd.DataFrame(columns=INPUT_HEADER)
terms_df = pd.DataFrame(columns=ANALYSIS_HEADER)

請保留下面這個儲存格 (定義 Gradio 介面需要呼叫的輔助函式)。

In [141]:
# ==============
# 輔助函式：資料處理與分析流程
# 將部分需要在 Gradio 介面中呼叫的函式移到 Gradio 設定之前定義
# ==============

# 輔助函式：重新整理 PTT 文章資料 (從 Sheet 讀取)
def refresh_ptt_posts():
    global ptt_posts_df, ws_ptt_posts, PTT_HEADER # 確保使用全域變數和工作表物件
    try:
        # 確保 ws_ptt_posts 已被初始化 (如果前面的儲存格執行了)
        if 'ws_ptt_posts' not in globals():
             return pd.DataFrame(columns=PTT_HEADER), "⚠️ PTT 文章工作表物件未初始化，請執行前面相關儲存格。"

        ptt_posts_df = read_df(ws_ptt_posts, PTT_HEADER).copy()
        return ptt_posts_df, f"✅ 已從 Sheet 載入 {len(ptt_posts_df)} 篇 PTT 文章。"
    except NameError:
         return pd.DataFrame(columns=PTT_HEADER), "⚠️ PTT_HEADER 或相關物件未定義，請執行前面相關儲存格。"


# 輔助函式：重新整理開放式回答資料 (從 Sheet 讀取)
def refresh_input_responses(worksheet_name, column_name):
    global input_responses_df, sh, ws_input, INPUT_HEADER # 確保使用全域變數和工作表物件
    try:
        # 確保 sh 和 ws_input 已被初始化 (如果前面的儲存格執行了)
        if 'sh' not in globals() or 'ws_input' not in globals():
            return pd.DataFrame(columns=INPUT_HEADER), "⚠️ Google Sheet 物件或輸入資料工作表物件未初始化，請執行前面相關儲存格。"

        # 呼叫 oxbeeai_ni9s 中定義的 read_open_ended_responses 函式
        input_responses_df, msg = read_open_ended_responses(worksheet_name, column_name)
        return input_responses_df, msg
    except NameError:
        return pd.DataFrame(columns=INPUT_HEADER), "⚠️ read_open_ended_responses 或相關物件未定義，請執行前面相關儲存格。"


# 輔助函式：執行 PTT 爬蟲並更新 DataFrame
def run_ptt_crawl(index_pages, min_push, keyword):
    global ptt_posts_df, ws_ptt_posts # 確保使用全域變數和工作表物件
    try:
        # 確保 ws_ptt_posts 已被初始化
        if 'ws_ptt_posts' not in globals():
            return pd.DataFrame(columns=PTT_HEADER), "⚠️ PTT 文章工作表物件未初始化，請執行前面相關儲存格。"

        # 呼叫 oxbeeai_ni9s 中定義的 crawl_ptt_movie 函式
        msg, ptt_posts_df = crawl_ptt_movie(index_pages, min_push, keyword)
        # crawl_ptt_movie 內部已經將資料寫回 Sheet 了
        return msg, ptt_posts_df
    except NameError:
        return pd.DataFrame(columns=PTT_HEADER), "⚠️ crawl_ptt_movie 或相關物件未定義，請執行前面相關儲存格。"


# 輔助函式：執行文本分析和 AI 生成
def perform_analysis_and_ai(data_source, input_col, topk, min_df):
    global ptt_posts_df, input_responses_df, terms_df # 確保使用全域變數
    df_to_analyze = pd.DataFrame()
    text_column = ""

    if data_source == "PTT 文章 (已爬取)":
        df_to_analyze = ptt_posts_df
        # 分析 PTT 文章時，合併 title 和 content 作為分析文本
        df_to_analyze["combined_text"] = df_to_analyze["title"].fillna("") + "\n" + df_to_analyze["content"].fillna("")
        text_column = "combined_text"
    elif data_source == "開放式回答 (從 Sheet 讀取)":
        df_to_analyze = input_responses_df
        text_column = input_col # 使用使用者指定的欄位名稱
        if text_column not in df_to_analyze.columns:
             return "⚠️ 選擇的資料來源或欄位名稱有誤。", pd.DataFrame(columns=ANALYSIS_HEADER), "", "", "" # 額外返回空字串給 AI 輸出區域

    if df_to_analyze.empty:
        return "📭 沒有資料可以分析。", pd.DataFrame(columns=ANALYSIS_HEADER), "", "", ""

    # 呼叫 oxbeeai_ni9s 中定義的 analyze_texts 函式進行分析
    msg, terms_df, md_report = analyze_texts(df_to_analyze, text_column, topk, min_df)

    # 呼叫 oxbeeai_ni9s 中定義的 generate_ai_output 函式生成 AI 洞察和結論
    ai_msg, insights_md, conclusion_md = generate_ai_output(terms_df)

    # 返回分析結果訊息、分析結果 DataFrame、Markdown 報告、AI 洞察和結論
    return msg, terms_df, md_report, insights_md, conclusion_md

請保留下面這個儲存格 (設定並啟動 Gradio 互動式介面)。

In [None]:
# ==============
# Gradio 介面
# 這是整個應用程式的網頁介面部分，使用 Gradio 函式庫建立。
# 它包含了不同的分頁，用於執行爬蟲、讀取 Sheet 資料、文本分析和 AI 生成摘要。
# ==============

# 使用 gr.Blocks 建立介面
with gr.Blocks(title="文本分析與 AI 摘要工具") as demo:
    gr.Markdown("# 📝 文字資料分析與 AI 摘要工具") # 主標題

    # --- 資料獲取分頁 ---
    with gr.Tab("資料獲取 (PTT & Sheet)"):
        gr.Markdown("### 從 PTT 電影版爬取文章")
        with gr.Row():
            ptt_pages = gr.Number(value=3, label="往前爬取頁數", precision=0)
            ptt_min_push = gr.Number(value=0, label="最低推文數", precision=0)
            ptt_keyword = gr.Textbox(label="標題關鍵字（可空白）")
        btn_crawl_ptt = gr.Button("🕷️ 開始爬取 PTT")
        msg_crawl_ptt = gr.Markdown() # 顯示爬蟲結果訊息
        btn_refresh_ptt = gr.Button("🔄 從 Sheet 載入已爬取文章")
        msg_refresh_ptt = gr.Markdown()
        grid_ptt_posts = gr.Dataframe(value=ptt_posts_df, label="已爬取的 PTT 文章 (來自 Sheet)", interactive=False)

        gr.Markdown("### 從 Google Sheet 讀取開放式回答")
        # 這裡使用 Textbox 讓使用者輸入工作表名稱和欄位名稱，而不是寫死變數
        input_sheet_name = gr.Textbox(label="輸入資料所在工作表名稱", value=INPUT_WORKSHEET_NAME) # 使用預設值
        input_col_name = gr.Textbox(label="開放式回答欄位名稱", value=INPUT_HEADER[1]) # 使用預設值
        btn_read_sheet = gr.Button("📥 從 Sheet 讀取開放式回答")
        msg_read_sheet = gr.Markdown() # 顯示讀取結果訊息
        grid_input_responses = gr.Dataframe(value=input_responses_df, label="從 Sheet 讀取的開放式回答", interactive=False)


    # --- 文本分析與 AI 分頁 ---
    with gr.Tab("文本分析與 AI 摘要"):
        gr.Markdown("### 文本分析設定")
        data_source_radio = gr.Radio(["PTT 文章 (已爬取)", "開放式回答 (從 Sheet 讀取)"], label="選擇分析資料來源", value="開放式回答 (從 Sheet 讀取)")
        analysis_topk = gr.Number(value=50, label="輸出 Top K 關鍵詞", precision=0)
        analysis_min_df = gr.Number(value=2, label="最低文件頻率 (Min DF)", precision=0)
        btn_analyze = gr.Button("🔬 執行文本分析與 AI 生成") # 將分析和 AI 生成合併到一個按鈕
        msg_analyze = gr.Markdown() # 顯示分析結果訊息

        gr.Markdown("### 文本分析報告（關鍵詞與雙詞搭配）")
        out_analysis_report = gr.Markdown() # 顯示 Markdown 格式的分析報告
        grid_terms = gr.Dataframe(value=terms_df, label="關鍵詞分析結果 (已寫入 Sheet)", interactive=False)

        gr.Markdown("### AI 生成洞察與結論")
        out_ai_insights = gr.Markdown("### 洞察摘要\n（待生成）") # 顯示 AI 洞察
        out_ai_conclusion = gr.Markdown("### 結論\n（待生成）") # 顯示 AI 結論


    # === 綁定介面元件與後端函式 ===

    # 資料獲取分頁的按鈕綁定
    btn_crawl_ptt.click(run_ptt_crawl, inputs=[ptt_pages, ptt_min_push, ptt_keyword], outputs=[msg_crawl_ptt, grid_ptt_posts])
    btn_refresh_ptt.click(refresh_ptt_posts, outputs=[grid_ptt_posts, msg_refresh_ptt])
    # 呼叫 refresh_input_responses，傳入使用者在介面輸入的工作表名稱和欄位名稱
    btn_read_sheet.click(refresh_input_responses, inputs=[input_sheet_name, input_col_name], outputs=[grid_input_responses, msg_read_sheet])

    # 文本分析與 AI 分頁的按鈕綁定
    # 點擊分析按鈕時，呼叫 perform_analysis_and_ai，將結果更新到對應的介面區域
    btn_analyze.click(
        perform_analysis_and_ai,
        inputs=[data_source_radio, input_col_name, analysis_topk, analysis_min_df], # 注意這裡 input_col_name 是從介面讀取的值
        outputs=[msg_analyze, grid_terms, out_analysis_report, out_ai_insights, out_ai_conclusion] # 將所有輸出綁定到介面元件
    )


# 啟動 Gradio 介面
demo.launch(debug=True) # debug=True 可以看到更多執行細節

It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://eb6cafdeda94d36da5.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


ℹ️ 工作表 'ptt_movie_posts' 為空或無法讀取 DataFrame。
嘗試將 47 筆資料寫入工作表 'ptt_movie_posts'...
✅ 已將 47 筆資料寫入工作表 'ptt_movie_posts'。
✅ 從工作表 'ptt_movie_posts' 讀取 47 筆資料。




嘗試將 50 筆資料寫入工作表 '分析結果'...
✅ 已將 50 筆資料寫入工作表 '分析結果'。
