<a href="https://colab.research.google.com/github/41371116h/PL-Repo./blob/main/HW4%E5%90%ABAI%E6%91%98%E8%A6%81_(PTT_China_Drama).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# HW4 含 AI 摘要 (PTT China-Drama)（作業四）
- 目標：用爬蟲抓取 PTT China-Drama 看板的文章（包含標題與內文，且可自行設定想要抓取的頁數），顯示熱詞分析表格，最後加入 Gemini 的建議及摘要，並將結果顯示在 Gradio 以及 Google Sheet 上。
- AI 點子（可選）：熱詞分析以及AI摘要建議
- Sheet 欄位：post_id, title, url, date, author, nrec, created_at, fetched_at, content
- GoogleSheet: https://docs.google.com/spreadsheets/d/1GScHTHISiioV89XO5twgGl-IpaYG-0nMwhLRTizSpNI/edit?pli=1&gid=0#gid=0




In [2]:
# ============================================
# 🕸️ PTT China-Drama 爬蟲 + 任務整合 + 內文抓取 + 文字分析（含寫回 Google Sheet & AI 摘要）
# 🚨 Gradio Dataframe height 參數錯誤已修復 🚨
# ============================================
# 安裝（Colab 執行一次）
!pip -q install gspread gspread_dataframe google-auth google-auth-oauthlib google-auth-httplib2 \
               gradio pandas beautifulsoup4 python-dateutil jieba google-generativeai

# -------------------------
import os, time, uuid, re, datetime
import requests, pandas as pd
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from collections import Counter, defaultdict
from gspread_dataframe import set_with_dataframe, get_as_dataframe
import gspread
from google.colab import auth, userdata
from google.auth import default
import gradio as gr
import google.generativeai as genai
import json
from io import StringIO

# =========================
# 時間工具
# =========================
def tznow():
    """Return the current time as a timezone-aware datetime in the local zone."""
    current = datetime.datetime.now()
    # astimezone() with no argument attaches the system's local timezone.
    return current.astimezone()

# =========================
# Constants / table headers
# =========================
# Column names of the in-memory clips table created for the Gradio UI.
CLIPS_HEADER = ["clip_id","url","selector","text","href","created_at","added_to_task"]
# Column names of the in-memory task table that "Add to Tasks" appends to.
TASKS_HEADER = ["id","task","status","priority","est_min","start_time","end_time",
                "actual_min","pomodoros","due_date","labels","notes",
                "created_at","updated_at","completed_at","planned_for"]
# Column names of the "爬蟲" worksheet that stores the crawled PTT posts.
PTT_HEADER = ["post_id","title","url","date","author","nrec","created_at","fetched_at","content"]

# =========================
# Google Sheets 初始化（Colab 認證）
# =========================
class MockWorksheet:
    """No-op stand-in used when the real spreadsheet cannot be opened.

    It only mimics the handful of worksheet methods called directly in this
    file, so the notebook can still be loaded without a Google connection.
    """

    def get_all_records(self): return []
    def get_all_values(self): return []
    def update(self, *args, **kwargs): pass
    def clear(self): pass


# Put your spreadsheet URL here.
SHEET_URL = "https://docs.google.com/spreadsheets/d/1GScHTHISiioV89XO5twgGl-IpaYG-0nMwhLRTizSpNI/edit#gid=938437500"

# `sh` is bound up front so that later code can test it instead of hitting a
# NameError when authentication or the connection fails.
sh = None
try:
    auth.authenticate_user()
    creds, _ = default()
    gc = gspread.authorize(creds)
    sh = gc.open_by_url(SHEET_URL)

except Exception as e:
    print(f"❌ Google Sheets 認證或連線失敗: {e}")
    # Fall back to placeholder worksheets so the rest of the cell does not crash.
    ws_clips = MockWorksheet()
    ws_analysis = MockWorksheet()


