In [None]:
#install dependencies
!pip install torch transformers sentence-transformers faiss-cpu accelerate pdfplumber

In [None]:
# GPU runtime only (optional): install the GPU build of FAISS
!pip install faiss-gpu

# 1. DATA PREPROCESSING + CLAUSE-AWARE CHUNKING

In [None]:
import re
import pdfplumber
from typing import List, Dict


# ==============================
# 1. PDF EXTRACTION + HEADER REMOVAL
# ==============================

def extract_pdf_text(pdf_path: str) -> str:
    """
    Read every page of a PDF and return its text with boilerplate stripped.

    Each extracted line is trimmed, then dropped when it contains the
    document's running header, the "July 2011" / "V 1.3" stamp, or consists
    solely of digits (a standalone page number). Pages that yield no text are
    skipped. The surviving lines of a page are joined with newlines and each
    page contributes one trailing newline to the result.
    """

    def is_boilerplate(stripped: str) -> bool:
        # ---- REPEATED HEADER (CUSTOMISE IF NEEDED) ----
        if "Regulations on the Use of the UTM Resource Centre" in stripped:
            return True
        if "July 2011" in stripped or "V 1.3" in stripped:
            return True
        # ---- STANDALONE PAGE NUMBER ----
        return re.fullmatch(r"\d+", stripped) is not None

    pages = []

    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            raw = page.extract_text()
            if not raw:
                continue

            trimmed = (piece.strip() for piece in raw.split("\n"))
            kept = [piece for piece in trimmed if not is_boilerplate(piece)]
            pages.append("\n".join(kept) + "\n")

    return "".join(pages)


# ==============================
# 2. TEXT CLEANING (YOUR ORIGINAL FUNCTION - IMPROVED)
# ==============================

def clean_text(text: str) -> str:
    """
    Remove formatting artefacts but preserve clause numbering.

    Steps, in order:
      1. Collapse runs of newlines into a single newline.
      2. Collapse runs of spaces/tabs into a single space.
      3. Put every regulation marker ("1.", "2.", ...) at the start of its
         own line, followed by exactly one space.

    A marker is a run of digits followed by a dot that is NOT part of a
    longer dotted number: the digits must not be preceded by a digit or a
    dot, and the dot must not be followed by a digit. This keeps decimals,
    version numbers and dates ("2.50", "V 1.3", "31.12.2011") intact instead
    of splitting them into bogus regulation headings.

    Returns the cleaned text with leading/trailing whitespace stripped.
    """

    # Remove excessive blank lines
    text = re.sub(r'\n+', '\n', text)

    # Remove excessive spaces (but keep single spaces)
    text = re.sub(r'[ \t]+', ' ', text)

    # Ensure regulation numbers stay on new line (e.g., "1.", "2."), while
    # leaving dotted numbers such as "1.3" untouched.
    text = re.sub(r'\s*(?<![\d.])(\d+\.)(?!\d)\s*', r'\n\1 ', text)

    return text.strip()


# ==============================
# 3. CLAUSE-AWARE CHUNKING (UNCHANGED LOGIC)
# ==============================

def clause_aware_chunking(text: str, min_chars: int = 50) -> List[Dict]:
    """
    Chunk by Regulation and Clause level only.
    Sub-clauses (a), (b), (c) remain inside their parent clause.

    Args:
        text: Cleaned regulation text in which each regulation starts on its
            own line with "<number>. ".
        min_chars: A chunk is kept only when its text is longer than this
            many characters. Defaults to 50, the previously hard-coded
            threshold.

    Returns:
        A list of dicts with keys "regulation" (the number as a string),
        "clause" (the roman numeral as written, or "main" for text that
        precedes the first clause marker) and "text" (the clause text,
        including its marker and any sub-clauses).

    Text before the first regulation marker is discarded because it has no
    regulation number to attach to.
    """

    chunks = []

    # --- Split by Regulation ---
    # Zero-width lookahead: the marker stays at the start of each piece.
    regulation_pattern = r'(?=\n\s*\d+\.\s)'

    for reg in re.split(regulation_pattern, text):
        reg = reg.strip()

        reg_match = re.match(r'(\d+)\.', reg)
        if not reg_match:
            # Empty piece or preamble without a regulation number.
            continue

        reg_number = reg_match.group(1)

        # Remove regulation number from text body
        reg_body = re.sub(r'^\d+\.\s*', '', reg).strip()

        # --- Split by Clause Level (i), (ii), (iii) ---
        # Only roman-numeral markers split; (a), (b), (c) do not match.
        clause_pattern = r'(?=\(\s*[ivx]+\s*\))'
        clauses = re.split(clause_pattern, reg_body, flags=re.IGNORECASE)

        for clause in clauses:
            # Keep full clause including sub-clauses
            clause_text = clause.strip()
            if not clause_text:
                continue

            clause_match = re.match(
                r'\(\s*([ivx]+)\s*\)', clause_text, flags=re.IGNORECASE
            )
            clause_id = clause_match.group(1) if clause_match else "main"

            if len(clause_text) > min_chars:
                chunks.append({
                    "regulation": reg_number,
                    "clause": clause_id,
                    "text": clause_text
                })

    return chunks



