In [None]:
from sentence_transformers import SentenceTransformer
from underthesea import word_tokenize
import unicodedata
import numpy as np
import time
import csv
import os
import re

from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.service import Service as Selenium_Service
from selenium.webdriver.chrome.options import Options as Selenium_Options
from selenium.webdriver import Chrome as Selenium_Chrome
chrome_options = Selenium_Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
selenium_service = Selenium_Service(ChromeDriverManager().install())
selenium_driver = Selenium_Chrome(service=selenium_service, options=chrome_options) # Start WebDriver
# sele_driver.quit() # Close the browser

def normalize_text(text):
    text = text.replace('đ', 'd').replace('Đ', 'D')
    text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('utf-8')
    text = text.lower().strip()
    return text
def extract_keywords(text):
    def is_number(text):
        return bool(re.fullmatch(r'[\d,. ]+', text))
    specialchars = ['!', '"', '#', '$', '%', '&', "'", '(', ')', '*', '+', ',', '-', '.', '..', '...', '/', ':', ';', '<', '=', '>', '?', '@', '[', '\\', ']', '^', '_', '`', '{', '|', '}', '~']
    specialwords = ["giấy tờ", "thủ tục", "giấy", "gì", "cần", "nào", "sắp", "đang", "sẽ", "của", "bị", "hoặc", "với", "và", "thì", "muốn", "gì", "mình", "tôi", "phải", "làm sao", "để", "cho", "làm", "như", "đối với", "từ", "theo", "là", "được", "ở", "đã", "về", "có", "các", "tại", "đến", "vào", "do", "vì", "bởi vì", "thuộc"]
    words = word_tokenize(text)
    words = [w.lower().strip() for w in words]
    words = list(set(words))
    words = [w for w in words if not is_number(w)]
    words = [w for w in words if w not in specialchars]
    words = [w for w in words if w not in specialwords]
    words_normalized = [normalize_text(w) for w in words if " " in w]
    return list(set(words + words_normalized))

# Pre-process

In [None]:
# # ---------- [Optional] Re-raw2cache ----------
# def raw2cache(raw_path, cache_path):
#     # Step 1
#     with open(raw_path, 'r', encoding='utf-8') as f1:
#         lines = f1.readlines()
#         lines = [line.strip() for line in lines if line.strip()]
#         items = [lines[i:i+6] for i in range(0, len(lines), 6)]
#         headers = ["STT", "Mã chuẩn", "Tên thủ tục", "Lĩnh vực", "Cơ quan thực hiện", "Mức độ"]
#         with open(cache_path, 'w', newline='', encoding='utf-8') as f2:
#             writer = csv.writer(f2)
#             writer.writerow(headers)
#             writer.writerows(items)
#     # Step 2
#     with open(cache_path, mode='r', newline='', encoding='utf-8') as f:
#         thutucs = list(csv.DictReader(f))
#         thutucs = sorted(thutucs, key=lambda e: len(e["Tên thủ tục"]))
#         for e in thutucs:
#             e["thutuc_Link"] = "NOINFO"
#             e["thutuc_Trình tự thực hiện"] = "NOINFO"
#             e["thutuc_Cách thức thực hiện"] = "NOINFO"
#             e["thutuc_Thành phần hồ sơ"] = "NOINFO"
#             e["thutuc_Thời gian giải quyết"] = "NOINFO"
#             e["thutuc_Đối tượng thực hiện"] = "NOINFO"
#             e["thutuc_Cơ quan thực hiện"] = "NOINFO"
#             e["thutuc_Kết quả"] = "NOINFO"
#             e["thutuc_Phí, lệ phí"] = "NOINFO"
#             e["thutuc_Tên mẫu đơn, tờ khai"] = "NOINFO"
#             e["thutuc_Yêu cầu, điều kiện"] = "NOINFO"
#             e["thutuc_Căn cứ pháp lý"] = "NOINFO"
#     with open(cache_path, mode='w', newline='', encoding='utf-8') as f:
#         fieldnames = thutucs[0].keys()
#         writer = csv.DictWriter(f, fieldnames=fieldnames)
#         writer.writeheader()
#         writer.writerows(thutucs)
# raw2cache("url/raw", "url/cache")

In [None]:
# ---------- Load thutucs from cache ----------
with open('url/cache', mode='r', newline='', encoding='utf-8') as f:
    thutucs = list(csv.DictReader(f))
    for i in range(len(thutucs)):
        thutucs[i]["keywords"] = extract_keywords(thutucs[i]["Tên thủ tục"])
    tenthutucs = [e["Tên thủ tục"] for e in thutucs]

In [None]:
model_e5 = SentenceTransformer("onelevelstudio/M-E5-BASE")
model_mpnet = SentenceTransformer("onelevelstudio/M-MPNET-BASE")

