In [1]:
# import json
# with open("static/DATA.json", mode="r", encoding="utf-8") as f:
#     DATA = json.load(f)
# ldg_names = [e["name"] for e in DATA["DVC_TTHC_LamDong"]["data"]]

In [2]:
_TEST_PASSAGES = ["Thủ tục thành lập công ty tư nhân", "Thủ tục đăng ký kết hôn", "Thủ tục chuyển nhượng quyền sử dụng đất", "Thủ tục đấu thầu đất xây dựng", "Thủ tục cấp lại lý lịch tư pháp", "Thủ tục chuyển trường cho học sinh trung học phổ thông", "Thủ tục chuyển trường cho học sinh trung học cơ sở", "Thủ tục chuyển trường cho học sinh tiểu học", "Thủ tục đăng ký lại kết hôn", "Thủ tục đăng ký kết hôn có yếu tố nước ngoài", "Thủ tục làm giấy khai sinh", "Thủ tục thành lập công ty trách nhiệm hữu hạn 1 thành viên", "Thủ tục thành lập công ty trách nhiệm hữu hạn 2 thành viên trở lên", "Thủ tục tố cáo tại cấp xã", "Thủ tục tố cáo tại cấp tỉnh"]
_TEST_QUERIES = ["Cháu muốn chuyển trường cấp 3 thì cần phải làm gì?", "Tôi muốn mở công ty thì thủ tục gì?"]

-----

In [None]:
from huggingface_hub import hf_hub_download as HF_Download
from tokenizers import Tokenizer as STL_Tokenizer
from rank_bm25 import BM25Okapi as BM25_Retriever
import onnxruntime as ort
import numpy as np
import json
import os
os.makedirs("_hyse", exist_ok=True)

def dict2json(dict, jsonpath):
    try:
        with open(jsonpath, "w", encoding="utf-8") as f:
            json.dump(dict, f, ensure_ascii=False, indent=4)
    except Exception as er:
        print(f"⚠️ dict2json > Error: {er}")

def json2dict(jsonpath):
    dict = {}
    try:
        with open(jsonpath, "r", encoding="utf-8") as f:
            dict = json.load(f)
    except Exception as er:
        print(f"⚠️ json2dict > Error: {er}")
    return dict

class SentenceTransformerLite:
    # Init: model_path -> model + tokenizer
    def __init__(self, model_path="onelevelstudio/ML-E5-0.3B"):
        try:
            # Model (ONNX)
            try: HF_Download(repo_id=model_path, filename="onnx/model.onnx_data")
            except: pass
            STL_model = ort.InferenceSession(HF_Download(repo_id=model_path, filename="onnx/model.onnx"))
            # Tokenizer
            STL_tokenizer = STL_Tokenizer.from_pretrained(model_path)
            STL_tokenizer.enable_padding(pad_id=1, pad_token="<pad>")
            STL_tokenizer.enable_truncation(max_length=512)
        except Exception as er:
            raise ValueError(f"⚠️ > SentenceTransformerLite > init > Error: {er}")
        # Return
        self.STL_model = STL_model
        self.STL_tokenizer = STL_tokenizer
    # Encode: Text(s) -> Embedding(s)
    def encode(self, inputtexts):
        # Ensure inputtexts is a list of strings
        if isinstance(inputtexts, list) and all(isinstance(e, str) for e in inputtexts):
            if len(inputtexts) == 0:
                raise ValueError(f"⚠️ > SentenceTransformerLite > encode > inputtexts = empty list []")
        elif isinstance(inputtexts, str):
            inputtexts = [inputtexts]
        else:
            raise ValueError(f"⚠️ > SentenceTransformerLite > encode > inputtexts != string or list of strings")
        # Tokenize
        inputs = self.STL_tokenizer.encode_batch(inputtexts, is_pretokenized=False)
        inputs_ids = np.array([e.ids for e in inputs], dtype=np.int64)
        inputs_msk = np.array([e.attention_mask for e in inputs], dtype=np.int64)
        # Encoding
        embeddings = self.STL_model.run(None, {"input_ids": inputs_ids, "attention_mask": inputs_msk})[0]                                             # Encode
        embeddings = np.sum(embeddings * np.expand_dims(inputs_msk, axis=-1), axis=1) / np.maximum(np.sum(inputs_msk, axis=1, keepdims=True), 1e-9)   # Pooling
        embeddings = embeddings / np.maximum(np.linalg.norm(embeddings, axis=1, keepdims=True), 1e-9)                                                 # Normalize
        # Return
        return embeddings

In [5]:
class EngineSemantic:
    # ----- Example -----
    # engine_semantic = EngineSemantic()
    # engine_semantic.update(_TEST_PASSAGES)
    # engine_semantic.search(_TEST_QUERIES)
    # -------------------
    def __init__(self, name="hyse1_sem1", modelpath="onelevelstudio/ML-E5-0.3B"):
        self.name = name
        self.modelpath = modelpath
        self.savepath_docs = f"_hyse/{name}.json"
        self.savepath_embs = f"_hyse/{name}.npy"
        self.model = SentenceTransformerLite(modelpath)
        # ----------
        self.docs = []
        self.embs = []
        if os.path.exists(self.savepath_docs) and os.path.exists(self.savepath_embs):
            self.docs = json2dict(self.savepath_docs)["docs"]  # 📤 Read file as docs
            self.embs = np.load(self.savepath_embs)            # 📤 Read file as embs
    def update(self, new_docs):
        if self.docs == new_docs:
            pass
        else:
            self.docs = new_docs
            self.embs = self.model.encode(self.docs)
            dict2json({"docs": self.docs}, self.savepath_docs) # 📥 Save docs as file
            np.save(self.savepath_embs, self.embs)             # 📥 Save embs as file
    def search(self, new_queries, top=5):
        embs_queries = self.model.encode(new_queries)
        # -----
        similarities = embs_queries @ self.embs.T
        best_matching_idxs = [[idx for idx, _ in sorted(enumerate(sim), key=lambda x: x[1], reverse=True)][:min(top, len(self.docs))] for sim in similarities]
        best_matching_docs = [[self.docs[idx] for idx in e] for e in best_matching_idxs]
        best_matching_similarities = [[similarities[i][idx] for idx in idxs] for i, idxs in enumerate(best_matching_idxs)]
        # -----
        return [[{"index": ee[0], "doc": ee[1], "score": round(float(ee[2]), 3)} for ee in zip(e[0], e[1], e[2])] for e in zip(best_matching_idxs, best_matching_docs, best_matching_similarities)]