In [1]:
from simcse import SimCSE
from entity_retrieval import surface_index_memory
# from your_module import convert_normed_to_s_expression  # Đổi 'your_module' thành tên file của bạn
from simcse import SimCSE
from itertools import product
from SPARQLWrapper import SPARQLWrapper, JSON
from tqdm import tqdm
import ujson
import json
import time

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
model = SimCSE("princeton-nlp/unsup-simcse-roberta-large")

surface_index = surface_index_memory.EntitySurfaceIndexMemory(
    "vi_entity_list_file_wikidata_complete_all_mention", "vi_surface_map_file_wikidata_all_mention",
    "vi_surface_map_file_wikidata_complete_all_mention")

02/23/2025 19:12:00 - INFO - simcse.tool -   Use `cls_before_pooler` for unsupervised models. If you want to use other pooling policy, specify `pooler` argument.
02/23/2025 19:12:00 - INFO - entity_retrieval.surface_index_memory -   Building entity mid vocabulary.


FileNotFoundError: [Errno 2] No such file or directory: 'vi_entity_list_file_wikidata_complete_all_mention'

In [3]:
def is_valid_expression(expr):
    """Kiểm tra tính hợp lệ của biểu thức bằng cách đếm số ngoặc mở và đóng."""
    count = 0
    for char in expr:
        if char == '(':
            count += 1
        elif char == ')':
            count -= 1
        if count < 0:
            return False  # Gặp ngoặc đóng trước ngoặc mở
    return count == 0

def fix_unbalanced_parentheses(expr):
    """Loại bỏ ngoặc đóng dư nếu có."""
    while not is_valid_expression(expr) and expr.endswith(')'):
        expr = expr[:-1]
    return expr

In [4]:

def find_entity_or_relation(label, label_map, simcse_model, facc1_index, top_k=50, similarity_threshold=0.2):
    """
    Tìm mã tương ứng cho thực thể hoặc quan hệ từ gold maps, SimCSE hoặc FACC1.
    Nếu label_map rỗng, sẽ bỏ qua bước similarity và chuyển thẳng sang tra cứu FACC1.
    """
    search = True
    label_lower = label.lower()
    # Nếu label_map không rỗng, thử tra cứu qua label_map và similarity
    if label_map:
        if label_lower in label_map:
            return label_map[label_lower]
        similarities = simcse_model.similarity([label_lower], list(label_map.keys()))
        if list(label_map.keys()):
            merged_list = list(zip(label_map.keys(), similarities[0]))
            sorted_list = sorted(merged_list, key=lambda x: x[1], reverse=True)
            if sorted_list and sorted_list[0][1] > similarity_threshold:
                return label_map[sorted_list[0][0]]
    
    # Nếu không có dữ liệu từ label_map, chuyển thẳng sang tra cứu FACC1
    try:
        print(label)
        facc1_candidates = facc1_index.get_indexrange_entity_el_pro_one_mention(label, top_k=top_k)
    except Exception as e:
        # Nếu có lỗi xảy ra trong quá trình tra cứu FACC1, log lỗi và trả về label gốc
        print(f"Lỗi khi truy xuất FACC1 cho label '{label}': {e}")
        return label

    if facc1_candidates:
        keys = list(facc1_candidates.keys())
        if not keys:
            return label
        try:
            # Lấy candidate đầu tiên và các candidate có điểm >= 0.001
            temp = [key for key in keys[1:] if facc1_candidates[key] >= 0.001]
            return [keys[0]] + temp if temp else keys[0]
        except Exception as e:
            print(f"Lỗi khi xử lý kết quả FACC1 cho label '{label}': {e}")
            return label

    return label

# def invert_map(original_map):
#     """Đảo ngược key-value trong map để tìm mã từ tên."""
#     return {v.lower(): k for k, v in original_map.items()}

