Skip to content

Commit

Permalink
Updated Service Bus components to track2 SDK
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Segala (ItalyPaleAle) <43508+ItalyPaleAle@users.noreply.github.com>
  • Loading branch information
ItalyPaleAle committed May 4, 2022
1 parent e2fedcd commit bbb776e
Show file tree
Hide file tree
Showing 11 changed files with 608 additions and 524 deletions.
223 changes: 128 additions & 95 deletions bindings/azure/servicebusqueues/servicebusqueues.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"sync/atomic"
"time"

servicebus "github.com/Azure/azure-service-bus-go"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
sbadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/cenkalti/backoff/v4"

azauth "github.com/dapr/components-contrib/authentication/azure"
Expand All @@ -43,8 +45,8 @@ const (
// AzureServiceBusQueues is an input/output binding reading from and sending events to Azure Service Bus queues.
type AzureServiceBusQueues struct {
metadata *serviceBusQueuesMetadata
ns *servicebus.Namespace
queue *servicebus.QueueEntity
client *servicebus.Client
adminClient *sbadmin.Client
shutdownSignal int32
logger logger.Logger
ctx context.Context
Expand All @@ -64,83 +66,71 @@ func NewAzureServiceBusQueues(logger logger.Logger) *AzureServiceBusQueues {
}

// Init parses connection properties and creates a new Service Bus Queue client.
func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) error {
meta, err := a.parseMetadata(metadata)
func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) (err error) {
a.metadata, err = a.parseMetadata(metadata)
if err != nil {
return err
}
userAgent := "dapr-" + logger.DaprVersion
a.metadata = meta

var ns *servicebus.Namespace
userAgent := "dapr-" + logger.DaprVersion
if a.metadata.ConnectionString != "" {
ns, err = servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(a.metadata.ConnectionString),
servicebus.NamespaceWithUserAgent(userAgent))
a.client, err = servicebus.NewClientFromConnectionString(a.metadata.ConnectionString, &servicebus.ClientOptions{
ApplicationID: userAgent,
})
if err != nil {
return err
}

a.adminClient, err = sbadmin.NewClientFromConnectionString(a.metadata.ConnectionString, nil)
if err != nil {
return err
}
} else {
// Initialization code
settings, sErr := azauth.NewEnvironmentSettings(azauth.AzureServiceBusResourceName, metadata.Properties)
if sErr != nil {
return sErr
settings, innerErr := azauth.NewEnvironmentSettings(azauth.AzureServiceBusResourceName, metadata.Properties)
if innerErr != nil {
return innerErr
}

tokenProvider, tErr := settings.GetAMQPTokenProvider()
if tErr != nil {
return tErr
token, innerErr := settings.GetTokenCredential()
if innerErr != nil {
return innerErr
}

ns, err = servicebus.NewNamespace(servicebus.NamespaceWithTokenProvider(tokenProvider),
servicebus.NamespaceWithUserAgent(userAgent))
if err != nil {
return err
a.client, innerErr = servicebus.NewClient(a.metadata.NamespaceName, token, &servicebus.ClientOptions{
ApplicationID: userAgent,
})
if innerErr != nil {
return innerErr
}

// We set these separately as the ServiceBus SDK does not provide a way to pass the environment via the options
// pattern unless you allow it to recreate the entire environment which seems wasteful.
ns.Name = a.metadata.NamespaceName
ns.Environment = *settings.AzureEnvironment
ns.Suffix = settings.AzureEnvironment.ServiceBusEndpointSuffix
a.adminClient, innerErr = sbadmin.NewClient(a.metadata.NamespaceName, token, nil)
if innerErr != nil {
return innerErr
}
}
a.ns = ns

qm := ns.NewQueueManager()

ctx := context.Background()

queues, err := qm.List(ctx)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
getQueueRes, err := a.adminClient.GetQueue(ctx, a.metadata.QueueName, nil)
if err != nil {
return err
}

var entity *servicebus.QueueEntity
for _, q := range queues {
if q.Name == a.metadata.QueueName {
entity = q

break
}
}

// Create queue if it does not exist
if entity == nil {
var ttl time.Duration
var ok bool
ttl, ok, err = contrib_metadata.TryGetTTL(metadata.Properties)
if err != nil {
return err
if getQueueRes == nil {
// Need to create the queue
ttlDur := contrib_metadata.Duration{
Duration: a.metadata.ttl,
}

if !ok {
ttl = a.metadata.ttl
}
entity, err = qm.Put(ctx, a.metadata.QueueName, servicebus.QueueEntityWithMessageTimeToLive(&ttl))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = a.adminClient.CreateQueue(ctx, a.metadata.QueueName, &sbadmin.CreateQueueOptions{
Properties: &sbadmin.QueueProperties{
DefaultMessageTimeToLive: to.Ptr(ttlDur.ToISOString()),
},
})
if err != nil {
return err
}
}
a.queue = entity

a.clearShutdown()

Expand Down Expand Up @@ -188,88 +178,131 @@ func (a *AzureServiceBusQueues) Operations() []bindings.OperationKind {
}

func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

client, err := a.ns.NewQueue(a.queue.Name)
sender, err := a.client.NewSender(a.metadata.QueueName, nil)
if err != nil {
return nil, err
}
defer client.Close(ctx)
defer sender.Close(ctx)

msg := servicebus.NewMessage(req.Data)
msg := &servicebus.Message{
Body: req.Data,
}
if val, ok := req.Metadata[id]; ok && val != "" {
msg.ID = val
msg.MessageID = &val
}
if val, ok := req.Metadata[correlationID]; ok && val != "" {
msg.CorrelationID = val
msg.CorrelationID = &val
}

ttl, ok, err := contrib_metadata.TryGetTTL(req.Metadata)
if err != nil {
return nil, err
}

if ok {
msg.TTL = &ttl
msg.TimeToLive = &ttl
}

return nil, client.Send(ctx, msg)
return nil, sender.SendMessage(ctx, msg, nil)
}

func (a *AzureServiceBusQueues) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
var sbHandler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
_, err := handler(ctx, &bindings.ReadResponse{
Data: msg.Data,
Metadata: map[string]string{id: msg.ID, correlationID: msg.CorrelationID, label: msg.Label},
})
if err == nil {
return msg.Complete(ctx)
}

return msg.Abandon(ctx)
}

// Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling.
connConfig := retry.DefaultConfig()
connConfig.Policy = retry.PolicyExponential
connConfig.MaxInterval, _ = time.ParseDuration("5m")
connConfig.MaxInterval = 5 * time.Minute
connBackoff := connConfig.NewBackOffWithContext(a.ctx)

for !a.isShutdown() {
client := a.attemptConnectionForever(connBackoff)

if client == nil {
receiver := a.attemptConnectionForever(connBackoff)
if receiver == nil {
a.logger.Errorf("Failed to connect to Azure Service Bus Queue.")
continue
}
defer client.Close(context.Background())

if err := client.Receive(a.ctx, sbHandler); err != nil {
msgs, err := receiver.ReceiveMessages(a.ctx, 10, nil)
if err != nil {
a.logger.Warnf("Error reading from Azure Service Bus Queue binding: %s", err.Error())
}

// Blocks until the connection is closed
for _, msg := range msgs {
body, err := msg.Body()
if err != nil {
a.logger.Warnf("Error reading message body: %s", err.Error())
a.abandonMessage(receiver, msg)
continue
}

metadata := make(map[string]string)
metadata[id] = msg.MessageID
if msg.CorrelationID != nil {
metadata[correlationID] = *msg.CorrelationID
}
if msg.Subject != nil {
metadata[label] = *msg.Subject
}

_, err = handler(a.ctx, &bindings.ReadResponse{
Data: body,
Metadata: metadata,
})
if err != nil {
a.abandonMessage(receiver, msg)
continue
}

err = receiver.CompleteMessage(a.ctx, msg, nil)
if err != nil {
a.logger.Warnf("Error completing message: %s", err.Error())
continue
}
}

// Disconnect (gracefully) before attempting to re-connect (unless we're shutting down)
// Use a background context here because a.ctx may be canceled already at this stage
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
if err := receiver.Close(ctx); err != nil {
// Log only
a.logger.Warnf("Error closing receiver of Azure Service Bus Queue binding: %s", err.Error())
}
cancel()
}
return nil
}

func (a *AzureServiceBusQueues) attemptConnectionForever(backoff backoff.BackOff) *servicebus.Queue {
var client *servicebus.Queue
retry.NotifyRecover(func() error {
clientAttempt, err := a.ns.NewQueue(a.queue.Name)
if err != nil {
return err
}
client = clientAttempt
return nil
}, backoff,
func (a *AzureServiceBusQueues) abandonMessage(receiver *servicebus.Receiver, msg *servicebus.ReceivedMessage) {
ctx, cancel := context.WithTimeout(a.ctx, 30*time.Second)
err := receiver.AbandonMessage(ctx, msg, nil)
if err != nil {
// Log only
a.logger.Warnf("Error abandoning message: %s", err.Error())
}
cancel()
}

func (a *AzureServiceBusQueues) attemptConnectionForever(backoff backoff.BackOff) *servicebus.Receiver {
var receiver *servicebus.Receiver
retry.NotifyRecover(
func() error {
clientAttempt, err := a.client.NewReceiverForQueue(a.metadata.QueueName, nil)
if err != nil {
return err
}
receiver = clientAttempt
return nil
},
backoff,
func(err error, d time.Duration) {
a.logger.Debugf("Failed to connect to Azure Service Bus Queue Binding with error: %s", err.Error())
},
func() {
a.logger.Debug("Successfully reconnected to Azure Service Bus.")
backoff.Reset()
})
return client
},
)
return receiver
}

func (a *AzureServiceBusQueues) Close() error {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ require (
)

require (
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v0.4.0
github.com/labd/commercetools-go-sdk v0.3.2
github.com/nacos-group/nacos-sdk-go/v2 v2.0.1
gopkg.in/couchbase/gocb.v1 v1.6.4
Expand All @@ -167,10 +168,10 @@ require gopkg.in/couchbaselabs/jsonx.v1 v1.0.1 // indirect

require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.1.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect
github.com/appscode/go-querystring v0.0.0-20170504095604-0126cfb3f1dc // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gin-gonic/gin v1.7.7 // indirect
github.com/hashicorp/go-hclog v0.14.1 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
Expand Down
9 changes: 8 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,17 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1 h1:3CVsSo4mp8NDWO11tHzN/mdo
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1/go.mod h1:w5pDIZuawUmY3Bj4tVx3Xb8KS96ToB0j315w9rqpAg0=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.14.0 h1:NVS/4LOQfkBpk+B1VopIzv1ptmYeEskA8w/3K/w7vjo=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.14.0/go.mod h1:RG0cZndeZM17StwohYclmcXSr4oOJ8b1I5hB8llIc6Y=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.2/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 h1:Px2KVERcYEg2Lv25AqC2hVr0xUWaq94wuEObLIkYzmA=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2/go.mod h1:CdSJQNNzZhCkwDaV27XV1w48ZBPtxe7mlrZAsPNxD5g=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.0 h1:XkMDpP4GWkeW3RLxJ6JKjJGTD0Xq2je/SBeKG+WPTuI=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.0/go.mod h1:HgRcYBKlSo9ZWdxt3Y36aIlxKsSk9pdAmB4WvYv9FX4=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.3.0 h1:0gy84rslo34rGGBe2cDxfs4iDMwbKc0/4yDna1S7j8Q=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.3.0/go.mod h1:mu846WjGmdK5vWqWv25J416znWpnFjZp4+O34KW8H7U=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v0.4.0 h1:ALpdyAqAOhrHiTMWOkIjB7hEiG7KTvyx7CZ9Qel+ZYQ=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v0.4.0/go.mod h1:E3RSrZI5ub9zlZeobR+UUuuuUv4rz6JcVjVpfAmZ65Y=
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.1.0 h1:mvQhIGmKI3vmdNDUMG0ZHaZ1p+JcHE3m0q1RMOyKxRo=
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.1.0/go.mod h1:7hMUlcqiMXDUJtU1EWQlhhkC4BfIr6pEsiyuRYq4xLQ=
github.com/Azure/azure-service-bus-go v0.11.5 h1:EVMicXGNrSX+rHRCBgm/TRQ4VUZ1m3yAYM/AB2R/SOs=
github.com/Azure/azure-service-bus-go v0.11.5/go.mod h1:MI6ge2CuQWBVq+ly456MY7XqNLJip5LO1iSFodbNLbU=
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
Expand All @@ -95,6 +100,7 @@ github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSW
github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI=
github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0=
github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA=
github.com/Azure/go-autorest/autorest v0.11.22/go.mod h1:BAWYUWGPEtKPzjVkp0Q6an0MJcJDsoh5Z1BFAEFs4Xs=
github.com/Azure/go-autorest/autorest v0.11.24/go.mod h1:G6kyRlFnTuSbEYkQGawPfsCswgme4iYf6rfSKUDzbCc=
github.com/Azure/go-autorest/autorest v0.11.27 h1:F3R3q42aWytozkV8ihzcgMO4OA4cuqr3bNlsEuF6//A=
github.com/Azure/go-autorest/autorest v0.11.27/go.mod h1:7l8ybrIdUmGqZMTD0sRtAr8NvbHjfofbf8RSP2q7w7U=
Expand All @@ -103,6 +109,8 @@ github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S
github.com/Azure/go-autorest/autorest/adal v0.8.1/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q=
github.com/Azure/go-autorest/autorest/adal v0.8.3/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/adal v0.9.14/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/adal v0.9.17/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ=
github.com/Azure/go-autorest/autorest/adal v0.9.18 h1:kLnPsRjzZZUF3K5REu/Kc+qMQrvuza2bwSnNdhmzLfQ=
github.com/Azure/go-autorest/autorest/adal v0.9.18/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ=
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM=
Expand Down Expand Up @@ -457,7 +465,6 @@ github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gin-gonic/gin v1.7.3/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY=
github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs=
github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U=
github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
Expand Down

0 comments on commit bbb776e

Please sign in to comment.