In [None]:
import os
import faiss
import torch
import logging
from typing import List
from typing import Any, Dict
from sentence_transformers import util
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer
from RAGLibrary import Widgets, Define
from RAGLibrary import myRAG, checkConstruct, createSchema, faissConvert, embedding

In [None]:
# Create the configuration form via the project's Widgets helper. The returned
# object is passed to Define.WidgetValues in a later cell to obtain the settings.
# NOTE(review): presumably this renders interactive input widgets, which would
# make the notebook's results depend on manual input - confirm.
widgets_list = Widgets.create_name_form()

In [None]:
# Root logger: INFO level, each record formatted as "<time> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Environment flag read by the Hugging Face hub client.
# NOTE(review): presumably silences its symlink warning - confirm.
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
# Forwarded as `force_download` to the `from_pretrained` calls in the model-loading cell.
force_download = True

In [None]:
# Resolve the form created earlier into a mapping of settings, then unpack each
# entry into a module-level name. A missing key raises KeyError right here.
# NOTE(review): several of these names are not referenced by the cells shown in
# this notebook; they may be used elsewhere - confirm before removing any.
config = Define.WidgetValues(widgets_list)

# File-system locations.
dcmt_path = config["dcmt_path"]
base_folder = config["base_folder"]
base_path = config["base_path"]
chunks_base = config["chunks_base"]
json_file_path = config["json_file_path"]
schema_ex_path = config["schema_ex_path"]
embedding_path = config["embedding_path"]
torch_path = config["torch_path"]
faiss_path = config["faiss_path"]
mapping_path = config["mapping_path"]
mapping_data = config["mapping_data"]

# Data keys and model / engine selection.
# NOTE(review): "SEARCH_EGINE" (sic) is the key as spelled in the config mapping;
# it is a different name from SEARCH_ENGINE assigned at the end of this cell.
FILE_TYPE = config["FILE_TYPE"]
DATA_KEY = config["DATA_KEY"]
EMBE_KEY = config["EMBE_KEY"]
SWITCH = config["SWITCH"]
EMBEDD_MODEL = config["EMBEDD_MODEL"]
SEARCH_EGINE = config["SEARCH_EGINE"]
RERANK_MODEL = config["RERANK_MODEL"]
RESPON_MODEL = config["RESPON_MODEL"]
MERGE = config["MERGE"]
API_KEY = config["API_KEY"]

# Remaining options taken from the form.
WORD_LIMIT = config["WORD_LIMIT"]
LEVEL_INPUT = config["LEVEL_INPUT"]
LEVEL_VALUES = config["LEVEL_VALUES"]

# Hard-coded FAISS index class (inner-product flat index); independent of the
# SEARCH_EGINE value read from the config above.
SEARCH_ENGINE = faiss.IndexFlatIP

In [None]:
# Run on the GPU when torch can see one, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the embedding model selected by SWITCH. Loading errors are left to
# propagate unchanged so a failed load stops the notebook in this cell.
if SWITCH == "Auto Model":
    tokenizer = AutoTokenizer.from_pretrained(EMBEDD_MODEL, force_download=force_download)
    model = AutoModel.from_pretrained(EMBEDD_MODEL, force_download=force_download)
    model = model.to(device)
    # NOTE(review): later cells call `model.encode(...)`; confirm that the object
    # returned by AutoModel provides that method before relying on this branch.
    print("Model and tokenizer loaded successfully")
elif SWITCH == "Sentence Transformer":
    # NOTE(review): the model is read from a fixed local directory instead of
    # EMBEDD_MODEL, and is not moved to `device` explicitly - confirm this is intended.
    model = SentenceTransformer("../../cached_model")
    print("SentenceTransformer loaded successfully")
else:
    # Without this branch `model` would stay undefined and the notebook would
    # only fail later with a NameError far from the real cause.
    raise ValueError(
        f"Unsupported SWITCH value {SWITCH!r}; expected 'Auto Model' or 'Sentence Transformer'."
    )

print(f"Using: {device}")

In [None]:
""" PREPROCESS TEXT """

def preprocess_text(text):
    """Normalise a string, or every element of a (possibly nested) list.

    For a string: surrounding whitespace is stripped, every character that is
    not a word character, whitespace or one of ``( ) . , ; : - –`` is removed,
    and runs of two or more spaces are collapsed to a single space. Stripping
    happens before the removal step, so a space that ends up at either edge
    after symbols are removed is kept.

    Lists are processed element by element; any other value is returned as is.
    """
    import re

    if isinstance(text, list):
        return list(map(preprocess_text, text))
    if not isinstance(text, str):
        return text

    trimmed = text.strip()
    symbols_removed = re.sub(r'[^\w\s\(\)\.\,\;\:\-–]', '', trimmed)
    return re.sub(r'[ ]{2,}', ' ', symbols_removed)

In [None]:
""" CREATE EMBEDDING """

