-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Tombstone Message Handling for JDBC Sink Connector #302
Conversation
085a497
to
d7e51af
Compare
@ivanyu, could you please take a look at this pull request when you get a chance? Your feedback would be really valuable. Thank you! |
if (records.isEmpty()) { | ||
log.debug("Records is empty"); | ||
if (records.isEmpty() && tombstoneRecords.isEmpty()) { | ||
log.debug("Records are empty."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
I am not sure about this debug.
- the debug text should include Records and tombstone records are empty.
- Do we want to have a debug is the records are empty as before and there are tombstones .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to have a debug is the records are empty as before and there are tombstones.
It seems redundant to include if statements solely for debugging empty records, as the prepare statement debug (mentioned previously) will already cover this aspect by not debugging the statement if no records are there.
} else { | ||
records.add(record); | ||
} | ||
if (records.size() >= config.batchSize || tombstoneRecords.size() >= config.batchSize) { | ||
log.debug("Flushing buffered records after exceeding configured batch size {}.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
might be worth adding in the records.size() and tombstoneRecords.size() so we know which one caused the flush
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @davidradl for the review, I have addressed most of the comments
d7e51af
to
5e2a3c8
Compare
Hey @Joel-hanson, thanks a million for your contribution I have this on my list for review on Monday, sorry for the delay I am just catching up on a few projects at the moment! |
records.add(record); | ||
} | ||
if (records.size() >= config.batchSize || tombstoneRecords.size() >= config.batchSize) { | ||
log.debug("Flushing buffered records {} and tombstone records {} after exceeding the configured batch size of {}.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey,
Just started going through the PR but there are a couple of checkstyle errors on line 102 & Line 124 for lines that exceed 120 characters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Joel-hanson for the PR! I've left a few thoughts but overall it looks pretty good.
I'm unable to run the integration tests on my Apple Silicon Macbook, though. I've tried several things (including looking through Testcontainers issues and using an alternative Docker runtime) with no success.
Would it be possible to use Postgres either instead of or in addition to Oracle for the new integration tests? As Apple Silicon becomes more and more prevalent for devs the cost of having tests that can't be run locally increases.
src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java
Outdated
Show resolved
Hide resolved
src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java
Outdated
Show resolved
Hide resolved
2a27255
to
3aab7d2
Compare
@aindriu-aiven @C0urante I have addressed most of the comments. Could you please do a re-review? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Joel-hanson! This is looking really good. I've given all the changes to the main codebase and integration tests a pass. Still haven't looked at the unit tests yet; will try to do that in the next pass once these comments are addressed.
Also, please be sure to build the project locally before pushing more commits. You can do this with ./gradlew clean build
; right now there are still a handful of Checkstyle errors that are causing the build to fail.
src/main/java/io/aiven/connect/jdbc/dialect/DatabaseDialect.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyInsertIT.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/VerifyDeleteIT.java
Show resolved
Hide resolved
3aab7d2
to
5ef719d
Compare
src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/AbstractOracleIT.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyInsertIT.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/postgres/VerifyInsertIT.java
Outdated
Show resolved
Hide resolved
src/integrationTest/java/io/aiven/kafka/connect/jdbc/oracle/VerifyDeleteIT.java
Outdated
Show resolved
Hide resolved
5ef719d
to
43b2959
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @Joel-hanson!
One last request--can you squash your commits into a single commit? No need to mention "address review comments" or anything like that, we just need a description that'll be helpful in the git history for the master branch after the merge.
- Add support for handling tombstone messages in the JDBC sink connector. - Implement ability to delete rows based on tombstone messages. - Introduce new parameter `delete.enabled` to control delete behavior. - Align functionality with documented approach for processing tombstones, similar to Confluent JDBC driver behavior. - Add new integration test with Oracle database. - Implement integration tests for PostgreSQL and Oracle database to test insert and delete operations. - Add new sink config validation for delete enabled config. - Updated the docs to include the new config for sink connector. Signed-off-by: Joel Hanson <joelhanson025@gmail.com> Signed-off-by: Joel Hanson <joel.hanson2@ibm.com>
43b2959
to
4a10120
Compare
@C0urante Thanks, I have squashed the commit and updated the documentation file to include the |
@C0urante Thank you for merging PR. I was wondering if you could provide an estimate of when the next release will be published that includes these changes? If there's anything I can do to assist with the release process, please let me know. Thank you again for your support and for maintaining this project. |
@Joel-hanson I've reached out to some people internally, will let you know when I hear back. |
Description:
This pull request addresses issue #165, focusing on enhancing the functionality of the JDBC sink connector to handle deletes on tombstone messages effectively. By implementing this feature, users can now delete rows corresponding to tombstone messages, which is particularly useful for scenarios involving Change Data Capture (CDC) from another database.
Changes:
delete.enabled
, to control delete behavior.Related Issue(s):
Signed-off-by: Joel Hanson joelhanson025@gmail.com