
airbyte-lib: Track streams in cache #34517

Merged: 14 commits merged into master on Jan 29, 2024

Conversation

@flash1293 (Contributor) commented on Jan 25, 2024:

This PR introduces an internal table to track which streams were synced to a cache.

This makes it possible to read from a cache even if no source is available:

import airbyte_lib as ab

cache = ab.new_local_cache("cache_test")

# read from the cache without a source
print(list(cache["stream1"]))

This is useful for situations where data is written to a cache in one script and read from it in another.

Conceptually, the cache remembers what it stores in a persistent way that is not tied to Python heap memory.
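As a hypothetical end-to-end example of this workflow (the connector name, its config, and the `get_connector` / `select_all_streams` / `read` calls are assumptions about the airbyte-lib API at this point and may differ):

```python
# script_1.py - sync data into a named local cache (API names are assumptions)
import airbyte_lib as ab

source = ab.get_connector("source-faker", config={"count": 100})
source.select_all_streams()
cache = ab.new_local_cache("cache_test")
source.read(cache)
```

```python
# script_2.py - later, read from the same cache without re-creating the source
import airbyte_lib as ab

cache = ab.new_local_cache("cache_test")
for record in cache["users"]:
    print(record)
```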

Behavior

The cache already kept a source_catalog around with all streams that were read from it. This information is now saved to an internal table whenever it changes and loaded from that table when the cache object is created. New streams are merged into the existing catalog by stream name, replacing existing stream definitions in case of collisions.
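The merge described above amounts to something like the following sketch (plain dicts keyed by stream name stand in for the actual catalog objects; this is illustrative, not the airbyte-lib implementation):

```python
# New stream definitions replace existing ones that share the same name;
# everything else from the previously stored catalog is kept.
def merge_catalogs(existing_streams: list[dict], new_streams: list[dict]) -> list[dict]:
    merged = {stream["name"]: stream for stream in existing_streams}
    for stream in new_streams:
        merged[stream["name"]] = stream  # collision: the new definition wins
    return list(merged.values())
```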

The internal table _airbytelib_streams is added to the same schema as all other tables.

When accessing streams from the cache object, all stream names that are in the catalog and also have an existing final table are available.

When accessing streams via a ReadResult, only the streams that were synced in that read are available, not all streams in the cache.

Implementation details

Since SQLAlchemy is already available as a dependency, this PR uses its ORM capabilities to create the table in the cache and manage its data.
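Based on the columns discussed in the review thread below, the declarative model could look roughly like this (class and module names are illustrative, not the actual airbyte-lib source):

```python
from sqlalchemy import Column, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class CachedStream(Base):
    """One row per stream/table that has been written to the cache."""

    __tablename__ = "_airbytelib_streams"

    table_name = Column(String, primary_key=True)
    stream_name = Column(String)
    source_name = Column(String)
    catalog_metadata = Column(String)  # serialized catalog entry for the stream
```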

Unclear points

Table name vs. stream name

A lot of the logic in the cache class hinges on the stream name. However, the mapping from stream name to table name depends on the configuration (e.g. via the configured table prefix). This leaves an edge case that could lead to confusing behavior:

  • A cache is initialized without a table name prefix and some data is synced
  • Later, in a different script, a new cache instance pointing to the same DuckDB file is initialized with a table name prefix configured
  • The cache appears empty, because no data table can be found for any of the streams in the internal streams table

This could be "fixed" by leveraging the table name column in the internal streams table, but it makes the world more complex than justified IMHO.

Different sources with identical stream names

As the interface of the cache is built around the stream name, there's no straightforward way to support multiple sources with identical stream names. I think it's OK not to handle this situation; I just wanted to bring it up.


@aaronsteers (Collaborator) left a comment:

It's looking great. Some feedback inline 👍

Resolved review threads on airbyte-lib/airbyte_lib/results.py, airbyte-lib/airbyte_lib/source.py, and airbyte-lib/airbyte_lib/caches/base.py.
Comment on lines 64 to 67
stream_name = Column(String, primary_key=True)
source_name = Column(String)
table_name = Column(String)
catalog_metadata = Column(String)
@aaronsteers (Collaborator) commented on Jan 25, 2024:

Wdyt of using a composite three-column primary key here?

Suggested change:

  stream_name = Column(String, primary_key=True)
- source_name = Column(String)
- table_name = Column(String)
+ source_name = Column(String, primary_key=True)
+ table_name = Column(String, primary_key=True)
  catalog_metadata = Column(String)

The implications would be:

  1. Dupes are allowed for different sources using the same stream name.
  2. Dupes are allowed for the same stream name and same source name, iff the table prefix and/or suffix are different.
  3. If a user configures the cache with different prefix/suffix settings, they'll basically be writing to a different namespace within the cache.
    • Meaning, I could initialize my cache with a "_test1" suffix, a "_test2" suffix, etc., and AirbyteLib would treat these as different namespaces with different table sets.
  4. When we get to supporting multiple sources in the same cache, we might want a dynamic default table prefix like '{source_name}_'. But we can tackle that later.

@flash1293 (Contributor, Author) replied:

The problem I'm seeing with that is that the cache API is based on the stream name only.

For example, you have the following entries in the _airbytelib_streams table:

stream      source    table
my_stream   source1   source1_my_stream
my_stream   source1   source1_prefixed_my_stream

If you do cache["my_stream"], then what table are you getting? For this prefix case, we could decide based on the table name, by comparing the table name the cache would choose for a stream name with the table name stored in the _airbytelib_streams table. I'm going to implement this.

But what about this case:

stream      source    table
my_stream   source1   source1_my_stream
my_stream   source2   source2_my_stream

cache["my_stream"] could be either of those, and there is no way to decide. Changing this to cache["source1_my_stream"] raises lots of new questions - it just doesn't feel like the right approach to me, this edge case shouldn't make it harder for the "regular" case.

@flash1293 (Contributor, Author) added:

So, in summary, we should actually use the table name as primary key, because that's what needs to be unique for the current feature set.

For multi-source, I think it's OK to merge the streams and overwrite / raise in the collision case - after all, this is also what would happen with regular destinations.
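A minimal sketch of the resulting merge semantics, keyed on the table name (illustrative only, not the actual airbyte-lib implementation):

```python
# Upsert keyed by table_name: a new entry for an existing table name replaces
# the previous one ("last write wins"), mirroring what a regular Airbyte
# destination would do on a stream name collision.
def upsert_stream_entries(existing: list[dict], incoming: list[dict]) -> list[dict]:
    by_table = {entry["table_name"]: entry for entry in existing}
    for entry in incoming:
        by_table[entry["table_name"]] = entry
    return list(by_table.values())
```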

@aaronsteers (Collaborator) replied:

Nice! Yeah, I think this sounds like a great path forward. 👍

@flash1293 (Contributor, Author) commented:

@aaronsteers I adjusted the PR as discussed above. This is the behavior now:

  • Prefixes serve as "namespaces" for the cache (all streams are maintained in the same internal table, but the user only "sees" the tables of the current namespace) - see the sketch after this list
  • If different sources write the same stream name, the last one wins (like in regular Airbyte destinations) - this preserves the "stream name only" semantics when interacting with the cache, which I think is worth keeping
  • Moved the logic into a "catalog manager"
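To illustrate the namespace behavior from the first bullet, a hypothetical sketch (function and argument names are illustrative, not the actual airbyte-lib code):

```python
# A cache instance only exposes streams whose stored table name matches the
# name it would generate with its own prefix configuration.
def expected_table_name(stream_name: str, table_prefix: str = "") -> str:
    return f"{table_prefix}{stream_name}"

def visible_streams(stored_entries: list[dict], table_prefix: str = "") -> list[str]:
    """stored_entries are rows from _airbytelib_streams,
    each with 'stream_name' and 'table_name' keys."""
    return [
        entry["stream_name"]
        for entry in stored_entries
        if entry["table_name"] == expected_table_name(entry["stream_name"], table_prefix)
    ]
```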

@aaronsteers (Collaborator) left a comment:

Looks great. Thanks for the comments and related updates. Feel free to merge when ready.

Comment on lines +35 to +38
stream_name = Column(String)
source_name = Column(String)
table_name = Column(String, primary_key=True)
catalog_metadata = Column(String)
@aaronsteers (Collaborator) commented:

I like this approach to the primary key. 👍👍


@flash1293 merged commit 74ceae0 into master on Jan 29, 2024
24 checks passed
@flash1293 deleted the flash1293/airbyte-lib-streams-internal-table branch on January 29, 2024 at 10:17