# Install Packages

In [1]:
!pip install -q transformers datasets accelerate peft bitsandbytes
!pip install -q torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
!pip install sentence-transformers faiss-cpu
!pip install python-docx PyMuPDF!
!pip install PyPDF2
!pip install fastapi uvicorn nest-asyncio pyngrok
!pip install sqlalchemy pyodbc

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.0/67.0 MB[0m [31m21.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m95.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m89.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m46.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m42.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

# Connect Drive

In [2]:
import os
import zipfile
from google.colab import drive

# Bước 1: Mount Google Drive
drive.mount('/content/drive')

# #Load RAG Model
# zip_path = '/content/drive/My Drive/iso9001-rag-model.zip'
# extract_dir = '/content/iso9001-rag-model'

# if not os.path.exists(extract_dir):
#     print(f"Extracting zip file to: {extract_dir}")
#     with zipfile.ZipFile(zip_path, 'r') as zip_ref:
#         zip_ref.extractall(extract_dir)
#     print("Extraction completed.")
# else:
#     print(f"Directory already exists: {extract_dir}. Skipping extraction.")

Mounted at /content/drive


In [3]:
#Load Finetuned Model
zip_path = '/content/drive/My Drive/iso9001-fine-tuned-model.zip'
extract_dir = '/content/iso9001-fine-tuned-model'

if not os.path.exists(extract_dir):
    print(f"Extracting zip file to: {extract_dir}")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)
    print("Extraction completed.")
else:
    print(f"Directory already exists: {extract_dir}. Skipping extraction.")


Extracting zip file to: /content/iso9001-fine-tuned-model
Extraction completed.


In [4]:
# !unzip /content/iso9001-finetuned-model.zip -d /content/iso9001-finetuned-model

# Import Libraries

In [5]:
import os
import json
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling,
    BitsAndBytesConfig
)
from peft import LoraConfig, get_peft_model, TaskType, prepare_model_for_kbit_training
import logging
from typing import List, Dict, Optional, Tuple, IO
from fastapi import FastAPI, File, UploadFile
import numpy as np
import gc
import io
import PyPDF2
# import fitz
# import docx
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# from iso_chatbot import ISO9001ChatBot
from fastapi.middleware.cors import CORSMiddleware
import os
import sys
import asyncio

In [6]:
from sentence_transformers import SentenceTransformer, util

In [7]:
import re

In [8]:
import random

In [9]:
from google.colab import userdata
import os

os.environ["NGROK_AUTH_TOKEN"] = userdata.get("NGROK_AUTH_TOKEN")

# Class RAG

In [10]:
from dataclasses import dataclass, field
from typing import List

@dataclass
class ISO9001Requirement:
    id: str
    clause: str
    title: str
    criterion: str
    example: str
    keyword: List[str] = field(default_factory=list)

In [11]:
from typing import List, Dict, Optional
import logging

logger = logging.getLogger(__name__)