def create_embedding(texts, batch_size=32):
    """Encode ``texts`` with the module-level ``model`` and return the tensor.

    The first attempt runs on the module-level ``device``. If it fails with a
    ``RuntimeError`` whose message contains "CUDA out of memory", the model is
    moved to the CPU and the encoding is retried there. Any other
    ``RuntimeError`` is re-raised.
    """
    encode_options = {"batch_size": batch_size, "convert_to_tensor": True}
    try:
        return model.encode(texts, device=device, **encode_options)
    except RuntimeError as exc:
        if "CUDA out of memory" not in str(exc):
            raise exc
        print("VRAM overflow. Switching to CPU.")
        model.to("cpu")
        return model.encode(texts, device="cpu", **encode_options)

In [None]:
def _to_float_tensor(values, device):
    """Return ``values`` as a float32 tensor placed on ``device``.

    Accepts an existing tensor, a non-empty sequence of tensors (stacked along
    a new first dimension) or nested Python lists of numbers.
    """
    if isinstance(values, torch.Tensor):
        tensor = values.detach().to(torch.float32)
    elif len(values) > 0 and all(isinstance(v, torch.Tensor) for v in values):
        # torch.tensor() rejects a list of multi-element tensors, so stack them.
        tensor = torch.stack([v.detach().to(torch.float32) for v in values])
    else:
        tensor = torch.tensor(values, dtype=torch.float32)
    return tensor.to(device)


def LoadEmbedding(embedding_path: str, device, DATA_KEY: str = "content", EMBE_KEY: str = "data_embeddings", field_keys: List[str] = ["Câu hỏi", "Câu trả lời", "Câu hỏi Embedding"]) -> Dict[str, Any]:
    """Load selected fields from a ``.pt`` file saved with ``torch.save``.

    The file is expected to hold a dict whose ``DATA_KEY`` entry is a sequence
    of per-record mappings. For every name in ``field_keys`` the values found
    in those records are collected:

    * if the name contains "embedding" (case-insensitive) and the first value
      is a list or tensor, the values become one float32 tensor on ``device``;
    * otherwise the values are returned as a plain list.

    An "embedding" field that is absent from the records is taken from the
    top-level ``EMBE_KEY`` entry when that entry is a non-empty list or tensor.

    Args:
        embedding_path: Path of the ``.pt`` file.
        device: Target device for the embedding tensors.
        DATA_KEY: Top-level key holding the records.
        EMBE_KEY: Top-level key holding fallback embeddings.
        field_keys: Field names to extract. The default list is only read,
            never mutated.

    Returns:
        Dict mapping each successfully loaded field name to a list or tensor.
        It is empty when the payload is not a dict, lacks ``DATA_KEY`` or has
        no records. ``KeyError``, ``ValueError`` and ``RuntimeError`` raised
        while loading are printed and whatever was collected so far is
        returned; other exceptions (for example a missing file) propagate.
    """
    result = {}
    print(f"\nĐang tải embedding từ {embedding_path}\n")
    try:
        # SECURITY: weights_only=False unpickles arbitrary Python objects and can
        # execute code embedded in the file. Only load files from a trusted source.
        data = torch.load(embedding_path, map_location="cpu", weights_only=False)

        # Only a dict has keys to report; anything else falls through to the
        # format error below instead of raising an uncaught AttributeError.
        if isinstance(data, dict):
            print(f"Các key có sẵn: {list(data.keys())}")

        content = []
        if isinstance(data, dict) and DATA_KEY in data:
            content = data[DATA_KEY]
            print(f"Số mục trong '{DATA_KEY}': {len(content)}")
        else:
            print(f"Lỗi: File .pt không có key '{DATA_KEY}' hoặc không đúng định dạng.")

        if not content:
            print("Lỗi: File trống.")
        else:
            for key in field_keys:
                is_embedding_field = "embedding" in key.lower()
                # NOTE(review): records lacking `key` are skipped, so the result
                # may be shorter than `content` and out of step with other fields.
                data_list = [item[key] for item in content if key in item]
                if data_list:
                    if is_embedding_field and isinstance(data_list[0], (list, torch.Tensor)):
                        result[key] = _to_float_tensor(data_list, device)
                        print(f"Đã tải '{key}' với kích thước: {result[key].shape}")
                    else:
                        result[key] = data_list
                        print(f"Đã tải '{key}' với số mục: {len(data_list)}")
                else:
                    print(f"Cảnh báo: Không tìm thấy '{key}' trong '{DATA_KEY}'.")

                # Fall back to the top-level embedding entry when the records had none.
                if key not in result and is_embedding_field and EMBE_KEY in data:
                    embed_data = data[EMBE_KEY]
                    if isinstance(embed_data, (list, torch.Tensor)) and len(embed_data) > 0:
                        result[key] = _to_float_tensor(embed_data, device)
                        print(f"Đã tải '{key}' từ '{EMBE_KEY}' với kích thước: {result[key].shape}")

        # Summary: one line per requested field, loaded or not.
        for key in field_keys:
            if key in result:
                if isinstance(result[key], torch.Tensor):
                    print(f"Số '{key}': {result[key].shape[0]}")
                else:
                    print(f"Số '{key}': {len(result[key])}")
            else:
                print(f"Lỗi: Không tải được '{key}'.")

    except (KeyError, ValueError, RuntimeError) as e:
        print(f"Lỗi khi tải embedding: {e}")

    return result