def ensure_ws(name, headers):
    """Return the worksheet called `name`, creating it and its header row if needed.

    Args:
        name: Worksheet title to open or create.
        headers: Expected first row. If the sheet is empty or its first row
            differs, the sheet is cleared and the header row is rewritten.

    Returns:
        The gspread worksheet, or a `MockWorksheet` when no spreadsheet is open.
    """
    if sh is None:
        # No spreadsheet connection: hand back a harmless placeholder.
        return MockWorksheet()
    try:
        ws = sh.worksheet(name)
    except gspread.WorksheetNotFound:
        ws = sh.add_worksheet(title=name, rows="2000", cols=str(len(headers)+10))
        ws.update([headers])
    # Make sure the header row is present and matches what the code expects.
    vals = ws.get_all_values()
    if not vals or vals[0] != headers:
        ws.clear()
        ws.update([headers])
    return ws

ws_clips = ensure_ws("爬蟲", PTT_HEADER)            # Stores the crawled posts (including content)
# Stores one row per analysis run, including the AI summary columns.
ws_analysis = ensure_ws("爬蟲_analysis", ["analysis_time","n_docs","total_words","avg_words","top_words","ai_summary","ai_conclusion"])

# =========================
# Gemini API 設定 (完全依照用戶指定的番茄鐘寫法)
# =========================
model = None  # Module-level Gemini model handle; stays None when configuration fails.
try:
    # SECURITY: never commit an API key with the notebook. The key is read from
    # the GEMINI_API_KEY environment variable first, then from Colab Secrets.
    # Any key that was previously hard-coded here should be treated as leaked
    # and revoked.
    GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY") or userdata.get("GEMINI_API_KEY")
    if not GEMINI_API_KEY:
        raise ValueError("未設定 GEMINI_API_KEY（請使用 Colab Secrets 或環境變數）")
    genai.configure(api_key=GEMINI_API_KEY)
    # Use the Flash model.
    model = genai.GenerativeModel("gemini-2.5-flash")
    print("✅ Gemini API 配置成功。")
except Exception as e:
    model = None
    print(f"❌ Gemini API 配置失敗: {e}")

# =========================
# PTT crawler functions
# =========================
# Newest index page of the China-Drama board; crawling starts here.
PTT_INDEX = "https://www.ptt.cc/bbs/China-Drama/index.html"
# Cookie sent with every request made by _get_soup.
PTT_COOKIES = {"over18": "1"}

def _get_soup(url):
    """Download `url` and return its HTML parsed with BeautifulSoup.

    Raises whatever `requests` raises, including an HTTP error for a
    non-success status code.
    """
    response = requests.get(
        url,
        timeout=15,
        headers={"User-Agent": "Mozilla/5.0"},
        cookies=PTT_COOKIES,
    )
    response.raise_for_status()
    html = response.text
    return BeautifulSoup(html, "html.parser")

def _get_prev_index_url(soup):
    for a in soup.select("div.btn-group-paging a.btn.wide"):
        if "上頁" in a.get_text(strip=True):
            href = a.get("href")
            if href:
                return "https://www.ptt.cc" + href
    return None

def _parse_nrec(span):
    if not span:
        return 0
    txt = span.get_text(strip=True)
    if txt == "爆":
        return 100
    if txt.startswith("X"):
        try: return -int(txt[1:])
        except: return -10
    try: return int(txt)
    except: return 0

def extract_post_list_from_index(url):
    """Parse one board index page.

    Returns:
        A tuple `(posts, prev)`: `posts` is a list of dicts with the keys
        title, url, author, date and nrec; `prev` is the URL of the previous
        index page, or None. Rows without a title link are skipped.
    """
    page = _get_soup(url)
    collected = []
    for entry in page.select("div.r-ent"):
        link = entry.select_one("div.title a")
        if link:
            collected.append({
                "title": link.get_text(strip=True),
                "url": "https://www.ptt.cc" + link.get("href"),
                "author": entry.select_one("div.author").get_text(strip=True),
                "date": entry.select_one("div.date").get_text(strip=True),
                "nrec": _parse_nrec(entry.select_one("div.nrec span")),
            })
    return collected, _get_prev_index_url(page)

