<a href="https://colab.research.google.com/github/cundeyu154/PL-Repo/blob/main/HW_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
pip install google-genai pandas numpy requests beautifulsoup4 scikit-learn jieba gradio gspread



In [5]:
import os
import json
import re
import requests
import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
from google import genai
from google.genai import types
from sklearn.feature_extraction.text import TfidfVectorizer
import jieba
import gradio as gr
import gspread

# ==============================================================================
# 1. 配置與常量 (請替換為您的實際值！)
# ==============================================================================
GEMINI_API_KEY = "AIzaSyDN-LWFIop1hMP2J3lKLN8hU06ZOb3tkvM"

GS_CREDENTIALS_PATH = "service_account_key.json"

# Google Sheet 設置
SHEET_NAME = "文本分析自動化專案"
DATA_WORKSHEET_TITLE = "原始數據" # 爬蟲結果存放的工作表名稱
STATS_WORKSHEET_TITLE = "統計結果" # 關鍵詞統計結果存放的工作表名稱
TOP_N_KEYWORDS = 20 # 要提取的熱詞數量

# ==============================================================================
# 2. 輔助函數：Google Sheets 處理
# ==============================================================================

def init_gspread():
    """初始化 gspread 客戶端並授權."""
    try:
        # 使用服務帳戶金鑰檔案進行授權
        gc = gspread.service_account(filename=GS_CREDENTIALS_PATH)
        return gc
    except Exception as e:
        # 在 Gradio 介面中拋出錯誤，讓使用者看到
        raise gr.Error(f"Google Sheets 授權失敗。請檢查設定。錯誤: {e}")

def get_or_create_worksheet(spreadsheet, title, header):
    """取得或創建指定名稱的工作表。"""
    try:
        worksheet = spreadsheet.worksheet(title)
        return worksheet
    except gspread.WorksheetNotFound:
        # 如果工作表不存在，則創建它並寫入標題
        worksheet = spreadsheet.add_worksheet(title=title, rows="100", cols="20")
        worksheet.update('A1', [header])
        return worksheet
    except Exception as e:
        raise gr.Error(f"操作工作表 {title} 失敗: {e}")

# ==============================================================================
# 3. 數據收集：爬蟲功能 (已強化為實際提取)
# ==============================================================================

def scrape_data(target_url):
    """
    從單一 URL 爬取文章主體文本的強化版函數。
    它嘗試使用常見的 CSS 選擇器來定位主要文章內容。
    """
    url = target_url
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status() # 檢查 HTTP 錯誤

        # 這裡設置編碼處理，避免中文亂碼
        response.encoding = response.apparent_encoding
        soup = BeautifulSoup(response.content, 'html.parser')

        # 強化：嘗試使用更精確的選擇器來定位文章主體
        content_selectors = [
            "article",
            ".article-content",
            ".post-body",
            ".entry-content",
            ".main-content",
            "#content-body", # 針對特定網站結構添加
        ]

        main_content_element = None
        for selector in content_selectors:
            # 嘗試使用 CSS 選擇器查找
            main_content_element = soup.select_one(selector)
            if main_content_element:
                break

        paragraphs = []
        if main_content_element:
            # 如果找到主體元素，只從該元素內提取所有段落
            paragraphs = [p.get_text().strip() for p in main_content_element.find_all('p')]
        else:
            # 如果沒有找到精確的主體元素，則退回到全頁面的 <p> 標籤
            paragraphs = [p.get_text().strip() for p in soup.find_all('p')]

        # 過濾空行並合併
        full_text = '\n'.join(filter(None, paragraphs))

        # 如果無法提取大量文本，可能是網站結構複雜，或頁面非主要文章
        if not full_text or len(full_text) < 100:
            # 嘗試抓取 body 裡的所有文本作為最終備選
            body_text = soup.body.get_text(separator='\n', strip=True) if soup.body else ""
            if len(body_text) > 100:
                 full_text = body_text
            else:
                return pd.DataFrame({"標題": ["無法提取主體文本"], "內容": ["請檢查 URL、網站結構或爬蟲邏輯。"]})

        # 模擬單筆數據的 DataFrame 結構
        data = pd.DataFrame({
            "標題": [soup.title.string.strip() if soup.title else "無標題"],
            "內容": [full_text]
        })
        return data

    except requests.exceptions.RequestException as e:
        error_msg = f"連線錯誤: {e.__class__.__name__} - {e}"
        return pd.DataFrame({"標題": [error_msg], "內容": [error_msg]})
    except Exception as e:
        error_msg = f"爬蟲解析錯誤: {e.__class__.__name__} - {e}"
        return pd.DataFrame({"標題": [error_msg], "內容": [error_msg]})

# ==============================================================================
# 4. 文本分析：TF-IDF 與關鍵詞提取
# ==============================================================================

