In [107]:
# !pip install transformers torch accelerate
# !pip install -U langchain langchain_huggingface langchain-chroma langchain_community sentence-transformers langchain_huggingface langchain_core chromadb ipywidgets pypdf 

In [115]:
!pip install pypdf


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)




In [125]:
import os
import bs4
import requests
import shutil 
import time
from typing import List
from langchain_chroma import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.retrievers import BaseRetriever
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.document_loaders import PyPDFLoader

class RagUtils:
    def __init__(self,embeddings_dir: str):
        self.embeddings_dir = embeddings_dir
        self.original_get = requests.get
        self.model_name = "BAAI/bge-m3"
        self.model_kwargs = {'device': 'cpu'}
        self.encode_kwargs = {'normalize_embeddings': False}

    def patched_get(self, url, *args, **kwargs):
        """
        Patch the requests.get method to use a custom User-Agent header.
        """
        user_agent =  "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"
        os.environ["USER_AGENT"] =user_agent
        headers = kwargs.pop("headers", {})
        headers["User-Agent"] = user_agent
        return self.original_get(url, headers=headers, *args, **kwargs)

    def textSplitter(self, text: str, chunk_size: int, chunk_overlap: int, metadataSource: str):
        """
        Split the input text into smaller chunks using RecursiveCharacterTextSplitter.
        """
        documents = [Document(page_content=text, metadata={"source": metadataSource})]
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            strip_whitespace=True,
        )
        chunks = text_splitter.split_documents(documents)
        print(f"Split text into {len(chunks)} chunks.")
        return chunks

    def documentsSplitter(self, documents: Document, chunk_size: int, chunk_overlap: int):
        """
        Split the input documents into smaller chunks using RecursiveCharacterTextSplitter.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            strip_whitespace=True,
        )
        chunks = text_splitter.split_documents(documents)
        print(f"Split text into {len(chunks)} chunks.")
        return chunks

    def ingest(self, chunks: List[Document]):
        """
        Ingest the list of Document chunks into a vector database.
        """
        embedding = HuggingFaceEmbeddings(
            model_name=self.model_name,
            model_kwargs=self.model_kwargs,
            encode_kwargs=self.encode_kwargs
        )

        vector_store = None  # กำหนดค่าเริ่มต้นให้กับตัวแปร vector_store
        if os.path.exists(os.path.join(self.embeddings_dir, "chroma.sqlite3")):
            print("-- use Append data to vector store--")
            vector_store = Chroma(persist_directory=self.embeddings_dir, embedding_function=embedding)
            vector_store.add_documents(documents=chunks)
        else:
            print("-- use New vector store--")
            os.makedirs(self.embeddings_dir, exist_ok=True)
            vector_store = Chroma.from_documents(documents=chunks, embedding=embedding, persist_directory=self.embeddings_dir)

        print("-- Ingest to Vector Database Success ---")
        return vector_store

    def deleteVectorDatabase(self):
        """
        ลบข้อมูลเก่าในไดเรกทอรี embeddings
        """
        if os.path.exists(self.embeddings_dir):
            shutil.rmtree(self.embeddings_dir)
            print(f"Vector database at {self.embeddings_dir} deleted successfully.")
        else:
            print(f"Vector database at {self.embeddings_dir} does not exist.")

    def loadContentFromWebsite(self, url: str, targetClassName: str):
        """
        Load content from a website using the specified target class name.
        """
        requests.get = self.patched_get
        bs4_strainer = bs4.SoupStrainer(class_=targetClassName)
        loader = WebBaseLoader(
            web_paths=(url,),
            bs_kwargs={"parse_only": bs4_strainer},
        )
        docs = loader.load()
        requests.get = self.original_get
        return docs

    def loadVectorStore(self):
        """
        Load the vector store from the specified embeddings directory.
        """
        
        embedding = HuggingFaceEmbeddings(
            model_name=self.model_name,
            model_kwargs=self.model_kwargs,
            encode_kwargs=self.encode_kwargs
        )
        vector_store = Chroma(persist_directory=self.embeddings_dir, embedding_function=embedding)
        print("-- Vector Store Loaded --")
        return vector_store

    def deleteDocumentByIds(self, document_ids: List[str]):
        """
        ลบเอกสารใน Chroma vector store ตาม document_ids ที่กำหนด
        """
        if os.path.exists(os.path.join(self.embeddings_dir, "chroma.sqlite3")):
            vector_store = Chroma(persist_directory=self.embeddings_dir, embedding_function=HuggingFaceEmbeddings(
                model_name=self.model_name,
                model_kwargs=self.model_kwargs,
                encode_kwargs=self.encode_kwargs
            ))
            vector_store.delete(document_ids=document_ids)
            print(f"Documents with IDs {document_ids} deleted successfully.")
        else:
            print(f"Vector store at {self.embeddings_dir} does not exist.")

    def deleteAllDocuments(self):
        """
        ลบเอกสารทั้งหมดใน Chroma vector store
        """
        if os.path.exists(os.path.join(self.embeddings_dir, "chroma.sqlite3")):
            vector_store = Chroma(persist_directory=self.embeddings_dir, embedding_function=HuggingFaceEmbeddings(
                model_name=self.model_name,
                model_kwargs=self.model_kwargs,
                encode_kwargs=self.encode_kwargs
            ))
            vector_store.delete()
            print("All documents deleted successfully.")
        else:
            print(f"Vector store at {self.embeddings_dir} does not exist.")

    def getRetriever(self, vector_store: Chroma):
        """
        Get a retriever from the vector store for similarity search.
        """
        retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 3})
        return retriever

    def genPrompt(self, question: str, retriever: BaseRetriever):
        """
        Generate a prompt based on the retrieved documents for the given question.
        """
        retrieved_docs = retriever.invoke(question)
        context = ' '.join([doc.page_content for doc in retrieved_docs])
        prompt = f"""
            [Instructions] 
                Question: {question}
                Context: {context} 
                Answer:
            [/Instructions]
            """
        return prompt
    
    def loadContentFromPDF(self, pdf_path: str):
        """
        Load content from a PDF file.
        """
        loader = PyPDFLoader(pdf_path)
        docs = loader.load_and_split()
        return docs

In [126]:
# นำเข้าโมดูลที่ต้องการ
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

# วิธีใช้งาน
# from src.utils.typhoon_2_assistant import Typhoon2Assistant
# assistant = Typhoon2Assistant()
# assistant.ask("ใส่ข้อความที่ต้องการถามที่นี่")

class Typhoon2Assistant:
    def __init__(self):
        self.model_id = "scb10x/llama3.2-typhoon2-1b-instruct"
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_id)
        self.model = AutoModelForCausalLM.from_pretrained(self.model_id, torch_dtype=torch.bfloat16)
        # self.model = AutoModelForCausalLM.from_pretrained(self.model_id, torch_dtype=torch.bfloat16, device_map="auto")

    def ask(self, prompt: str):
        # เตรียมข้อความนำเข้า
        messages = [
            {"role": "system", "content": "You are a friendly assistant. Answer the question based only on the following context. If you don't know the answer, then reply, No Context available for this question."},
            {"role": "user", "content": prompt}
        ]
        # แปลงข้อความเป็น token (คืนค่าเป็น tensor โดยตรง)
        input_ids = self.tokenizer.apply_chat_template(messages, add_generation_prompt=True, return_tensors="pt").to(self.model.device)
        
        # สร้าง attention_mask จาก input_ids (1 สำหรับ token จริง, 0 สำหรับ padding)
        attention_mask = (input_ids != self.tokenizer.pad_token_id).long().to(self.model.device)
        
        # กำหนดพารามิเตอร์สำหรับการสร้างข้อความ
        terminators = [self.tokenizer.eos_token_id, self.tokenizer.convert_tokens_to_ids("<|eot_id|>")]
        
        # สร้างข้อความตอบกลับ
        outputs = self.model.generate(
            input_ids,
            attention_mask=attention_mask,  # ส่ง attention_mask เข้าไป
            max_new_tokens=512,
            eos_token_id=terminators,
            do_sample=True,
            temperature=0.7,
            top_p=0.95,
            pad_token_id=self.tokenizer.pad_token_id  # กำหนด pad_token_id อย่างชัดเจน
        )
        
        # แปลงผลลัพธ์กลับเป็นข้อความ
        response = outputs[0][input_ids.shape[-1]:]
        answer = self.tokenizer.decode(response, skip_special_tokens=True)
        return answer



In [127]:
class AskWebsite:
    def __init__(self,url: str,targetClassName: str, clearOldData: bool = False):
        self.url = url
        self.targetClassName = targetClassName
        self.regUtils = RagUtils(embeddings_dir="./embeddings-load-website")

        if clearOldData:
            self.regUtils.deleteAllDocuments()
        
        self.ingest();
             
    def ingest(self):
        documents = self.regUtils.loadContentFromWebsite(
            url=self.url,
            targetClassName=self.targetClassName
        )
        chunks = self.regUtils.documentsSplitter(
            documents=documents, 
            chunk_size=512, 
            chunk_overlap=51
        )
        self.regUtils.ingest(
            chunks=chunks,
        )
        vectorstore = self.regUtils.loadVectorStore()
        retriever = self.regUtils.getRetriever(vectorstore)
        self.retriever = retriever
    
    def ask(self, question: str):
        prompt = self.regUtils.genPrompt(question=question, retriever=self.retriever)
        llm = Typhoon2Assistant()
        answer = llm.ask(prompt)
        return answer, prompt

In [128]:
class AskPDF:
    def __init__(self, pdf_path: str, clearOldData: bool = False):
        self.pdf_path = pdf_path
        self.regUtils = RagUtils(embeddings_dir="./embeddings-load-pdf")

        if clearOldData:
            self.regUtils.deleteAllDocuments()
        
        self.ingest();
             
    def ingest(self):
        documents = self.regUtils.loadContentFromPDF(
            pdf_path=self.pdf_path,
        )
        chunks = self.regUtils.documentsSplitter(
            documents=documents, 
            chunk_size=512, 
            chunk_overlap=51
        )
        self.regUtils.ingest(
            chunks=chunks,
        )
        vectorstore = self.regUtils.loadVectorStore()
        retriever = self.regUtils.getRetriever(vectorstore)
        self.retriever = retriever
    
    def ask(self, question: str):
        prompt = self.regUtils.genPrompt(question=question, retriever=self.retriever)
        llm = Typhoon2Assistant()
        answer = llm.ask(prompt)
        return answer, prompt


        

In [129]:
assistance = AskPDF(
    pdf_path="doc1.pdf",
    clearOldData=False,
)

Split text into 268 chunks.
-- use New vector store--
-- Ingest to Vector Database Success ---
-- Vector Store Loaded --


In [132]:
answer,prompt = assistance.ask("เป็นงานวิจัยเกี่ยวกับอะไร")
print(prompt)
print("-------")
print(answer)


            [Instructions] 
                Question: เป็นงานวิจัยเกี่ยวกับอะไร
                Context: 1.5 ขอบเขตของการวิจัย 2 
 1.6 ประโยชน์ที่คาดว่าจะได้รับ 3 
 1.7 นิยามศัพท์เฉพาะ 3 
2 เอกสารและงานวิจัยที่เกี่ยวข้อง 4 
 2.1 เซลลูโลส (cellulose) 4 
 2.2 โครงสร้างของเซลลูโลส 5 
 2.3 ชนิดของเซลลูโลส  5 
 2.4 เทคโนโลยี Pre-treatment สำหรับวัสดุที่มีเซลลูโลสสูง 6 
 2.5 คาร์บอกซีเมทิลเซลลูโลส    9 
 2.6 จุลินทรีย์ที่สร้างเอนไซม์เซลลูเลส 10 
 2.7 แบคทีเรียเซลลูเลส 11 
 2.8 เอนไซม์เซลลูเลส (cellulase) 12 
 2.9 การทำงานของเอนไซม์เซลลูเลส 14 
 2.10 การสังเคราะห์เอนไซม์เซลลูเลส 15 ATGGCAACTAAAATCAAGGGTTGCGCTCTTTGCGGGACTTAACCCAACTTCTCACAACAC
GACCTGACAACAACCTTGCACCACCTGTCACTCTGCTCCCGAAGGAGAAGCCCTATCTCT
AGGGTTGTCAAAGGATGTCAAGACCTGTTAAGGTTCTTCGCGTTGCTTCAAATTAAACCA
CTTGCTCCACCGCTTGTGCGGGCCCCCTTCATTTCCTTTGATTTTCACCCTTGCGGCCGT
ACTCCCCAGGCGGATTGCTTAATGCTTTAACTTCACCACTAAAGGGCGAAAACCCTCTAA
CACTTAGCACTCTTCTTTTACGGCGTGAACTACCAGGGTATCTAATCCTGTTTGCTCCCC
เอกสารฉบับนี้ดาวน์โหลดเมื่อวันที่ 26/02/2025
โดย Ch

In [103]:
assistance = AskWebsite(
    url="https://www.sanook.com/travel/1451675/",
    targetClassName= "EntryReaderInner",
    clearOldData=False,
)

Split text into 8 chunks.
-- use Append data to vector store--
-- Ingest to Vector Database Success ---
-- Vector Store Loaded --


In [104]:
answer,prompt = assistance.ask("มีฟาสปอร์ตดีใหม")
print(prompt)
print(answer)


            [Instructions] 
                Question: มีฟาสปอร์ตดีใหม
                Context: ขั้นตอนการต่ออายุพาสปอร์ต 
ถ้าพาสปอร์ตของคุณใกล้หมดอายุ ไม่ต้องกังวล! เพราะตอนนี้การต่อพาสปอร์ตทำได้ง่ายและสะดวกมากขึ้น ไม่ต้องเสียเวลาลางานหรือรีบตื่นเช้าเหมือนเมื่อก่อน แถมยังมีบริการในวันเสาร์อีกด้วย
1. การจองคิวต่ออายุพาสปอร์ตผ่านระบบออนไลน์
เพื่อความสะดวกและลดระยะเวลารอคอย ควรจองคิวล่วงหน้าผ่านเว็บไซต์ www.qpassport.in.th โดยมีขั้นตอนดังนี้: ขั้นตอนการต่ออายุพาสปอร์ต 
ถ้าพาสปอร์ตของคุณใกล้หมดอายุ ไม่ต้องกังวล! เพราะตอนนี้การต่อพาสปอร์ตทำได้ง่ายและสะดวกมากขึ้น ไม่ต้องเสียเวลาลางานหรือรีบตื่นเช้าเหมือนเมื่อก่อน แถมยังมีบริการในวันเสาร์อีกด้วย
1. การจองคิวต่ออายุพาสปอร์ตผ่านระบบออนไลน์
เพื่อความสะดวกและลดระยะเวลารอคอย ควรจองคิวล่วงหน้าผ่านเว็บไซต์ www.qpassport.in.th โดยมีขั้นตอนดังนี้: ขั้นตอนการต่ออายุพาสปอร์ต 
ถ้าพาสปอร์ตของคุณใกล้หมดอายุ ไม่ต้องกังวล! เพราะตอนนี้การต่อพาสปอร์ตทำได้ง่ายและสะดวกมากขึ้น ไม่ต้องเสียเวลาลางานหรือรีบตื่นเช้าเหมือนเมื่อก่อน แถมยังมีบริการในวันเสาร์อีกด้วย
1. 