# 抓 index_pages 頁的文章列表（採去重）
def crawl_index_pages(index_pages=3, min_push=0, keyword="", start_url=None):
    """Crawl board index pages and append new posts to the "爬蟲" worksheet.

    Args:
        index_pages: How many index pages to walk, newest first.
        min_push: Posts whose push count is below this value are skipped.
        keyword: If non-empty, only posts whose title contains it are kept.
        start_url: Index page to start from; defaults to `PTT_INDEX`.

    Returns:
        A status message for the UI. Posts whose URL is already in the sheet
        are skipped. If an index page fails to load, posts collected from the
        earlier pages are still written and the error is appended to the
        message.
    """
    url = start_url or PTT_INDEX
    # A blank Number field in the UI arrives as None; treat it as 0.
    page_limit = int(index_pages or 0)
    push_floor = int(min_push or 0)

    try:
        seen = {r["url"] for r in ws_clips.get_all_records()}
    except Exception:
        # Best effort: without the sheet contents nothing is deduplicated.
        seen = set()

    all_rows = []
    error_note = ""
    for _ in range(page_limit):
        try:
            posts, prev = extract_post_list_from_index(url)
        except Exception as e:
            # Stop crawling but keep what was already collected.
            error_note = f"⚠️ 取得 index 失敗：{e}"
            break
        for p in posts:
            if p["nrec"] < push_floor:
                continue
            if keyword and keyword not in p["title"]:
                continue
            if p["url"] in seen:
                continue
            all_rows.append({
                "post_id": str(uuid.uuid4())[:8],
                "title": p["title"][:200],
                "url": p["url"],
                "date": p["date"],
                "author": p["author"],
                "nrec": str(p["nrec"]),
                "created_at": tznow().isoformat(),
                "fetched_at": tznow().isoformat(),
                "content": ""
            })
            seen.add(p["url"])
        if not prev:
            break
        url = prev

    if not all_rows:
        return error_note or "ℹ️ 無新文章"

    df_new = pd.DataFrame(all_rows, columns=PTT_HEADER)
    # Append to the sheet: read what is there, drop blank rows, write it all back.
    existing = get_as_dataframe(ws_clips, evaluate_formulas=False)
    if existing is not None:
        existing = existing.dropna(how="all")
    if existing is None or existing.empty:
        combined = df_new
    else:
        combined = pd.concat([existing, df_new], ignore_index=True)
    set_with_dataframe(ws_clips, combined)

    message = f"✅ 取得 {len(all_rows)} 篇文章（寫入 '爬蟲'）"
    if error_note:
        message += f"\n\n{error_note}"
    return message

def _clean_ptt_content(soup):
    # 移除推文區與 meta
    for p in soup.select("div.push"):
        p.decompose()
    main = soup.select_one("#main-content")
    if not main:
        return ""
    for m in main.select("div.article-metaline, div.article-metaline-right"):
        m.decompose()
    text = main.get_text("\n", strip=True)
    if "--" in text:
        text = text.split("--")[0].strip()
    return text

def fetch_and_write_contents(limit_per_run=50):
    """Fetch the body text of posts whose `content` cell is still empty.

    Args:
        limit_per_run: Maximum number of posts to fetch in this call.

    Returns:
        A status message for the UI. A failed fetch stores
        "FETCH_ERROR: <reason>" in the content cell and is reported separately
        from the successful ones.
    """
    # Read the sheet and pick the rows that have a URL but no content yet.
    df = get_as_dataframe(ws_clips, evaluate_formulas=False).fillna("")
    if df.empty:
        return "⚠️ '爬蟲' 工作表沒有資料"
    to_fetch = df[df["content"].apply(lambda x: not bool(str(x).strip()))]
    # Rows without a URL cannot be fetched; skipping them avoids writing
    # FETCH_ERROR into blank rows.
    to_fetch = to_fetch[to_fetch["url"].apply(lambda x: bool(str(x).strip()))]
    if to_fetch.empty:
        return "ℹ️ 沒有待抓取內文"

    # The UI may hand over a float; DataFrame.head needs an integer.
    limit = 50 if limit_per_run is None else int(limit_per_run)

    updated = 0
    failed = 0
    for idx, row in to_fetch.head(limit).iterrows():
        url = row["url"]
        try:
            soup = _get_soup(url)
            content = _clean_ptt_content(soup)
            updated += 1
        except Exception as e:
            content = f"FETCH_ERROR: {e}"  # Record the error in the sheet.
            failed += 1
        df.loc[idx, "content"] = content
        df.loc[idx, "fetched_at"] = tznow().isoformat()
        time.sleep(0.5)  # Pause between requests.
    set_with_dataframe(ws_clips, df)

    message = f"✅ 已更新 {updated} 篇文章的內文到 '爬蟲' 工作表"
    if failed:
        message += f"（另有 {failed} 篇抓取失敗，已標記 FETCH_ERROR）"
    return message