# ==============================
# 4. COMPLETE PIPELINE
# ==============================

if __name__ == "__main__":
    import json

    # Source document on the mounted drive.
    pdf_path = "/content/drive/MyDrive/RAG-dataset/RCentre.pdf"  # Your PDF file

    # Extract -> clean -> chunk.
    raw_text = extract_pdf_text(pdf_path)
    cleaned_text = clean_text(raw_text)
    chunks = clause_aware_chunking(cleaned_text)

    # Quick visual check of the chunking result.
    print(f"Total chunks created: {len(chunks)}")
    print(chunks)

    # Optional: persist the cleaned text for inspection.
    with open("RCentre_cleaned.txt", "w", encoding="utf-8") as f:
        f.write(cleaned_text)

    # Optional: persist the chunks as readable JSON.
    with open("RCentre_chunks.json", "w", encoding="utf-8") as f:
        json.dump(chunks, f, indent=2, ensure_ascii=False)


# 2. EMBEDDING + VECTOR DATABASE (FAISS)

In [None]:
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

# Load the sentence-embedding model once at module level. Both
# create_vector_index() and retrieve() read this global, so chunks and
# queries are embedded with the same model.
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

def create_vector_index(chunks):
    """
    Embed every chunk and build a flat L2 FAISS index over the embeddings.

    Args:
        chunks: Non-empty list of dicts, each with a "text" key.

    Returns:
        (index, embeddings): the FAISS index and the embedding matrix that
        was added to it. Row i of the matrix corresponds to chunks[i].

    Raises:
        ValueError: if `chunks` is empty. Without this check the failure
            would surface later as an opaque error when reading the
            embedding dimension.
    """
    if not chunks:
        raise ValueError("Cannot build a vector index from an empty chunk list.")

    texts = [chunk["text"] for chunk in chunks]

    embeddings = embedding_model.encode(texts, convert_to_numpy=True)
    # FAISS expects a C-contiguous float32 matrix; this is a no-op when the
    # encoder already returns one.
    embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)

    dimension = embeddings.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(embeddings)

    return index, embeddings


# 3. RETRIEVAL FUNCTION

In [None]:
def retrieve(query, index, chunks, top_k=3):
    """
    Return the chunks whose embeddings are nearest to the query embedding.

    Args:
        query: The user question.
        index: FAISS index built over `chunks` (see create_vector_index).
        chunks: The chunk dicts, in the same order they were indexed.
        top_k: Maximum number of chunks to return.

    Returns:
        Up to `top_k` chunk dicts, in the order returned by the index
        search. An empty list when there are no chunks or `top_k` is not
        positive.
    """
    if not chunks or top_k < 1:
        return []

    query_embedding = embedding_model.encode([query], convert_to_numpy=True)
    _distances, indices = index.search(query_embedding, top_k)

    # FAISS pads the result with -1 when the index holds fewer than top_k
    # vectors; chunks[-1] would silently return the last chunk, so skip them.
    return [chunks[idx] for idx in indices[0] if idx >= 0]


# 4. GENERATIVE MODEL (RAG GENERATION)

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

#Load model function
def load_model(model_name):
    """
    Load the tokenizer and causal language model for `model_name`.

    The model weights are loaded in float16 with device_map="auto".
    Returns a (tokenizer, model) tuple.
    """
    model_kwargs = {
        "torch_dtype": torch.float16,
        # "load_in_4bit": True,
        "device_map": "auto",
    }
    loaded_tokenizer = AutoTokenizer.from_pretrained(model_name)
    loaded_model = AutoModelForCausalLM.from_pretrained(model_name, **model_kwargs)
    return loaded_tokenizer, loaded_model


