In [None]:
## vincent naive bayes

import os
import re
import string
import nltk
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy.cluster.hierarchy import linkage, dendrogram, fcluster
import matplotlib.pyplot as plt
import csv
import math

nltk.download('punkt')
nltk.download('stopwords')

def preprocess_text(text):
    # 移除第三行
    lines = text.split('\n')
    if len(lines) > 2:
        text = '\n'.join(lines[:2] + lines[3:])
    # 轉換為小寫
    text = text.lower()
    # 移除標點符號
    text = text.translate(str.maketrans('', '', string.punctuation))
    # 移除指定單詞和停用詞
    stop_words = set(stopwords.words('english'))
    tokens = nltk.word_tokenize(text)
    tokens = [word for word in tokens if word not in stop_words and word != 'embed']
    return ' '.join(tokens)

# 讀取文檔資料夾中的文檔
def read_documents_from_folder(folder_path):
    documents = {}
    for filename in os.listdir(folder_path):
        if filename.endswith(".txt"):
            doc_id = int(filename.split('.')[0])
            with open(os.path.join(folder_path, filename), "r", encoding="utf-8", errors = "replace") as file:
                documents[doc_id] = file.read()
    return documents

# 讀取訓練檔案
def read_training_file(training_file_path):
    class_docs = {}
    with open(training_file_path, "r") as file:
        for line in file:
            parts = line.strip().split()
            class_id = int(parts[0])
            doc_ids = list(map(int, parts[1:]))
            if class_id not in class_docs:
                class_docs[class_id] = []
            class_docs[class_id].extend(doc_ids)
    return class_docs

# 增加字典字數
def safe_increment(dictionary, key):
    if key not in dictionary:
        dictionary[key] = 0
    dictionary[key] += 1

# 計算詞頻
def calculate_term_frequencies(documents, class_docs):
    class_term_freq = {}
    overall_term_freq = {}

    for class_id, doc_ids in class_docs.items():
        if class_id not in class_term_freq:
            class_term_freq[class_id] = {}

        for doc_id in doc_ids:
            if doc_id in documents:
                tokens = preprocess_text(documents[doc_id])
                for token in tokens:
                    safe_increment(class_term_freq[class_id], token)
                    safe_increment(overall_term_freq, token)

    return class_term_freq, overall_term_freq

# 卡方檢定計算特徵選擇
def chi2_feature_selection(class_term_freq, overall_term_freq, class_docs, total_classes, top_k=500):
    chi2_scores = {}
    total_docs = sum(len(docs) for docs in class_docs.values())

    for term, total_count in overall_term_freq.items():
        for class_id in range(1, total_classes + 1):
            A = class_term_freq.get(class_id, {}).get(term, 0)
            B = sum(class_term_freq.get(c, {}).get(term, 0) for c in range(1, total_classes + 1)) - A
            C = len(class_docs.get(class_id, [])) - A
            D = total_docs - (A + B + C)

            numerator = total_docs * (A * D - B * C) ** 2
            denominator = (A + C) * (B + D) * (A + B) * (C + D)
            if denominator > 0:
                if term not in chi2_scores:
                    chi2_scores[term] = 0
                chi2_scores[term] += numerator / denominator

    sorted_terms = sorted(chi2_scores.items(), key=lambda x: x[1], reverse=True)
    selected_terms = {term: idx for idx, (term, _) in enumerate(sorted_terms[:top_k])}
    return selected_terms

# 計算機率
def calculate_class_probabilities(documents, class_docs, vocabulary):
    class_word_counts = {}
    class_doc_counts = {}
    total_docs = sum(len(docs) for docs in class_docs.values())

    for class_id, doc_ids in class_docs.items():
        if class_id not in class_word_counts:
            class_word_counts[class_id] = {}
        class_doc_counts[class_id] = len(doc_ids)

        for doc_id in doc_ids:
            if doc_id in documents:
                tokens = preprocess_text(documents[doc_id])
                for token in tokens:
                    if token in vocabulary:
                        safe_increment(class_word_counts[class_id], token)

    class_priors = {class_id: count / total_docs for class_id, count in class_doc_counts.items()}
    class_conditional_probs = {}

    for class_id, word_counts in class_word_counts.items():
        total_words = sum(word_counts.values()) + len(vocabulary)
        class_conditional_probs[class_id] = {
            term: (word_counts.get(term, 0) + 1) / total_words for term in vocabulary
        }
    return class_priors, class_conditional_probs

# 分類測試文檔
def classify_documents(test_documents, vocabulary, class_priors, class_conditional_probs):
    results = {}
    for doc_id, doc_content in test_documents.items():
        tokens = preprocess_text(doc_content)
        tokens = [token for token in tokens if token in vocabulary]
        class_scores = {}
        for class_id, prior in class_priors.items():
            log_prob = math.log(prior)
            for token in tokens:
                log_prob += math.log(class_conditional_probs[class_id].get(token, 1e-6))
            class_scores[class_id] = log_prob
        predicted_class = max(class_scores, key=class_scores.get)
        results[doc_id] = predicted_class
    return results

