In [1]:
import math
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

import numpy as np
from nltk.probability import FreqDist

In [2]:
import json

# Load the law corpus and the train / public-test question sets from the Kaggle input.
def _read_json(path):
    """Parse the UTF-8 encoded JSON file at ``path`` and return the decoded object."""
    with open(path, 'r', encoding='utf-8') as handle:
        return json.load(handle)


law_data = _read_json('/kaggle/input/alqac2024/law.json')
train_data = _read_json('/kaggle/input/alqac2024/train.json')
test_data = _read_json('/kaggle/input/alqac2024/public_test.json')


In [3]:
class LexicalRetrieval(ABC):
    """Base class for lexical retrievers over a corpus of laws and their articles.

    ``data`` is a list of law dictionaries. Each law has an ``'id'`` and an
    ``'articles'`` list; each article has an ``'id'`` and a ``'text'``.
    Subclasses implement ``get_scores`` (one score per document, in the order
    produced by ``data2documents``) and ``retrieve``.
    """

    def __init__(self, data: List[Dict[str, str]]):
        """Store the corpus and build the flat list of article texts.

        Args:
            data: List of law dictionaries as described on the class.
        """
        self.data = data
        # ``score`` reads ``self.documents``; define it here so every subclass
        # has it, even one that forgets to set it in its own constructor.
        self.documents = self.data2documents()

    def data2documents(self):
        """Return the text of every article, law by law, as one flat list."""
        return [article['text'] for law in self.data for article in law['articles']]

    def info_search(self, document: str) -> Dict[str, str]:
        """Look up the law and article that carry exactly the text ``document``.

        Args:
            document: Article text to match by string equality.

        Returns:
            A dict with ``'law_id'``, ``'article_id'`` and ``'text'``. If several
            articles share the same text, the last one in corpus order is
            reported. An empty dict is returned when nothing matches.
        """
        res = {}
        for law in self.data:
            for article in law['articles']:
                if article['text'] == document:
                    # No early exit on purpose: a later duplicate overwrites an
                    # earlier one, which is the behavior callers have relied on.
                    res['law_id'] = law['id']
                    res['article_id'] = article['id']
                    res["text"] = article["text"]
        return res

    @abstractmethod
    def retrieve(self, query: str, top_k: int = 10) -> List[Dict[str, str]]:
        """Return the ``top_k`` documents most relevant to ``query``."""

    @abstractmethod
    def get_scores(self, query_tokens: List[str]) -> np.ndarray:
        """Return one similarity score per document for the given query tokens."""

    def score(self, query: str, document: str) -> float:
        """Return the min-max normalized score of ``document`` for ``query``.

        The query is split on whitespace, scored against the whole corpus with
        ``get_scores``, and the scores are rescaled to ``[0, 1]``.

        Args:
            query: Raw query string.
            document: Article text; must be an element of ``self.documents``.

        Returns:
            The normalized score of the first document equal to ``document``.
            When every document receives the same raw score the range is zero
            and ``0.0`` is returned instead of dividing by zero.

        Raises:
            AssertionError: If ``document`` is not in the corpus.
        """
        assert document in self.documents, "Document not in corpus"

        # ``get_scores`` may hand back a plain list; normalise to an array.
        scores = np.asarray(self.get_scores(query.split()), dtype=float)
        lowest = np.min(scores)
        spread = np.max(scores) - lowest
        if spread == 0:
            # All documents tie (e.g. empty query): avoid a 0/0 -> NaN result.
            return 0.0

        position = self.documents.index(document)
        return float((scores[position] - lowest) / spread)

