
🎉 New Destination Vector Database (powered by LangChain) #26184

Merged
merged 36 commits into master from flash1293/langchain-destination on Jul 12, 2023

Conversation

Contributor

@flash1293 flash1293 commented May 17, 2023

How to test

  • Have local Airbyte running
  • Check out this PR
  • Go to airbyte-integrations/connectors/destination-langchain
  • Run docker build . -t airbyte/destination-langchain:dev
  • Add a custom connector (no need for a custom registry here, as everything is happening on your machine): repository name is airbyte/destination-langchain, image tag is dev
  • Configure a source, a langchain destination and start syncing data!

What

Closes #27821
Closes #27971

This PR adds the "Langchain" destination, which combines the LangChain helpers for processing, embedding, and indexing into a unified destination for various vector stores. It takes care of the embedding on the fly and makes it very easy to get started building integrations on top of LangChain.

https://www.loom.com/share/db63e997d01d49118bf382dff876f7c1

[Screenshot: 2023-07-03 at 16:20:51]

How

General class setup implementing the main functionality (see the interface sketch after this list):

  • Processor - turns records into langchain documents that can be indexed into vector stores
    • Split into chunks of defined length
    • Prepare metadata
  • Embedder (abstract super class) - provides means to embed documents, turning them into a vector
  • Indexer (abstract super class) - receives a batch of chunks and indexes them into the vector database. Uses the embedder instance
  • Batcher - collects records up to batch size, then processes them
  • Destination - initializes other components, passes records to the batcher and implements the main processing loop (call processor to get chunks, pass to indexer to push into vector store)
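For orientation, here's a rough interface sketch of how these components could fit together; the names and signatures below are paraphrased for illustration and don't necessarily match the connector's code:

```python
# Illustrative sketch only: paraphrased interfaces, not the PR's exact code.
from abc import ABC, abstractmethod
from typing import Any, Callable, List


class Embedder(ABC):
    """Turns chunk texts into vectors; subclasses wrap a concrete provider."""

    @abstractmethod
    def embed_texts(self, texts: List[str]) -> List[List[float]]:
        ...


class Indexer(ABC):
    """Receives a batch of chunks and writes them into the vector database."""

    def __init__(self, embedder: Embedder):
        self.embedder = embedder

    @abstractmethod
    def index(self, chunks: List[Any], delete_ids: List[str]) -> None:
        ...


class Batcher:
    """Collects records up to batch_size, then hands them to a flush handler."""

    def __init__(self, batch_size: int, flush_handler: Callable[[List[Any]], None]):
        self.batch_size = batch_size
        self.flush_handler = flush_handler
        self.buffer: List[Any] = []

    def add(self, record: Any) -> None:
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.flush_handler(list(self.buffer))
            self.buffer.clear()
```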

Unclear parts / TODOs

  • It's awkward that text fields are configured at the global level: they are probably not known at this point of the setup and will apply to all streams at the same time. However, it's important to configure the right fields for the embedding
  • Basic multi-threading using queues between processing, embedding and indexing could speed up throughput quite a bit, but would also complicate the setup. It's probably worth implementing, though it doesn't have to be part of the initial release
  • Adding an option to do local embedding would be nice so it's possible to run the destination without signing up for any service (e.g. llama-cpp or fake embeddings)
  • Add more unit tests
  • Add basic integration tests

New Connector

Todos

  • Unit & integration tests added and passing. Community members, please provide proof of success locally, e.g. a screenshot or copy-pasted unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow the instructions in the README. For Java connectors, run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Connector version is set to 0.0.1
    • Dockerfile has version 0.0.1
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/integrations/<source or destination>/<name>.md including changelog with an entry for the initial version. See changelog example
    • docs/integrations/README.md
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.

@github-actions
Copy link
Contributor

github-actions bot commented May 17, 2023

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan and you've followed all steps in the Breaking Changes Checklist
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • You, or an Airbyter, have run /test successfully on this PR - or on a non-forked branch
  • You, or an Airbyter, have run /publish successfully on this PR - or on a non-forked branch
  • You've updated the connector's metadata.yaml file

If the checklist is complete but the CI check is failing:

  1. Check for hidden checklists in your PR description

  2. Toggle the GitHub label checklist-action-run on/off to re-run the checklist CI.

@octavia-squidington-iii octavia-squidington-iii added the area/documentation Improvements or additions to documentation label Jun 29, 2023
Contributor

@sherifnada sherifnada left a comment

I think this is a great start

My main advice would be to keep in mind how this should be structured to achieve maximum throughput, since the throughput we're currently hitting seems quite poor. Not something for v1, but probably an important thing to keep in mind.

I noticed a couple of things seem hard-bound to specific providers (e.g. OpenAI embeddings); it would probably make our lives easier if we distinguished between the generic pieces and the concrete implementations.

Also we should have way more unit tests ;)

schema_extra = {"group":"processing"}


class EmbeddingConfigModel(BaseModel):
Contributor

should probably be called OpenAI embedding Config or something

Contributor Author

I added a second option for embeddings (fake embeddings that can be used to test the pipeline) and generalized these parts
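A minimal sketch of what that generalization could look like, using LangChain's FakeEmbeddings for the test mode (the helper and config field names here are hypothetical, not the PR's exact code):

```python
from langchain.embeddings import FakeEmbeddings, OpenAIEmbeddings

OPEN_AI_VECTOR_SIZE = 1536  # output dimensions of text-embedding-ada-002


def create_embeddings(mode: str, openai_key: str = ""):
    """Pick an embedding implementation based on the configured mode."""
    if mode == "openai":
        return OpenAIEmbeddings(openai_api_key=openai_key)
    # Fake embeddings produce random vectors of the right size, which lets
    # you test the whole pipeline without an external embedding service.
    return FakeEmbeddings(size=OPEN_AI_VECTOR_SIZE)
```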

import re


class ProcessingConfigModel(BaseModel):
Contributor

is spec.json generated from this file?

Contributor Author

@flash1293 commented Jul 4, 2023

It's generated on the fly; I removed the old spec.json.
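For context, a minimal sketch of generating a spec from a Pydantic model at runtime (pydantic v1 API; the field shown is illustrative, not the connector's full model):

```python
import json

from pydantic import BaseModel, Field


class ProcessingConfigModel(BaseModel):
    chunk_size: int = Field(..., description="Size of chunks in tokens")


class ConfigModel(BaseModel):
    processing: ProcessingConfigModel


# .schema() emits a JSON-schema dict that can serve as the connector spec,
# so there is no static spec.json to keep in sync with the models.
print(json.dumps(ConfigModel.schema(), indent=2))
```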

) -> Iterable[AirbyteMessage]:
config_model = ConfigModel.parse_obj(config)
self._init_indexer(config_model)
self.processor = Processor(config_model.processing, configured_catalog, max_metadata_size=self.indexer.max_metadata_size)
Contributor

nit: the name Processor is ambiguous

Contributor Author

renamed to document_processor

return relevant_fields

def _extract_metadata(self, record: AirbyteRecordMessage) -> dict:
metadata = record.data.copy()
Contributor

this might be heavy on performance (a tomorrow problem, though)

relevant_fields = record.data
return relevant_fields

def _extract_metadata(self, record: AirbyteRecordMessage) -> dict:
Contributor

If I'm reading this correctly, metadata is everything in the input record that isn't a text/data field?

Contributor Author

right, we could also make it configurable. Technically you could use the column selection feature for this.

@@ -0,0 +1,26 @@
import time

def measure_time(func):
Contributor

neat use of decorators
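The quoted hunk only shows the signature; a plausible completion of a timing decorator along these lines (an assumption, since the actual body isn't quoted above):

```python
import time
from functools import wraps


def measure_time(func):
    """Log how long each call to the wrapped function takes."""

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        print(f"{func.__name__} took {time.perf_counter() - start:.3f}s")
        return result

    return wrapper
```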

METADATA_STREAM_FIELD = "_airbyte_stream"
METADATA_NATURAL_ID_FIELD = "_natural_id"

class Processor:
Contributor

Naive question: why do we need this if embedding APIs can tokenize themselves?

Contributor Author

Tokenization is just turning UTF-8-encoded text into a sequence of token representations, and as you say, the embedding API takes care of that: https://help.openai.com/en/articles/4936856-what-are-tokens-and-how-to-count-them

What the processor is doing is turning the records into "documents" (see the sketch after this list):

  • Split them up appropriately
  • Separate text and metadata
  • Add bookkeeping metadata for incremental sync (id and stream name)
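A hedged sketch of that record-to-documents step using LangChain's text splitter (the function and field names are illustrative):

```python
from typing import List

from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)


def record_to_chunks(data: dict, text_fields: List[str], stream: str) -> List[Document]:
    # Separate the configured text fields from the remaining metadata.
    text = "\n".join(str(data[field]) for field in text_fields if field in data)
    metadata = {key: value for key, value in data.items() if key not in text_fields}
    # Bookkeeping metadata so incremental syncs can find stale documents.
    metadata["_airbyte_stream"] = stream
    return splitter.split_documents([Document(page_content=text, metadata=metadata)])
```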


@property
def embedding_dimensions(self) -> int:
return 1536
Contributor

why?

Contributor Author

That's the size of the embedding vectors produced by the OpenAI model; I added a comment to explain.
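For background, the dimension matters because a vector index has to be created with exactly the vector size the embedding model emits; a hedged sketch with the 2023-era Pinecone client (credentials and index name are placeholders):

```python
import pinecone

pinecone.init(api_key="YOUR_KEY", environment="YOUR_ENVIRONMENT")
# text-embedding-ada-002 produces 1536-dimensional vectors, so the index
# must be created with that exact dimension.
pinecone.create_index("airbyte-demo", dimension=1536, metric="cosine")
```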

@flash1293 flash1293 changed the title from "Langchain destination" to "🎉 New Destination Vector Database (powered by LangChain)" on Jul 5, 2023
@flash1293 flash1293 marked this pull request as ready for review July 5, 2023 12:23
@octavia-squidington-iii
Collaborator

destination-langchain test report (commit bddd100c3d) - ❌

⏲️ Total pipeline duration: 60 seconds

Step Result
Validate airbyte-integrations/connectors/destination-langchain/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install

🔗 View the logs here

Please note that tests are only run on PRs that are ready for review. Please set your PR to draft mode so that subsequent commits don't flood the CI engine and upstream services.
You can run the same pipeline locally on this branch with the airbyte-ci tool using the following command:

airbyte-ci connectors --name=destination-langchain test

@octavia-squidington-iii
Collaborator

destination-langchain test report (commit 6126f4f302) - ❌

⏲️ Total pipeline duration: 30 seconds

Step Result
Validate airbyte-integrations/connectors/destination-langchain/metadata.yaml
Connector version semver check
Connector version increment check
QA checks
Code format checks
Connector package install

🔗 View the logs here

Please note that tests are only run on PRs that are ready for review. Please set your PR to draft mode so that subsequent commits don't flood the CI engine and upstream services.
You can run the same pipeline locally on this branch with the airbyte-ci tool using the following command:

airbyte-ci connectors --name=destination-langchain test

@octavia-squidington-iii
Collaborator

destination-langchain test report (commit 099ad208a9) - ✅

⏲️ Total pipeline duration: 33.58s

Step Result
Validate airbyte-integrations/connectors/destination-langchain/metadata.yaml
Connector version semver check
QA checks
Code format checks
Connector package install
Build destination-langchain docker image for platform linux/x86_64
Unit tests
Integration tests

🔗 View the logs here

Please note that tests are only run on PRs that are ready for review. Please set your PR to draft mode so that subsequent commits don't flood the CI engine and upstream services.
You can run the same pipeline locally on this branch with the airbyte-ci tool using the following command:

airbyte-ci connectors --name=destination-langchain test

@octavia-squidington-iii
Collaborator

destination-langchain test report (commit 099ad208a9) - ✅

⏲️ Total pipeline duration: 22.19s

Step Result
Validate airbyte-integrations/connectors/destination-langchain/metadata.yaml
Connector version semver check
QA checks
Code format checks
Connector package install
Build destination-langchain docker image for platform linux/x86_64
Unit tests
Integration tests

🔗 View the logs here

Please note that tests are only run on PRs that are ready for review. Please set your PR to draft mode so that subsequent commits don't flood the CI engine and upstream services.
You can run the same pipeline locally on this branch with the airbyte-ci tool using the following command:

airbyte-ci connectors --name=destination-langchain test

@octavia-squidington-iii
Collaborator

destination-langchain test report (commit 0a571f10d8) - ✅

⏲️ Total pipeline duration: 01mn42s

Step Result
Validate airbyte-integrations/connectors/destination-langchain/metadata.yaml
Connector version semver check
QA checks
Code format checks
Connector package install
Build destination-langchain docker image for platform linux/x86_64
Unit tests
Integration tests

🔗 View the logs here

Please note that tests are only run on PRs that are ready for review. Please set your PR to draft mode so that subsequent commits don't flood the CI engine and upstream services.
You can run the same pipeline locally on this branch with the airbyte-ci tool using the following command:

airbyte-ci connectors --name=destination-langchain test

Contributor

@maxi297 maxi297 left a comment

Just a couple of comments here and there while taking a break. I can recheck later if you want

@flash1293
Contributor Author

@maxi297 Thanks for your review, addressed your points. Could you take another look?

@octavia-squidington-iii
Collaborator

destination-langchain test report (commit 92693970e0) - ❌

⏲️ Total pipeline duration: 01mn37s

Step Result
Validate airbyte-integrations/connectors/destination-langchain/metadata.yaml
Connector version semver check
QA checks
Code format checks
Connector package install
Build destination-langchain docker image for platform linux/x86_64
Unit tests
Integration tests

🔗 View the logs here

Please note that tests are only run on PRs that are ready for review. Please set your PR to draft mode so that subsequent commits don't flood the CI engine and upstream services.
You can run the same pipeline locally on this branch with the airbyte-ci tool using the following command:

airbyte-ci connectors --name=destination-langchain test

@octavia-squidington-iii
Collaborator

destination-langchain test report (commit b52a36420c) - ✅

⏲️ Total pipeline duration: 58.09s

Step Result
Validate airbyte-integrations/connectors/destination-langchain/metadata.yaml
Connector version semver check
QA checks
Code format checks
Connector package install
Build destination-langchain docker image for platform linux/x86_64
Unit tests
Integration tests

🔗 View the logs here

Please note that tests are only run on PRs that are ready for review. Please set your PR to draft mode so that subsequent commits don't flood the CI engine and upstream services.
You can run the same pipeline locally on this branch with the airbyte-ci tool using the following command:

airbyte-ci connectors --name=destination-langchain test

"""
Generate documents from records.
:param records: List of AirbyteRecordMessages
:return: Tuple of (List of document chunks, Natural id to delete if applicable)
Contributor

can you comment, either here or in destination.py, what the IDs to delete are about? It's not clear what natural ids are

Contributor Author

It's an identifier of a record so we can remove documents belonging to a record before inserting new documents when in append-dedup mode. As one record might be split across multiple documents, it has to be made part of the metadata. It's leveraged here:

self.pinecone_index.delete(filter={METADATA_NATURAL_ID_FIELD: {"$in": delete_ids}})
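In context, the dedup flow looks roughly like this (a hedged sketch, not the PR's exact code; the vector-store call is illustrative):

```python
class PineconeIndexer:
    def __init__(self, pinecone_index, vectorstore):
        self.pinecone_index = pinecone_index
        self.vectorstore = vectorstore

    def index(self, document_chunks, delete_ids):
        # Drop vectors written for earlier versions of the incoming records,
        # then write the fresh chunks.
        if delete_ids:
            self.pinecone_index.delete(filter={"_natural_id": {"$in": delete_ids}})
        self.vectorstore.add_documents(document_chunks)
```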

Contributor Author

I renamed it to record_id instead of natural id to make it easier to understand and added a comment.

try:
self.embeddings.embed_query("test")
except Exception as e:
return str(e)
Contributor

it's generally better to use repr than str because it returns the "printable representation of the object".

Example with a ValueError:

>>> try:
...     raise ValueError("hello")
... except Exception as e:
...     print(str(e))
...
hello
>>> try:
...     raise ValueError("hello")
... except Exception as e:
...     print(repr(e))
...
ValueError('hello')

That being said, we should be able to do even better and return the stacktrace. Here's how we do it in sources

Contributor

Honestly, I feel like in this case str(e) plus a stack trace is more readable? The ValueError piece makes it harder to read for an end user.

Contributor Author

Good call, added this
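For reference, a hedged sketch of the combined approach (message plus stack trace) in the connection check; the function name is illustrative:

```python
import traceback
from typing import Optional


def check_embedding(embeddings) -> Optional[str]:
    """Return None if a test embedding succeeds, else message plus stack trace."""
    try:
        embeddings.embed_query("test")
        return None
    except Exception as exception:
        # str(e) alone can be cryptic; the stack trace shows where it failed.
        return f"{exception}\n{traceback.format_exc()}"
```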


:::caution

DocArrayHnswSearch is meant to be used on a local workstation and won't work on Kubernetes
Contributor

doesn't have to be done now, but we'll need a way to disable DocArrayHnswSearch before we can add this destination to Cloud

Contributor Author

Good callout, how does that work? Similarly to the *-strict-encrypt variants on connectors? Or can we adjust the spec somehow?

Contributor

@clnoll clnoll left a comment

Awesome @flash1293! Left a couple of comments.

I ran a langchain destination locally per the instructions in the PR description, using DocArrayHnswSearch + OpenAI and got a green sync, but fyi there were a couple of unexpected error messages in the logs.

errors: $.mode: must be a constant value pinecone, $.mode: does not have a value in the enumeration [pinecone], $.pinecone_key: is missing but it is required, $.pinecone_environment: is missing but it is required, $.index: is missing but it is required
errors: $.openai_key: object found, string expected
errors: $.mode: must be a constant value fake, $.mode: does not have a value in the enumeration [fake]

raise Exception(
f"DocArrayHnswSearchIndexer only supports overwrite mode, got {stream.destination_sync_mode} for stream {stream.stream.name}"
)
for file in os.listdir(self.config.destination_path):
Contributor

Looks like there's some asymmetry between this and the PineconeIndexer, which only deletes an index for the specific stream that's being re-synced. Is it worth putting a warning in the connector UI that the files in this directory will be deleted before every sync?


@property
def embedding_dimensions(self) -> int:
# vector size produced by text-embedding-ada-002 model
Contributor

Should the vector size and/or model be configurable? Is text-embedding-ada-002 the default? I'm not seeing it mentioned anywhere else.

Contributor Author

It is mentioned in the documentation and in a description that's part of the spec:

I think it makes sense to start like this (OpenAI recently deprecated all other embedding models, as this one is preferable in all situations). We'll probably want to add other embeddings at a later stage, though; those might have a different number of dimensions (the number of dimensions is an intrinsic property of the embedding model).

@flash1293
Contributor Author

@clnoll

I ran a langchain destination locally per the instructions in the PR description, using DocArrayHnswSearch + OpenAI and got a green sync, but fyi there were a couple of unexpected error messages in the logs.

I've noticed that too, but for me those show up on other syncs as well (e.g. this one is from Slack to local JSON), so I didn't think it was a problem:

2023-07-06 13:40:21 INFO i.a.v.j.JsonSchemaValidator(test):121 - JSON schema validation failed. 
errors: $.client_id: is missing but it is required, $.client_secret: is missing but it is required, $.access_token: is missing but it is required, $.option_title: must be a constant value Default OAuth2.0 authorization
2023-07-06 13:40:21 INFO i.a.v.j.JsonSchemaValidator(test):121 - JSON schema validation failed. 

@flash1293 flash1293 requested review from clnoll and girarda July 11, 2023 09:09
@octavia-squidington-iii
Collaborator

destination-langchain test report (commit 74dc691848) - ✅

⏲️ Total pipeline duration: 03mn06s

Step Result
Validate airbyte-integrations/connectors/destination-langchain/metadata.yaml
Connector version semver check
QA checks
Code format checks
Connector package install
Build destination-langchain docker image for platform linux/x86_64
Unit tests
Integration tests

🔗 View the logs here

Please note that tests are only run on PRs that are ready for review. Please set your PR to draft mode so that subsequent commits don't flood the CI engine and upstream services.
You can run the same pipeline locally on this branch with the airbyte-ci tool using the following command:

airbyte-ci connectors --name=destination-langchain test

Contributor

@girarda girarda left a comment

code looks good to me 🎸

be sure to go through and update the PR checklist before merging https://github.com/airbytehq/airbyte/actions/runs/5518023163/jobs/10061428323?pr=26184

* required for the testing need to go to `TEST_REQUIREMENTS` list

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
Contributor

💰

@@ -0,0 +1,42 @@
#
Contributor

do we want to add acceptance tests for this connector?

@octavia-squidington-iii
Collaborator

destination-langchain test report (commit 7b9da895b2) - ✅

⏲️ Total pipeline duration: 40.78s

Step Result
Validate airbyte-integrations/connectors/destination-langchain/metadata.yaml
Connector version semver check
QA checks
Code format checks
Connector package install
Build destination-langchain docker image for platform linux/x86_64
Unit tests
Integration tests

🔗 View the logs here

Please note that tests are only run on PRs that are ready for review. Please set your PR to draft mode so that subsequent commits don't flood the CI engine and upstream services.
You can run the same pipeline locally on this branch with the airbyte-ci tool using the following command:

airbyte-ci connectors --name=destination-langchain test

@flash1293 flash1293 merged commit b52e88a into master Jul 12, 2023
20 checks passed
@flash1293 flash1293 deleted the flash1293/langchain-destination branch July 12, 2023 18:07
@slavakurilyak

Excellent work on this integration!

It would be great to see this destination appear in the Connectors list of Airbyte. Consider categorizing this destination under 'LangChain' and 'Pinecone' since Pinecone is unlocked thanks to LangChain.

@flash1293
Contributor Author

Thanks for the feedback @slavakurilyak, we have a bunch of plans in this direction. Stay tuned!

efimmatytsin pushed a commit to scentbird/airbyte that referenced this pull request Jul 27, 2023
…6184)

* basic version

* polish

* iterate

* keep working

* fix spec

* wip

* improve destination

* basic unit tests

* move embedding dimensionality into embedder

* improve several things

* adjust documentation

* remove unnecessary call

* add some debug information

* fix local destination

* various small fixes

* bring tests into order

* document and add batching to pinecone

* checklist

* improve performance a bit and add test

* fix formatting

* fix metadata

* install C++ 11 on python base

* no more alpine for ci-connector-ops

* remove hard-to-run test

* more documentation

* better documentation

* add icon

* some small adjustments

* review comments

* format

* review comments

---------

Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
Co-authored-by: Augustin <augustin@airbyte.io>
Labels
area/connectors (Connector related issues)
area/documentation (Improvements or additions to documentation)
connectors/destination/langchain
team/extensibility
Projects
None yet
Development

Successfully merging this pull request may close these issues.

connectors-ci: Allow compiling native python extensions
Work on langchain destination
8 participants