-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor state management out of BufferStrategy #13669
Conversation
* | ||
* THis install such a hook to be triggered when that happens. | ||
*/ | ||
void registerFlushAllEventHook(VoidCallable onFlushAllEventHook); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the callback pattern that doesn't seem idiomatic. the fact that the event hook is set after the instantiation of object is a code smell.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, the "flush all event" is indeed a code smell to be looked at, thank you for investigating this.
The buffering strategy refactor was already a pretty large PR on handling AirbyteMessage differently, handling the state messages in a good manner was not necessarily the primary priority there... So the hook "pattern" was introduced/hacked to avoid diverging too much from the previous behavior of destinations on that matter.
@@ -91,22 +94,13 @@ public void flushAll() throws Exception { | |||
}, Map.of("bufferSizeInBytes", bufferSizeInBytes)); | |||
close(); | |||
clear(); | |||
|
|||
if (onFlushAllEventHook != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the fact that all buffers need to replicate this behavior is a smell.
...main/java/io/airbyte/integrations/destination/record_buffer/SerializedBufferingStrategy.java
Show resolved
Hide resolved
...ava/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
Outdated
Show resolved
Hide resolved
...ava/io/airbyte/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java
Outdated
Show resolved
Hide resolved
...a/io/airbyte/integrations/destination/record_buffer/InMemoryRecordBufferingStrategyTest.java
Outdated
Show resolved
Hide resolved
…integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.java Co-authored-by: Edward Gao <edward.gao@airbyte.io>
What
I was looking into how to update state handling in destinations to handle the new version of the protocol. In the meanwhile, I noticed that we had a weird pattern where the record buffers had a callback to handle tracking state messages. This is not idiomatic to java and mixes two different concerns.
Refactor the code to keep them separate.
See comment in line, but there's one thing in there that I want to make sure it isn't a bug.