def chinese_tokenizer(text):
    """使用 jieba 進行中文分詞。"""
    # 擴充更完整的中文停用詞列表
    # 這裡只使用一個非常簡化的停用詞列表，實際應用中請擴充
    simple_stopwords = {'的', '了', '是', '我', '你', '他', '她', '它', '我們', '你們', '他們', '和', '但', '也', '而', '這', '那', '一個', '一種', '可以', '能夠', '因為', '所以', '這樣', '那樣', '在', '上', '下', '中', '之', '所', '則', '等', '與', '或'}

    # 使用 jieba.cut 進行精確分詞
    words = jieba.cut(text, cut_all=False)

    # 過濾停用詞、單字詞（通常為助詞/標點）、數字和空白
    filtered_words = [
        word.lower()
        for word in words
        if word.strip() and
           len(word.strip()) > 1 and
           word.lower() not in simple_stopwords and
           not word.strip().isdigit()
    ]

    # 必須返回空格分隔的字符串，這是 TfidfVectorizer 期望的格式
    return " ".join(filtered_words)

def dummy_tokenizer(text):
    """TfidfVectorizer 需要一個接受文本並返回分詞後的空格分隔字串的 callable。"""
    return chinese_tokenizer(text).split(' ')

def perform_analysis(df):
    """執行 TF-IDF 計算，提取熱詞。"""
    if df.empty or '內容' not in df.columns:
        return None, "數據框無內容或格式錯誤。"

    corpus = df['內容'].astype(str).tolist()

    # 使用 TfidfVectorizer 進行 TF-IDF 計算
    # 我們將 preprocessor 和 tokenizer 設為 None，讓 token_pattern 處理，
    # 但為了中文精確分詞，我們必須用自己的分詞函數
    vectorizer = TfidfVectorizer(
        tokenizer=dummy_tokenizer, # 使用 dummy_tokenizer 將 jieba 處理後的列表傳給 TfidfVectorizer
        preprocessor=None,
        token_pattern=None, # 禁用內建的 token_pattern
        use_idf=True
    )

    try:
        tfidf_matrix = vectorizer.fit_transform(corpus)
    except Exception as e:
        return None, f"TF-IDF 計算失敗，可能是文本量過少或分詞錯誤: {e}"

    feature_names = vectorizer.get_feature_names_out()

    # 計算所有文檔的平均 TF-IDF 分數 (這裡的軸向必須是 0)
    avg_tfidf_scores = tfidf_matrix.mean(axis=0).tolist()[0]

    # 組合詞彙和分數
    df_scores = pd.DataFrame({'詞彙': feature_names, '平均TFIDF得分': avg_tfidf_scores})

    # 排序並選取前 N 個熱詞
    top_keywords = df_scores.sort_values(by='平均TFIDF得分', ascending=False).head(TOP_N_KEYWORDS)

    # 格式化分數
    top_keywords['平均TFIDF得分'] = top_keywords['平均TFIDF得分'].apply(lambda x: f"{x:.5f}")

    return top_keywords, None

# ==============================================================================
# 5. Gemini API 串接：洞察生成
# ==============================================================================