def load_sheet_preview(n=100):
    """Return the first `n` rows of the "爬蟲" worksheet, blanks shown as ""."""
    sheet_df = get_as_dataframe(ws_clips, evaluate_formulas=False)
    sheet_df = sheet_df.fillna("")
    return pd.DataFrame() if sheet_df is None else sheet_df.head(n)

# =========================
# 文字分析 (分離出純分析和 AI 摘要邏輯)
# =========================
import jieba
from sklearn.feature_extraction.text import TfidfVectorizer

def simple_tokenize_cn(text):
    """Split `text` into tokens with jieba.

    Every run of characters other than CJK ideographs, ASCII letters and
    digits is first replaced by a single space; whitespace-only tokens are
    dropped from the result.
    """
    cleaned = re.sub(r"[^\u4e00-\u9fffA-Za-z0-9]+", " ", str(text))
    return [piece for piece in jieba.lcut(cleaned) if piece.strip()]

def get_analysis_data(df, topn, min_df, use_tfidf):
    """Compute word statistics for the crawled posts without any I/O or AI call.

    Args:
        df: DataFrame with a `content` column.
        topn: Number of most frequent words to return.
        min_df: Passed to `TfidfVectorizer` when `use_tfidf` is true.
        use_tfidf: When true, a TF-IDF model is also fitted on the documents.

    Returns:
        A tuple `(analysis_row, top_df, docs, message)`. `analysis_row` is a
        dict for the analysis sheet with empty AI fields, `top_df` has the
        columns word/count, and `docs` is the list of analysed texts. When
        there is nothing to analyse the first three items are None.
    """
    texts = df["content"].astype(str).tolist()
    # Skip empty cells and the rows where fetching failed.
    docs = [t for t in texts if t.strip() and not t.startswith("FETCH_ERROR")]

    if not docs:
        return None, None, None, "⚠️ 沒有可分析的文字（'爬蟲' 工作表的 content 欄）"

    # "Words" are counted as individual CJK characters, letters and digits.
    counts = [len(re.findall(r"[\u4e00-\u9fffA-Za-z0-9]", d)) for d in docs]
    total_words = sum(counts)
    avg_words = round(total_words / len(counts), 2)

    # Term frequency over tokens that contain at least one CJK character.
    freq = Counter()
    for d in docs:
        freq.update(t for t in simple_tokenize_cn(d) if re.search(r'[\u4e00-\u9fff]', t))

    topn = int(topn)
    common = freq.most_common(topn)
    top_words_str = "; ".join([f"{w}:{c}" for w,c in common])

    # Optional TF-IDF.
    # NOTE(review): the fitted matrix is not used anywhere, so this option
    # currently has no effect on the returned ranking.
    if use_tfidf:
        try:
            vec = TfidfVectorizer(tokenizer=simple_tokenize_cn, lowercase=False, min_df=min_df)
            vec.fit_transform(docs)
        except Exception as e:
            # TF-IDF is optional: report the failure instead of hiding it,
            # and carry on with the frequency results.
            print(f"⚠️ TF-IDF 計算失敗（已略過）：{e}")

    analysis_row = {
        "analysis_time": tznow().isoformat(),
        "n_docs": len(docs),
        "total_words": total_words,
        "avg_words": avg_words,
        "top_words": top_words_str,
        "ai_summary": "",
        "ai_conclusion": ""
    }

    top_df = pd.DataFrame(common, columns=["word","count"])

    return analysis_row, top_df, docs, f"✅ 分析完成：{len(docs)} 篇；總字數 {total_words}；平均 {avg_words}"

