# Environment

## Installing packages

In [3]:
!pip install pdfplumber
!pip install f
!pip install rank_bm25
!pip install -U ckip-transformers



## Importing packages

In [None]:
import os
import json
import re
from tqdm import tqdm
import jieba
import pdfplumber
import math
from collections import Counter

Mounted at /content/drive


## File paths

In [None]:
# --- Inputs -----------------------------------------------------------------
question_path = "./dataset/preliminary/questions_example.json"  # questions to answer
truth_sample_path = "./dataset/preliminary/ground_truths_example.json"  # expected answers for evaluation
source_path = "./reference"  # root folder of the reference documents

# --- Tokenizer resources ----------------------------------------------------
stop_word_path = "./dataset/preliminary/stopwords.txt"  # one stop word per line
dict_path = "./dataset/preliminary/dict.txt.big"  # dictionary handed to jieba.set_dictionary
prompt_path = "./dataset/preliminary/prompt.txt"  # extra terms registered with jieba.add_word

# --- Outputs ----------------------------------------------------------------
output_path = "./dataset/preliminary/pred_retrieve.json"  # where the predictions are written
pred_sample_path = output_path  # the evaluation step reads back the same file

# Main functions

## Load/read functions

In [None]:
def load_data(source_path):
    """Read every PDF in a folder into a ``{document id: text}`` dictionary.

    Only directory entries whose extension is ``.pdf`` (case-insensitive) are
    read. Stray entries such as ``.DS_Store`` or ``.ipynb_checkpoints`` are
    skipped instead of aborting the whole load with a ``ValueError``.

    Args:
        source_path: Directory holding PDFs named ``<integer>.pdf``.

    Returns:
        dict: Maps the integer file stem to the text returned by ``read_pdf``.

    Raises:
        ValueError: If the stem of a PDF file name is not an integer.
    """
    # Sorted so the processing order (and the progress bar) is deterministic.
    pdf_names = sorted(
        name for name in os.listdir(source_path)
        if os.path.splitext(name)[1].lower() == '.pdf'
    )

    corpus_dict = {}
    for name in tqdm(pdf_names):
        # Parse the id first so a badly named file fails before any PDF work.
        doc_id = int(os.path.splitext(name)[0])
        corpus_dict[doc_id] = read_pdf(os.path.join(source_path, name))
    return corpus_dict


def read_pdf(pdf_loc, page_infos: list = None):
    """Extract and concatenate the text of a PDF file.

    The file is opened in a ``with`` block, so it is closed exactly once:
    after all requested pages were read, or when extraction raises. The
    previous version called ``close()`` inside the page loop, which closed the
    PDF after the first page that had text and never closed a PDF without text.

    Args:
        pdf_loc: Location of the PDF, passed straight to ``pdfplumber.open``.
        page_infos: Optional ``[start, stop]`` pair; when given, only
            ``pdf.pages[start:stop]`` is read. Falsy means all pages.

    Returns:
        str: Page texts joined without a separator. Pages for which
        ``extract_text()`` returns nothing are skipped.
    """
    # TODO: feel free to read the data another way, or to handle multimodal
    # content in the PDF (tables, images, ...).
    with pdfplumber.open(pdf_loc) as pdf:
        # Restrict to the requested page range if one was given.
        pages = pdf.pages[page_infos[0]:page_infos[1]] if page_infos else pdf.pages
        page_texts = (page.extract_text() for page in pages)
        # join() consumes the generator while the PDF is still open.
        return ''.join(text for text in page_texts if text)

## Text-cleaning functions

In [None]:
def load_stopwords(filepath):
    """Return the lines of a UTF-8 text file as a set of stop words.

    Lines are split with ``str.splitlines`` and stored unmodified, so
    surrounding whitespace is kept and a blank line contributes ``''``.

    Args:
        filepath: Path of the stop-word file, one entry per line.

    Returns:
        set[str]: The distinct lines of the file.
    """
    with open(filepath, mode='r', encoding='utf-8') as handle:
        raw = handle.read()
    return {entry for entry in raw.splitlines()}

# Module-level stop-word set; clean_text reads it when filtering tokens.
stopwords = load_stopwords(stop_word_path)

# Point jieba at the dictionary file configured in dict_path.
jieba.set_dictionary(dict_path)

def diction():
    """Register every non-blank line of ``prompt_path`` as a jieba word.

    Each line is stripped of surrounding whitespace; lines that are empty
    after stripping are ignored. Returns ``None``.
    """
    with open(prompt_path, 'r', encoding='utf-8') as handle:
        stripped_lines = (raw_line.strip() for raw_line in handle)
        # filter(None, ...) drops the lines that were blank.
        for term in filter(None, stripped_lines):
            jieba.add_word(term)

# Register the custom terms with jieba once, when this cell runs.
diction()

