Check Kernel Python

In [1]:
import sys
sys.executable

'C:\\Users\\aldha\\Projects\\mini-rag\\.venv\\Scripts\\python.exe'

## Install Dependencies

In [2]:
!pip install pdfplumber beautifulsoup4 nltk tqdm google-genai gradio



In [3]:
import os
import re
import nltk
import pdfplumber
from bs4 import BeautifulSoup
from tqdm import tqdm

nltk.download("punkt")
nltk.download("punkt_tab")

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\aldha\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\aldha\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

## 1. Prepare the Document Corpus

Di processing step, dilakukan pengambilan data dari `corpus/raw` kemudian diambil text nya, dan dimasukkan ke `corpus/processed/docs` dengan sequence sebagai name

### 1.1. Fungsi Clean Text (Remove duplicate space & normalize new lines)

In [4]:
def clean_text(text: str) -> str:
    text = re.sub(r"\s+", " ", text)        # remove extra spaces
    text = re.sub(r"\n+", "\n", text)       # normalize newlines
    text = re.sub(r"\t+", " ", text)
    return text.strip()

### 1.2. Fungsi untuk extract text dari file .pdf

In [5]:
def extract_text_from_pdf(pdf_path: str) -> str:
    text = ""
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
    return clean_text(text)

### 1.3. Fungsi untuk extract text dari file .html

In [6]:
def extract_text_from_html(html_path: str) -> str:
    with open(html_path, "r", encoding="utf-8") as f:
        soup = BeautifulSoup(f, "html.parser")

    # remove script & style
    for tag in soup(["script", "style", "nav", "footer", "header"]):
        tag.decompose()

    text = soup.get_text(separator="\n")
    return clean_text(text)

### 1.4. Fungsi untuk extract text dari file .txt

In [7]:
def extract_text_from_txt(txt_path: str) -> str:
    with open(txt_path, "r", encoding="utf-8") as f:
        return clean_text(f.read())

### 1.5. Scan folder `corpus/raw` extract text ke `corpus/processed/docs`

In [8]:
RAW_DIR = "corpus/raw"
OUT_DIR = "corpus/processed/docs"

os.makedirs(OUT_DIR, exist_ok=True)

doc_id = 1

for root, _, files in os.walk(RAW_DIR):
    for file in tqdm(files):
        path = os.path.join(root, file)

        try:
            if file.endswith(".html"):
                text = extract_text_from_html(path)

            else:
                continue

            if len(text) < 500:  # skip dokumen terlalu pendek
                continue

            out_path = os.path.join(OUT_DIR, f"doc_{doc_id:02d}.txt")
            with open(out_path, "w", encoding="utf-8") as f:
                f.write(text)

            doc_id += 1

        except Exception as e:
            print(f"❌ Error processing {file}: {e}")

100%|█████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 18558.87it/s]
100%|██████████████████████████████████████████████████████████████████████████████████| 22/22 [00:00<00:00, 35.69it/s]


## 2. Chunking

### 2.1. Load Docs yang ada di `corpus/processed/docs`

In [9]:
DOC_DIR = "corpus/processed/docs"
docs = sorted(os.listdir(DOC_DIR))
print(f"Loaded {len(docs)} documents")

Loaded 22 documents


### 2.2. Fungsi Chunk text dengan tokenizer NLTK, dengan default Chunk Size = 150, dan Overlap = 30

In [10]:
def chunk_text(
    text: str,
    chunk_size: int = 150,
    overlap: int = 30
):
    tokens = nltk.word_tokenize(text)
    chunks = []

    start = 0
    while start < len(tokens):
        end = start + chunk_size
        chunk_tokens = tokens[start:end]
        chunk_text = " ".join(chunk_tokens)

        chunks.append(chunk_text)

        start = end - overlap  # overlap

    return chunks

### 2.3. Iterate semua docs, dan chunk text kemudian dimasukan ke `corpus/chunks/chunks.json`

In [11]:
all_chunks = []
chunk_id = 0

