Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Postgres : pgvector implemenation #1926

Open
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

makkarss929
Copy link

@makkarss929 makkarss929 commented Mar 31, 2024

…entation

Description

  1. Worked on this enhancement feature #1558
  2. Created PostgresDataBackend from Ibis.DataBackend
  3. Created PostgresVectorSearcher class inspired by MongoAtlasVectorSearcher.

Next steps :

  1. Testing with changes

Related Issues

Checklist

  • Is this code covered by new or existing unit tests or integration tests?
  • Did you run make unit-testing and make integration-testing successfully?
  • Do new classes, functions, methods and parameters all have docstrings?
  • Were existing docstrings updated, if necessary?
  • Was external documentation updated, if necessary?

Additional Notes or Comments

@makkarss929 makkarss929 marked this pull request as draft March 31, 2024 09:39
@makkarss929 makkarss929 marked this pull request as ready for review March 31, 2024 10:38
@makkarss929 makkarss929 marked this pull request as draft March 31, 2024 10:38
Copy link
Collaborator

@jieguangzhou jieguangzhou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are some advice

superduperdb/backends/postgres/data_backend.py Outdated Show resolved Hide resolved
superduperdb/base/build.py Outdated Show resolved Hide resolved
superduperdb/vector_search/postgres.py Outdated Show resolved Hide resolved
superduperdb/vector_search/postgres.py Show resolved Hide resolved
superduperdb/base/build.py Outdated Show resolved Hide resolved
Copy link
Collaborator

@blythed blythed left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good start. What about indexing? It would be very useful if we could use that to speed up the calculations: https://github.com/pgvector/pgvector?tab=readme-ov-file#indexing.

@makkarss929 makkarss929 requested a review from blythed April 6, 2024 20:39
@makkarss929 makkarss929 marked this pull request as ready for review April 17, 2024 21:47
@makkarss929 makkarss929 changed the title Creating PostgresDataBackend from IbisDataBackend for pgvector implem… pgvector implemenation Apr 17, 2024
@makkarss929 makkarss929 changed the title pgvector implemenation Postgres : pgvector implemenation Apr 18, 2024
@makkarss929
Copy link
Author

@jieguangzhou @blythed @kartik4949 I have made all requested changes, please review my PR. thanks

Copy link
Collaborator

@jieguangzhou jieguangzhou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @makkarss929
Some suggestions for modifications, mainly we don’t need to modify so much, our architecture can handle these logics.

Please help to clean the outputs of the notebook

Comment on lines 54 to 102
if CFG.cluster.vector_search.type == 'in_memory':
if flatten:
raise NotImplementedError('Flatten not yet supported for ibis')

if not outputs:
return

table_records = []
for ix in range(len(outputs)):
d = {
'_input_id': str(ids[ix]),
'output': outputs[ix],
}
table_records.append(d)
if not outputs:
return

for r in table_records:
if isinstance(r['output'], dict) and '_content' in r['output']:
r['output'] = r['output']['_content']['bytes']
table_records = []
for ix in range(len(outputs)):
d = {
'_input_id': str(ids[ix]),
'output': outputs[ix],
}
table_records.append(d)

for r in table_records:
if isinstance(r['output'], dict) and '_content' in r['output']:
r['output'] = r['output']['_content']['bytes']

db.databackend.insert(f'_outputs.{predict_id}', table_records)

elif CFG.cluster.vector_search.type == 'pg_vector':
# Connect to your PostgreSQL database
conn = psycopg2.connect(CFG.cluster.vector_search.uri)
table_name = f'_outputs.{predict_id}'
with conn.cursor() as cursor:
cursor.execute('CREATE EXTENSION IF NOT EXISTS vector')
cursor.execute(f"""DROP TABLE IF EXISTS "{table_name}";""")
cursor.execute(
f"""CREATE TABLE "{table_name}" (
_input_id VARCHAR PRIMARY KEY,
output vector(1024),
_fold VARCHAR
);
"""
)
for ix in range(len(outputs)):
try:
cursor.execute(
f"""INSERT INTO "{table_name}" (_input_id, output) VALUES (%s, %s);""",
[str(ids[ix]), outputs[ix]]
)
except:
pass