def clean_text(text, comment):
    """Tokenise ``text`` with jieba's search-mode cutter.

    Args:
        text: The string to tokenise.
        comment: When truthy, punctuation and whitespace are removed before
            cutting and tokens found in the module-level ``stopwords`` set are
            dropped. When falsy, the text is cut as-is with no filtering.

    Returns:
        list[str]: The tokens; ``[]`` if nothing is left after cleaning.
    """
    if not comment:
        # Raw mode: no cleaning and no stop-word filtering.
        return list(jieba.cut_for_search(text))

    punct_pattern = r'[\s+\.\!\/_,$%^*(+\"\'’“”‘’]+|[+——！，。？、~@#￥%……&*（）]+'
    dropped_chars = frozenset(('；','，','。','！','：','「','」','…','、','？','【','】','.',':','?',';','!','~','`','+','-','<','>','/','[',']','{','}',"'",'"'))

    # First pass: drop listed punctuation character by character.
    text = "".join(ch for ch in text if ch not in dropped_chars)
    # Second pass: the pattern removes remaining symbols and all whitespace.
    text = re.sub(punct_pattern, '', text)
    # Collapse whitespace (nothing is left to collapse once the pattern above
    # has removed every \s character; kept to mirror the original pipeline).
    text = re.sub(r'\s+', ' ', text).strip()

    if not text:
        return []

    return [token for token in jieba.cut_for_search(text) if token not in stopwords]

Building prefix dict from /content/drive/MyDrive/Colab Notebooks/AICUP/dataset/preliminary/dict.txt.big ...
DEBUG:jieba:Building prefix dict from /content/drive/MyDrive/Colab Notebooks/AICUP/dataset/preliminary/dict.txt.big ...
Loading model from cache /tmp/jieba.u610a394bc606bc52a272eabd0137bb7f.cache
DEBUG:jieba:Loading model from cache /tmp/jieba.u610a394bc606bc52a272eabd0137bb7f.cache
Loading model cost 1.569 seconds.
DEBUG:jieba:Loading model cost 1.569 seconds.
Prefix dict has been built successfully.
DEBUG:jieba:Prefix dict has been built successfully.


## Retrieve algorithm

In [None]:
def LMIR_retrieve(qs, source, corpus_dict, mu=3000):
    """Pick the best candidate document with a Dirichlet-smoothed language model.

    Each candidate is scored by the sum over query terms of
    ``log((tf + mu * p_corpus) / (doc_length + mu))``, where ``p_corpus`` is
    the term's relative frequency across the candidate documents only.
    Terms with a resulting probability of 0 add nothing to the score.

    Args:
        qs: The query string.
        source: Iterable of candidate document ids (anything ``int()`` accepts).
        corpus_dict: Mapping from integer document id to document text.
        mu: Dirichlet smoothing parameter; must be positive. Defaults to the
            previously hard-coded value of 3000.

    Returns:
        int: Id of the highest-scoring candidate. Ties go to the candidate
        that appears first in ``source``.

    Raises:
        ValueError: If ``source`` is empty or ``mu`` is not positive.
        KeyError: If a candidate id is missing from ``corpus_dict``.
    """
    if mu <= 0:
        raise ValueError(f"mu must be positive, got {mu!r}")

    doc_ids = [int(file) for file in source]
    if not doc_ids:
        # The original failed here too (max() of an empty list), less clearly.
        raise ValueError("source must contain at least one candidate document id")

    # Tokenise the candidate documents and the query with the same cleaning.
    tokenized_corpus = [clean_text(corpus_dict[doc_id], True) for doc_id in doc_ids]
    tokenized_query = clean_text(qs, True)

    # One pass builds the per-document and the collection-wide term counts.
    corpus_term_freq = Counter()
    doc_term_freqs = []
    for doc_tokens in tokenized_corpus:
        term_freq = Counter(doc_tokens)
        doc_term_freqs.append(term_freq)
        corpus_term_freq.update(term_freq)
    corpus_length = sum(corpus_term_freq.values())

    # Background probabilities do not depend on the document: compute once.
    background = {
        term: (corpus_term_freq[term] / corpus_length if corpus_length > 0 else 0)
        for term in set(tokenized_query)
    }

    scores = []
    for doc_tokens, term_freq in zip(tokenized_corpus, doc_term_freqs):
        denominator = len(doc_tokens) + mu
        score = 0.0
        for term in tokenized_query:
            p = (term_freq[term] + mu * background[term]) / denominator
            if p > 0:  # log(0) is undefined; such terms are skipped
                score += math.log(p)
        scores.append(score)

    # max() over indices returns the first maximum, like scores.index(max(scores)).
    best_index = max(range(len(scores)), key=scores.__getitem__)
    return doc_ids[best_index]

## Loading dataset

In [None]:
# Empty container for the predictions (the run cell initialises it again).
answer_dict = {"answers": []}

# Each PDF category lives in its own sub-folder of the reference directory.
source_path_insurance = os.path.join(source_path, 'insurance')
source_path_finance = os.path.join(source_path, 'finance')

# Read every PDF of a category into a {doc id: text} dictionary.
corpus_dict_insurance = load_data(source_path_insurance)
corpus_dict_finance = load_data(source_path_finance)