def run_pure_analysis(topn, use_tfidf):
    """Run the word analysis and store the result in Google Sheets.

    Appends one row to the analysis worksheet (AI columns left as
    placeholders) and replaces the contents of "爬蟲_top_words".

    Returns:
        A tuple `(message, top_words_dataframe, ai_placeholder_text)`.
    """
    df = get_as_dataframe(ws_clips, evaluate_formulas=False).fillna("")
    analysis_row, top_df, _docs, message = get_analysis_data(df, topn, 1, use_tfidf)

    if analysis_row is None:
        return message, pd.DataFrame(), "(AI 摘要未執行)"

    # Write the analysis result to ws_analysis.
    analysis_header = ["analysis_time","n_docs","total_words","avg_words","top_words","ai_summary","ai_conclusion"]
    df_an = get_as_dataframe(ws_analysis, evaluate_formulas=False).fillna("")

    # Mark the AI columns as pending; run_ai_summary fills in the latest row.
    analysis_row["ai_summary"] = "（待 AI 摘要）"
    analysis_row["ai_conclusion"] = ""
    new_row = pd.DataFrame([analysis_row], columns=analysis_header)

    if df_an.empty or list(df_an.columns) != analysis_header:
        # Empty sheet or unexpected layout: start over with a clean header.
        ws_analysis.clear()
        ws_analysis.update([analysis_header])
        df_an_new = new_row
    else:
        df_an_new = pd.concat([df_an, new_row], ignore_index=True)
    set_with_dataframe(ws_analysis, df_an_new)

    # Write the top words to their own sheet. Clear it first: writing a
    # shorter table over a longer one would leave the old tail rows behind.
    top_ws = ensure_ws("爬蟲_top_words", ["word","count"])
    top_ws.clear()
    set_with_dataframe(top_ws, top_df)

    return message, top_df, "(AI 摘要待執行)"

def run_ai_summary():
    """Ask Gemini for insights and a conclusion, then store them in the sheet.

    Returns:
        A tuple `(status_message, ai_output_text)`. The AI columns of the
        latest analysis row are overwritten; if the analysis sheet is empty a
        new row is written instead.
    """
    if model is None:
        return "❌ Gemini API 未配置，無法執行 AI 摘要。", "(AI 摘要未執行)"

    posts_df = get_as_dataframe(ws_clips, evaluate_formulas=False).fillna("")

    # Default parameters: top 20 words, min_df 1, no TF-IDF.
    analysis_row, top_df, docs, base_message = get_analysis_data(posts_df, 20, 1, False)
    if docs is None:
        return base_message, "(AI 摘要未執行)"

    ai_summary, ai_conclusion = "⚠️ AI 摘要未執行", ""
    try:
        # Only the first 10000 characters are sent to the model.
        all_text_preview = "\n---\n".join(docs)[:10000]

        prompt = f"""請用中文，根據以下受訪者 / 文章內容，輸出：
1) 五句簡短洞察（每句一行，最多 30字/句）
2) 一段約 120 字的結論（總結情緒與主要趨勢）。
內容：
{all_text_preview}
"""
        reply = model.generate_content(prompt, request_options={"timeout": 60})
        out = reply.text.strip()

        # The first blank line separates the insights from the conclusion.
        head, separator, tail = out.partition("\n\n")
        if separator:
            ai_summary, ai_conclusion = head.strip(), tail.strip()
        else:
            ai_summary = out.strip()
            ai_conclusion = "（無法從 AI 輸出中切分出明確的結論段落）"
    except Exception as e:
        ai_summary, ai_conclusion = f"⚠️ Gemini 呼叫失敗：{e}", ""

    # Store the AI result in ws_analysis.
    df_an = get_as_dataframe(ws_analysis, evaluate_formulas=False).fillna("")
    analysis_header = ["analysis_time","n_docs","total_words","avg_words","top_words","ai_summary","ai_conclusion"]

    if df_an.empty:
        # No earlier analysis row exists: write a fresh one.
        if analysis_row is None:
            analysis_row = {k: "N/A" for k in analysis_header}
            analysis_row["analysis_time"] = tznow().isoformat()
        analysis_row["ai_summary"] = ai_summary
        analysis_row["ai_conclusion"] = ai_conclusion
        set_with_dataframe(ws_analysis, pd.DataFrame([analysis_row], columns=analysis_header))
        status = "✅ AI 摘要已生成並寫入新紀錄。"
    else:
        # Overwrite the AI columns of the most recent row.
        last = df_an.index[-1]
        df_an.loc[last, "ai_summary"] = ai_summary
        df_an.loc[last, "ai_conclusion"] = ai_conclusion
        set_with_dataframe(ws_analysis, df_an)
        status = "✅ AI 摘要已生成並更新 Google Sheet。"

    full_ai_output = f"【洞察】\n{ai_summary}\n\n【結論】\n{ai_conclusion}"
    return status, full_ai_output


