changefeedccl: support alternative kafka hash algo#161265
changefeedccl: support alternative kafka hash algo#161265craig[bot] merged 1 commit intocockroachdb:masterfrom
Conversation
aerfrei
left a comment
There was a problem hiding this comment.
Looks good! Great to have this done. Left some nits
| if opts.IsSet(changefeedbase.OptHashAlg) { | ||
| if !changefeedbase.HashAlgEnabled.Get(&settings.SV) { | ||
| return errors.Newf( | ||
| "option %s requires cluster setting %s to be enabled", |
There was a problem hiding this comment.
nit: We might want to add a test for this error message.
| } | ||
| } | ||
|
|
||
| // Check feature flag for hash_alg. |
There was a problem hiding this comment.
nit: This comment seems to mostly read aloud the code below, maybe remove it?
| settings.ApplicationLevel, | ||
| "changefeed.hash_alg.enabled", | ||
| "if enabled, allows specifying the hash_alg changefeed option to"+ | ||
| " choose between fnv-1a (default) and murmur2 hash functions for"+ |
There was a problem hiding this comment.
nit: The strings we're concatenating here do not have a space between the second and third, meaning this will become "... forKafka partitioning". To match what we do elsewhere in this file, have these end with "+ and no space at the beginning of the strings.
| var inner kgo.Partitioner | ||
|
|
||
| if changefeedbase.HashAlgEnabled.Get(&settings.SV) && strings.ToLower(hashMethod) == "murmur2" { | ||
| // Use franz-go default (murmur2) |
There was a problem hiding this comment.
super nit: this comment and the one below "Default to fnv-1a" could be full sentences. Maybe we'd prefer something like "murmur2 is the franz-go default and can be specified by the changefeed". And "When not otherwise specified, we use the Sarama default hash method of fnv-1a."
There was a problem hiding this comment.
Did some word smithing, ptal.
pkg/ccl/changefeedccl/sink_test.go
Outdated
| defer leaktest.AfterTest(t)() | ||
| defer log.Scope(t).Close(t) | ||
|
|
||
| settings := cluster.MakeTestingClusterSettings() |
There was a problem hiding this comment.
nit: Might we want to define this before we we define kgoPartitioner, where it's used?
| if changefeedbase.HashAlgEnabled.Get(&settings.SV) && strings.ToLower(hashMethod) == "murmur2" { | ||
| // Use franz-go default (murmur2) | ||
| inner = kgo.StickyKeyPartitioner(nil /* hasher */) | ||
| } else { |
There was a problem hiding this comment.
To avoid this else, we should be able to return directly in the if and then handle the default case not nested.
b587855 to
0e147b1
Compare
rharding6373
left a comment
There was a problem hiding this comment.
Thanks for the review! Ready for another look.
| } | ||
| } | ||
|
|
||
| // Check feature flag for hash_alg. |
| if opts.IsSet(changefeedbase.OptHashAlg) { | ||
| if !changefeedbase.HashAlgEnabled.Get(&settings.SV) { | ||
| return errors.Newf( | ||
| "option %s requires cluster setting %s to be enabled", |
| settings.ApplicationLevel, | ||
| "changefeed.hash_alg.enabled", | ||
| "if enabled, allows specifying the hash_alg changefeed option to"+ | ||
| " choose between fnv-1a (default) and murmur2 hash functions for"+ |
| var inner kgo.Partitioner | ||
|
|
||
| if changefeedbase.HashAlgEnabled.Get(&settings.SV) && strings.ToLower(hashMethod) == "murmur2" { | ||
| // Use franz-go default (murmur2) |
There was a problem hiding this comment.
Did some word smithing, ptal.
| if changefeedbase.HashAlgEnabled.Get(&settings.SV) && strings.ToLower(hashMethod) == "murmur2" { | ||
| // Use franz-go default (murmur2) | ||
| inner = kgo.StickyKeyPartitioner(nil /* hasher */) | ||
| } else { |
pkg/ccl/changefeedccl/sink_test.go
Outdated
| defer leaktest.AfterTest(t)() | ||
| defer log.Scope(t).Close(t) | ||
|
|
||
| settings := cluster.MakeTestingClusterSettings() |
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
|
| inner = kgo.StickyKeyPartitioner(nil /* hasher */) | ||
| return &kgoChangefeedPartitioner{inner: inner} | ||
| } | ||
| // If not enabled, or if fnv-1a is specified, use fnv-1a by default. |
There was a problem hiding this comment.
nit: Maybe "If the cluster setting is not enabled or no option is specified, use fnv-1a by default.". It wasn't clear to me what is not enabled (the cluster setting? or no option provided?). And if we specify fnv-1a, it's not really by default.
| func newKgoChangefeedPartitioner(hashMethod string, settings *cluster.Settings) kgo.Partitioner { | ||
| var inner kgo.Partitioner | ||
|
|
||
| if changefeedbase.HashAlgEnabled.Get(&settings.SV) && strings.ToLower(hashMethod) == "murmur2" { |
There was a problem hiding this comment.
Claude raised an issue here that seems legit to me: we actually don't want to check this cluster setting here, since if the setting was turned on to create the changefeed successfully with murmur2, if it is turned off later we don't want to switch to the fnv-1a hash mid changefeed. We should also probably add a test for this scenario if possible.
Location: pkg/ccl/changefeedccl/sink_kafka_v2.go:620
Issue: The runtime cluster setting check in newKgoChangefeedPartitioner causes partition inconsistency for existing changefeeds. When a changefeed created with hash_alg='murmur2' resumes after the cluster setting changefeed.hash_alg.enabled is disabled, the partitioner silently falls back to fnv-1a, routing the same keys to different Kafka partitions.
There was a problem hiding this comment.
Hm, this is an interesting case where using a flag to prevent executing a risky new codepath (I realize the new codepath is not very risky, but this is why we require flags when backporting), can lead to weird, possibly undesirable behavior.
I removed the flag from the partitioner code, since it doesn't seem risky. I updated the commit message to reflect the expected behavior if the setting is toggled.
I tested manually to make sure it behaves as expected. Unfortunately there's a blocker to implementing a test in the test suite for this case, since kafkaFeed does not support multiple partitions, which are critical for validating that the hasher is hashing keys correctly. I spent a couple hours on this today but it's going to take more effort. I created #161310 to track this effort.
There was a problem hiding this comment.
Manual testing output: https://gist.github.com/rharding6373/159b952498435f0248e459c224f502bd
There was a problem hiding this comment.
Great, I changed the PR description to reflect the commit message change. I'm thinking with that manual testing even without the work of #161310 it should be fine to merge. Approving.
0e147b1 to
4508455
Compare
4508455 to
17010c2
Compare
| } | ||
|
|
||
| if opts.IsSet(changefeedbase.OptHashAlg) { | ||
| if !changefeedbase.HashAlgEnabled.Get(&settings.SV) { |
There was a problem hiding this comment.
We should do the cluster setting check in ALTER CHANGEFEED too. We might also want to restrict changing the option altogether so that we don't send keys to different partitions after changing the hash algorithm.
Once you add the check, could you also add a test for it?
There was a problem hiding this comment.
We might also want to restrict changing the option altogether so that we don't send keys to different partitions after changing the hash algorithm.
Changing the key partition mapping is the point of changing the hash algorithm. Seems like this should be left to the user, though it also seems like a potential foot-gun.
Added a test for alter changefeed, thanks for catching the lack of coverage. The test shows that the existing check already covers the alter case.
85d2788 to
2292bbc
Compare
|
Changed the name from |
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
|
Previously, the franz-go based kafka sink only supported the fnv-1a hashing algorithm, because the hashing function was compatible with the original sarama-based sink. However, some users need alternative hashing functions that are compatible with their existing ecosystem. This PR adds the `partition_alg` changefeed option, with the options `fnv-1a` (default) and `murmur2` (kafka's default). This is protected by a cluster setting. Epic: CRDB-58732 Fixes: cockroachdb#161202 Release note (general change): Changefeeds now support the `partition_alg` option for specifying a kafka partitioning algorithm. Currently `fnv-1a` (default) and `murmur2` are supported. The option is only valid on kafka v2 sinks. This is protected by the cluster setting `changefeed.partition_alg.enabled`. An example usage: `SET CLUSTER SETTING changefeed.partition_alg.enabled=true; CREATE CHANGEFEED ... INTO 'kafka://...' WITH partition_alg='murmur2';` Note that if a changefeed is created using the murmur2 algorithm, and then the cluster setting is disabled, the changefeed will continue using the murmur2 algorithm unless the changefeed is altered to use a differed `partition_alg`.
2292bbc to
b5f89d9
Compare
|
Thanks Claude for finding the two instances of the old name I forgot to replace in the PR description. |
|
TFTRs! bors t=aerfrei,andyyang890 |
|
bors r=aerfrei,andyyang890 (fixing a typo) |
|
Based on the specified backports for this PR, I applied new labels to the following linked issue(s). Please adjust the labels as needed to match the branches actually affected by the issue(s), including adding any known older branches. Issue #161202: branch-release-25.2, branch-release-25.4, branch-release-26.1. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
|
Encountered an error creating backports. Some common things that can go wrong:
You might need to create your backport manually using the backport tool. error creating merge commit from b5f89d9 to blathers/backport-release-25.2-161265: POST https://api.github.com/repos/rharding6373/cockroach/merges: 409 Merge conflict [] you may need to manually resolve merge conflicts with the backport tool. Backport to branch 25.2.x failed. See errors above. error creating merge commit from b5f89d9 to blathers/backport-release-25.4-161265: POST https://api.github.com/repos/rharding6373/cockroach/merges: 409 Merge conflict [] you may need to manually resolve merge conflicts with the backport tool. Backport to branch 25.4.x failed. See errors above. error creating merge commit from b5f89d9 to blathers/backport-release-26.1-161265: POST https://api.github.com/repos/rharding6373/cockroach/merges: 409 Merge conflict [] you may need to manually resolve merge conflicts with the backport tool. Backport to branch 26.1.x failed. See errors above. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
Previously, the franz-go based kafka sink only supported the fnv-1a hashing algorithm, because the hashing function was compatible with the original sarama-based sink. However, some users need alternative hashing functions that are compatible with their existing ecosystem.
This PR adds the
partition_algchangefeed option, with the optionsfnv-1a(default) andmurmur2(kafka's default). This is protected by a cluster setting.Epic: CRDB-58732
Fixes: #161202
Release note (general change): Changefeeds now support the
partition_algoption for specifying a kafka partitioning algorithm. Currently
fnv-1a(default) and
murmur2are supported. The option is only valid on kafkav2 sinks. This is protected by the cluster setting
changefeed.partition_alg.enabled. An example usage:SET CLUSTER SETTING changefeed.partition_alg.enabled=true; CREATE CHANGEFEED ... INTO 'kafka://...' WITH partition_alg='murmur2';Note that if a changefeed iscreated using the murmur2 algorithm, and then the cluster setting is
disabled, the changefeed will continue using the murmur2 algorithm
unless the changefeed is altered to use a differed
partition_alg.