def parse_nsexpr(expr):
    """
    Chuyển chuỗi biểu thức thành cây cấu trúc dạng nested list.
    Hàm này dùng duyệt ký tự, khi gặp '(' sẽ tìm phần con cho đến khi khớp với ')',
    và giữ nguyên nội dung trong ngoặc vuông.
    """
    tokens = []
    i = 0
    while i < len(expr):
        if expr[i].isspace():
            i += 1
        elif expr[i] == '(':
            # Tìm phần con của biểu thức trong ngoặc đơn
            count = 1
            j = i + 1
            while j < len(expr) and count > 0:
                if expr[j] == '(':
                    count += 1
                elif expr[j] == ')':
                    count -= 1
                j += 1
            # Đệ quy phân tích phần con (loại bỏ ngoặc bao ngoài)
            subtree = parse_nsexpr(expr[i+1:j-1])
            tokens.append(subtree)
            i = j
        elif expr[i] == '[':
            # Giữ nguyên nội dung trong ngoặc vuông
            j = expr.find(']', i)
            if j == -1:
                return ""
                raise ValueError("Không tìm thấy dấu ']' kết thúc.")
                
            token = expr[i:j+1].strip()
            tokens.append(token)
            i = j + 1
        else:
            # Đọc một token cho đến khi gặp khoảng trắng hoặc ngoặc
            j = i
            while j < len(expr) and (not expr[j].isspace()) and expr[j] not in ['(', ')']:
                j += 1
            tokens.append(expr[i:j])
            i = j
    return tokens

def collect_labels(tree):
    """
    Duyệt cây cấu trúc (nested list) để thu thập các nhãn của quan hệ và thực thể.
    Giả sử:
      - Biểu thức JOIN có dạng: ["JOIN", relation_part, entity_part]
      - Phần relation_part: nếu là list và bắt đầu bằng "R", thì phần thứ hai chứa nhãn quan hệ (dạng "[ label ]"). 
        Nếu là chuỗi dạng "[ label ]" thì đó cũng là nhãn quan hệ.
      - Phần entity_part: nếu là chuỗi dạng "[ label ]" thì đó là nhãn thực thể, nếu là list thì xử lý đệ quy.
      - Biểu thức AND sẽ có nhiều biểu thức con.
    """
    relations = []
    entities = []
    
    if isinstance(tree, list) and tree:
        # Nếu token đầu tiên là JOIN hoặc AND
        op = tree[0]
        if isinstance(op, str):
            op_upper = op.upper()
        else:
            op_upper = ""
        
        if op_upper == "JOIN":
            # Xử lý phần quan hệ
            if len(tree) >= 2:
                rel_part = tree[1]
                # Nếu là list dạng [ "R", "[ label ]" ]
                if isinstance(rel_part, list) and len(rel_part) >= 2 and isinstance(rel_part[0], str) and rel_part[0].upper() == "R":
                    token = rel_part[1]
                    if isinstance(token, str) and token.startswith('[') and token.endswith(']'):
                        rel_label = token[1:-1].strip()
                        relations.append(rel_label)
                # Nếu là chuỗi dạng "[ label ]"
                elif isinstance(rel_part, str) and rel_part.startswith('[') and rel_part.endswith(']'):
                    rel_label = rel_part[1:-1].strip()
                    relations.append(rel_label)
                else:
                    # Nếu không đúng định dạng, duyệt đệ quy
                    sub_rel, sub_ent = collect_labels(rel_part)
                    relations.extend(sub_rel)
                    entities.extend(sub_ent)
            # Xử lý phần thực thể
            if len(tree) >= 3:
                ent_part = tree[2]
                if isinstance(ent_part, list):
                    sub_rel, sub_ent = collect_labels(ent_part)
                    relations.extend(sub_rel)
                    entities.extend(sub_ent)
                elif isinstance(ent_part, str) and ent_part.startswith('[') and ent_part.endswith(']'):
                    ent_label = ent_part[1:-1].strip()
                    entities.append(ent_label)
        elif op_upper == "AND":
            # Với AND, duyệt tất cả các phần con
            for sub in tree[1:]:
                sub_rel, sub_ent = collect_labels(sub)
                relations.extend(sub_rel)
                entities.extend(sub_ent)
        else:
            # Nếu không phải JOIN hay AND, duyệt tất cả các phần tử nếu chúng là list
            for elem in tree:
                if isinstance(elem, list):
                    sub_rel, sub_ent = collect_labels(elem)
                    relations.extend(sub_rel)
                    entities.extend(sub_ent)
    return relations, entities



