New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
changefeedccl: support azure-event-hub:// for azure kafka streaming #115806
Conversation
ac5848d
to
4b91a03
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks pretty good. Minor comments!
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy and @wenyihu6)
-- commits
line 31 at r1:
We probably don't need "insecure_tls_skip_verify". I added that for confluent as a hack for roachtests. Will likely remove that during this release.
-- commits
line 35 at r1:
typo. should be "myeventhubs"?
pkg/ccl/changefeedccl/sink_kafka.go
line 992 at r1 (raw file):
// newMissingParameterError returns an error message for using unsupported // values in sinkURL. func newRequiredValueError(param string, unsupportedValue, allowedValue string) error {
Can you rename this to NewUnsupportedValueForParameterErrors
?
pkg/ccl/changefeedccl/sink_kafka.go
line 1018 at r1 (raw file):
} if wasSet && dest != allowedBoolValue { return errors.Newf("unsupported value %t for parameter %s, please use %t instead", dest,
This can use newRequiredValueError
right?
pkg/ccl/changefeedccl/sink_kafka.go
line 1129 at r1 (raw file):
// parameters from the sinkURL. Additionally, it validates options based on the // given sinkURL and returns an error for unsupported values. func extendKafkaConfigWithDefault(u *sinkURL, dialConfig kafkaDialConfig) (kafkaDialConfig, error) {
I think this can be renamed to "setDefaultParametersForConfluentAndAzure". I would add a comment to say exactly which parameters its setting to true.
pkg/ccl/changefeedccl/sink_kafka.go
line 1177 at r1 (raw file):
// https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview // on how to connect to azure event hub kafka protocol. func buildAzureKafkaConfig(u sinkURL) (dialConfig kafkaDialConfig, _ error) {
Can you please add some tests for params like"topic_name" and "topic_prefix" (and whichever else non-auth-related params we want to allow).
pkg/ccl/changefeedccl/sink_kafka_connection_test.go
line 371 at r1 (raw file):
expectedConfig, expectedError := buildDialConfig(sinkURL{URL: oldUri}) actualConfig, actualError := buildDialConfig(sinkURL{URL: newUri}) require.Equal(t, expectedError, actualError)
We should store the dial cfg structs that you expect to see in the test cases above. I think testing this way is a bit risky.
pkg/ccl/changefeedccl/sink_kafka_connection_test.go
line 375 at r1 (raw file):
}) } }
Would you mind running the kafka-auth
roachtest to make sure it passes. Just bc you made changes to confluent, there's a small chance it could fail. Easier to check now than have it fail in nightlies.
75ee1db
to
54efa8e
Compare
85280a7
to
5c86024
Compare
Feel free to hold PR review off until we merge #116950 |
We are introducing azure-event-hub:// scheme for kafka streaming in cockroachdb#115806. This patch refactors specific sections of the existing code into functions to simplify code review. Note that this patch does not change existing logic or functionality. Part of: cockroachdb#103901, cockroachdb#110558 Release Note: none
The first commit is from #117893. |
We are introducing azure-event-hub:// scheme for kafka streaming in cockroachdb#115806. This patch refactors specific sections of the existing code into functions to simplify code review. Note that this patch does not change existing logic or functionality. Part of: cockroachdb#103901, cockroachdb#110558 Release Note: none
fa1543f
to
d97e54e
Compare
Friendly ping on the review : ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
@@ -170,8 +174,8 @@ func TestShowChangefeedJobsRedacted(t *testing.T) { | |||
expectedSinkURI = strings.Replace(expectedSinkURI, certSecret, "redacted", 1) | |||
expectedDescription := strings.Replace(createStmt, apiSecret, "redacted", 1) | |||
expectedDescription = strings.Replace(expectedDescription, certSecret, "redacted", 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expectedDescription is written twice here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh it's chained here. The second line here uses expectedDescription
instead of createStmt
as the input for strings.Replace
. Do you think I should rename the variable used the first time here or do something like below to make it cleaner?
expectedDescription := strings.Replace(strings.Replace(expectedDescription, certSecret, "redacted", 1), apiSecret, "redacted", 1)
OR
replacer := strings.NewReplacer(apiSecret, "redacted", certSecret, "redacted")
expectedDescription := replacer.Replace(createStmt)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with the second option here. Lmk if you are against it.
117893: changefeedccl: refactor buildDialConfig r=jayshrivastava a=wenyihu6 We are introducing azure-event-hub:// scheme for kafka streaming in #115806. This patch refactors specific sections of the existing code into functions to simplify code review. Note that this patch does not change existing logic or functionality. Part of: #103901, #110558 Release Note: none 118962: kvserver: remove replicate queue pre-process check r=andrewbaptist a=kvoli Following c9cf068, the `preProcessCheck` for the replicate queue is now redundant. All callers of `processOneChange` will first call `replicaCanBeProcessed`, which checks for a valid lease and that the replica is not destroyed. Epic: none Release note: None Co-authored-by: Wenyi Hu <wenyi@cockroachlabs.com> Co-authored-by: Austen McClernon <austen@cockroachlabs.com>
752674e
to
1517d8e
Compare
TFTRs! bors r=jayshrivastava |
Build failed: |
Previously, users had to navigate the complexities of obtaining azure event hub's kafka endpoint and the corresponding sasl_user and sasl_password for azure streaming. This patch improves it to support a new scheme azure-event-hub:// with the following syntax: ``` azure-event-hub://<NamespaceName>.servicebus.windows.net?shared_access_key_name=<KeyName>&shared_access_key=<KeyValue> ``` azure-event-hub:// can now be used to connect to kafka hosted on Azure event hubs. The sinkURL must include mandatory parameters shared_access_key_name and shared_access_key. By default and by requirements, the options "tls_enabled=true," "sasl_handshake=true," "sasl_enabled=true," and "sasl_mechanism=PLAIN" are applied, as they are the only supported options. Other parameters such as "topic_name", and "topic_prefix" are also supported. Resolves: cockroachdb#103901, cockroachdb#110558 Release note (enterprise change): Changefeeds now support a new scheme azure-event-hub:// for kafka data streaming to azure event hubs. The sinkURL must include mandatory parameters shared_access_key_name and shared_access_key. By default and by requirements, the options "tls_enabled=true," "sasl_handshake=true," "sasl_enabled=true," and "sasl_mechanism=PLAIN" are applied, as they are the only supported options. Other parameters such as "topic_name", and "topic_prefix" are also supported. An example URI is: ``` azure-event-hub://myeventhubs.servicebus.windows.net:9093?shared_access_key_name=abc&shared_access_key=123 ```
Now that we support azure-event-hub:// scheme, this patch redacts shared_access_key info for azure event hub from SHOW JOBS output. See also: cockroachdb#103901 Epic: None Release note: None
This patch changes roachtest kafka-azure for azure event hub using the new scheme azure-event-hub://.
TFTRs! bors r=jayshrivastava |
Build succeeded: |
changefeedccl: support azure-event-hub:// for azure kafka streaming
Previously, users had to navigate the complexities of obtaining azure event
hub's kafka endpoint and the corresponding sasl_user and sasl_password for azure
streaming.
This patch improves it to support a new scheme azure-event-hub:// with the
following syntax:
azure-event-hub:// can now be used to connect to kafka hosted on Azure event
hubs. The sinkURL must include mandatory parameters shared_access_key_name and
shared_access_key. By default and by requirements, the options
"tls_enabled=true," "sasl_handshake=true," "sasl_enabled=true," and
"sasl_mechanism=PLAIN" are applied, as they are the only supported options.
Other parameters such as "topic_name", and "topic_prefix" are also supported.
Resolves: #103901, #110558
Release note (enterprise change): Changefeeds now support a new scheme
azure-event-hub:// for kafka data streaming to azure event hubs. The sinkURL
must include mandatory parameters shared_access_key_name and shared_access_key.
By default and by requirements, the options "tls_enabled=true,"
"sasl_handshake=true," "sasl_enabled=true," and "sasl_mechanism=PLAIN" are
applied, as they are the only supported options. Other parameters such as
"topic_name", and "topic_prefix" are also supported.
An example URI is:
changefeedccl: redact shared_access_key from SHOW JOBS
Now that we support azure-event-hub:// scheme, this patch redacts
shared_access_key info for azure event hub from SHOW JOBS output.
See also: #103901
Epic: None
Release note: None
roachtest/cdc: add roachtest for azure-event-hub://
This patch adds a roachtest for kafka streaming using azure-event-hub://.