# =========================
# Gradio UI
# =========================
# In-memory clip/task tables for the Gradio display; they start empty and
# nothing in this cell writes them to Google Sheets.
clips_df_local = pd.DataFrame(columns=CLIPS_HEADER)
tasks_df_local = pd.DataFrame(columns=TASKS_HEADER)

with gr.Blocks(title="PTT Crawler & Task Integrator (extended)") as demo:
    gr.Markdown("## 🕸️ PTT China-Drama 爬蟲 + 內文抓取 + 文字分析（含 Gemini AI 摘要）")

    # Tab 1: crawl the board index pages.
    with gr.Tab("Crawler (index)"):
        # NOTE(review): `url` is not listed in the inputs of btn_index.click
        # below, so editing this textbox has no effect on the crawl.
        url = gr.Textbox(label="起始 Index URL", value=PTT_INDEX)
        pages = gr.Number(label="向上抓幾頁 (index pages)", value=3, precision=0)
        min_push = gr.Number(label="最少推文數 (min_push)", value=0, precision=0)
        keyword = gr.Textbox(label="標題關鍵字過濾 (選填)", value="")
        btn_index = gr.Button("📄 抓 Index（多頁）")
        out_index = gr.Markdown()
        btn_index.click(fn=crawl_index_pages, inputs=[pages, min_push, keyword], outputs=[out_index])

    # Tab 2: fetch article bodies and preview the sheet.
    with gr.Tab("抓內文（寫回 Sheet）"):
        btn_fetch = gr.Button("📥 抓取尚未抓內文的文章（寫回 '爬蟲'）")
        out_fetch = gr.Markdown()
        # The hidden Number supplies limit_per_run=50.
        btn_fetch.click(fn=fetch_and_write_contents, inputs=[gr.Number(value=50, visible=False)], outputs=[out_fetch])

        gr.Markdown("---")
        btn_show_sheet = gr.Button("📂 從 '爬蟲' 工作表讀出並顯示 (show)")
        sheet_table = gr.Dataframe(value=pd.DataFrame(), interactive=True)

        btn_show_sheet.click(fn=load_sheet_preview, outputs=[sheet_table])

    # Tab 3: turn selected posts into tasks held in memory.
    with gr.Tab("Add to Tasks"):
        clip_ids = gr.Textbox(label="要加入任務的 post_id（多個以逗號分隔）")
        default_priority = gr.Radio(["H", "M", "L"], value="M", label="預設優先度")
        est_min = gr.Number(value=25, precision=0, label="預估時間（分鐘）")
        btn_add = gr.Button("➕ 加入任務")
        msg_add = gr.Markdown()
        grid_tasks = gr.Dataframe(value=tasks_df_local, label="任務清單", interactive=True)

        # local add function
        def add_clips_local(clip_ids, default_priority, est_min):
            """Create one task per matching post and append it to `tasks_df_local`.

            Args:
                clip_ids: Comma-separated post_id values to look up in the "爬蟲" sheet.
                default_priority: Priority for the new tasks; "M" when empty.
                est_min: Estimated minutes per task; 25 when empty.

            Returns:
                A tuple `(message, sheet_preview, tasks_table)`. The tasks are
                only added to the in-memory table, not written to a worksheet.
            """
            global tasks_df_local
            if not clip_ids:
                return "⚠️ 請輸入 post_id", load_sheet_preview(), tasks_df_local
            ids = [c.strip() for c in clip_ids.split(",") if c.strip()]

            df_sheet = get_as_dataframe(ws_clips, evaluate_formulas=False).fillna("")
            sel = df_sheet[df_sheet["post_id"].isin(ids)] if not df_sheet.empty else pd.DataFrame()

            if sel.empty:
                return "⚠️ 沒有匹配到任何 post_id", load_sheet_preview(), tasks_df_local

            _now = tznow().isoformat()
            new_tasks = []
            for _, r in sel.iterrows():
                title = r.get("title", "") or "（未命名）"
                note = f"PTT 連結：{r.get('url','')}\n作者：{r.get('author','')}\n原始 ID：{r.get('post_id','')}"
                new_tasks.append({
                    "id": str(uuid.uuid4())[:8],
                    "task": title[:120],
                    "status": "todo",
                    "priority": default_priority or "M",
                    "est_min": int(est_min) if est_min else 25,
                    "start_time": "",
                    "end_time": "",
                    "actual_min": 0,
                    "pomodoros": 0,
                    "due_date": "",
                    "labels": "from:ptt",
                    "notes": note,
                    "created_at": _now,
                    "updated_at": _now,
                    "completed_at": "",
                    "planned_for": ""
                })
            if new_tasks:
                tasks_df_local = pd.concat([tasks_df_local, pd.DataFrame(new_tasks, columns=TASKS_HEADER)], ignore_index=True)
                return f"✅ 已加入 {len(new_tasks)} 筆任務 (local)", load_sheet_preview(), tasks_df_local
            return "⚠️ 沒有可加入的項目", load_sheet_preview(), tasks_df_local


        btn_add.click(fn=add_clips_local, inputs=[clip_ids, default_priority, est_min], outputs=[msg_add, sheet_table, grid_tasks])

    # Tab 4: word analysis and the Gemini summary.
    with gr.Tab("🧠 Text Analysis (Gemini)"):
        with gr.Row():
            topn = gr.Number(label="Top N 熱詞", value=20, precision=0, scale=0)
            use_tfidf = gr.Checkbox(label="使用 TF-IDF (補充排序)", value=False, scale=1)

        # Separate button 1: plain text analysis
        btn_analyze = gr.Button("🔍 1. 執行純文字分析 (詞頻/字數)")
        result_msg = gr.Markdown(value="---")

        # The height argument was removed from this Dataframe
        top_table = gr.Dataframe(label="熱門詞彙 Top N")

        gr.Markdown("---")

        # Separate button 2: AI summary
        btn_ai_summary = gr.Button("🤖 2. 執行 AI 摘要 (呼叫 Gemini)")
        ai_msg = gr.Markdown(value="---")

        # AI output field
        ai_out = gr.Textbox(label="Gemini AI 摘要 (洞察 + 結論)", lines=10, interactive=False)

        # Event binding 1: plain text analysis
        # Outputs: analysis status message, top-words table, AI summary placeholder
        btn_analyze.click(fn=run_pure_analysis,
                          inputs=[topn, use_tfidf],
                          outputs=[result_msg, top_table, ai_out])

        # Event binding 2: AI summary
        # Outputs: AI status message, AI output text
        btn_ai_summary.click(fn=run_ai_summary,
                             inputs=None,
                             outputs=[ai_msg, ai_out])

# ====== Launch the app ======
if __name__ == "__main__":
    # debug=True keeps the cell running so errors and logs stay visible;
    # share=True requests a public share link.
    demo.launch(debug=True, share=True)

✅ Gemini API 配置成功。
Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://4031eca500f5c719f3.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Building prefix dict from the default dictionary ...
DEBUG:jieba:Building prefix dict from the default dictionary ...
Dumping model to file cache /tmp/jieba.cache
DEBUG:jieba:Dumping model to file cache /tmp/jieba.cache
Loading model cost 0.813 seconds.
DEBUG:jieba:Loading model cost 0.813 seconds.
Prefix dict has been built successfully.
DEBUG:jieba:Prefix dict has been built successfully.


Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://4031eca500f5c719f3.gradio.live
