In [None]:
import os
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import PeftModel
from PyPDF2 import PdfReader

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.schema import Document
from langchain.llms.base import LLM
from pydantic import PrivateAttr



In [None]:
def extract_text_from_pdfs(folder_path):
    """Concatenate the extractable text of every PDF located directly in a folder.

    Files are visited in sorted name order so the resulting text (and any index
    built from it) is reproducible; ``os.listdir`` alone gives no ordering
    guarantee. The ``.pdf`` suffix is matched case-insensitively so files such
    as ``REPORT.PDF`` are not silently skipped.

    Args:
        folder_path: Directory to scan (not recursive).

    Returns:
        One string in which each non-empty page's text is followed by a
        newline. Empty string when no PDF yields any text.

    Raises:
        OSError: If ``folder_path`` cannot be listed (e.g. it does not exist).
    """
    page_texts = []
    for file in sorted(os.listdir(folder_path)):
        if not file.lower().endswith(".pdf"):
            continue
        try:
            reader = PdfReader(os.path.join(folder_path, file))
            for page in reader.pages:
                page_text = page.extract_text()
                # extract_text() may return None/"" for image-only pages.
                if page_text:
                    page_texts.append(page_text)
        except Exception as e:
            # Best effort: one unreadable PDF must not abort the whole scan.
            # Pages collected from this file before the failure are kept.
            print(f"Gagal memproses {file}: {e}")
    # Single join instead of repeated string concatenation.
    return "".join(f"{page_text}\n" for page_text in page_texts)

In [None]:
def build_faiss_index(text, model_name="sentence-transformers/all-MiniLM-L6-v2",
                      chunk_size=1000, chunk_overlap=100):
    """Split raw text into chunks and embed them into an in-memory FAISS index.

    Args:
        text: Raw document text. ``None``, empty, or whitespace-only input
            yields ``None`` instead of raising.
        model_name: Hugging Face embedding model used for the chunks.
        chunk_size: Maximum characters per chunk (default matches the
            previously hard-coded value).
        chunk_overlap: Characters shared between consecutive chunks (default
            matches the previously hard-coded value).

    Returns:
        A FAISS vector store, or ``None`` when there is nothing to index.
    """
    if not text or not text.strip():
        return None
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    chunks = splitter.split_text(text)
    if not chunks:
        # Nothing survived splitting; there is nothing to embed.
        return None
    embeddings = HuggingFaceEmbeddings(model_name=model_name)
    return FAISS.from_texts(chunks, embeddings)

def load_faiss_retriever(path, model_name="sentence-transformers/all-MiniLM-L6-v2", k=3):
    """Load a FAISS index saved on disk and expose it as a similarity retriever.

    Args:
        path: Directory previously written by ``save_local``.
        model_name: Embedding model; must be the one the index was built with,
            otherwise query vectors will not be comparable to stored vectors.
        k: Number of documents returned per query (default keeps the
            previously hard-coded value of 3).

    Returns:
        A retriever object, or ``None`` when ``path`` does not exist.
    """
    if not os.path.exists(path):
        return None
    embeddings = HuggingFaceEmbeddings(model_name=model_name)
    # SECURITY: allow_dangerous_deserialization=True makes FAISS unpickle the
    # stored docstore. Only load index directories produced by this notebook;
    # a tampered index can execute arbitrary code on load.
    index = FAISS.load_local(path, embeddings, allow_dangerous_deserialization=True)
    return index.as_retriever(search_type="similarity", search_kwargs={"k": k})

In [None]:
from langchain.schema import Document