In [4]:
class QLDRetrieval(LexicalRetrieval):
    """Query-likelihood (QLD) retriever over the articles of a law corpus.

    Documents and queries are tokenized by whitespace. Per-document and
    collection-wide term frequencies are computed once in the constructor and
    reused for every query.
    """

    def __init__(self, data: List[Dict[str, str]]):
        """Index every article in ``data``.

        Args:
            data: Laws, each with an ``'id'`` and an ``'articles'`` list whose
                items carry ``'id'`` and ``'text'``.
        """
        super().__init__(data)
        self.documents = self.data2documents()
        self.n = 100  # Multiplier of the log(alpha_d) term added to every document score
        self.alpha_d = 0.1  # Smoothing parameter for QLD scoring
        self.epsilon = 0.00001  # Added inside each log so its argument is never zero
        # Tokenize and compute frequency distributions for documents
        self.tokenized_docs, self.fdist_docs = self.preprocess_documents()
        # Frequency distribution over the entire collection
        self.collection_fdist = self.compute_collection_frequencies()
        # Ids aligned index-for-index with ``self.documents`` so a ranked
        # position maps straight to its own law/article, even when two
        # articles have identical text.
        self.doc_infos = self._build_doc_infos()

    def _build_doc_infos(self) -> List[Dict[str, str]]:
        """Return law id, article id and text for every article, in corpus order."""
        return [
            {"law_id": law['id'], "article_id": article['id'], "text": article['text']}
            for law in self.data
            for article in law['articles']
        ]

    def preprocess_documents(self) -> Tuple[List[List[str]], List[FreqDist]]:
        """Tokenize each document and count its terms.

        Returns:
            ``(tokenized_docs, fdist_docs)``: the whitespace tokens of every
            document and the matching per-document frequency distributions.
        """
        tokenized_docs = [doc.split() for doc in self.documents]
        fdist_docs = [FreqDist(doc) for doc in tokenized_docs]
        return tokenized_docs, fdist_docs

    def compute_collection_frequencies(self) -> FreqDist:
        """Count every token across all documents of the collection."""
        all_tokens = [token for doc in self.tokenized_docs for token in doc]
        return FreqDist(all_tokens)

    def compute_term_scores(self, query_tf: FreqDist, doc_tf: FreqDist) -> List[float]:
        """Score each query term that also occurs in the document.

        Args:
            query_tf: Term frequencies of the query.
            doc_tf: Term frequencies of one document.

        Returns:
            One log-ratio per query term present in ``doc_tf``; terms absent
            from the document contribute nothing.
        """
        # NOTE(review): len() of a FreqDist appears to count distinct terms, so
        # both denominators below are vocabulary sizes rather than token totals
        # (FreqDist.N()). Kept as-is so existing scores do not change -- confirm
        # this is intended.
        term_scores = []
        for term in query_tf:
            if term in doc_tf:
                p_qi_d = (doc_tf[term] / len(doc_tf)) / self.alpha_d
                p_qi_c = self.collection_fdist[term] / len(self.collection_fdist)
                term_scores.append(math.log(p_qi_d / p_qi_c + self.epsilon))
        return term_scores

    def compute_qld_scores(self, query_tf: FreqDist, doc_tfs: List[FreqDist]) -> List[float]:
        """Compute the QLD score of the query against each document.

        Args:
            query_tf: Term frequencies of the query.
            doc_tfs: Term frequencies of the documents to score.

        Returns:
            One score per entry of ``doc_tfs``, in the same order.
        """
        if not doc_tfs:
            return []

        # Both terms depend only on the query, so compute them once instead of
        # once per document. They are added separately, in the original order,
        # to keep the floating-point result identical.
        smoothing_term = self.n * math.log(self.alpha_d)
        collection_term = np.sum([
            math.log(self.collection_fdist[term] / len(self.collection_fdist) + self.epsilon)
            for term in query_tf
        ])

        scores = []
        for doc_tf in doc_tfs:
            doc_score = np.sum(self.compute_term_scores(query_tf, doc_tf))
            doc_score += smoothing_term
            doc_score += collection_term
            scores.append(doc_score)
        return scores

    def get_scores(self, query_tokens: List[str]) -> np.ndarray:
        """Return the raw QLD score of every document for the query tokens."""
        query_tf = FreqDist(query_tokens)
        # Reuse the distributions cached by the constructor rather than
        # recounting every document on every query.
        scores = self.compute_qld_scores(query_tf, self.fdist_docs)
        return np.asarray(scores, dtype=float)

    @staticmethod
    def _min_max(scores: np.ndarray) -> np.ndarray:
        """Rescale ``scores`` to ``[0, 1]``; all zeros when every score is equal."""
        if scores.size == 0:
            return scores
        lowest = np.min(scores)
        spread = np.max(scores) - lowest
        if spread == 0:
            # Every document ties (e.g. empty query): avoid 0/0 -> NaN.
            return np.zeros_like(scores)
        return (scores - lowest) / spread

    def _ranked(self, query: str, top_k=None) -> List[Dict[str, str]]:
        """Rank documents for ``query``; keep the best ``top_k`` (all when None)."""
        scores = self.get_scores(query.split())
        order = np.argsort(scores)[::-1]
        if top_k is not None:
            order = order[:top_k]
        normalized = self._min_max(scores)

        results = []
        for i in order:
            hit = dict(self.doc_infos[i])  # copy so callers cannot alter the index
            hit["qld_score"] = float(normalized[i])
            results.append(hit)
        return results

    def retrieve(self, query: str, top_k: int = 10) -> List[Dict[str, str]]:
        """Return the ``top_k`` most relevant articles for ``query``.

        Each result has ``'law_id'``, ``'article_id'``, ``'text'`` and a
        ``'qld_score'`` min-max normalized over the whole corpus.
        """
        return self._ranked(query, top_k)

    def retrieve_all(self, query: str) -> List[Dict[str, str]]:
        """Return every article, best first, in the same format as ``retrieve``."""
        return self._ranked(query)

In [5]:
def find_law(law_id, article_id, data):
    """Return the text of article ``article_id`` in law ``law_id``, or None.

    ``data`` is an iterable of laws, each with an ``'id'`` and an
    ``'articles'`` list of dicts carrying ``'id'`` and ``'text'``.
    """
    matching_texts = (
        article['text']
        for law in data
        if law['id'] == law_id
        for article in law['articles']
        if article['id'] == article_id
    )
    # The generator is lazy, so scanning stops at the first hit.
    return next(matching_texts, None)