for doc_id, file in enumerate(tqdm(docs)):
    path = os.path.join(DOC_DIR, file)
    
    if not file.lower().endswith(".txt"):
        continue

    try:
        with open(path, "r", encoding="utf-8") as f:
            text = f.read()
    
        chunks = chunk_text(text)
    
        for i, chunk in enumerate(chunks):
            all_chunks.append({
                "chunk_id": chunk_id,
                "doc_id": doc_id,
                "doc_name": file,
                "chunk_index": i,
                "text": chunk
            })
            chunk_id += 1
            
    except UnicodeDecodeError:
        print(f"❌ Error encoding di file: {file}")
    except Exception as e:
        print(f"⚠️ Error lain pada {file}: {e}")

100%|█████████████████████████████████████████████████████████████████████████████████| 22/22 [00:00<00:00, 177.79it/s]


In [12]:
import json

OUT_PATH = "corpus/chunks/chunks.json"
os.makedirs(os.path.dirname(OUT_PATH), exist_ok=True)

with open(OUT_PATH, "w", encoding="utf-8") as f:
    json.dump(all_chunks, f, indent=2, ensure_ascii=False)

print(f"Saved {len(all_chunks)} chunks")

Saved 245 chunks


### 2.4. Preview Chunking Process

In [13]:
for c in all_chunks[:3]:
    print("\n--- Chunk", c["chunk_id"], "---")
    print("Doc:", c["doc_name"])
    print(c["text"][:300], "...")


--- Chunk 0 ---
Doc: doc_01.txt
15.687 Pengungsi Bencana di Sumut Terpapar Penyakit Kulit login register Nasional Peristiwa 15.687 Pengungsi Bencana di Sumut Terpapar Penyakit Kulit CNN Indonesia Selasa , 23 Des 2025 13:52 WIB Bagikan : url telah tercopy Dinas Kesehatan Provinsi Sumatera Utara mencatat ada 15.687 warga terdampak b ...

--- Chunk 1 ---
Doc: doc_01.txt
Kasus terbanyak ditemukan di Kabupaten Langkat , Tapanuli Tengah , Deliserdang , Batubara , Tebingtinggi , dan Mandailing Natal . `` Ini perlu menjadi fokus perhatian , terutama terkait faktor risiko seperti paparan air kotor , sanitasi lingkungan yang belum optimal , keterbatasan air bersih , serta ...

--- Chunk 2 ---
Doc: doc_01.txt
banjir . `` Sekarang Tapteng mulai kering , sehingga keluhan ISPA meningkat , '' ujarnya . Pilihan Redaksi Pengungsi Banjir-Longsor Sumut Mulai Terserang Penyakit Kulit dan ISPA 100 Nakes Sulsel Dikirim ke Daerah Bencana Aceh Tamiang Gelombang II Bantuan Internasional ke Aceh : Obat-obatan 

In [14]:
lengths = [len(nltk.word_tokenize(c["text"])) for c in all_chunks]

print("Chunk stats:")
print(f"  Total chunks      : {len(all_chunks)}")
print(f"  Avg tokens/chunk  : {sum(lengths)//len(lengths)}")
print(f"  Min tokens/chunk  : {min(lengths)}")
print(f"  Max tokens/chunk  : {max(lengths)}")

Chunk stats:
  Total chunks      : 245
  Avg tokens/chunk  : 142
  Min tokens/chunk  : 11
  Max tokens/chunk  : 152


## 3. Embedding

Dalam process embedding, saya menggunakan `intfloat/multilingual-e5-base` dan `BAAI/bge-m3` sebagai perbandingan

### 3.1. Load Chunk

In [15]:
import json

with open("corpus/chunks/chunks.json", "r", encoding="utf-8") as f:
    chunks = json.load(f)

texts = [c["text"] for c in chunks]

### 3.2. Define Compared Embedding Models

In [16]:
!pip install sentence-transformers



In [17]:
from sentence_transformers import SentenceTransformer
import numpy as np
from tqdm import tqdm

embedding_models = {
    "bge_m3": "BAAI/bge-m3",
    "e5_multilingual": "intfloat/multilingual-e5-base"
}

  from .autonotebook import tqdm as notebook_tqdm


### 3.3. Encode semua texts yang di dapat dari chunks sesuai embedding_models, dan simpan hasil embedding docs di local

Iterate semua models yang dipilih dan encode chunks text ke dalam embedding di local dengan nama embeddings_{key}

In [18]:
import os
import numpy as np
from sentence_transformers import SentenceTransformer