In [None]:
def generate_answer(tokenizer, model, query, retrieved_chunks):
    """
    Generate an answer to `query` grounded in the retrieved regulation chunks.

    Args:
        tokenizer: Tokenizer returned by load_model().
        model: Causal language model returned by load_model().
        query: The user question.
        retrieved_chunks: Chunk dicts with "regulation", "clause" and "text"
            keys; they are rendered into the prompt as
            "Regulation N(clause): text".

    Returns:
        Only the newly generated text, stripped of surrounding whitespace.
        The prompt (instructions, context and question) is NOT included, so
        downstream scoring and the chat UI see the model's answer alone.
    """
    context = "\n\n".join([
        f"Regulation {c['regulation']}({c['clause']}): {c['text']}"
        for c in retrieved_chunks
    ])

    prompt = f"""
You are a regulatory assistant for the UTM Resource Centre.
Answer strictly using the provided regulations.
Cite the regulation number.

Regulations:
{context}

Question:
{query}

Answer:
"""

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_length = inputs["input_ids"].shape[1]

    outputs = model.generate(
        **inputs,
        max_new_tokens=100,
        do_sample=False,
        repetition_penalty=1.1,
        pad_token_id=tokenizer.eos_token_id
    )

    # generate() on a decoder-only model returns prompt tokens followed by the
    # continuation. Drop the prompt part; otherwise the "answer" would contain
    # the retrieved context (and its citations) verbatim.
    generated_tokens = outputs[0][prompt_length:]

    return tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()


# 5. Evaluation part

Definition of the golden dataset

In [None]:
# Golden dataset: hand-written evaluation questions. Each entry pairs a
# question with the regulation/clause the answer is expected to cite
# (compared in check_citation) and the keywords looked for by
# check_relevance.
golden_dataset = [
     {
        "id": "Q1",
        "question": "What happens if a user fails to return materials by the due date?",
        "expected_regulation": "7",
        "expected_clause": "x",
        "expected_keywords": ["penalty", "late", "return"]
    },
]

Helper: Extract Citation from Model Output

In [None]:
# extract_citation function
import re

def extract_citation(answer_text):
    """
    Extract regulation and clause from model output.
    Example expected format: Regulation 7(x)

    The first occurrence of "Regulation <number>" (case-insensitive) is used.
    A clause is recognised only when it is a roman numeral enclosed in
    parentheses, optionally separated from the number by whitespace, e.g.
    "Regulation 7(x)" or "Regulation 7 (iv)". Requiring the parentheses
    stops the leading letters of the following word from being read as a
    clause ("Regulation 7 is ..." has no clause).

    Returns:
        (regulation, clause) as strings; `clause` is None when no
        parenthesised clause follows the number, and both are None when no
        regulation is cited at all.
    """
    match = re.search(
        r'Regulation\s+(\d+)(?:\s*\(\s*([ivx]+)\s*\))?',
        answer_text,
        re.IGNORECASE,
    )

    if match:
        regulation = match.group(1)
        clause = match.group(2)
        return regulation, clause

    return None, None


Definition of the metrics:


*   Faithfulness
*   Citation accuracy
*   Answer relevance
*   Hallucination percentage
*   Overall score




In [None]:
# faithfulness check function
def check_faithfulness(answer, retrieved_chunks):
    """
    Checks if answer content is grounded in retrieved chunks.

    Scoring (case-insensitive):
        2 - the whole answer appears verbatim in the combined chunk text.
        1 - more than half of the answer's words occur as whole words in
            the chunk text.
        0 - otherwise, including an empty or whitespace-only answer (there
            is nothing that the context could support).

    Words are compared as whole tokens, not substrings, so a short word such
    as "he" is not counted as supported merely because the context contains
    "other".
    """
    combined_context = " ".join([c["text"] for c in retrieved_chunks]).lower()
    answer_lower = answer.strip().lower()

    # "" is a substring of every string; never treat that as fully supported.
    if not answer_lower:
        return 0

    if answer_lower in combined_context:
        return 2  # Fully supported

    # Partial overlap, measured on whole words
    context_words = set(re.findall(r"\w+", combined_context))
    answer_words = re.findall(r"\w+", answer_lower)
    if not answer_words:
        return 0

    overlap_count = sum(1 for word in answer_words if word in context_words)

    if overlap_count > len(answer_words) * 0.5:
        return 1  # Mostly supported

    return 0  # Hallucinated


