<hr/>
K-means做關鍵詞分群<br>
Grid search要分群的群組數(若未定可以用手肘法算)<br>
有用K-means++初始化要拿來分群的質心位置<br>
基於歐式距離和餘弦距離的都要算，最後用Silhouette評估優劣<br>
最後有用Similar_Words_Dictionary.txt這個同義詞辭典把同義的詞彙插入同一個群集內<br>
<hr/>

In [None]:
import os
import warnings
import numpy as np
from gensim.models import KeyedVectors
from sklearn.preprocessing import normalize
from sklearn.metrics import silhouette_score
from sklearn.metrics import pairwise_distances_argmin_min
from sklearn.cluster import KMeans
import pandas as pd

# Cap the OpenMP thread count through the environment.
# NOTE(review): this assignment runs after numpy/sklearn were already imported;
# presumably it targets the KMeans/MKL warning filtered below — confirm that it
# still takes effect when set this late.
os.environ["OMP_NUM_THREADS"] = "2"

# Ignore one specific warning message (matched against the start of the text).
warnings.filterwarnings("ignore", message="KMeans is known to have a memory leak on Windows with MKL")

# Load the saved English word vectors (word2vec text format, not binary).
english_word_vectors = KeyedVectors.load_word2vec_format("english_word_vectors.txt", binary=False)

# Collect every word and its vector, keeping the model's index order so that
# row i of `vectors` belongs to `words[i]`.
words = english_word_vectors.index_to_key
vectors = np.array([english_word_vectors[word] for word in words])

# Normalize the vectors so that they are suitable for cosine distance.
normalized_vectors = normalize(vectors)

def get_chinese_translation(word, translations):
    """Look up *word* in *translations*.

    Returns the mapped value when the word is a key of the mapping; otherwise
    the word is handed back unchanged (i.e. the original English word).
    """
    if word in translations:
        return translations[word]
    return word

# Load the mapping from English words to their Chinese terms.
def load_translations(file_path):
    """Build an ``{english: chinese}`` dictionary from a translation file.

    Each usable line has the form ``<prefix>.<chinese> | <english>``: exactly
    one ``" | "`` separator, and a ``.`` inside the Chinese field. The text
    between the first and second ``.`` of the Chinese field is kept as the
    translation; underscores in the English field are replaced by spaces.

    Lines that do not fit this shape are skipped and reported on stdout
    instead of aborting the load.

    Args:
        file_path: Path of the UTF-8 encoded translation file.

    Returns:
        dict mapping the cleaned English word to the cleaned Chinese term.
    """
    translations = {}
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            stripped = line.strip()
            # The separator test is done on the raw line, as before.
            if " | " not in line:
                print(f"Skipping line due to missing separator: {stripped}")
                continue

            parts = stripped.split(" | ")
            if len(parts) != 2:
                print(f"Skipping line due to unexpected format: {stripped}")
                continue

            chinese, english = parts
            pieces = chinese.split('.')
            if len(pieces) < 2:
                # No '.' in the Chinese field: indexing pieces[1] would raise
                # IndexError, so treat it like any other malformed line.
                print(f"Skipping line due to unexpected format: {stripped}")
                continue

            translations[english.strip().replace("_", " ")] = pieces[1].strip()
    return translations

translations = load_translations("answer_all_TC_cleaned_split.txt")