def extract_entities_and_relations(normed_expr):

    if not normed_expr or len(normed_expr) == 0:  # Kiểm tra nếu normed_expr rỗng
        return [], []
    
    if normed_expr[0] != "(":
        return [], []
    
    tree = parse_nsexpr(normed_expr)
    if tree is None:
        return [], []  # Trả về danh sách rỗng nếu parse thất bại
    
    return collect_labels(tree)



In [5]:
def convert_normed_to_s_expression(normed_expr, gold_relation_map, gold_entity_map, simcse_model, facc1_index):
    """
    Chuyển đổi từ normed_sexpression sang s_expression.
    Sau khi trích xuất các nhãn quan hệ và thực thể từ normed_expr,
    ta lấy danh sách các mã ứng viên cho mỗi nhãn và tạo hoán vị giữa các cặp ứng viên đó.
    Kết quả trả về là một danh sách các s_expression khả dĩ.
    """
    # Đảo ngược map để lấy mã từ tên
    # inverted_relation_map = invert_map(gold_relation_map)
    # inverted_entity_map = invert_map(gold_entity_map)
    normed_expr = fix_unbalanced_parentheses(normed_expr)
    # Trích xuất các nhãn quan hệ và thực thể từ biểu thức
    relations, entities = extract_entities_and_relations(normed_expr)
    
    # Tạo mapping từ token xuất hiện trong biểu thức sang danh sách các ứng viên mã.
    # Ví dụ: token_str = "[ author ]"
    candidate_map = {}
    
    for rel in relations:
        token = f'[ {rel} ]'
        candidate = find_entity_or_relation(rel, gold_relation_map, simcse_model, facc1_index)
        # Nếu candidate không phải danh sách, chuyển nó thành danh sách để tạo hoán vị
        if not isinstance(candidate, list):
            candidate = [candidate]
        candidate_map[token] = candidate
        
    for ent in entities:
        token = f'[ {ent} ]'
        candidate = find_entity_or_relation(ent, gold_entity_map, simcse_model, facc1_index)
        if not isinstance(candidate, list):
            candidate = [candidate]
        candidate_map[token] = candidate
    
    # Nếu không có token nào cần thay thế, trả về biểu thức gốc
    if not candidate_map:
        return [normed_expr]
    
    # Lấy danh sách các token và danh sách các danh sách ứng viên tương ứng
    tokens = list(candidate_map.keys())
    candidate_lists = [candidate_map[token] for token in tokens]
    
    # Tạo tất cả các hoán vị ứng viên (Cartesian product)
    all_combinations = list(product(*candidate_lists))
    
    s_expressions = []
    for comb in all_combinations:
        temp_expr = normed_expr
        # Với mỗi token, thay thế bằng ứng viên tương ứng theo hoán vị
        for token, replacement in zip(tokens, comb):
            temp_expr = temp_expr.replace(token, replacement)
        s_expressions.append(temp_expr)
    
    return s_expressions