# # ---------- [Optional] Re-vectorize embeddings ----------
# embs_e5    = model_e5.encode(tenthutucs)
# np.save("url/embs_e5", embs_e5)
# os.rename("url/embs_e5.npy", "url/embs_e5")
# embs_mpnet = model_mpnet.encode(tenthutucs)
# np.save("url/embs_mpnet", embs_mpnet)
# os.rename("url/embs_mpnet.npy", "url/embs_mpnet")

# ---------- Load pre-vectorized embeddings ----------
embs_e5 = np.load("url/embs_e5")
embs_mpnet = np.load("url/embs_mpnet")

# Pre-Process 2

In [None]:
# ---------- [Optional] Re-scrape content ----------

patterns = {
    "Trình tự thực hiện": r""">Trình tự thực hiện</td>\s*<td[^>]*>(.*?)</td>""", 
    "Cách thức thực hiện": r""">Cách thức thực hiện</td>\s*<td[^>]*>(.*?)</td>""", 
    "Thành phần hồ sơ": r""">Thành phần hồ sơ</td>\s*<td[^>]*>(.*?)</td>""", 
    "Thời gian giải quyết": r""">Thời gian giải quyết</td>\s*<td[^>]*>(.*?)</td>""", 
    "Đối tượng thực hiện": r""">Đối tượng thực hiện</td>\s*<td[^>]*>(.*?)</td>""", 
    "Cơ quan thực hiện": r""">Cơ quan thực hiện</td>\s*<td[^>]*>(.*?)</td>""", 
    "Kết quả": r""">Kết quả</td>\s*<td[^>]*>(.*?)</td>""", 
    "Phí, lệ phí": r""">Phí, lệ phí</td>\s*<td[^>]*>(.*?)</td>""", 
    "Tên mẫu đơn, tờ khai": r""">Tên mẫu đơn, tờ khai</td>\s*<td[^>]*>(.*?)</td>""", 
    "Yêu cầu, điều kiện": r""">Yêu cầu, điều kiện</td>\s*<td[^>]*>(.*?)</td>""", 
    "Căn cứ pháp lý": r""">Căn cứ pháp lý</td>\s*<td[^>]*>(.*?)</td>""", 
}

def get_thutuc_content_from_machuan(machuan):

    final_res = {}

    def scrape_html_content(url, waiting_time=2):
        selenium_driver.get(url)
        time.sleep(waiting_time)
        html_content = selenium_driver.page_source
        return html_content

    try:
        url_search = "https://dichvucong.lamdong.gov.vn/vi/procedure/search?keyword=" + machuan
        search_html_content = scrape_html_content(url_search)
        match = re.search(r'href="(/vi/procedure/detail/[a-zA-Z0-9]+)"', search_html_content)
        if match:
            url_thutuc_content = "https://dichvucong.lamdong.gov.vn" + match.group(1)
            final_res["thutuc_Link"] = url_thutuc_content
            # ----------------------------------------------------------------------------------------------------
            thutuc_html_content = scrape_html_content(url_thutuc_content)
            for i in range(len(patterns)):
                pattern_name = list(patterns.keys())[i]
                pattern_regex = list(patterns.values())[i]
                match = re.search(pattern_regex, thutuc_html_content, re.DOTALL)
                if match:
                    extracted_text = match.group(1)
                    extracted_text = re.sub(r'</?[^>]+>', '\n', extracted_text) # Replace HTML tags with "\n"
                    extracted_text = re.sub(r'\n+', '\n', extracted_text)       # Merge multiple "\n"
                    extracted_text = extracted_text.replace("&nbsp;", " ")      # Replace &nbsp; with " "
                    extracted_text = extracted_text.strip()
                    extracted_text = extracted_text if extracted_text != "" else f"Không có thông tin {pattern_name.lower()}."
                    final_res[f"thutuc_{pattern_name}"] = f"{extracted_text}"
    except Exception as e:
        print(f"⚠️ > Error: {e}")
    return final_res

with open('url/cache', mode='r', newline='', encoding='utf-8') as f:
    thutucs_crawl = list(csv.DictReader(f))
with open('url_backup/cache', mode='r', newline='', encoding='utf-8') as f:
    thutucs_crawl_backup = list(csv.DictReader(f))