# 主程式
def main():
    folder_path = '/Users/sophiehuang/Documents/113-1/113-1-IRTM/all_lyrics'
    training_file_path = '/Users/sophiehuang/Documents/113-1/113-1-IRTM/113-1-IRTM-final-project/folds/fold3.txt'
    output_file = './predictions3.csv'

    # 讀取文檔和訓練資料
    documents = read_documents_from_folder(folder_path)
    class_docs = read_training_file(training_file_path)

    # 測試文檔是未在訓練集中出現的文檔
    training_ids = set(doc_id for doc_ids in class_docs.values() for doc_id in doc_ids)
    test_documents = {doc_id: doc for doc_id, doc in documents.items() if doc_id not in training_ids}

    # 計算詞頻與選擇特徵
    class_term_freq, overall_term_freq = calculate_term_frequencies(documents, class_docs)
    vocabulary = chi2_feature_selection(class_term_freq, overall_term_freq, class_docs, len(class_docs), top_k=500)

    # 計算先驗機率與條件機率
    class_priors, class_conditional_probs = calculate_class_probabilities(documents, class_docs, vocabulary)

    # 分類測試文檔
    predictions = classify_documents(test_documents, vocabulary, class_priors, class_conditional_probs)

    # 保存結果
    with open(output_file, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(["Id", "Value"])
        for doc_id, predicted_class in sorted(predictions.items()):
            writer.writerow([doc_id, predicted_class])

    print(f"預測結果已保存到 {output_file}")

# 執行主程式
main()

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/sophiehuang/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/sophiehuang/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte

In [73]:
## calculate scores

import os
import pandas as pd
from sklearn.metrics import precision_score, recall_score, f1_score

# 定義正確答案檔案和預測結果資料夾
# 定義正確答案檔案和預測結果資料夾
ground_truth_path = '/Users/sophiehuang/Documents/113-1/113-1-IRTM/113-1-IRTM-final-project/output_modified.csv'
predictions_folder = '/Users/sophiehuang/Documents/113-1/113-1-IRTM/113-1-IRTM-final-project/prediction_sophie'

# 讀取正確答案
ground_truth = pd.read_csv(ground_truth_path)

# 結果存放列表
results = []

# 遍歷預測檔案
for prediction_file in os.listdir(predictions_folder):
    if prediction_file.endswith('.csv'):  # 僅處理 CSV 檔案
        # 讀取預測檔案
        predictions_path = os.path.join(predictions_folder, prediction_file)
        predictions = pd.read_csv(predictions_path)

        # 合併正確答案和預測檔案，僅保留相同的 Id
        merged = pd.merge(ground_truth, predictions, on='Id', how='inner')

        if merged.empty:  # 如果沒有共同的 Id，跳過該檔案
            print(f"Skipping file {prediction_file}: No matching Ids.")
            continue

        # 提取正確和預測的值
        true_labels = merged['Value_x']  # 正確答案中的 'Value'
        predicted_labels = merged['Value_y']  # 預測檔案中的 'Value'

        # 計算 Precision, Recall, 和 F1-score
        precision = precision_score(true_labels, predicted_labels, average='weighted', zero_division=0)
        recall = recall_score(true_labels, predicted_labels, average='weighted', zero_division=0)
        f1 = f1_score(true_labels, predicted_labels, average='weighted', zero_division=0)

        # 將結果加入列表
        results.append({
            'file': prediction_file,
            'precision': precision,
            'recall': recall,
            'f1_score': f1
        })

# 將結果轉為 DataFrame
results_df = pd.DataFrame(results)

# 顯示結果並保存到檔案
print(results_df)
results_df.to_csv('evaluation_results.csv', index=False)


                      file  precision    recall  f1_score
0   output_svm_param_3.csv   0.247520  0.274454  0.243258
1   output_svm_param_2.csv   0.255037  0.273712  0.241058
2   output_svm_param_1.csv   0.248988  0.280773  0.253086
3   output_svm_param_5.csv   0.256993  0.276730  0.241925
4   output_svm_param_4.csv   0.262353  0.276816  0.252420
5   output_svm_param_6.csv   0.245009  0.274933  0.243543
6   output_svm_param_7.csv   0.236972  0.276687  0.243191
7  output_svm_param_10.csv   0.250025  0.282447  0.253279
8   output_svm_param_9.csv   0.263737  0.275665  0.247597
9   output_svm_param_8.csv   0.251372  0.273212  0.245497


In [59]:
## change output
import pandas as pd

# 讀取 CSV 檔案
data = pd.read_csv("output.csv")

# 將 cluster=16 替換為 11
data.loc[data['Value'] == 11, 'Value'] = 1



# 儲存結果到新的 CSV 檔案
data.to_csv("output_modified.csv", index=False)

print("處理完成，結果已儲存到 'output.csv'")


處理完成，結果已儲存到 'output.csv'


In [46]:
## Sophie Naive Bayes

import os
import math
from collections import defaultdict, Counter
from sklearn.feature_extraction.text import TfidfVectorizer  # type: ignore
from nltk.corpus import stopwords  # type: ignore
import nltk  # type: ignore
import numpy as np  # type: ignore
import chardet

# 確保下載 NLTK 停用詞
nltk.download('stopwords')
stop_words = set(stopwords.words('english'))


# 資料預處理
def preprocess_text(text):
    # 移除第三行
    lines = text.split('\n')
    if len(lines) > 2:
        text = '\n'.join(lines[:2] + lines[3:])
    # 轉換為小寫
    text = text.lower()
    # 移除標點符號
    text = text.translate(str.maketrans('', '', string.punctuation))
    # 移除指定單詞和停用詞
    stop_words = set(stopwords.words('english'))
    tokens = nltk.word_tokenize(text)
    tokens = [word for word in tokens if word not in stop_words and word != 'embed']
    return ' '.join(tokens)


# 載入文件
def load_documents(folder_path):
    """從資料夾載入文件，並自動檢測編碼。"""
    docs = {}
    for filename in os.listdir(folder_path):
        if filename.endswith(".txt"):  # 確保只處理文字檔
            file_path = os.path.join(folder_path, filename)
            
            # 自動檢測文件編碼
            with open(file_path, 'rb') as file:  # 以二進制方式打開
                raw_data = file.read()
                detected_encoding = chardet.detect(raw_data)['encoding']
            
            # 以檢測到的編碼打開文件
            try:
                with open(file_path, 'r', encoding=detected_encoding) as file:
                    doc_id = int(filename.split('.')[0])  # 假設檔名為數字
                    docs[doc_id] = preprocess(file.read())
            except UnicodeDecodeError:
                print(f"Failed to decode file {filename} with encoding {detected_encoding}. Skipping.")
    return docs


# 計算 TF-IDF 並減少字彙數量
def calculate_tfidf(docs, max_features=1000):
    """計算 TF-IDF 並選取前 max_features 個特徵。"""
    vectorizer = TfidfVectorizer(max_features=max_features)
    tfidf_matrix = vectorizer.fit_transform([docs[doc_id] for doc_id in sorted(docs.keys())])
    vocab = vectorizer.get_feature_names_out()
    return vocab


# 計算似然比特徵選取
def calculate_likelihood_ratio(docs, labels, vocab):
    """計算每個詞語和類別的似然比。"""
    term_class_counts = Counter()
    term_counts = Counter()
    class_counts = Counter(labels)
    total_docs = len(labels)

    for i, doc in enumerate(docs):
        unique_terms = set(doc.split())  # 文件中的唯一詞彙
        for term in unique_terms:
            if term in vocab:
                term_class_counts[(term, labels[i])] += 1
                term_counts[term] += 1

    likelihood_scores = defaultdict(float)
    for term in vocab:
        for cls in class_counts.keys():
            observed = term_class_counts.get((term, cls), 0)
            expected = (term_counts[term] * class_counts[cls]) / total_docs
            if expected > 0:  # 避免除以零
                likelihood_scores[term] += ((observed - expected) ** 2) / expected

    return sorted(likelihood_scores, key=likelihood_scores.get, reverse=True)


# 訓練 Multinomial Naive Bayes
def train_naive_bayes(class_docs, docs, selected_vocab):
    """使用加一平滑訓練 Multinomial Naive Bayes。"""
    class_counts = Counter()
    term_counts = {cls: Counter() for cls in class_docs.keys()}
    vocab_size = len(selected_vocab)

    for cls, doc_ids in class_docs.items():
        class_counts[cls] += len(doc_ids)
        for doc_id in doc_ids:
            for term in docs[doc_id].split():
                if term in selected_vocab:
                    term_counts[cls][term] += 1

    class_probs = {cls: math.log(class_counts[cls] / sum(class_counts.values())) for cls in class_counts}
    term_probs = {cls: defaultdict(float) for cls in class_counts}

    for cls in class_counts:
        total_terms = sum(term_counts[cls].values()) + vocab_size
        for term in selected_vocab:
            term_probs[cls][term] = math.log((term_counts[cls][term] + 1) / total_terms)

    return class_probs, term_probs


# 文件分類
def classify_document(doc, selected_vocab, class_probs, term_probs):
    """分類單一文件。"""
    scores = {cls: class_probs[cls] for cls in class_probs}
    for cls in class_probs:
        for term in doc.split():
            if term in selected_vocab:
                scores[cls] += term_probs[cls][term]
    return max(scores, key=scores.get)


# 分類測試文件
def classify_and_output_with_tfidf_likelihood(training_data, docs, output_path, tfidf_features=1000, lr_features=500):
    """使用 TF-IDF 和似然比進行文件分類並輸出。"""
    class_docs = defaultdict(list)
    labels = []
    train_doc_ids = []
    for line in open(training_data, 'r', encoding='utf-8').readlines():
        parts = line.strip().split()
        cls = int(parts[0])
        doc_ids = list(map(int, parts[1:]))
        class_docs[cls].extend(doc_ids)
        labels.extend([cls] * len(doc_ids))
        train_doc_ids.extend(doc_ids)

    train_docs = [docs[doc_id] for doc_id in train_doc_ids]
    all_doc_ids = set(docs.keys())
    test_doc_ids = sorted(all_doc_ids - set(train_doc_ids))
    test_docs = {doc_id: docs[doc_id] for doc_id in test_doc_ids}

    tfidf_vocab = calculate_tfidf({doc_id: docs[doc_id] for doc_id in train_doc_ids}, max_features=tfidf_features)
    lr_vocab = calculate_likelihood_ratio(train_docs, labels, tfidf_vocab)[:lr_features]

    class_probs, term_probs = train_naive_bayes(class_docs, docs, lr_vocab)

    test_predictions = {}
    for doc_id, doc in test_docs.items():
        predicted_class = classify_document(doc, lr_vocab, class_probs, term_probs)
        test_predictions[doc_id] = predicted_class

    with open(output_path, 'w', encoding='utf-8') as file:
        file.write("Id,Value\n")
        for doc_id in sorted(test_predictions.keys()):
            file.write(f"{doc_id},{test_predictions[doc_id]}\n")

    print(f"Results saved to {output_path}")


if __name__ == "__main__":
    training_data = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/113-1-IRTM-final-project/folds/fold1.txt"
    folder_path = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/all_lyrics"
    output_path = "output_NB_1.csv"

    docs = load_documents(folder_path)
    classify_and_output_with_tfidf_likelihood(
        training_data,
        docs,
        output_path,
        tfidf_features=1000,
        lr_features=500
    )


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/sophiehuang/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Results saved to output_NB_1.csv


In [50]:
import os
from collections import defaultdict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import nltk
import numpy as np
import chardet

# 確保下載 NLTK 停用詞
nltk.download('stopwords')
stop_words = set(nltk.corpus.stopwords.words('english'))

# 資料預處理
def preprocess(text):
    # 移除第三行
    lines = text.split('\n')
    if len(lines) > 2:
        text = '\n'.join(lines[:2] + lines[3:])
    # 轉換為小寫
    text = text.lower()
    # 移除標點符號
    text = text.translate(str.maketrans('', '', string.punctuation))
    # 移除指定單詞和停用詞
    stop_words = set(stopwords.words('english'))
    tokens = nltk.word_tokenize(text)
    tokens = [word for word in tokens if word not in stop_words and word != 'embed']
    return ' '.join(tokens)

# 載入文件
def load_documents(folder_path):
    """從資料夾載入文件，並自動檢測編碼。"""
    docs = {}
    for filename in os.listdir(folder_path):
        if filename.endswith(".txt"):  # 確保只處理文字檔
            file_path = os.path.join(folder_path, filename)
            
            # 自動檢測文件編碼
            with open(file_path, 'rb') as file:  # 以二進制方式打開
                raw_data = file.read()
                detected_encoding = chardet.detect(raw_data)['encoding']
            
            # 以檢測到的編碼打開文件
            try:
                with open(file_path, 'r', encoding=detected_encoding) as file:
                    doc_id = int(filename.split('.')[0])  # 假設檔名為數字
                    docs[doc_id] = preprocess(file.read())
            except UnicodeDecodeError:
                print(f"Failed to decode file {filename} with encoding {detected_encoding}. Skipping.")
    return docs

# 訓練和測試 kNN
def classify_with_knn(training_data, docs, output_path, tfidf_features=1000, k=5):
    """使用 kNN 進行文件分類並輸出結果。"""
    class_docs = defaultdict(list)
    labels = []
    train_doc_ids = []
    
    # 讀取訓練資料
    for line in open(training_data, 'r', encoding='utf-8').readlines():
        parts = line.strip().split()
        cls = int(parts[0])
        doc_ids = list(map(int, parts[1:]))
        class_docs[cls].extend(doc_ids)
        labels.extend([cls] * len(doc_ids))
        train_doc_ids.extend(doc_ids)
    
    # 將訓練和測試分開
    all_doc_ids = set(docs.keys())
    test_doc_ids = sorted(all_doc_ids - set(train_doc_ids))
    test_docs = {doc_id: docs[doc_id] for doc_id in test_doc_ids}
    
    # 計算 TF-IDF
    vectorizer = TfidfVectorizer(max_features=tfidf_features)
    X_train = vectorizer.fit_transform([docs[doc_id] for doc_id in train_doc_ids])
    X_test = vectorizer.transform([test_docs[doc_id] for doc_id in test_doc_ids])
    y_train = np.array(labels)

    # 建立 kNN 模型
    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(X_train, y_train)
    
    # 預測測試資料
    y_pred = knn.predict(X_test)
    
    # 儲存結果
    with open(output_path, 'w', encoding='utf-8') as file:
        file.write("Id,Value\n")
        for doc_id, pred in zip(test_doc_ids, y_pred):
            file.write(f"{doc_id},{pred}\n")
    
    print(f"Results saved to {output_path}")

# 主程式
if __name__ == "__main__":
    training_data = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/113-1-IRTM-final-project/folds/fold1.txt"
    folder_path = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/all_lyrics"
    output_path = "output_knn1.csv"

    docs = load_documents(folder_path)
    classify_with_knn(
        training_data,
        docs,
        output_path,
        tfidf_features=1000,
        k=5
    )


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/sophiehuang/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Results saved to output_knn1.csv


In [51]:
import os
from collections import defaultdict
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
import chardet
import nltk

# 確保下載 NLTK 停用詞
nltk.download('stopwords')
stop_words = set(nltk.corpus.stopwords.words('english'))

# 資料預處理
def preprocess(text):
    # 移除第三行
    lines = text.split('\n')
    if len(lines) > 2:
        text = '\n'.join(lines[:2] + lines[3:])
    # 轉換為小寫
    text = text.lower()
    # 移除標點符號
    text = text.translate(str.maketrans('', '', string.punctuation))
    # 移除指定單詞和停用詞
    stop_words = set(stopwords.words('english'))
    tokens = nltk.word_tokenize(text)
    tokens = [word for word in tokens if word not in stop_words and word != 'embed']
    return ' '.join(tokens)

# 載入文件
def load_documents(folder_path):
    """從資料夾載入文件，並自動檢測編碼。"""
    docs = {}
    for filename in os.listdir(folder_path):
        if filename.endswith(".txt"):  # 確保只處理文字檔
            file_path = os.path.join(folder_path, filename)
            
            # 自動檢測文件編碼
            with open(file_path, 'rb') as file:  # 以二進制方式打開
                raw_data = file.read()
                detected_encoding = chardet.detect(raw_data)['encoding']
            
            # 以檢測到的編碼打開文件
            try:
                with open(file_path, 'r', encoding=detected_encoding) as file:
                    doc_id = int(filename.split('.')[0])  # 假設檔名為數字
                    docs[doc_id] = preprocess(file.read())
            except UnicodeDecodeError:
                print(f"Failed to decode file {filename} with encoding {detected_encoding}. Skipping.")
    return docs

# 訓練 Rocchio 分類器
def train_rocchio(training_data, docs, tfidf_features=1000):
    """使用 Rocchio 方法計算每個類別的質心。"""
    class_docs = defaultdict(list)
    labels = []
    train_doc_ids = []
    
    # 讀取訓練資料
    for line in open(training_data, 'r', encoding='utf-8').readlines():
        parts = line.strip().split()
        cls = int(parts[0])
        doc_ids = list(map(int, parts[1:]))
        class_docs[cls].extend(doc_ids)
        labels.extend([cls] * len(doc_ids))
        train_doc_ids.extend(doc_ids)
    
    # 計算 TF-IDF
    vectorizer = TfidfVectorizer(max_features=tfidf_features)
    X_train = vectorizer.fit_transform([docs[doc_id] for doc_id in train_doc_ids])
    y_train = np.array(labels)
    
    # 計算每個類別的質心
    centroids = {}
    for cls in class_docs:
        cls_indices = [i for i, label in enumerate(y_train) if label == cls]
        cls_vectors = X_train[cls_indices]
        centroids[cls] = cls_vectors.mean(axis=0)  # 計算質心
    
    return centroids, vectorizer

# 文件分類
def classify_with_rocchio(test_docs, centroids, vectorizer):
    """使用 Rocchio 分類器對測試文件進行分類。"""
    X_test = vectorizer.transform([test_docs[doc_id] for doc_id in sorted(test_docs.keys())])
    test_predictions = {}

    for idx, doc_id in enumerate(sorted(test_docs.keys())):
        distances = {}
        for cls, centroid in centroids.items():
            # 計算與每個類別質心的距離（歐幾里得距離）
            distances[cls] = np.linalg.norm(X_test[idx] - centroid)
        predicted_class = min(distances, key=distances.get)  # 選擇距離最近的類別
        test_predictions[doc_id] = predicted_class
    
    return test_predictions

# 主程式
if __name__ == "__main__":
    training_data = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/113-1-IRTM-final-project/folds/fold1.txt"
    folder_path = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/all_lyrics"
    output_path = "output_rocchio1.csv"

    # 載入資料
    docs = load_documents(folder_path)

    # 訓練 Rocchio 分類器
    centroids, vectorizer = train_rocchio(
        training_data,
        docs,
        tfidf_features=1000
    )

    # 獲取測試文件
    all_doc_ids = set(docs.keys())
    train_doc_ids = []
    for line in open(training_data, 'r', encoding='utf-8').readlines():
        parts = line.strip().split()
        train_doc_ids.extend(list(map(int, parts[1:])))
    test_doc_ids = sorted(all_doc_ids - set(train_doc_ids))
    test_docs = {doc_id: docs[doc_id] for doc_id in test_doc_ids}

    # 使用 Rocchio 分類測試文件
    test_predictions = classify_with_rocchio(test_docs, centroids, vectorizer)

    # 儲存結果
    with open(output_path, 'w', encoding='utf-8') as file:
        file.write("Id,Value\n")
        for doc_id in sorted(test_predictions.keys()):
            file.write(f"{doc_id},{test_predictions[doc_id]}\n")

    print(f"Results saved to {output_path}")


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/sophiehuang/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Results saved to output_rocchio1.csv


In [None]:
import os
from collections import defaultdict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
import numpy as np
import chardet
import nltk

# 確保下載 NLTK 停用詞
nltk.download('stopwords')
stop_words = set(nltk.corpus.stopwords.words('english'))

# 資料預處理
def preprocess(text):
    # 移除第三行
    lines = text.split('\n')
    if len(lines) > 2:
        text = '\n'.join(lines[:2] + lines[3:])
    # 轉換為小寫
    text = text.lower()
    # 移除標點符號
    text = text.translate(str.maketrans('', '', string.punctuation))
    # 移除指定單詞和停用詞
    stop_words = set(stopwords.words('english'))
    tokens = nltk.word_tokenize(text)
    tokens = [word for word in tokens if word not in stop_words and word != 'embed']
    return ' '.join(tokens)

# 載入文件
def load_documents(folder_path):
    """從資料夾載入文件，並自動檢測編碼。"""
    docs = {}
    for filename in os.listdir(folder_path):
        if filename.endswith(".txt"):  # 確保只處理文字檔
            file_path = os.path.join(folder_path, filename)
            
            # 自動檢測文件編碼
            with open(file_path, 'rb') as file:  # 以二進制方式打開
                raw_data = file.read()
                detected_encoding = chardet.detect(raw_data)['encoding']
            
            # 以檢測到的編碼打開文件
            try:
                with open(file_path, 'r', encoding=detected_encoding) as file:
                    doc_id = int(filename.split('.')[0])  # 假設檔名為數字
                    docs[doc_id] = preprocess(file.read())
            except UnicodeDecodeError:
                print(f"Failed to decode file {filename} with encoding {detected_encoding}. Skipping.")
    return docs

# 訓練 SVM 分類器
def train_svm(training_data, docs, tfidf_features=1000, kernel='linear'):
    """使用支持向量機訓練模型。"""
    class_docs = defaultdict(list)
    labels = []
    train_doc_ids = []
    
    # 讀取訓練資料
    for line in open(training_data, 'r', encoding='utf-8').readlines():
        parts = line.strip().split()
        cls = int(parts[0])
        doc_ids = list(map(int, parts[1:]))
        class_docs[cls].extend(doc_ids)
        labels.extend([cls] * len(doc_ids))
        train_doc_ids.extend(doc_ids)
    
    # 計算 TF-IDF
    vectorizer = TfidfVectorizer(max_features=tfidf_features)
    X_train = vectorizer.fit_transform([docs[doc_id] for doc_id in train_doc_ids])
    y_train = np.array(labels)
    
    # 訓練 SVM 模型
    svm_model = SVC(kernel=kernel, probability=True, random_state=42)
    svm_model.fit(X_train, y_train)
    
    return svm_model, vectorizer

# 文件分類
def classify_with_svm(test_docs, svm_model, vectorizer):
    """使用 SVM 對測試文件進行分類。"""
    X_test = vectorizer.transform([test_docs[doc_id] for doc_id in sorted(test_docs.keys())])
    test_predictions = {}

    for idx, doc_id in enumerate(sorted(test_docs.keys())):
        predicted_class = svm_model.predict(X_test[idx])
        test_predictions[doc_id] = predicted_class[0]  # SVM 的預測值是一個陣列
    
    return test_predictions

# 主程式
if __name__ == "__main__":
    training_data = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/113-1-IRTM-final-project/folds/fold2.txt"
    folder_path = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/all_lyrics"
    output_path = "output_svm1.csv"

    # 載入資料
    docs = load_documents(folder_path)

    # 訓練 SVM 分類器
    svm_model, vectorizer = train_svm(
        training_data,
        docs,
        tfidf_features=1000,
        kernel='linear'
    )

    # 獲取測試文件
    all_doc_ids = set(docs.keys())
    train_doc_ids = []
    for line in open(training_data, 'r', encoding='utf-8').readlines():
        parts = line.strip().split()
        train_doc_ids.extend(list(map(int, parts[1:])))
    test_doc_ids = sorted(all_doc_ids - set(train_doc_ids))
    test_docs = {doc_id: docs[doc_id] for doc_id in test_doc_ids}

    # 使用 SVM 分類測試文件
    test_predictions = classify_with_svm(test_docs, svm_model, vectorizer)

    # 儲存結果
    with open(output_path, 'w', encoding='utf-8') as file:
        file.write("Id,Value\n")
        for doc_id in sorted(test_predictions.keys()):
            file.write(f"{doc_id},{test_predictions[doc_id]}\n")

    print(f"Results saved to {output_path}")


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/sophiehuang/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Results saved to output_svm1.csv


In [74]:
import os
from collections import defaultdict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import SVC
import numpy as np
import chardet
import nltk

# 確保下載 NLTK 停用詞
nltk.download('stopwords')
stop_words = set(nltk.corpus.stopwords.words('english'))

# 資料預處理
def preprocess(text):
    # 移除第三行
    lines = text.split('\n')
    if len(lines) > 2:
        text = '\n'.join(lines[:2] + lines[3:])
    # 轉換為小寫
    text = text.lower()
    # 移除標點符號
    text = text.translate(str.maketrans('', '', string.punctuation))
    # 移除指定單詞和停用詞
    stop_words = set(stopwords.words('english'))
    tokens = nltk.word_tokenize(text)
    additional_stop_words = ['embed', 'oh', 'yeah', 'baby', 'uh', 'la', 'na', 'ah', 'whoa', 'gonna', 'wanna', 'hey', 'ho', 'ha', 'll']
    tokens = [word for word in tokens if word not in stop_words and word not in additional_stop_words]
    return ' '.join(tokens)

# 載入文件
def load_documents(folder_path):
    """從資料夾載入文件，並自動檢測編碼。"""
    docs = {}
    for filename in os.listdir(folder_path):
        if filename.endswith(".txt"):  # 確保只處理文字檔
            file_path = os.path.join(folder_path, filename)
            
            # 自動檢測文件編碼
            with open(file_path, 'rb') as file:  # 以二進制方式打開
                raw_data = file.read()
                detected_encoding = chardet.detect(raw_data)['encoding']
            
            # 以檢測到的編碼打開文件
            try:
                with open(file_path, 'r', encoding=detected_encoding) as file:
                    doc_id = int(filename.split('.')[0])  # 假設檔名為數字
                    docs[doc_id] = preprocess(file.read())
            except UnicodeDecodeError:
                print(f"Failed to decode file {filename} with encoding {detected_encoding}. Skipping.")
    return docs

# 訓練 SVM 分類器
def train_svm(training_data, docs, tfidf_features=1000, kernel='linear', C=1.0, gamma='scale', degree=3):
    """使用支持向量機訓練模型，並支持參數設置。"""
    class_docs = defaultdict(list)
    labels = []
    train_doc_ids = []
    
    # 讀取訓練資料
    for line in open(training_data, 'r', encoding='utf-8').readlines():
        parts = line.strip().split()
        cls = int(parts[0])
        doc_ids = list(map(int, parts[1:]))
        class_docs[cls].extend(doc_ids)
        labels.extend([cls] * len(doc_ids))
        train_doc_ids.extend(doc_ids)
    
    # 計算 TF-IDF
    vectorizer = TfidfVectorizer(max_features=tfidf_features)
    X_train = vectorizer.fit_transform([docs[doc_id] for doc_id in train_doc_ids])
    y_train = np.array(labels)
    
    # 訓練 SVM 模型
    svm_model = SVC(kernel=kernel, C=C, gamma=gamma, degree=degree, probability=True, random_state=42)
    svm_model.fit(X_train, y_train)
    
    return svm_model, vectorizer

# 文件分類
def classify_with_svm(test_docs, svm_model, vectorizer):
    """使用 SVM 對測試文件進行分類。"""
    X_test = vectorizer.transform([test_docs[doc_id] for doc_id in sorted(test_docs.keys())])
    test_predictions = {}

    for idx, doc_id in enumerate(sorted(test_docs.keys())):
        predicted_class = svm_model.predict(X_test[idx])
        test_predictions[doc_id] = predicted_class[0]  # SVM 的預測值是一個陣列
    
    return test_predictions

# 主程式
if __name__ == "__main__":
    training_data = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/113-1-IRTM-final-project/folds/fold1.txt"
    folder_path = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/all_lyrics"
    output_path = "output_svm_param_stop_1.csv"

    # 載入資料
    docs = load_documents(folder_path)

    # 訓練 SVM 分類器
    svm_model, vectorizer = train_svm(
        training_data,
        docs,
        tfidf_features=1000,
        kernel='rbf',  # 核函數選擇：'linear', 'rbf', 'poly', 'sigmoid'
        C=1.0,  # 正則化參數
        gamma='scale',  # 控制非線性核的範圍 ('scale' 自適應選擇)
        degree=3  # 多項式核的次數
    )

    # 獲取測試文件
    all_doc_ids = set(docs.keys())
    train_doc_ids = []
    for line in open(training_data, 'r', encoding='utf-8').readlines():
        parts = line.strip().split()
        train_doc_ids.extend(list(map(int, parts[1:])))
    test_doc_ids = sorted(all_doc_ids - set(train_doc_ids))
    test_docs = {doc_id: docs[doc_id] for doc_id in test_doc_ids}

    # 使用 SVM 分類測試文件
    test_predictions = classify_with_svm(test_docs, svm_model, vectorizer)

    # 儲存結果
    with open(output_path, 'w', encoding='utf-8') as file:
        file.write("Id,Value\n")
        for doc_id in sorted(test_predictions.keys()):
            file.write(f"{doc_id},{test_predictions[doc_id]}\n")

    print(f"Results saved to {output_path}")


SyntaxError: invalid syntax (2187819926.py, line 26)

In [58]:
import os
import torch
import numpy as np
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# 確保 GPU 可用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 自定義數據集
class LyricsDataset(Dataset):
    def __init__(self, lyrics, labels, tokenizer, max_length):
        self.lyrics = lyrics
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.lyrics)

    def __getitem__(self, index):
        lyric = self.lyrics[index]
        label = self.labels[index]

        encoding = self.tokenizer.encode_plus(
            lyric,
            add_special_tokens=True,
            max_length=self.max_length,
            return_token_type_ids=False,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors="pt",
        )

        return {
            "input_ids": encoding["input_ids"].flatten(),
            "attention_mask": encoding["attention_mask"].flatten(),
            "labels": torch.tensor(label, dtype=torch.long),
        }

