# 連接資料庫並取得評論資料

In [None]:
import os
import pandas as pd
from sqlalchemy import create_engine, text, bindparam

In [None]:
db_user = 'root'
db_password = '123456'
db_host = '100.77.42.49'
db_port = '3306'
db_name = 'lda_temp'

# 建立連線
connection_string = f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}?charset=utf8mb4"
engine = create_engine(connection_string)

try:
    with engine.connect() as connection:
        result = connection.execute(text("SELECT 1"))
        print("連線成功！")
except Exception as e:
    print(f"連線失敗：{e}")

In [None]:
sql_file_path = 'sql_query/5b.sql'
sql_file_name = os.path.splitext(os.path.basename(sql_file_path))[0]

In [None]:
try:
    # 讀取 SQL 檔案
    if not os.path.exists(sql_file_path):
        raise FileNotFoundError(f"找不到檔案：{sql_file_path}")

    with open(sql_file_path, 'r', encoding='utf-8-sig') as file:
        sql_query = file.read()

    # 執行查詢並放入 dataFrame
    with engine.connect() as conn:
        df = pd.read_sql(text(sql_query), conn)

    print(f"讀取成功！共取得 {len(df)} 筆資料")
    # print(df.head())

except FileNotFoundError as e:
    print(e)
except Exception as e:
    print(f"發生錯誤: {e}")

# 分詞

In [None]:
import pandas as pd
from gensim import corpora, models
from gensim.models.coherencemodel import CoherenceModel
import pyLDAvis
import pyLDAvis.gensim_models
import re
import os
import torch
import csv
from ckip_transformers.nlp import CkipWordSegmenter, CkipPosTagger, CkipNerChunker
import tqdm as notebook_tqdm
from tqdm.auto import tqdm

In [None]:
OUTPUT_PATH = f"result_{sql_file_name}"
STOP_WORDS_PATH = r"stop_dic/stopwords.txt"
CUSTOM_DICT_PATH = r'stop_dic/dict.txt'
POS_MAPPING_CSV_PATH = r"stop_dic/CKIP_Tag_Mapping_Table.csv"
DATA_PATH = r'data'
DEVICE = 0 if torch.cuda.is_available() else -1
print(f"{torch.cuda.is_available()}")

