In [12]:
import os
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.llms import HuggingFacePipeline
from transformers import pipeline, AutoTokenizer, AutoModel
from dotenv import dotenv_values
from torch import cuda

In [13]:
# Notebook-wide configuration. Values are read from a local ".env" file;
# each lookup falls back to the literal default when the key is absent.
config = dotenv_values(".env")

# HF_TOKEN = config.get('HUGGINGFACE_TOKEN', None)

# --- Hugging Face model identifiers ---------------------------------------
MODEL_NAME = 'meta-llama/Llama-3.2-1B'
EMBEDDING_NAME = 'sentence-transformers/paraphrase-MiniLM-L6-v2'

# --- On-disk locations ------------------------------------------------------
EMBEDDING_MODEL_PATH = config.get('DATABASE_EMBEDDING_MODEL_PATH', './database/embedding_model')
VECTOR_STORE_PATH = config.get('DATABASE_VECTOR_STORE_PATH', './database/vector_store')
MODEL_PATH = config.get('DATABASE_MODEL_PATH', './database/model')

# --- Run settings -----------------------------------------------------------
# Prefix used when naming the vector store directory for this set of documents.
DOCUMENT_GROUP_NAME = 'test'
# Read by initialize_llm() to choose the pipeline device.
IS_GPU = cuda.is_available()

# Echo the effective settings so the run is self-describing.
print("Embedding model path:", EMBEDDING_MODEL_PATH)
print("Vector store path:", VECTOR_STORE_PATH)
print("Is GPU available:", IS_GPU)

Embedding model path: ./database/embedding_model
Vector store path: ./database/vector_store
Is GPU available: False


In [3]:
def load_documents(file_paths):
    """Read every supported file in ``file_paths`` and return one flat list.

    The loader is chosen from the lower-cased file extension:
    ``.pdf`` -> PyPDFLoader, ``.docx`` -> Docx2txtLoader, ``.txt`` -> TextLoader.
    Files with any other extension are reported on stdout and skipped.

    Args:
        file_paths: Iterable of path strings.

    Returns:
        A list containing whatever each loader's ``load()`` returned,
        concatenated in input order (empty if nothing was loadable).
    """
    supported = ('.pdf', '.docx', '.txt')
    loaded = []

    for path in file_paths:
        _, suffix = os.path.splitext(path)
        suffix = suffix.lower()

        # Guard clause: report and skip anything we have no loader for.
        if suffix not in supported:
            print(f"❌ Unsupported file format: {suffix}")
            continue

        if suffix == '.pdf':
            reader = PyPDFLoader(path)
        elif suffix == '.docx':
            reader = Docx2txtLoader(path)
        else:
            reader = TextLoader(path)

        loaded += reader.load()

    return loaded

In [9]:
def load_embeddings_model(model_name: str = "paraphrase-MiniLM-L6-v2"):
    """Create a HuggingFaceEmbeddings object with a per-model cache folder.

    The cache root ``EMBEDDING_MODEL_PATH`` is created if missing, and the
    embeddings object is given ``<EMBEDDING_MODEL_PATH>/<model_name>`` as its
    ``cache_folder``. Whether files are reused from that folder or downloaded
    is decided by the library, not by this function.

    Args:
        model_name: Hugging Face identifier of the embedding model.

    Returns:
        A HuggingFaceEmbeddings instance with progress display enabled.
    """
    # Only the cache root is created here; the model sub-folder is left to the library.
    os.makedirs(EMBEDDING_MODEL_PATH, exist_ok=True)

    embedding_options = dict(
        model_name=model_name,
        show_progress=True,
        cache_folder=os.path.join(EMBEDDING_MODEL_PATH, model_name),
    )
    return HuggingFaceEmbeddings(**embedding_options)


def load_vector_store(embeddings: HuggingFaceEmbeddings = None, vector_store_path: str = "vector_store/<your_vector_store_name>"):
    """Return the FAISS store persisted at ``vector_store_path``, or ``None``.

    A store is considered present only when both ``index.faiss`` and
    ``index.pkl`` exist inside ``vector_store_path``.

    Args:
        embeddings: Embeddings object handed to ``FAISS.load_local``.
        vector_store_path: Directory holding the persisted index files.

    Returns:
        The loaded FAISS vector store, or ``None`` when the files are missing
        or loading raises ``FileNotFoundError``.
    """
    index_files = [
        os.path.join(vector_store_path, file_name)
        for file_name in ("index.faiss", "index.pkl")
    ]
    if not all(os.path.exists(index_file) for index_file in index_files):
        return None

    try:
        # NOTE(security): allow_dangerous_deserialization=True lets the loader
        # unpickle index.pkl. Only point this at stores you created yourself.
        store = FAISS.load_local(
            vector_store_path, embeddings, allow_dangerous_deserialization=True)
    except FileNotFoundError:
        return None
    else:
        print("✅ Loaded vector store from local storage.")
        return store