In [None]:
# Module-level question/answer store. These defaults stay in place when the
# embedding file is missing or nothing could be loaded from it.
qa_questions = []
qa_answers = []
qa_question_embeddings = None
required_fields = ["Câu hỏi", "Câu trả lời", "Câu hỏi Embedding"]

data = {}
if os.path.exists(embedding_path):
    data = LoadEmbedding(
        embedding_path=embedding_path,
        device=device,
        DATA_KEY=DATA_KEY,
        EMBE_KEY=EMBE_KEY,
        field_keys=required_fields
    )
else:
    # Previously a missing file was skipped without any message.
    logging.warning("Embedding file not found: %s", embedding_path)

if data:
    print("\nDữ liệu trả về:")
    for key in required_fields:
        if key in data:
            if isinstance(data[key], torch.Tensor):
                print(f"{key}: Tensor với kích thước {data[key].shape}")
            else:
                print(f"{key}: {len(data[key])} mục")
        else:
            print(f"Lỗi: Không tìm thấy '{key}' trong dữ liệu trả về.")

    qa_questions = data.get("Câu hỏi", [])
    qa_answers = data.get("Câu trả lời", [])
    qa_question_embeddings = data.get("Câu hỏi Embedding")

# Surface an unusable store now rather than at the first lookup.
if qa_question_embeddings is None:
    logging.warning("No question embeddings were loaded; answer lookup will not work.")
elif len(qa_question_embeddings) != len(qa_answers):
    # Matches are looked up in qa_answers by embedding row index, so the counts must agree.
    logging.warning(
        "Loaded %d question embeddings but %d answers; indices will not line up.",
        len(qa_question_embeddings), len(qa_answers),
    )
   

In [None]:
# Maps a preprocessed question to its embedding so repeated questions are not re-encoded.
question_cache = {}


def find_best_answer(user_question, min_similarity=0.7, relative_cutoff=0.9, top_k=5):
    """Return the stored answers whose questions best match ``user_question``.

    The question is preprocessed and embedded (embeddings are cached per
    preprocessed text), then compared by cosine similarity with the
    module-level ``qa_question_embeddings``. A stored question matches when its
    similarity is at least ``max(min_similarity, best_similarity * relative_cutoff)``.

    Args:
        user_question: Raw question text.
        min_similarity: Absolute lower bound for a match.
        relative_cutoff: Fraction of the best similarity a match must reach.
        top_k: Maximum number of answers returned.

    Returns:
        Up to ``top_k`` ``(answer, similarity)`` tuples sorted by descending
        similarity. The list is empty when nothing matches, when the question
        is blank after preprocessing, or when no question embeddings are loaded.
    """
    user_question = preprocess_text(user_question)

    # Without stored embeddings the similarity call below would fail with an
    # unrelated-looking error, so report it and answer "no match".
    if qa_question_embeddings is None or len(qa_question_embeddings) == 0:
        logging.warning("No question embeddings are loaded; cannot search for an answer.")
        return []
    # A blank question carries no information to match on.
    if isinstance(user_question, str) and not user_question.strip():
        return []

    if user_question in question_cache:
        user_embedding = question_cache[user_question]
    else:
        user_embedding = create_embedding([user_question])[0].to(device)
        question_cache[user_question] = user_embedding

    similarities = util.pytorch_cos_sim(user_embedding, qa_question_embeddings)[0]
    torch.cuda.empty_cache()
    threshold = max(min_similarity, similarities.max().item() * relative_cutoff)
    matched_indices = torch.where(similarities >= threshold)[0]

    if len(matched_indices) == 0:
        return []

    # NOTE: relies on qa_answers being index-aligned with qa_question_embeddings.
    scored_answers = [
        (qa_answers[idx.item()], similarities[idx].item()) for idx in matched_indices
    ]
    scored_answers.sort(key=lambda pair: pair[1], reverse=True)
    return scored_answers[:top_k]

In [None]:
# Inputs (after strip + lower) that end the session; an empty line also exits.
EXIT_COMMANDS = {"exit", "quit", "escape", "bye", ""}

print("<< Enter 'exit', 'quit', 'escape', 'bye' or Press ESC to exit >>")
print("Chatbot: Hello there! I'm here to help you =))")
while True:
    try:
        user_input = input("You: ")
        if user_input.strip().lower() in EXIT_COMMANDS:
            print("Chatbot: Goodbye!")
            break

        responses = find_best_answer(user_input)

        # Echo the question so it appears next to the answers in the cell output.
        print(f"You: {user_input.strip()}")
        if responses:
            print("Chatbot:")
            for i, (response, score) in enumerate(responses, 1):
                print(f"{i}. [{score:.4f}] {response}")
        else:
            print("Chatbot: Sorry I don't know the answer to that question =))")

    except (KeyboardInterrupt, EOFError):
        # Ctrl+C / kernel interrupt, or a closed input stream: end the session cleanly.
        print("\nChatbot: Goodbye!")
        break