class ISO9001KnowledgeBase:
    """ISO 9001:2015 knowledge base with key requirements loaded from JSON"""

    def __init__(self, json_path: str = None):
        if json_path and os.path.exists("/content/drive/My Drive/rag_data.json"):
            self.requirements = self._load_iso_requirements("/content/drive/My Drive/rag_data.json")
            print(f"✅ Loaded {len(self.requirements)} requirements from \"/content/drive/My Drive/rag_data.json\"")
        else:
            logger.warning("JSON file not found, using sample requirements")

    def _load_iso_requirements(self, path: str) -> List[ISO9001Requirement]:
        """Load ISO 9001:2015 requirements from a JSON file"""
        try:
            with open(path, 'r', encoding='utf-8') as file:
                data = json.load(file)

                # Handle both list and single object formats
                if isinstance(data, dict):
                    # If it's a single object, wrap it in a list
                    data = [data]
                elif not isinstance(data, list):
                    raise ValueError("JSON data must be a list or a single object")

                requirements = []
                for item in data:
                    if not isinstance(item, dict):
                        logger.warning(f"Skipping invalid item: {item}")
                        continue

                    # Validate required fields
                    required_fields = ['id', 'clause', 'title', 'criterion', 'example', 'keyword']
                    if not all(field in item for field in required_fields):
                        logger.warning(f"Skipping item missing required fields: {item}")
                        continue

                    # Ensure keyword' are lists
                    item['keyword'] = item.get('keyword', [])
                    if not isinstance(item['keyword'], list):
                        item['keyword'] = [item['keyword']]

                    # Create requirement object
                    req = ISO9001Requirement(
                        id=item['id'],
                        clause=item['clause'],
                        title=item['title'],
                        criterion=item['criterion'],
                        example=item['example'],
                        keyword=item['keyword']
                    )
                    requirements.append(req)

                return requirements

        except Exception as e:
            logger.error(f"Error loading requirements from {path}: {e}")
            return self._generate_sample_criteria_data(10)

    @staticmethod
    def _generate_sample_criteria_data(num_samples: int = 10) -> list:
        """Tạo danh sách tiêu chí mẫu theo định dạng ISO 9001:2015"""
        sample_titles = [
            "Phê duyệt tài liệu", "Kiểm soát thay đổi", "Xác định rủi ro", "Đào tạo và năng lực",
            "Đánh giá nội bộ", "Hành động khắc phục", "Xem xét của lãnh đạo", "Kiểm soát thông tin dạng văn bản",
            "Thỏa mãn yêu cầu khách hàng", "Lập kế hoạch chất lượng"
        ]

        sample_criteria = [
            "Tài liệu phải được phê duyệt về tính phù hợp trước khi ban hành.",
            "Mọi thay đổi phải được ghi lại và phê duyệt trước khi thực hiện.",
            "Rủi ro và cơ hội phải được xác định và giải quyết.",
            "Nhân sự phải được đào tạo và chứng minh năng lực.",
            "Phải thực hiện đánh giá nội bộ định kỳ.",
            "Cần có quy trình xử lý và ghi nhận hành động khắc phục.",
            "Lãnh đạo phải định kỳ xem xét hệ thống quản lý chất lượng.",
            "Thông tin dạng văn bản phải được kiểm soát chặt chẽ.",
            "Tổ chức phải đảm bảo yêu cầu khách hàng được hiểu rõ và đáp ứng.",
            "Phải thiết lập kế hoạch chất lượng cho các quá trình."
        ]

        sample_examples = [
            "Ví dụ: SOP có chữ ký xác nhận, ngày phê duyệt.",
            "Ví dụ: Biên bản cập nhật thay đổi, có xác nhận.",
            "Ví dụ: Danh sách rủi ro kèm hành động phòng ngừa.",
            "Ví dụ: Hồ sơ đào tạo, chứng chỉ ISO.",
            "Ví dụ: Báo cáo đánh giá nội bộ định kỳ.",
            "Ví dụ: Biên bản hành động khắc phục sự cố.",
            "Ví dụ: Biên bản họp lãnh đạo định kỳ.",
            "Ví dụ: Kiểm soát phiên bản tài liệu.",
            "Ví dụ: Hợp đồng và tài liệu yêu cầu khách hàng.",
            "Ví dụ: Kế hoạch kiểm tra chất lượng sản phẩm."
        ]

        sample_keywords = [
            ["approval", "release", "sop", "phê duyệt", "document control"],
            ["change", "revision", "update", "approval"],
            ["risk", "opportunity", "risk register"],
            ["training", "competence", "năng lực"],
            ["internal audit", "evaluation", "đánh giá"],
            ["correction", "action", "root cause", "khắc phục"],
            ["review", "leadership", "họp lãnh đạo"],
            ["version", "control", "document", "tài liệu"],
            ["requirement", "customer", "đáp ứng"],
            ["quality plan", "kế hoạch", "kiểm soát"]
        ]

        sample_data = []
        for i in range(num_samples):
            sample_data.append({
                "id": f"CRIT-{i+1:03d}",
                "clause": f"{random.randint(4, 10)}.{random.randint(1, 5)}",
                "title": sample_titles[i % len(sample_titles)],
                "criterion": sample_criteria[i % len(sample_criteria)],
                "example": sample_examples[i % len(sample_examples)],
                "keyword": sample_keywords[i % len(sample_keywords)]
            })

        return sample_data

    def get_clause_info(self, clause: str) -> Optional[ISO9001Requirement]:
        """Get information about a specific clause"""
        for req in self.requirements:
            if req.clause == clause:
                return req
        return None