In [None]:
#citation accuracy check
def normalize_clause(clause):
    """
    Canonicalise a clause identifier for comparison.

    None maps to "main"; any other value is lower-cased and has all
    whitespace removed.
    """
    if clause is None:
        return "main"
    return re.sub(r"\s+", "", clause.lower().strip())

def check_citation(answer, expected_regulation, expected_clause):
    """
    Score the citation found in `answer` against the expected one.

    Returns 2 when regulation and clause both match, 1 when only the
    regulation matches, and 0 when no regulation is cited or it is wrong.
    Clauses are compared after normalize_clause().
    """
    cited_regulation, cited_clause = extract_citation(answer)
    if cited_regulation is None:
        return 0

    cited_clause = normalize_clause(cited_clause)
    wanted_clause = normalize_clause(expected_clause)

    if cited_regulation != expected_regulation:
        return 0

    # Regulation is right; the clause decides between full and partial credit.
    return 2 if cited_clause == wanted_clause else 1



In [None]:
#relevance scoring
def check_relevance(answer, expected_keywords):
    """
    Return 1 when at least half of the expected keywords occur in the answer
    (case-insensitive substring match), otherwise 0.

    An empty keyword list scores 0.
    """
    haystack = answer.lower()

    if not expected_keywords:
        return 0

    found = [kw for kw in expected_keywords if kw.lower() in haystack]

    # Require at least 50% keyword coverage
    return 1 if len(found) / len(expected_keywords) >= 0.5 else 0



In [None]:
#hallucination detection
def detect_hallucination(faithfulness_score):
    """Return 1 when the faithfulness score is 0 (unsupported answer), else 0."""
    if faithfulness_score == 0:
        return 1
    return 0


Full pipeline function for evaluation

In [None]:
#Full evaluation pipeline
import time
import torch
import gc

def evaluate_model(model_name):
    """
    Run the golden dataset through one model and aggregate the scores.

    For every question the function retrieves the top 3 chunks from the
    module-level `index` / `chunks`, generates an answer and scores it for
    faithfulness (0-2), citation accuracy (0-2) and relevance (0-1).

    Args:
        model_name: Model identifier passed to load_model().

    Returns:
        Dict of percentages ("Faithfulness %", "Citation %", "Relevance %",
        "Hallucination Rate %", "Overall %") plus "Avg Response Time (s)",
        the mean wall-clock time per question covering retrieval, generation
        and scoring.

    Raises:
        ValueError: if `golden_dataset` is empty. This is checked before the
            model is loaded so no load is wasted on a run that cannot
            produce averages.
    """
    if not golden_dataset:
        raise ValueError("golden_dataset is empty; nothing to evaluate.")

    tokenizer, model = load_model(model_name)

    total_faithfulness = 0
    total_citation = 0
    total_relevance = 0
    total_hallucination = 0

    try:
        start_time = time.time()

        for item in golden_dataset:

            question = item["question"]

            # Retrieve
            retrieved = retrieve(question, index, chunks, top_k=3)

            # Generate
            answer = generate_answer(tokenizer, model, question, retrieved)

            # Score
            faith = check_faithfulness(answer, retrieved)
            cite = check_citation(answer, item["expected_regulation"], item["expected_clause"])
            rel = check_relevance(answer, item["expected_keywords"])
            hall = detect_hallucination(faith)

            total_faithfulness += faith
            total_citation += cite
            total_relevance += rel
            total_hallucination += hall

        end_time = time.time()
    finally:
        # Free memory even when a question fails, so the next model can still
        # be loaded.
        del model
        del tokenizer
        gc.collect()
        torch.cuda.empty_cache()

    num_questions = len(golden_dataset)
    total_possible = num_questions * 5  # (2 + 2 + 1)

    overall_score = ((total_faithfulness + total_citation + total_relevance) / total_possible) * 100
    hallucination_rate = (total_hallucination / num_questions) * 100
    avg_time = (end_time - start_time) / num_questions

    results = {
        "Faithfulness %": (total_faithfulness / (num_questions * 2)) * 100,
        "Citation %": (total_citation / (num_questions * 2)) * 100,
        "Relevance %": (total_relevance / num_questions) * 100,
        "Hallucination Rate %": hallucination_rate,
        "Overall %": overall_score,
        "Avg Response Time (s)": avg_time
    }

    return results



# 6. FULL PIPELINE

In [None]:
# Build the retrieval corpus: extract the regulation PDF, clean it, split it
# into clause-level chunks and index the chunks. `chunks` and `index` are
# module-level names read by evaluate_model() and chatbot_response().
pdf_path = "/content/drive/MyDrive/RAG-dataset/RCentre.pdf"  # Your PDF file

raw_text = extract_pdf_text(pdf_path)
cleaned_text = clean_text(raw_text)

chunks = clause_aware_chunking(cleaned_text)

index, embeddings = create_vector_index(chunks)


In [None]:
# Evaluate every candidate model on the golden dataset, one model per cell
# (evaluate_model frees each model before it returns).
results_qwen = evaluate_model("Qwen/Qwen2.5-7B-Instruct")
print(results_qwen)

In [None]:
# Evaluate Mistral-7B-Instruct v0.2 on the golden dataset.
results_mistral = evaluate_model("mistralai/Mistral-7B-Instruct-v0.2")
print(results_mistral)

In [None]:
# Evaluate Phi-3.5-mini-instruct on the golden dataset.
results_phi = evaluate_model("microsoft/Phi-3.5-mini-instruct")
print(results_phi)

In [None]:
# Evaluate Yi-6B-Chat on the golden dataset.
results_yi = evaluate_model("01-ai/Yi-6B-Chat")
print(results_yi)

In [None]:
# Evaluate deepseek-llm-7b-chat on the golden dataset.
results_deepseek = evaluate_model("deepseek-ai/deepseek-llm-7b-chat")
print(results_deepseek)

# 7. SIMPLE CHATBOT (Gradio UI)

In [None]:
# Select the model the chatbot starts with and load it eagerly. These
# module-level globals are read by chatbot_response() and replaced by
# load_selected_model().
current_model_name = "microsoft/Phi-3.5-mini-instruct"
tokenizer, model = load_model(current_model_name)

In [None]:
#Function chatbot_response
def chatbot_response(user_query, show_sources):
    """
    Answer `user_query` with the currently loaded model.

    Retrieves the top 3 clauses, generates an answer from them and, when
    `show_sources` is truthy, appends a "--- Retrieved Clauses ---" section
    listing each retrieved clause.
    """
    # Retrieve top 3 clauses
    retrieved = retrieve(user_query, index, chunks, top_k=3)

    # Generate answer
    answer = generate_answer(tokenizer, model, user_query, retrieved)

    if not show_sources:
        return answer

    clause_blocks = [
        f"\nRegulation {c['regulation']}({c['clause']}):\n{c['text']}\n"
        for c in retrieved
    ]
    return answer + "\n\n--- Retrieved Clauses ---\n" + "".join(clause_blocks)


In [None]:
# Function for loading the selected model
def load_selected_model(model_name):
    """
    Make `model_name` the active chatbot model, reloading only when needed.

    When `model_name` differs from the currently loaded model, the old model
    and tokenizer are released and the new ones are loaded into the
    module-level `tokenizer` / `model` globals.

    If loading fails, the exception propagates and the globals are left as
    `None` with `current_model_name` cleared. A later call, even for the
    previous model, then performs a fresh load instead of hitting a
    NameError on deleted globals or wrongly reporting success.

    Returns:
        A status message naming the loaded model.
    """
    global tokenizer, model, current_model_name

    if model_name != current_model_name:
        import gc

        # Rebind to None rather than `del`, so the names always exist.
        model = None
        tokenizer = None
        current_model_name = None

        # Collect the dropped model before asking CUDA to release its cache.
        gc.collect()
        torch.cuda.empty_cache()

        tokenizer, model = load_model(model_name)
        current_model_name = model_name

    return f"{model_name} loaded successfully."


In [None]:
import gradio as gr

# CSS passed to gr.Blocks: centres the main column ("#container") and styles
# the header/footer blocks used in the Markdown below.
custom_css = """
#container {
    max-width: 900px;
    margin: auto;
}
.header {
    text-align: center;
    padding: 10px;
}
.footer {
    text-align: center;
    font-size: 12px;
    color: gray;
    margin-top: 20px;
}
"""

