In [1]:
import time

import json
import pgvector
from typing import List, Optional
import numpy as np
from faker import Faker

from pgvector.psycopg2 import register_vector
import psycopg2

# qdrant
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import Distance, VectorParams
from qdrant_client.http.models import PointStruct
from qdrant_client.http.models import Filter, FieldCondition, MatchValue
from qdrant_client.http.models import CollectionStatus

In [3]:
# Выполнить для vector_db, который был создан в pg: CREATE EXTENSION vector;

In [6]:
def create_embedding():
    return np.random.uniform(low=-1.0, high=1.0, size=100).tolist()

def timer(func, kwargs):
    start = time.time()
    func(**kwargs)
    end = time.time()
    return end - start

In [5]:
count_of_store_ids_per_doc = 100
fake_something = Faker()

embs = np.random.uniform(low=-1.0, high=1.0, size=(1000000, 100))
doc_ids = list(range(len(embs)))
payloads = []

for i in range(len(embs)):
    payloads.append(
        {
            "store_id": [fake_something.random.randint(0, 100) for i in range(count_of_store_ids_per_doc)] ,
            "product_name": " ".join(fake_something.words()),
            "product_sku": i,
        }
    )
print(len(payloads))

1000000


In [7]:
store_id = 10
embedding = embs[0].copy().astype(np.float32)

## PgVector - 0.3616370439529419

In [15]:
conn = psycopg2.connect(dbname='vector_db', 
                        host="localhost", 
                        port="15432", 
                        user="username", 
                        password="password",)

conn.autocommit = True
cursor = conn.cursor()
# cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
# cursor.execute('DROP TABLE IF EXISTS documents')
register_vector(cursor)

# cursor.execute('DROP TABLE IF EXISTS documents')
# cursor.execute('CREATE TABLE documents (id bigserial PRIMARY KEY, product_name text, store_id integer[], embedding vector(100))')
# cursor.execute('CREATE INDEX ON documents USING hnsw (embedding vector_l2_ops) WITH (m = 16, ef_construction = 100)')
# cursor.execute('SET max_parallel_maintenance_workers=15;')
# cursor.execute('SET max_parallel_workers=15;')

In [7]:
# for embedding, payload in zip(embs, payloads):
#     store_id = payload["store_id"]
#     product_name = payload["product_name"]
#     product_sku = payload["product_sku"]
    
#     cursor.execute('INSERT INTO documents (id, product_name, store_id, embedding) VALUES (%s, %s, %s, %s)', (product_sku, product_name, store_id, embedding))

In [65]:
time_k = 0
for i in range(100):
    time_k += timer(cursor.execute, {"query": """SELECT 
                                                    id, 
                                                    product_name, 
                                                    store_id
                                                FROM documents 
                                                WHERE %s = ANY(store_id)
                                                ORDER BY embedding <=> %s
                                                LIMIT 24""", "vars": (store_id, embedding)})
    
time_k / 100

0.3616370439529419

## QDrant - 0.007430486679077149

In [None]:
# Qdrant - pre-filter есть
# Pgvector - pre-filter нет

In [20]:
class QCollection:
    def __init__(self, 
                 collection_name: str,
                 url: str = "localhost", 
                 port: int = 6333):
        self.client = QdrantClient(url=url, port=port, timeout=10000)
        self.collection_name = collection_name
        
    def get_collection_info(self):
        response = self.client.get_collection(collection_name=self.collection_name)
        return response
    
    def create_collection(self, emb_dim: int = 100):
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(size=emb_dim, distance=Distance.COSINE, on_disk=False),
        )
    
    def delete_collection(self):
        self.client.delete_collection(collection_name=self.collection_name)
        
    def payload_index_store_id(self):
        self.client.create_payload_index(
            collection_name=self.collection_name,
            field_name="store_id",
            field_schema="integer",
        )

        