# 加載數據
def load_data(folder_path, training_data):
    # 讀取歌詞
    lyrics = []
    labels = []

    for line in open(training_data, "r", encoding="utf-8").readlines():
        parts = line.strip().split()
        cls = int(parts[0])
        doc_ids = list(map(int, parts[1:]))
        labels.extend([cls] * len(doc_ids))

        for doc_id in doc_ids:
            file_path = os.path.join(folder_path, f"{doc_id}.txt")
            with open(file_path, "r", encoding="utf-8") as file:
                lyrics.append(file.read().strip())

    return lyrics, labels

# 模型訓練
def train_model(train_loader, val_loader, model, optimizer, epochs=3):
    loss_fn = torch.nn.CrossEntropyLoss()
    model = model.to(device)

    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs}")
        model.train()
        train_loss = 0

        for batch in train_loader:
            optimizer.zero_grad()
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            train_loss += loss.item()
            loss.backward()
            optimizer.step()

        print(f"Training Loss: {train_loss / len(train_loader):.4f}")

        # 評估模型
        model.eval()
        val_loss = 0
        val_preds = []
        val_labels = []

        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["labels"].to(device)

                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                val_loss += loss.item()

                preds = torch.argmax(outputs.logits, dim=1)
                val_preds.extend(preds.cpu().numpy())
                val_labels.extend(labels.cpu().numpy())

        print(f"Validation Loss: {val_loss / len(val_loader):.4f}")
        print(classification_report(val_labels, val_preds))

    return model

