# PostgreSQL Ingestion guard

The guard uses a small `rag_docs` table that stores a `doc_id` and a `content_hash` for each source document.

On startup:
1. Compute the current file hash.
2. Compare it to the stored hash.
3. If unchanged, skip ingestion.
4. If changed, re-ingest (optionally deleting the old rows first).

In [None]:
import hashlib
from sqlalchemy import create_engine, text
from llama_index.vector_stores.postgres import PGVectorStore
from llama_index.core import StorageContext, VectorStoreIndex

# ---------- helpers ----------
def file_hash(path: str) -> str:
    """Return the hex-encoded SHA-256 digest of the file at ``path``.

    The file is opened in binary mode and consumed in 1 MiB pieces, so the
    whole file is never held in memory at once.
    """
    block_size = 1024 * 1024
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            piece = stream.read(block_size)
            if not piece:
                # An empty read means end of file.
                break
            digest.update(piece)
    return digest.hexdigest()

def ensure_doc_table(engine):
    """Create the ``rag_docs`` bookkeeping table if it is not there yet.

    The table has one row per document: ``doc_id`` (primary key),
    ``content_hash`` and an ``updated_at`` timestamp defaulting to ``now()``.
    The statement runs inside an ``engine.begin()`` transaction block.
    """
    create_stmt = text("""
            CREATE TABLE IF NOT EXISTS rag_docs (
              doc_id TEXT PRIMARY KEY,
              content_hash TEXT NOT NULL,
              updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
            );
        """)
    with engine.begin() as connection:
        connection.execute(create_stmt)

def get_stored_hash(engine, doc_id: str):
    """Look up the content hash recorded in ``rag_docs`` for ``doc_id``.

    Returns the stored ``content_hash`` value, or ``None`` when no row
    exists for that ``doc_id``.
    """
    lookup = text("SELECT content_hash FROM rag_docs WHERE doc_id=:doc_id")
    params = {"doc_id": doc_id}
    with engine.begin() as connection:
        match = connection.execute(lookup, params).fetchone()
    if not match:
        return None
    return match[0]

def upsert_hash(engine, doc_id: str, content_hash: str):
    """Record ``content_hash`` for ``doc_id`` in ``rag_docs``.

    Inserts a new row, or on a ``doc_id`` conflict overwrites the stored hash
    and sets ``updated_at`` to ``now()``. Runs inside an ``engine.begin()``
    transaction block.
    """
    upsert_stmt = text("""
            INSERT INTO rag_docs (doc_id, content_hash)
            VALUES (:doc_id, :content_hash)
            ON CONFLICT (doc_id)
            DO UPDATE SET content_hash = EXCLUDED.content_hash, updated_at = now();
        """)
    values = {"doc_id": doc_id, "content_hash": content_hash}
    with engine.begin() as connection:
        connection.execute(upsert_stmt, values)

def table_has_rows(engine, table_name: str) -> bool:
    """Report whether ``table_name`` contains at least one row.

    A table name cannot be sent as a bound parameter, so it is interpolated
    into the SQL text. To keep that from becoming an injection vector, the
    name is validated first: it must be a plain identifier or a double-quoted
    identifier, optionally qualified with dots (``schema.table``).

    Args:
        engine: Object whose ``begin()`` yields a connection with ``execute``
            (a SQLAlchemy engine in this file).
        table_name: Name of the table to probe.

    Returns:
        ``True`` if the table has at least one row, ``False`` if it is empty.

    Raises:
        ValueError: If ``table_name`` is not a string or is not a well-formed
            identifier. Raised before any database access.
    """
    import re  # function-scope import keeps this helper self-contained

    part = r'(?:[A-Za-z_][A-Za-z0-9_$]*|"(?:[^"\x00]|"")+")'
    pattern = part + r"(?:\." + part + r")*"
    if not isinstance(table_name, str) or not re.fullmatch(pattern, table_name):
        raise ValueError(f"Unsafe table name: {table_name!r}")

    with engine.begin() as conn:
        # The name is interpolated exactly as given (no extra quoting), so the
        # database resolves it the same way as before validation was added.
        row = conn.execute(
            text(f"SELECT EXISTS (SELECT 1 FROM {table_name} LIMIT 1)")
        ).fetchone()
    return bool(row[0])

# ---------- your config ----------
# What to ingest and under which names it is tracked.
SOURCE_PATH = "/path/to/your/source_file.pdf"  # .txt/.md work as well
DOC_ID = "paul_graham_essay_source"  # any stable id; used as the rag_docs key
TABLE_NAME = "paul_graham_essay"  # handed to PGVectorStore as table_name

# SQLAlchemy engine used by the rag_docs helpers.
# postgres_pw must already be defined before this cell runs.
connection_string = f"postgresql://postgres:{postgres_pw}@localhost:5432/vector_db"
engine = create_engine(connection_string)

# ---------- vector store ----------
# Points at the same host, port, database and user as the SQLAlchemy engine.
vector_store = PGVectorStore.from_params(
    host="localhost",
    port=5432,
    database="vector_db",
    user="postgres",
    password=postgres_pw,
    table_name=TABLE_NAME,
    embed_dim=1536,  # dimension of the stored embedding vectors
    hnsw_kwargs=dict(
        hnsw_m=16,
        hnsw_ef_construction=64,
        hnsw_ef_search=40,
        hnsw_dist_method="vector_cosine_ops",
    ),
)
# Storage context that routes index writes to the Postgres vector store.
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# ---------- ingestion guard ----------
ensure_doc_table(engine)

current_hash = file_hash(SOURCE_PATH)
stored_hash = get_stored_hash(engine, DOC_ID)

# PGVectorStore keeps its rows in a table named "data_<table_name>", not in
# TABLE_NAME itself, so that is the table to probe (and to truncate).
vector_table = f"data_{TABLE_NAME}"

# Order matters: the row check is only evaluated when the hashes match. The
# hash is written after ingestion (see the slow path), so a match means the
# vector table should already exist.
already_ingested = (stored_hash == current_hash) and table_has_rows(engine, vector_table)

if already_ingested:
    # ✅ Fast path: reuse what's already in Postgres (no re-splitting, no re-embedding)
    # from_vector_store builds its own storage context from the vector store,
    # so storage_context is not passed here.
    index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
else:
    # 🔁 Slow path: ingest only if new/changed or table empty
    # Optional but recommended if you want "replace" semantics:
    # - delete existing rows from the vector table (and maybe doc metadata)
    # - then re-ingest
    # Without this, re-ingesting a changed file adds new rows next to the old ones.
    #
    # If you DO want replace semantics, uncomment (only valid once the vector
    # table exists, i.e. after a first ingestion):
    # with engine.begin() as conn:
    #     conn.execute(text(f"TRUNCATE TABLE {vector_table};"))

    # NOTE(review): `documents` is expected to be loaded earlier (presumably
    # from SOURCE_PATH) — confirm it is defined before this cell runs.
    index = VectorStoreIndex.from_documents(
        documents, storage_context=storage_context, show_progress=True
    )
    # Record the hash only after ingestion succeeded, so a failed run is retried.
    upsert_hash(engine, DOC_ID, current_hash)

query_engine = index.as_query_engine()