
CC-349: Added delete support for sink. #641

Merged
merged 2 commits into confluentinc:5.3.x from simple-machines:updated-delete-support May 22, 2019

Conversation

@andybryant (Contributor) commented Apr 18, 2019

A record with a null value is considered a tombstone event and results in deleting the corresponding row in the destination table.
Deletes are disabled by default. Set the delete.enabled config to true to enable them. This also requires pk.mode = record_key, since otherwise the connector cannot identify the record to delete.

Tested against Postgres

Fixes #607 and fixes #127
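
For reference, a minimal sink configuration using this might look like the following (connector name, topic, and connection URL are placeholders, not part of this PR):

```properties
name=postgres-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=orders
connection.url=jdbc:postgresql://localhost:5432/example
# Deletes require the primary key to come from the record key.
pk.mode=record_key
pk.fields=id
delete.enabled=true
```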

@ConfluentCLABot commented Apr 18, 2019

@confluentinc It looks like @andybryant just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@andybryant andybryant force-pushed the simple-machines:updated-delete-support branch from 90348fb to 81fe612 Apr 18, 2019

@rhauch (Member) commented Apr 26, 2019

@andybryant thanks for updating the changes and creating this new PR. Very much looking forward to having this feature, but currently the build results show a number of test failures.

@rhauch (Member) left a comment

Overall looks really good, and I like the basic approach. I do have a few comments and suggestions, but nothing that significant.

```java
preparedStatementBinder = dbDialect.statementBinder(
    preparedStatement,

// should this be using dbDialect.createPreparedStatement ?
updatePreparedStatement = connection.prepareStatement(insertSql);
```

@rhauch (Member) Apr 26, 2019

It is probably better to use dbDialect.createPreparedStatement(...), even though it does essentially the same thing. The benefit of going through the dialect is that it logs the operation, and concrete dialects can override the behavior.
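
In practice the swap would look roughly like this (a sketch; field names follow the snippet above):

```java
// Let the dialect create the statement so the operation is logged and
// concrete dialects can customize the behavior.
updatePreparedStatement = dbDialect.createPreparedStatement(connection, insertSql);
```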

@andybryant (Author) Apr 29, 2019

done

```java
private static final String DELETE_ENABLED_DEFAULT = "false";
private static final String DELETE_ENABLED_DOC =
    "Whether to treat ``null`` record values as deletes. Requires ``pk.mode`` "
    + "to be ``record_key``.";
```

@rhauch (Member) Apr 26, 2019

Because Validator only has the context of an individual configuration value, it's not straightforward to validate one config value against another (e.g., that pk.mode must be record_key). One technique that works, however, is to add a check in the JdbcSinkConfig constructor that compares these two config values and throws ConfigException if they are not compatible (a sketch follows below). This helps ensure that invalid configurations are caught immediately, before the connector is started, rather than having to figure out why the task is failing later.

Also, to help C3 and other GUI tools present valid options to begin with, how about adding

  • a Recommender implementation for delete.enabled that hides it if pk.mode is not record_key; and
  • a Recommender for pk.mode that recommends only record_key if delete.enabled=true.

It also may be
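
As noted above, a minimal sketch of that constructor check might look like this (the config-key constants and the PrimaryKeyMode enum are assumed from this PR's diff, not copied from it):

```java
public JdbcSinkConfig(Map<?, ?> props) {
  super(CONFIG_DEF, props);
  pkMode = PrimaryKeyMode.valueOf(getString(PK_MODE).trim().toUpperCase(Locale.ROOT));
  deleteEnabled = getBoolean(DELETE_ENABLED);
  // Fail fast at configuration time rather than when the task first sees a tombstone.
  if (deleteEnabled && pkMode != PrimaryKeyMode.RECORD_KEY) {
    throw new ConfigException(
        "delete.enabled=true requires pk.mode to be record_key");
  }
}
```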

@andybryant (Author) Apr 29, 2019

added recommenders for both and tested locally in C3

```java
String buildDeleteStatement(
    TableId table,
    Collection<ColumnId> keyColumns
);
```

@rhauch (Member) Apr 26, 2019

Should this be a default method that throws UnsupportedOperationException? It's possible that other projects depend on this connector and add their own dialect, and adding a non-default method would break those projects. Yes, they most likely extend GenericDatabaseDialect, but we can't rule it out. Plus, I think this makes sense anyway, because by default dialects that don't extend GenericDatabaseDialect probably won't support deletes.
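
A sketch of that default-method approach, reusing the signature from the snippet above:

```java
// Default implementation so existing DatabaseDialect implementations outside
// this project keep compiling; dialects that support deletes override this.
default String buildDeleteStatement(
    TableId table,
    Collection<ColumnId> keyColumns
) {
  throw new UnsupportedOperationException();
}
```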

@andybryant (Author) Apr 29, 2019

good idea - fixed

```java
if (fieldsMetadata.keyFieldNames.isEmpty()) {
  throw new ConnectException("Require primary keys to support delete");
}
sql = dbDialect.buildDeleteStatement(
```

@rhauch (Member) Apr 26, 2019

Should we catch UnsupportedOperationException and throw a ConnectException with a more useful message?
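
For example (a sketch of the suggestion, with variable names following the snippet above):

```java
String sql;
try {
  sql = dbDialect.buildDeleteStatement(tableId, keyColumns);
} catch (UnsupportedOperationException e) {
  // Translate into a Connect-level error with context about the dialect.
  throw new ConnectException(String.format(
      "Deletes to table '%s' are not supported with the %s dialect",
      tableId, dbDialect.name()), e);
}
```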

@andybryant (Author) Apr 29, 2019

done

```java
    );
    break;
  default:
    throw new ConnectException("Deletes are only supported for pk.mode record_key");
```

@rhauch (Member) Apr 26, 2019

Strictly speaking, this case wouldn't be necessary if Config validation ensures that config.pkMode == RecordKey when config.deleteEnabled. However, I still think it's good to have, just in case.

```java
}

@Test
public void insertThenDeleteThenInsertInBatchFlush() throws SQLException {
```

@rhauch (Member) Apr 26, 2019

Aren't there a few other cases that we should consider testing? It'd be great to know how much coverage of add(record) the unit tests have, and whether we should add more tests if critical lines aren't hit by the current ones.

@andybryant (Author) Apr 29, 2019

Added a few extra tests in the latest commit. Let me know if you can think of any other missing scenarios.

@andybryant (Author) commented Apr 29, 2019

@rhauch - thanks for reviewing. I've pushed a new commit aimed at addressing your review suggestions. Looks like Jenkins is happy. Let me know if anything else is outstanding. Cheers, Andy

@nghyjohn commented May 2, 2019

I have tried to delete a record on MySQL using your jar. In my sink I use the TimestampConverter transform for my last_update field. The error below seems to occur because a delete leaves a null value. Any suggestions?

```
[2019-04-30 08:48:45,271] ERROR WorkerSinkTask{id=mysql-jdbc-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:506)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [converting timestamp formats], found: null
at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:336)
at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:275)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 14 more
[2019-04-30 08:48:45,276] ERROR WorkerSinkTask{id=mysql-jdbc-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
```

@andybryant (Author) commented May 6, 2019

@rhauch - any chance you'll get some time to review the latest update? I'd hate for this PR to go stale too.
Thx Andy

@andybryant (Author) commented May 6, 2019

@nghyjohn - the transforms are performed before the connector is called (you can see the connector doesn't appear in the stack trace at all). So for this setup to work, all of your transforms need to tolerate null message values. It would be nice if there were an elegant way to do this, but I'm not aware of one. In your case, you could write a wrapper transform that forwards messages with null values unchanged and otherwise delegates to the standard TimestampConverter (see the sketch below).
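
For illustration, such a wrapper might look like the following (the class and package names are hypothetical; only TimestampConverter and the Connect APIs are real):

```java
package com.example.transforms;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.TimestampConverter;
import org.apache.kafka.connect.transforms.Transformation;

public class TombstoneAwareTimestampConverter<R extends ConnectRecord<R>>
    implements Transformation<R> {

  // Delegate all non-tombstone records to the standard transform.
  private final TimestampConverter.Value<R> delegate = new TimestampConverter.Value<>();

  @Override
  public void configure(Map<String, ?> configs) {
    delegate.configure(configs);
  }

  @Override
  public R apply(R record) {
    // Tombstones have a null value; pass them through so the sink can delete.
    if (record.value() == null) {
      return record;
    }
    return delegate.apply(record);
  }

  @Override
  public ConfigDef config() {
    return delegate.config();
  }

  @Override
  public void close() {
    delegate.close();
  }
}
```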

@devshawn commented May 13, 2019

@andybryant @rhauch - any updates on this PR? We need this feature for an upcoming project and I'm trying to avoid forking and building this.

@bigkraig commented May 20, 2019

@rhauch friendly ping

@rhauch (Member) approved these changes May 22, 2019

Thanks for the fixes, @andybryant. LGTM!

@rhauch rhauch changed the base branch from master to 5.3.x May 22, 2019

@rhauch (Member) commented May 22, 2019

Merging to the 5.3.x branch for inclusion in the upcoming CP 5.3.0. Also forward merged to master.

@rhauch rhauch merged commit dd103d0 into confluentinc:5.3.x May 22, 2019

1 check passed: continuous-integration/jenkins/pr-merge (This commit looks good)

@rhauch rhauch changed the title Fixes #127. Added delete support for sink. CC-349: Added delete support for sink. May 22, 2019

```java
  keySchema = record.keySchema();
  schemaChanged = true;
}
if (isNull(record.valueSchema())) {
```

@anthavio Jun 4, 2019

Oddly, record.valueSchema() returns non-null even when record.value() returns null for me in a real scenario. In your tests you send records without a value schema when there is no value, but that doesn't seem to be the case when the complete connector sink is employed.

@limadelrey commented Jun 25, 2019

> Merging to the 5.3.x branch for inclusion in the upcoming CP 5.3.0. Also forward merged to master.

@rhauch when is CP 5.3.0 expected to launch?

@helpermethod commented Jul 1, 2019

Is this already part of the 5.3.0 beta-releases?

@helpermethod commented Aug 5, 2019

For anyone who is struggling with the 5.3.0 release: if you are using JSON, both the payload and the schema attributes of the value need to be set to null.
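
That is, per the comment above, the value of a delete message would carry:

```json
{
  "schema": null,
  "payload": null
}
```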