In [None]:
class CKIPTokenizer:
    """
    CKIPTokenizer 類別用於整合中研院 CKIP Transformers 斷詞工具
    """
    def __init__(self, stopwords_path, pos_mapping_csv_path, custom_dict_path=None, device=0):
        """
        初始化斷詞器與 CKIP 模型。
        :param stopwords_path: 停用詞檔案路徑 (.txt)
        :param pos_mapping_csv_path: 詞性對照表路徑 (.csv)，用於決定保留哪些詞性
        :param custom_dict_path: 自定義詞典路徑 (.txt)
        :param device: 執行設備 (0 為 GPU, -1 為 CPU)
        """
        self.stopwords = self.load_stopwords(stopwords_path)
        self.custom_dict = self.load_custom_dict(custom_dict_path) if custom_dict_path else {}
        self.pos_tags_to_keep = self.load_pos_tags_to_keep(pos_mapping_csv_path)
        
        # 初始化 CKIP 三大核心驅動器
        self.ws_driver  = CkipWordSegmenter (model="albert-base", device=device) # 斷詞
        self.pos_driver = CkipPosTagger     (model="albert-base", device=device) # 詞性標記
        self.ner_driver = CkipNerChunker    (model="albert-base", device=device) # 實體辨識
        
    def load_stopwords(self, filepath):
        """載入停用詞檔案，回傳一個 Set 以提升搜尋效率"""
        with open(filepath, encoding='utf-8-sig') as f:
            return set(line.strip() for line in f)

    def load_custom_dict(self, filepath):
        """載入自定義詞典，格式預期為：詞彙 [權重]"""
        custom_terms = {}
        with open(filepath, encoding='utf-8-sig') as f:
            for line in f:
                parts = line.strip().split()
                if len(parts) >= 1:
                    word = parts[0]
                    # 若無指定權重，預設為 1.0
                    weight = float(parts[1]) if len(parts) > 1 else 1.0
                    custom_terms[word] = weight
        return custom_terms

    def load_pos_tags_to_keep(self, csv_path):
        """從 CSV 載入需要保留的詞性標記"""
        pos_tags = []
        with open(csv_path, newline='', encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                # 檢查 CSV 中 ' need(0/1) ' 欄位是否為 '1'
                if row['need(0/1)'].strip() == '1':
                    tags = row['簡化標記']
                    pos_tags.append(tags)
        return pos_tags

    def tokenize(self, text, return_pos=False):
        """
        對單一字串進行斷詞
        :param text: 輸入純文字
        :param return_pos: 是否回傳 (詞, 詞性) 元組清單
        """
        if not isinstance(text, str):
            return ""

        # 清洗文字：僅保留中文 (Regex: \u4e00-\u9fff)
        text = re.sub(r'[^\u4e00-\u9fff]', '', text)
        if not text:
            return ""

        try:
            # 執行斷詞與詞性標記
            word_sentence_list = self.ws_driver([text], show_progress=False)
            pos_sentence_list = self.pos_driver(word_sentence_list, show_progress=False)

            words = word_sentence_list[0]
            pos_tags = pos_sentence_list[0]

            filtered_words = []
            filtered_word_pos = []
            
            # 過濾邏輯：1.不在停用詞內 2.字數>=2 3.屬於保留詞性
            for word, pos in zip(words, pos_tags):
                if (word not in self.stopwords) and (len(word) >= 2) and (pos in self.pos_tags_to_keep):
                    filtered_words.append(f"{word}")
                    filtered_word_pos.append((word, pos))

            if return_pos:
                return filtered_word_pos
            else:
                return " ".join(filtered_words) # 回傳以空格分隔的字串
        except Exception as e:
            print(f"CKIP processing error: {e}")
            return "" if not return_pos else []
        
    def batch_tokenize(self, text_list, batch_size=64):
        """
        批次處理多筆文字
        :param text_list: 字串列表
        :param batch_size: 批次處理大小
        """
        clean_texts = []
        valid_indices = [] # 用於記錄非空值的位置，以便最後還原清單長度
        
        print("正在清理文字...")
        for idx, text in enumerate(text_list):
            if isinstance(text, str):
                # 這裡保留了中英數字 ( \w )，與單筆 tokenize 的邏輯略有不同
                text = re.sub(r'[^\u4e00-\u9fff\w]', '', text) 
                if text.strip():
                    clean_texts.append(text)
                    valid_indices.append(idx)
        
        if not clean_texts:
            return [[] for _ in range(len(text_list))]

        # 批次執行模型運算
        print(f"執行 CKIP 模型 (Batch Size: {batch_size})...")
        ws_list = self.ws_driver(clean_texts, batch_size=batch_size, show_progress=True)
        pos_list = self.pos_driver(ws_list, batch_size=batch_size, show_progress=True)

        print("過濾停用詞與篩選詞性...")
        final_tokens_list = []
        
        for words, pos_tags in zip(ws_list, pos_list):
            filtered_words = []
            for word, pos in zip(words, pos_tags):
                # 條件篩選：非停用詞、長度 > 1、指定詞性
                if (word not in self.stopwords) and (len(word) > 1) and (pos in self.pos_tags_to_keep):
                    filtered_words.append(word)
            
            final_tokens_list.append(filtered_words)

        # 依照原始 list 的索引將結果放回對應位置，其餘填空 list
        result = [[] for _ in range(len(text_list))]
        for i, tokens in zip(valid_indices, final_tokens_list):
            result[i] = tokens
            
        return result

In [None]:
tokenizer = CKIPTokenizer(
    stopwords_path=STOP_WORDS_PATH, 
    pos_mapping_csv_path=POS_MAPPING_CSV_PATH, 
    custom_dict_path=CUSTOM_DICT_PATH, 
    device=DEVICE
)

In [None]:
print("處理 DataFrame...")
df['tokens'] = tokenizer.batch_tokenize(df['評論'].tolist(), batch_size=256)
print(df[['評論', 'tokens']].head())

In [None]:
file_name = f"result_tokens.csv"
save_path = os.path.join(OUTPUT_PATH, file_name)

print(f"\n準備儲存檔案至: {save_path}")

try:
    # 檢查目錄是否存在，不存在就建立
    if not os.path.exists(OUTPUT_PATH):
        os.makedirs(OUTPUT_PATH)
        print(f"已建立資料夾: {OUTPUT_PATH}")

    df[['評論', 'tokens']].to_csv(
        save_path, 
        index=False, 
        encoding='utf-8-sig'
    )
    
    print(f"成功保存，檔案大小約: {os.path.getsize(save_path) / 1024:.2f} KB")

except Exception as e:
    print(f"保存失敗: {e}")

# LDA

## 建立詞頻矩陣

In [None]:
import pandas as pd
import ast
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation

In [None]:
corpus = df['tokens'].apply(lambda x: ' '.join(x))
print(corpus.head(3))

In [None]:
n_features = 1000  # 只取前 1000 個最重要的詞

# 建立 Vectorizer
tf_vectorizer = CountVectorizer(
    max_features=n_features,
    max_df=0.95,             # 如果一個詞在 95% 的文章都出現，就刪掉 (太通用)
    min_df=5,                # 至少要出現 5 次才算數 (過濾錯字或極罕見詞)
    token_pattern=r'(?u)\b\w+\b' # 確保匹配中文
)

# 轉換為矩陣 (詞袋（Bag-of-Words）)
tf = tf_vectorizer.fit_transform(corpus)

print(f"矩陣形狀: {tf.shape} (評論數, 特徵詞數)")
feature_names = tf_vectorizer.get_feature_names_out() # 詞彙表
print(f"前 50 個特徵詞: {feature_names[:50]}")

In [None]:
# 算出每個詞在所有文章中的總次數
# tf 是一個稀疏矩陣 (sparse matrix)，sum() 出來會是一個矩陣物件
# .A1 是 numpy 的屬性，可以快速將矩陣攤平變成一維陣列 (array)
total_counts = tf.sum(axis=0).A1

# 建立 DataFrame 對照表 (詞, 次數)
word_freq_df = pd.DataFrame({
    'Term': feature_names,
    'Frequency': total_counts
})

# 依照次數由大到小排序
word_freq_df = word_freq_df.sort_values(by='Frequency', ascending=False)
print("\n=== 前 20 個高頻詞 ===")
print(word_freq_df.head(20))

## 訓練 LDA 模型

In [None]:
# 設定主題數量
n_topics = 10

print(f"開始訓練 LDA 模型 (主題數: {n_topics})...")

lda = LatentDirichletAllocation(n_components=n_topics,                  # 主題數量
                                max_iter=100,                           # 疊代次數
                                learning_method='online',
                                learning_offset=50,
                                doc_topic_prior= (1 / n_topics),        # 預設為 1 / n_topics，主題分布的稀疏程度（alpha）
                                topic_word_prior= (1 / n_features),     # 預設為 1 / n_features，詞分布的稀疏程度（Beta）
                                random_state=0)

lda.fit(tf)
print("模型訓練完成")

## 查看主題關鍵字

In [None]:
def print_top_words(model, feature_names, n_top_words):
    print("\n=== 各主題關鍵字列表 ===")
    for topic_idx, topic in enumerate(model.components_):
        message = f"主題 #{topic_idx + 1}: "
        message += " ".join([feature_names[i]
                             for i in topic.argsort()[:-n_top_words - 1:-1]])
        print(message)
    print()

# 顯示每個主題前 15 個關鍵字
print_top_words(lda, feature_names, n_top_words=15)

## 輸出每個文章對應的主題

In [None]:
import numpy as np
topics=lda.transform(tf)
topic = []
for t in topics:
    topic.append(list(t).index(np.max(t)))
df['topic'] = topic
df.to_csv(f"{OUTPUT_PATH}/data_topic.csv",index = False, encoding='utf-8-sig')
# topics[0] #0 1 2 

## web可視化

In [None]:
import pyLDAvis
print(pyLDAvis.__version__)
import pyLDAvis.lda_model

In [None]:
# 在 Jupyter Notebook 中，直接顯示
pyLDAvis.enable_notebook()
vis = pyLDAvis.lda_model.prepare(lda, tf, tf_vectorizer)
pyLDAvis.display(vis)

In [None]:
# 保存成 HTML 檔案
html_path = os.path.join(OUTPUT_PATH, 'result_lda.html')
pyLDAvis.save_html(vis, html_path)

# 文本相似度

In [None]:
import re
import jieba
import numpy as np
import tqdm as notebook_tqdm
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer

In [None]:
STOP_WORDS_PATH = r"stop_dic/stopwords.txt"

In [None]:
# 專利摘要文本1，申請號：CN202480029985.2
patent1 = "本公開涉及通信方法、通信設備、通信系統及存儲介質。通信方法包括：AF向第一網元發送第一資訊，所述第一資訊用於指示取消A‑IoT設備的週期性操作。本公開通過AF向第一網元發送第一資訊，以指示取消A‑IoT設備的週期性操作，從而有效減少A‑IoT設備的能量消耗。"

# 專利摘要文本2，申請號：CN202480000774.6
patent2 = "本公開涉及通信方法、通信設備、通信系統及存儲介質。該方法包括：第一實體接收網路功能服務消費方發送的第一請求，所述第一請求包括網路功能服務提供方相關的第一資訊，所述第一資訊是基於所述網路功能服務提供方的第二資訊進行處理得到的資訊，所述網路功能服務消費方位於第一網路，所述網路功能服務提供方位於第二網路；所述第一實體發送第二請求。通過本公開實施例，可以提高通信安全性。"

# 專利摘要文本3，申請號：CN202410796819.7
patent3 = "本公開涉及一種螢幕的顯示方法、裝置、設備、存儲介質和程式產品，涉及螢幕顯示技術領域，方法包括：獲取終端生成的待顯示資料包，待顯示資料包包括目標幀對應的幀同步信號、目標幀中每個圖元行對應的行同步信號以及目標顯示資料；確定終端的螢幕對應的面板時序；根據面板時序，對行同步信號進行補償，得到目標行同步信號；根據幀同步信號、目標行同步信號和目標顯示資料，生成目標顯示信號，並在終端的螢幕上進行顯示。這樣，根據終端螢幕對應的面板時序對待顯示資料包中的行同步信號進行補償，從而避免了螢幕出現花屏、橫屏、亮帶等等異常情況，保證了螢幕的顯示效果和穩定性，提升了用戶體驗。"

## 1. 傳統方法_TF-IDF
Ref: https://doi.org/10.1111/jofi.12885

In [None]:
# 1. 加載停用詞
def load_stopwords(filepath):
    with open(filepath, encoding='utf-8-sig') as f:
        return set(line.strip() for line in f)

STOPWORDS = load_stopwords(STOP_WORDS_PATH)

In [None]:
# 2. jieba 分詞
def segment_sentence(sentence: str) -> list:
    words = jieba.cut(sentence, cut_all=False)
    return [w.strip() for w in words if w.strip()]

In [None]:
# 3. 去除停用詞
def remove_stopwords(words: list) -> list:
    return [w for w in words if w not in STOPWORDS]

In [None]:
# 4. 去除標點符號
def remove_punctuation(words: list) -> str:
    joined = " ".join(words)
    cleaned = re.sub(r"[^0-9A-Za-z\u4e00-\u9fa5\s]", " ", joined)
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    return cleaned

In [None]:
# 5. 總流程
def preprocess_document(text: str) -> list:
    # Step1 分詞
    words = segment_sentence(text)

    # Step2 去停用詞
    words = remove_stopwords(words)

    # Step3 去標點符號
    cleaned_list = remove_punctuation(words)

    return cleaned_list

In [None]:
text1 = preprocess_document(patent1)
text2 = preprocess_document(patent2)
text3 = preprocess_document(patent3)

In [None]:
text1

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

def calculate_tfidf_cosine_similarity(text1_words, text2_words):
    """
    使用 TF-IDF 方法計算兩個已完成分詞與去停用詞文本的餘弦相似度 (Cosine Similarity)。
    
    參數:
        text1_words: 第一個文本的分詞結果（字串形式，詞之間以空格分隔；或詞彙列表）
        text2_words: 第二個文本的分詞結果（字串形式，詞之間以空格分隔；或詞彙列表）
    
    回傳:
        float: 餘弦相似度值（範圍：0-1，1 表示方向完全相同/高度相似）
    """
    
    # 確保輸入為字串格式（若輸入為列表 list，則將其合併為以空格分隔的字串）
    if isinstance(text1_words, list):
        text1_words = ' '.join(text1_words)
    if isinstance(text2_words, list):
        text2_words = ' '.join(text2_words)
    
    # 初始化 TF-IDF 向量化工具
    # 這會將文本轉換為詞頻-逆文件頻率矩陣
    vectorizer = TfidfVectorizer()
    
    try:
        # 擬合數據並轉換文本為 TF-IDF 矩陣
        # 矩陣包含兩列向量，分別代表 text1 與 text2
        tfidf_matrix = vectorizer.fit_transform([text1_words, text2_words])
        
        # 計算兩個向量之間的餘弦相似度
        # cosine_similarity 預期輸入為矩陣，故使用切片 [0:1] 與 [1:2]
        similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
        
        return similarity
    except ValueError:
        # 若文本為空或不包含任何可辨識的詞彙，TfidfVectorizer 可能會拋出錯誤
        return 0.0

In [None]:
similarity12 = calculate_tfidf_cosine_similarity(text1, text2)
similarity13 = calculate_tfidf_cosine_similarity(text1, text3)

In [None]:
print(f"專利1與專利2的相似度: {similarity12:.4f}\n專利1與專利3的相似度: {similarity13:.4f}")

## 2. 現代方法_大語言模型

In [None]:
embedding_model = SentenceTransformer("./bge-base-zh-v1.5")

In [None]:
embedding = embedding_model.encode([patent1, patent2, patent3])

In [None]:
cos12 = embedding_model.similarity(embedding[0], embedding[1])
cos13 = embedding_model.similarity(embedding[0], embedding[2])

In [None]:
print(f"專利1與專利2的相似度: {cos12.item():.4f}\n專利1與專利3的相似度: {cos13.item():.4f}")