In [6]:
def find_query(question_id, data):
    """Return the ``'text'`` of the first question whose id matches, else None."""
    texts = (item['text'] for item in data if item['question_id'] == question_id)
    return next(texts, None)

In [7]:
def find_score(data, law_id, article_id):
    """Return the ``'qld_score'`` of the first entry matching both ids, else None."""
    for candidate in data:
        if candidate['law_id'] != law_id:
            continue
        if candidate['article_id'] != article_id:
            continue
        return candidate['qld_score']
    return None

In [8]:
# Build the QLD retriever once over the full law corpus; get_qld below reuses it.
qld_retrieval = QLDRetrieval(law_data)

In [9]:
def get_qld(data):
    """Add a ``'qld_score'`` column holding the QLD score of every row.

    Each row supplies a ``'query'``, a ``'law_id'`` and an ``'article_id'``.
    The corpus is ranked once per run of consecutive rows that share a query,
    so rows grouped by query are processed fastest.

    Args:
        data: DataFrame with ``'query'``, ``'law_id'`` and ``'article_id'``
            columns.

    Returns:
        The same DataFrame object, modified in place with the new column.

    Raises:
        AssertionError: Raised by ``qld_retrieval.score`` when a row's article
            is neither in the ranked results nor in the corpus.
    """
    qld_scores = []
    score_by_ids = {}
    # A unique sentinel guarantees the first row always triggers retrieval,
    # even if its query happens to be the empty string.
    query = object()
    for _, row in data.iterrows():
        if query != row['query']:
            query = row['query']
            # Index the ranking by (law_id, article_id) so each row is an O(1)
            # lookup; setdefault keeps the first entry when a pair repeats.
            score_by_ids = {}
            for entry in qld_retrieval.retrieve_all(query):
                score_by_ids.setdefault(
                    (entry['law_id'], entry['article_id']), entry['qld_score']
                )

        law_id = str(row['law_id'])
        article_id = str(row['article_id'])

        qld_score = score_by_ids.get((law_id, article_id))
        if qld_score is None:
            # Not in the ranking: score the article text directly.
            content = find_law(law_id, article_id, law_data)
            qld_score = qld_retrieval.score(query, content)
        qld_scores.append(qld_score)

    data['qld_score'] = qld_scores
    return data

In [10]:
# import pandas as pd
# path = '/kaggle/input/all-private-test-related-data-alqac-2024/bm25_bert_len_output_data_private_24_vimonot5.csv'

# data = pd.read_csv(path)

# data = get_qld(data)

# data.to_csv('bm25_bert_len_output_data_private_24_vimonot5.csv', index=False,  encoding='utf-8')


In [12]:
# Add the QLD score to every row of each CSV file in the directory
# import os
# import pandas as pd

# # Define the path to the directory
# directory_path = '/kaggle/input/output-concat-bm25/with label'

# # List all files in the directory
# all_files = os.listdir(directory_path)

# # Filter out and process only the CSV files
# csv_files = [file for file in all_files if file.endswith('.csv')]

# # Iterate through each CSV file and read its content
# for csv_file in csv_files:
#     file_path = os.path.join(directory_path, csv_file)
#     data = pd.read_csv(file_path)
    
#     data = get_qld(data)

#     # Save the modified DataFrame back to CSV
#     output_file_path = csv_file
#     data.to_csv(output_file_path, index=False,  encoding='utf-8')
    
#     print(f"Processed and saved {csv_file}")

In [13]:
# train_scores = []
# # Get scores in train
# for item in train_data:
#     question_text = item["text"]
#     question_id = item['question_id']
#     # Get top 100
#     scores = qld_retrieval.retrieve(question_text, 100)
#     break
#     scores_list = []
#     for score in scores:
#         scores_list.append({
#             "qld_score": score["qld_score"],
#             "law_id": score["law_id"],
#             "article_id": score["article_id"]
#         })
    
#     train_scores.append({
#         "question_id": question_id,
#         "scores": scores_list
#     })

# print('Train')
# # print(train_scores[0])
    


In [14]:
# test_scores = []
# # Get scores in test
# for item in test_data:
#     question_text = item["text"]
#     question_id = item['question_id']
#     # Get top 100
#     scores =  qld_retrieval.retrieve(question_text, 100)
#     scores_list = []
#     for score in scores:
#         scores_list.append({
#             "qld_score": score["qld_score"],
#             "law_id": score["law_id"],
#             "article_id": score["article_id"]
#         })
    
#     test_scores.append({
#         "question_id": question_id,
#         "scores": scores_list
#     })
    

# print('Test')

Save output

In [15]:
# path1= "train_scores.json"
# with open(path1, "w", encoding="utf-8") as jsonfile:
#     json.dump(train_scores, jsonfile, ensure_ascii=False, indent=4)
    
# path2= "test_scores.json"
# with open(path2, "w", encoding="utf-8") as jsonfile:
#     json.dump(test_scores, jsonfile, ensure_ascii=False, indent=4)
    
