Project Structure
- Setup Chunking and processing steps for inputs (PDFs, docx, txt)
- Setup PGVector Store (VS) + postgresql => Done
- Using LLMs (OpenAI, Gemini,...) to query vectors from VS

# Create PostgreDB 

In [24]:
import sys
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv('/Users/longnguyen/Documents/coding/fastapi/rag_with_llama/deployment/.env')

True

In [4]:
import psycopg2

db_name = os.environ['POSTGRES_DB']
host = "localhost"
password = os.environ['POSTGRES_PASSWORD']
port = "5432"
user = os.environ['POSTGRES_USER']
# conn = psycopg2.connect(connection_string)
conn = psycopg2.connect(
    dbname=db_name,
    host=host,
    password=password,
    port=port,
    user=user,
)
conn.autocommit = True

# with conn.cursor() as c:
#     c.execute(f"DROP DATABASE IF EXISTS {db_name}")
#     c.execute(f"CREATE DATABASE {db_name}")

In [5]:
print(conn.status)
print(conn.get_dsn_parameters())
with conn.cursor() as cursor:
    cursor.execute("SELECT 1")
    print("Connection is active")

1
{'user': 'admin', 'channel_binding': 'prefer', 'dbname': 'rag_db', 'host': 'localhost', 'port': '5432', 'options': '', 'sslmode': 'prefer', 'sslcompression': '0', 'sslcertmode': 'allow', 'sslsni': '1', 'ssl_min_protocol_version': 'TLSv1.2', 'gssencmode': 'prefer', 'krbsrvname': 'postgres', 'gssdelegation': '0', 'target_session_attrs': 'any', 'load_balance_hosts': 'disable'}
Connection is active


# Checking file input

In [1]:
from pydantic import BaseModel, Field, field_validator
from pydantic_ai import Agent, RunContext
from uuid import UUID
from chonkie import TokenChunker, SemanticChunker

In [2]:
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

from pathlib import Path
import PyPDF2
from chonkie import SemanticChunker
from sentence_transformers import SentenceTransformer


def extract_text_from_pdf(pdf_path):
    """
    Extract text from PDF file using PyPDF2.

    Args:
        pdf_path (str): Path to the PDF file

    Returns:
        str: Extracted text from all pages
    """
    text = ""
    with open(pdf_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)

        for page_num in range(len(pdf_reader.pages)):
            page = pdf_reader.pages[page_num]
            text += page.extract_text() + "\n"

    return text


def chunk_with_semantic_chunker(text, chunk_size=512, similarity_threshold=0.7, embedding_model=None):
    """
    Chunk text using Chonkie's SemanticChunker with custom embedding model.

    Args:
        text (str): Text to chunk
        chunk_size (int): Maximum tokens per chunk
        similarity_threshold (float): Similarity threshold for semantic chunking
        embedding_model: Custom embedding model (SentenceTransformer or similar)

    Returns:
        list: List of chunks
    """
    if embedding_model:
        chunker = SemanticChunker(
            chunk_size=chunk_size,
            similarity_threshold=similarity_threshold,
            embedding_model=embedding_model
        )
    else:
        # Use default embedding model
        chunker = SemanticChunker(
            chunk_size=chunk_size,
            similarity_threshold=similarity_threshold
        )

    chunks = chunker.chunk(text)
    return chunks


def save_chunks_to_file(chunks, output_path, chunker_type):
    """
    Save chunks to a text file.

    Args:
        chunks (list): List of chunks
        output_path (str): Output file path
        chunker_type (str): Type of chunker used
    """
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(f"Chunks created using {chunker_type}\n")
        f.write("=" * 50 + "\n\n")

        for i, chunk in enumerate(chunks, 1):
            f.write(f"Chunk {i}:\n")
            f.write("-" * 20 + "\n")
            f.write(f"{chunk.text}\n\n")
            f.write(f"Tokens: {chunk.token_count}\n")
            if hasattr(chunk, 'start_index'):
                f.write(f"Start Index: {chunk.start_index}\n")
            if hasattr(chunk, 'end_index'):
                f.write(f"End Index: {chunk.end_index}\n")
            f.write("\n" + "="*50 + "\n\n")

  from .autonotebook import tqdm as notebook_tqdm