db.databackend.insert(f'_outputs.{predict_id}', table_records)
# Commit the transaction
conn.commit()
# Close the connection
conn.close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don’t need to insert them into the table here. When vector search is really launched, the add method of PostgresVectorSearcher will be called, and at this time, the corresponding vector can be accepted and added to the table. It is necessary to manage duplicate items well as previously commented.

The results of the model are saved separately from vector search service

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jieguangzhou Don't worry about duplication, we have _input_id as a PRIMARY KEY while creating vector table Example _outputs.listener1::0. I have checked there is no duplication.

Comment on lines +121 to +131
elif uri.startswith('postgres://') or uri.startswith("postgresql://"):
name = uri.split('//')[0]
if type == 'data_backend':
ibis_conn = ibis.connect(uri)
return mapping['ibis'](ibis_conn, name)
else:
assert type == 'metadata'
from sqlalchemy import create_engine

sql_conn = create_engine(uri)
return mapping['sqlalchemy'](sql_conn, name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this, we can directly load the URI with ibis

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove this

Comment on lines +38 to +39
elif item.startswith('postgres://') or item.startswith('postgresql://'):
kwargs['data_backend'] = item
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this, the same reason

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove this

Comment on lines +19 to +28
if CFG.cluster.vector_search.type != 'pg_vector':
if not db.server_mode:
request_server(
service='vector_search',
endpoint='create/search',
args={
'vector_index': self.vector_index,
},
type='get',
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to change this, this is for an independent vector search service, in which the pg vector will be used logically.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jieguangzhou It's throwing error that's why, I did this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What error? Could you please put the error message here?

Comment on lines +107 to +114
if CFG.cluster.vector_search.type != 'pg_vector':
response = request_server(
service='vector_search',
data=h,
endpoint='query/search',
args={'vector_index': self.vector_index, 'n': n},
)
return response['ids'], response['scores']
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jieguangzhou It's throwing error that's why, I did this

Comment on lines +44 to +89
if CFG.cluster.vector_search.type != 'pg_vector':
vi = db.vector_indices[vector_index]
if isinstance(query, dict):
# ruff: noqa: E501
query: CompoundSelect = Serializable.decode(query) # type: ignore[no-redef]
assert isinstance(query, CompoundSelect)
if not ids:
select = query
else:
select = query.select_using_ids(ids)
docs = db.select(select)
docs = [doc.unpack() for doc in docs]
key = vi.indexing_listener.key
if '_outputs.' in key:
key = key.split('.')[1]
# TODO: Refactor the below logic
vectors = []
if isinstance(db.databackend, MongoDataBackend):
vectors = [
{
'vector': MongoStyleDict(doc)[
f'_outputs.{vi.indexing_listener.predict_id}'
],
'id': str(doc['_id']),
}
for doc in docs
]
elif isinstance(db.databackend, IbisDataBackend):
docs = db.execute(select.outputs(vi.indexing_listener.predict_id))
from superduperdb.backends.ibis.data_backend import INPUT_KEY

vectors = [
{
'vector': doc[f'_outputs.{vi.indexing_listener.predict_id}'],
'id': str(doc[INPUT_KEY]),
}
for doc in docs
]
for r in vectors:
if hasattr(r['vector'], 'numpy'):
r['vector'] = r['vector'].numpy()
vectors = [
{
'vector': doc[f'_outputs.{vi.indexing_listener.predict_id}'],
'id': str(doc[INPUT_KEY]),
}
for doc in docs
]
for r in vectors:
if hasattr(r['vector'], 'numpy'):
r['vector'] = r['vector'].numpy()

if vectors:
db.fast_vector_searchers[vi.identifier].add(
[VectorItem(**vector) for vector in vectors]
)
if vectors:
db.fast_vector_searchers[vi.identifier].add(
[VectorItem(**vector) for vector in vectors]
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jieguangzhou Remember we discussed in meeting Ibis is not compatible with pgvector whereas psycopg2 supports. It's throwing error when Ibis is trying to access table with vector embeddings created with psycopg2.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have two tables when using the pg vector search

  1. The model output table(table1), all the embedding vectors results are saved here
  2. the pg vector search table(table2), save the vectors, and build an index here

The workflow for building a vector search index is as follows:

  1. Query the vectors from table1
  2. Use these vectors to build the vector search(the add method of PostgresVectorSearcher)

As you can see here, we get the vectors and call the add db.fast_vector_searchers[vi.identifier].add

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, you want me to create 2 tables

  1. outputs.listener1::0 which is created by ibis.
  2. outputs.listener1::0_pgvector which will be created by psycopg2. with the add method of PostgresVectorSearcher.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, just one table, the pgvector table created by psycopg2

The first one created by ibis data_backend automatically

Copy link
Collaborator

@blythed blythed Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jieguangzhou why do we need 2 tables. Why can't we just use the original listener table?
My thought was that the table will be created as a pg_vector table when the listener is created
somehow? Copying data into another table sounds wasteful.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be viewed in conjunction with comment #1926 (comment) ; here are my thoughts:

  1. The saving behavior of model outputs should not be limited by downstream applications (such as vector search). If we need to compatible vector search separately, it would increase the complexity of saving query results. This should be a very pure interface. If vector search is to be built, then an index should be built on this table on the vector search side (if an index cannot be built, a separate table needs to be created).
  2. The vector search component should be independent. Ideally, we should be able to switch the underlying engine of my vector search at any time. If we couple vector search with saving, then expandability will decrease. If my vector search uses two different Postgres databases from databackend (one for data saving and one for vector search calculations), it cannot be supported.
  3. Ideally, if the PostgresVectorSearcher can determine that the uri being used is the same as the databackend's uri, it should first attempt to build an index in the model output table. If building an index is not possible, then a separate table needs to be created.

WDYT? @blythed

Copy link
Collaborator

@blythed blythed Apr 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that developers could potentially use pg_vector as an external vector-store. However, most likely
users will be postgres users.

By hosting the vector-search with pg_vector, we avoid the problem of mapping the data to a new table/ database. This is exactly like MongoDB Atlas vector-search.

In the configuration with have CFG.cluster.vector_search.type = 'native' for such cases. If that is set, then the "copy across" the vectors jobs are skipped.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just conducted a basic test with pgvector:

  1. If a vector table is created using pgvector, ibis currently cannot directly read the data inside, as it throws an error for not being able to parse the vector type data of pgvector. Adaptations are necessary; otherwise, our data backend will no longer be able to access this table. I suspect that it would require activating the vector feature specifically for the ibis backend’s connection, which could be quite troublesome.
  2. To create and access vector type data in pgvector, each connection needs to activate the vector feature to enable data interchange.
    It is suggested to split it into two tables, as ibis does not seem to be well compatible with pgvector at the moment.
    If merging into one table is considered, the following tests need to be conducted:
  3. Test with two types of SQL databases (Postgres and SQLite) to ensure that non-Postgres vector implementations are compatible and do not affect the original SQL functionalities. Since SQLite already has unit tests, only Postgres needs to be tested.
  4. Conduct application tests in cases where the vector search type is specified as pgvector, including vector searches and non-vector searches, to ensure that non-vector search functions are preserved normally. These tests should be integration tests

@makkarss929 @blythed

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -54,6 +61,26 @@ def on_load(self, db: Datalayer) -> None:
self.compatible_listener = t.cast(
Listener, db.load('listener', self.compatible_listener)
)
if CFG.cluster.vector_search.type == "pg_vector":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic should be isolated in the PgVector class (vector_searcher).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@blythed you want me to move this logic to PosgresVectorSearcher class, and then call it from here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic should be isolated in the PgVector class (vector_searcher).

@blythed you want me to move this logic to PosgresVectorSearcher class, and then call it from here. right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@blythed working on it

elif isinstance(db.databackend, IbisDataBackend):
docs = db.execute(select.outputs(vi.indexing_listener.predict_id))
from superduperdb.backends.ibis.data_backend import INPUT_KEY
if CFG.cluster.vector_search.type != 'pg_vector':
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because of same error Ibis doesn't support pgvector and vector datatype

@blythed
Copy link
Collaborator

blythed commented May 23, 2024

@makkarss929 the ibis project has fixed the data types for pgvector. Would you like to finish your PR? Otherwise we can take this over.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants