# Designing a System to Read Any Table and Store Relevant Information

#### 1. Schema Analysis

* Detect column types: Use heuristics or data profiling to identify which columns are IDs, names, descriptions, dates, or codes.
* Exclude: Columns whose names match patterns like `*_id` or `id`, or that otherwise hold unique identifiers.
* Include: Columns with textual, categorical, or descriptive content.
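The name and type rules above can be sketched as one small function. This is a minimal illustration, not the pipeline's exact code: split_columns and ID_PATTERN are hypothetical names, and the sketch assumes the column list arrives as {"name", "type"} dictionaries like those read from information_schema.columns.

```python
import re
from typing import Dict, List

# Hypothetical rule: a column named "id" or ending in "_id" is an identifier.
ID_PATTERN = re.compile(r"^(id|.+_id)$", re.IGNORECASE)
# Substrings that cover PostgreSQL's "character varying", "character" and "text".
TEXT_TYPES = ("char", "text")


def split_columns(schema: List[Dict[str, str]]) -> Dict[str, List[str]]:
    """Split columns into 'embed' (textual, non-ID) and 'metadata' (everything else)."""
    result: Dict[str, List[str]] = {"embed": [], "metadata": []}
    for col in schema:
        name, dtype = col["name"], col["type"].lower()
        is_identifier = ID_PATTERN.match(name) is not None
        is_text = any(t in dtype for t in TEXT_TYPES)
        result["embed" if is_text and not is_identifier else "metadata"].append(name)
    return result


schema = [
    {"name": "customer_id", "type": "character"},
    {"name": "company_name", "type": "character varying"},
    {"name": "paid", "type": "boolean"},
    {"name": "notes", "type": "text"},
]
print(split_columns(schema))
# {'embed': ['company_name', 'notes'], 'metadata': ['customer_id', 'paid']}
```

Matching the whole name (id, or a _id suffix) rather than checking whether "id" appears anywhere in it avoids discarding columns such as paid or video_title.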

#### 2. Automated Column Selection

* Use rules or machine learning to classify columns:
  * Textual/Descriptive: Include in embedding.
  * Identifiers/Technical: Store as metadata.
  * Dates/Numeric: Usually metadata, unless semantically relevant.
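For the rule-based option, a column can also be profiled from a sample of its values instead of only its name and type. The sketch below is one possible heuristic, not a prescribed method: classify_by_profile is a hypothetical helper, and the thresholds min_sample=20 and id_unique_ratio=0.95 are arbitrary assumptions. A trained classifier could replace it behind the same interface.

```python
from datetime import date
from decimal import Decimal
from typing import Any, Sequence

# Python types treated as numeric/date values (bool is a subclass of int,
# datetime is a subclass of date).
NUMERIC_OR_DATE = (int, float, Decimal, date)


def classify_by_profile(values: Sequence[Any], min_sample: int = 20,
                        id_unique_ratio: float = 0.95) -> str:
    """Label a column from sample values: 'textual', 'identifier', 'numeric_or_date' or 'empty'."""
    sample = [v for v in values if v is not None]
    if not sample:
        return "empty"
    if all(isinstance(v, NUMERIC_OR_DATE) for v in sample):
        return "numeric_or_date"  # usually metadata
    texts = [str(v) for v in sample]
    unique_ratio = len(set(texts)) / len(texts)
    avg_words = sum(len(t.split()) for t in texts) / len(texts)
    # Nearly unique, single-token strings (codes such as "ALFKI", UUIDs) look like
    # identifiers. Small samples are too noisy to judge uniqueness, so skip the test.
    if len(texts) >= min_sample and unique_ratio >= id_unique_ratio and avg_words <= 1.0:
        return "identifier"
    return "textual"  # descriptive or categorical text: a candidate for embedding


print(classify_by_profile([f"C{i:04d}" for i in range(50)]))  # identifier
print(classify_by_profile(["Germany", "Mexico", "UK"] * 20))   # textual
print(classify_by_profile([19.99, 5, None]))                   # numeric_or_date
```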

#### 3. Dynamic Embedding Pipeline
* For each table:
  * Concatenate selected columns into a single string for embedding.
  * Store all columns as metadata for retrieval and filtering.
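A per-row sketch of the concatenate-and-keep-metadata step follows. build_document is a hypothetical helper; prefixing each value with its column name and using " | " as a separator are assumptions (the pipeline only requires a single string), chosen so that neighboring values do not run together.

```python
from typing import Any, Dict, List, Tuple


def build_document(row: Dict[str, Any], text_columns: List[str]) -> Tuple[str, Dict[str, Any]]:
    """Return (embedding input, metadata) for one row."""
    # Label each value with its column name and skip empty values, so the model
    # sees e.g. "country: Germany" instead of bare, run-together strings.
    parts = [f"{col}: {row[col]}" for col in text_columns if row.get(col) not in (None, "")]
    text = " | ".join(parts)
    metadata = dict(row)  # every column is kept as metadata for retrieval and filtering
    return text, metadata


row = {"customer_id": "ALFKI", "company_name": "Alfreds Futterkiste",
       "country": "Germany", "fax": None}
text, metadata = build_document(row, ["company_name", "country", "fax"])
print(text)  # company_name: Alfreds Futterkiste | country: Germany
```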

#### 4. Sample Workflow
* Read table schema and sample data.
* Classify columns (textual, identifier, numeric, etc.).
* Prepare embedding input by concatenating relevant columns.
* Store all original columns as metadata for each row.
* Embed and store in vector database.
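The five workflow steps can be traced end to end without a database. In this hypothetical, in-memory sketch, a bag-of-words make_toy_embedder stands in for the sentence-embedding model and a Python list stands in for the vector database; only the shape of the pipeline carries over to the real system, not the embedding quality.

```python
import math
import re
from typing import Any, Callable, Dict, List

Vector = List[float]


def tokenize(text: str) -> List[str]:
    return re.findall(r"[a-z0-9]+", text.lower())


def make_toy_embedder(corpus: List[str]) -> Callable[[str], Vector]:
    """Bag-of-words stand-in for a real embedding model (illustration only)."""
    vocab = {tok: i for i, tok in enumerate(sorted({t for doc in corpus for t in tokenize(doc)}))}

    def embed(text: str) -> Vector:
        vec = [0.0] * len(vocab)
        for tok in tokenize(text):
            if tok in vocab:
                vec[vocab[tok]] += 1.0
        norm = math.sqrt(sum(x * x for x in vec)) or 1.0
        return [x / norm for x in vec]  # unit length, so dot product = cosine

    return embed


def run_workflow(schema: List[Dict[str, str]], rows: List[Dict[str, Any]]):
    # Steps 1-2: classify columns (textual and not an identifier -> embed).
    text_cols = [c["name"] for c in schema
                 if c["type"] in ("text", "character varying")
                 and not (c["name"] == "id" or c["name"].endswith("_id"))]
    # Step 3: prepare the embedding input for each row.
    contents = [" | ".join(str(r[c]) for c in text_cols if r.get(c) is not None) for r in rows]
    # Steps 4-5: embed, keeping all original columns as metadata
    # (the list stands in for the vector database).
    embed = make_toy_embedder(contents)
    store = [{"embedding": embed(text), "content": text, "metadata": dict(r)}
             for text, r in zip(contents, rows)]
    return store, embed


def search(store, embed, query: str, k: int = 3):
    q = embed(query)
    scored = [(sum(a * b for a, b in zip(q, item["embedding"])), item) for item in store]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [(round(score, 4), item["metadata"]) for score, item in scored[:k]]


schema = [{"name": "customer_id", "type": "character"},
          {"name": "company_name", "type": "character varying"},
          {"name": "country", "type": "character varying"}]
rows = [{"customer_id": "ALFKI", "company_name": "Alfreds Futterkiste", "country": "Germany"},
        {"customer_id": "ANATR", "company_name": "Ana Trujillo Emparedados", "country": "Mexico"},
        {"customer_id": "AROUT", "company_name": "Around the Horn", "country": "UK"}]
store, embed = run_workflow(schema, rows)
print(search(store, embed, "Find customers from Germany", k=1))
# [(0.5774, {'customer_id': 'ALFKI', 'company_name': 'Alfreds Futterkiste', 'country': 'Germany'})]
```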

`prompt = The data being fetched will come from a PostgreSQL database. The data consists of several tables with varying schemas. The user does not get to specify an embedding model. Based on the practical implementation example, give me example code showing how the data from a PostgreSQL database will be read, how dynamic embeddings will be generated, what the storage strategy will look like, and how an answer to a query can be found.`

#### Explanation of Key Components
1. Reading Data from PostgreSQL:

* The fetch_table_data method retrieves all rows from a specified table.
* The get_table_schema and classify_columns methods dynamically analyze the schema to identify textual columns for embedding, excluding IDs and non-semantic fields.
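fetch_table_data loads an entire table with fetchall(), which is simple but holds every row in memory. Below is a sketch of reading in fixed-size batches with the DB-API fetchmany() method. iter_batches is a hypothetical helper, and sqlite3 is used only so the example runs without a server. A regular psycopg2 cursor still transfers the whole result on execute(). To keep rows on the server, psycopg2 offers named (server-side) cursors created with conn.cursor(name=...). On an autocommit connection, such as the one the class opens, these may need withhold=True (check the psycopg2 documentation).

```python
import sqlite3
from typing import Any, Iterator, List, Sequence


def iter_batches(cursor: Any, query: str, batch_size: int = 1000) -> Iterator[List[Sequence[Any]]]:
    """Yield the query's rows in lists of at most batch_size (DB-API fetchmany)."""
    cursor.execute(query)
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


# Demo with the standard-library sqlite3 module standing in for PostgreSQL.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE customers (customer_id TEXT, company_name TEXT)")
cur.executemany("INSERT INTO customers VALUES (?, ?)",
                [(f"C{i}", f"Company {i}") for i in range(5)])
print([len(batch) for batch in iter_batches(cur, "SELECT * FROM customers", batch_size=2)])  # [2, 2, 1]
```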

2. Dynamic Embedding Generation:

* A fixed embedding model (all-MiniLM-L6-v2) is used to ensure consistency since users cannot specify a model.
* Textual columns are concatenated into a single string per row (combined_text) for embedding, ensuring adaptability to varying schemas.
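A fixed model only guarantees consistency if every stored vector and every query vector really comes from that model. Below is a lightweight guard. EmbeddingSpec and check_vectors are hypothetical names, and the value 384 is the output size of all-MiniLM-L6-v2 that the pipeline already relies on. Storing such a spec next to each embedding table would also let a later model change be detected instead of silently mixing vector spaces.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class EmbeddingSpec:
    """Identifies the model behind a set of vectors and their dimension."""
    model_name: str
    dim: int


FIXED_SPEC = EmbeddingSpec("all-MiniLM-L6-v2", 384)


def check_vectors(spec: EmbeddingSpec, model_name: str,
                  vectors: Sequence[Sequence[float]]) -> None:
    """Raise ValueError if vectors do not match the fixed model and dimension."""
    if model_name != spec.model_name:
        raise ValueError(f"expected model {spec.model_name!r}, got {model_name!r}")
    for i, vec in enumerate(vectors):
        if len(vec) != spec.dim:
            raise ValueError(f"vector {i} has {len(vec)} dimensions, expected {spec.dim}")


check_vectors(FIXED_SPEC, "all-MiniLM-L6-v2", [[0.0] * 384])  # passes silently
```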

3. Storage Strategy:

* Embeddings are stored in separate tables (e.g., customers_embeddings) for each source table, linked by original_id to the original data.
* Metadata (non-embedded fields like IDs, numeric data) is stored as JSONB for flexible retrieval.
* An IVFFlat index with the cosine operator class (vector_cosine_ops) is created on the embedding column to speed up approximate similarity searches.
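A fixed lists value such as 100 suits only mid-sized tables. The pgvector README suggests starting from rows / 1000 lists for up to 1M rows and sqrt(rows) beyond that. It also recommends creating an IVFFlat index after the table has data, because the list centroids are computed from the rows present at build time. The sketch below applies that sizing rule. ivfflat_lists and ivfflat_index_sql are hypothetical helpers, and the string-built SQL assumes trusted table names; psycopg2.sql.Identifier is the safer choice for untrusted input.

```python
import math


def ivfflat_lists(row_count: int) -> int:
    """Starting value for IVFFlat 'lists': rows/1000 up to 1M rows, sqrt(rows) above."""
    if row_count <= 1_000_000:
        return max(1, row_count // 1000)
    return int(math.sqrt(row_count))


def ivfflat_index_sql(table_name: str, row_count: int) -> str:
    """CREATE INDEX statement for <table>_embeddings; run it after the rows are loaded."""
    emb = f"{table_name}_embeddings"  # assumes a trusted, already-valid table name
    return (f"CREATE INDEX IF NOT EXISTS idx_{emb}_embedding ON {emb} "
            f"USING ivfflat (embedding vector_cosine_ops) "
            f"WITH (lists = {ivfflat_lists(row_count)});")


print(ivfflat_lists(91), ivfflat_lists(50_000), ivfflat_lists(4_000_000))  # 1 50 2000
```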

4. Answering Queries:

* The similarity_search method embeds the user query with the same model and ranks rows by pgvector’s <=> operator (cosine distance), reporting similarity as 1 - distance.
* Results include the original ID, content, metadata, and similarity score, allowing for detailed responses or further filtering.
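Here is a small NumPy check of the distance/similarity relationship. pgvector's <=> returns cosine distance, so 1 - distance is the cosine similarity reported to the user. IVFFlat is approximate (by default only ivfflat.probes = 1 list is searched), so an exact brute-force ranking is a convenient reference for spot-checking recall on a sample of queries. exact_top_k below is a hypothetical helper for that purpose.

```python
import numpy as np


def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine distance, the quantity pgvector's <=> operator returns."""
    return 1.0 - float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def exact_top_k(query: np.ndarray, matrix: np.ndarray, k: int = 5):
    """Brute-force cosine ranking of matrix rows; a reference for checking IVFFlat recall."""
    q = query / np.linalg.norm(query)
    m = matrix / np.linalg.norm(matrix, axis=1, keepdims=True)
    similarities = m @ q  # cosine similarity = 1 - cosine distance
    order = np.argsort(-similarities, kind="stable")[:k]
    return [(int(i), float(similarities[i])) for i in order]


vectors = np.array([[0.0, 1.0], [1.0, 1.0], [1.0, 0.0]])
print(exact_top_k(np.array([1.0, 0.0]), vectors, k=2))  # top-2 indices: 2, then 1
```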

##### Key Features and Adaptations
* Schema Adaptability: The system automatically detects and processes relevant columns, making it suitable for tables with varying schemas.
* Performance Optimization: Separate embedding tables keep each search scoped to a single source table, and the IVFFlat index can let pgvector avoid comparing the query against every stored vector.
* Scalability: Embeddings for a table are computed in one batched model.encode call, but the whole table is loaded into memory and rows are inserted one at a time. For large tables, batched reads, batch inserts (for example psycopg2.extras.execute_values), or COPY would scale better.
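Replacing the per-row INSERT loop with psycopg2's execute_values() sends many rows per statement. The sketch keeps the table layout from the code (original_id, content, embedding, metadata) and passes vectors in pgvector's text form with an explicit ::vector cast. prepare_rows, bulk_insert and the page_size of 500 are illustrative assumptions; COPY would be the next step for very large loads.

```python
from typing import Any, Dict, List, Sequence, Tuple

Row = Tuple[str, str, str, Dict[str, Any]]


def to_vector_literal(vector: Sequence[float]) -> str:
    """pgvector text format, e.g. '[0.5,1.0]'."""
    return "[" + ",".join(str(float(x)) for x in vector) + "]"


def prepare_rows(ids: Sequence[Any], contents: Sequence[str],
                 embeddings: Sequence[Sequence[float]],
                 metadatas: Sequence[Dict[str, Any]]) -> List[Row]:
    """Zip per-row values into (original_id, content, vector literal, metadata) tuples."""
    return [(str(i), c, to_vector_literal(e), m)
            for i, c, e, m in zip(ids, contents, embeddings, metadatas)]


def bulk_insert(cur: Any, table_name: str, rows: List[Row], page_size: int = 500) -> None:
    """Insert rows into <table_name>_embeddings with one statement per page of rows."""
    # Imported here so prepare_rows can be used (and tested) without psycopg2.
    from psycopg2 import sql
    from psycopg2.extras import Json, execute_values

    query = sql.SQL(
        "INSERT INTO {} (original_id, content, embedding, metadata) VALUES %s"
    ).format(sql.Identifier(f"{table_name}_embeddings"))
    execute_values(cur, query.as_string(cur),
                   [(i, c, v, Json(m)) for i, c, v, m in rows],
                   template="(%s, %s, %s::vector, %s)", page_size=page_size)


rows = prepare_rows([1], ["Alfreds Futterkiste | Germany"], [[0.5, 1]], [{"city": "Berlin"}])
print(rows)  # [('1', 'Alfreds Futterkiste | Germany', '[0.5,1.0]', {'city': 'Berlin'})]
```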

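```python
import math
from typing import Any, Dict, List, Sequence

import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extras
from psycopg2 import sql
from sentence_transformers import SentenceTransformer

# One fixed embedding model is used because users cannot choose their own.
EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'
EMBEDDING_LENGTH = 384  # output dimension of all-MiniLM-L6-v2
model = SentenceTransformer(EMBEDDING_MODEL_NAME)


def to_vector_literal(vector: Sequence[float]) -> str:
  """Format a vector as pgvector's text input, e.g. '[0.1,0.2,0.3]'."""
  return "[" + ",".join(str(float(x)) for x in vector) + "]"


def to_json_safe(value: Any) -> Any:
  """Convert one cell value into something json.dumps accepts."""
  if isinstance(value, (list, dict)):
    return value
  if value is None or pd.isna(value):
    return None
  if isinstance(value, np.generic):  # numpy scalars created by pandas, e.g. int64
    return value.item()
  if isinstance(value, (bool, int, float, str)):
    return value
  return str(value)  # Decimal, datetime, UUID, ...


class VectorEmbeddingSystem:
  def __init__(self, db_params: Dict[str, Any]):
    self.conn = psycopg2.connect(**db_params)
    self.conn.autocommit = True
    self.cur = self.conn.cursor()
    self.setup_vector_extension()

  def setup_vector_extension(self):
    self.cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
    print("Vector extension enabled")

  def get_table_schema(self, table_name: str) -> List[Dict[str, Any]]:
    """Fetch the columns (and types) of a table in the current schema."""
    self.cur.execute("""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = %s AND table_schema = current_schema()
        ORDER BY ordinal_position;
    """, (table_name,))
    return [{"name": row[0], "type": row[1]} for row in self.cur.fetchall()]

  def classify_columns(self, schema: List[Dict[str, Any]]) -> List[str]:
    """Classify columns to embed (textual/descriptive) vs. metadata (IDs, numeric)."""
    text_columns = []
    for col in schema:
      col_name = col["name"].lower()
      col_type = col["type"].lower()
      # Match identifier names exactly ("id" or "*_id"); a substring test would
      # also drop columns such as "paid" or "video_title".
      is_identifier = col_name == "id" or col_name.endswith("_id")
      if ("char" in col_type or "text" in col_type) and not is_identifier:
        text_columns.append(col["name"])
    return text_columns

  def fetch_table_data(self, table_name: str) -> pd.DataFrame:
    """Fetch all rows of a table into a DataFrame."""
    # sql.Identifier quotes the table name instead of pasting it into the SQL.
    self.cur.execute(sql.SQL("SELECT * FROM {};").format(sql.Identifier(table_name)))
    columns = [desc[0] for desc in self.cur.description]
    return pd.DataFrame(self.cur.fetchall(), columns=columns)

  def create_embedding_table(self, table_name: str, embedding_length: int):
    """Create the embedding table for one source table, or empty it if it exists."""
    embedding_table = sql.Identifier(f"{table_name}_embeddings")
    self.cur.execute(sql.SQL("""
        CREATE TABLE IF NOT EXISTS {table} (
          id SERIAL PRIMARY KEY,
          original_id TEXT,
          content TEXT,
          embedding vector({dim}),
          metadata JSONB
        );
    """).format(table=embedding_table, dim=sql.Literal(int(embedding_length))))
    # Start from a clean slate so re-runs do not duplicate rows. The IVFFlat index
    # is dropped here and rebuilt after loading, because IVFFlat computes its list
    # centroids from the rows present when the index is built.
    self.cur.execute(sql.SQL("DROP INDEX IF EXISTS {};").format(
        sql.Identifier(f"idx_{table_name}_embeddings_embedding")))
    self.cur.execute(sql.SQL("TRUNCATE {};").format(embedding_table))
    print(f"EMBEDDING table {table_name}_embeddings created.")

  def create_embedding_index(self, table_name: str, row_count: int):
    """Build the IVFFlat cosine index once the embeddings have been loaded."""
    # pgvector's suggested starting point: rows / 1000 lists for up to 1M rows,
    # sqrt(rows) beyond that, and at least one list.
    if row_count <= 1_000_000:
      lists = max(1, row_count // 1000)
    else:
      lists = int(math.sqrt(row_count))
    self.cur.execute(sql.SQL("""
        CREATE INDEX IF NOT EXISTS {index}
        ON {table}
        USING ivfflat (embedding vector_cosine_ops)
        WITH (lists = {lists});
    """).format(index=sql.Identifier(f"idx_{table_name}_embeddings_embedding"),
                table=sql.Identifier(f"{table_name}_embeddings"),
                lists=sql.Literal(lists)))
    print(f"EMBEDDING table {table_name}_embeddings indexed (IVFFlat, lists = {lists}).")

  def generate_and_store_embeddings(self, table_name: str):
    """Generate embeddings for the textual columns of a table and store them."""
    schema = self.get_table_schema(table_name)
    text_columns = self.classify_columns(schema)
    if not text_columns:
      print(f"No textual columns to embed in table {table_name}.")
      return

    # Prepare data for embedding
    data_df = self.fetch_table_data(table_name)
    embedding_table_name = f"{table_name}_embeddings"
    self.create_embedding_table(table_name, EMBEDDING_LENGTH)
    if data_df.empty:
      print(f"Table {table_name} has no rows to embed.")
      return

    # Join textual columns with a separator (a bare ''.join glues neighboring
    # values together) and skip NULLs instead of embedding the string "None".
    data_df['combined_text'] = data_df[text_columns].apply(
        lambda r: " | ".join(str(v) for v in r if not pd.isna(v)), axis=1)
    # One batched encode call per table; the same model is used at query time.
    embeddings = model.encode(data_df['combined_text'].tolist())

    # Use the first column as original_id (often, but not always, the primary key).
    original_id_col = data_df.columns[0]

    insert_stmt = sql.SQL(
        "INSERT INTO {} (original_id, content, embedding, metadata) "
        "VALUES (%s, %s, %s::vector, %s);"
    ).format(sql.Identifier(embedding_table_name))
    # zip pairs each row with its embedding regardless of the DataFrame index.
    for (_, row), embedding in zip(data_df.iterrows(), embeddings):
      # Every column that was not embedded is kept as JSON-safe metadata.
      metadata = {k: to_json_safe(v)
                  for k, v in row.drop(['combined_text'] + text_columns).items()}
      self.cur.execute(insert_stmt, (
          str(row[original_id_col]), row['combined_text'],
          to_vector_literal(embedding), psycopg2.extras.Json(metadata)))

    # Build the IVFFlat index only now that the table holds data.
    self.create_embedding_index(table_name, len(data_df))
    print(f"Embeddings stored for table {table_name} in {embedding_table_name}.")

  def similarity_search(self, query: str, table_name: str, k: int = 5) -> List[Dict[str, Any]]:
    """Return the k rows whose embeddings are closest (cosine) to the query."""
    query_vector = to_vector_literal(model.encode([query])[0])
    # ORDER BY the raw <=> distance (ascending) so pgvector can use the IVFFlat
    # index; cosine similarity is reported as 1 - distance.
    self.cur.execute(sql.SQL("""
        SELECT original_id, content, metadata,
               1 - (embedding <=> %(q)s::vector) AS cosine_similarity
        FROM {}
        ORDER BY embedding <=> %(q)s::vector
        LIMIT %(k)s;
    """).format(sql.Identifier(f"{table_name}_embeddings")), {"q": query_vector, "k": k})
    return [
        {"original_id": r[0], "content": r[1], "metadata": r[2], "similarity": r[3]}
        for r in self.cur.fetchall()
    ]

  def close_connection(self):
    """Close database connection"""
    self.cur.close()
    self.conn.close()
    print("Database connection closed")
```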

```python
# Connection settings. No password is hard-coded: libpq (used by psycopg2)
# reads it from the PGPASSWORD environment variable or from ~/.pgpass.
DB_PARAMS = {
  "database": "postgres",
  "user": "postgres",
  "host": "localhost",
  "port": 5432
}

def main():
  vector_system = VectorEmbeddingSystem(DB_PARAMS)
  try:
    tables = ["customers", "employees", "products"]

    for table in tables:
      vector_system.generate_and_store_embeddings(table)

    query = "Find customers from Germany"
    table_name = "customers"
    results = vector_system.similarity_search(query, table_name)

    print(f"Query: {query}")
    print("Top matching results:")
    for result in results:
      similarity = result['similarity']
      try:
        similarity_str = f"{float(similarity):.4f}"
      except (TypeError, ValueError):
        similarity_str = str(similarity)
      print(f"ID: {result['original_id']}, Content: {result['content'][:100]}..., "
            f"Similarity: {similarity_str}, Metadata: {result['metadata']}")
  finally:
    # Release the connection even if loading or searching fails.
    vector_system.close_connection()

if __name__ == '__main__':
  main()
```


Vector extension enabled
EMBEDDING table customers_embeddings created with index.
Embeddings stored for table customers in customers_embeddings.
Embeddings stored for table customer