# Parse the similar-words dictionary file into a {key word: [similar words]} mapping.
def load_similar_words(file_path):
    """Read a ``key|syn1,syn2,...`` file and return it as a dictionary.

    A line with exactly one ``|`` maps the stripped key to the list of
    stripped, comma-separated entries that follow it. A line without ``|``
    maps the stripped line to an empty list. Lines with more than one ``|``
    are ignored.
    """
    mapping = {}
    with open(file_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            fields = raw_line.strip().split("|")
            if len(fields) > 2:
                continue
            head = fields[0].strip()
            if len(fields) == 2:
                mapping[head] = [entry.strip() for entry in fields[1].split(",")]
            else:
                mapping[head] = []
    return mapping

similar_words = load_similar_words("Similar_Words_Dictionary.txt")

# Pick initial centroids with the K-means++ seeding scheme.
def kmeans_plus_plus_init(vectors, k):
    """Choose ``k`` rows of ``vectors`` as initial centroids (K-means++ seeding).

    The first centroid is drawn uniformly at random. Every further centroid is
    drawn with probability proportional to each point's squared Euclidean
    distance to its nearest already-chosen centroid.

    Args:
        vectors: 2-D array with one data point per row.
        k: Number of centroids requested. At least one centroid is always
            returned, because the first one is chosen before the loop.

    Returns:
        Array whose rows are the chosen centroids.
    """
    n_points = vectors.shape[0]
    first = vectors[np.random.choice(n_points)]
    centroids = [first]

    # Running minimum of the squared distance to the nearest chosen centroid.
    # Updating it after each pick gives the same values as recomputing the
    # distance to every centroid each round, without the O(k^2 * n) cost.
    min_dist_sq = np.sum((vectors - first) ** 2, axis=1)

    for _ in range(1, k):
        total = min_dist_sq.sum()
        if total > 0:
            cumulative_probabilities = np.cumsum(min_dist_sq / total)
            r = np.random.rand()
            i = np.searchsorted(cumulative_probabilities, r)
            # Rounding can leave the last cumulative value slightly below 1,
            # in which case searchsorted returns n_points; clamp to stay in range.
            i = min(int(i), n_points - 1)
        else:
            # Every point coincides with a chosen centroid (or the total is
            # not a number): the weights are undefined, so pick uniformly.
            i = np.random.choice(n_points)

        chosen = vectors[i]
        centroids.append(chosen)
        min_dist_sq = np.minimum(min_dist_sq, np.sum((vectors - chosen) ** 2, axis=1))

    return np.array(centroids)

def _group_words_by_label(labels):
    """Group the module-level ``words`` by cluster label.

    ``labels[i]`` is the cluster of ``words[i]``. Underscores in a word are
    replaced by spaces. Returns ``{label: [word, ...]}`` with words in their
    original order.
    """
    clusters = {}
    for word, cluster_label in zip(words, labels):
        clusters.setdefault(cluster_label, []).append(word.replace("_", " "))
    return clusters


def _insert_similar_words(clusters):
    """Insert the entries of the module-level ``similar_words`` into ``clusters`` in place.

    For each key word, the first cluster (in dictionary order) that contains
    it is located by a case-insensitive, whitespace-stripped comparison. Each
    of its similar words that is not yet in that cluster is inserted at the
    front of the cluster's list.
    """
    # Dictionary order of the clusters, so "first matching cluster" is kept.
    position = {label: pos for pos, label in enumerate(clusters)}
    # Normalized word -> earliest cluster that currently contains it.
    first_cluster = {}

    def register(word, label):
        normalized = word.strip().lower()
        known = first_cluster.get(normalized)
        if known is None or position[label] < position[known]:
            first_cluster[normalized] = label

    for label, members in clusters.items():
        for member in members:
            register(member, label)

    for key_word, similars in similar_words.items():
        # Normalize the key the same way as the cluster words; otherwise keys
        # with capitals or surrounding spaces could never match.
        target_cluster_label = first_cluster.get(key_word.strip().lower())
        if target_cluster_label is None:
            continue

        members = clusters[target_cluster_label]
        for similar in similars:
            if similar == '':
                # An empty entry (e.g. from "key|" or a trailing comma) ends the list.
                break
            if similar not in members:
                members.insert(0, similar)  # inserted at the front
                # Inserted words can themselves be matched by later key words.
                register(similar, target_cluster_label)


# K-means on cosine distance, scored with the Silhouette coefficient.
def compute_kmeans_cosine_silhouette(cluster_num):
    """Cluster ``normalized_vectors`` with cosine-distance K-means.

    Centroids are seeded with ``kmeans_plus_plus_init`` and refined for at
    most 10 iterations: every point is assigned to its nearest centroid by
    cosine distance, then each centroid becomes the normalized mean of its
    points. A centroid whose cluster is empty is kept as it was. Refinement
    stops early once the centroids no longer change.

    Args:
        cluster_num: Number of clusters.

    Returns:
        ``(silhouette, clusters)``: the cosine Silhouette score of the final
        assignment, and ``{label: [words]}`` with similar words inserted.
    """
    centroids = kmeans_plus_plus_init(normalized_vectors, cluster_num)

    for _iteration in range(10):
        labels, _ = pairwise_distances_argmin_min(normalized_vectors, centroids, metric='cosine')
        new_centroids = []
        for i in range(cluster_num):
            cluster_points = normalized_vectors[labels == i]
            if len(cluster_points) > 0:
                new_centroids.append(cluster_points.mean(axis=0))
            else:
                # Empty cluster: keep the previous centroid.
                new_centroids.append(centroids[i])
        new_centroids = normalize(np.array(new_centroids))
        if np.array_equal(centroids, new_centroids):
            break
        centroids = new_centroids

    # Final assignment against the final centroids.
    labels, _ = pairwise_distances_argmin_min(normalized_vectors, centroids, metric='cosine')
    silhouette_score_value = silhouette_score(normalized_vectors, labels, metric='cosine')

    clusters = _group_words_by_label(labels)
    _insert_similar_words(clusters)

    return silhouette_score_value, clusters


# K-means on Euclidean distance, scored with the Silhouette coefficient.
def compute_kmeans_euclidean_silhouette(cluster_num):
    """Cluster the raw ``vectors`` with scikit-learn's ``KMeans``.

    Uses ``init='k-means++'`` and ``random_state=42``.

    Args:
        cluster_num: Number of clusters.

    Returns:
        ``(silhouette, clusters)``: the Euclidean Silhouette score, and
        ``{label: [words]}`` with similar words inserted.
    """
    kmeans = KMeans(n_clusters=cluster_num, init='k-means++', random_state=42)
    labels = kmeans.fit_predict(vectors)
    silhouette_score_value = silhouette_score(vectors, labels, metric='euclidean')

    clusters = _group_words_by_label(labels)
    _insert_similar_words(clusters)

    return silhouette_score_value, clusters

# Grid-search the number of clusters: run both clustering variants for every
# candidate Cluster_Num, keep every result, and track the best Silhouette score
# of each variant.
best_cosine_silhouette = -1
best_cosine_clusters = None
best_cosine_cluster_num = None
cosine_results = []

best_euclidean_silhouette = -1
best_euclidean_clusters = None
best_euclidean_cluster_num = None
euclidean_results = []

for cluster_num in range(300, 601, 30):
    # Cosine-distance result
    cosine_silhouette, cosine_clusters = compute_kmeans_cosine_silhouette(cluster_num)
    print(f"Cosine - Cluster_Num: {cluster_num}, Separation Silhouette: {cosine_silhouette}")
    cosine_results.append((cluster_num, cosine_silhouette, cosine_clusters))
    if cosine_silhouette > best_cosine_silhouette:
        best_cosine_silhouette = cosine_silhouette
        best_cosine_clusters = cosine_clusters
        best_cosine_cluster_num = cluster_num

    # Euclidean-distance result
    euclidean_silhouette, euclidean_clusters = compute_kmeans_euclidean_silhouette(cluster_num)
    print(f"Euclidean - Cluster_Num: {cluster_num}, Separation Silhouette: {euclidean_silhouette}")
    euclidean_results.append((cluster_num, euclidean_silhouette, euclidean_clusters))
    if euclidean_silhouette > best_euclidean_silhouette:
        # Was `euclidean_silhouettex`, an undefined name that raised NameError.
        best_euclidean_silhouette = euclidean_silhouette
        best_euclidean_clusters = euclidean_clusters
        best_euclidean_cluster_num = cluster_num

print(f"\nBest Cosine Cluster_Num: {best_cosine_cluster_num}, Best Separation Silhouette: {best_cosine_silhouette}")
print(f"Best Euclidean Cluster_Num: {best_euclidean_cluster_num}, Best Separation Silhouette: {best_euclidean_silhouette}")

# Save every run to one Excel workbook: one sheet per (metric, Cluster_Num),
# one row per cluster with its words translated and joined by ", ".
#
# Excel limits worksheet names to 31 characters. The long form
# "<Metric>_Cluster_<n>_Sil_<score>" fits for "Cosine" but is 32+ characters
# for "Euclidean" with a three-digit cluster count, so a shorter form without
# "_Cluster" is used whenever the long one does not fit.
with pd.ExcelWriter("clustering_results.xlsx") as writer:
    for metric_name, metric_results in (("Cosine", cosine_results), ("Euclidean", euclidean_results)):
        # The loop variable names are kept on purpose: `clusters` stays bound
        # after this loop (to the last Euclidean result) and is read again by
        # the cluster-size listing further down the file.
        for cluster_num, silhouette, clusters in metric_results:
            rows = []
            for cluster_label, cluster_words in clusters.items():
                if cluster_words:
                    translated_words = [get_chinese_translation(word, translations) for word in cluster_words]
                    rows.append({
                        "Cluster_Label": cluster_label,
                        "Words": ', '.join(translated_words),
                    })
            # Build the frame once instead of concatenating one row at a time.
            cluster_df = pd.DataFrame(rows, columns=["Cluster_Label", "Words"])

            sheet_name = f"{metric_name}_Cluster_{cluster_num}_Sil_{silhouette:.4f}"
            if len(sheet_name) > 31:
                sheet_name = f"{metric_name}_{cluster_num}_Sil_{silhouette:.4f}"[:31]
            cluster_df.to_excel(writer, sheet_name=sheet_name, index=False)

print("Clustering results have been saved to clustering_results.xlsx")


<hr/>
列出每個群集內的項目數量(不計入含中文字元的項目)<br>
<hr/>

In [None]:
import re

def contains_chinese(text):
    """Tell whether *text* has at least one character in the range U+4E00..U+9FFF."""
    for ch in text:
        if '\u4e00' <= ch <= '\u9fff':
            return True
    return False

def filter_out_chinese(cluster):
    """Drop the items that contain Chinese characters from every cluster.

    Takes a ``{key: [items]}`` mapping and returns a new mapping with the same
    keys, where each value is a dict holding the surviving items under
    ``"filtered_items"`` and how many there are under ``"count"``.
    """
    result = {}
    for label in cluster:
        kept = []
        for entry in cluster[label]:
            if contains_chinese(entry):
                continue
            kept.append(entry)
        result[label] = {"filtered_items": kept, "count": len(kept)}
    return result

# Count the items of each cluster that contain no Chinese characters.
# NOTE(review): `clusters` is not assigned in this cell. In the visible file it
# is last bound as a loop variable of the Excel-export loops, i.e. the clusters
# of the last Euclidean result — confirm this is the intended input.
filtered_cluster = filter_out_chinese(clusters)

# Print one line per cluster with the number of remaining items.
for key, value in filtered_cluster.items():
    print(f"Cluster {key}: {value['count']} items")