class QDocument:
    def __init__(self, 
                 collection_name: str,
                 url: str = "localhost", 
                 port: int = 6333,):
        self.client = QdrantClient(url=url, port=port)
        self.collection_name = collection_name
        
    def add_document(self, doc_id: int, embedding: List[float], metadata: dict):
        # points with the same id will be overwritten when re-uploaded.
        self.client.upsert(
            collection_name=self.collection_name,
            points=[
                models.PointStruct(
                    id=doc_id,
                    payload=metadata,
                    vector=embedding,
                ),
            ],
        )
    
    def add_documents(self, doc_ids: List[int], embeddings: List[List[float]], metadata: List[dict]):
        self.client.upsert(
            collection_name=self.collection_name,
            points=models.Batch(
                ids=doc_ids,
                payloads=metadata,
                vectors=embeddings,
            ),
        )
    
    def delete_documents(self, doc_ids: List[int]):
        self.client.delete(
            collection_name=self.collection_name,
            points_selector=models.PointIdsList(
                points=doc_ids,
            ),
        )
        
    def retrieve_documents(self, doc_ids: List[int]) -> dict:
        response = self.client.retrieve(
            collection_name=self.collection_name,
            ids=doc_ids,
            with_payload=True,
            with_vectors=True,
        )
        return response
    
    def count_documents(self, store_id: int) -> dict:
        response = self.client.count(
            collection_name=self.collection_name,
            count_filter=models.Filter(
                must=[
                    models.FieldCondition(key="store_id", match=models.MatchValue(value=store_id)),
                ]
            ),
            exact=True,
        )
        return response
    
    def search(self, embedding: List[float], store_id: int) -> dict:
        response = self.client.search(
            collection_name=self.collection_name,
            query_filter=models.Filter(
                must=[
                    models.FieldCondition(
                        key="store_id",
                        match=models.MatchValue(
                            value=int(store_id),
                        ),
                    )
                ]
            ),
            search_params=models.SearchParams(hnsw_ef=128, exact=False),
            query_vector=embedding,
            limit=24,
            with_vectors=False,
            with_payload=True,
#             score_threshold=100,  # [0;1] cos
        )
        return response
    
    def batch_search(self):
        ...
        # https://qdrant.tech/documentation/concepts/search/#batch-search-api

In [21]:
q_collection = QCollection(collection_name="temp")
q_document = QDocument(collection_name="temp")

q_collection.delete_collection()
q_collection.create_collection()

In [66]:
print(" ".join([str(elem) + "\n" for elem in list(q_collection.get_collection_info())]))
print(len(payloads), len(embs), len(doc_ids))

