# Download dataset

In [3]:
# Dataset: https://huggingface.co/datasets/Qdrant/dbpedia-entities-openai3-text-embedding-3-small-512-100K
urls = ["https://huggingface.co/datasets/Qdrant/dbpedia-entities-openai3-text-embedding-3-small-512-100K/raw/main/data/train-00000-of-00001.parquet"]

In [4]:
# Download dataset
import os
import requests

def download_file(url, folder):
    filename = url.split('/')[-1]
    filepath = os.path.join(folder, filename)
    if os.path.exists(filepath):
        print(f"Skipping {filename}")
        return
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        # Open the file and write the content
        with open(filepath, 'wb') as f:
            for chunk in response.iter_content(chunk_size=128):
                f.write(chunk)
        print(f"Downloaded {filename}")
    else:
        print(f"Failed to download {filename}")

folder_name = "dataset"
if not os.path.exists(folder_name):
    os.makedirs(folder_name)
for url in urls:
    download_file(url, folder_name)

Downloaded train-00000-of-00001.parquet


In [5]:
filenames = list(map(lambda url: "dataset/" + url.split('/')[-1], urls))[:1]
filenames

['dataset/train-00000-of-00001.parquet']

# Connect to Postgres

In [None]:
import io
import psycopg2
pg_engine = psycopg2.connect('postgres://postgres:SecurePassword123!@localhost:5432/postgres')
pg_engine.autocommit = True
pg_session = pg_engine.cursor()
binary_f = io.BytesIO(b"")

# Load dataset to Postgres

In [5]:
# Load dataset to Postgres
import pyarrow.dataset as ds
from pgpq import ArrowToPostgresBinaryEncoder

def load_parquets(parquet_files, table_name):
    print(f"loading {len(parquet_files)} files")
    print(f"loading files: {parquet_files}")
    dataset = ds.dataset(parquet_files)

    encoder = ArrowToPostgresBinaryEncoder(dataset.schema)

    pg_schema = encoder.schema()

    tmp_table_name = "_tmp_parquet_data"
    pg_schema_columns = [(col_name.replace('-', '_'), col) for col_name, col in pg_schema.columns]
    typed_cols = [f'"{col_name}" {col.data_type.ddl()}' for col_name, col in pg_schema_columns]
    cols = [col_name for col_name, _ in pg_schema_columns]
    cols_joined = ','.join(cols)
    typed_cols_joined = ','.join(typed_cols)
    print(f"Columns: {cols_joined}")

    ddl = f"CREATE UNLOGGED TABLE {tmp_table_name} ({typed_cols_joined})"

    pg_session.execute(f"DROP TABLE IF EXISTS {tmp_table_name}")
    pg_session.execute(ddl)
    print(f"pg schema {pg_schema}")
    print(f"Assuming underlying postgres table was created with columns: {typed_cols} via a statement equivalent (or columnwise-type-castable) to'{ddl}'")

    binary_f.truncate(0)
    binary_f.seek(0)
    copy = binary_f
    copy.write(encoder.write_header())
    batches = dataset.to_batches()
    count = 0
    for i, batch in enumerate(batches):
        print(f"batch: {i} batch len: {len(batch)}")
        b = encoder.write_batch(batch)
        copy.write(b)
        count += len(batch)

    copy.write(encoder.finish())
    binary_f.seek(0)

    print(f"Copying dataset into postgres...")
    pg_session.execute(f'CREATE TABLE IF NOT EXISTS {table_name}({typed_cols_joined})')
    pg_session.copy_expert(f'COPY "{tmp_table_name}" ({cols_joined}) FROM STDIN WITH (FORMAT BINARY)', binary_f)
    pg_session.execute(f'INSERT INTO "{table_name}" SELECT * FROM "{tmp_table_name}"')
    pg_session.execute(f'DROP TABLE "{tmp_table_name}"')
    pg_session.execute(f'VACUUM FULL "{table_name}"')
    pg_session.execute(f'ALTER TABLE {table_name} ALTER COLUMN text_embedding_3_small_512_embedding TYPE real[] USING text_embedding_3_small_512_embedding::real[]')
    return count

In [6]:
load_parquets(filenames, 'openai')

NameError: name 'filenames' is not defined

# Sanity check

In [19]:
# Sample from dataset
query = '''
    SELECT
        _id,
        title,
        text
    FROM
        openai
    ORDER BY
        RANDOM()
    LIMIT 1
'''
pg_session.execute(query)
pg_session.fetchone()

('<dbpedia:Giovanni_di_Giovanni>',
 'Giovanni di Giovanni',
 'Giovanni di Giovanni (c. 1350 – May 7, 1365?) is one of the youngest victims of the  campaign against sodomy waged in Florence since the Middle Ages.He was convicted by the Podestà court of being the passive partner of a number of different men. He was labeled "a public and notorious passive sodomite." His punishment was to be paraded on the back of an ass, then to be publicly castrated.')