class DuckDuckGoRetriever:
    """Minimal retriever that turns DuckDuckGo web results into Documents.

    The search backend is resolved lazily on each query, in this order:
      1. ``search_fn`` passed to the constructor,
      2. a notebook-level ``ddg`` callable, if a cell defined one,
      3. the legacy ``duckduckgo_search.ddg`` function,
      4. ``duckduckgo_search.DDGS().text`` (newer package versions).
    If none is available the retriever reports it and returns no documents,
    instead of failing with ``NameError`` on an undefined ``ddg``.

    Args:
        k: Maximum number of search results requested per query.
        search_fn: Optional callable ``search_fn(query, max_results=...)``
            returning an iterable of result dicts; mainly useful for testing
            or for swapping the search backend.
    """

    def __init__(self, k=3, search_fn=None):
        self.k = k
        self._search_fn = search_fn

    def _resolve_search(self):
        """Return a callable ``(query, max_results=...)`` or None if unavailable."""
        injected = getattr(self, "_search_fn", None)
        if injected is not None:
            return injected

        # Honor a ``ddg`` defined by another notebook cell, if any.
        notebook_level = globals().get("ddg")
        if notebook_level is not None:
            return notebook_level

        try:
            from duckduckgo_search import ddg as legacy_search
            return legacy_search
        except ImportError:
            pass

        try:
            from duckduckgo_search import DDGS
        except ImportError:
            return None

        def modern_search(query, max_results):
            # list() because some versions return a generator.
            return list(DDGS().text(query, max_results=max_results))

        return modern_search

    def get_relevant_documents(self, query):
        """Search the web for ``query`` and wrap each usable hit in a Document.

        Returns an empty list when the search backend is missing, the search
        raises, or no result carries any text. Never raises for search errors.
        """
        search = self._resolve_search()
        if search is None:
            print("❌ Modul duckduckgo-search tidak tersedia.")
            return []

        try:
            results = search(query, max_results=self.k)
        except Exception as e:
            # Network/rate-limit failures must not break the answer pipeline.
            print(f"❌ DuckDuckGo error: {e}")
            return []

        if not results:
            print("⚠️ Tidak ada hasil dari DuckDuckGo.")
            return []

        docs = []
        for r in results:
            # Prefer the snippet; fall back to progressively less useful fields.
            content = (
                r.get("body") or
                r.get("text") or
                r.get("title") or
                r.get("href") or ""
            )
            if content:
                print(f"🌐 [DuckDuckGo] {content}")
                docs.append(Document(page_content=content, metadata={"source": r.get("href", "duckduckgo")}))

        return docs


In [None]:
class CustomLLM(LLM):
    """LangChain LLM wrapper around a locally loaded causal LM and its tokenizer.

    Args:
        model: Object exposing a Hugging Face style ``generate`` method.
        tokenizer: Matching tokenizer (callable, with ``decode`` and
            ``eos_token_id``).
        device: Device the tokenized prompt is moved to. Defaults to ``"cuda"``
            when available, else ``"cpu"``.
        **kwargs: Forwarded to the LangChain ``LLM`` base class.
    """

    # Private attributes (not pydantic fields). Annotated with ``object``
    # because the builtin function ``any`` is not a type.
    _model: object = PrivateAttr()
    _tokenizer: object = PrivateAttr()
    _device: str = PrivateAttr(default="cuda" if torch.cuda.is_available() else "cpu")

    def __init__(self, model, tokenizer, device=None, **kwargs):
        super().__init__(**kwargs)
        self._model = model
        self._tokenizer = tokenizer
        self._device = device or ("cuda" if torch.cuda.is_available() else "cpu")

    @property
    def _llm_type(self):
        return "custom-mistral-lora"

    def _call(self, prompt: str, stop=None, run_manager=None, **kwargs) -> str:
        """Generate a completion for ``prompt`` and return only the new text.

        Args:
            prompt: Full prompt text.
            stop: Optional list of stop strings; the completion is truncated
                at the earliest occurrence of any of them.
            run_manager: Accepted for compatibility with LangChain, which
                passes it when the LLM is invoked through its public API.
                Unused here.
            **kwargs: Accepted for compatibility; ignored.

        Returns:
            The sampled completion with surrounding whitespace stripped.
        """
        inputs = self._tokenizer(prompt, return_tensors="pt").to(self._device)
        if "token_type_ids" in inputs:
            # Some tokenizers emit token_type_ids that generate() rejects.
            del inputs["token_type_ids"]
        prompt_length = inputs["input_ids"].shape[-1]

        # Inference only: avoid building autograd state during sampling.
        with torch.no_grad():
            outputs = self._model.generate(
                **inputs,
                max_new_tokens=256,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=self._tokenizer.eos_token_id
            )

        # Causal LM output is prompt tokens followed by new tokens. Slice by
        # token count rather than by len(prompt) characters, since decoding
        # does not always reproduce the prompt text exactly.
        completion_ids = outputs[0][prompt_length:]
        response = self._tokenizer.decode(completion_ids, skip_special_tokens=True)

        for stop_sequence in stop or ():
            if not stop_sequence:
                continue  # an empty stop string would erase the whole answer
            cut = response.find(stop_sequence)
            if cut != -1:
                response = response[:cut]
        return response.strip()


