# PostgreSQL with pgvector for Embedding Storage

[**`pgvector`**](https://github.com/pgvector/pgvector) is an open-source PostgreSQL extension for storing and searching ML-generated embeddings. It supports **exact and approximate nearest neighbor (ANN) search**, seamlessly integrating with PostgreSQL's indexing and querying features.
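
Exact search compares the query with every stored vector. The NumPy sketch below runs that brute-force scan with cosine distance, which is the metric pgvector exposes through its `<=>` operator (`<->` is L2 distance and `<#>` is negative inner product). The function name `exact_knn` and the toy vectors are illustrative only. This is not pgvector code.

```python
import numpy as np


def exact_knn(query, vectors, k=3):
    """Brute-force k-nearest-neighbor search by cosine distance.

    Computes what `ORDER BY embedding <=> query LIMIT k` returns when
    PostgreSQL scans every row instead of using an ANN index.
    Assumes no zero-length vectors (cosine distance is undefined for them).
    """
    query = np.asarray(query, dtype=np.float64)
    vectors = np.asarray(vectors, dtype=np.float64)
    # Cosine distance = 1 - cosine similarity
    sims = vectors @ query / (np.linalg.norm(vectors, axis=1) * np.linalg.norm(query))
    distances = 1.0 - sims
    # Indices of the k smallest distances, nearest first
    order = np.argsort(distances, kind="stable")[:k]
    return order, distances[order]


docs = np.array([
    [1.0, 0.0, 0.0],
    [0.9, 0.1, 0.0],
    [0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0],
])
idx, dist = exact_knn([1.0, 0.0, 0.0], docs, k=2)
print(idx)  # → [0 1]
```

An ANN index (`ivfflat` or `hnsw`) avoids this full scan. It accepts a small chance of missing a true neighbor in exchange for queries that are typically much faster on large tables.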

### 🚀 Why Use PostgreSQL for Vector Storage?

1️⃣ **Unified Data Store** – Store both **vector embeddings and structured data** (metadata, reports, categories) in one place, reducing system complexity.  
2️⃣ **Enterprise-Grade Reliability** – Benefit from **ACID compliance, backups, replication, and security features** proven over 30+ years.  
3️⃣ **Rich SQL Querying** – Run **powerful joins, filters, and aggregations** across relational and vector data without additional databases.  
4️⃣ **Scalable & Performant** – Handle **large-scale embeddings** efficiently with pgvector’s approximate indexes (`ivfflat`, `hnsw`).  
5️⃣ **Open Source & Cost-Effective** – No vendor lock-in, strong community support, and the flexibility of customization.   
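
You create both index types with ordinary `CREATE INDEX` statements. The sketch below only builds the SQL text. The table `documents`, the column `embedding`, and the cosine operator class `vector_cosine_ops` are assumptions for illustration. The `lists` value follows the starting point suggested in the pgvector README: rows / 1000 for up to 1M rows, and sqrt(rows) beyond that.

```python
import math


def ivfflat_lists(n_rows: int) -> int:
    """Starting value for IVFFlat `lists` (pgvector README heuristic)."""
    if n_rows <= 1_000_000:
        return max(1, n_rows // 1000)
    return int(math.sqrt(n_rows))


def index_ddl(kind: str, table: str = "documents", column: str = "embedding",
              n_rows: int = 0) -> str:
    """Return a CREATE INDEX statement for a pgvector ANN index (cosine distance)."""
    if kind == "hnsw":
        # HNSW has no training step, so it can be built on an empty table
        return f"CREATE INDEX ON {table} USING hnsw ({column} vector_cosine_ops);"
    if kind == "ivfflat":
        # IVFFlat clusters existing rows, so build it after the data is loaded
        return (f"CREATE INDEX ON {table} USING ivfflat ({column} vector_cosine_ops) "
                f"WITH (lists = {ivfflat_lists(n_rows)});")
    raise ValueError(f"unknown index kind: {kind!r}")


print(index_ddl("hnsw"))
print(index_ddl("ivfflat", n_rows=50_000))
```

At query time, the `ivfflat.probes` and `hnsw.ef_search` settings trade recall against speed (for example, `SET hnsw.ef_search = 100;`).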

### 🛠️ Key Benefits for AI & Analytics Workloads
✅ **Fast similarity search** – Retrieve the most relevant documents, reports, or features using efficient ANN search.  
✅ **Feature store for ML** – Store embeddings alongside **metadata (ASINs, timestamps, categories, user preferences)** to enhance ML models.  
✅ **Low-latency retrieval** – Supports **real-time AI inference** via fast SQL queries, making it ideal for recommendation systems & search.  
✅ **Works with existing tools** – Seamlessly integrates with **Python, Pandas, SQL-based analytics, and BI tools**.  
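
To illustrate the unified-store point, a single query can filter on metadata and rank by vector distance. The helper below only assembles a parameterized SQL string with psycopg-style `%s` placeholders. The table name `documents` and the columns `id`, `text`, and `category` are hypothetical. The query vector is passed in pgvector's text form (`'[0.1,0.2,...]'`) and cast with `::vector`.

```python
from typing import List, Optional, Tuple


def similarity_query(query_vec: str, k: int = 5, category: Optional[str] = None,
                     table: str = "documents") -> Tuple[str, List[str]]:
    """Build a query that optionally filters by category and ranks by cosine distance."""
    params: List[str] = [query_vec]  # for the distance column in SELECT
    where = ""
    if category is not None:
        where = "WHERE category = %s"
        params.append(category)
    params.append(query_vec)  # for ORDER BY, the form an ANN index can serve
    sql = (f"SELECT id, text, embedding <=> %s::vector AS distance FROM {table} {where} "
           f"ORDER BY embedding <=> %s::vector LIMIT {int(k)}")
    # Collapse the double space left behind when there is no WHERE clause
    return " ".join(sql.split()), params


sql, params = similarity_query("[0.1,0.2,0.3]", k=3, category="reports")
print(sql)
print(params)
```

With an approximate index, pgvector applies the `WHERE` filter after the index scan. A selective filter can therefore return fewer than `k` rows. The pgvector README's filtering section describes the options available in each version.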

📌 **Bottom line:** `pgvector` turns PostgreSQL into a **powerful, low-latency, AI-ready vector store** while keeping your architecture simple and scalable.

Author: https://www.github.com/deburky

In [None]:
import sys
import os

sys.path.append(os.path.abspath("../src"))  # Ensure src/ is in Python path

## Load an embedding model

In [None]:
from sentence_transformers import SentenceTransformer

# Define local path
local_model_path = "../models/all-MiniLM-L6-v2"

# Load and save model
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
model.save(local_model_path)

print(f"✅ Model saved to {local_model_path}")
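
all-MiniLM-L6-v2 produces 384-dimensional embeddings (`model.get_sentence_embedding_dimension()` reports this), and a pgvector column must declare the same size. The sketch below prints setup statements for a hypothetical `documents` table. The notebook's actual table schema is not shown, so these column names are assumptions chosen to match the DataFrame columns used later (`text`, `embedding`).

```python
EMBEDDING_DIM = 384  # output size of sentence-transformers/all-MiniLM-L6-v2


def table_ddl(table: str = "documents", dim: int = EMBEDDING_DIM) -> list:
    """Return statements that enable pgvector and create a hypothetical table."""
    return [
        # The extension is named "vector" even though the project is called pgvector
        "CREATE EXTENSION IF NOT EXISTS vector;",
        f"CREATE TABLE IF NOT EXISTS {table} ("
        "id bigserial PRIMARY KEY, "
        "text text, "
        f"embedding vector({dim}));",
    ]


for statement in table_ddl():
    print(statement)
```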

## Perform embedding generation

In [None]:
import pandas as pd
from pgvector_db.utils import time_it
from pgvector_db.generate_embeddings import EmbeddingGenerator

# Define local model path
local_model_path = "../models/all-MiniLM-L6-v2"

# Initialize generator with batch processing
generator = EmbeddingGenerator(
    model_path=local_model_path,
    model_type="sentence-transformers",
    batch_size=100,
    device="mps"  # Apple Silicon GPU; use "cuda" or "cpu" on other hardware
)

# Load dataset
dataset = pd.read_parquet("../data/dataset_with_embeddings.parquet")
dataset = dataset[~dataset["text"].isna()].copy()

# Generate embeddings in batches
text_samples = dataset["text"].astype(str).tolist()

@time_it
def generate_embeddings(text_samples):
    return generator.generate_embeddings(text_samples)

embeddings = generate_embeddings(text_samples)

# Save embeddings to Parquet
output_path = "../data/generated_embeddings.parquet"
generator.save_to_parquet(text_samples, embeddings, output_path)
print(f"✅ Embeddings saved to {output_path}")
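
The internals of `EmbeddingGenerator` are not shown here, so the following is only a generic sketch of what `batch_size=100` implies: the texts are encoded in consecutive chunks of at most 100, which bounds memory use on the device. `SentenceTransformer.encode` also accepts a `batch_size` argument directly. The helper name `batched` is hypothetical.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def batched(items: Sequence[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive chunks of at most batch_size items, preserving order."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


texts = [f"doc {i}" for i in range(250)]
print([len(chunk) for chunk in batched(texts, 100)])  # → [100, 100, 50]
```

Each chunk would then go to the encoder, and the results would be concatenated in the same order. Row i of the embedding matrix therefore still corresponds to `text_samples[i]`.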

## Real-time streaming

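Here we mimic real-time streaming with Postgres: each record is embedded and written on its own, first with `INSERT` and then with `COPY`. With one row per call, both approaches give similar per-record latency; the advantage of `COPY` appears when each call carries many rows.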

1. For true real-time, low-latency writes (one record at a time) → Use `pg_insert`.
   - Best suited for event-driven ingestion.
   - Works well with AWS Lambda handlers, Flask APIs, and Kafka consumers.
   - Simple and requires no extra Parquet conversion.

2. For mini-batching (e.g., processing events every few seconds or minutes) → Use `pg_copy`.
   - More efficient for bulk inserts because PostgreSQL handles batch I/O better.
   - Reduces transaction overhead by writing multiple rows at once.
   - Ideal for batch streaming frameworks such as Spark Streaming, Kinesis, or Kafka with batch intervals.
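
You can sketch the mini-batching pattern in point 2 as a small buffer. It flushes either when it holds enough records or when the oldest record has waited long enough. This is an illustrative assumption, not part of `pgvector_db`. In practice `flush_fn` could wrap the notebook's `pg_copy`, for example `lambda rows: pg_copy(pd.DataFrame(rows), db_config_realtime)`. The class name and thresholds are hypothetical.

```python
import time
from typing import Callable, List, Optional


class MicroBatcher:
    """Buffer incoming records and hand them to flush_fn as one batch.

    A flush happens when the buffer holds max_size records or when the oldest
    buffered record has waited at least max_wait seconds, whichever comes first.
    """

    def __init__(self, flush_fn: Callable[[List[dict]], None], max_size: int = 500,
                 max_wait: float = 5.0, clock: Callable[[], float] = time.monotonic):
        self.flush_fn = flush_fn
        self.max_size = max_size
        self.max_wait = max_wait
        self.clock = clock  # injectable so the logic can be tested without sleeping
        self.buffer: List[dict] = []
        self.first_ts: Optional[float] = None

    def add(self, record: dict) -> None:
        if not self.buffer:
            self.first_ts = self.clock()
        self.buffer.append(record)
        too_big = len(self.buffer) >= self.max_size
        too_old = self.clock() - self.first_ts >= self.max_wait
        if too_big or too_old:
            self.flush()

    def flush(self) -> None:
        """Send buffered records (if any) and start a fresh buffer."""
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []
            self.first_ts = None


# Usage with a fake clock and a list standing in for the database write
batches = []
now = [0.0]
batcher = MicroBatcher(batches.append, max_size=3, max_wait=5.0, clock=lambda: now[0])
for i in range(7):
    batcher.add({"text": f"event {i}"})
batcher.flush()  # drain the remainder, e.g. at shutdown
print([len(b) for b in batches])  # → [3, 3, 1]
```

This version only checks the time limit when a new record arrives. A production consumer would also flush on a timer, so that a quiet stream does not leave records buffered indefinitely.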

In [None]:
import os
import pandas as pd
import random
from pgvector_db.generate_embeddings import EmbeddingGenerator
from pgvector_db.utils import DBConfigLocal

password = os.environ.get("PG_PASSWORD")

# Define PostgreSQL DBConfig
db_config_realtime = DBConfigLocal(
    db_name="vector_db",
    db_user="py_pg_user",
    db_password=password,
    db_host="localhost",
    schema_name="public",
    table_name="realtime_documents"
)

# Initialize Embedding Generator
local_model_path = "../models/all-MiniLM-L6-v2"
generator = EmbeddingGenerator(
    model_path=local_model_path,
    model_type="sentence-transformers",
    batch_size=100,  # Has no effect here: each call below encodes a single text
    device="mps"  # Apple Silicon GPU; use "cuda" or "cpu" on other hardware
)

# Load dataset & prepare for streaming simulation
path_to_data = "../data/generated_embeddings.parquet"
dataset = pd.read_parquet(path_to_data)
dataset = dataset[~dataset["text"].isna()].copy()
text_samples = dataset["text"].tolist()

# Simulate incoming data (random streaming)
streaming_batch = random.sample(text_samples, 100)  # Select 100 random texts

In [None]:
import time
import numpy as np
from pgvector_db.utils import time_it
from pgvector_db.pg_insert import pg_insert

# Process streaming text with INSERT
insert_latencies = []

@time_it
def process_streaming_batch(streaming_batch):
    """Processes a batch of streaming text data and inserts embeddings into PostgreSQL."""
    for text in streaming_batch:
        start_time = time.time()

        # Generate embeddings for this text
        embedding = generator.generate_embeddings([text])[0]  # Extract 1D array

        # Convert to list and wrap inside DataFrame
        df = pd.DataFrame({"text": [text], "embedding": [embedding.tolist()]})

        # Insert into PostgreSQL
        pg_insert(df, db_config_realtime, batch_size=1)

        latency = time.time() - start_time
        insert_latencies.append(latency)

# Run streaming
process_streaming_batch(streaming_batch)

print(f"✅ Avg Insert Latency: {np.mean(insert_latencies):.4f} sec per record")

In [None]:
import time
import numpy as np
from pgvector_db.utils import time_it
from pgvector_db.pg_copy import pg_copy

# Process streaming text with COPY
copy_latencies = []

@time_it
def process_streaming_batch(streaming_batch):
    """Processes a batch of streaming text data and copies embeddings into PostgreSQL."""
    for text in streaming_batch:
        start_time = time.time()

        # Generate embeddings for this text
        embedding = generator.generate_embeddings([text])[0]  # Extract 1D array

        # Convert to list and wrap inside DataFrame
        df = pd.DataFrame({"text": [text], "embedding": [embedding.tolist()]})

        # Copy into PostgreSQL
        pg_copy(df, db_config_realtime)

        latency = time.time() - start_time
        copy_latencies.append(latency)

# Run streaming
process_streaming_batch(streaming_batch)

print(f"✅ Avg Copy Latency: {np.mean(copy_latencies):.4f} sec per record")
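
An average can hide slow outliers, so it may help to report percentiles as well. The timings above start before `generate_embeddings`, so each value includes embedding time, not just the database write. The helper below is a hypothetical addition. The example list is synthetic; in the notebook you would pass `insert_latencies` and `copy_latencies` instead.

```python
import numpy as np


def summarize_latencies(latencies):
    """Return mean and p50/p95/p99 latency in milliseconds (inputs in seconds)."""
    ms = np.asarray(latencies, dtype=np.float64) * 1000.0
    p50, p95, p99 = np.percentile(ms, [50, 95, 99])
    return {
        "mean_ms": float(ms.mean()),
        "p50_ms": float(p50),
        "p95_ms": float(p95),
        "p99_ms": float(p99),
    }


# Synthetic example: 19 fast calls and one slow one
example = [0.010] * 19 + [0.500]
for name, value in summarize_latencies(example).items():
    print(f"{name}: {value:.1f}")
# In the notebook: summarize_latencies(insert_latencies), summarize_latencies(copy_latencies)
```

In the synthetic example, the single 500 ms call pulls the mean to more than three times the median. Percentiles make exactly this effect visible.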

## Load precomputed embeddings

In [None]:
import pandas as pd

path_to_data = "../data/generated_embeddings.parquet"
dataset = pd.read_parquet(path_to_data)

print(dataset.shape)

## Insert workflow

When you send an `INSERT` command to PostgreSQL, the server processes it in several steps:

1️⃣ **Query Parsing & Validation** – PostgreSQL analyzes the query structure, verifies the table and columns exist, and checks user permissions.  
2️⃣ **Data Type & Constraint Checks** – Ensures values match column types and adhere to constraints (e.g., `NOT NULL`, `UNIQUE`).  
3️⃣ **Row Insertion** – Writes the new row into a table page in shared buffers and records the change in the write-ahead log (WAL).  
4️⃣ **Index Updates** – If indexes exist, PostgreSQL updates them accordingly.  
5️⃣ **Transaction Handling** – The operation runs within a transaction, ensuring changes are either **committed** or **rolled back** on failure.  
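
Parsing (step 1) happens once per statement, and in autocommit mode so does the commit (step 5). Sending many rows in one `INSERT` therefore amortizes parsing, network round trips, and commit cost. How `pg_insert` batches internally is not shown. The sketch below assumes it builds a multi-row `INSERT ... VALUES` with psycopg-style placeholders, similar in spirit to `psycopg2.extras.execute_values`. The helper names are hypothetical, and the columns (`text`, `embedding`) come from the DataFrame used in this notebook.

```python
from typing import List, Sequence, Tuple


def to_vector_literal(values: Sequence[float]) -> str:
    """Format a vector in pgvector's text input form: '[v1,v2,...]'."""
    return "[" + ",".join(repr(float(v)) for v in values) + "]"


def build_batch_insert(table: str, rows: List[Tuple[str, Sequence[float]]]):
    """Build one multi-row INSERT plus its flat parameter list."""
    if not rows:
        raise ValueError("rows must not be empty")
    # One "(%s, %s::vector)" group per row; the cast turns the text literal into a vector
    placeholders = ", ".join(["(%s, %s::vector)"] * len(rows))
    sql = f"INSERT INTO {table} (text, embedding) VALUES {placeholders}"
    params: List[str] = []
    for text, embedding in rows:
        params.extend([text, to_vector_literal(embedding)])
    return sql, params


sql, params = build_batch_insert("documents", [("a", [0.1, 0.2]), ("b", [0.3, 0.4])])
print(sql)
# → INSERT INTO documents (text, embedding) VALUES (%s, %s::vector), (%s, %s::vector)
print(params)  # → ['a', '[0.1,0.2]', 'b', '[0.3,0.4]']
```

With two parameters per row, `batch_size=1000` uses 2,000 bind parameters per statement. That is well under PostgreSQL's limit of 65,535 parameters for a single statement.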

In [None]:
import os
import numpy as np
from pgvector_db.pg_insert import pg_insert
from pgvector_db.utils import DBConfigLocal

# Retrieve password from the environment
password = os.environ.get('PG_PASSWORD')

# Convert DataFrame embeddings (numpy arrays) into lists for PostgreSQL
dataset["embedding"] = dataset["embeddings"].apply(lambda x: np.array(x).tolist())

# Use Local DBConfig for Local PostgreSQL
db_config_local = DBConfigLocal(
    db_name="vector_db",
    db_user="py_pg_user",
    db_password=password,
    db_host="localhost",
    schema_name="public",
    table_name="documents"
)

# Run batch insert for Local
pg_insert(dataset, db_config_local, batch_size=1000)

## Copy workflow

When you execute a `COPY` command in PostgreSQL, the server processes it in the following steps:

1️⃣ **Query Parsing & Validation** – PostgreSQL verifies the command structure, ensures the target table exists, and checks user permissions.  
2️⃣ **Data Formatting & Parsing** – Reads data from a file or standard input (`STDIN`), interpreting CSV, binary, or text formats.  
3️⃣ **Batch Data Insertion** – Loads many rows in a single statement, avoiding the per-statement parsing and round-trip overhead of individual `INSERT`s.  
4️⃣ **Constraint & Type Checks** – Validates data against column types, constraints (`NOT NULL`, `CHECK`), and primary/foreign keys.  
5️⃣ **Index & WAL Handling** – Updates indexes if present and writes **WAL (Write-Ahead Logging)** records; WAL writing can be skipped only when `wal_level = minimal` and the table was created or truncated in the same transaction.  
6️⃣ **Transaction Control** – `COPY` runs as a single statement inside a transaction, so by default an error on any row rolls back the entire load.  
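
Step 2 is where client code does most of its work: it must serialize rows in a format `COPY` understands. The minimal sketch below assumes the default text format (tab-separated fields, one row per line, `\N` for NULL, backslash escapes) and pgvector's `'[v1,v2,...]'` vector syntax. Whether `pg_copy` serializes rows this way is not shown, and the helper names are hypothetical.

```python
import io
from typing import Iterable, Optional, Sequence, Tuple


def escape_copy_text(value: Optional[str]) -> str:
    """Escape one field for COPY's text format; None becomes the NULL marker."""
    if value is None:
        return "\\N"
    # Escape backslashes first so later escapes are not doubled
    return (value.replace("\\", "\\\\")
                 .replace("\t", "\\t")
                 .replace("\n", "\\n")
                 .replace("\r", "\\r"))


def build_copy_buffer(rows: Iterable[Tuple[Optional[str], Sequence[float]]]) -> io.StringIO:
    """Serialize (text, embedding) rows as COPY text-format input."""
    buf = io.StringIO()
    for text, embedding in rows:
        vector = "[" + ",".join(repr(float(v)) for v in embedding) + "]"
        buf.write(f"{escape_copy_text(text)}\t{vector}\n")
    buf.seek(0)  # rewind so the consumer reads from the start
    return buf


buf = build_copy_buffer([("hello\tworld", [0.1, 0.2]), (None, [0.3, 0.4])])
print(repr(buf.getvalue()))
```

With psycopg2, you could stream a buffer like this with `cursor.copy_expert("COPY documents (text, embedding) FROM STDIN", buf)`. psycopg 3 offers `cursor.copy()` for the same purpose.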

In [None]:
import os
import numpy as np
from pgvector_db.pg_copy import pg_copy
from pgvector_db.utils import DBConfigLocal

# Retrieve password from the environment
password = os.environ.get('PG_PASSWORD')

# Convert DataFrame embeddings (numpy arrays) into lists for PostgreSQL
dataset["embedding"] = dataset["embeddings"].apply(lambda x: np.array(x).tolist())

# Use Local DBConfig for Local PostgreSQL
db_config_local = DBConfigLocal(
    db_name="vector_db",
    db_user="py_pg_user",
    db_password=password,
    db_host="localhost",
    schema_name="public",
    table_name="documents"
)

# Run copy for Local
pg_copy(dataset, db_config_local)