for i in range(len(thutucs_crawl)):
    machuan = thutucs_crawl[i]["Mã chuẩn"]
    if thutucs_crawl[i]["thutuc_Link"] == "NOINFO":
        
        # ----------
        thutuc_content = {}
        for ee in thutucs_crawl_backup:
            if machuan == ee["Mã chuẩn"]:
                thutuc_content[f"thutuc_Link"] = ee[f"thutuc_Link"]
                for pattern_name in list(patterns.keys()):
                    thutuc_content[f"thutuc_{pattern_name}"] = ee[f"thutuc_{pattern_name}"]
                break
        if len(thutuc_content) != len(patterns)+1:
            thutuc_content = get_thutuc_content_from_machuan(machuan)
            time.sleep(2)
        # ----------

        if len(thutuc_content) == len(patterns)+1:
            print("✔️", end="")
            # ----------
            thutucs_crawl[i][f"thutuc_Link"] = thutuc_content[f"thutuc_Link"]
            for pattern_name in list(patterns.keys()):
                thutucs_crawl[i][f"thutuc_{pattern_name}"] = thutuc_content[f"thutuc_{pattern_name}"]
            # ----------
            with open("url/cache", mode='w', newline='', encoding='utf-8') as f:
                fieldnames = thutucs_crawl[0].keys()
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(thutucs_crawl)
        else:
            print("❌", end="")
    else:
        print("⏩", end="")
    if i % 50 == 49:
        print()

# Main Process

In [None]:
test_questions = [
"mình sắp khởi nghiệp cần giấy tờ gì?",
"tôi muốn mua đất thì cần làm gì?",
"tôi sắp cưới vợ thì phải làm như nào?",
"tôi sắp lập gia đình thì cần làm gì?",
"vợ tôi sắp sinh con thủ tục nào?",
"thủ tục xây nhà cấp 3 cấp 4?",
"làm sao để phúc khảo bài thi thpt?",
"làm sao để tố cáo hành vi vi phạm pháp luật?",
"tôi muốn ly dị chồng, tôi phải làm gì?",
"tôi muốn ly hôn chồng, tôi phải làm gì?",
"tôi muốn chuyển hộ khẩu cho con của tôi",
"làm sao để đăng ký nhập học cho con của tôi?",
"dang ky ket hon",
"đăng ký kết hôn",
"đăng ký",
"tố cáo xã",
]

In [None]:
def retrieve_idx_exactmatch(text, thutucs, top=5):
    thutuc_scores = []
    for e in thutucs:
        score = 0
        # ----------
        if normalize_text(text) in normalize_text(e["Tên thủ tục"]):
            score += 1
            if text.lower() in e["Tên thủ tục"].lower():
                score += 1
        # ----------
        thutuc_scores.append(score)
    thutuc_scores_idx = sorted(range(len(thutuc_scores)), key=lambda i: thutuc_scores[i], reverse=True)
    scores = [(idx, thutuc_scores[idx]) for idx in thutuc_scores_idx]
    scores = [e for e in scores if e[1] != 0]
    return [e[0] for e in scores][:min(len(scores), top)]

def retrieve_idx_keywordmatch(text, thutucs, top=5):
    q_keywords = extract_keywords(text)
    # print(q_keywords)
    thutuc_scores = []
    for e in thutucs:
        score = 0
        # ----------
        for k in q_keywords:
            if k in e["keywords"]:
                score += 1
        # ----------
        thutuc_scores.append(score)
    thutuc_scores_idx = sorted(range(len(thutuc_scores)), key=lambda i: thutuc_scores[i], reverse=True)
    scores = [(idx, thutuc_scores[idx]) for idx in thutuc_scores_idx]
    scores = [e for e in scores if e[1] != 0]
    return [e[0] for e in scores][:min(len(scores), top)]

def retrieve_idx_semantic(text, pre_embs, emb_model, top=5):
    q_emb = emb_model.encode(text)
    similarities = emb_model.similarity(q_emb, pre_embs)[0]
    top_5_idx = sorted(range(len(similarities)), key=lambda i: similarities[i], reverse=True)[:top]
    return top_5_idx

def rank_thutucsidx(thutucs_idx):
    count_dict = {}
    for idx in thutucs_idx:
        count_dict[idx] = count_dict.get(idx, 0) + 1
    sorted_counts = sorted(count_dict.items(), key=lambda x: x[1], reverse=True)
    return [{"thutuc_idx": e[0], "count": e[1]} for e in sorted_counts]

for text in test_questions:
    print("-"*100)
    print(f"> {text}")
    res_exactmatch_idx = retrieve_idx_exactmatch(text=text, thutucs=thutucs, top=3)
    res_keywordmatch_idx = retrieve_idx_keywordmatch(text=text, thutucs=thutucs, top=3)
    res_semantic_idx_1 = retrieve_idx_semantic(text=text, pre_embs=embs_e5, emb_model=model_e5, top=3)
    res_semantic_idx_2 = retrieve_idx_semantic(text=text, pre_embs=embs_mpnet, emb_model=model_mpnet, top=3)
    
    thutucs_idx = res_exactmatch_idx + res_keywordmatch_idx + res_semantic_idx_1 + res_semantic_idx_2

    rank_thutucs_idx = rank_thutucsidx(thutucs_idx)

    for e in rank_thutucs_idx:
        print(f"{thutucs[e['thutuc_idx']]['Tên thủ tục']} (score={e['count']})")