# 主程式
if __name__ == "__main__":
    folder_path = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/all_lyrics"  # 替換為歌詞資料夾的路徑
    training_data = "/Users/sophiehuang/Documents/113-1/113-1-IRTM/113-1-IRTM-final-project/folds/fold1.txt"  # 替換為訓練數據文件的路徑
    max_length = 128  # BERT 最大文本長度
    batch_size = 16
    learning_rate = 2e-5
    epochs = 3
    num_labels = 6  # 替換為情緒類別數量

    # 加載數據
    lyrics, labels = load_data(folder_path, training_data)

    # 分割數據集
    train_lyrics, val_lyrics, train_labels, val_labels = train_test_split(
        lyrics, labels, test_size=0.2, random_state=42
    )

    # 初始化 Tokenizer 和模型
    tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
    model = BertForSequenceClassification.from_pretrained(
        "bert-base-uncased", num_labels=num_labels
    )

    # 構建 DataLoader
    train_dataset = LyricsDataset(train_lyrics, train_labels, tokenizer, max_length)
    val_dataset = LyricsDataset(val_lyrics, val_labels, tokenizer, max_length)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # 優化器
    optimizer = AdamW(model.parameters(), lr=learning_rate)

    # 訓練模型
    model = train_model(train_loader, val_loader, model, optimizer, epochs)

    # 儲存模型
    model.save_pretrained("./bert_emotion_model")
    tokenizer.save_pretrained("./bert_emotion_model")
    print("Model saved.")


Downloading:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertForSequenceClassification: ['cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at

Epoch 1/3


IndexError: Target 6 is out of bounds.

In [None]:
import csv

# 讀取 txt 檔案
with open(".txt", "r", encoding="utf-8") as file:
    lines = file.readlines()

# 初始化資料列表
rows = []

# 解析每一行的內容
for line in lines:
    line = line.strip()  # 移除行首和行尾空白字元
    if ":" in line:  # 確保該行包含 ':'
        try:
            # 分割出 cluster ID 和 doc_ids
            cluster, doc_ids = line.split(":")
            cluster_id = cluster.strip().split(" ")[1]  # 取出 Cluster 編號
            doc_ids = doc_ids.split(",")  # 分割出 doc_id 列表

            # 將 cluster ID 和每個 doc_id 儲存為一行資料
            for doc_id in doc_ids:
                if doc_id.strip():  # 確保 doc_id 不為空
                    rows.append({"doc_id": doc_id.strip(), "cluster": cluster_id})
        except Exception as e:
            print(f"解析行時出現問題：{line}，錯誤訊息：{e}")

# 確保資料非空
if rows:
    # 寫入 CSV 檔案
    with open("output.csv", "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=["doc_id", "cluster"])
        writer.writeheader()  # 寫入欄位名稱
        writer.writerows(rows)  # 寫入資料

    print("CSV 檔案已成功建立：output.csv")
else:
    print("未找到有效資料，請檢查 .txt 檔案的內容格式！")


In [44]:
import csv

# 讀取 txt 檔案
with open("filtered_clusters.txt", "r", encoding="utf-8") as file:
    lines = file.readlines()

# 初始化資料列表
rows = []

# 解析每一行的內容
for line in lines:
    line = line.strip()  # 移除行首和行尾空白字元
    if ":" in line:  # 確保該行包含 ':'
        try:
            # 分割出 cluster ID 和 doc_ids
            cluster, doc_ids = line.split(":")
            cluster_id = cluster.strip().split(" ")[1]  # 取出 Cluster 編號
            doc_ids = doc_ids.split(",")  # 分割出 doc_id 列表

            # 將 cluster ID 和每個 doc_id 儲存為一行資料
            for doc_id in doc_ids:
                if doc_id.strip():  # 確保 doc_id 不為空
                    rows.append({"doc_id": int(doc_id.strip()), "cluster": cluster_id})  # 將 doc_id 轉為數字排序
        except Exception as e:
            print(f"解析行時出現問題：{line}，錯誤訊息：{e}")

# 對資料根據 doc_id 進行排序
rows = sorted(rows, key=lambda x: x["doc_id"])

# 確保資料非空
if rows:
    # 寫入 CSV 檔案
    with open("output.csv", "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=["doc_id", "cluster"])
        writer.writeheader()  # 寫入欄位名稱
        writer.writerows(rows)  # 寫入資料

    print("CSV 檔案已成功建立且已根據 doc_id 排序：output.csv")
else:
    print("未找到有效資料，請檢查 .txt 檔案的內容格式！")


CSV 檔案已成功建立且已根據 doc_id 排序：output.csv


In [45]:
import pandas as pd
import os

# 讀取 CSV 檔案
data = pd.read_csv("output.csv")

# 檢查輸入資料
print("資料預覽：")
print(data.head())

# 創建存放分割資料的資料夾
output_folder = "label_chunks"
os.makedirs(output_folder, exist_ok=True)

# 儲存分割結果
chunk_files = {f"chunk_{i+1}.csv": [] for i in range(10)}

# 對每個 cluster 進行分割
for label, group in data.groupby("cluster"):
    # 將該 label 的資料均分為 10 等份
    chunk_size = len(group) // 10
    remainder = len(group) % 10  # 用於處理無法整除的部分
    start_idx = 0
    
    for i in range(10):
        if i < remainder:  # 前 remainder 等份多分配一筆資料
            end_idx = start_idx + chunk_size + 1
        else:
            end_idx = start_idx + chunk_size
        
        chunk = group.iloc[start_idx:end_idx]
        chunk_files[f"chunk_{i+1}.csv"].append(chunk)
        start_idx = end_idx

# 合併並寫入每一個 chunk 到對應的檔案
for chunk_name, chunk_data in chunk_files.items():
    combined_chunk = pd.concat(chunk_data)
    combined_chunk.to_csv(os.path.join(output_folder, chunk_name), index=False)
    print(f"{chunk_name} 已儲存至 {output_folder}")

print("資料已按 label 均分為 10 等份，結果存放於資料夾 'label_chunks'")


資料預覽：
   doc_id  cluster
0       1        4
1       2        5
2       3        1
3       4        1
4       5        4
chunk_1.csv 已儲存至 label_chunks
chunk_2.csv 已儲存至 label_chunks
chunk_3.csv 已儲存至 label_chunks
chunk_4.csv 已儲存至 label_chunks
chunk_5.csv 已儲存至 label_chunks
chunk_6.csv 已儲存至 label_chunks
chunk_7.csv 已儲存至 label_chunks
chunk_8.csv 已儲存至 label_chunks
chunk_9.csv 已儲存至 label_chunks
chunk_10.csv 已儲存至 label_chunks
資料已按 label 均分為 10 等份，結果存放於資料夾 'label_chunks'


In [43]:
def load_file(filepath):
    """從文件中載入數據"""
    with open(filepath, 'r', encoding='utf-8') as file:
        return file.readlines()

def save_file(filepath, data):
    """將數據保存到文件"""
    with open(filepath, 'w', encoding='utf-8') as file:
        file.writelines(data)

def filter_ids(original_file, ids_to_remove_file, output_file):
    """過濾掉指定的 ID"""
    # 讀取原始數據
    original_data = load_file(original_file)

    # 讀取要刪除的 ID
    with open(ids_to_remove_file, 'r', encoding='utf-8') as file:
        ids_to_remove = {line.strip().split('.')[0] for line in file.readlines()}

    # 過濾數據
    filtered_data = []
    for line in original_data:
        # 確保行中包含冒號
        if ':' not in line:
            print(f"Skipping line (no colon found): {line.strip()}")
            continue

        cluster_id, ids = line.split(':', 1)  # 僅分割第一個冒號
        ids_list = ids.split(',')
        # 過濾掉需要刪除的 ID
        filtered_ids = [id.strip() for id in ids_list if id.strip() not in ids_to_remove]

        # 如果該 cluster 沒有剩餘 ID，可選擇是否跳過
        if not filtered_ids:
            print(f"Skipping cluster {cluster_id} (no IDs left after filtering).")
            continue

        # 構建新的行
        filtered_data.append(f"{cluster_id}: {', '.join(filtered_ids)}\n")

    # 保存過濾後的數據
    save_file(output_file, filtered_data)
    print(f"已成功將結果保存到 {output_file}")


# 設定檔案路徑
original_file = 'cluster.txt'       # 原始數據檔案
ids_to_remove_file = 'remove_ids.txt'  # 包含需要刪除 ID 的檔案
output_file = 'filtered_clusters.txt'  # 過濾後的數據檔案

# 執行過濾
filter_ids(original_file, ids_to_remove_file, output_file)


Skipping line (no colon found): 
Skipping line (no colon found): 
Skipping line (no colon found): 
Skipping line (no colon found): 
Skipping line (no colon found): 
Skipping line (no colon found): 
Skipping line (no colon found): 
Skipping line (no colon found): 
已成功將結果保存到 filtered_clusters.txt


In [None]:
def filter_ids(original_file, ids_to_remove_file, output_file):
    """過濾掉指定的 ID"""
    # 讀取原始數據
    original_data = load_file(original_file)

    # 讀取要刪除的 ID
    with open(ids_to_remove_file, 'r', encoding='utf-8') as file:
        ids_to_remove = {line.strip().split('.')[0] for line in file.readlines()}

    # 過濾數據
    filtered_data = []
    for line in original_data:
        # 確保行中包含冒號
        if ':' not in line:
            print(f"Skipping line (no colon found): {line.strip()}")
            continue

        cluster_id, ids = line.split(':', 1)  # 僅分割第一個冒號
        ids_list = ids.split(',')
        # 過濾掉需要刪除的 ID
        filtered_ids = [id.strip() for id in ids_list if id.strip() not in ids_to_remove]

        # 如果該 cluster 沒有剩餘 ID，可選擇是否跳過
        if not filtered_ids:
            print(f"Skipping cluster {cluster_id} (no IDs left after filtering).")
            continue

        # 構建新的行
        filtered_data.append(f"{cluster_id}: {', '.join(filtered_ids)}\n")

    # 保存過濾後的數據
    save_file(output_file, filtered_data)
    print(f"已成功將結果保存到 {output_file}")