In [None]:
def combined_retriever(query, retrievers):
    """Collect documents for ``query`` from every usable retriever, in order.

    Falsy entries in ``retrievers`` (e.g. ``None`` for a missing index) are
    skipped. Results are concatenated without de-duplication.
    """
    return [
        document
        for source in retrievers
        if source
        for document in source.get_relevant_documents(query)
    ]

def answer_query(query, llm, retrievers):
    """Answer ``query`` with ``llm`` using context gathered from ``retrievers``.

    Returns:
        ``(answer, docs)``: the generated answer text and the documents used
        as context. When nothing is retrieved, a fixed apology message and an
        empty list are returned and the LLM is not called.
    """
    sources = combined_retriever(query, retrievers)
    if not sources:
        return "Maaf, tidak ada informasi relevan ditemukan.", []

    # One "source + content" section per document, separated by a rule.
    sections = []
    for item in sources:
        origin = item.metadata.get('source', 'unknown')
        sections.append(f"Sumber: {origin}\nKonten: {item.page_content}")
    context = "\n\n---\n\n".join(sections)

    prompt = f"""
Anda adalah virtual asisten nutrisi yang informatif. Berdasarkan konteks berikut, jawab pertanyaan pengguna. Jika tidak yakin, katakan tidak tahu.

[KONTEKS]
{context}

[PERTANYAAN]
{query}

[JAWABAN ANDA]
"""
    return llm._call(prompt), sources


In [None]:
# Path and model configuration
pdf_folder = "WHO_doc"  # Folder containing the WHO/FAO PDFs
index_path = "faiss_who_index"
adapter_path = "./mistral-lora-adapter"  # Path to the LoRA adapter
base_model_name = "mistralai/Mistral-7B-v0.1"

# Check whether the FAISS index already exists; if not, build it from the PDFs.
# When the build fails nothing is saved, so load_faiss_retriever() below
# returns None and the retriever list will contain a None entry.
if not os.path.exists(index_path):
    raw_text = extract_text_from_pdfs(pdf_folder)
    index = build_faiss_index(raw_text)
    if index:
        index.save_local(index_path)
    else:
        print("Gagal membangun indeks FAISS.")
else:
    print("Menggunakan indeks FAISS lokal yang sudah ada.")

# Tokenizer setup: reuse the EOS token as the padding token
tokenizer = AutoTokenizer.from_pretrained(base_model_name)
tokenizer.pad_token = tokenizer.eos_token

# Quantization configuration for memory efficiency (4-bit NF4, double quantization)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16
)

# Load the base model with the quantized configuration
# NOTE(review): trust_remote_code=True permits running code shipped with the
# model repository — confirm it is actually required for this checkpoint.
base_model = AutoModelForCausalLM.from_pretrained(
    base_model_name,
    device_map="auto",
    trust_remote_code=True,
    quantization_config=bnb_config
)