In [12]:
class DocumentProcessor:
    """Process and extract text from various document formats"""

    @staticmethod
    def extract_text_from_pdf(file: IO) -> str:
        """Extract text from PDF file-like object"""
        try:
            pdf_reader = PyPDF2.PdfReader(file)
            text = ""
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"
            return text
        except Exception as e:
            logger.error(f"Error extracting text from PDF: {e}")
            return ""

    @staticmethod
    def extract_text_from_docx(file_path: str) -> str:
        """Extract text from DOCX file"""
        try:
            doc = docx.Document(file_path)
            text = ""
            for paragraph in doc.paragraphs:
                text += paragraph.text + "\n"
            return text
        except Exception as e:
            logger.error(f"Error extracting text from DOCX: {e}")
            return ""

    @staticmethod
    def extract_text_from_txt(file_path: str) -> str:
        """Extract text from TXT file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read()
        except Exception as e:
            logger.error(f"Error extracting text from TXT: {e}")
            return ""

In [13]:
from typing import List, Dict, Optional, Tuple
import logging
import faiss
import torch
import numpy as np
from sentence_transformers import SentenceTransformer

logger = logging.getLogger(__name__)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class VectorStore:
    """FAISS-based vector store for RAG with GPU optimization"""

    def __init__(self, embedding_model: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"):
        logger.info(f"Initializing embedding model: {embedding_model}")
        self.embedding_model = SentenceTransformer(embedding_model)

        # Move to GPU if available
        if torch.cuda.is_available():
            self.embedding_model = self.embedding_model.to(device)
            logger.info("Embedding model moved to GPU")

        self.index = None
        self.documents = []
        self.embeddings = []

    def add_documents(self, documents: List[str], batch_size: int = 32):
        """Add documents to the vector store with batching for GPU efficiency"""
        logger.info(f"Adding {len(documents)} documents to vector store")

        # Process in batches for memory efficiency
        all_embeddings = []
        for i in range(0, len(documents), batch_size):
            batch = documents[i:i+batch_size]
            batch_embeddings = self.embedding_model.encode(
                batch,
                convert_to_tensor=True,
                show_progress_bar=True
            )
            all_embeddings.append(batch_embeddings.cpu().numpy())

            # Clear GPU memory
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        # Concatenate all embeddings
        embeddings = np.vstack(all_embeddings)

        if self.index is None:
            # Initialize FAISS index
            dimension = embeddings.shape[1]
            self.index = faiss.IndexFlatIP(dimension)  # Inner product for cosine similarity
            logger.info(f"Initialized FAISS index with dimension {dimension}")

        # Normalize embeddings for cosine similarity
        faiss.normalize_L2(embeddings)

        # Add to index
        self.index.add(embeddings.astype('float32'))

        # Store documents and embeddings
        self.documents.extend(documents)
        self.embeddings.extend(embeddings)

    def search(self, query: str, k: int = 5) -> List[Tuple[str, float]]:
        """Search for similar documents"""
        if self.index is None:
            return []

        # Generate query embedding
        query_embedding = self.embedding_model.encode(
            [query],
            convert_to_tensor=True
        ).cpu().numpy()
        faiss.normalize_L2(query_embedding)

        # Search
        scores, indices = self.index.search(query_embedding.astype('float32'), k)

        # Return results
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx < len(self.documents):
                results.append((self.documents[idx], float(score)))

        return results

In [14]:
class ISO9001ComplianceChecker:
    """Check document compliance against ISO 9001:2015 requirements"""

    def __init__(self, knowledge_base: ISO9001KnowledgeBase):
        self.kb = knowledge_base
        self.embedding_model = SentenceTransformer("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

        # Move to GPU if available
        if torch.cuda.is_available():
            self.embedding_model = self.embedding_model.to(device)

    def check_compliance(self, document_text: str) -> dict:
        results = {
            "overall_score": 0.0,
            "clause_compliance": {},
            "missing_requirements": [],
            "recommendations": [],
            "found_evidence": {}
        }

        total_score = 0
        total_requirements = len(self.kb.requirements)

        for req in self.kb.requirements:
            clause_result = self._check_clause_compliance(document_text, req)
            results["clause_compliance"][req.clause] = clause_result
            total_score += clause_result["score"]

            if clause_result["score"] < 0.5:
                results["missing_requirements"].append({
                    "clause": req.clause,
                    "title": req.title,
                    "criterion": req.criterion
                })

            if clause_result["evidence"]:
                results["found_evidence"][req.clause] = clause_result["evidence"]

        results["overall_score"] = total_score / total_requirements if total_requirements > 0 else 0
        results["recommendations"] = self._generate_recommendations(results)

        return results

    def _check_clause_compliance(self, document_text: str, requirement: dict) -> dict:
        # 1. Tách văn bản thành câu
        sentences = re.split(r'(?<=[.!?])\s+', document_text.strip())

        # 2. Mã hóa requirement tiêu chí
        requirement_texts = [requirement.criterion]
        if requirement.example:
            requirement_texts.append(requirement.example)

        req_embeddings = self.embedding_model.encode(requirement_texts, convert_to_tensor=True)

        # 3. Mã hóa từng câu văn bản
        sentence_embeddings = self.embedding_model.encode(sentences, convert_to_tensor=True)

        matched_sentences = []
        max_sim_score = 0.0

        # 4. Tính similarity câu - từng yêu cầu, lấy câu có similarity > ngưỡng 0.6
        for i, sent_emb in enumerate(sentence_embeddings):
            for req_emb in req_embeddings:
                sim_score = util.cos_sim(sent_emb, req_emb).item()
                if sim_score > 0.6:
                    matched_sentences.append(sentences[i].strip())
                    if sim_score > max_sim_score:
                        max_sim_score = sim_score

        # 5. Tính điểm compliance dựa trên max similarity (giả sử max_sim_score phản ánh mức độ phù hợp)
        final_score = round(max_sim_score, 4)

        # 6. Trả về kết quả
        return {
            "score": final_score,
            "evidence": matched_sentences
        }
    # def check_compliance(self, document_text: str) -> Dict:
    #     """Check document compliance against ISO 9001:2015"""
    #     results = {
    #         "overall_score": 0.0,
    #         "clause_compliance": {},
    #         "missing_requirements": [],
    #         "recommendations": [],
    #         "found_evidence": {}
    #     }

    #     total_score = 0
    #     total_requirements = len(self.kb.requirements)

    #     for req in self.kb.requirements:
    #         clause_result = self._check_clause_compliance(document_text, req)
    #         results["clause_compliance"][req.clause] = clause_result
    #         total_score += clause_result["score"]

    #         if clause_result["score"] < 0.5:
    #             results["missing_requirements"].append({
    #                 "clause": req.clause,
    #                 "title": req.title,
    #                 "criterion": req.criterion
    #             })

    #         if clause_result["evidence"]:
    #             results["found_evidence"][req.clause] = clause_result["evidence"]

    #     results["overall_score"] = total_score / total_requirements if total_requirements > 0 else 0
    #     results["recommendations"] = self._generate_recommendations(results)

    #     return results

    # def _check_clause_compliance(self, document_text: str, requirement: Dict) -> Dict:
    #     doc_lower = document_text.lower()

    #     # 1. Match keyword
    #     keywords = requirement.keyword
    #     keyword_matches = sum(1 for kw in keywords if kw.lower() in doc_lower)
    #     keyword_score = keyword_matches / len(keywords) if keywords else 0

    #     # 2. Match requirement & example
    #     requirement_texts = [requirement.criterion]
    #     if requirement.example:
    #         requirement_texts.append(requirement.example)

    #     evidence = []
    #     requirement_matches = 0

    #     for req in requirement_texts:
    #         req_tokens = req.lower().split()
    #         match_count = sum(1 for token in req_tokens if token in doc_lower)
    #         if match_count >= len(req_tokens) * 0.4:
    #             requirement_matches += 1
    #             evidence.append(req)

    #     requirement_score = requirement_matches / len(requirement_texts) if requirement_texts else 0

    #     final_score = round(keyword_score * 0.4 + requirement_score * 0.6, 4)

    #     return {
    #         "score": final_score,
    #         "keyword_matches": keyword_matches,
    #         "requirement_matches": requirement_matches,
    #         "evidence": evidence
    #     }

    def _generate_recommendations(self, results: Dict) -> List[str]:
        recs = []

        if results["overall_score"] < 0.3:
            recs.append("Mức độ tuân thủ tổng thể thấp. Cần rà soát toàn bộ tài liệu theo các điều khoản ISO 9001:2015.")

        if results["missing_requirements"]:
            recs.append(f"Tài liệu thiếu nội dung ở {len(results['missing_requirements'])} điều khoản. Cần bổ sung.")

        for clause, result in results["clause_compliance"].items():
            if result["score"] < 0.3:
                matched = next((c for c in self.kb.requirements if c.clause == clause), None)
                if matched:
                    recs.append(
                        f"Điều khoản {clause} ({matched.title}): Thiếu minh chứng cho yêu cầu: {matched.criterion}."
                    )
        return recs

# RAG Chatbot

In [15]:
class ISO9001RAGChatbot:
    """Main chatbot class combining RAG with DeepSeek model"""

    def __init__(self, model_name: str = "vinai/PhoGPT-4B-Chat"):
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.vector_store = VectorStore()
        self.knowledge_base = ISO9001KnowledgeBase(json_path="/content/drive/My Drive/rag_data.json")
        self.compliance_checker = ISO9001ComplianceChecker(self.knowledge_base)
        self.doc_processor = DocumentProcessor()

        self._load_model()
        self._setup_knowledge_base()

    def _load_model(self):
        """Load the DeepSeek model and tokenizer"""
        logger.info(f"Loading model: {self.model_name}")
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            torch_dtype=torch.float16,
            device_map="auto"
        ).to(device)

        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def _setup_knowledge_base(self):
        """Setup the RAG knowledge base with ISO 9001:2015 content"""
        logger.info("Setting up ISO 9001:2015 knowledge base")
        documents = []
        for req in self.knowledge_base.requirements:
            doc_text = f"Clause {req.clause}: {req.title}\n"
            doc_text += "Requirements:\n" + "\n".join(f"- {r}" for r in req.criterion)
            documents.append(doc_text)
            # for requirement in req.requirements:
            #     req_doc = f"Clause {req.clause} requirement: {requirement}"
            #     documents.append(req_doc)
        self.vector_store.add_documents(documents)

    def process_document(self, file: IO) -> Dict:
        """Process uploaded document (file-like object) and check compliance"""
        logger.info(f"Processing uploaded file: {file.name}")

        # Lấy đuôi file để xác định định dạng
        ext = os.path.splitext(file.name)[1].lower()

        if ext == '.pdf':
            text = self.doc_processor.extract_text_from_pdf(file)
        elif ext == '.docx':
            text = self.doc_processor.extract_text_from_docx(file)
        elif ext == '.txt':
            text = self.doc_processor.extract_text_from_txt(file)
        else:
            raise ValueError("Unsupported file format")

        if not text.strip():
            raise ValueError("Could not extract text from document")

        compliance_results = self.compliance_checker.check_compliance(text)
        return {
            "document_text": text,
            "compliance_results": compliance_results
        }

    def generate_compliance_summary(self, compliance_results: Dict) -> Tuple[float, List[Tuple[str, Dict]]]:
        """Generate structured compliance summary with scores and clause details"""
        summary = []

        for clause, result in compliance_results["clause_compliance"].items():
            req = next(r for r in self.knowledge_base.requirements if r.clause == clause)
            status = "Compliant" if result["score"] >= 0.5 else "Non-compliant"
            summary.append((
                clause,
                {
                    "title": req.title,
                    "score": result["score"],
                    "status": status,
                    "evidence": result.get("evidence", [])[:2] or ["Không tìm thấy đoạn thông tin phù hợp theo yêu cầu."]
                }
            ))

        return compliance_results["overall_score"], summary

# Finetuned Chatbot

In [102]:
class ISO9001ChatBot:
    def __init__(self, model_path: str):
        self.base_model = "Viet-Mistral/Vistral-7B-Chat"
        self.tokenizer = AutoTokenizer.from_pretrained(self.base_model, trust_remote_code=True)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        base = AutoModelForCausalLM.from_pretrained(
            self.base_model,
            torch_dtype=torch.bfloat16,
            device_map="auto",
            trust_remote_code=True
        )

        from peft import PeftModel
        self.model = PeftModel.from_pretrained(base, model_path)
        self.model.eval()

    def _classify_question(self, question: str) -> str:
        # Định nghĩa nội dung trao đổi theo format chat
        messages = [
            {
                "role": "system",
                "content": "Bạn là một trợ lý chuyên đánh giá câu hỏi người dùng về quản lý chất lượng (QMS)."
            },
            {
                "role": "user",
                "content": f"""Phân loại câu hỏi sau thành một trong hai loại:

    1. document_search — nếu người dùng muốn tìm, xem hoặc yêu cầu một tài liệu cụ thể (như biểu mẫu, file, SOP, quy trình...).
    2. normal_question — nếu người dùng đang hỏi về kiến thức, khái niệm, yêu cầu của ISO 9001:2015.

    Chỉ trả lời duy nhất một từ: document_search hoặc normal_question.

    Câu hỏi: "{question}"
    Loại:"""
            }
        ]

        # Áp dụng chat template của model
        prompt = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )

        # Tokenize prompt theo chuẩn của Vistral
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_new_tokens=10,
                do_sample=False,  # sinh chắc chắn
                pad_token_id=self.tokenizer.eos_token_id
            )

        result = self.tokenizer.decode(
            output[0][inputs["input_ids"].shape[1]:],
            skip_special_tokens=True
        ).strip().lower()

        # Phân loại kết quả
        if "document_search" in result:
            return "document_search"
        elif "normal_question" in result:
            return "normal_question"
        else:
            return "unknown"

    def _extract_keywords(self, question: str) -> List[str]:
        messages = [
            {
                "role": "system",
                "content": "Bạn là trợ lý QMS. Nhiệm vụ của bạn là trích xuất từ khóa tìm kiếm từ câu hỏi tiếng Việt."
            },
            {
                "role": "user",
                "content": f"""Câu hỏi: "{question}"

        Hãy trích xuất các từ khóa chính (dưới dạng tiếng Việt), phân cách bằng dấu phẩy.
        Không dịch sang tiếng Anh. Chỉ liệt kê từ khóa, không giải thích.
        """
            }
        ]

        prompt = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        print(prompt)
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)

        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_new_tokens=30,
                do_sample=False,
                pad_token_id=self.tokenizer.eos_token_id
            )

        result = self.tokenizer.decode(output[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)
        return [kw.strip() for kw in result.split(",") if kw.strip()]

    def _chat_sync(self, question: str) -> str:
        messages = [
            {
                "role": "system",
                "content": "Bạn là trợ lý ISO 9001:2015. Dựa trên ngữ cảnh cuộc trò chuyện, hãy trả lời câu hỏi mới nhất một cách chi tiết, chính xác và thực tế."
            }
        ] + [
            {
                "role": "user",
                "content": f"""{question}

    Trả lời: """
            }
        ]

        # Tạo prompt chat đúng định dạng
        prompt = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )
        print(prompt)
        inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=2048, padding=True)
        inputs = {k: v.to(self.model.device) for k, v in inputs.items()}

        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_new_tokens=300,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        return self.tokenizer.decode(output[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True)

    async def chat(self, question: str) -> str:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._chat_sync, question)


ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-284' coro=<Server.serve() done, defined at /usr/local/lib/python3.11/dist-packages/uvicorn/server.py:68> exception=KeyboardInterrupt()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/uvicorn/main.py", line 580, in run
    server.run()
  File "/usr/local/lib/python3.11/dist-packages/uvicorn/server.py", line 66, in run
    return asyncio.run(self.serve(sockets=sockets))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/nest_asyncio.py", line 30, in run
    return loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/nest_asyncio.py", line 92, in run_until_complete
    self._run_once()
  File "/usr/local/lib/python3.11/dist-packages/nest_asyncio.py", line 133, in _run_once
    handle._run()
  File "/usr/lib/python3.11/asyncio/events.py", line 84, in _run
    

# API

In [92]:
# Danh sách từ khóa QMS
qms_keywords = [
    "iso 9001", "hệ thống quản lý chất lượng", "qms", "tiêu chuẩn iso", "iso",
    "đánh giá nội bộ", "đánh giá chất lượng", "kiểm tra chất lượng", "cải tiến liên tục",
    "kiểm soát chất lượng", "đào tạo chất lượng", "rà soát quản lý", "phân tích nguyên nhân",
    "hành động khắc phục", "hành động phòng ngừa", "báo cáo không phù hợp",
    "quản lý rủi ro", "giám sát và đo lường", "Điều khoản", "Sổ tay hướng dẫn",
    "quản lý tài liệu", "kiểm soát tài liệu", "lưu trữ tài liệu", "phiên bản tài liệu",
    "bản phát hành", "phê duyệt tài liệu", "thay đổi tài liệu", "phân phối tài liệu",
    "thu hồi tài liệu", "tài liệu hướng dẫn", "biểu mẫu", "quy trình", "quy định",
    "hồ sơ chất lượng", "tài liệu viết tay", "tài liệu điện tử", "hướng dẫn công việc",
    "kiểm soát thay đổi", "ghi nhận tài liệu", "truy cập tài liệu", "hướng dẫn",
    "trách nhiệm", "quyền hạn", "người phụ trách", "bộ phận quản lý chất lượng",
    "phân công nhiệm vụ", "người soạn thảo", "người phê duyệt", "người sử dụng tài liệu",
    "đào tạo nhân viên", "giám sát", "điều khoản", "tài liệu", "hướng dẫn",
    "đánh giá tài liệu", "kiểm tra hiệu quả", "đo lường hiệu quả", "phản hồi",
    "cải tiến tài liệu", "kiểm toán nội bộ", "báo cáo kiểm toán", "khắc phục sai sót",
    "rủi ro", "quản lý rủi ro", "biện pháp phòng ngừa", "hành động khắc phục",
    "điều tra nguyên nhân", "xử lý sự cố", "ncr", "hướng dẫn", "viết", "phiên bản", "loại",
    "hệ thống iso", "quy trình làm việc", "tiêu chí chất lượng", "bảng kiểm tra",
    "hồ sơ kiểm tra", "biên bản họp", "chứng nhận iso", "mẫu biểu", "biểu mẫu", "quy trình",
    "quy định", "sổ tay chất lượng", "sổ tay hướng dẫn", "mục", "nội dung",
    "hồ sơ chất lượng", "tài liệu viết tay", "tài liệu điện tử", "hướng dẫn công việc",
    "kiểm soát thay đổi", "ghi nhận tài liệu", "truy cập tài liệu"
]

def preprocess_text(text):
    # Chuyển về chữ thường
    text = text.lower()
    # Loại bỏ dấu câu
    text = re.sub(r'[^\w\s]', ' ', text)
    # Loại bỏ khoảng trắng thừa
    text = re.sub(r'\s+', ' ', text).strip()
    return text

def classify_question_keyword_based(question):
    question_processed = preprocess_text(question)
    for keyword in qms_keywords:
        if keyword in question_processed:
            return "related"
    return "unrelated"

# Ví dụ test
questions = [
    "Tổ chức có nên lưu trữ tài liệu giấy không?",
    "AI sẽ thay thế con người trong tương lai?",
    "Quy trình phê duyệt tài liệu diễn ra như thế nào?",
    "Hôm nay thời tiết thế nào?",
    "Làm sao để kiểm soát thay đổi tài liệu hiệu quả?",
    "giải thích về NCR"
]

for q in questions:
    result = classify_question_keyword_based(q)
    print(f"Câu hỏi: {q}\nPhân loại: {result}\n")

Câu hỏi: Tổ chức có nên lưu trữ tài liệu giấy không?
Phân loại: related

Câu hỏi: AI sẽ thay thế con người trong tương lai?
Phân loại: unrelated

Câu hỏi: Quy trình phê duyệt tài liệu diễn ra như thế nào?
Phân loại: related

Câu hỏi: Hôm nay thời tiết thế nào?
Phân loại: unrelated

Câu hỏi: Làm sao để kiểm soát thay đổi tài liệu hiệu quả?
Phân loại: related

Câu hỏi: giải thích về NCR
Phân loại: related



In [103]:

chatbot = ISO9001ChatBot("./iso9001-fine-tuned-model/content/iso9001-fine-tuned-model")
# rag_chatbot = ISO9001RAGChatbot("./iso9001-rag-model/content/iso9001-rag-model")
# rag_chatbot = ISO9001RAGChatbot()

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [89]:
for q in [
    "có tài liệu y tế ở đây không",
    "trình bày yêu cầu của mục 7.5 trong ISO 9001",
    "cho tôi xem biểu mẫu đánh giá nội bộ",
    "Hướng dẫn viết",
    "gồm những phiên bản nào",
    "Yêu cầu thế nào"
]:
    label = chatbot._classify_question(q)
    print(f"[TEST] {q} --> {label}")

[TEST] có tài liệu y tế ở đây không --> document_search
[TEST] trình bày yêu cầu của mục 7.5 trong ISO 9001 --> normal_question
[TEST] cho tôi xem biểu mẫu đánh giá nội bộ --> document_search
[TEST] Hướng dẫn viết --> normal_question
[TEST] gồm những phiên bản nào --> document_search
[TEST] Yêu cầu thế nào --> normal_question


In [106]:
# Cho phép gọi từ frontend khác domain nếu cần
app = FastAPI(title="ISO 9001 Chatbot API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Hoặc liệt kê cụ thể: ["http://localhost:3000"]
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class ClauseResult(BaseModel):
    clause: str
    title: str
    status: str
    score: float
    evidence: List[str] = []

class AnalyzeResponse(BaseModel):
    score: float
    clause_results: List[ClauseResult]

# class AnalyzeResponse(BaseModel):
#     criterion: str
#     status: str
#     score: float
#     explanation: str

class KeywordResponse(BaseModel):
    keywords: List[str]

class ChatRequest(BaseModel):
    question: str

class ChatResponse(BaseModel):
    response: str

class ChatMessage(BaseModel):
    role: str
    content: str

class ChatFullRequest(BaseModel):
    question: str
    history_messages: List[ChatMessage]

@app.get("/")
async def root():
    return {"message": "ISO 9001 Chatbot API is running"}

@app.post("/get-label", response_model=ChatResponse)
async def chat(req: ChatRequest):
    if not req.question.strip():
        raise HTTPException(status_code=400, detail="Question is required.")

    try:
        print("Calling chatbot...")
        if(classify_question_keyword_based(req.question) == "unrelated"):
            response = "unknown"
        else:
            response = chatbot._classify_question(req.question)

        return {"response": response}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/extract-keywords", response_model=KeywordResponse)
async def extract_keywords(req: ChatRequest):
    if not req.question.strip():
        raise HTTPException(status_code=400, detail="Question is required.")

    try:
        keywords = chatbot._extract_keywords(req.question)
        return {"keywords": keywords}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest):
    if not req.question.strip():
        raise HTTPException(status_code=400, detail="Question is required.")

    try:
        response = chatbot._chat_sync(req.question)
        return {"response": response}

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# @app.post("/analyze", response_model=AnalyzeResponse)
# async def analyze(document: UploadFile = File(...)):
#     try:
#         # Đọc nội dung file
#         content = await document.read()
#         file_stream = io.BytesIO(content)
#         file_stream.name = document.filename  # Đặt tên để xác định định dạng

#         # Gọi hàm xử lý tài liệu
#         doc_results = rag_chatbot.extract_text_from_pdf(file_stream)

#         # Tạo bản báo cáo (tuỳ theo bạn muốn hiển thị gì)
#         score, clause_results_raw = rag_chatbot.generate_compliance_summary(doc_results["compliance_results"])

#         # Chuyển clause_results_raw sang dạng List[ClauseResult]
#         clause_results = [
#             ClauseResult(
#                 clause=clause,
#                 title=result["title"],
#                 status=result["status"],
#                 score=result["score"],
#                 evidence=result.get("evidence", [])
#             )
#             for clause, result in clause_results_raw
#         ]

#         return {
#             "score": score,
#             "clause_results": clause_results
#         }
#     except Exception as e:
#         raise HTTPException(status_code=500, detail=str(e))

@app.post("/analyze", response_model=AnalyzeResponse)
async def analyze(document: UploadFile = File(...)):
    try:
        # Đọc nội dung file
        content = await document.read()
        file_stream = io.BytesIO(content)
        file_stream.name = document.filename  # Đặt tên để xác định định dạng

        # Gọi hàm xử lý tài liệu
        doc_results = rag_chatbot.process_document(file_stream)

        # Tạo bản báo cáo (tuỳ theo bạn muốn hiển thị gì)
        score, clause_results_raw = rag_chatbot.generate_compliance_summary(doc_results["compliance_results"])

        # Chuyển clause_results_raw sang dạng List[ClauseResult]
        clause_results = [
            ClauseResult(
                clause=clause,
                title=result["title"],
                status=result["status"],
                score=result["score"],
                evidence=result.get("evidence", [])
            )
            for clause, result in clause_results_raw
        ]

        return {
            "score": score,
            "clause_results": clause_results
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Run

In [107]:
import nest_asyncio
from pyngrok import ngrok
import uvicorn
from google.colab import userdata

# Áp dụng patch cho asyncio để chạy trong notebook
nest_asyncio.apply()

NGROK_AUTH_TOKEN = userdata.get('NGROK_AUTH_TOKEN') # Get from Colab secrets if stored there
ngrok.set_auth_token(NGROK_AUTH_TOKEN) # Uncomment and use this line if needed

# Mở cổng 8000 ra internet
public_url = ngrok.connect(addr=8000, domain="ample-wildly-parrot.ngrok-free.app")
print("Public URL:", public_url)

# Chạy server
uvicorn.run(app, host="0.0.0.0", port=8000)

Public URL: NgrokTunnel: "https://ample-wildly-parrot.ngrok-free.app" -> "http://localhost:8000"


INFO:     Started server process [313]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)


Calling chatbot...
INFO:     2001:ee0:4f0e:f780:cc26:5f94:bcac:af00:0 - "POST /get-label HTTP/1.1" 200 OK
<s>[INST] <<SYS>>
Bạn là trợ lý ISO 9001:2015. Dựa trên ngữ cảnh cuộc trò chuyện, hãy trả lời câu hỏi mới nhất một cách chi tiết, chính xác và thực tế.
<</SYS>>

Người dùng: Hướng dẫn viết
Bot:  Đánh giá hiệu suất hoạt động của phòng ban dựa trên các chỉ số KPIs đã định, xác định điểm mạnh và yếu, đề xuất hành động cải tiến phù hợp. 
Người dùng: Biểu mẫu là gì
Bot:  Biểu mẫu là tài liệu chuẩn hóa được sử dụng để thu thập dữ liệu, ghi nhận thông tin theo định dạng chuẩn, giúp tăng tính nhất quán, giảm sai sót và dễ dàng kiểm tra, đánh giá. Biểu mẫu có thể bao gồm các bảng, checklist, phiếu kiểm tra, hoặc hồ sơ ghi nhận kết quả, có mã số, phiên bản và người sử dụng rõ ràng. Biểu mẫu được kiểm soát trong hệ thống tài liệu và có thể sử dụng lặp lại hoặc tùy chỉnh phù hợp với từng lần sử dụng. 
Người dùng: hướng dẫn viết
Bot: document_search
Người dùng: hướng dẫn viết
Bot: 

    Trả lời

INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [313]