os.makedirs("corpus/chunks", exist_ok=True)

for key, model_name in embedding_models.items():
    print(f"Embedding with {model_name}")

    model = SentenceTransformer(model_name)

    embeddings = model.encode(
        texts,
        show_progress_bar=True,
        normalize_embeddings=True
    )

    out_path = f"corpus/chunks/embeddings_{key}.npy"
    np.save(out_path, embeddings)

    print(f"Saved {out_path} | shape = {embeddings.shape}")

Embedding with BAAI/bge-m3


Batches: 100%|███████████████████████████████████████████████████████████████████████████| 8/8 [01:44<00:00, 13.10s/it]


Saved corpus/chunks/embeddings_bge_m3.npy | shape = (245, 1024)
Embedding with intfloat/multilingual-e5-base


Batches: 100%|███████████████████████████████████████████████████████████████████████████| 8/8 [00:30<00:00,  3.81s/it]

Saved corpus/chunks/embeddings_e5_multilingual.npy | shape = (245, 768)





## 4. Vector Retrieval

### 4.1. Import & Load Data dari chunks, dan embedded chunks sesuai dengan embedding models

In [19]:
import json
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

In [20]:
 # Load embeddings (pilih salah satu / bandingkan dua)
embeddings_bge = np.load("corpus/chunks/embeddings_bge_m3.npy")
embeddings_e5  = np.load("corpus/chunks/embeddings_e5_multilingual.npy")

print("Chunks:", len(chunks))
print("Embeddings shape:", embeddings_bge.shape)

Chunks: 245
Embeddings shape: (245, 1024)


### 4.2.Ambil Embedding models, untuk encode query yang diberikan user 

In [21]:
from sentence_transformers import SentenceTransformer

query_model_bge = SentenceTransformer("BAAI/bge-m3")
query_model_e5  = SentenceTransformer("intfloat/multilingual-e5-base")

Fungsi untuk embedding_query sesuai model yang dipilih

In [22]:
def embed_query(query: str, model: SentenceTransformer):
    return model.encode(
        query,
        normalize_embeddings=True
    )

### 4.3. Buat Fungsi MMR (Maximal Marginal Relevance)

In [23]:
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def mmr(
    query_embedding,
    doc_embeddings,
    top_k=3,
    lambda_param=0.7
):
    query_embedding = query_embedding.reshape(1, -1)

    relevance = cosine_similarity(doc_embeddings, query_embedding).reshape(-1)

    selected_idx = []
    candidate_idx = list(range(len(doc_embeddings)))

    for _ in range(top_k):
        if len(selected_idx) == 0:
            idx = np.argmax(relevance)
            selected_idx.append(idx)
            candidate_idx.remove(idx)
            continue

        mmr_scores = []

        for i in candidate_idx:
            redundancy = max(
                cosine_similarity(
                    doc_embeddings[i].reshape(1, -1),
                    doc_embeddings[selected_idx]
                )[0]
            )

            score = (
                lambda_param * relevance[i]
                - (1 - lambda_param) * redundancy
            )
            mmr_scores.append(score)

        best = candidate_idx[np.argmax(mmr_scores)]
        selected_idx.append(best)
        candidate_idx.remove(best)

    return selected_idx

### 4.4. Buat Fungsi Retrieve dengan apply MMR

In [24]:
def retrieve_with_mmr(
    query_embedding,
    embeddings,
    chunks,
    k=3
):
    idxs = mmr(
        query_embedding,
        embeddings,
        top_k=k
    )

    return [chunks[i] for i in idxs]

### 4.5. Buat fungsi Build Context untuk membuat context dari chunks yang di ambil

In [25]:
def build_context(retrieved_chunks):
    return "\n\n".join(
        [f"- {c['text']}" for c in retrieved_chunks]
    )

## 5. RAG Generation

### 5.1. Init API KEY (Menggunakan LLM Gemini)

In [26]:
import os
os.environ["GOOGLE_API_KEY"] = ""

### 5.2. Init Google Generative AI Client

In [27]:
import os
from google import genai

ai_client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])

### 5.3. Menggunakan Model Gemini 2.0 Flash, karena model Gemini 1.5 sudah tidak tersedia

In [28]:
MODEL_FLASH = "gemini-2.0-flash"

### 5.4. Membuat Fungsi untuk Membuat prompt dengan menambahkan context dari Retrieved Context

In [29]:
def generate_answer_gemini(
    client,
    question: str,
    context: str = None,
    model=MODEL_FLASH
):
    if context:
        prompt = f"""
            You are an AI assistant. Answer the question using ONLY the context below.
            If the answer is not in the context, say "Information not found in the provided documents."
            
            Context:
            {context}
            
            Question:
            {question}
            
            Answer:
        """
    else:
        prompt = f"""
            Question:
            {question}
            
            Answer concisely:
        """


    response = client.models.generate_content(
        model=model,
        contents=prompt
    )
    return response.text

### 5.5. Run LLM-RAG menggunakan gradio

In [31]:
import gradio as gr
import time

def answer_with_k(query, k):
    answers_dict = []

    query_bge = embed_query(query, query_model_bge)
    query_e5 = embed_query(query, query_model_e5)
    
    # embeddings: bge-m3
    retrieved = retrieve_with_mmr(
        query_embedding=query_bge,
        embeddings=embeddings_bge,
        chunks=chunks,
        k=k
    )
    context = build_context(retrieved)
    answer = generate_answer_gemini(
        client=ai_client,
        question=query,
        context=context,
        model=MODEL_FLASH
    )
    answers_dict.append({
        'k': k,
        'embeddings': 'BAAI/bge-m3',
        'answers': answer,
    })

    time.sleep(2)
    
    # embeddings: e5
    retrieved = retrieve_with_mmr(
        query_embedding=query_e5,
        embeddings=embeddings_e5,
        chunks=chunks,
        k=k
    )
    context = build_context(retrieved)
    answer = generate_answer_gemini(
        client=ai_client,
        question=query,
        context=context,
        model=MODEL_FLASH
    )
    answers_dict.append({
        'k': k,
        'embeddings': 'intfloat/multilingual-e5-base',
        'answers': answer,
    })
    return answers_dict


def rag_chat(question):
    answers = []
    final_answers = ""

     # Non RAG
    answer_non_rag = generate_answer_gemini(
        ai_client, 
        question, 
        None, 
        MODEL_FLASH
    )

    final_answers += f"""
        NON-RAG Answers:
        ==================================================
        answers={answer_non_rag}
    """
    
    answers_k = answer_with_k(query=question, k=1)
    answers.extend(answers_k)
    
    answers_k = answer_with_k(query=question, k=2)
    answers.extend(answers_k)
    
    answers_k = answer_with_k(query=question, k=3)
    answers.extend(answers_k)

    final_answers += """\n
        RAG Answers:
        ==================================================
    """
    for ans in answers:
        final_answers += "\n"
        answers_text = f"""
            k={ans['k']}
            embeddings={ans['embeddings']}
            answers={ans['answers']}
        """
        final_answers += answers_text
    
    return final_answers


interface = gr.Interface(
    fn=rag_chat,
    inputs=gr.Textbox(label="Ask a question"),
    outputs=gr.Textbox(label="RAG Answer"),
    title="Mini RAG Demo",
    description="Simple Retrieval-Augmented Generation system"
)

interface.launch()


        NON-RAG Answers:
        answers=Ya, ada laporan tentang fenomena langit merah di Indonesia akhir-akhir ini, terutama dikaitkan dengan peningkatan polusi udara dan partikel debu di atmosfer.

    

        RAG Answers:
    

            k=1
            embeddings=BAAI/bge-m3
            answers=Ya, terjadi fenomena langit merah pada Kamis (18/12) petang di Pandeglang.

        

            k=1
            embeddings=intfloat/multilingual-e5-base
            answers=Ya, fenomena alam langit merah terjadi di wilayah Pandeglang selatan pada Kamis (18/12) petang.

        

            k=2
            embeddings=BAAI/bge-m3
            answers=Sebagai informasi , fenomena alam langit merah di wilayah Pandeglang selatan terjadi pada Kamis ( 18/12 ) petang .

        

            k=2
            embeddings=intfloat/multilingual-e5-base
            answers=Ya, sebagai informasi, fenomena alam langit merah terjadi di wilayah Pandeglang selatan pada Kamis (18/12) petang.

        

 