('status', <CollectionStatus.GREEN: 'green'>)
 ('optimizer_status', <OptimizersStatusOneOf.OK: 'ok'>)
 ('vectors_count', 1000000)
 ('indexed_vectors_count', 985000)
 ('points_count', 1000000)
 ('segments_count', 6)
 ('config', CollectionConfig(params=CollectionParams(vectors=VectorParams(size=100, distance=<Distance.COSINE: 'Cosine'>, hnsw_config=None, quantization_config=None, on_disk=False), shard_number=1, sharding_method=None, replication_factor=1, write_consistency_factor=1, read_fan_out_factor=None, on_disk_payload=True, sparse_vectors=None), hnsw_config=HnswConfig(m=16, ef_construct=100, full_scan_threshold=10000, max_indexing_threads=0, on_disk=False, payload_m=None), optimizer_config=OptimizersConfig(deleted_threshold=0.2, vacuum_min_vector_number=1000, default_segment_number=0, max_segment_size=None, memmap_threshold=None, indexing_threshold=20000, flush_interval_sec=5, max_optimization_threads=1), wal_config=WalConfig(wal_capacity_mb=32, wal_segments_ahead=0), quantization_c

In [24]:
%%time

step = 5000
for t in range(0, len(doc_ids), step):
    q_document.add_documents(doc_ids=doc_ids[t:step + t], 
                             embeddings=embs[t:step + t].tolist(), 
                             metadata=payloads[t:step + t])

q_collection.payload_index_store_id()

CPU times: user 38.8 s, sys: 9.3 s, total: 48.1 s
Wall time: 10min 23s


In [69]:
time_k = 0
for i in range(100):
    time_k += timer(q_document.search, {"embedding": embedding.tolist(), "store_id": store_id})
    
time_k / 100

0.007430486679077149

## Redis

In [9]:
import redis
import requests
from redis.commands.search.field import (
    NumericField,
    TagField,
    TextField,
    VectorField,
)
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query

In [159]:
class RedisDB:
    def __init__(self, collection_prefix="documents", host="localhost", port=8503):
        self.client = redis.Redis(host=host, port=port, decode_responses=True)
        self.collection_prefix = collection_prefix
        print(f"Host connection: {self.client.ping()}")

    def add_documents(self, doc_ids: List[int], embeddings: List[List[float]], metadata: List[dict]):
        pipeline = self.client.pipeline()
        for doc_id, payload, embedding in zip(doc_ids, metadata, embeddings):
            payload["embedding"] = embedding
            payload["store_id"] = list(map(str, payload["store_id"])) # only for tags
            redis_key = f"{self.collection_prefix}:{doc_id}"
#             print(redis_key)
            pipeline.json().set(redis_key, "$", payload)
        executed = pipeline.execute()
        print(f"Success: {sum(executed) / len(executed)}.")
        
    def count_documents(self) -> None:
        keys = sorted(self.client.keys(f"{self.collection_prefix}:*"))
        print(len(keys))
    
    def index_info(self, index_name: Optional[str] = None):
        index_name = f"idx:{self.collection_prefix if index_name is None else index_name}" 
        return self.client.ft(index_name).info()
        
    def retrieve_document(self, doc_id: int) -> dict:
        return self.client.json().get(f"{self.collection_prefix}:{doc_id}")
    
    def create_index(self, index_name: Optional[str] = None):
        index_name = f"idx:{self.collection_prefix if index_name is None else index_name}" 
        # schema
#         schema = (
#             NumericField("$.store_id", as_name="store_id"),
#             NumericField("$.product_sku", as_name="product_sku"),
#             TextField("$.product_name", no_stem=True, as_name="product_name"),
#             VectorField("$.embedding",
#                 "HNSW", {
#                     "TYPE": "FLOAT32",
#                     "DIM": 100,
#                     "DISTANCE_METRIC": "COSINE",
#                     "M": 16,
#                     "EF_CONSTRUCTION": 100, # Up it to 200 as default redis LATER!
#                 }, as_name="embedding",
#             ),
#         )
        
        schema = (
            TagField("$.store_id", as_name="store_id"), # swap to "$.store_id.*"
            NumericField("$.product_sku", as_name="product_sku"),
            TextField("$.product_name", no_stem=True, as_name="product_name"),
            VectorField("$.embedding",
                "HNSW", {
                    "TYPE": "FLOAT32",
                    "DIM": 100,
                    "DISTANCE_METRIC": "COSINE",
                    "M": 16,
                    "EF_CONSTRUCTION": 100, # Up it to 200 as default redis LATER!
                }, as_name="embedding",
            ),
        )
        
        # index definition
        definition = IndexDefinition(prefix=[f"{self.collection_prefix}:"], index_type=IndexType.JSON)
        # create index
        self.client.ft(index_name).create_index(fields=schema, definition=definition)
    
    def search(self, query: Query, store_id: int, query_vector: bytes, index_name: Optional[str] = None):
        index_name = f"idx:{self.collection_prefix if index_name is None else index_name}"
        response = self.client.ft(index_name).search(query, {"query_vector": query_vector, "store_id": store_id}).docs
        return response

### store_id as NumericField - 0.0011725187301635741

In [125]:
# rediska.client.flushall()
rediska = RedisDB()
# 16 56

Host connection: True


In [None]:
rediska.add_documents(doc_ids=doc_ids, embeddings=embs.tolist(), metadata=payloads)

In [127]:
rediska.count_documents()

1000000


In [130]:
rediska.create_index()

In [138]:
rediska.index_info()  # wait till 'percent_indexed': '1'

{'index_name': 'idx:documents',
 'index_options': [],
 'index_definition': ['key_type',
  'JSON',
  'prefixes',
  ['documents:'],
  'default_score',
  '1'],
 'attributes': [['identifier',
   '$.store_id',
   'attribute',
   'store_id',
   'type',
   'NUMERIC'],
  ['identifier',
   '$.product_sku',
   'attribute',
   'product_sku',
   'type',
   'NUMERIC'],
  ['identifier',
   '$.product_name',
   'attribute',
   'product_name',
   'type',
   'TEXT',
   'WEIGHT',
   '1',
   'NOSTEM'],
  ['identifier',
   '$.embedding',
   'attribute',
   'embedding',
   'type',
   'VECTOR',
   'algorithm',
   'HNSW',
   'data_type',
   'FLOAT32',
   'dim',
   100,
   'distance_metric',
   'COSINE',
   'M',
   16,
   'ef_construction',
   200]],
 'num_docs': '1000000',
 'max_doc_id': '1000000',
 'num_terms': '942',
 'num_records': '104907091',
 'inverted_sz_mb': '266.12841796875',
 'vector_index_sz_mb': '720.0846557617188',
 'total_inverted_index_blocks': '1035000',
 'offset_vectors_sz_mb': '2.7751789093

In [143]:
query = (
    Query('(@store_id:[$store_id $store_id])=>[KNN 24 @embedding $query_vector AS vector_score]')
     .sort_by('vector_score')
     .return_fields('vector_score', 'store_id', 'product_name', 'product_sku')
     .dialect(2)
     .paging(0, 24)
)

time_k = 0
for i in range(100):
    time_k += timer(rediska.search, {"query": query, "query_vector": embs[0].astype(np.float32).tobytes(), "store_id": store_id})
    
time_k / 100

0.0011725187301635741

In [139]:
query = (
    Query('(@store_id:[$store_id $store_id])=>[KNN 24 @embedding $query_vector AS vector_score]')
     .sort_by('vector_score')
     .return_fields('vector_score', 'store_id', 'product_name', 'product_sku')
     .dialect(2)
     .paging(0, 24)
)

response = rediska.search(query=query, store_id=store_id, query_vector=embs[0].astype(np.float32).tobytes())

In [140]:
print(len(response))
for doc in response:
    print(f"{store_id}," in doc.store_id)

24
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True


### store_id as TagField - 0.001410665512084961

In [135]:
count_of_store_ids_per_doc = 100
fake_something = Faker()

embs = np.random.uniform(low=-1.0, high=1.0, size=(1000000, 100))
doc_ids = list(range(len(embs)))
payloads = []

for i in range(len(embs)):
    payloads.append(
        {
            "store_id": list(set([fake_something.random.randint(0, 100) for i in range(count_of_store_ids_per_doc)])),
            "product_name": " ".join(fake_something.words()),
            "product_sku": i,
        }
    )
print(len(payloads))

1000000


In [83]:
unique_stores = set()
for payload in payloads:
    unique_stores.update(set(payload["store_id"]))
    break
print(len(unique_stores))

1278


In [160]:
rediska.client.flushall()
rediska = RedisDB()

Host connection: True


In [161]:
rediska.add_documents(doc_ids=doc_ids, embeddings=embs.tolist(), metadata=payloads)

Success: 1.0.


In [162]:
rediska.count_documents()

1000000


In [163]:
rediska.create_index()

In [170]:
rediska.index_info()["percent_indexed"]  # wait till 'percent_indexed': '1'

'1'

In [174]:
query = (
    Query('(@store_id:{$store_id})=>[KNN 24 @embedding $query_vector AS vector_score]')
     .sort_by('vector_score')
     .return_fields('vector_score', 'store_id', 'product_name', 'product_sku')
     .dialect(2) # dialect 3 for .*
     .paging(0, 24)
)

response = rediska.search(query=query, store_id=store_id, query_vector=embs[0].astype(np.float32).tobytes())

In [176]:
print(len(response))
for doc in response:
    print(f"{store_id}" in json.loads(doc.store_id))

24
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True


In [177]:
query = (
    Query('(@store_id:{$store_id})=>[KNN 24 @embedding $query_vector AS vector_score]')
     .sort_by('vector_score')
     .return_fields('vector_score', 'store_id', 'product_name', 'product_sku')
     .dialect(2) # dialect 3 for .*
     .paging(0, 24)
)

time_k = 0
for i in range(100):
    time_k += timer(rediska.search, {"query": query, "query_vector": embs[0].astype(np.float32).tobytes(), "store_id": store_id})
    
time_k / 100

0.001267526149749756

In [178]:
rediska.index_info()

{'index_name': 'idx:documents',
 'index_options': [],
 'index_definition': ['key_type',
  'JSON',
  'prefixes',
  ['documents:'],
  'default_score',
  '1'],
 'attributes': [['identifier',
   '$.store_id',
   'attribute',
   'store_id',
   'type',
   'TAG',
   'SEPARATOR',
   ','],
  ['identifier',
   '$.product_sku',
   'attribute',
   'product_sku',
   'type',
   'NUMERIC'],
  ['identifier',
   '$.product_name',
   'attribute',
   'product_name',
   'type',
   'TEXT',
   'WEIGHT',
   '1',
   'NOSTEM'],
  ['identifier',
   '$.embedding',
   'attribute',
   'embedding',
   'type',
   'VECTOR',
   'algorithm',
   'HNSW',
   'data_type',
   'FLOAT32',
   'dim',
   100,
   'distance_metric',
   'COSINE',
   'M',
   16,
   'ef_construction',
   100]],
 'num_docs': '1000000',
 'max_doc_id': '1000000',
 'num_terms': '942',
 'num_records': '5907234',
 'inverted_sz_mb': '83.89717102050781',
 'vector_index_sz_mb': '723.476318359375',
 'total_inverted_index_blocks': '103520',
 'offset_vectors_sz_