Emit final state message for full refresh syncs and consolidate read flows #35622
Conversation
```python
if sync_mode == SyncMode.full_refresh:
    # We use a dummy state if there is no suitable value provided by full_refresh streams that do not have a valid cursor.
    # Incremental streams running in full_refresh mode emit a meaningful state.
    stream_state = stream_state or {"sync_mode": "full_refresh"}
```
After testing on the platform: the platform will persist stream state regardless of the sync mode it's running in. It might actually be better to store `{}` rather than a bogus value that isn't usable.
The other alternative is a best attempt at a cursor using the `emitted_at` of the last record. My big reservation about why this is a bad idea is that it gives a false perception of an accurate cursor value, which is misleading for all our full refresh streams. And this value is not reliable because it's our internal clock, not the API's.
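The trade-off under discussion can be sketched as follows; the function name and signature are illustrative, not the CDK's actual implementation:

```python
from typing import Optional

# Hypothetical sketch of the fallback behavior being discussed, not the
# CDK's real code.
def final_stream_state(sync_mode: str, stream_state: Optional[dict]) -> dict:
    # Incremental streams (including those run in full refresh mode with a
    # valid cursor) keep whatever state they accumulated. Pure full refresh
    # streams fall back to a placeholder so a state message is still emitted.
    if sync_mode == "full_refresh":
        return stream_state or {"sync_mode": "full_refresh"}
    return stream_state or {}
```

The reviewer's point is that the `{"sync_mode": "full_refresh"}` branch could just as well return `{}`, since the platform persists whatever is emitted.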
```diff
 slices = self.stream_slices(
     cursor_field=cursor_field,
-    sync_mode=SyncMode.incremental,
+    sync_mode=sync_mode,  # todo: change this interface to no longer rely on sync_mode for behavior
```
Leaving the todo was intentional. I don't think `stream_slices()` or `read_records()` should be designed to rely on a `sync_mode`, but I want to try to reduce the complexity and breaking changes in this one PR.
That being said, I didn't see any usage of `sync_mode` in the internal implementations of the connectors I glanced at.
```diff
@@ -21,7 +21,7 @@
 class SourceS3(FileBasedSource):
-    _concurrency_level = DEFAULT_CONCURRENCY
+    _concurrency_level = None
```
Note: I'll remove this before merging, but I turned this off to test that this works on the platform for synchronous file-based sources.
I have one concern about emitting dummy state messages that could be re-used by the platform. Apart from that, my comments are mostly nits.
```python
stream_state = state_manager.get_stream_state(stream_name, stream_instance.namespace)

if stream_state and "state" in dir(stream_instance):
    stream_instance.state = stream_state  # type: ignore  # we check that state is in dir(stream_instance)
```
Can the platform take the state `{"sync_mode": "full_refresh"}` and pass it back to the source? I fear some sources might break because of this.
+1. I think we should filter out the dummy state objects before setting stream states to ensure we only set valid states
Discussed offline: we'll use a sentinel value to indicate how this state was set, so we can avoid triggering a bad setter.
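A minimal sketch of the sentinel approach agreed on here. The key name mirrors the `FULL_REFRESH_SENTINEL_STATE_KEY` constant added later in this PR, but the guard function itself is hypothetical:

```python
# Sentinel key marking state messages fabricated for full refresh streams
# (matches the constant added elsewhere in this PR).
FULL_REFRESH_SENTINEL_STATE_KEY = "__ab_full_refresh_state_message"

def is_resumable_state(stream_state: dict) -> bool:
    # Hypothetical guard: only pass real cursor state into a stream's state
    # setter; skip empty or sentinel-marked states so we never feed a bogus
    # value into a setter that expects a valid cursor.
    return bool(stream_state) and FULL_REFRESH_SENTINEL_STATE_KEY not in stream_state
```

With a guard like this, the `stream_instance.state = stream_state` assignment above would only run for genuine incremental state.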
```python
    stream_state,
    state_manager,
    internal_config,
)

record_counter = 0
stream_name = configured_stream.stream.name
```
nit: This seems redundant given https://github.com/airbytehq/airbyte/pull/35622/files#diff-51c861828e0a614ef6ee21390f3ba17d7138549f0575051fdb24868ee05eabbbR204
will remove!
```python
    ),
)
return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=dict(self._get_legacy_state())))
# According to the Airbyte protocol, the StreamDescriptor namespace field is not required. However, the platform will throw
```
Is this only for state messages? I don't see this logic being applied here
Honestly, I'm not sure. This comment was written when I did per-stream status 1.5 years ago, and it might've been fixed in the platform in the time since. What I can do is republish a pre-release version and test it with this streamlined. Depending on how that goes, I may or may not update this.
I retested this on a pre-release, and it looks like the original hack is no longer needed; a descriptor with namespace `None` works how we expect it would.
I'll clean this up and the test. Thanks for the suggestion!
```python
state_value = (
    airbyte_state_message.state.stream.stream_state.dict() if airbyte_state_message.state.stream else {}
)
logger.info(
```
I think this is a nice log to have, but I fear it might be quite noisy. Do we know of a source that usually has big syncs and a checkpoint_interval that could blast this log message a ton, so we can check whether this is acceptable?
Ah yeah, I plan to remove this after finishing all the functional validations. It just helps me validate the behavior on the source side against what makes it to the DB. Thanks for the reminder!
```diff
-checkpoint = self._checkpoint_state(stream_state, state_manager, per_stream_state_enabled)
-yield checkpoint
+airbyte_state_message = self._checkpoint_state(stream_state, state_manager)
+state_value = airbyte_state_message.state.stream.stream_state.dict() if airbyte_state_message.state.stream else {}
```
nit: should we extract an accessor for `state_value`, since we seem to do the same logic multiple times? I think the maintenance cost is probably nil though, as changing it would mean a breaking change in the Airbyte protocol lib.
Actually, as per the earlier comment, I'm going to get rid of the logging statements, which were just used for testing. I only had this long string of accessors to populate the log statement, so we won't need the helper method.
```python
return create_response_builder(
    response_template=RESPONSE_TEMPLATE,
    records_path=FieldPath("data"),
    # pagination_strategy=StripePaginationStrategy()
```
nit: should we remove this?
```python
def test_full_refresh_sync(self, http_mocker):
    start_datetime = _NOW - timedelta(days=14)
    config = {
        "start_date": start_datetime.isoformat()[:-13] + "Z",
```
This is dangerous: if `_NOW` has `microseconds == 0`, this will give an unexpected format.

```python
>>> datetime(2024, 1, 1, 2, 3, 4).isoformat()
'2024-01-01T02:03:04'
>>> datetime(2024, 1, 1, 2, 3, 4, 5).isoformat()
'2024-01-01T02:03:04.000005'
```

We should probably define the format we expect instead.
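One way to make the format explicit, as suggested. This is a sketch: the helper name and the assumption that the config wants second precision with a `Z` suffix are mine:

```python
from datetime import datetime

def to_config_datetime(dt: datetime) -> str:
    # Hypothetical helper: always yields "YYYY-MM-DDTHH:MM:SSZ", regardless
    # of whether the datetime happens to carry microseconds, unlike the
    # isoformat()[:-13] slicing shown above.
    return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
```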
```python
def _incremental_concurrent_stream(slice_to_partition_mapping, slice_logger, logger, message_repository, cursor):
    stream = _concurrent_stream(slice_to_partition_mapping, slice_logger, logger, message_repository, cursor)
    # stream.state = {"created_at": timestamp}
```
Is there a reason why this is commented out?
Me being careless, probably. Removed.
```diff
@@ -107,39 +107,24 @@ def get_error_display_message(self, exception: BaseException) -> Optional[str]:
     """
     return None

-def read_full_refresh(
+def read(  # type: ignore  # ignoring typing for ConnectorStateManager because of circular dependencies
```
nit: an alternative is to only import the type when running mypy. Example:

```python
if typing.TYPE_CHECKING:
    from airbyte_cdk.sources import Source
    from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
```
```python
if not has_slices:
    # Safety net to ensure we always emit at least one state message even if there are no slices
```
nit: we should update this comment
!
```diff
@@ -31,6 +31,8 @@
 JsonSchema = Mapping[str, Any]

+FULL_REFRESH_SENTINEL_STATE_KEY = "__ab_full_refresh_state_message"
```
let's add a comment explaining why this is needed
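A possible shape for the requested comment, based on the discussion in this PR (the wording is a suggestion, not the final text):

```python
# The platform persists whatever state a connector emits, even for full
# refresh syncs. This sentinel key marks the state messages we fabricate
# for pure full refresh streams, so downstream code can recognize them and
# avoid passing a bogus cursor back into a stream's state setter.
FULL_REFRESH_SENTINEL_STATE_KEY = "__ab_full_refresh_state_message"
```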
Closes https://github.com/airbytehq/airbyte-internal-issues/issues/2925
First pass at adding state for full refresh, consolidating flows, removing the emission of legacy state, adding integration tests, and fixing lots of tests.
What

At a high level, this change makes it so that regardless of the sync mode a stream is running in, we always emit at least one state message upon successful completion of a sync. For incremental streams running as full refresh with valid record cursors, the final state message will be the last record cursor value observed. For full refresh streams, the final state message will be a dummy value `{"sync_mode": "full_refresh"}`.

How

- At the `AbstractSource` level, consolidates the full refresh and incremental helper functions into a single one
- At the `Stream` level, removes the `read_full_refresh` and `read_incremental` methods from the Stream interface. This would be classified as breaking. We could opt for just marking this as deprecated. However, this might be even more misleading because the `AbstractSource` flow now uses the `Stream.read()` method. I didn't see any usage of this in our repo beyond concurrent or file-based, but technically OSS customers could have done something like this.
- Adds ~~integration tests~~ mock server tests using a full refresh, an incremental, and an incremental running in full refresh mode
- Removes the `read_incremental` and `read_full_refresh` method implementations

Testing and Validation

- ~~Integration test~~ Mock server test cases: `get_updated_state()` as incremental updates state
- Pre-release testing:
Recommended reading order
1. `abstract_source.py`
2. `core.py`
3. `connector_state_manager.py`
4. `integration_source_fixture.py`
5. `test_integration_abstract_source.py`
6. `streams/concurrent/adapters.py`
7. `file_based/stream/concurrent/adapters.py`
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user?
We are deprecating two methods on the `Stream` interface: `read_full_refresh()` and `read_incremental()`. There are not any sources in our repository that override these methods, but it's possible for OSS customers who've written a highly customized connector to be impacted. We've moved to a consolidated flow using the `Stream.read()` method moving forward, and in order to use the current version of the CDK, they must migrate their connector to use this new method.