# Exploring a RAG System with ChromaDB and GPUStack (OpenAI-compatible API)

This notebook demonstrates a **Retrieval-Augmented Generation (RAG)** pipeline using:

- **GPUStack** (OpenAI-compatible HTTP API) for **embeddings** and the **LLM** — get your API key at https://gpustack.unibe.ch/
- **ChromaDB** as the vector database (stores embeddings + metadata)
- Helper functions to ingest documents, chunk them, embed them, retrieve relevant chunks, and generate an answer

The main goal is to understand *how the pieces fit together*:

1. **Ingest** documents → split into chunks  
2. **Embed** chunks → store in a vector DB  
3. **Retrieve** top-k similar chunks for a user query  
4. **Augment** the LLM prompt with retrieved chunks  
5. **Generate** a grounded response

> **Educational note:** This notebook is designed for teaching. It prioritizes clarity over production best-practices.


In [None]:
# Install required packages. Run here or on the terminal

!pip install -r requirements.txt


In [None]:
import os
import pdfplumber
import nltk
import chromadb
from tqdm import tqdm
from openai import OpenAI

# Sentence-tokenizer data used by nltk.sent_tokenize
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)

# --- GPUStack (OpenAI-compatible) client setup ---
GPUSTACK_BASE_URL = os.getenv("GPUSTACK_BASE_URL", "https://gpustack.unibe.ch/v1")

# Prefer the GPUSTACK_API_KEY environment variable. The local "api-key" file is
# only a fallback, so it is read (and required to exist) only when the variable
# is unset or empty.
api_key = os.getenv("GPUSTACK_API_KEY")
if not api_key:
    # The context manager closes the file handle deterministically.
    with open("api-key", "r", encoding="utf-8") as key_file:
        api_key = key_file.read().strip()
GPUSTACK_API_KEY = api_key

# Never echo the secret itself: notebook outputs are saved and often shared.
print("GPUStack API key loaded.")

client = OpenAI(
    base_url=GPUSTACK_BASE_URL,
    api_key=GPUSTACK_API_KEY,
)


In [None]:
# Quick check that your API key works for embeddings:
# request one embedding and report the length of the returned vector.

test_emb = client.embeddings.create(
    input=["Hello from GPUStack!"],
    model="qwen3-embedding-0.6b",
)

print("Embedding length:", len(test_emb.data[0].embedding))


In [None]:
# Quick check that your API key works for the LLM:
# send a single user message and print the text of the first choice.

test_chat = client.chat.completions.create(
    messages=[{"role": "user", "content": "In one sentence, explain what RAG is."}],
    model="gpt-oss-120b",
    max_tokens=200,
    temperature=0.2,
    top_p=1,
)

print(test_chat.choices[0].message.content)


In [None]:
# Configuration shared by the cells below. Adjust these values to your setup.

# GPUStack models (OpenAI-compatible); these are the same model names used in
# the two quick checks above.
embedding_model_name = "qwen3-embedding-0.6b"
llm_model = "gpt-oss-120b"

vector_db = "chromaDB"  # informational label; only printed during ingestion

# Name of the ChromaDB collection that ingestion writes to and the retriever reads from
collection_name = "dsl_embeddings_gpustack_demo_1"

# NOTE: machine-specific absolute path -- change it to where your documents live
raw_db = "/home/ahmad-unibe/dsl_data"  # root directory where raw documents are stored

data_language = "english"  # tokenizer language for sentence splitting

db_directory = os.path.join(os.path.expanduser('~'), '.db')  # where ChromaDB will be stored locally
chunk_size = 20  # number of sentences per chunk

# Prompt template: the {data} and {query} placeholders are filled in with
# str.format by the Responder class (retrieved context and user question).
prompt = """
You are a helpful, polite assistant that works at the Data Science Lab (DSL).

You will receive:
1) Retrieved context chunks (may be partial and messy).
2) A user question.

Use the context to answer as well as you can.
- If the context does not contain enough information, say you don't know.
- If appropriate, advise contacting DSL via info.dsl@unibe.ch or support.dsl@unibe.ch.

---
CONTEXT:
{data}

---
QUESTION:
{query}
"""


In [None]:
# Helper functions: read raw files (txt/pdf), split to sentences, chunk sentences

