# Setup:

1. Complete every step in `System/README.md`
2. Create a file `System/.env` containing `API_KEY=<your Gemini API key>`

Sources (this would have been nearly impossible without them):
- Article: https://codeawake.com/blog/postgresql-vector-database
- RAG from scratch: https://github.com/ruizguille/rag-from-scratch
- Personal project: https://github.com/Lingotech-Davis/NewsDashboard

## Outline
- Learning: me working through the tutorial code and combining aspects of the three sources above
- Final System: the consolidated system, with full documentation

# Learning

For now, we use Wikipedia as a simple placeholder for the actual sources.
- Note: when we run benchmarks, we should have a dedicated set of sources anyway

In [5]:
import wikipedia

# Parameters
query = "Obama"
k = 5

# Get titles
titles = wikipedia.search(query, results=k)

# Load page
page = wikipedia.page(titles[0], auto_suggest=False)

# Useful attributes
print(page.url)
print(page.title)
print(page.content[:100] + "...")

https://en.wikipedia.org/wiki/Barack_Obama
Barack Obama
Barack Hussein Obama II (born August 4, 1961) is an American politician who served as the 44th presi...


Set up the document class and the database

In [6]:
# A class that defines a document as:
# - text: full raw text
# - metadata: a dictionary
# - chunks: a list of chunks from the raw text ['text'] and ['embedding']
class RAGDocument():
  def __init__(self, text: str, metadata: dict):
    self.text = text
    self.metadata = metadata
    self.chunks = "Not chunked yet"
  def _to_chunks(self, max_tokens=512, overlap=25):
    tokens = self.text.split()
    self.chunks = []
    start_idx = 0
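    while start_idx < len(tokens):
      end_idx = start_idx + max_tokens
      curr_chunk = tokens[start_idx:end_idx]
      self.chunks.append({"text": " ".join(curr_chunk),
                          "embedding": "Not embedded yet"})
      if end_idx >= len(tokens):
        break  # This chunk reaches the end; stepping again would only repeat overlap words
      start_idx += max_tokens - overlap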
  def _to_embeddings(self, embedding_model=None):
    for index, chunk in enumerate(self.chunks):
      self.chunks[index]["embedding"] = embedding_model.encode(chunk['text'])
  def chunkify(self, max_tokens=512, overlap=25, embedding_model=None):
    if overlap >= max_tokens:
      raise ValueError("Overlap must be smaller than max_tokens")
    if not embedding_model:
      raise ValueError("Embedding model not specified")
    self._to_chunks(max_tokens, overlap)
    self._to_embeddings(embedding_model=embedding_model)


# Testing it out
# Dummy embedding model: returns a 384-dimensional vector filled with the text's character count
class testEmbeddingModel():
  def encode(self, text):
    return [len(text)] * 384
doc = RAGDocument("Hello, this is an example \n of a document", metadata={"Author": "Chengyi", "Date": "1/29/26"})
doc.chunkify(embedding_model=testEmbeddingModel())
print(doc.text)
print(doc.metadata)
print(doc.chunks)

Hello, this is an example 
 of a document
{'Author': 'Chengyi', 'Date': '1/29/26'}
[{'text': 'Hello, this is an example of a document', 'embedding': [39, 39, 39, ..., 39]}]
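To see how `max_tokens` and `overlap` interact, here is a standalone sketch of the same whitespace-based sliding window, written as a hypothetical `sliding_window` function rather than a method. It stops as soon as a window reaches the last word; without an end-of-text check, a loop like this can append one extra chunk made only of words the previous chunk already contains (for example, 10 words with `max_tokens=4, overlap=1`). Note that splitting on whitespace counts words, not the word-piece tokens the embedding model actually sees.

```python
def sliding_window(text: str, max_tokens: int, overlap: int) -> list[str]:
    """Split text into whitespace-delimited word windows.

    Consecutive windows share `overlap` words; the loop stops once a window
    reaches the end of the text, so no overlap-only tail window is produced.
    """
    if overlap >= max_tokens:
        raise ValueError("Overlap must be smaller than max_tokens")
    words = text.split()
    chunks = []
    start = 0
    step = max_tokens - overlap
    while start < len(words):
        end = start + max_tokens
        chunks.append(" ".join(words[start:end]))
        if end >= len(words):
            break  # this window already covers the last word
        start += step
    return chunks


text = " ".join(f"w{i}" for i in range(10))
for chunk in sliding_window(text, max_tokens=4, overlap=1):
    print(chunk)
# w0 w1 w2 w3
# w3 w4 w5 w6
# w6 w7 w8 w9
```

Each window starts `max_tokens - overlap` words after the previous one, so the last word of one chunk is the first word of the next when `overlap=1`.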

In [7]:
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import Text, select
from sqlalchemy.dialects.postgresql import JSONB
from pgvector.sqlalchemy import Vector
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
import numpy as np

# 1. Initialize Vector class
class Base(DeclarativeBase):
    pass

class VectorDocument(Base):
    __tablename__ = 'vectors'

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    text: Mapped[str] = mapped_column(Text)
    vector = mapped_column(Vector(384))
    metadata_: Mapped[dict | None] = mapped_column('metadata', JSONB)

    def __repr__(self):
        return f'Vector(id={self.id}, text={self.text[:50]}..., metadata={self.metadata_})'

# 2. Prepare to create a database
username = 'username'
password = 'password'
DB_URL = f'postgresql+asyncpg://{username}:{password}@localhost:5432/edgerag_db'
engine = create_async_engine(DB_URL)

async def db_create():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

# 3. Create session
Session = async_sessionmaker(engine, expire_on_commit=False)

# 4. Initialize adding a document to database
async def add_document_to_vector_db(doc: RAGDocument):
    doc_chunks = doc.chunks
    commit_chunks = []
    for doc_chunk in doc_chunks:
        commit_chunks.append({
            'text': doc_chunk['text'],
            'vector': doc_chunk['embedding'],
            'metadata_': doc.metadata
        })
    async with Session() as db:
        for commit_chunk in commit_chunks:
            db.add(VectorDocument(**commit_chunk))
        await db.commit()

# 5. Initialize searching the database
async def vector_search(query_vector, top_k=3):
    async with Session() as db:
        query = (
            select(VectorDocument.text, VectorDocument.metadata_, VectorDocument.vector.cosine_distance(query_vector).label('distance'))
            .order_by('distance')
            .limit(top_k)
        )
        res = await db.execute(query)
        result = []
        for text, metadata, distance in res:
            result.append({
                'text': text,
                'metadata': metadata,
                'score': 1 - distance
            })
        return result

# Testing it out
docs = [
    RAGDocument(
        text="Python is a high-level programming language known for its simplicity and readability. It's widely used in web development, data science, and artificial intelligence.",
        metadata={"topic": "programming", "language": "Python", "author": "Test User"}
    ),
    RAGDocument(
        text="Machine learning is a subset of artificial intelligence that focuses on building systems that learn from data. Popular frameworks include TensorFlow, PyTorch, and scikit-learn.",
        metadata={"topic": "AI/ML", "subtopic": "machine learning", "author": "Test User"}
    ),
    RAGDocument(
        text="PostgreSQL is a powerful open-source relational database system. It supports advanced features like JSONB data types, full-text search, and vector similarity search through extensions.",
        metadata={"topic": "databases", "db_type": "PostgreSQL", "author": "Test User"}
    ),
]
await db_create()
embedding_model = testEmbeddingModel()
for doc in docs:
    doc.chunkify(max_tokens=512, overlap=2, embedding_model=embedding_model)
    await add_document_to_vector_db(doc)
results = await vector_search([69]*384, top_k=3)
for result in results:
    print("Result:", result)

Result: {'text': "Python is a high-level programming language known for its simplicity and readability. It's widely used in web development, data science, and artificial intelligence.", 'metadata': {'topic': 'programming', 'author': 'Test User', 'language': 'Python'}, 'score': 1.0}
Result: {'text': 'Machine learning is a subset of artificial intelligence that focuses on building systems that learn from data. Popular frameworks include TensorFlow, PyTorch, and scikit-learn.', 'metadata': {'topic': 'AI/ML', 'author': 'Test User', 'subtopic': 'machine learning'}, 'score': 1.0}
Result: {'text': 'PostgreSQL is a powerful open-source relational database system. It supports advanced features like JSONB data types, full-text search, and vector similarity search through extensions.', 'metadata': {'topic': 'databases', 'author': 'Test User', 'db_type': 'PostgreSQL'}, 'score': 1.0}
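Every score above is 1.0 because `testEmbeddingModel` maps each text to a vector whose entries all equal its length, so every stored vector and the query `[69]*384` point in the same direction, and cosine similarity ignores magnitude. The ranking is therefore arbitrary in this test. A quick numpy check of the `1 - distance` score (the `cosine_similarity` helper is illustrative, not part of pgvector):

```python
import numpy as np


def cosine_similarity(a, b) -> float:
    """1 - cosine distance, i.e. the 'score' computed in vector_search."""
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))


stored = [39] * 384  # what testEmbeddingModel produces for a 39-character text
query = [69] * 384   # the query vector used in the test above

print(cosine_similarity(stored, query))  # approximately 1.0: same direction
print(cosine_similarity([1, 0], [0, 1]))  # 0.0: orthogonal vectors
```

A real embedding model such as all-MiniLM-L6-v2 produces vectors that point in different directions, which is what makes the ordering meaningful.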


In [8]:
# Development: Reset
async def reset_db():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

await reset_db()
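Both `DB_URL` here and `RAGDatabase` in the final system splice the username and password straight into the URL with an f-string, which breaks if the password contains characters such as `@` or `/`. SQLAlchemy's documentation recommends percent-encoding credentials with `urllib.parse.quote_plus` (or building the URL with `sqlalchemy.engine.URL.create`). A minimal sketch, where `make_db_url` is a hypothetical helper and the defaults mirror the values used in this notebook:

```python
from urllib.parse import quote_plus


def make_db_url(username: str, password: str, host: str = "localhost",
                port: int = 5432, database: str = "edgerag_db") -> str:
    """Build an asyncpg connection URL, percent-encoding the credentials.

    Without encoding, a password containing '@' or '/' would be misparsed
    as part of the host or the database path.
    """
    return (f"postgresql+asyncpg://{quote_plus(username)}:{quote_plus(password)}"
            f"@{host}:{port}/{database}")


print(make_db_url("username", "p@ss/word"))
# postgresql+asyncpg://username:p%40ss%2Fword@localhost:5432/edgerag_db
```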

In [None]:
# Keys
import os
import dotenv

dotenv.load_dotenv('.env')
API_KEY = os.getenv("API_KEY")
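`os.getenv` quietly returns `None` when the key is missing, so the problem only surfaces later as an authentication error from Gemini. A small hypothetical guard, `require_env`, fails fast instead; it assumes `dotenv.load_dotenv` has already run:

```python
import os


def require_env(name: str) -> str:
    """Return an environment variable, failing loudly if it is missing or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"{name} is not set; check that System/.env exists and contains {name}=..."
        )
    return value


# Usage, after dotenv.load_dotenv('.env'):
# API_KEY = require_env("API_KEY")
```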

In [9]:
from google import genai
from sentence_transformers import SentenceTransformer

client = genai.Client(api_key=API_KEY)
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

SYSTEM_PROMPT = """
You are an AI assistant that answers questions about documents in your knowledge base.
"""

RAG_PROMPT = """
Use the following pieces of context to answer the user question.
You must only use the facts from the context to answer.
If the answer cannot be found in the context, say that you don't have enough information to answer the question and provide any relevant facts found in the context.

Context:
{context}

User Question:
{question}
"""

async def answer_question_with_rag(question):
    query_vector = embedding_model.encode(question)
    top_chunks = await vector_search(query_vector, top_k=3)
    context = '\n\n---\n\n'.join([chunk['text'] for chunk in top_chunks]) + '\n\n---'
    user_message = RAG_PROMPT.format(context=context, question=question)
    messages = SYSTEM_PROMPT + " " + user_message
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=messages
    )
    return response

BertModel LOAD REPORT from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


In [21]:
question = "What are the main challenges in renewable energy adoption?"
response = await answer_question_with_rag(question)
print(response.text)  # The database was reset above, so the retrieved context is empty (demonstration only)

I don't have enough information to answer the question as the provided context is empty.
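The empty-context answer above comes from the prompt assembly in `answer_question_with_rag`: with no retrieved chunks, the context is just the lone separator `\n\n---`. The sketch below pulls that assembly into a hypothetical pure function, `build_rag_message`, so it can be checked without Postgres or Gemini; the short template is a stand-in for `RAG_PROMPT`.

```python
def build_rag_message(system_prompt: str, rag_prompt: str,
                      chunks: list[dict], question: str) -> str:
    """Mirror the prompt assembly in answer_question_with_rag.

    Chunk texts are joined with '---' separators, formatted into the RAG
    template, and prefixed with the system prompt.
    """
    context = "\n\n---\n\n".join(chunk["text"] for chunk in chunks) + "\n\n---"
    user_message = rag_prompt.format(context=context, question=question)
    return system_prompt + " " + user_message


# Stand-in template with the same placeholders as RAG_PROMPT
template = "Context:\n{context}\n\nUser Question:\n{question}"
msg = build_rag_message("You are helpful.", template,
                        [{"text": "A"}, {"text": "B"}], "Q?")
print(msg)
```

Because the chunk texts are passed as `format` arguments rather than being part of the template, braces inside retrieved text are not interpreted as placeholders.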


# Final System

In this example, the system consists of:
- Retrieval: Wikipedia articles
- Database: Postgres (with pgvector)
- Embedding model: all-MiniLM-L6-v2
- Text generation model: gemini-2.5-flash

Each part is meant to be swappable: any object that exposes the method the `RAG` class expects can be plugged in.

In [1]:
# Keys
import os
import dotenv

dotenv.load_dotenv('.env')
API_KEY = os.getenv("API_KEY")

Define helper classes / items

In [3]:
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import Text
from pgvector.sqlalchemy import Vector

class RAGDocument():

  """
  A class that initializes a document as:
  - text: full raw text
  - metadata: a dictionary
  """

  def __init__(self, text: str, metadata: dict):

    """
    Attributes:
    - text, metadata: as above
    - chunks: a list of chunks from the raw text, each a dict with keys 'text' and 'embedding'
    """

    self.text = text
    self.metadata = metadata
    self.chunks = "Not chunked yet"
  def _to_chunks(self, max_tokens, overlap):

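    """
    Helper: splits the text into chunks of at most max_tokens whitespace-delimited
    words, where consecutive chunks share `overlap` words
    """

    tokens = self.text.split()
    self.chunks = []
    start_idx = 0
    while start_idx < len(tokens):
      end_idx = start_idx + max_tokens
      curr_chunk = tokens[start_idx:end_idx]
      self.chunks.append({"text": " ".join(curr_chunk),
                          "embedding": "Not embedded yet"})
      if end_idx >= len(tokens):
        break  # This chunk reaches the end; stepping again would only repeat overlap words
      start_idx += max_tokens - overlap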
  def _to_embeddings(self, embedding_model):

    """
    Helper: Encodes the chunk using the embedding model
    """

    for index, chunk in enumerate(self.chunks):
      self.chunks[index]["embedding"] = embedding_model.encode(chunk['text'])
  def chunkify(self, max_tokens=512, overlap=25, embedding_model=None):

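    """
    Parameters:
    - max_tokens: maximum number of whitespace-delimited words per chunk (words, not model tokens)
    - overlap: number of words shared by consecutive chunks; must be smaller than max_tokens
    - embedding_model: an object with a method encode(text) that returns a vector of size VECTOR_SIZE (384 for VectorDocument)

    Note: all-MiniLM-L6-v2 truncates inputs longer than 256 word pieces, so with
    max_tokens=512 the end of a long chunk does not contribute to its embedding.
    """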

    if overlap >= max_tokens:
      raise ValueError("Overlap must be smaller than max_tokens")
    if not embedding_model:
      raise ValueError("Embedding model not specified")
    self._to_chunks(max_tokens, overlap)
    self._to_embeddings(embedding_model=embedding_model)


class Base(DeclarativeBase):

    """
    For defining VectorDocument
    """

    pass

class VectorDocument(Base):

    """
    One row per document chunk: the chunk text, its 384-dimensional embedding,
    and the metadata of the document it came from
    """

    __tablename__ = 'vectors'
    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    text: Mapped[str] = mapped_column(Text)
    vector = mapped_column(Vector(384))
    metadata_: Mapped[dict | None] = mapped_column('metadata', JSONB)

    def __repr__(self):
        return f'Vector(id={self.id}, text={self.text[:50]}..., metadata={self.metadata_})'

Define database

In [4]:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

class RAGDatabase():
  def __init__(self, username='username', password='password'):

    """
    Initializes a connection to the database
    """

    self.username = username
    self.password = password
    self.DB_URL = f'postgresql+asyncpg://{username}:{password}@localhost:5432/edgerag_db'
    self.engine = create_async_engine(self.DB_URL)
    self.Session = async_sessionmaker(self.engine, expire_on_commit=False)
    # Note: the engine connects lazily, so this message does not confirm that the database is reachable
    print("Successfully initialized connection")

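  async def db_create(self):
    """
    Creates the tables defined on Base (here, 'vectors') if they don't already exist
    """
    async with self.engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

  async def __reset_db(self):
    """
    Development only: drops and recreates all tables
    (name-mangled; call it as _RAGDatabase__reset_db() from outside the class)
    """
    async with self.engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)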

  async def add_document_to_vector_db(self, doc: RAGDocument): # TODO: This function needs a safety guardrail to ensure duplicates don't get added to the database!

    """
    Adds a document to the database
    """

    doc_chunks = doc.chunks
    commit_chunks = []
    for doc_chunk in doc_chunks:
        commit_chunks.append({
            'text': doc_chunk['text'],
            'vector': doc_chunk['embedding'],
            'metadata_': doc.metadata
        })
    async with self.Session() as db:
        for commit_chunk in commit_chunks:
            db.add(VectorDocument(**commit_chunk))
        await db.commit()

  async def vector_search(self, query_vector, top_k=3):

    """
    Searches the database for documents that are cosine similar to the query vector
    """

    async with self.Session() as db:
        query = (
            select(VectorDocument.text, VectorDocument.metadata_, VectorDocument.vector.cosine_distance(query_vector).label('distance'))
            .order_by('distance')
            .limit(top_k)
        )
        res = await db.execute(query)
        result = []
        for text, metadata, distance in res:
            result.append({
                'text': text,
                'metadata': metadata,
                'score': 1 - distance
            })
        return result
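The TODO on `add_document_to_vector_db` could be addressed by fingerprinting each chunk before insertion. The sketch below is one possible approach, not the author's: it hashes the chunk text together with its metadata using SHA-256 and keeps only unseen fingerprints. The helper names are hypothetical, and fingerprints live in an in-memory set here; a database version would more likely store the hash in a unique column (not present in the current `VectorDocument` schema) so Postgres itself rejects repeats. It assumes the metadata is JSON-serializable, which holds for the Wikipedia `URL`/`title` metadata.

```python
import hashlib
import json


def chunk_fingerprint(text: str, metadata: dict) -> str:
    """Stable SHA-256 fingerprint of a chunk's text plus its metadata."""
    payload = json.dumps({"text": text, "metadata": metadata}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def filter_new_chunks(chunks: list[dict], metadata: dict, seen: set[str]) -> list[dict]:
    """Return only chunks whose fingerprint is not in `seen`, updating `seen` in place."""
    fresh = []
    for chunk in chunks:
        fp = chunk_fingerprint(chunk["text"], metadata)
        if fp not in seen:
            seen.add(fp)
            fresh.append(chunk)
    return fresh


seen: set[str] = set()
chunks = [{"text": "Barack Obama ..."}, {"text": "Barack Obama ..."}]
meta = {"title": "Barack Obama"}
print(len(filter_new_chunks(chunks, meta, seen)))  # 1: the second chunk is a repeat
print(len(filter_new_chunks(chunks, meta, seen)))  # 0: both already seen
```

`sort_keys=True` makes the fingerprint independent of the order in which metadata keys were inserted.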

Define RAG system

In [8]:

class RAG():
  def __init__(self, api_searcher, text_model, embedding_model, keyword_model):

    """
    To make this system as modular as possible,
    each component is passed in as an object with a known method:
    - api_searcher: has a method scrape(query, results) that returns a list of RAGDocument objects
    - text_model: has an async method generate(content) that returns a response text string
    - embedding_model: has a method encode(text) that returns a vector of size VECTOR_SIZE
    - keyword_model: has a method extract(query) that returns a keyword text string
    """

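    # Fail early if any component is missing
    if any(component is None for component in (api_searcher, text_model, embedding_model, keyword_model)):
      raise ValueError("api_searcher, text_model, embedding_model and keyword_model must all be provided")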

    # Initialize attributes
    self.k = 5
    self.api_searcher = api_searcher
    self.text_model = text_model
    self.embedding_model = embedding_model
    self.keyword_model = keyword_model

    # Initialize database
    self.database = RAGDatabase()

    # Initialize prompts
    self.system_prompt = """
    You are an AI assistant that answers questions about documents in your knowledge base.
    """

    self.rag_prompt = """
    Use the following pieces of context to answer the user question.
    You must only use the facts from the context to answer.
    If the answer cannot be found in the context, say that you don't have enough information to answer the question and provide any relevant facts found in the context.

    Context:
    {context}

    User Question:
    {question}
    """

  def _API_search(self, query):

    """
    Calls an API to scrape for relevant texts
    """

    return self.api_searcher.scrape(query, results=self.k)

  def _get_keyword(self, query):

    """
    Extracts a keyword from a query
    """

    return self.keyword_model.extract(query)

  def _get_embedding(self, text):

    """
    Gets embedding from text
    """

    return self.embedding_model.encode(text)

  async def _generate_text(self, content):

    """
    Generates text
    """

    return await self.text_model.generate(content)

  async def ask(self, question, add=False, print_output=False):

    """
    Main function call: Asks the RAG system a question
    """

    if add:
      keyword = self._get_keyword(question)
      await self.add(keyword)

    query_vector = self._get_embedding(question)
    top_chunks = await self.database.vector_search(query_vector, top_k=self.k)
    context = '\n\n---\n\n'.join([chunk['text'] for chunk in top_chunks]) + '\n\n---'
    user_message = self.rag_prompt.format(context=context, question=question)
    messages = self.system_prompt + " " + user_message
    response = await self._generate_text(messages)

    if print_output:
      print("Response:", response)

    return response

  async def add(self, query):

    """
    Helper function: Search the API (not database) and add documents to the database
    """

    docs = self._API_search(query)
    for doc in docs:
      doc.chunkify(embedding_model=self.embedding_model)
      await self.database.add_document_to_vector_db(doc)
    print(len(docs), "(potentially unique?) documents added")
    return
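The docstring in `RAG.__init__` describes each component by the method it must expose. One way to make that contract explicit is `typing.Protocol`; the protocol and stub class names below are hypothetical. `runtime_checkable` lets `isinstance` confirm that the required method exists, though it checks neither signatures nor whether `generate` is async.

```python
from typing import Protocol, Sequence, runtime_checkable


@runtime_checkable
class Searcher(Protocol):
    def scrape(self, query: str, results: int) -> list: ...


@runtime_checkable
class TextModel(Protocol):
    async def generate(self, content: str) -> str: ...


@runtime_checkable
class EmbeddingModel(Protocol):
    def encode(self, text: str) -> Sequence[float]: ...


@runtime_checkable
class KeywordModel(Protocol):
    def extract(self, query: str) -> str: ...


class EchoTextModel:
    """Hypothetical offline stand-in for gemini_model: echoes the prompt's start."""
    async def generate(self, content: str) -> str:
        return content[:20]


class ConstantEmbedding:
    """Hypothetical stand-in embedding model (same idea as testEmbeddingModel)."""
    def encode(self, text: str) -> list[float]:
        return [1.0] * 384


print(isinstance(EchoTextModel(), TextModel))           # True
print(isinstance(ConstantEmbedding(), EmbeddingModel))  # True
print(isinstance(ConstantEmbedding(), KeywordModel))    # False: no extract method
```

Since `SentenceTransformer` already has an `encode` method, it satisfies `EmbeddingModel` without a wrapper, matching the note in the wrapper cell.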

Initialize wrappers & actual objects

In [13]:
from google import genai
from sentence_transformers import SentenceTransformer
import wikipedia
from keybert import KeyBERT

# We have to define the following:
# - api_searcher
# - text_model
# - embedding_model
# - keyword_model

# Example: api_searcher through Python's wikipedia module
class wikipedia_searcher():
  def __init__(self, wk):
    self.wk = wk
    print("Initialized Wikipedia")
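  def scrape(self, query, results):
    titles = self.wk.search(query, results=results)
    docs = []
    for title in titles:
      try:
        page = self.wk.page(title, auto_suggest=False)
      except (self.wk.exceptions.DisambiguationError, self.wk.exceptions.PageError):
        continue  # Skip ambiguous or missing pages instead of failing the whole search
      docs.append(RAGDocument(text=page.content, metadata={'URL': page.url, 'title': page.title}))
    return docs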

api_searcher = wikipedia_searcher(wikipedia)

# Example: text_model through Gemini
class gemini_model():
  def __init__(self, api_key):
    self.client = genai.Client(api_key=api_key)
    print("Initialized Gemini")
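  async def generate(self, content):
    # DEBUG is a global flag, set in the cell that runs the RAG system
    if DEBUG:
      return "You are currently in debug mode so no Gemini API credits will be used"
    # client.aio is the async version of the API, so the call doesn't block the event loop
    response = await self.client.aio.models.generate_content(
      model="gemini-2.5-flash",
      contents=content
    )
    # Collapse newlines and repeated spaces into single spaces
    return ' '.join(response.text.split())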

text_model = gemini_model(api_key=API_KEY)

# Example: embedding_model through SentenceTransformer # NOTE: Already has the encode method, thus, no need to create a wrapper around it.
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

# Example: keyword model through KeyBERT
class keybert_model():
  def __init__(self):
    self.keyword_model = KeyBERT('distilbert-base-nli-mean-tokens')
    print("Initialized KeyBERT")
  def extract(self, query):
    # extract_keywords returns (keyword, score) pairs, best match first; keep the top keyword
    return self.keyword_model.extract_keywords(query)[0][0]

keyword_model = keybert_model()

Initialized Wikipedia
Initialized Gemini


BertModel LOAD REPORT from: sentence-transformers/all-MiniLM-L6-v2
Key                     | Status     |  | 
------------------------+------------+--+-
embeddings.position_ids | UNEXPECTED |  | 

Notes:
- UNEXPECTED	:can be ignored when loading from different task/architecture; not ok if you expect identical arch.


Initialized KeyBERT
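Because `RAG` only calls `keyword_model.extract(query)`, KeyBERT can be swapped for anything with that method, for example to test the pipeline without downloading a second model. The hypothetical `LongestWordKeywordModel` below uses a deliberately crude heuristic (the longest non-stopword in the query, with a small hand-picked stopword list) and is not a substitute for KeyBERT's embedding-based ranking.

```python
import re

# Small hand-picked stopword list (an assumption, not taken from KeyBERT)
STOPWORDS = {"a", "an", "the", "is", "are", "was", "who", "what", "which",
             "of", "in", "on", "for", "to", "and", "or", "how", "why", "does", "do"}


class LongestWordKeywordModel:
    """Hypothetical dependency-free keyword_model: returns the longest
    non-stopword in the query (ties go to the earliest word)."""

    def extract(self, query: str) -> str:
        words = [w for w in re.findall(r"[A-Za-z0-9']+", query.lower())
                 if w not in STOPWORDS]
        if not words:
            return query.strip()  # fall back to the raw query
        return max(words, key=len)


model = LongestWordKeywordModel()
print(model.extract("Who is Barack Obama?"))  # barack
```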


Finally, our RAG system

In [14]:
rag = RAG(api_searcher=api_searcher,
          text_model=text_model,
          embedding_model=embedding_model,
          keyword_model=keyword_model)

DEBUG = False

await rag.database.db_create()

response = await rag.ask("Who is Barack Obama?", add=True, print_output=True)

Successfully initialized connection
5 (potentially unique?) documents added
Response: Barack Hussein Obama II (born August 4, 1961) is an American politician who served as the 44th president of the United States from 2009 to 2017. He is a member of the Democratic Party and was the first African American president. Before his presidency, Obama served as a U.S. senator representing Illinois from 2005 to 2008 and as an Illinois state senator from 1997 to 2004. He was born in Honolulu, Hawaii, and graduated from Columbia University in 1983 with a Bachelor of Arts degree in political science. He later enrolled in Harvard Law School in 1988, where he was the first black president of the Harvard Law Review. Obama worked as a community organizer in Chicago, became a civil rights attorney, and taught constitutional law at the University of Chicago Law School from 1992 to 2004. He was awarded the 2009 Nobel Peace Prize for efforts in international diplomacy. During his presidency, his administra