def generate_insights(df_keywords, all_text):
    """串接 Gemini API 生成洞察摘要與結論。"""

    if not GEMINI_API_KEY or GEMINI_API_KEY == "YOUR_GEMINI_API_KEY_HERE":
        raise gr.Error("❌ 錯誤：請在程式碼中填入有效的 `GEMINI_API_KEY`。")

    client = genai.Client(api_key=GEMINI_API_KEY)

    # 準備 Prompt
    keyword_list = df_keywords.to_dict('records')
    keyword_text = json.dumps(keyword_list, ensure_ascii=False, indent=2)

    # 將所有文本合併成一個大字符串（限制長度以避免超出上下文窗口）
    # 這裡只取前 5000 字作為上下文參考
    context_text = all_text[:5000]

    user_prompt = f"""
    你的任務是根據以下提供的原始文本內容和關鍵詞統計結果，生成一份專業分析。

    ---
    **統計關鍵詞列表 (TF-IDF Top {TOP_N_KEYWORDS}):**
    {keyword_text}

    **原始文本摘要 (作為上下文參考):**
    ---
    {context_text}
    ---

    請嚴格遵守以下輸出格式要求：
    1.  **洞察摘要 (Insights):** 生成 5 句獨立的中文句子，每句提出一個有價值、基於數據的洞察或發現。請確保這 5 句清晰地列出。
    2.  **結論 (Conclusion):** 生成一段完整的中文結論，字數必須控制在 **110 到 130 字** 之間，總結分析的結果並提出行動建議。
    """

    # 系統指令：這次改為獨立的變數
    system_instruction_text = "你是一位精通中文分析的專家。你必須根據輸入數據，輸出一份包含「5 句洞察摘要」和「一段 110-130 字結論」的分析報告，不得包含其他冗餘的標題或解釋。"

    # 僅包含 user 內容 (這是正確的)
    user_contents = [
        {
            "role": "user",
            "parts": [{"text": user_prompt}]
        }
    ]


    # 執行 API 呼叫
    try:
        response = client.models.generate_content(
            model='gemini-2.5-flash',
            contents=user_contents, # 僅 user 內容
            config=types.GenerateContentConfig(
                temperature=0.3,
                # 💡 關鍵修正：將系統指令移入 config 內，解決 'unexpected keyword argument' 錯誤
                system_instruction=system_instruction_text
            )
            # 舊版本 SDK 不支援 system_instruction 作為頂層參數，故移除:
            # system_instruction=system_instruction_text
        )

        # 簡易的格式化輸出 (解析模型的回覆)
        insight_conclusion_text = response.text

        # 嘗試解析出洞察和結論部分
        parts = insight_conclusion_text.split('\n')
        insights = []
        conclusion_lines = []

        for line in parts:
            if line.strip():
                # 假設洞察是獨立的句子，或以列表符號開始
                if re.match(r'^\d+[\.、]\s*|^\*\s*|^\-\s*', line.strip()):
                    insights.append(line.strip())
                elif len(line.strip()) > 30: # 假設超過30字的連續文本是結論的一部分
                    conclusion_lines.append(line.strip())

        conclusion = " ".join(conclusion_lines)

        # 如果解析失敗，或者洞察不足 1 句，直接使用原始文本
        if len(insights) < 1 or len(conclusion) < 50:
             final_output = f"**[AI 生成結果] (原始輸出)**\n```\n{insight_conclusion_text}\n```"
        else:
            final_output = "### 💡 數據洞察摘要 (5 句)\n"
            # 清理洞察句前的數字或符號
            cleaned_insights = []
            for i in insights:
                 cleaned_insights.append(re.sub(r'^\d+[\.、]\s*|^\*\s*|^\-\s*', '', i).strip())

            final_output += "\n".join([f"- {i}" for i in cleaned_insights[:5]]) # 只取前 5 句
            final_output += "\n\n### 📝 總結與建議 (約 120 字)\n"
            final_output += conclusion

        return final_output, "✅ 成功"

    except Exception as e:
        # 顯示更友好的 API 錯誤訊息
        raise gr.Error(f"❌ Gemini API 呼叫失敗，請檢查 API Key 是否正確且服務是否已啟用。錯誤: {e}")

# ==============================================================================
# 6. 主流程控制函數 (Gradio 執行)
# ==============================================================================

def automated_analysis_flow(target_url, sheet_url_or_id):
    """
    整個自動化流程的主控制函數。
    """

    # 輸出變數初始化
    status_log = ""
    top_keywords_df = pd.DataFrame({'詞彙': ['N/A'], '平均TFIDF得分': ['N/A']})
    gemini_output = "等待執行結果..."

    if not target_url or not sheet_url_or_id:
        raise gr.Error("請輸入目標 URL 和 Google Sheet ID/URL。")

    try:
        # --- 步驟 1: 爬蟲與寫入 Google Sheets ---
        status_log += f"1. 開始爬取 URL: {target_url}\n"
        raw_data_df = scrape_data(target_url)

        if raw_data_df.empty or raw_data_df['內容'].iloc[0].startswith(("連線錯誤", "爬蟲解析錯誤", "無法提取")):
            error_message = raw_data_df['內容'].iloc[0]
            status_log += f"❌ 爬蟲或數據提取失敗: {error_message}\n"
            raise gr.Error(f"爬蟲失敗: {error_message}")

        status_log += f"   ✅ 爬取成功，獲得 {len(raw_data_df)} 筆數據。\n"

        # 初始化 GS
        gc = init_gspread()
        status_log += f"   ✅ Google Sheets 服務帳戶授權成功。\n"

        # 開啟試算表
        spreadsheet = gc.open_by_url(sheet_url_or_id) if sheet_url_or_id.startswith('http') else gc.open_by_key(sheet_url_or_id)
        status_log += f"   ✅ 成功開啟試算表: {spreadsheet.title}\n"

        # 寫入原始數據工作表
        data_ws = get_or_create_worksheet(spreadsheet, DATA_WORKSHEET_TITLE, ["標題", "內容"])
        data_ws.clear()
        data_ws.update([raw_data_df.columns.values.tolist()] + raw_data_df.values.tolist())
        status_log += f"   ✅ 原始數據已寫入到 [{DATA_WORKSHEET_TITLE}]。\n"

        # --- 步驟 2: TF-IDF 詞頻與關鍵字統計 ---
        status_log += "2. 開始執行 TF-IDF 文本分析...\n"
        top_keywords_df, error = perform_analysis(raw_data_df)

        if error:
            status_log += f"❌ TF-IDF 分析失敗: {error}\n"
            raise gr.Error(f"TF-IDF 分析失敗: {error}")

        status_log += f"   ✅ TF-IDF 分析完成，提取前 {TOP_N_KEYWORDS} 熱詞。\n"

        # 將統計結果回寫到 Google Sheets
        stats_ws = get_or_create_worksheet(spreadsheet, STATS_WORKSHEET_TITLE, ["詞彙", "平均TFIDF得分"])
        stats_ws.clear()
        stats_ws.update([top_keywords_df.columns.values.tolist()] + top_keywords_df.values.tolist())
        status_log += f"   ✅ 統計結果已回寫到 [{STATS_WORKSHEET_TITLE}]。\n"

        # --- 步驟 3: 串接 Gemini API 生成洞察摘要與結論 ---
        status_log += "3. 串接 Gemini API，生成洞察與結論...\n"
        all_text_content = "\n".join(raw_data_df['內容'].astype(str).tolist())

        gemini_output, gemini_status = generate_insights(top_keywords_df, all_text_content)

        status_log += f"   {gemini_status} Gemini 洞察生成完成。\n"
        status_log += "--- 執行流程結束 ---"

        return status_log, top_keywords_df, gemini_output

    except gr.Error as ge:
        # Gradio.Error 會被自動顯示，這裡只更新 Log
        status_log += str(ge).replace("gr.Error: ", "")
        return status_log, top_keywords_df, "執行中斷，請查看 Log 資訊。"
    except Exception as e:
        # 其他未預期的致命錯誤
        error_msg = f"❌ 致命錯誤發生: {e}"
        status_log += error_msg
        raise gr.Error(f"致命錯誤: {e}")

