DBZ-6365 Support streaming a list of shards/gtids with multiple tasks #135
Conversation
Welcome as a new contributor to Debezium, @twthorn. Reviewers, please add missing author name(s) and alias name(s) to the COPYRIGHT.txt and Aliases.txt respectively.
@@ -142,17 +143,22 @@ public List<Map<String, String>> taskConfigs(int maxTasks, List<String> currentS
                    prevGtidsPerShard.keySet(), currentShards);
        }
        final String keyspace = connectorConfig.getKeyspace();
        // Check the configs in case there is a user specified GTID override
        verifyShardGtidConfig();
        Map<String, String> gtidsPerShard = getGtidsPerShardFromConfig();
This logic is not correct; on a normal run the shards/GTIDs need to come from the Kafka offset storage instead of from the static config.
The static config value is only used for the first run, when there is no history in Kafka storage.
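For reference, a minimal sketch of that precedence; the helper parameters and class name here are hypothetical, not the connector's actual API:

```java
import java.util.Map;

// Hypothetical sketch, not the connector's actual code: offsets persisted in Kafka take
// precedence; the static config value is only consulted on the very first run.
class GtidSourceSketch {
    static Map<String, String> resolveGtidsPerShard(Map<String, String> storedGtidsPerShard,
                                                    Map<String, String> configGtidsPerShard) {
        if (storedGtidsPerShard != null && !storedGtidsPerShard.isEmpty()) {
            return storedGtidsPerShard; // normal run: resume from persisted offsets
        }
        return configGtidsPerShard; // first run: no history yet, use the config override
    }
}
```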
Please don't merge in this PR yet, I will work with Tom separately to sort out some of the issues.
Updated with this feedback, thanks!
    private void verifyShardGtidConfig() {
        final List<String> gtids = connectorConfig.getGtid();
        if (connectorConfig.getShard() != null &&
This is a re-used check; maybe it would be good to extract it to a separate method on VitessConnectorConfig. You can use a validate method for it.
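For illustration, a minimal sketch of pulling the check into a validate-style method; both the class/method names and the exact condition are assumptions, since the full check is not shown above:

```java
import java.util.List;

// Hypothetical sketch of extracting the re-used shard/GTID check into the config class.
// The method name and the condition are illustrative, not the actual VitessConnectorConfig API.
class ShardGtidValidationSketch {
    static void validateShardGtidConfig(List<String> shards, List<String> gtids) {
        // Assumed rule for illustration: if both overrides are supplied, they must pair up one-to-one.
        if (shards != null && gtids != null && shards.size() != gtids.size()) {
            throw new IllegalArgumentException(
                    "The number of configured GTIDs (" + gtids.size()
                            + ") must match the number of configured shards (" + shards.size() + ")");
        }
    }
}
```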
Added, thanks!
Hi @twthorn, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key.
Updated to use GTIDs only on the initial run (i.e., when they are not stored in the previous or current shard-GTID map). Also use the shard list whenever it is specified. Added more tests and some refactoring to clean things up. Also force-pushed to fix the commit message formatting (no other change).
Looks good overall; one comment on whether we should support shrinking the shard list.
        if (prevGtidsPerShard != null && !hasSameShards(prevGtidsPerShard.keySet(), currentShards)) {
            LOGGER.warn("Some shards for the previous generation {} are not persisted. Expected shards: {}",
                    prevGtidsPerShard.keySet(), currentShards);
            if (prevGtidsPerShard.keySet().containsAll(currentShards)) {
Should we throw an Exception in this case? People usually don't look at the logs until a problem happens. If we let this pass, we won't have the old GTIDs for the removed shards anymore, since we don't look for GTIDs more than one generation old. If we think shrinking shards is a valid use case, then we probably need a config flag to indicate whether we should halt on shard list shrinking or not; the default value for that config should be false.
Thanks for the update, it looks good to me now.
Thanks for the feedback. Updated to throw an exception.
If we end up with a use case later, we can implement functionality for contracting the shard list, but for now we will opt for the more cautious path of preventing any lost state. (Plus, this is a new feature, so there are no existing use cases that contract the shard list; we only know of a use case for expansion, i.e., ours.)
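For reference, a minimal sketch of that agreed behavior; the class/method names are hypothetical and the connector's actual check may differ:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: fail fast instead of silently dropping GTID state when the
// configured shard list shrinks relative to the previous generation.
class ShardShrinkCheckSketch {
    static void checkShardsNotShrinking(Set<String> prevShards, List<String> currentShards) {
        Set<String> current = new HashSet<>(currentShards);
        if (!prevShards.equals(current) && prevShards.containsAll(current)) {
            throw new IllegalStateException(
                    "Current shards " + current + " are a subset of the previous generation's shards "
                            + prevShards + "; contracting the shard list would lose persisted GTIDs");
        }
    }
}
```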
Hi @twthorn, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key.
Fixed a bug where the shards were not being passed in from the config correctly between the two taskConfigs methods. Updated tests to reproduce the error, which is now fixed. Not sure why the commit message check is failing; I have the correct prefix.
@twthorn Not sure if titling the commit as 'DBZ-6365: ' instead of 'DBZ-6365 ' would make the commit check pass.
@twthorn Applied, thanks!
Summary
We previously added support, in single-thread mode, for reading in a csv string of shards. Now we extend that to also work with multiple tasks.
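As a rough illustration of the general idea only (not the connector's actual taskConfigs logic; the names below are hypothetical), a csv shard list could be split across tasks like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of splitting a csv shard list across tasks; the connector's real
// taskConfigs() may partition shards differently.
class ShardAssignmentSketch {
    static List<List<String>> assignShardsToTasks(String shardCsv, int numTasks) {
        List<String> shards = Arrays.asList(shardCsv.split(","));
        List<List<String>> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(new ArrayList<>());
        }
        for (int i = 0; i < shards.size(); i++) {
            perTask.get(i % numTasks).add(shards.get(i)); // simple round-robin assignment
        }
        return perTask;
    }
}
```

For example, in this sketch `assignShardsToTasks("-40,40-80,80-c0,c0-", 2)` would assign `-40` and `80-c0` to the first task and `40-80` and `c0-` to the second.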
Verification
Added unit tests & acceptance tests for verifying things work as expected when multiple tasks are used with the shard config csv string.
Note: for the registration of the metrics in the integration test, they were always registered under `task_0_1_0`, despite the fact that `task_0_2_0` would be the expected name for a connector with 2 max tasks. When I checked the logs, the `taskConfigs` function was always being called with 1 even if our config said 2. From what I can tell, this is a limitation of the integration tests using the embedded engine, so I just overrode the numTasks for generating the task ID to always be 1.