New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rewrite EventHubs component #1292
Conversation
86a324d
to
4d96d30
Compare
authentication/azure/auth_amqp.go
Outdated
// 1. Client credentials | ||
// 2. Client certificate | ||
// 3. MSI. | ||
func (s EnvironmentSettings) GetAADTokenProvider() (*aad.TokenProvider, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the one in #1201
return processor, nil | ||
} | ||
|
||
func (aeh *AzureEventHubs) createHubManager() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HubManager client from azure-event-hubs go sdk for creating/managing an event hub.
return aeh.createConsumerGroup(hubName) | ||
} | ||
|
||
func (aeh *AzureEventHubs) getConsumerGroupsClient() (*mgmt.ConsumerGroupsClient, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consumer group client from the event hubs management clients in azure-go-sdk for managing consumer groups.
"time" | ||
|
||
"github.com/Azure/azure-amqp-common-go/v3/aad" | ||
"github.com/Azure/azure-amqp-common-go/v3/conn" | ||
eventhub "github.com/Azure/azure-event-hubs-go/v3" | ||
"github.com/Azure/azure-event-hubs-go/v3/eph" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as right now there does not seem to be any option for batching within event processor host.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Linking dapr/dapr#2218 (comment) for context on this comment.)
Correct, EPH does not. To implement batch receive, the component would have to go down a level of abstraction to Hub.Receive with the ReceiveWithPrefetchCount option, which also means taking on explicit partition and checkpoint management that EPH otherwise currently handles. There's definitely a cost-benefit analysis to be done there, if/when pubsub batching is pursued as a feature in Dapr.
if err != nil { | ||
return err | ||
} | ||
|
||
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, aeh.metadata.storageAccountName, aeh.metadata.storageContainerName, azure.PublicCloud) | ||
// Set topic name, consumerID prefix for partition checkpoint lease blob path. | ||
leaserPrefixOpt := storage.WithPrefixInBlobPath(aeh.getStoragePrefixString(req.Topic)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is needed to support multiple consumers with the same storage container option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a comment in the code as well for future maintainers.
pubsub/azure/eventhubs/eventhubs.go
Outdated
aeh.logger.Info("connecting to Azure EventHubs via AAD. connection established on first publish/subscribe") | ||
aeh.logger.Debugf("req.Topic field in incoming requests honored") | ||
|
||
if aeh.metadata.EnableEnitiyManagement { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of more points here:
if entity Management is enabled, it does need permissions at least at the EventHub namespace level to be able to create event hubs and consumer groups. (Manage permission)
Also the other metadata, SubscriptionID and ResourceGroupName is a mandatory requirement when enabling entityManagement as for now, it assumes that the topic will be subscribed to.
If needed resourceGroupName and SubscriptionID can be validated only as part of subscription metadata validation since it will only be required for a subscriber and not for a publisher alone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrt comment #1055 (comment) from @CodeMonkeyLeet , entity management for EventHubs through Dapr does step into provisioning infrastructure resources and also does not go into fully managing the lifecycle of the resource. (should not delete)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to timebox my review on this for now because I've spent a couple of hours looking at this and I'm not done yet. I think this needs a design review first as I'm spending a chunk of time trying to infer if particular behaviors are intended.
In particular, there are a lot of contingent and interacting configuration options as indicated in this comment and I think it would be a good starting point to enumerate the expected valid configuration combinations and the resulting behaviors.
In general, it seems like we could scope down the scenarios we support, for example, supporting connection string auth to EventHubs namespace but without enableEntityManagement where the component supports multiple event hubs that must be predeployed already seems like a something we could just choose not to support, hence design review.
"time" | ||
|
||
"github.com/Azure/azure-amqp-common-go/v3/aad" | ||
"github.com/Azure/azure-amqp-common-go/v3/conn" | ||
eventhub "github.com/Azure/azure-event-hubs-go/v3" | ||
"github.com/Azure/azure-event-hubs-go/v3/eph" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Linking dapr/dapr#2218 (comment) for context on this comment.)
Correct, EPH does not. To implement batch receive, the component would have to go down a level of abstraction to Hub.Receive with the ReceiveWithPrefetchCount option, which also means taking on explicit partition and checkpoint management that EPH otherwise currently handles. There's definitely a cost-benefit analysis to be done there, if/when pubsub batching is pursued as a feature in Dapr.
pubsub/azure/eventhubs/eventhubs.go
Outdated
defaultMessageRetentionInDays = 1 | ||
defaultPartitionCount = 1 | ||
|
||
timeoutInSec time.Duration = 5 * time.Second //nolint |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the //nolint
here? Is it because the time.Duration is specified? (It's unnecessary given that it should be inferred from time.Second already?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for the name timeoutInSec
, that is throwing a linter error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, use a new name and consider a bigger default as resource provisioning can be slow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If needed, split into multiple timeout configs for different scenarios.
pubsub/azure/eventhubs/eventhubs.go
Outdated
missingResourceGroupNameMsg = "error: missing resourceGroupName attribute required for entityManagement" | ||
missingSubscriptionIDMsg = "error: missing subscriptionID attribute required for entityManagement" | ||
entityManagementConnectionStrMsg = "error: entity management support is not available with connectionString" | ||
differentTopicConnectionStringErrorTmpl = "error: connection string already contains topic/hubName different from requested topic %s" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hubName
is an internal var name, consider using clarifying:
differentTopicConnectionStringErrorTmpl = "error: connection string already contains topic/hubName different from requested topic %s" | |
differentTopicConnectionStringErrorTmpl = "error: specified topic %s does not match the event hub name in the provided connectionString" |
pubsub/azure/eventhubs/eventhubs.go
Outdated
storageAccountName string | ||
storageAccountKey string | ||
storageContainerName string | ||
ConnectionString string `json:"connectionString,omitempty"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider refactoring into separate source files now that the implementation has grown sufficiently complicated to warrant it. A common split in other components is to deal with metadata in a separate metadata.go file.
pubsub/azure/eventhubs/eventhubs.go
Outdated
StorageAccountKey string `json:"storageAccountKey,omitempty"` | ||
StorageContainerName string `json:"storageContainerName,omitempty"` | ||
EnableEnitiyManagement bool `json:"enableEntityManagement,omitempty,string"` | ||
MessageRetentionInDays int32 `json:"messageRetentionInDays,omitempty,string"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For my understanding, is there any reason to annotate int32
members MessageRetentionInDays
and PartitionCount
as string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes without the annotation as String, it fails when unmarshaling the JSON serialised config.
pubsub/azure/eventhubs/eventhubs.go
Outdated
if parsed.HubName != "" { | ||
return parsed.HubName, nil | ||
} | ||
return "", nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The if
check and default return can be condensed down to just return parsed.HubName, nil
pubsub/azure/eventhubs/eventhubs.go
Outdated
StorageAccountName string `json:"storageAccountName,omitempty"` | ||
StorageAccountKey string `json:"storageAccountKey,omitempty"` | ||
StorageContainerName string `json:"storageContainerName,omitempty"` | ||
EnableEnitiyManagement bool `json:"enableEntityManagement,omitempty,string"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo:
EnableEnitiyManagement bool `json:"enableEntityManagement,omitempty,string"` | |
EnableEntityManagement bool `json:"enableEntityManagement,omitempty,string"` |
if err != nil { | ||
return err | ||
} | ||
|
||
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, aeh.metadata.storageAccountName, aeh.metadata.storageContainerName, azure.PublicCloud) | ||
// Set topic name, consumerID prefix for partition checkpoint lease blob path. | ||
leaserPrefixOpt := storage.WithPrefixInBlobPath(aeh.getStoragePrefixString(req.Topic)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a comment in the code as well for future maintainers.
pubsub/azure/eventhubs/eventhubs.go
Outdated
aeh.logger.Infof("connectionString will be used to connect to given hub %q only. connection established on first publish/subscribe", hubName) | ||
aeh.logger.Debugf("req.Topic field in incoming requests must be same as the one in the connectionString") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Capitalization. Also consider clarifying implied impact on first connect instead of having a separate debug message.
aeh.logger.Infof("connectionString will be used to connect to given hub %q only. connection established on first publish/subscribe", hubName) | |
aeh.logger.Debugf("req.Topic field in incoming requests must be same as the one in the connectionString") | |
aeh.logger.Infof("connectionString provided is specific to event hub %q. Publishing or subscribing to a topic that does not match this event hub will fail when attempted.", hubName) |
@CodeMonkeyLeet Based on the decision there, I can modify this PR. |
204b508
to
0c18574
Compare
967e15d
to
a8f2f4b
Compare
pubsub/azure/eventhubs/eventhubs.go
Outdated
defaultMessageRetentionInDays = 1 | ||
defaultPartitionCount = 1 | ||
|
||
timeoutInSec time.Duration = 5 * time.Second //nolint |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, use a new name and consider a bigger default as resource provisioning can be slow.
pubsub/azure/eventhubs/eventhubs.go
Outdated
defaultMessageRetentionInDays = 1 | ||
defaultPartitionCount = 1 | ||
|
||
timeoutInSec time.Duration = 5 * time.Second //nolint |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If needed, split into multiple timeout configs for different scenarios.
pubsub/azure/eventhubs/eventhubs.go
Outdated
return false, err | ||
} | ||
if *g.Name == aeh.metadata.ConsumerGroup { | ||
aeh.logger.Infof("consumer group %s esists for the requested topic/eventHub %s", aeh.metadata.ConsumerGroup, hubName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: exists
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resolved all three.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall is good. Just some minor comments.
@@ -52,6 +62,15 @@ const ( | |||
sysPropIotHubConnectionModuleID = "iothub-connection-module-id" | |||
sysPropIotHubEnqueuedTime = "iothub-enqueuedtime" | |||
sysPropMessageID = "message-id" | |||
|
|||
defaultMessageRetentionInDays = 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the default be infinity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no it cannot be. See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers.
timeoutInSec time.Duration = 5 * time.Second //nolint | ||
|
||
// See https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas for numbers. | ||
maxMessageRetention = int32(90) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this max coming from event hubs or is it arbitrary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see comment above line.
storageContainerName string | ||
ConnectionString string `json:"connectionString,omitempty"` | ||
EventHubNamespace string `json:"eventHubNamespace,omitempty"` | ||
ConsumerGroup string `json:"consumerID"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why JSON is consumerID and not ConsumerGroup? I think these should match, not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think consumerID is a field coming from dapr ... It is across all pubsub ... not only eventhub ... And in every pubsub it is mapped to some form of a field called cosumer group or such ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The metadata should make sense within the component itself. Someone that uses EventHubs expects ConsumerGroup as the terminology. There is no requirement that metadata should match cross components.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently the metadata properties are marshalled into json and then un-marshalled into the Go struct here. consumerID
is the incoming property name from daprd
Here consumerID
is automatically set by daprd
and not an independent property of this component.
It is called consumerGroup within this component.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I found the code that handles this logic in daprd. That is the logic that automatically sets the ConsumerID attributed based on the app ID. So, this attribute is the same for all components. LGTM.
aaadbd7
to
8d1e3ec
Compare
Signed-off-by: Mukundan Sundararajan <msundar.ms@outlook.com>
8d1e3ec
to
29e22a0
Compare
Codecov Report
@@ Coverage Diff @@
## master #1292 +/- ##
==========================================
- Coverage 34.95% 34.60% -0.36%
==========================================
Files 149 149
Lines 12933 13112 +179
==========================================
+ Hits 4521 4537 +16
- Misses 7927 8089 +162
- Partials 485 486 +1
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I just have a comment to use ConsumerGroup for consistency within EventHubs. That wins over having the same name cross components for metadata.
Signed-off-by: Mukundan Sundararajan <msundar.ms@outlook.com>
Signed-off-by: Mukundan Sundararajan <msundar.ms@outlook.com> Signed-off-by: Amit Mor <amit.mor@hotmail.com>
Signed-off-by: Mukundan Sundararajan <msundar.ms@outlook.com> Signed-off-by: jigargandhi <jigarr.gandhi@gmail.com>
Description
Rewrite of eventhubs component.
Adding more resiliency and retries if needed on connection will be part of the certification task and is left off for this PR.
Issue reference
We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.
part of #951
closes #964
closes dapr/dapr#3753
closes #1225
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list: