In [1]:
import os
import sys
import re
import pandas as pd
from nltk.stem import PorterStemmer
import numpy as np
import string
from tqdm import trange

# Utility Function

In [2]:
def read_all_file(file_path: str) -> dict[str, list[str]]:
    # 檢查該路徑是否為目錄
    if not os.path.isdir(file_path):
        print("The path is not a directory.")
        sys.exit(1)
    # 取得所有目錄下檔案名稱
    file_names: list[str] = os.listdir(file_path)
    # 依照檔案名稱排序
    file_names.sort(key=lambda x: int(x.split(".")[0]))
    # 取得所有檔案內容
    all_content: dict[str, list[str]] = {"File Name": [], "Content": []}
    for file_name in file_names:
        fd = open(os.path.join(file_path, file_name), "r")
        content: list[str] = fd.readlines()
        fd.close()    
        all_content["File Name"].append(file_name)
        all_content["Content"].append(" ".join(content))

    return all_content

def read_file(file_path: str) -> list[str]:
    fd = open(file_path, "r")
    contents = fd.readlines()
    fd.close()
    
    return [content.replace("\n", "") for content in contents]

def write_file(file_path: str, data: list[float]):
    fd = open(file_path, "w")
    for d in data:
        fd.write(str(d)+"\n")
    fd.close()

def text_preprocessing(text: str, stopwords: list[str]) -> str:
    # 移除標點符號
    punctuations = string.punctuation + '-'
    text = "".join([char for char in text if char not in punctuations])
    # 去除數字
    text: str = re.sub(r"\d+", "", text)
    # 轉換為小寫
    text: str = text.lower()
    # 去除 stopwords
    text: list[str] = text.split()
    text: list[str] = [t for t in text if t not in stopwords]
    # stemming
    stemmer = PorterStemmer()
    text: list[str] = [stemmer.stem(t) for t in text]


    text: str = " ".join(text)
    return text    

def compute_cosine_similarity(doc_x: str, doc_y: str, dictionary: pd.DataFrame) -> float:
    doc_x_df = pd.read_csv("./output/"+doc_x, sep='\t')
    doc_y_df = pd.read_csv("./output/"+doc_y, sep='\t')

    # 去除 t_index 重複的列，以防合併後數量不一致
    doc_x_df.drop_duplicates(subset='t_index', inplace=True)
    doc_y_df.drop_duplicates(subset='t_index', inplace=True)
    # 將 dataframe 合併 
    merge_df = pd.merge(dictionary, doc_x_df, on='t_index', how='left')
    merge_df.fillna(0, inplace=True)
    doc_x_vector = merge_df['tf_idf'].values

    merge_df = pd.merge(dictionary, doc_y_df, on='t_index', how='left')
    merge_df.fillna(0, inplace=True)
    doc_y_vector = merge_df['tf_idf'].values

    return np.dot(doc_x_vector, doc_y_vector)

# Read File

In [3]:
input_file_path = "./data"
all_content: dict[str, list[str]] = read_all_file(input_file_path)
all_content_df: pd.DataFrame = pd.DataFrame(all_content)
stopwords = read_file("./stopwords.txt")

# Text Preprocessing

In [4]:
all_content_df["Content"] = all_content_df["Content"].apply(text_preprocessing, args=(stopwords,))

# Compute Document Frequency(DF)

In [5]:
dictionary = {}
for text in all_content_df["Content"].values:
    tokens = list(set(text.split(" ")))
    for term in tokens:
        if term in dictionary:
            dictionary[term] += 1
        else:
            dictionary[term] = 1

# 依照字典 key 排序
dictionary = dict(sorted(dictionary.items()))
# 轉換成 DataFrame
dictionary = pd.DataFrame.from_dict(dictionary, orient="index", columns=["df"])
dictionary.reset_index(inplace=True)
dictionary = dictionary.rename(columns={"index": "term"})
dictionary.reset_index(inplace=True)
dictionary = dictionary.rename(columns={"index": "t_index"})
# Output
dictionary.to_csv('./output/dictionary.txt', sep='\t', index=False)

# Inverse Document Frequency(IDF)

In [6]:
number_of_documents = len(all_content_df)
dictionary["idf"] = np.log10(number_of_documents / dictionary["df"].values)

# Compute TF-IDF

In [7]:
for index in trange(number_of_documents):
    tf_idf = {}
    row = all_content_df.iloc[index]
    tokens = row["Content"].split(" ")
    unique_tokens = set(tokens)
    for token in unique_tokens:
         # 計算 tf
        tf = tokens.count(token) / len(tokens)
        # 計算 tf-idf
        tf_idf[token] = tf * dictionary[dictionary["term"]==token]["idf"].values[0]
    
    # 根據算出來的 tf-idf 組成文件的向量
    doc_vec = {"t_index": [], "tf_idf": []}
    for token in tokens:
        doc_vec["t_index"].append(dictionary[dictionary["term"] == token]["t_index"].values[0])
        doc_vec["tf_idf"].append(tf_idf[token])
    doc_vec_df = pd.DataFrame(doc_vec)
    # 轉成 unit vector
    doc_vec_df['tf_idf'] = doc_vec_df['tf_idf'] / np.linalg.norm(doc_vec_df['tf_idf'].values)
    # Output
    doc_vec_df.to_csv(f'./output/{row["File Name"]}', sep='\t', index=False)
    


100%|██████████| 1095/1095 [08:07<00:00,  2.25it/s]


# Compute Cosine Similarity

In [8]:
doc_x = "1.txt"
doc_y = "2.txt"

cos_similarity = compute_cosine_similarity(doc_x, doc_y, dictionary)
print(f"Cosine Similarity: {cos_similarity}")

Cosine Similarity: 0.07202953986765596