def create_and_save_vector_store(embeddings, vector_store_path, file_paths):
    """Build a FAISS vector store from the given documents and persist it.

    The new index is built in memory first. Only after that succeeds is any
    existing store at ``vector_store_path`` removed and the new one written,
    so a failure while loading or embedding does not destroy the old store.

    Args:
        embeddings: Embeddings object passed to ``FAISS.from_documents``.
        vector_store_path: Directory the store is saved to. Anything already
            at this path (directory or file) is deleted.
        file_paths: Paths handed to ``load_documents`` (.pdf, .docx, .txt).

    Returns:
        The newly created FAISS vector store.

    Raises:
        ValueError: If no documents could be loaded from ``file_paths``.
    """
    # Local import keeps this cell self-contained without touching the import cell.
    import shutil

    print("⚠️ Creating a new vector store, if one already exists it will be overwritten.")

    # Load every supported file (PDF, DOCX, TXT) into one list.
    documents = load_documents(file_paths)
    if not documents:
        # Fail early with a clear message instead of letting index creation
        # fail on an empty list.
        raise ValueError(
            f"No documents could be loaded from {list(file_paths)!r}; "
            "cannot build a vector store.")

    # Split documents into ~1000-character chunks on newlines, 30 chars overlap.
    text_splitter = CharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=30,
        separator="\n"
    )
    docs = text_splitter.split_documents(documents)

    # Embed the chunks and build the index in memory.
    vectorstore = FAISS.from_documents(docs, embeddings)

    # Replace whatever is currently on disk. The store path is a directory,
    # which os.remove() cannot delete, so directories go through shutil.rmtree().
    if os.path.exists(vector_store_path):
        if os.path.isdir(vector_store_path):
            shutil.rmtree(vector_store_path)
        else:
            os.remove(vector_store_path)
        print("🗑️ Removed existing vector store.")

    os.makedirs(vector_store_path, exist_ok=True)

    # Persist the vectors locally on disk.
    vectorstore.save_local(vector_store_path)
    print("💾 Vector store saved locally.")

    return vectorstore


def initialize_llm(model_name: str = 'distilgpt2', max_new_tokens: int = 1024, temperature: float = 0.7):
    """Create a text-generation pipeline wrapped as a LangChain LLM.

    If ``<MODEL_PATH>/<model_name>`` already exists the model and tokenizer
    are loaded from that directory. Otherwise they are loaded by name and then
    saved there for the next run. Both paths use the same generation settings,
    so results do not depend on whether the model was already cached.

    Args:
        model_name: Hugging Face model identifier.
        max_new_tokens: Upper bound on tokens generated per call.
        temperature: Sampling temperature (sampling is always enabled).

    Returns:
        A HuggingFacePipeline wrapping the transformers pipeline.
    """
    model_save_path = os.path.join(MODEL_PATH, model_name)

    # Settings common to both branches. Previously the cached branch omitted
    # max_new_tokens / do_sample / temperature and silently fell back to the
    # library defaults.
    pipeline_options = dict(
        framework="pt",
        max_new_tokens=max_new_tokens,
        do_sample=True,
        temperature=temperature,
        device=0 if IS_GPU else -1,  # first GPU when available, otherwise CPU
    )

    if os.path.exists(model_save_path):
        print(f"🔄 Loading model from {model_save_path}...")
        text_gen_pipeline = pipeline(
            "text-generation",
            model=model_save_path,
            tokenizer=model_save_path,
            **pipeline_options,
        )
    else:
        print(
            f"⬇️ Downloading and saving model '{model_name}' to {model_save_path}...")
        text_gen_pipeline = pipeline(
            "text-generation",
            model=model_name,
            tokenizer=model_name,
            **pipeline_options,
        )

        # Save the model and tokenizer so the next call takes the cached branch.
        text_gen_pipeline.model.save_pretrained(model_save_path)
        text_gen_pipeline.tokenizer.save_pretrained(model_save_path)
        print(f"✅ Model '{model_name}' saved to {model_save_path}.")

    return HuggingFacePipeline(pipeline=text_gen_pipeline)