# Set to True by load_model_ui() after a successful load; respond() ignores
# messages while it is False.
model_loaded_flag = False  # Global control

# Gradio UI: a model picker with a "load" button, and a chat area that stays
# disabled until a model has been loaded through the button.
with gr.Blocks(css=custom_css, theme=gr.themes.Soft(), title="UTM Regulation RAG Assistant") as demo:

    with gr.Column(elem_id="container"):

        # Header
        gr.Markdown("""
        <div class="header">
            <h1>📚 UTM Resource Centre Regulation Assistant</h1>
            <p>Retrieval-Augmented Generation (RAG) for Regulatory Question Answering</p>
        </div>
        """)

        # -------------------------
        # Model Configuration
        # -------------------------
        with gr.Group():
            gr.Markdown("### ⚙️ Model Configuration")

            model_selector = gr.Dropdown(
                choices=[
                    "mistralai/Mistral-7B-Instruct-v0.2",
                    "Qwen/Qwen2.5-7B-Instruct",
                    "microsoft/Phi-3.5-mini-instruct",
                    "01-ai/Yi-6B-Chat",
                    "deepseek-ai/deepseek-llm-7b-chat"
                ],
                value=current_model_name,
                label="Select Large Language Model"
            )

            load_button = gr.Button("🚀 Load Selected Model", variant="primary")

            model_status = gr.Textbox(
                label="Model Status",
                interactive=False,
                placeholder="No model loaded."
            )

        # -------------------------
        # Chat Section
        # -------------------------
        gr.Markdown("### 💬 Ask About the Regulations")

        chatbot = gr.Chatbot(
            label="Conversation",
            height=400,
            bubble_full_width=False
        )

        with gr.Row():
            msg = gr.Textbox(
                label="Your Question",
                placeholder="Load a model first...",
                scale=4,
                interactive=False   # initially disabled
            )

            send_button = gr.Button(
                "Send",
                variant="secondary",
                scale=1,
                interactive=False   # initially disabled
            )

        with gr.Row():
            show_sources = gr.Checkbox(label="Show Retrieved Clauses")
            clear = gr.Button("🗑 Clear Chat")

        # -------------------------
        # Backend Functions
        # -------------------------

        def load_model_ui(selected_model):
            """Load the selected model and enable or disable the chat inputs.

            Returns updates for (model_status, msg, send_button).
            """
            global model_loaded_flag
            try:
                status_message = load_selected_model(selected_model)
                model_loaded_flag = True
                return (
                    f"✅ {status_message}",
                    gr.update(interactive=True, placeholder="Type your question here..."),
                    gr.update(interactive=True)
                )
            except Exception as e:
                # UI boundary: report the failure in the status box and keep
                # the chat inputs locked instead of crashing the app.
                model_loaded_flag = False
                return (
                    f"❌ Error loading model: {str(e)}",
                    gr.update(interactive=False),
                    gr.update(interactive=False)
                )

        def respond(message, chat_history, show_sources):
            """Append the (question, answer) pair to the chat history.

            Returns ("", history) so the input box is cleared. Blank
            messages, and messages sent before a model is loaded, leave the
            history unchanged.
            """
            # Tolerate a missing history value.
            chat_history = chat_history or []

            if not model_loaded_flag:
                return "", chat_history

            if not message.strip():
                return "", chat_history

            response = chatbot_response(message, show_sources)
            chat_history.append((message, response))
            return "", chat_history

        # -------------------------
        # Bind Events
        # -------------------------

        load_button.click(
            load_model_ui,
            inputs=model_selector,
            outputs=[model_status, msg, send_button]
        )

        msg.submit(
            respond,
            [msg, chatbot, show_sources],
            [msg, chatbot]
        )

        send_button.click(
            respond,
            [msg, chatbot, show_sources],
            [msg, chatbot]
        )

        clear.click(
            lambda: [],
            None,
            chatbot,
            queue=False
        )

        # Footer
        gr.Markdown("""
        <div class="footer">
        MSc Coursework Project — Regulatory RAG System Evaluation (Qwen2.5 | Mistral | Phi-3.5)
        </div>
        """)

demo.launch()