# The FAQ reference is a single JSON file keyed by id; JSON keys are strings,
# so they are converted to int here.
faq_map_path = os.path.join(source_path, 'faq/pid_map_content.json')
with open(faq_map_path, 'rb') as faq_file:
    raw_faq_map = json.load(faq_file)
key_to_source_dict = {int(pid): content for pid, content in raw_faq_map.items()}

100%|██████████| 643/643 [03:49<00:00,  2.80it/s]
100%|██████████| 1035/1035 [23:55<00:00,  1.39s/it]


## Run

In [309]:
# Start from an empty answer list so re-running this cell never duplicates answers.
answer_dict = {"answers": []}

with open(question_path, 'rb') as f:
    qs_ref = json.load(f)  # load the question file

# Categories whose corpus was loaded from PDFs up front; the FAQ corpus is
# assembled per question below.
pdf_corpora = {
    'finance': corpus_dict_finance,
    'insurance': corpus_dict_insurance,
}

for q_dict in tqdm(qs_ref['questions'], desc="Processing questions"):
    category = q_dict['category']

    if category in pdf_corpora:
        corpus = pdf_corpora[category]
    elif category == 'faq':
        # Set membership avoids scanning the source list once per FAQ entry.
        wanted_ids = set(q_dict['source'])
        corpus_dict_faq = {
            key: str(value)
            for key, value in key_to_source_dict.items()
            if key in wanted_ids
        }
        corpus = corpus_dict_faq
    else:
        # Fail loudly on an unexpected category and say which question it was.
        raise ValueError(
            f"Something went wrong: unknown category {category!r} for qid {q_dict.get('qid')!r}"
        )

    # Retrieve the best candidate and record it.
    retrieved = LMIR_retrieve(q_dict['query'], q_dict['source'], corpus)
    answer_dict['answers'].append({"qid": q_dict['qid'], "retrieve": retrieved})

# Save the answers as JSON; ensure_ascii=False keeps non-ASCII text readable.
with open(output_path, 'w', encoding='utf8') as f:
    json.dump(answer_dict, f, ensure_ascii=False, indent=4)

Processing questions: 100%|██████████| 150/150 [00:07<00:00, 18.89it/s]


## Evaluate accuracy

In [310]:
import json
from collections import defaultdict

# Read the expected answers and the predictions written by the run cell.
with open(truth_sample_path, 'r', encoding='utf-8') as f:
    ground_truth_data = json.load(f)

with open(pred_sample_path, 'r', encoding='utf-8') as f:
    your_result_data = json.load(f)

# qid -> expected document id and category ('unknown' when not provided).
ground_truths = {
    item['qid']: {
        'retrieve': item['retrieve'],
        'category': item.get('category', 'unknown'),
    }
    for item in ground_truth_data['ground_truths']
}

# qid -> predicted document id.
your_results = {item['qid']: item['retrieve'] for item in your_result_data['answers']}

# Per-category and overall tallies, filled in a single pass.
category_correct = defaultdict(int)
category_total = defaultdict(int)
correct = 0
total = 0
incorrect_qids = []

for qid, gt in ground_truths.items():
    your_retrieve = your_results.get(qid)
    if your_retrieve is None:
        # Unanswered questions are reported and left out of every tally.
        print(f"Warning: QID {qid} not found in your results.")
        continue

    gt_retrieve = gt['retrieve']
    category = gt['category']

    total += 1
    category_total[category] += 1
    if your_retrieve == gt_retrieve:
        correct += 1
        category_correct[category] += 1
    else:
        # Keep the misses around for later inspection.
        incorrect_qids.append({
            'qid': qid,
            'ground_truth': gt_retrieve,
            'your_retrieve': your_retrieve,
            'category': category
        })

# Report predictions whose qid has no ground-truth entry.
extra_qids = your_results.keys() - ground_truths.keys()
if extra_qids:
    print(f"These QIDs are in your results but not in ground truth: {extra_qids}")

## Print accuracy

In [311]:
# Accuracy per category, in the order the categories were first evaluated.
print("Category-wise Accuracy:")
for category, evaluated in category_total.items():
    cat_accuracy = category_correct[category] / evaluated
    print(f"- {category}: {cat_accuracy * 100:.2f}%")

# Overall accuracy; skipped when no question could be matched.
if total <= 0:
    print("No matching QIDs found between your results and ground truth.")
else:
    accuracy = correct / total
    print(f"Total Questions Evaluated: {total}")
    print(f"Correctly Retrieved: {correct}")
    print(f"Accuracy: {accuracy * 100:.2f}%")

Category-wise Accuracy:
- insurance: 90.00%
- finance: 80.00%
- faq: 94.00%
Total Questions Evaluated: 150
Correctly Retrieved: 132
Accuracy: 88.00%


FT
Category-wise Accuracy:
- insurance: 82.00%
- finance: 82.00%
- faq: 94.00%
Total Questions Evaluated: 150
Correctly Retrieved: 129
Accuracy: 86.00%