# Generate ground truth dataset

In [8]:
query = """
    CREATE TABLE openai_query (id TEXT, vector REAL[], nn_ids TEXT[]);
    CREATE TABLE openai_reference (id TEXT, vector REAL[]);
    INSERT INTO openai_query (id, vector) SELECT _id, text_embedding_3_small_512_embedding FROM openai ORDER BY RANDOM() LIMIT 100;
    INSERT INTO openai_reference SELECT _id, text_embedding_3_small_512_embedding FROM openai WHERE _id NOT IN (SELECT id FROM openai_query);
"""

pg_session.execute(query)

DuplicateTable: relation "openai_query" already exists


In [None]:
# Generate "ground truth" dataset
query = f'''
    UPDATE openai_query
    SET nn_ids = nearest_ids.ids
    FROM (
        SELECT
            q.id AS id,
            ARRAY_AGG(r.id) AS ids
        FROM openai_query q
        JOIN LATERAL (
            SELECT
                id,
                vector
            FROM
                openai_reference r
            ORDER BY
                q.vector <-> r.vector
            LIMIT 10
        ) r
        ON TRUE
        GROUP BY
            q.id
    ) AS nearest_ids
    WHERE
        openai_query.id = nearest_ids.id
'''
pg_session.execute(query)

# Generate copy (for testing index on original)

In [None]:
query = """
    ALTER TABLE openai_reference ADD COLUMN vector512 REAL[];
    UPDATE openai_reference SET vector512 = vector;
"""
pg_session.execute(query)

# Generate Matryoshka

In [33]:
# Generate Open AI Matryoshka embeddings
query = '''
    CREATE OR REPLACE FUNCTION normalize_array(arr REAL[])
    RETURNS REAL[] AS $$
    DECLARE
        magnitude REAL := 0;
        normalized_arr REAL[];
    BEGIN
        -- Calculate the magnitude of the array
        SELECT sqrt(sum(val * val)) INTO magnitude
        FROM unnest(arr) AS dt(val);
    
        -- Check if magnitude is zero to avoid division by zero
        IF magnitude = 0 THEN
            RETURN arr;
        END IF;
    
        -- Normalize the array
        SELECT array_agg(val / magnitude) INTO normalized_arr
        FROM unnest(arr) AS dt(val);
    
        RETURN normalized_arr;
    END;
    $$ LANGUAGE plpgsql;
    ALTER TABLE openai_reference ADD COLUMN vector256 REAL[];
'''
try:
    pg_session.execute(query)
    pg_session.execute('''
        UPDATE openai_reference SET vector256 = vector256[1:256];
        UPDATE openai_reference SET vector256 = normalize_array(vector256);
    ''')
except Exception as e:
    print(e)
    pass

# Generate PQ

In [42]:
query = """
    SELECT drop_quantization('openai_reference', 'vector');
    SELECT quantize_table('openai_reference', 'vector', 256, 16, 'l2sq', 10000);
"""
pg_session.execute(query)

# Create Indexes

In [43]:
query = """
    CREATE INDEX ON openai_reference USING lantern_hnsw (vector256) WITH (M=32, ef_construction=64, ef=128, dim=256);
"""
pg_session.execute(query)

In [None]:
query = """
    CREATE INDEX ON openai_reference USING lantern_hnsw (vector512) WITH (M=32, ef_construction=64, ef=128, dim=256);
"""
pg_session.execute(query)

In [2]:
query = """
    CREATE INDEX ON openai_reference USING lantern_hnsw (vector) WITH (M=32, ef_construction=64, ef=128, pq=True);
"""
pg_session.execute(query)

# Check recall

In [None]:
def get_recall(column):
    query = f"""
        SELECT
            q.id AS query_id,
            nn_ids AS truth_ids,
            ARRAY_AGG(b.id) AS reference_ids
        FROM
            openai_query q
        JOIN LATERAL (
            SELECT
                id
            FROM
                openai_reference r
            ORDER BY
                q.vector <-> r.{column}
            LIMIT 10
        ) r
        ON TRUE
        GROUP BY
            q.id
    """
    pg_session.execute(query)
    data = pg_session.fetchall()
    recall = 0
    for row in data:
        truth_ids = set(row[1])
        reference_ids = set(row[2])
        recall += len(truth_ids.intersection(reference_ids)) / len(truth_ids)
    return recall / len(data)

In [None]:
# Generate recall for Open AI embeddings
get_recall('vector512')

0.58
0.86


In [None]:
# Generate recall for PQ embeddings
get_recall('vector')

In [None]:
# Generate recall for Matryoshka embeddings
get_recall('vector256')