def initialize_qa_chain(llm, vectorstore):
    """Build a verbose "stuff"-type RetrievalQA chain.

    Args:
        llm: Language model the chain uses to produce answers.
        vectorstore: Store whose ``as_retriever()`` result supplies documents.

    Returns:
        The chain created by ``RetrievalQA.from_chain_type``.
    """
    doc_retriever = vectorstore.as_retriever()
    chain_options = {
        "llm": llm,
        "chain_type": "stuff",
        "retriever": doc_retriever,
        "verbose": True,
    }
    return RetrievalQA.from_chain_type(**chain_options)


def similarity_search(vectorstore, query, k=5):
    """Delegate to ``vectorstore.similarity_search`` and return its result.

    Args:
        vectorstore: Object exposing ``similarity_search(query, k=...)``.
        query: Text to search for.
        k: Number of results to request (default 5).

    Returns:
        Whatever the vector store's ``similarity_search`` returns.
    """
    return vectorstore.similarity_search(query, k=k)

In [5]:
# Formatting in this cell is intentional; please keep the auto-formatter from re-wrapping it.
# Tip (author's observation): even when the model is already cached locally, loading may
# still contact the Hugging Face Hub and be very slow. Working offline avoids the delay
# (e.g. disconnect from the network, or try the HF_HUB_OFFLINE=1 environment variable).
embeddings = load_embeddings_model(EMBEDDING_NAME)

# Store directory is named "<group>_<embedding model name>" under VECTOR_STORE_PATH.
vector_store_path = os.path.join(
    VECTOR_STORE_PATH, f"{DOCUMENT_GROUP_NAME}_{embeddings.model_name}")

# Source documents indexed when no persisted store is found.
file_paths = ["documents/test.pdf", "documents/test.docx"]

# Reuse the persisted store when present (returns None otherwise).
vector_store = load_vector_store(embeddings, vector_store_path)

# Nothing on disk yet: build the store from file_paths and save it.
if vector_store is None:
    vector_store = create_and_save_vector_store(
        embeddings, vector_store_path, file_paths)

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


✅ Loaded vector store from local storage.


In [10]:
# IMPORTANT: make sure you are authenticated with Hugging Face and have been granted
# access to MODEL_NAME before running this cell (presumably the model repository is
# access-restricted — confirm on its model page).
llm = initialize_llm(model_name=MODEL_NAME)

# Wire the LLM to the vector store's retriever.
qa_chain = initialize_qa_chain(llm, vector_store)

⬇️ Downloading and saving model 'meta-llama/Llama-3.2-1B' to ./database/models\meta-llama/Llama-3.2-1B...


config.json:   0%|          | 0.00/843 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


model.safetensors:   0%|          | 0.00/2.47G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/185 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/50.5k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/9.09M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/301 [00:00<?, ?B/s]

✅ Model 'meta-llama/Llama-3.2-1B' saved to ./database/models\meta-llama/Llama-3.2-1B.


  return HuggingFacePipeline(pipeline=text_gen_pipeline)


In [11]:
# Course-specific instruction that is interpolated into system_prompt below.
custom_prompt = "You are teaching a class on Data Science and AI, if the question is not related, you should say you do not know the answer."

# Persona and guard-rail instructions intended for the model.
system_prompt = f"""
You are a teaching assistant at a University. 
If you are asked a question that you cannot answer, you should say you do not know the answer, do not make up an answer.
{custom_prompt}
A student asks you the following question:
"""

query = "Is this a pass/fail course?"

# Manually fetch the 5 chunks most similar to the query.
context = similarity_search(vector_store, query, k=5)


# NOTE(review): query_with_context (and therefore system_prompt and context) is built
# here but never passed to the chain — the invoke() call below receives only `query`.
# As written, the instructions above do not reach the model. Decide whether to give the
# chain a custom prompt or to call the LLM directly with this string.
query_with_context = f"""
{system_prompt}
Question: {query}
Context: {context} 
"""

# Only the bare query is sent; qa_chain was built with its own retriever over vector_store.
result = qa_chain.invoke(query)
print(result)

Batches:   0%|          | 0/1 [00:00<?, ?it/s]



[1m> Entering new RetrievalQA chain...[0m


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Setting `pad_token_id` to `eos_token_id`:None for open-end generation.
Starting from v4.46, the `logits` model output will have the same type as the model (except at train time, where it will always be FP32)


KeyboardInterrupt: 