def get_file_paths(root_dir: str, file_extensions: list[str]) -> list[str]:
    """
    Retrieves the paths of all files with the given extensions under a directory tree.

    Extensions are matched case-insensitively and may be given with or without
    a leading dot, so "pdf", ".pdf" and "PDF" all match "report.PDF".

    Args:
        root_dir (str): The root directory to search (subdirectories included).
        file_extensions (list[str]): Extensions to retrieve, e.g. ["txt", "pdf"].

    Returns:
        list[str]: Matching file paths, sorted so the result does not depend on
        the directory listing order. Empty if nothing matches or no
        extensions are given.
    """
    # Normalise once, outside the walk: exactly one leading dot, lower-case.
    suffixes = tuple("." + ext.lstrip(".").lower() for ext in file_extensions)

    file_paths = []
    for dirpath, _, filenames in os.walk(root_dir):
        # str.endswith accepts a tuple, so one call checks every extension.
        file_paths.extend(
            os.path.join(dirpath, filename)
            for filename in filenames
            if filename.lower().endswith(suffixes)
        )

    file_paths.sort()
    return file_paths



def read_text_file(file_path: str) -> str:
    """
    Loads a UTF-8 encoded text file into memory.

    Args:
        file_path (str): Location of the .txt file.

    Returns:
        str: Everything the file contains, as one string.
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        return handle.read()


def read_pdf_file(file_path: str) -> str:
    """
    Extracts the text of a PDF with pdfplumber, page by page.

    Args:
        file_path (str): Location of the PDF file.

    Returns:
        str: The text of all pages joined with newlines. Pages for which
        pdfplumber returns no text are left out.
    """
    with pdfplumber.open(file_path) as document:
        # Extraction must happen while the document is still open.
        extracted_pages = [page.extract_text() for page in document.pages]

    # Drop pages that produced nothing (None or empty string) before joining.
    return "\n".join(page_text for page_text in extracted_pages if page_text)


def split_text_into_sentences(text: str, language: str) -> list[str]:
    """
    Breaks a text into sentences by delegating to NLTK's sentence tokenizer.

    Args:
        text (str): The text to segment.
        language (str): Language name passed to the tokenizer (e.g. "english").

    Returns:
        list[str]: The sentences returned by `nltk.sent_tokenize`.
    """
    return nltk.sent_tokenize(text, language=language)


def chunk_sentences(sentences: list[str], chunk_size: int) -> list[str]:
    """
    Groups a list of sentences into chunks, each containing up to `chunk_size` sentences.

    Sentences within a chunk are joined with a single space. The last chunk
    may contain fewer than `chunk_size` sentences.

    Args:
        sentences (list[str]): A list of sentences.
        chunk_size (int): The number of sentences per chunk; must be >= 1.

    Returns:
        list[str]: A list of text chunks, each containing up to `chunk_size`
        sentences. Empty if `sentences` is empty.

    Raises:
        ValueError: If `chunk_size` is smaller than 1.
    """
    # A negative step would make range() empty and silently drop every
    # sentence; zero raises an unrelated-looking range() error. Fail clearly.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    return [
        " ".join(sentences[start:start + chunk_size])
        for start in range(0, len(sentences), chunk_size)
    ]

In [None]:
# --- Embedding and storing documents in ChromaDB (GPUStack embeddings) ---
# This part of the cell prepares ingestion: it opens the ChromaDB client,
# prints the active configuration, lists the input files and opens the
# target collection.

# Initialize ChromaDB client (data directory: db_directory)

chroma_client = chromadb.PersistentClient(
    path=db_directory)

# Echo the configuration so a run can be matched to its settings
print("\n--- Embedding and Storing Documents in ChromaDB ---")
print(f"Embedding Model (GPUStack): {embedding_model_name}")
print(f"Chunk Size (sentences per chunk): {chunk_size}")
print(f"Raw Data Directory: {raw_db}")
print(f"Vector Database Directory: {db_directory}\n")
print(f"Vector Database is: {vector_db}\n")

# Step 1: Load documents (txt and pdf) -- searched recursively under raw_db
file_paths = get_file_paths(raw_db, ["txt", "pdf"])
print(f"Found {len(file_paths)} files to process.\n")

# Create or retrieve the collection in ChromaDB
# (re-running this cell reuses a collection that already exists under this name)
collection = chroma_client.get_or_create_collection(collection_name)

def embed_texts(texts: list[str], model: str, batch_size: int = 32) -> list[list[float]]:
    """
    Embeds texts through the GPUStack embeddings API, one batch per request.

    Args:
        texts (list[str]): The texts to embed.
        model (str): Name of the embedding model to request.
        batch_size (int): Maximum number of texts sent in a single request.

    Returns:
        list[list[float]]: One embedding per input text, collected batch by
        batch in the order the API returned them. Empty if `texts` is empty
        (no request is made in that case).
    """
    vectors: list[list[float]] = []
    for start in range(0, len(texts), batch_size):
        response = client.embeddings.create(
            input=texts[start:start + batch_size],
            model=model,
        )
        for item in response.data:
            vectors.append(item.embedding)
    return vectors

# Ingest every discovered file: read -> split -> chunk -> embed -> store.
for file_path in tqdm(file_paths, desc="Processing documents"):
    # Step 2: Read content based on file type (extension compared case-insensitively)
    extension = os.path.splitext(file_path)[1].lower()
    if extension == '.txt':
        text = read_text_file(file_path)
    elif extension == '.pdf':
        text = read_pdf_file(file_path)
    else:
        print(f"Unsupported file type: {file_path}")
        continue

    # Step 3: Split text into sentences
    sentences = split_text_into_sentences(text, data_language)

    # Step 4: Chunk sentences into groups
    chunks = chunk_sentences(sentences, chunk_size)

    # Empty files or PDFs without a text layer yield no chunks. Skip them
    # instead of sending empty lists to ChromaDB.
    if not chunks:
        print(f"No text extracted, skipping: {file_path}")
        continue

    # Step 5: Embed each chunk (batch for efficiency)
    embeddings = embed_texts(chunks, model=embedding_model_name, batch_size=32)

    # IDs are built from the path relative to raw_db, so files with the same
    # name in different subdirectories do not collide. For files directly
    # under raw_db this equals the plain file name.
    file_name = os.path.basename(file_path)
    relative_path = os.path.relpath(file_path, raw_db)
    ids = [f"{relative_path}_chunk_{i}" for i in range(len(chunks))]
    metadatas = [
        {"file_name": file_name, "chunk_id": chunk_id, "chunk_index": i}
        for i, chunk_id in enumerate(ids)
    ]

    # Step 6: Store in ChromaDB. upsert (rather than add) makes re-running the
    # cell refresh existing chunks instead of leaving stale content in place.
    collection.upsert(
        ids=ids,
        documents=chunks,
        embeddings=embeddings,
        metadatas=metadatas,
    )

print("Done! Documents embedded and stored in ChromaDB.")


In [None]:
class ChromaRetriever:
    """Retrieve documents from a ChromaDB collection using GPUStack embeddings.

    The query text is embedded with the module-level GPUStack ``client`` and
    the resulting vector is used to query the collection.
    """

    def __init__(self, embedding_model: str, db_path: str, db_collection: str, n_results: int) -> None:
        """
        Opens an existing collection for querying.

        Args:
            embedding_model (str): Embedding model name used for queries.
            db_path (str): Directory of the ChromaDB database.
            db_collection (str): Name of the collection to query; it is
                fetched with ``get_collection``, not created here.
            n_results (int): Number of results requested per query.
        """
        self.embedding_model = embedding_model
        self.db_path = db_path
        self.db_collection = db_collection
        self.n_results = n_results

        self.client = chromadb.PersistentClient(path=self.db_path)
        self.collection = self.client.get_collection(name=self.db_collection)

    def _embed(self, text: str) -> list[float]:
        """Return the embedding of a single text from the GPUStack embeddings API."""
        resp = client.embeddings.create(model=self.embedding_model, input=[text])
        return resp.data[0].embedding

    def retrieve(self, query: str):
        """
        Embeds the query and retrieves relevant documents from the collection.

        Args:
            query (str): The user query.

        Returns:
            The result of ``collection.query``, or None if embedding or
            querying failed (the error is printed, not raised).
        """
        try:
            embedded_query = self._embed(query)
            return self.collection.query(
                query_embeddings=[embedded_query],
                n_results=self.n_results
            )
        except Exception as e:
            # Best-effort by design: callers get None and decide what to do.
            print(f"An error occurred during retrieval: {e}")
            return None

    def format_results_for_prompt(self, results) -> str:
        """
        Format retrieved chunks into a readable context string for the LLM prompt.

        Args:
            results: The value returned by `retrieve` (may be None).

        Returns:
            str: One section per retrieved chunk, or "No relevant data found."
            when there are no results or the query matched no documents.
        """
        if not results:
            return "No relevant data found."

        # Results are nested per query; only one query is sent, so use index 0.
        document_groups = results.get("documents") or [[]]
        documents = document_groups[0] or []
        if not documents:
            # Covers {"documents": [[]]}, which would otherwise produce an
            # empty context string.
            return "No relevant data found."

        metadata_groups = results.get("metadatas") or [[]]
        metadatas = list(metadata_groups[0] or [])
        # Pad so that every document is emitted even if metadata is missing.
        metadatas.extend([None] * (len(documents) - len(metadatas)))

        sections = []
        for idx, (doc, metadata) in enumerate(zip(documents, metadatas), start=1):
            metadata = metadata or {}  # a chunk stored without metadata yields None
            chunk_id = metadata.get('chunk_id', 'N/A')
            file_name = metadata.get('file_name', 'N/A')
            sections.append(
                f"Document {idx}:\n"
                f"Chunk ID: {chunk_id}\n"
                f"File Name: {file_name}\n"
                f"Content:\n{doc}\n"
                + "-" * 60 + "\n"
            )
        return "".join(sections)


In [None]:
class Responder:
    """Generate responses using a GPUStack-hosted LLM (OpenAI-compatible chat API).

    The final prompt is built once, at construction time, by filling the
    ``{data}`` and ``{query}`` placeholders of the template.
    """

    def __init__(self, data: str, model: str, prompt_template: str, query: str) -> None:
        """
        Args:
            data (str): Retrieved context inserted into the prompt.
            model (str): Name of the chat model to call.
            prompt_template (str): Template with ``{data}`` and ``{query}`` placeholders.
            query (str): The user question.
        """
        self.data = data
        self.model = model
        self.prompt_template = prompt_template
        self.query = query
        self.prompt = prompt_template.format(data=self.data, query=self.query)

    def _request_kwargs(self) -> dict:
        """Return the chat-completion arguments shared by both response modes."""
        return {
            "model": self.model,
            "temperature": 0.3,
            "top_p": 1,
            "max_tokens": 800,
            "messages": [{"role": "user", "content": self.prompt}],
        }

    def generate_response(self) -> str:
        """
        One-shot response generation.

        Returns:
            str: The text of the first choice; an empty string if the server
            returned no text content.

        Raises:
            RuntimeError: If the request fails (the original error is chained).
        """
        try:
            response = client.chat.completions.create(**self._request_kwargs())
            # content may be None; keep the declared str return type.
            return response.choices[0].message.content or ""
        except Exception as e:
            raise RuntimeError(f"An error occurred during response generation: {e}") from e

    def stream_response(self):
        """
        Stream the response token-by-token (if supported by the server).

        Prints the text as it arrives. Falls back to printing a one-shot
        response if a TypeError is raised.

        Raises:
            RuntimeError: If streaming fails for any other reason (the
                original error is chained).
        """
        try:
            stream = client.chat.completions.create(**self._request_kwargs(), stream=True)
            for event in stream:
                # Some chunks carry no choices; indexing them would raise IndexError.
                if not event.choices:
                    continue
                delta = event.choices[0].delta
                if delta and getattr(delta, "content", None):
                    print(delta.content, end="", flush=True)
            print("\n")
        except TypeError:
            # Some OpenAI-compatible servers do not support streaming.
            print(self.generate_response())
        except Exception as e:
            raise RuntimeError(f"An error occurred during streaming: {e}") from e


In [None]:
# Let's try the retriever by itself first: type queries and inspect the raw
# matches (ID, source file, distance, content) without calling the LLM.

results_numbers = 5

retriever = ChromaRetriever(embedding_model=embedding_model_name,
                            db_path=db_directory,
                            db_collection=collection_name,
                            n_results=results_numbers)

while True:
    query = input("Type a query to search the DB. Type 'quit' to exit:  ").strip()

    if query.lower() == 'quit':
        break
    if not query:
        # Nothing typed: ask again instead of embedding an empty string.
        continue

    results = retriever.retrieve(query)

    # retrieve() prints the error and returns None on failure, so guard
    # before indexing into the result.
    if not results or not results.get('documents'):
        print("No results returned.")
        continue

    # Print out the results
    print("\n--- Query Results ---\n")
    matches = zip(results['documents'][0], results['metadatas'][0], results['distances'][0])
    for idx, (doc, metadata, distance) in enumerate(matches, start=1):
        metadata = metadata or {}  # a chunk stored without metadata yields None
        print(f"Result {idx}:")
        print(f"Document ID: {metadata.get('chunk_id', 'N/A')}")
        print(f"File Name: {metadata.get('file_name', 'N/A')}")
        print(f"Distance: {distance}")
        print(f"Content:\n{doc}\n")
        print("-" * 80)


In [None]:
# Let's try the whole pipeline: retrieve -> augment the prompt -> generate.

# Build the retriever once, before the loop: creating it per question would
# open a new ChromaDB client on every iteration.
retriever = ChromaRetriever(embedding_model=embedding_model_name,
                            db_path=db_directory,
                            db_collection=collection_name,
                            n_results=5)

while True:
    user_query = input("Ask a question. Type quit to exit:  ").strip()
    if user_query.lower() == "quit":
        break
    if not user_query:
        # Nothing typed: ask again instead of querying with an empty string.
        continue

    print("Looking the DB for relevant information .......")
    # Get the data for the RAG and put it in str format. A failed retrieval
    # (None) is turned into "No relevant data found." by the formatter.
    search_results = retriever.retrieve(user_query)
    formated_result = retriever.format_results_for_prompt(search_results)

    responder = Responder(data=formated_result, model=llm_model, prompt_template=prompt, query=user_query)
    responder.stream_response()