# ==============================================================================
# 7. Gradio 介面設計
# ==============================================================================

# 配置 Gradio 介面
with gr.Blocks(title="全自動化文本洞察生成器") as demo:
    gr.Markdown("# 🤖 全自動文本洞察生成器 (Gradio + Gemini + Google Sheets)")
    gr.Markdown(
        "此應用程式自動執行 **爬蟲 → 寫入 Sheets → 詞頻分析 → 回寫 Sheets → Gemini 洞察生成** 的完整流程。\n"
        "**🚨 重要：請務必將 JSON 憑證檔案命名為 `service_account_key.json` 並上傳！**"
    )

    with gr.Row():
        # 輸入區
        with gr.Column(scale=1):
            target_url_input = gr.Textbox(
                label="Step 1: 爬蟲目標 URL (請輸入一個單頁文章連結)",
                value="https://www.ettoday.net/news/20241026/2847285.htm",
                placeholder="例如: https://example.com/news/1"
            )
            sheet_id_input = gr.Textbox(
                label="Step 2: Google Sheet URL 或 ID (請務必填寫)",
                value="https://docs.google.com/spreadsheets/d/1cFttfMP93Bdzhtcqid06xTCqqZ0h5lrFODHXk8Se5R0/edit?gid=1806628729#gid=1806628729",
                placeholder="https://docs.google.com/spreadsheets/d/..."
            )
            run_button = gr.Button("🚀 一鍵執行分析流程", variant="primary")

        # 狀態輸出區
        with gr.Column(scale=1):
            status_output = gr.Textbox(
                label="流程執行狀態紀錄",
                lines=10,
                interactive=False,
                placeholder="等待執行..."
            )

    # 結果輸出區
    with gr.Tabs():
        with gr.TabItem("📊 關鍵詞統計結果 (回寫到 Google Sheet)"):
            keywords_output = gr.Dataframe(
                label=f"前 {TOP_N_KEYWORDS} 個熱詞及其 TF-IDF 平均得分 (已回寫到 Sheets)",
                headers=["詞彙", "平均TFIDF得分"],
                datatype=["str", "str"],
                wrap=True
            )
        with gr.TabItem("🤖 Gemini 洞察摘要與結論"):
            gemini_insight_output = gr.Markdown(
                label="AI 生成的洞察摘要與 120 字結論",
                value="Gemini API 輸出結果將顯示在這裡..."
            )

    # 綁定事件
    run_button.click(
        fn=automated_analysis_flow,
        inputs=[target_url_input, sheet_id_input],
        outputs=[status_output, keywords_output, gemini_insight_output]
    )

# 啟動 Gradio 服務
if __name__ == "__main__":
    # 載入 jieba 詞典，準備分詞
    jieba.initialize()
    print("Jieba 分詞庫已初始化。")

    demo.launch()

Jieba 分詞庫已初始化。
It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://a930accd6b6b8cd751.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
