# In [None]:
from __future__ import annotations

import uuid
from typing import List, Tuple, Set

from langchain_core.documents import Document
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_postgres import Column, PGEngine, PGVectorStore
from sqlalchemy import text
from sqlalchemy.exc import ProgrammingError

# Connection settings and table layout shared by the rest of this script.
# NOTE(review): the URL carries literal placeholder credentials — confirm they
# are not meant to be real before reusing this outside a local demo.
CONN = "postgresql+psycopg://user:password@localhost:5432/vector_db"
TABLE_NAME = "my_doc_collection"
# Dimension used both for the fake embedding model and for the vector column.
VECTOR_SIZE = 768
embedding = DeterministicFakeEmbedding(size=VECTOR_SIZE)

engine = PGEngine.from_connection_string(url=CONN)

# Create the vector table with a TEXT "doc_id" id column. A ProgrammingError is
# swallowed only when its underlying driver error reports SQLSTATE 42P07
# (PostgreSQL's duplicate_table code); every other error is re-raised.
# NOTE(review): overwrite_existing=True presumably recreates the table on each
# run, which would make the 42P07 branch a safety net only — confirm which of
# the two behaviours is intended.
try:
    engine.init_vectorstore_table(
        table_name=TABLE_NAME,
        vector_size=VECTOR_SIZE,
        # Third positional argument is presumably `nullable` — TODO confirm.
        id_column=Column("doc_id", "TEXT", False),
        overwrite_existing=True,
    )
except ProgrammingError as e:
    # `e.orig` is the wrapped driver exception; a missing `sqlstate` attribute
    # falls back to "" and therefore re-raises.
    if getattr(e.orig, "sqlstate", "") != "42P07":
        raise


class VerbosePGVectorStore(PGVectorStore):
    """PGVectorStore variant whose ``add_documents`` also reports counts.

    The counts are derived by checking which of the supplied ids are present
    in the table's ``doc_id`` column before and after delegating to the
    parent ``add_documents``.
    """

    async def _aexisting_ids(self, ids: List[str]) -> Set[str]:
        """Return the subset of *ids* found in the ``doc_id`` column.

        A falsy *ids* short-circuits to an empty set without running a query.
        """
        if not ids:
            return set()

        table = self.get_table_name()
        lookup = text(
            f'SELECT "doc_id" FROM "{table}" WHERE "doc_id" = ANY (:ids)'
        )
        found: Set[str] = set()
        async with self._engine._pool.connect() as conn:
            result = await conn.execute(lookup, {"ids": ids})
            # Each row carries a single selected column: the id itself.
            for record in result.fetchall():
                found.add(record[0])
        return found

    def _existing_ids(self, ids: List[str]) -> Set[str]:
        """Blocking counterpart of ``_aexisting_ids``.

        Hands the coroutine to the engine's ``_run_as_sync`` helper and
        returns its result.
        """
        pending = self._aexisting_ids(ids)
        return self._engine._run_as_sync(pending)

    def add_documents(  # type: ignore[override]
        self,
        documents: List[Document],
        ids: List[str],
        **kwargs,
    ) -> Tuple[List[str], int, int]:
        """Add documents through the parent class and report what changed.

        Args:
            documents: Documents forwarded unchanged to the parent method.
            ids: Ids forwarded to the parent and used for the presence checks.
            **kwargs: Extra keyword arguments forwarded to the parent method.

        Returns:
            A 3-tuple ``(parent_result, inserted, updated)``. *inserted*
            counts ids present after the call but not before it; *updated*
            counts ids that were already present before the call.
        """
        present_before = self._existing_ids(ids)
        stored_ids = super().add_documents(documents, ids=ids, **kwargs)
        present_after = self._existing_ids(ids)

        newly_inserted = present_after.difference(present_before)
        already_there = present_before.intersection(ids)
        return stored_ids, len(newly_inserted), len(already_there)


# Build the store synchronously against TABLE_NAME using the fake embedding
# model. id_column names the same "doc_id" column that VerbosePGVectorStore
# hard-codes in its id lookup query, so the two must stay in sync.
store = VerbosePGVectorStore.create_sync(
    engine=engine,
    table_name=TABLE_NAME,
    embedding_service=embedding,
    id_column="doc_id",
)


def det_id(text: str) -> str:
    """Return a deterministic id for *text*.

    The id is the string form of a version-5 UUID computed from *text* in the
    ``uuid.NAMESPACE_URL`` namespace, so equal inputs always give equal ids.
    """
    derived = uuid.uuid5(uuid.NAMESPACE_URL, text)
    return str(derived)


# Three small sample documents; each id is derived from the document's text.
docs = [
    Document(page_content=content)
    for content in ("Apples and oranges", "Cars and airplanes", "Train")
]
ids = [det_id(document.page_content) for document in docs]


# Add the same batch twice and print the counts reported for each run.
first_result = store.add_documents(docs, ids=ids)
_, ins, upd = first_result
print(f"Run 1 ➜ inserted={ins}, updated={upd}")

second_result = store.add_documents(docs, ids=ids)
_, ins, upd = second_result
print(f"Run 2 ➜ inserted={ins}, updated={upd}")

# The header below is German for "Search results for: ...".
print("Suchergebnisse für: I'd like a fruit.")
matches = store.similarity_search("I'd like a fruit.", k=5)
for match in matches:
    print(" •", match.page_content)