In [6]:
class SExpressionParser:
    def __init__(self):
        self.var_counter = 1  # Đếm số biến trung gian (?X1, ?X2, ...)

    def get_new_var(self):
        """Tạo biến trung gian mới."""
        var_name = f"?X{self.var_counter}"
        self.var_counter += 1
        return var_name

    def parse_s_expr(self, s_expr):
        """Chuyển đổi S-Expression thành danh sách lồng nhau."""
        s_expr = re.sub(r'\(', ' ( ', s_expr)
        s_expr = re.sub(r'\)', ' ) ', s_expr)
        tokens = s_expr.split()
        return self.build_tree(tokens)

    def build_tree(self, tokens):
        """Chuyển đổi danh sách token thành cây lồng nhau."""
        if not tokens:
            return None
        token = tokens.pop(0)
        if token == "(":
            sub_expr = []
            while tokens[0] != ")":
                sub_expr.append(self.build_tree(tokens))
            tokens.pop(0)  # Bỏ dấu ")"
            return sub_expr
        elif token == ")":
            raise ValueError("Unexpected ')'")
        else:
            return token

    def process_join(self, expr, target_var):
        """
        Xử lý JOIN, tạo triple SPARQL.
        """
        triples = []
        if not isinstance(expr, list):
            return expr, triples

        if expr[0] == "AND":
            # Xử lý từng JOIN trong AND riêng lẻ
            for sub_expr in expr[1:]:
                _, sub_triples = self.process_join(sub_expr, target_var)
                triples.extend(sub_triples)
            return target_var, triples

        if expr[0] == "JOIN":
            right_expr = expr[2]
            right_triples = []
            if isinstance(right_expr, list) and right_expr[0] == "JOIN":
                right_var, right_triples = self.process_join(right_expr, self.get_new_var())
            else: 
                right_var = right_expr
            
            # Xử lý nhánh trái
            left_expr = expr[1]
            if isinstance(left_expr, list) and left_expr[0] == "R":
                rel = left_expr[1]
                right = right_var
                left = target_var
                if right[0] != '?':
                    right = "wd:" + right
                if left[0] !='?':
                    left = "wd:" + left   
                triples.append([right, f"wdt:{rel}", left])
            else:
                right = right_var
                left = target_var
                if right[0] != '?':
                    right = "wd:" + right
                if left[0] !='?':
                    left = "wd:" + left   
                triples.append([left, f"wdt:{left_expr}", right])

            # Thêm các triples từ nhánh phải trước khi thêm triple chính
            triples = right_triples + triples
            return target_var, triples

        return expr, triples

    def s_expr_to_sparql(self, s_expr):
        """Chuyển đổi từ S-Expression sang SPARQL."""
        parsed_expr = self.parse_s_expr(s_expr)
        target_var = "?answer"
        final_var, triples = self.process_join(parsed_expr, target_var)

        sparql_body = "\n  ".join([" ".join(t) + " ." for t in triples])
        sparql_query = f"""PREFIX wd: <http://www.wikidata.org/entity/> 
PREFIX wdt: <http://www.wikidata.org/prop/direct/> 
SELECT DISTINCT {target_var} WHERE {{ 
  {sparql_body}
}}"""
        return sparql_query


def execute_query_with_odbc(sparql_query):
    """
    Thực thi truy vấn SPARQL trên Wikidata endpoint và trả về kết quả.
    """
    ENDPOINT_URL = "https://query.wikidata.org/sparql"
    if sparql_query == None or sparql_query == []:
        return []
    sparql = SPARQLWrapper(ENDPOINT_URL)
    sparql.setQuery(str(sparql_query))  # Ép kiểu về str cho chắc chắn
    sparql.setReturnFormat(JSON)
    
    try:
        response = sparql.query().convert()
        answers = [item["answer"]["value"] for item in response["results"]["bindings"] if "answer" in item]
        return answers
    except Exception as e:
        return []



In [7]:

def load_jsonl(file_path):
    """Đọc file JSONL và trả về danh sách các object"""


    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read().replace('(EXPECTED RESULT)', 'null').replace('(QUESTION)', 'null')

    try:
        data = ujson.loads(content)
        print("JSON loaded successfully!")
    except Exception as e:
        print(f"Error: {e}")
    return data


In [8]:
predictions = load_jsonl("LLMs/beam_prediction/generated_predictions_beam.json")
gold_data = load_jsonl("Data/LC-QuAD2.0/label_map/LC-QuAD2.0_test.json")


JSON loaded successfully!
JSON loaded successfully!


In [9]:
simcse_model = model  # Mô hình SimCSE của bạn
facc1_index = surface_index

In [21]:
gold_entity_map = {}
gold_relation_map = {}

In [22]:
query = "( AND ( JOIN [ occupation ] [ singer ] ) ( JOIN ( R [ voice actor ] ) [ South Park ] ) )"

In [23]:
set_query = convert_normed_to_s_expression(query, gold_relation_map, gold_entity_map, simcse_model, facc1_index)
set_query

occupation
voice actor
singer
South Park
Lỗi khi truy xuất FACC1 cho label 'South Park': list index out of range


['( AND ( JOIN occupation singer ) ( JOIN ( R voice actor ) South Park ) )']