# Load the LoRA adapter on top of the base model
model = PeftModel.from_pretrained(
    base_model,
    adapter_path,
    torch_dtype=torch.float16
)
model.eval()

# Wrap the model and tokenizer in the custom LLM class
llm = CustomLLM(model, tokenizer)

# Load retrievers: local FAISS + DuckDuckGo fallback
retrievers = [
    load_faiss_retriever(index_path),
    DuckDuckGoRetriever()
]

In [None]:
# Helper that builds the generation prompt
def generate_prompt(context, question):
    """Return the prompt text: a fixed header, the context, then the question.

    The prompt ends with ``Jawaban:`` so the model continues with the answer.
    """
    header = "Berikut ini adalah konteks dari dokumen WHO/FAO:"
    return f"{header}\n\n{context}\n\nPertanyaan: {question}\nJawaban:"

# Merge the results of several retrievers
def combine_retrieval_results(retrievers, query, top_k=3):
    """Query every retriever and return up to ``top_k`` unique documents.

    Args:
        retrievers: Iterable of retriever objects. ``None`` entries (e.g. a
            FAISS retriever whose index is missing) are skipped silently
            rather than reported as failures.
        query: Search text passed to each retriever.
        top_k: Maximum number of documents returned. Documents keep retriever
            order, so earlier retrievers take precedence.

    Returns:
        List of documents, de-duplicated on exact ``page_content``.
    """
    docs = []
    for retriever in retrievers:
        if retriever is None:
            # An unavailable retriever is not an error worth logging per query.
            continue
        try:
            result = retriever.get_relevant_documents(query)
            if result:
                docs.extend(result)
        except Exception as e:
            # Best effort: one failing source must not block the others.
            print(f"Gagal menggunakan retriever {retriever.__class__.__name__}: {e}")

    # Drop duplicates by content, keeping the first occurrence.
    seen = set()
    unique_docs = []
    for doc in docs:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique_docs.append(doc)
    return unique_docs[:top_k]

In [None]:
def answer_question(question: str):
    """Retrieve context for ``question`` and generate an answer with the model.

    Uses the notebook-level ``retrievers``, ``tokenizer`` and ``model``.

    Args:
        question: The user's question.

    Returns:
        The generated answer text, or a fixed apology message when no
        document could be retrieved.
    """
    print(f"\nPertanyaan: {question}\n")

    # Fetch documents from all retrievers (FAISS + DuckDuckGo)
    documents = combine_retrieval_results(retrievers, question, top_k=3)

    if not documents:
        return "Maaf, saya tidak menemukan jawaban relevan dari dokumen."

    # Join the document contents into one context string
    context = "\n\n".join(doc.page_content for doc in documents)

    # Build the prompt around the context
    prompt = generate_prompt(context=context, question=question)

    # Tokenize onto the device holding the model's first parameters, instead
    # of a hard-coded "cuda" that fails on CPU-only machines or when the
    # model is placed elsewhere.
    device = next(model.parameters()).device
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    prompt_length = inputs.input_ids.shape[-1]

    # Generate the answer
    with torch.no_grad():
        outputs = model.generate(
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=1024,
            temperature=0.7,
            top_k=50,
            top_p=0.9,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id
        )

    # Decode only the newly generated tokens. Stripping the prompt by text
    # replacement is fragile: decoding may not reproduce the prompt exactly,
    # and replace() would also delete any repetition of it in the answer.
    completion_ids = outputs[0][prompt_length:]
    final_answer = tokenizer.decode(completion_ids, skip_special_tokens=True).strip()
    return final_answer


In [None]:
# Demo: run one sample question through answer_query() with the configured
# LLM and retrievers, then print the answer. `docs` holds the retrieved
# documents that were used as context.
question = "Apa saja makanan yang cocok untuk diet?"
jawaban, docs = answer_query(question, llm, retrievers)
print("\n💬 Jawaban:\n", jawaban)