In [10]:
print("\nStep 1: Loading documents")
# texts = extract_text_from_pdf(r'D:\Project\rag_with_llama\docs\llama2.pdf')
texts = extract_text_from_pdf(r'../docs/llama2.pdf')

# Create output directory
output_dir = "chunked_output"
os.makedirs(output_dir, exist_ok=True)

print("\nStep 2: Chunking with SemanticChunker")
semantic_chunks = chunk_with_semantic_chunker(
    texts,
    chunk_size=512,
    similarity_threshold=0.7
    )


Step 1: Loading documents

Step 2: Chunking with SemanticChunker


Falling back to loading default provider model.
Falling back to SentenceTransformerEmbeddings.


In [14]:
semantic_chunks[0]

Chunk(text='Llama 2 : Open Foundation and Fine-Tuned Chat Models
Hugo Touvron∗Louis Martin†Kevin Stone†
Peter Albert Amjad Almahairi Yasmine Babaei Nikolay Bashlykov Soumya Batra
', token_count=43, start_index=0, end_index=167)

In [15]:
from typing import List, Dict, Any, Optional

class EmbeddingGenerator:
    """Generate embeddings using SentenceTransformers."""

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        """
        Initialize embedding generator.

        Args:
            model_name: SentenceTransformer model name
        """
        self.model_name = model_name
        self.model = SentenceTransformer(model_name)
        self.embedding_dim = self.model.get_sentence_embedding_dimension()

    def embed_text(self, text: str) -> List[float]:
        """
        Generate embedding for a single text.
        Args:
            text: Input text
        Returns:
            List of embedding values
        """
        embedding = self.model.encode(text)
        return embedding.tolist()

    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for multiple texts.
        Args:
            texts: List of input texts
        Returns:
            List of embedding lists
        """
        embeddings = self.model.encode(texts)
        return [emb.tolist() for emb in embeddings]

embedding_model = './embedded_model/all-MiniLM-L6-v2'
embedding_generator = EmbeddingGenerator(embedding_model)

In [16]:
embedded_txt = embedding_generator.embed_batch(semantic_chunks)

In [17]:
len(embedded_txt)

1332

In [1]:
import psycopg2
from pgvector.psycopg2 import register_vector  # Missing import

In [None]:
# -- First, turn off the pager to avoid the 'more' error
# \pset pager off

# -- Now run your queries (notice the semicolon at the end)
# SELECT version();

# SELECT 2+2 AS result;

# SELECT NOW();

# SELECT 'Hello PostgreSQL!' AS message;

In [None]:
# import sentence_transformers
# model = sentence_transformers.SentenceTransformer('all-MiniLM-L6-v2')
# model.save('./embedded_model/all-MiniLM-L6-v2')
# model_test = sentence_transformers.SentenceTransformer('./embedded_model/all-MiniLM-L6-v2')

# Test fulll flow

In [14]:
%reload_ext autoreload
%autoreload 2

In [15]:
import os
import uuid
from pathlib import Path
from typing import List, Dict, Any

from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, Field

import sys
sys.path.append('../document_processing')  # Go up one level, then into folder

import sys
sys.path.append('../docs')  # Go up one level, then into folder

# Your existing components - unchanged
from embed_chunks_to_db_v2 import ChunkEmbeddingPipeline, EmbeddingGenerator, VectorStore
import google.generativeai as genai

In [4]:
db_params = {
    'host': os.getenv('DB_HOST', 'localhost'),
    'port': os.getenv('DB_PORT', '5432'),
    'dbname': os.getenv('DB_NAME', 'rag_db'),
    'user': os.getenv('DB_USER', 'admin'),
    'password': os.getenv('DB_PASSWORD', 'admin')
}

table_name = 'document_chunks'

# Don't initialize models at startup
pipeline = None
def get_pipeline():
    global pipeline
    if pipeline is None:
        pipeline = ChunkEmbeddingPipeline(
                    db_params=db_params,
                    embedding_model='./embedded_model/all-MiniLM-L6-v2',  # Your embedding model path
                    table_name='document_chunks'  # Your existing table name
                )
    return pipeline

pipeline = get_pipeline()
file = pipeline.extract_text_from_pdf(r'../docs/llama2.pdf')
chunks = pipeline.chunk_text(file)

In [5]:
def test_pgvector_connection():
    """Validate pgvector integration at the connection level"""
    vector_store = VectorStore(db_params, 'document_chunks')
    
    try:
        conn = vector_store._get_connection()
        with conn.cursor() as cur:
            # Test 1: Confirm pgvector types are registered
            cur.execute("SELECT '[1,2,3]'::vector;")
            result = cur.fetchone()[0]
            print(f"✓ Vector type working: {result}")
            
            # Test 2: Verify cosine distance operator
            cur.execute("SELECT '[1,0,0]'::vector <=> '[0,1,0]'::vector;")
            distance = cur.fetchone()[0]
            print(f"✓ Cosine distance: {distance:.3f}")
            
            # Test 3: Table schema validation
            cur.execute(f"""
                SELECT column_name, data_type 
                FROM information_schema.columns 
                WHERE table_name = '{vector_store.table_name}' 
                AND column_name = 'embedding'
            """)
            schema = cur.fetchone()
            print(f"✓ Embedding column: {schema}")
            
        conn.close()
        return True
        
    except Exception as e:
        print(f"✗ pgvector connection failed: {e}")
        return False
    
test_pgvector_connection()

✓ Vector type working: [1. 2. 3.]
✓ Cosine distance: 1.000
✓ Embedding column: ('embedding', 'USER-DEFINED')


True

In [6]:
from dataclasses import dataclass
from typing import List, Dict, Any, Optional


@dataclass
class Chunk:
    """Chunk data structure to match your existing interface."""
    id: str
    document_id: str
    text: str
    embedding: List[float]
    metadata: Optional[Dict] = None

In [7]:
# Generate embeddings in batch
print("Generating embeddings...")
embeddings = pipeline.embedding_generator.embed_batch([chunk.text for chunk in chunks])

Generating embeddings...


In [8]:
chunk_size = 512
similarity_threshold = 0.7
filename = 'llama2.pdf'
file_type = 'pdf'
file_size = len(file)
document_id = str(uuid.uuid4())
metadata = {'source': 'llama2.pdf'}

# Create Chunk objects using your interface
chunk_objects = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
    chunk_metadata = {
        'chunk_index': i,
        'token_count': chunk.token_count,
        'start_index': getattr(chunk, 'start_index', None),
        'end_index': getattr(chunk, 'end_index', None),
        'chunk_size': chunk_size,
        'similarity_threshold': similarity_threshold,
        'embedding_model': pipeline.embedding_generator.model_name,
        'embedding_dimension': len(embedding),
        'filename': filename,
        'file_type': file_type,
        'file_size': file_size
    }

    # Add any additional metadata
    if metadata:
        chunk_metadata.update(metadata)

    chunk_obj = Chunk(
        id=str(uuid.uuid4()),
        document_id=document_id,
        text=chunk.text,
        embedding=embedding,
        metadata=chunk_metadata
    )
    chunk_objects.append(chunk_obj)

In [9]:
# Use your add_chunks method with pgvector
print("Inserting chunks into database using pgvector...")
pipeline.vector_store.add_chunks(chunk_objects)

Inserting chunks into database using pgvector...


In [16]:
test_results = pipeline.search_documents('text', limit=1, threshold=0.1)
assert len(test_results) > 0, "Insert succeeded but search failed"
print(f"✓ Search operational: {len(test_results)} results")

✓ Search operational: 1 results


In [3]:
def fast_reset_vector_store(vector_store):
    """Two-step reset: TRUNCATE then DROP for optimal performance"""
    conn = vector_store._get_connection()
    
    try:
        with conn.cursor() as cur:
            # Step 1: Instant data removal (no WAL overhead)
            cur.execute(f"TRUNCATE TABLE {vector_store.table_name} CASCADE;")
            print("✓ Data truncated instantly")
            
            # Step 2: Clean schema removal
            cur.execute(f"DROP TABLE {vector_store.table_name} CASCADE;")
            print("✓ Table schema dropped")
            
        conn.commit()
        
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()

# Usage
fast_reset_vector_store(pipeline.vector_store)

✓ Data truncated instantly
✓ Table schema dropped


# Check inserting 

In [17]:
# Verify success
conn = pipeline.vector_store._get_connection()
with conn.cursor() as cur:
    cur.execute(f"SELECT COUNT(*) FROM {pipeline.vector_store.table_name}")
    count_after = cur.fetchone()[0]
    
    # Validate embedding integrity
    cur.execute(f"""
        SELECT *
        FROM {pipeline.vector_store.table_name} 
        ORDER BY created_at DESC LIMIT 2
    """)
    
    results = cur.fetchall()

In [18]:
results[0]

('8b22d921-57b9-4668-a91e-6b6d456975c5',
 'e5cf45cb-c5b6-486a-8bf8-62b08dae5f86',
 'Prajjwal Bhargava Shruti Bhosale Dan Bikel Lukas Blecher Cristian Canton Ferrer Moya Chen\n',
 array([ 7.12140696e-03,  1.58904791e-01, -4.11183350e-02, -1.60896853e-02,
        -1.19244307e-01,  1.66793168e-02,  7.66921416e-02,  2.09899573e-03,
        -7.99256749e-03, -3.11005227e-02,  6.69132546e-03, -1.19025588e-01,
         9.55456719e-02, -4.00688946e-02,  2.43876386e-03,  2.39557661e-02,
        -2.64740479e-03,  2.02800948e-02,  7.83016626e-03, -7.15690479e-02,
        -2.38059536e-02, -1.06638953e-01, -9.48437303e-03,  2.73222160e-02,
        -6.30895719e-02, -4.59780265e-03,  1.97287090e-02,  1.22995349e-03,
         2.02850197e-02, -3.98318842e-02, -3.29108760e-02,  1.30037338e-01,
         7.59432688e-02, -2.78772302e-02, -1.24752801e-02,  8.16548020e-02,
        -5.94116375e-02,  4.51856200e-03,  1.69892237e-02, -4.73481510e-03,
        -2.29553003e-02,  7.49921007e-03, -2.45867595e-02, -6.

In [26]:
# embedding_model = './embedded_model/all-MiniLM-L6-v2'
# embedding_generator = EmbeddingGenerator(embedding_model)

query = "What is Llama 2?"
results = pipeline.search_documents(
    query=query,
    limit=100,
    threshold=0.5
)

# Step 2: Build context for LLM
context = "\n\n".join([f"[Context {i+1}]: {r['text']}" for i, r in enumerate(results)])

# Step 3: Generate response with Gemini (if available)
gemini_key = os.getenv('GOOGLE_API_KEY')
genai.configure(api_key=gemini_key)
if gemini_key:
    model = genai.GenerativeModel('gemini-2.5-flash')
    prompt = f"""Answer based on this context:
            {context}
            Question: {query}
            Answer:"""
    
    response = model.generate_content(prompt)
    answer = response.text

E0000 00:00:1758712330.021180  127741 alts_credentials.cc:93] ALTS creds ignored. Not running on GCP and untrusted ALTS is not enabled.


In [27]:
answer

'Llama 2 is an updated version of Llama 1, trained on a new mix of publicly available data. It is a new technology and an LLM (Large Language Model) that was pretrained on 2 trillion tokens of data from publicly available sources. Llama 2 is a static model trained on an offline dataset between January 2023 and July 2023.\n\nLlama 2 is released openly to encourage responsible AI innovation and is intended for commercial and research use in English. Variants of the Llama 2 model are available with 7B, 13B, and 70B parameters. It carries potential risks with use, and before deployment, developers should perform safety testing and tuning.'