Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable consumer group identification for client #3405

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .build-tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
)

require (
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548 // indirect
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/iancoleman/orderedmap v0.0.0-20190318233801-ac98e3ecb4b0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions .build-tools/go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548 h1:rh6cYZ/2nnufSAhnz+fwmsyNWljt0uyxmfIcG6t499I=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e h1:mLvqfGuppb6uhsijmwTlF5sZVtGvig+Ua5ESKF17SxA=
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
76 changes: 76 additions & 0 deletions 3346.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
From 873c147aef366bdcb9a5049521fedf622fd91340 Mon Sep 17 00:00:00 2001
From: Bernd Verst <github@bernd.dev>
Date: Thu, 8 Feb 2024 13:11:56 -0800
Subject: [PATCH] Recover interrupted eventhubs subscriptions (#3344)

Signed-off-by: Bernd Verst <github@bernd.dev>
---
.../component/azure/eventhubs/eventhubs.go | 39 +++++++++++++++----
1 file changed, 32 insertions(+), 7 deletions(-)

diff --git a/internal/component/azure/eventhubs/eventhubs.go b/internal/component/azure/eventhubs/eventhubs.go
index b0e7c47376..854b459cca 100644
--- a/internal/component/azure/eventhubs/eventhubs.go
+++ b/internal/component/azure/eventhubs/eventhubs.go
@@ -309,13 +309,16 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
Handler: retryHandler,
}

+ subscriptionLoopFinished := make(chan bool, 1)
+
// Process all partition clients as they come in
- go func() {
+ subscriberLoop := func() {
for {
// This will block until a new partition client is available
// It returns nil if processor.Run terminates or if the context is canceled
partitionClient := processor.NextPartitionClient(subscribeCtx)
if partitionClient == nil {
+ subscriptionLoopFinished <- true
return
}
aeh.logger.Debugf("Received client for partition %s", partitionClient.PartitionID())
@@ -329,15 +332,37 @@ func (aeh *AzureEventHubs) Subscribe(subscribeCtx context.Context, config Subscr
}
}()
}
- }()
+ }

// Start the processor
go func() {
- // This is a blocking call that runs until the context is canceled
- err = processor.Run(subscribeCtx)
- // Do not log context.Canceled which happens at shutdown
- if err != nil && !errors.Is(err, context.Canceled) {
- aeh.logger.Errorf("Error from event processor: %v", err)
+ for {
+ go subscriberLoop()
+ // This is a blocking call that runs until the context is canceled
+ err = processor.Run(subscribeCtx)
+ // Exit if the context is canceled
+ if err != nil && errors.Is(err, context.Canceled) {
+ return
+ }
+ if err != nil {
+ aeh.logger.Errorf("Error from event processor: %v", err)
+ } else {
+ aeh.logger.Debugf("Event processor terminated without error")
+ }
+ // wait for subscription loop finished signal
+ select {
+ case <-subscribeCtx.Done():
+ return
+ case <-subscriptionLoopFinished:
+ // noop
+ }
+ // Waiting here is not strictly necessary, however, we will wait for a short time to increase the likelihood of transient errors having disappeared
+ select {
+ case <-subscribeCtx.Done():
+ return
+ case <-time.After(5 * time.Second):
+ // noop - continue the for loop
+ }
}
}()

2 changes: 2 additions & 0 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
config.Consumer.Fetch.Default = meta.consumerFetchDefault
config.Consumer.Group.Heartbeat.Interval = meta.HeartbeatInterval
config.Consumer.Group.Session.Timeout = meta.SessionTimeout
config.ClientID = meta.ClientID
config.Consumer.Group.InstanceId = meta.ConsumerGroup
config.ChannelBufferSize = meta.channelBufferSize

config.Net.KeepAlive = meta.ClientConnectionKeepAliveInterval
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
github.com/cloudwego/kitex-examples v0.1.1
github.com/cyphar/filepath-securejoin v0.2.4
github.com/dancannon/gorethink v4.0.0+incompatible
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e
github.com/didip/tollbooth/v7 v7.0.1
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/fasthttp-contrib/sessions v0.0.0-20160905201309-74f6ac73d5d5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,8 @@ github.com/dancannon/gorethink v4.0.0+incompatible h1:KFV7Gha3AuqT+gr0B/eKvGhbjm
github.com/dancannon/gorethink v4.0.0+incompatible/go.mod h1:BLvkat9KmZc1efyYwhz3WnybhRZtgF1K929FD8z1avU=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548 h1:rh6cYZ/2nnufSAhnz+fwmsyNWljt0uyxmfIcG6t499I=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e h1:mLvqfGuppb6uhsijmwTlF5sZVtGvig+Ua5ESKF17SxA=
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/dave/jennifer v1.4.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
2 changes: 1 addition & 1 deletion tests/certification/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/dapr/components-contrib v1.13.0-rc.10
github.com/dapr/dapr v1.13.0
github.com/dapr/go-sdk v1.10.1
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/go-chi/chi/v5 v5.0.12
github.com/go-redis/redis/v8 v8.11.5
Expand Down
4 changes: 2 additions & 2 deletions tests/certification/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ github.com/dapr/dapr v1.13.0 h1:yExu47iCyqBSghAGVjgVjica4NfFd0dVlPXQTpQWR98=
github.com/dapr/dapr v1.13.0/go.mod h1:VFjFGrLb84k5pjmWNn9reI5D28OQifdUbBdymXxbZDc=
github.com/dapr/go-sdk v1.10.1 h1:g6mM2RXyGkrzsqWFfCy8rw+UAt1edQEgRaQXT+XP4PE=
github.com/dapr/go-sdk v1.10.1/go.mod h1:lPjyF/xubh35fbdNdKkxBbFxFNCmta4zmvsk0JxuUG0=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548 h1:rh6cYZ/2nnufSAhnz+fwmsyNWljt0uyxmfIcG6t499I=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e h1:mLvqfGuppb6uhsijmwTlF5sZVtGvig+Ua5ESKF17SxA=
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/pubsub/jetstream/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ toolchain go1.22.2

require (
github.com/dapr/components-contrib v1.10.6-0.20230403162214-9ee9d56cb7ea
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e
)

require (
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/pubsub/jetstream/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 h1:dEopBSOSjB5f
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0/go.mod h1:qDSbb0fgIfFNjZrNTPtS5MOMScAGyQtn1KlSvoOdqYw=
github.com/cloudevents/sdk-go/v2 v2.14.0 h1:Nrob4FwVgi5L4tV9lhjzZcjYqFVyJzsA56CwPaPfv6s=
github.com/cloudevents/sdk-go/v2 v2.14.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUEeUfapHMUX1T5To=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548 h1:rh6cYZ/2nnufSAhnz+fwmsyNWljt0uyxmfIcG6t499I=
github.com/dapr/kit v0.13.1-0.20240306152601-e33fbab74548/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e h1:mLvqfGuppb6uhsijmwTlF5sZVtGvig+Ua5ESKF17SxA=
github.com/dapr/kit v0.13.1-0.20240402103809-0c7cfce53d9e/go.mod h1:dons8V2bF6MPR2yFdxtTa86PfaE7EJtKAOkZ9hOavBQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down