From bbb776ef42ec4ca4e29f9d19946b2e345f82ccfe Mon Sep 17 00:00:00 2001 From: "Alessandro Segala (ItalyPaleAle)" <43508+ItalyPaleAle@users.noreply.github.com> Date: Wed, 4 May 2022 04:33:55 +0000 Subject: [PATCH] Updated Service Bus components to track2 SDK Signed-off-by: Alessandro Segala (ItalyPaleAle) <43508+ItalyPaleAle@users.noreply.github.com> --- .../servicebusqueues/servicebusqueues.go | 223 +++++++------ go.mod | 3 +- go.sum | 9 +- metadata/duration.go | 40 +++ metadata/duration_test.go | 87 +++++ pubsub/azure/servicebus/message.go | 214 +++++------- pubsub/azure/servicebus/message_test.go | 90 +++--- pubsub/azure/servicebus/servicebus.go | 305 +++++++++--------- pubsub/azure/servicebus/subscription.go | 105 +++--- .../bindings/azure/servicebusqueues/go.mod | 5 +- .../bindings/azure/servicebusqueues/go.sum | 51 +-- 11 files changed, 608 insertions(+), 524 deletions(-) create mode 100644 metadata/duration_test.go diff --git a/bindings/azure/servicebusqueues/servicebusqueues.go b/bindings/azure/servicebusqueues/servicebusqueues.go index 9fd2d1e63c..9302ed2a90 100644 --- a/bindings/azure/servicebusqueues/servicebusqueues.go +++ b/bindings/azure/servicebusqueues/servicebusqueues.go @@ -21,7 +21,9 @@ import ( "sync/atomic" "time" - servicebus "github.com/Azure/azure-service-bus-go" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + sbadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" "github.com/cenkalti/backoff/v4" azauth "github.com/dapr/components-contrib/authentication/azure" @@ -43,8 +45,8 @@ const ( // AzureServiceBusQueues is an input/output binding reading from and sending events to Azure Service Bus queues. type AzureServiceBusQueues struct { metadata *serviceBusQueuesMetadata - ns *servicebus.Namespace - queue *servicebus.QueueEntity + client *servicebus.Client + adminClient *sbadmin.Client shutdownSignal int32 logger logger.Logger ctx context.Context @@ -64,83 +66,71 @@ func NewAzureServiceBusQueues(logger logger.Logger) *AzureServiceBusQueues { } // Init parses connection properties and creates a new Service Bus Queue client. -func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) error { - meta, err := a.parseMetadata(metadata) +func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) (err error) { + a.metadata, err = a.parseMetadata(metadata) if err != nil { return err } - userAgent := "dapr-" + logger.DaprVersion - a.metadata = meta - var ns *servicebus.Namespace + userAgent := "dapr-" + logger.DaprVersion if a.metadata.ConnectionString != "" { - ns, err = servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(a.metadata.ConnectionString), - servicebus.NamespaceWithUserAgent(userAgent)) + a.client, err = servicebus.NewClientFromConnectionString(a.metadata.ConnectionString, &servicebus.ClientOptions{ + ApplicationID: userAgent, + }) + if err != nil { + return err + } + + a.adminClient, err = sbadmin.NewClientFromConnectionString(a.metadata.ConnectionString, nil) if err != nil { return err } } else { - // Initialization code - settings, sErr := azauth.NewEnvironmentSettings(azauth.AzureServiceBusResourceName, metadata.Properties) - if sErr != nil { - return sErr + settings, innerErr := azauth.NewEnvironmentSettings(azauth.AzureServiceBusResourceName, metadata.Properties) + if innerErr != nil { + return innerErr } - tokenProvider, tErr := settings.GetAMQPTokenProvider() - if tErr != nil { - return tErr + token, innerErr := settings.GetTokenCredential() + if innerErr != nil { + return innerErr } - ns, err = servicebus.NewNamespace(servicebus.NamespaceWithTokenProvider(tokenProvider), - servicebus.NamespaceWithUserAgent(userAgent)) - if err != nil { - return err + a.client, innerErr = servicebus.NewClient(a.metadata.NamespaceName, token, &servicebus.ClientOptions{ + ApplicationID: userAgent, + }) + if innerErr != nil { + return innerErr } - // We set these separately as the ServiceBus SDK does not provide a way to pass the environment via the options - // pattern unless you allow it to recreate the entire environment which seems wasteful. - ns.Name = a.metadata.NamespaceName - ns.Environment = *settings.AzureEnvironment - ns.Suffix = settings.AzureEnvironment.ServiceBusEndpointSuffix + a.adminClient, innerErr = sbadmin.NewClient(a.metadata.NamespaceName, token, nil) + if innerErr != nil { + return innerErr + } } - a.ns = ns - - qm := ns.NewQueueManager() - ctx := context.Background() - - queues, err := qm.List(ctx) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + getQueueRes, err := a.adminClient.GetQueue(ctx, a.metadata.QueueName, nil) if err != nil { return err } - - var entity *servicebus.QueueEntity - for _, q := range queues { - if q.Name == a.metadata.QueueName { - entity = q - - break - } - } - - // Create queue if it does not exist - if entity == nil { - var ttl time.Duration - var ok bool - ttl, ok, err = contrib_metadata.TryGetTTL(metadata.Properties) - if err != nil { - return err + if getQueueRes == nil { + // Need to create the queue + ttlDur := contrib_metadata.Duration{ + Duration: a.metadata.ttl, } - - if !ok { - ttl = a.metadata.ttl - } - entity, err = qm.Put(ctx, a.metadata.QueueName, servicebus.QueueEntityWithMessageTimeToLive(&ttl)) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = a.adminClient.CreateQueue(ctx, a.metadata.QueueName, &sbadmin.CreateQueueOptions{ + Properties: &sbadmin.QueueProperties{ + DefaultMessageTimeToLive: to.Ptr(ttlDur.ToISOString()), + }, + }) if err != nil { return err } } - a.queue = entity a.clearShutdown() @@ -188,88 +178,131 @@ func (a *AzureServiceBusQueues) Operations() []bindings.OperationKind { } func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*5) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - client, err := a.ns.NewQueue(a.queue.Name) + sender, err := a.client.NewSender(a.metadata.QueueName, nil) if err != nil { return nil, err } - defer client.Close(ctx) + defer sender.Close(ctx) - msg := servicebus.NewMessage(req.Data) + msg := &servicebus.Message{ + Body: req.Data, + } if val, ok := req.Metadata[id]; ok && val != "" { - msg.ID = val + msg.MessageID = &val } if val, ok := req.Metadata[correlationID]; ok && val != "" { - msg.CorrelationID = val + msg.CorrelationID = &val } - ttl, ok, err := contrib_metadata.TryGetTTL(req.Metadata) if err != nil { return nil, err } - if ok { - msg.TTL = &ttl + msg.TimeToLive = &ttl } - return nil, client.Send(ctx, msg) + return nil, sender.SendMessage(ctx, msg, nil) } func (a *AzureServiceBusQueues) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error { - var sbHandler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error { - _, err := handler(ctx, &bindings.ReadResponse{ - Data: msg.Data, - Metadata: map[string]string{id: msg.ID, correlationID: msg.CorrelationID, label: msg.Label}, - }) - if err == nil { - return msg.Complete(ctx) - } - - return msg.Abandon(ctx) - } - // Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling. connConfig := retry.DefaultConfig() connConfig.Policy = retry.PolicyExponential - connConfig.MaxInterval, _ = time.ParseDuration("5m") + connConfig.MaxInterval = 5 * time.Minute connBackoff := connConfig.NewBackOffWithContext(a.ctx) for !a.isShutdown() { - client := a.attemptConnectionForever(connBackoff) - - if client == nil { + receiver := a.attemptConnectionForever(connBackoff) + if receiver == nil { a.logger.Errorf("Failed to connect to Azure Service Bus Queue.") continue } - defer client.Close(context.Background()) - if err := client.Receive(a.ctx, sbHandler); err != nil { + msgs, err := receiver.ReceiveMessages(a.ctx, 10, nil) + if err != nil { a.logger.Warnf("Error reading from Azure Service Bus Queue binding: %s", err.Error()) } + + // Blocks until the connection is closed + for _, msg := range msgs { + body, err := msg.Body() + if err != nil { + a.logger.Warnf("Error reading message body: %s", err.Error()) + a.abandonMessage(receiver, msg) + continue + } + + metadata := make(map[string]string) + metadata[id] = msg.MessageID + if msg.CorrelationID != nil { + metadata[correlationID] = *msg.CorrelationID + } + if msg.Subject != nil { + metadata[label] = *msg.Subject + } + + _, err = handler(a.ctx, &bindings.ReadResponse{ + Data: body, + Metadata: metadata, + }) + if err != nil { + a.abandonMessage(receiver, msg) + continue + } + + err = receiver.CompleteMessage(a.ctx, msg, nil) + if err != nil { + a.logger.Warnf("Error completing message: %s", err.Error()) + continue + } + } + + // Disconnect (gracefully) before attempting to re-connect (unless we're shutting down) + // Use a background context here because a.ctx may be canceled already at this stage + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := receiver.Close(ctx); err != nil { + // Log only + a.logger.Warnf("Error closing receiver of Azure Service Bus Queue binding: %s", err.Error()) + } + cancel() } return nil } -func (a *AzureServiceBusQueues) attemptConnectionForever(backoff backoff.BackOff) *servicebus.Queue { - var client *servicebus.Queue - retry.NotifyRecover(func() error { - clientAttempt, err := a.ns.NewQueue(a.queue.Name) - if err != nil { - return err - } - client = clientAttempt - return nil - }, backoff, +func (a *AzureServiceBusQueues) abandonMessage(receiver *servicebus.Receiver, msg *servicebus.ReceivedMessage) { + ctx, cancel := context.WithTimeout(a.ctx, 30*time.Second) + err := receiver.AbandonMessage(ctx, msg, nil) + if err != nil { + // Log only + a.logger.Warnf("Error abandoning message: %s", err.Error()) + } + cancel() +} + +func (a *AzureServiceBusQueues) attemptConnectionForever(backoff backoff.BackOff) *servicebus.Receiver { + var receiver *servicebus.Receiver + retry.NotifyRecover( + func() error { + clientAttempt, err := a.client.NewReceiverForQueue(a.metadata.QueueName, nil) + if err != nil { + return err + } + receiver = clientAttempt + return nil + }, + backoff, func(err error, d time.Duration) { a.logger.Debugf("Failed to connect to Azure Service Bus Queue Binding with error: %s", err.Error()) }, func() { a.logger.Debug("Successfully reconnected to Azure Service Bus.") backoff.Reset() - }) - return client + }, + ) + return receiver } func (a *AzureServiceBusQueues) Close() error { diff --git a/go.mod b/go.mod index f350bbb074..c4aeda8eac 100644 --- a/go.mod +++ b/go.mod @@ -158,6 +158,7 @@ require ( ) require ( + github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v0.4.0 github.com/labd/commercetools-go-sdk v0.3.2 github.com/nacos-group/nacos-sdk-go/v2 v2.0.1 gopkg.in/couchbase/gocb.v1 v1.6.4 @@ -167,10 +168,10 @@ require gopkg.in/couchbaselabs/jsonx.v1 v1.0.1 // indirect require ( github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect + github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.1.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect github.com/appscode/go-querystring v0.0.0-20170504095604-0126cfb3f1dc // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect - github.com/gin-gonic/gin v1.7.7 // indirect github.com/hashicorp/go-hclog v0.14.1 // indirect github.com/kylelemons/godebug v1.1.0 // indirect github.com/mattn/go-colorable v0.1.8 // indirect diff --git a/go.sum b/go.sum index e0cbb3b5a7..c545254882 100644 --- a/go.sum +++ b/go.sum @@ -69,12 +69,17 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1 h1:3CVsSo4mp8NDWO11tHzN/mdo github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1/go.mod h1:w5pDIZuawUmY3Bj4tVx3Xb8KS96ToB0j315w9rqpAg0= github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.14.0 h1:NVS/4LOQfkBpk+B1VopIzv1ptmYeEskA8w/3K/w7vjo= github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.14.0/go.mod h1:RG0cZndeZM17StwohYclmcXSr4oOJ8b1I5hB8llIc6Y= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.2/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I= github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 h1:Px2KVERcYEg2Lv25AqC2hVr0xUWaq94wuEObLIkYzmA= github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2/go.mod h1:CdSJQNNzZhCkwDaV27XV1w48ZBPtxe7mlrZAsPNxD5g= github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.0 h1:XkMDpP4GWkeW3RLxJ6JKjJGTD0Xq2je/SBeKG+WPTuI= github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.0/go.mod h1:HgRcYBKlSo9ZWdxt3Y36aIlxKsSk9pdAmB4WvYv9FX4= github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.3.0 h1:0gy84rslo34rGGBe2cDxfs4iDMwbKc0/4yDna1S7j8Q= github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.3.0/go.mod h1:mu846WjGmdK5vWqWv25J416znWpnFjZp4+O34KW8H7U= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v0.4.0 h1:ALpdyAqAOhrHiTMWOkIjB7hEiG7KTvyx7CZ9Qel+ZYQ= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v0.4.0/go.mod h1:E3RSrZI5ub9zlZeobR+UUuuuUv4rz6JcVjVpfAmZ65Y= +github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.1.0 h1:mvQhIGmKI3vmdNDUMG0ZHaZ1p+JcHE3m0q1RMOyKxRo= +github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.1.0/go.mod h1:7hMUlcqiMXDUJtU1EWQlhhkC4BfIr6pEsiyuRYq4xLQ= github.com/Azure/azure-service-bus-go v0.11.5 h1:EVMicXGNrSX+rHRCBgm/TRQ4VUZ1m3yAYM/AB2R/SOs= github.com/Azure/azure-service-bus-go v0.11.5/go.mod h1:MI6ge2CuQWBVq+ly456MY7XqNLJip5LO1iSFodbNLbU= github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y= @@ -95,6 +100,7 @@ github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSW github.com/Azure/go-autorest/autorest v0.9.0/go.mod h1:xyHB1BMZT0cuDHU7I0+g046+BFDTQ8rEZB0s4Yfa6bI= github.com/Azure/go-autorest/autorest v0.9.3/go.mod h1:GsRuLYvwzLjjjRoWEIyMUaYq8GNUx2nRB378IPt/1p0= github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA= +github.com/Azure/go-autorest/autorest v0.11.22/go.mod h1:BAWYUWGPEtKPzjVkp0Q6an0MJcJDsoh5Z1BFAEFs4Xs= github.com/Azure/go-autorest/autorest v0.11.24/go.mod h1:G6kyRlFnTuSbEYkQGawPfsCswgme4iYf6rfSKUDzbCc= github.com/Azure/go-autorest/autorest v0.11.27 h1:F3R3q42aWytozkV8ihzcgMO4OA4cuqr3bNlsEuF6//A= github.com/Azure/go-autorest/autorest v0.11.27/go.mod h1:7l8ybrIdUmGqZMTD0sRtAr8NvbHjfofbf8RSP2q7w7U= @@ -103,6 +109,8 @@ github.com/Azure/go-autorest/autorest/adal v0.8.0/go.mod h1:Z6vX6WXXuyieHAXwMj0S github.com/Azure/go-autorest/autorest/adal v0.8.1/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q= github.com/Azure/go-autorest/autorest/adal v0.8.3/go.mod h1:ZjhuQClTqx435SRJ2iMlOxPYt3d2C/T/7TiQCVZSn3Q= github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= +github.com/Azure/go-autorest/autorest/adal v0.9.14/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= +github.com/Azure/go-autorest/autorest/adal v0.9.17/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ= github.com/Azure/go-autorest/autorest/adal v0.9.18 h1:kLnPsRjzZZUF3K5REu/Kc+qMQrvuza2bwSnNdhmzLfQ= github.com/Azure/go-autorest/autorest/adal v0.9.18/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ= github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM= @@ -457,7 +465,6 @@ github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/gin-gonic/gin v1.7.3/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs= -github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U= github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= diff --git a/metadata/duration.go b/metadata/duration.go index 1617c692af..3d3ea6231e 100644 --- a/metadata/duration.go +++ b/metadata/duration.go @@ -14,9 +14,11 @@ limitations under the License. package metadata // JSON marshaling and unmarshaling methods for time.Duration based on https://stackoverflow.com/a/48051946 +// Includes methods to return an ISO-8601 formatted string from a time.Duration. import ( "encoding/json" "errors" + "strconv" "time" ) @@ -50,3 +52,41 @@ func (d *Duration) UnmarshalJSON(b []byte) error { return errors.New("invalid duration") } } + +// ToISOString returns the duration formatted as a ISO-8601 duration string (-ish). +// This methods supports days, hours, minutes, and seconds. It assumes all durations are in UTC time and are not impacted by DST (so all days are 24-hours long). +// This method does not support fractions of seconds, and durations are truncated to seconds. +// See https://en.wikipedia.org/wiki/ISO_8601#Durations for referece. +func (d Duration) ToISOString() string { + // Truncate to seconds, removing fractional seconds + trunc := d.Truncate(time.Second) + + seconds := int64(trunc.Seconds()) + if seconds == 0 { + // Zero value + return "P0D" + } + + res := "P" + if seconds >= 86400 { + res += strconv.FormatInt(seconds/86400, 10) + "D" + seconds %= 86400 + } + if seconds == 0 { + // Short-circuit if there's nothing left (we had whole days only) + return res + } + res += "T" + if seconds >= 3600 { + res += strconv.FormatInt(seconds/3600, 10) + "H" + seconds %= 3600 + } + if seconds >= 60 { + res += strconv.FormatInt(seconds/60, 10) + "M" + seconds %= 60 + } + if seconds > 0 { + res += strconv.FormatInt(seconds, 10) + "S" + } + return res +} diff --git a/metadata/duration_test.go b/metadata/duration_test.go new file mode 100644 index 0000000000..85baa2fb04 --- /dev/null +++ b/metadata/duration_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2021 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadata + +import ( + "testing" + "time" +) + +func TestDuration_ToISOString(t *testing.T) { + tests := []struct { + name string + duration time.Duration + want string + }{ + { + duration: time.Duration(0), + want: "P0D", + }, + { + duration: time.Duration(1 * time.Second), + want: "PT1S", + }, + { + name: "truncate fractions of seconds", + duration: time.Duration(1100 * time.Millisecond), + want: "PT1S", + }, + { + duration: time.Duration(24 * time.Hour), + want: "P1D", + }, + { + duration: time.Duration(48 * time.Hour), + want: "P2D", + }, + { + duration: time.Duration(50 * time.Hour), + want: "P2DT2H", + }, + { + duration: time.Duration(50*time.Hour + 20*time.Minute), + want: "P2DT2H20M", + }, + { + duration: time.Duration(50*time.Hour + 100*time.Minute), + want: "P2DT3H40M", + }, + { + duration: time.Duration(50*time.Hour + 20*time.Minute + 15*time.Second), + want: "P2DT2H20M15S", + }, + { + duration: time.Duration(50*time.Hour + 15*time.Second), + want: "P2DT2H15S", + }, + { + duration: time.Duration(240*time.Hour + 15*time.Second), + want: "P10DT15S", + }, + } + for _, tt := range tests { + name := tt.name + if name == "" { + name = tt.want + } + t.Run(tt.name, func(t *testing.T) { + d := Duration{ + Duration: tt.duration, + } + if got := d.ToISOString(); got != tt.want { + t.Errorf("Duration.ToISOString() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pubsub/azure/servicebus/message.go b/pubsub/azure/servicebus/message.go index f6bbab7211..4cce425c49 100644 --- a/pubsub/azure/servicebus/message.go +++ b/pubsub/azure/servicebus/message.go @@ -19,7 +19,7 @@ import ( "strconv" "time" - azservicebus "github.com/Azure/azure-service-bus-go" + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" contrib_metadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/pubsub" @@ -72,12 +72,18 @@ const ( ReplyToSessionID = "ReplyToSessionId" // read, write. ) -func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.Message, topic string) (*pubsub.NewMessage, error) { +func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.ReceivedMessage, topic string) (*pubsub.NewMessage, error) { pubsubMsg := &pubsub.NewMessage{ - Data: asbMsg.Data, Topic: topic, } + body, err := asbMsg.Body() + if err != nil { + return nil, err + } + + pubsubMsg.Data = body + addToMetadata := func(msg *pubsub.NewMessage, key, value string) { if msg.Metadata == nil { msg.Metadata = make(map[string]string) @@ -86,55 +92,51 @@ func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.Message, topic string) msg.Metadata[fmt.Sprintf("metadata.%s", key)] = value } - if asbMsg.ID != "" { - addToMetadata(pubsubMsg, MessageIDMetadataKey, asbMsg.ID) + if asbMsg.MessageID != "" { + addToMetadata(pubsubMsg, MessageIDMetadataKey, asbMsg.MessageID) } if asbMsg.SessionID != nil { addToMetadata(pubsubMsg, SessionIDMetadataKey, *asbMsg.SessionID) } - if asbMsg.CorrelationID != "" { - addToMetadata(pubsubMsg, CorrelationIDMetadataKey, asbMsg.CorrelationID) + if asbMsg.CorrelationID != nil && *asbMsg.CorrelationID != "" { + addToMetadata(pubsubMsg, CorrelationIDMetadataKey, *asbMsg.CorrelationID) } - if asbMsg.Label != "" { - addToMetadata(pubsubMsg, LabelMetadataKey, asbMsg.Label) + if asbMsg.Subject != nil && *asbMsg.Subject != "" { + addToMetadata(pubsubMsg, LabelMetadataKey, *asbMsg.Subject) } - if asbMsg.ReplyTo != "" { - addToMetadata(pubsubMsg, ReplyToMetadataKey, asbMsg.ReplyTo) + if asbMsg.ReplyTo != nil && *asbMsg.ReplyTo != "" { + addToMetadata(pubsubMsg, ReplyToMetadataKey, *asbMsg.ReplyTo) } - if asbMsg.To != "" { - addToMetadata(pubsubMsg, ToMetadataKey, asbMsg.To) + if asbMsg.To != nil && *asbMsg.To != "" { + addToMetadata(pubsubMsg, ToMetadataKey, *asbMsg.To) } - if asbMsg.ContentType != "" { - addToMetadata(pubsubMsg, ContentTypeMetadataKey, asbMsg.ContentType) + if asbMsg.ContentType != nil && *asbMsg.ContentType != "" { + addToMetadata(pubsubMsg, ContentTypeMetadataKey, *asbMsg.ContentType) } - if asbMsg.LockToken != nil { - addToMetadata(pubsubMsg, LockTokenMetadataKey, asbMsg.LockToken.String()) + if asbMsg.LockToken != [16]byte{} { + addToMetadata(pubsubMsg, LockTokenMetadataKey, string(asbMsg.LockToken[:])) } // Always set delivery count. addToMetadata(pubsubMsg, DeliveryCountMetadataKey, strconv.FormatInt(int64(asbMsg.DeliveryCount), 10)) - //nolint:golint,nestif - if asbMsg.SystemProperties != nil { - systemProps := asbMsg.SystemProperties - if systemProps.EnqueuedTime != nil { - // Preserve RFC2616 time format. - addToMetadata(pubsubMsg, EnqueuedTimeUtcMetadataKey, systemProps.EnqueuedTime.UTC().Format(http.TimeFormat)) - } - if systemProps.SequenceNumber != nil { - addToMetadata(pubsubMsg, SequenceNumberMetadataKey, strconv.FormatInt(*systemProps.SequenceNumber, 10)) - } - if systemProps.ScheduledEnqueueTime != nil { - // Preserve RFC2616 time format. - addToMetadata(pubsubMsg, ScheduledEnqueueTimeUtcMetadataKey, systemProps.ScheduledEnqueueTime.UTC().Format(http.TimeFormat)) - } - if systemProps.PartitionKey != nil { - addToMetadata(pubsubMsg, PartitionKeyMetadataKey, *systemProps.PartitionKey) - } - if systemProps.LockedUntil != nil { - // Preserve RFC2616 time format. - addToMetadata(pubsubMsg, LockedUntilUtcMetadataKey, systemProps.LockedUntil.UTC().Format(http.TimeFormat)) - } + if asbMsg.EnqueuedTime != nil { + // Preserve RFC2616 time format. + addToMetadata(pubsubMsg, EnqueuedTimeUtcMetadataKey, asbMsg.EnqueuedTime.UTC().Format(http.TimeFormat)) + } + if asbMsg.SequenceNumber != nil { + addToMetadata(pubsubMsg, SequenceNumberMetadataKey, strconv.FormatInt(*asbMsg.SequenceNumber, 10)) + } + if asbMsg.ScheduledEnqueueTime != nil { + // Preserve RFC2616 time format. + addToMetadata(pubsubMsg, ScheduledEnqueueTimeUtcMetadataKey, asbMsg.ScheduledEnqueueTime.UTC().Format(http.TimeFormat)) + } + if asbMsg.PartitionKey != nil { + addToMetadata(pubsubMsg, PartitionKeyMetadataKey, *asbMsg.PartitionKey) + } + if asbMsg.LockedUntil != nil { + // Preserve RFC2616 time format. + addToMetadata(pubsubMsg, LockedUntilUtcMetadataKey, asbMsg.LockedUntil.UTC().Format(http.TimeFormat)) } return pubsubMsg, nil @@ -142,136 +144,72 @@ func NewPubsubMessageFromASBMessage(asbMsg *azservicebus.Message, topic string) // NewASBMessageFromPubsubRequest builds a new Azure Service Bus message from a PublishRequest. func NewASBMessageFromPubsubRequest(req *pubsub.PublishRequest) (*azservicebus.Message, error) { - asbMsg := azservicebus.NewMessage(req.Data) + asbMsg := &azservicebus.Message{ + Body: req.Data, + } // Common properties. - ttl, hasTTL, _ := contrib_metadata.TryGetTTL(req.Metadata) - if hasTTL { - asbMsg.TTL = &ttl + ttl, ok, _ := contrib_metadata.TryGetTTL(req.Metadata) + if ok { + asbMsg.TimeToLive = &ttl } // Azure Service Bus specific properties. // reference: https://docs.microsoft.com/en-us/rest/api/servicebus/message-headers-and-properties#message-headers - msgID, hasMsgID, _ := tryGetMessageID(req.Metadata) - if hasMsgID { - asbMsg.ID = msgID + msgID, ok, _ := tryGetString(req.Metadata, MessageIDMetadataKey) + if ok { + asbMsg.MessageID = &msgID } - correlationID, hasCorrelationID, _ := tryGetCorrelationID(req.Metadata) - if hasCorrelationID { - asbMsg.CorrelationID = correlationID + correlationID, ok, _ := tryGetString(req.Metadata, CorrelationIDMetadataKey) + if ok { + asbMsg.CorrelationID = &correlationID } - sessionID, hasSessionID, _ := tryGetSessionID(req.Metadata) - if hasSessionID { + sessionID, okSessionID, _ := tryGetString(req.Metadata, SessionIDMetadataKey) + if okSessionID { asbMsg.SessionID = &sessionID } - label, hasLabel, _ := tryGetLabel(req.Metadata) - if hasLabel { - asbMsg.Label = label + label, ok, _ := tryGetString(req.Metadata, LabelMetadataKey) + if ok { + asbMsg.Subject = &label } - replyTo, hasReplyTo, _ := tryGetReplyTo(req.Metadata) - if hasReplyTo { - asbMsg.ReplyTo = replyTo + replyTo, ok, _ := tryGetString(req.Metadata, ReplyToMetadataKey) + if ok { + asbMsg.ReplyTo = &replyTo } - to, hasTo, _ := tryGetTo(req.Metadata) - if hasTo { - asbMsg.To = to + to, ok, _ := tryGetString(req.Metadata, ToMetadataKey) + if ok { + asbMsg.To = &to } - partitionKey, hasPartitionKey, _ := tryGetPartitionKey(req.Metadata) - if hasPartitionKey { - if hasSessionID { - if partitionKey != sessionID { - return nil, fmt.Errorf("session id %s and partition key %s should be equal when both present", sessionID, partitionKey) - } - } - - if asbMsg.SystemProperties == nil { - asbMsg.SystemProperties = &azservicebus.SystemProperties{} + partitionKey, ok, _ := tryGetString(req.Metadata, PartitionKeyMetadataKey) + if ok { + if okSessionID && partitionKey != sessionID { + return nil, fmt.Errorf("session id %s and partition key %s should be equal when both present", sessionID, partitionKey) } - asbMsg.SystemProperties.PartitionKey = &partitionKey + asbMsg.PartitionKey = &partitionKey } - contentType, hasContentType, _ := tryGetContentType(req.Metadata) - if hasContentType { - asbMsg.ContentType = contentType + contentType, ok, _ := tryGetString(req.Metadata, ContentTypeMetadataKey) + if ok { + asbMsg.ContentType = &contentType } - scheduledEnqueueTime, hasScheduledEnqueueTime, _ := tryGetScheduledEnqueueTime(req.Metadata) - if hasScheduledEnqueueTime { - if asbMsg.SystemProperties == nil { - asbMsg.SystemProperties = &azservicebus.SystemProperties{} - } - - asbMsg.SystemProperties.ScheduledEnqueueTime = scheduledEnqueueTime + scheduledEnqueueTime, ok, _ := tryGetScheduledEnqueueTime(req.Metadata) + if ok { + asbMsg.ScheduledEnqueueTime = scheduledEnqueueTime } return asbMsg, nil } -func tryGetMessageID(props map[string]string) (string, bool, error) { - if val, ok := props[MessageIDMetadataKey]; ok && val != "" { - return val, true, nil - } - - return "", false, nil -} - -func tryGetCorrelationID(props map[string]string) (string, bool, error) { - if val, ok := props[CorrelationIDMetadataKey]; ok && val != "" { - return val, true, nil - } - - return "", false, nil -} - -func tryGetSessionID(props map[string]string) (string, bool, error) { - if val, ok := props[SessionIDMetadataKey]; ok && val != "" { - return val, true, nil - } - - return "", false, nil -} - -func tryGetLabel(props map[string]string) (string, bool, error) { - if val, ok := props[LabelMetadataKey]; ok && val != "" { - return val, true, nil - } - - return "", false, nil -} - -func tryGetReplyTo(props map[string]string) (string, bool, error) { - if val, ok := props[ReplyToMetadataKey]; ok && val != "" { - return val, true, nil - } - - return "", false, nil -} - -func tryGetTo(props map[string]string) (string, bool, error) { - if val, ok := props[ToMetadataKey]; ok && val != "" { - return val, true, nil - } - - return "", false, nil -} - -func tryGetPartitionKey(props map[string]string) (string, bool, error) { - if val, ok := props[PartitionKeyMetadataKey]; ok && val != "" { - return val, true, nil - } - - return "", false, nil -} - -func tryGetContentType(props map[string]string) (string, bool, error) { - if val, ok := props[ContentTypeMetadataKey]; ok && val != "" { +func tryGetString(props map[string]string, key string) (string, bool, error) { + if val, ok := props[key]; ok && val != "" { return val, true, nil } diff --git a/pubsub/azure/servicebus/message_test.go b/pubsub/azure/servicebus/message_test.go index 04d080401e..c26164d302 100644 --- a/pubsub/azure/servicebus/message_test.go +++ b/pubsub/azure/servicebus/message_test.go @@ -20,7 +20,7 @@ import ( "time" "github.com/Azure/azure-amqp-common-go/v3/uuid" - azservicebus "github.com/Azure/azure-service-bus-go" + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -64,18 +64,16 @@ func TestNewASBMessageFromPubsubRequest(t *testing.T) { }, }, expectedAzServiceBusMessage: azservicebus.Message{ - Data: testMessageData, - ID: testMessageID, - CorrelationID: testCorrelationID, - SessionID: &testSessionID, - Label: testLabel, - ReplyTo: testReplyTo, - To: testTo, - SystemProperties: &azservicebus.SystemProperties{ - PartitionKey: &testPartitionKey, - ScheduledEnqueueTime: &nowUtc, - }, - ContentType: testContentType, + Body: testMessageData, + MessageID: &testMessageID, + CorrelationID: &testCorrelationID, + SessionID: &testSessionID, + Subject: &testLabel, + ReplyTo: &testReplyTo, + To: &testTo, + PartitionKey: &testPartitionKey, + ScheduledEnqueueTime: &nowUtc, + ContentType: &testContentType, }, expectError: false, }, @@ -95,17 +93,15 @@ func TestNewASBMessageFromPubsubRequest(t *testing.T) { }, }, expectedAzServiceBusMessage: azservicebus.Message{ - Data: testMessageData, - ID: testMessageID, - CorrelationID: testCorrelationID, + Body: testMessageData, + MessageID: &testMessageID, + CorrelationID: &testCorrelationID, SessionID: &testSessionID, - Label: testLabel, - ReplyTo: testReplyTo, - To: testTo, - SystemProperties: &azservicebus.SystemProperties{ - PartitionKey: &testPartitionKey, - }, - ContentType: testContentType, + Subject: &testLabel, + ReplyTo: &testReplyTo, + To: &testTo, + PartitionKey: &testPartitionKey, + ContentType: &testContentType, }, expectError: true, }, @@ -121,18 +117,17 @@ func TestNewASBMessageFromPubsubRequest(t *testing.T) { require.NotNil(t, err) } else { require.Nil(t, err) - assert.Equal(t, tc.expectedAzServiceBusMessage.Data, msg.Data) - assert.Equal(t, tc.expectedAzServiceBusMessage.ID, msg.ID) + assert.Equal(t, tc.expectedAzServiceBusMessage.Body, msg.Body) + assert.Equal(t, tc.expectedAzServiceBusMessage.MessageID, msg.MessageID) assert.Equal(t, tc.expectedAzServiceBusMessage.CorrelationID, msg.CorrelationID) assert.Equal(t, tc.expectedAzServiceBusMessage.SessionID, msg.SessionID) assert.Equal(t, tc.expectedAzServiceBusMessage.ContentType, msg.ContentType) assert.Equal(t, tc.expectedAzServiceBusMessage.ReplyTo, msg.ReplyTo) - assert.Equal(t, tc.expectedAzServiceBusMessage.TTL, msg.TTL) + assert.Equal(t, tc.expectedAzServiceBusMessage.TimeToLive, msg.TimeToLive) assert.Equal(t, tc.expectedAzServiceBusMessage.To, msg.To) - assert.Equal(t, tc.expectedAzServiceBusMessage.Label, msg.Label) - assert.NotNil(t, msg.SystemProperties) - assert.Equal(t, tc.expectedAzServiceBusMessage.SystemProperties.PartitionKey, msg.SystemProperties.PartitionKey) - assert.Equal(t, tc.expectedAzServiceBusMessage.SystemProperties.ScheduledEnqueueTime.Unix(), msg.SystemProperties.ScheduledEnqueueTime.Unix()) + assert.Equal(t, tc.expectedAzServiceBusMessage.Subject, msg.Subject) + assert.Equal(t, tc.expectedAzServiceBusMessage.PartitionKey, msg.PartitionKey) + assert.Equal(t, tc.expectedAzServiceBusMessage.ScheduledEnqueueTime.Unix(), msg.ScheduledEnqueueTime.Unix()) } }) } @@ -157,31 +152,28 @@ func TestNewPubsubMessageFromAzServiceBusMessage(t *testing.T) { testCases := []struct { name string - azServiceBusMessage azservicebus.Message + azServiceBusMessage azservicebus.ReceivedMessage topic string expectedPubsubMessage pubsub.NewMessage expectError bool }{ { name: "Maps azure service bus message to pubsub message", - azServiceBusMessage: azservicebus.Message{ - Data: testMessageData, - ContentType: testContentType, - ID: testMessageID, - CorrelationID: testCorrelationID, - SessionID: &testSessionID, - ReplyTo: testReplyTo, - DeliveryCount: testDeliveryCount, - To: testTo, - LockToken: &testLockToken, - Label: testLabel, - SystemProperties: &azservicebus.SystemProperties{ - LockedUntil: &nowUtc, - SequenceNumber: &testSequenceNumber, - ScheduledEnqueueTime: &nowUtc, - PartitionKey: &testPartitionKey, - EnqueuedTime: &nowUtc, - }, + azServiceBusMessage: azservicebus.ReceivedMessage{ + ContentType: &testContentType, + MessageID: testMessageID, + CorrelationID: &testCorrelationID, + SessionID: &testSessionID, + ReplyTo: &testReplyTo, + DeliveryCount: testDeliveryCount, + To: &testTo, + LockToken: testLockToken, + Subject: &testLabel, + LockedUntil: &nowUtc, + SequenceNumber: &testSequenceNumber, + ScheduledEnqueueTime: &nowUtc, + PartitionKey: &testPartitionKey, + EnqueuedTime: &nowUtc, }, topic: "testTopic", expectedPubsubMessage: pubsub.NewMessage{ diff --git a/pubsub/azure/servicebus/servicebus.go b/pubsub/azure/servicebus/servicebus.go index c05b9bd59a..2a9b407fb9 100644 --- a/pubsub/azure/servicebus/servicebus.go +++ b/pubsub/azure/servicebus/servicebus.go @@ -24,9 +24,12 @@ import ( "github.com/Azure/go-amqp" "github.com/cenkalti/backoff/v4" - azservicebus "github.com/Azure/azure-service-bus-go" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + sbadmin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" azauth "github.com/dapr/components-contrib/authentication/azure" + contrib_metadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" "github.com/dapr/kit/retry" @@ -88,12 +91,12 @@ type handle = struct{} type azureServiceBus struct { metadata metadata - namespace *azservicebus.Namespace - topicManager *azservicebus.TopicManager + client *servicebus.Client + adminClient *sbadmin.Client logger logger.Logger subscriptions []*subscription features []pubsub.Feature - topics map[string]*azservicebus.Topic + topics map[string]*servicebus.Sender topicsLock *sync.RWMutex ctx context.Context @@ -106,7 +109,7 @@ func NewAzureServiceBus(logger logger.Logger) pubsub.PubSub { logger: logger, subscriptions: []*subscription{}, features: []pubsub.Feature{pubsub.FeatureMessageTTL}, - topics: map[string]*azservicebus.Topic{}, + topics: map[string]*servicebus.Sender{}, topicsLock: &sync.RWMutex{}, } } @@ -281,56 +284,56 @@ func parseAzureServiceBusMetadata(meta pubsub.Metadata) (metadata, error) { return m, nil } -func (a *azureServiceBus) Init(metadata pubsub.Metadata) error { - m, err := parseAzureServiceBusMetadata(metadata) +func (a *azureServiceBus) Init(metadata pubsub.Metadata) (err error) { + a.metadata, err = parseAzureServiceBusMetadata(metadata) if err != nil { return err } userAgent := "dapr-" + logger.DaprVersion - a.metadata = m if a.metadata.ConnectionString != "" { - a.namespace, err = azservicebus.NewNamespace( - azservicebus.NamespaceWithConnectionString(a.metadata.ConnectionString), - azservicebus.NamespaceWithUserAgent(userAgent)) - + a.client, err = servicebus.NewClientFromConnectionString(a.metadata.ConnectionString, &servicebus.ClientOptions{ + ApplicationID: userAgent, + }) if err != nil { return err } - } else { - // Initialization code - settings, err := azauth.NewEnvironmentSettings(azauth.AzureServiceBusResourceName, metadata.Properties) + + a.adminClient, err = sbadmin.NewClientFromConnectionString(a.metadata.ConnectionString, nil) if err != nil { return err } + } else { + settings, innerErr := azauth.NewEnvironmentSettings(azauth.AzureServiceBusResourceName, metadata.Properties) + if innerErr != nil { + return innerErr + } - tokenProvider, err := settings.GetAMQPTokenProvider() - if err != nil { - return err + token, innerErr := settings.GetTokenCredential() + if innerErr != nil { + return innerErr } - a.namespace, err = azservicebus.NewNamespace(azservicebus.NamespaceWithTokenProvider(tokenProvider), - azservicebus.NamespaceWithUserAgent(userAgent)) - if err != nil { - return err + a.client, innerErr = servicebus.NewClient(a.metadata.NamespaceName, token, &servicebus.ClientOptions{ + ApplicationID: userAgent, + }) + if innerErr != nil { + return innerErr } - // We set these separately as the ServiceBus SDK does not provide a way to pass the environment via the options - // pattern unless you allow it to recreate the entire environment which seems wasteful. - a.namespace.Name = a.metadata.NamespaceName - a.namespace.Environment = *settings.AzureEnvironment - a.namespace.Suffix = settings.AzureEnvironment.ServiceBusEndpointSuffix + a.adminClient, innerErr = sbadmin.NewClient(a.metadata.NamespaceName, token, nil) + if innerErr != nil { + return innerErr + } } - a.topicManager = a.namespace.NewTopicManager() - a.ctx, a.cancel = context.WithCancel(context.Background()) return nil } func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error { - var sender *azservicebus.Topic + var sender *servicebus.Sender var err error a.topicsLock.RLock() @@ -347,7 +350,7 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error { } } a.topicsLock.Lock() - sender, err = a.namespace.NewTopic(req.Topic) + sender, err = a.client.NewSender(req.Topic, &servicebus.NewSenderOptions{}) a.topics[req.Topic] = sender a.topicsLock.Unlock() @@ -356,46 +359,51 @@ func (a *azureServiceBus) Publish(req *pubsub.PublishRequest) error { } } + a.logger.Infof("Creating message with body: %s", string(req.Data)) msg, err := NewASBMessageFromPubsubRequest(req) if err != nil { return err } - return a.doPublish(sender, msg) -} - -func (a *azureServiceBus) doPublish(sender *azservicebus.Topic, msg *azservicebus.Message) error { ebo := backoff.NewExponentialBackOff() ebo.InitialInterval = time.Duration(a.metadata.PublishInitialRetryIntervalInMs) * time.Millisecond bo := backoff.WithMaxRetries(ebo, uint64(a.metadata.PublishMaxRetries)) bo = backoff.WithContext(bo, a.ctx) - return retry.NotifyRecover(func() error { - ctx, cancel := context.WithTimeout(a.ctx, time.Second*time.Duration(a.metadata.TimeoutInSec)) - defer cancel() + msgID := "nil" + if msg.MessageID != nil { + msgID = *msg.MessageID + } + return retry.NotifyRecover( + func() (err error) { + ctx, cancel := context.WithTimeout(a.ctx, time.Second*time.Duration(a.metadata.TimeoutInSec)) + defer cancel() - err := sender.Send(ctx, msg) - if err == nil { - return nil - } + err = sender.SendMessage(ctx, msg, nil) + if err != nil { + var amqpError *amqp.Error + if errors.As(err, &amqpError) { + if _, ok := retriableSendingErrors[amqpError.Condition]; ok { + return amqpError // Retries. + } + } - var amqpError *amqp.Error - if errors.As(err, &amqpError) { - if _, ok := retriableSendingErrors[amqpError.Condition]; ok { - return amqpError // Retries. - } - } - var connClosedError azservicebus.ErrConnectionClosed - if errors.As(err, &connClosedError) { - return connClosedError // Retries. - } + if errors.Is(err, amqp.ErrConnClosed) { + return err // Retries. + } - return backoff.Permanent(err) // Does not retry. - }, bo, func(err error, _ time.Duration) { - a.logger.Debugf("Could not publish service bus message. Retrying...: %v", err) - }, func() { - a.logger.Debug("Successfully published service bus message after it previously failed") - }) + return backoff.Permanent(err) // Does not retry. + } + return nil + }, + bo, + func(err error, _ time.Duration) { + a.logger.Debugf("Could not publish service bus message (%s). Retrying...: %v", msgID, err) + }, + func() { + a.logger.Debugf("Successfully published service bus message (%s) after it previously failed", msgID) + }, + ) } func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub.Handler) error { @@ -427,7 +435,6 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub. select { case <-reconnCtx.Done(): a.logger.Debugf("Reconnect context for topic %s is done", req.Topic) - return case <-time.After(2 * time.Minute): attempts := readAttemptsStale() @@ -441,25 +448,14 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub. // Reconnect loop. for { - topic, err := a.namespace.NewTopic(req.Topic) - if err != nil { - a.logger.Errorf("%s could not instantiate topic %s, %s", errorMessagePrefix, req.Topic, err) - - return - } - - var opts []azservicebus.SubscriptionOption - if a.metadata.PrefetchCount != nil { - opts = append(opts, azservicebus.SubscriptionWithPrefetchCount(uint32(*a.metadata.PrefetchCount))) - } - subEntity, err := topic.NewSubscription(subID, opts...) + subEntity, err := a.client.NewReceiverForSubscription(req.Topic, subID, &servicebus.ReceiverOptions{}) if err != nil { a.logger.Errorf("%s could not instantiate subscription %s for topic %s", errorMessagePrefix, subID, req.Topic) - return } - sub := newSubscription(req.Topic, subEntity, a.metadata.MaxConcurrentHandlers, a.logger) + sub := newSubscription(req.Topic, subEntity, a.metadata.MaxConcurrentHandlers, a.metadata.PrefetchCount, a.logger) a.subscriptions = append(a.subscriptions, sub) + // ReceiveAndBlock will only return with an error // that it cannot handle internally. The subscription // connection is closed when this method returns. @@ -467,13 +463,15 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub. // to re-establish the subscription connection until // we exhaust the number of reconnect attempts. ctx, cancel := context.WithCancel(context.Background()) - innerErr := sub.ReceiveAndBlock(ctx, + innerErr := sub.ReceiveAndBlock( + ctx, handler, a.metadata.LockRenewalInSec, a.metadata.HandlerTimeoutInSec, a.metadata.TimeoutInSec, a.metadata.MaxActiveMessages, - a.metadata.MaxActiveMessagesRecoveryInSec) + a.metadata.MaxActiveMessagesRecoveryInSec, + ) if innerErr != nil { var detachError *amqp.DetachError var amqpError *amqp.Error @@ -489,7 +487,6 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub. attempts := readAttemptsStale() if attempts == 0 { a.logger.Errorf("Subscription to topic %s lost connection, unable to recover after %d attempts", sub.topic, a.metadata.MaxReconnectionAttempts) - return } @@ -503,13 +500,13 @@ func (a *azureServiceBus) Subscribe(req pubsub.SubscribeRequest, handler pubsub. } func (a *azureServiceBus) ensureTopic(topic string) error { - entity, err := a.getTopicEntity(topic) + shouldCreate, err := a.shouldCreateTopic(topic) if err != nil { return err } - if entity == nil { - err = a.createTopicEntity(topic) + if shouldCreate { + err = a.createTopic(topic) if err != nil { return err } @@ -524,18 +521,13 @@ func (a *azureServiceBus) ensureSubscription(name string, topic string) error { return err } - subManager, err := a.namespace.NewSubscriptionManager(topic) + shouldCreate, err := a.shouldCreateSubscription(topic, name) if err != nil { return err } - entity, err := a.getSubscriptionEntity(subManager, topic, name) - if err != nil { - return err - } - - if entity == nil { - err = a.createSubscriptionEntity(subManager, topic, name) + if shouldCreate { + err = a.createSubscription(topic, name) if err != nil { return err } @@ -544,85 +536,120 @@ func (a *azureServiceBus) ensureSubscription(name string, topic string) error { return nil } -func (a *azureServiceBus) getTopicEntity(topic string) (*azservicebus.TopicEntity, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec)) +func (a *azureServiceBus) shouldCreateTopic(topic string) (bool, error) { + ctx, cancel := context.WithTimeout(a.ctx, time.Second*time.Duration(a.metadata.TimeoutInSec)) defer cancel() - if a.topicManager == nil { - return nil, fmt.Errorf("%s init() has not been called", errorMessagePrefix) + if a.adminClient == nil { + return false, fmt.Errorf("%s init() has not been called", errorMessagePrefix) + } + res, err := a.adminClient.GetTopic(ctx, topic, nil) + if err != nil { + return false, fmt.Errorf("%s could not get topic %s, %s", errorMessagePrefix, topic, err.Error()) } - topicEntity, err := a.topicManager.Get(ctx, topic) - if err != nil && !azservicebus.IsErrNotFound(err) { - return nil, fmt.Errorf("%s could not get topic %s, %s", errorMessagePrefix, topic, err) + if res == nil { + // If res is nil, the topic does not exist + return true, nil } - return topicEntity, nil + return false, nil } -func (a *azureServiceBus) createTopicEntity(topic string) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec)) +func (a *azureServiceBus) createTopic(topic string) error { + ctx, cancel := context.WithTimeout(a.ctx, time.Second*time.Duration(a.metadata.TimeoutInSec)) defer cancel() - _, err := a.topicManager.Put(ctx, topic) + _, err := a.adminClient.CreateTopic(ctx, topic, nil) if err != nil { - return fmt.Errorf("%s could not put topic %s, %s", errorMessagePrefix, topic, err) + return fmt.Errorf("%s could not create topic %s, %s", errorMessagePrefix, topic, err) } return nil } -func (a *azureServiceBus) getSubscriptionEntity(mgr *azservicebus.SubscriptionManager, topic, subscription string) (*azservicebus.SubscriptionEntity, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec)) +func (a *azureServiceBus) shouldCreateSubscription(topic, subscription string) (bool, error) { + ctx, cancel := context.WithTimeout(a.ctx, time.Second*time.Duration(a.metadata.TimeoutInSec)) defer cancel() - entity, err := mgr.Get(ctx, subscription) - if err != nil && !azservicebus.IsErrNotFound(err) { - return nil, fmt.Errorf("%s could not get subscription %s, %s", errorMessagePrefix, subscription, err) + res, err := a.adminClient.GetSubscription(ctx, topic, subscription, &sbadmin.GetSubscriptionOptions{}) + if err != nil { + return false, fmt.Errorf("%s could not get subscription %s, %s", errorMessagePrefix, subscription, err) + } + if res == nil { + // If res is subscription, the topic does not exist + return true, nil } - return entity, nil + return false, nil } -func (a *azureServiceBus) createSubscriptionEntity(mgr *azservicebus.SubscriptionManager, topic, subscription string) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(a.metadata.TimeoutInSec)) - defer cancel() - - opts, err := a.createSubscriptionManagementOptions() +func (a *azureServiceBus) createSubscription(topic, subscription string) error { + props, err := a.createSubscriptionProperties() if err != nil { return err } - _, err = mgr.Put(ctx, subscription, opts...) + ctx, cancel := context.WithTimeout(a.ctx, time.Second*time.Duration(a.metadata.TimeoutInSec)) + defer cancel() + _, err = a.adminClient.CreateSubscription(ctx, topic, subscription, &sbadmin.CreateSubscriptionOptions{ + Properties: props, + }) if err != nil { - return fmt.Errorf("%s could not put subscription %s, %s", errorMessagePrefix, subscription, err) + return fmt.Errorf("%s could not create subscription %s, %s", errorMessagePrefix, subscription, err) } return nil } -func (a *azureServiceBus) createSubscriptionManagementOptions() ([]azservicebus.SubscriptionManagementOption, error) { - var opts []azservicebus.SubscriptionManagementOption +func (a *azureServiceBus) createSubscriptionProperties() (*sbadmin.SubscriptionProperties, error) { + properties := &sbadmin.SubscriptionProperties{} + if a.metadata.MaxDeliveryCount != nil { - opts = append(opts, subscriptionManagementOptionsWithMaxDeliveryCount(a.metadata.MaxDeliveryCount)) + maxDeliveryCount := int32(*a.metadata.MaxDeliveryCount) + properties.MaxDeliveryCount = &maxDeliveryCount } + if a.metadata.LockDurationInSec != nil { - opts = append(opts, subscriptionManagementOptionsWithLockDuration(a.metadata.LockDurationInSec)) + lockDuration := contrib_metadata.Duration{ + Duration: time.Duration(*a.metadata.LockDurationInSec) * time.Second, + } + properties.LockDuration = to.Ptr(lockDuration.ToISOString()) } + if a.metadata.DefaultMessageTimeToLiveInSec != nil { - opts = append(opts, subscriptionManagementOptionsWithDefaultMessageTimeToLive(a.metadata.DefaultMessageTimeToLiveInSec)) + defaultMessageTimeToLive := contrib_metadata.Duration{ + Duration: time.Duration(*a.metadata.DefaultMessageTimeToLiveInSec) * time.Second, + } + properties.DefaultMessageTimeToLive = to.Ptr(defaultMessageTimeToLive.ToISOString()) } + if a.metadata.AutoDeleteOnIdleInSec != nil { - opts = append(opts, subscriptionManagementOptionsWithAutoDeleteOnIdle(a.metadata.AutoDeleteOnIdleInSec)) + autoDeleteOnIdle := contrib_metadata.Duration{ + Duration: time.Duration(*a.metadata.AutoDeleteOnIdleInSec) * time.Second, + } + properties.AutoDeleteOnIdle = to.Ptr(autoDeleteOnIdle.ToISOString()) } - return opts, nil + return properties, nil } func (a *azureServiceBus) Close() error { + var ctx context.Context + var cancel context.CancelFunc for _, s := range a.subscriptions { - s.close(a.ctx) + ctx, cancel = context.WithTimeout(a.ctx, 10*time.Second) + s.close(ctx) + cancel() } + var err error for _, t := range a.topics { - t.Close(a.ctx) + a.logger.Debugf("Closing topic %s", t) + ctx, cancel = context.WithTimeout(a.ctx, 10*time.Second) + err = t.Close(ctx) + if err != nil { + // Log only + a.logger.Warnf("%s closing topic %s: %+v", errorMessagePrefix, t, err) + } + cancel() } a.cancel() @@ -633,39 +660,3 @@ func (a *azureServiceBus) Close() error { func (a *azureServiceBus) Features() []pubsub.Feature { return a.features } - -func subscriptionManagementOptionsWithMaxDeliveryCount(maxDeliveryCount *int) azservicebus.SubscriptionManagementOption { - return func(d *azservicebus.SubscriptionDescription) error { - mdc := int32(*maxDeliveryCount) - d.MaxDeliveryCount = &mdc - - return nil - } -} - -func subscriptionManagementOptionsWithAutoDeleteOnIdle(durationInSec *int) azservicebus.SubscriptionManagementOption { - return func(d *azservicebus.SubscriptionDescription) error { - duration := fmt.Sprintf("PT%dS", *durationInSec) - d.AutoDeleteOnIdle = &duration - - return nil - } -} - -func subscriptionManagementOptionsWithDefaultMessageTimeToLive(durationInSec *int) azservicebus.SubscriptionManagementOption { - return func(d *azservicebus.SubscriptionDescription) error { - duration := fmt.Sprintf("PT%dS", *durationInSec) - d.DefaultMessageTimeToLive = &duration - - return nil - } -} - -func subscriptionManagementOptionsWithLockDuration(durationInSec *int) azservicebus.SubscriptionManagementOption { - return func(d *azservicebus.SubscriptionDescription) error { - duration := fmt.Sprintf("PT%dS", *durationInSec) - d.LockDuration = &duration - - return nil - } -} diff --git a/pubsub/azure/servicebus/subscription.go b/pubsub/azure/servicebus/subscription.go index dec50bf006..ae33efdc92 100644 --- a/pubsub/azure/servicebus/subscription.go +++ b/pubsub/azure/servicebus/subscription.go @@ -19,7 +19,7 @@ import ( "sync" "time" - azservicebus "github.com/Azure/azure-service-bus-go" + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" @@ -28,18 +28,19 @@ import ( type subscription struct { topic string mu sync.RWMutex - activeMessages map[string]*azservicebus.Message - entity *azservicebus.Subscription + activeMessages map[string]*azservicebus.ReceivedMessage + receiver *azservicebus.Receiver limitConcurrentHandlers bool handleChan chan handle logger logger.Logger + prefetchChan chan *azservicebus.ReceivedMessage } -func newSubscription(topic string, sub *azservicebus.Subscription, maxConcurrentHandlers *int, logger logger.Logger) *subscription { +func newSubscription(topic string, receiver *azservicebus.Receiver, maxConcurrentHandlers *int, prefetchCount *int, logger logger.Logger) *subscription { s := &subscription{ topic: topic, - activeMessages: make(map[string]*azservicebus.Message), - entity: sub, + activeMessages: make(map[string]*azservicebus.ReceivedMessage), + receiver: receiver, logger: logger, } @@ -52,6 +53,13 @@ func newSubscription(topic string, sub *azservicebus.Subscription, maxConcurrent } } + if prefetchCount != nil { + s.logger.Debugf("Subscription to topic %s will prefetch %d message(s)", topic, *prefetchCount) + s.prefetchChan = make(chan *azservicebus.ReceivedMessage, *prefetchCount) + } else { + s.prefetchChan = make(chan *azservicebus.ReceivedMessage, 1) + } + return s } @@ -76,7 +84,6 @@ func (s *subscription) ReceiveAndBlock(ctx context.Context, handler pubsub.Handl select { case <-ctx.Done(): s.logger.Debugf("Lock renewal context for topic %s done", s.topic) - return case <-time.After(time.Second * time.Duration(lockRenewalInSec)): s.tryRenewLocks() @@ -84,6 +91,20 @@ func (s *subscription) ReceiveAndBlock(ctx context.Context, handler pubsub.Handl } }() + // Prefetch loop. + go func() { + for { + msgs, err := s.receiver.ReceiveMessages(ctx, 1, nil) + if err != nil { + s.logger.Errorf("Error reading from topic %s. %s", s.topic, err.Error()) + } + for _, msg := range msgs { + s.logger.Infof("Prefetch received msg: %s", msg.MessageID) + s.prefetchChan <- msg + } + } + }() + asyncHandler := s.asyncWrapper(s.getHandlerFunc(handler, handlerTimeoutInSec, timeoutInSec)) // Receiver loop. @@ -98,7 +119,6 @@ func (s *subscription) ReceiveAndBlock(ctx context.Context, handler pubsub.Handl select { case <-ctx.Done(): s.logger.Debugf("Receive context for topic %s done", s.topic) - return ctx.Err() case <-time.After(time.Second * time.Duration(maxActiveMessagesRecoveryInSec)): continue @@ -115,13 +135,13 @@ func (s *subscription) close(ctx context.Context) { s.logger.Debugf("Closing subscription to topic %s", s.topic) // Ensure subscription entity is closed. - if err := s.entity.Close(ctx); err != nil { + if err := s.receiver.Close(ctx); err != nil { s.logger.Warnf("%s closing subscription entity for topic %s: %+v", errorMessagePrefix, s.topic, err) } } -func (s *subscription) getHandlerFunc(handler pubsub.Handler, handlerTimeoutInSec int, timeoutInSec int) azservicebus.HandlerFunc { - return func(ctx context.Context, asbMsg *azservicebus.Message) error { +func (s *subscription) getHandlerFunc(handler pubsub.Handler, handlerTimeoutInSec int, timeoutInSec int) func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) error { + return func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) error { pubsubMsg, err := NewPubsubMessageFromASBMessage(asbMsg, s.topic) if err != nil { return fmt.Errorf("failed to get pubsub message from azure service bus message: %+v", err) @@ -129,7 +149,7 @@ func (s *subscription) getHandlerFunc(handler pubsub.Handler, handlerTimeoutInSe handleCtx, handleCancel := context.WithTimeout(ctx, time.Second*time.Duration(handlerTimeoutInSec)) defer handleCancel() - s.logger.Debugf("Calling app's handler for message %s on topic %s", asbMsg.ID, s.topic) + s.logger.Debugf("Calling app's handler for message %s on topic %s", asbMsg.MessageID, s.topic) appErr := handler(handleCtx, pubsubMsg) // This context is used for the calls to service bus to finalize (i.e. complete/abandon) the message. @@ -153,33 +173,33 @@ func (s *subscription) getHandlerFunc(handler pubsub.Handler, handlerTimeoutInSe } } -func (s *subscription) asyncWrapper(handlerFunc azservicebus.HandlerFunc) azservicebus.HandlerFunc { - return func(ctx context.Context, msg *azservicebus.Message) error { +func (s *subscription) asyncWrapper(handlerFunc func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) error) func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) error { + return func(ctx context.Context, msg *azservicebus.ReceivedMessage) error { go func() { s.addActiveMessage(msg) - defer s.removeActiveMessage(msg.ID) + defer s.removeActiveMessage(msg.MessageID) if s.limitConcurrentHandlers { - s.logger.Debugf("Taking message handle for %s on topic %s", msg.ID, s.topic) + s.logger.Debugf("Taking message handle for %s on topic %s", msg.MessageID, s.topic) select { case <-ctx.Done(): - s.logger.Debugf("Message context done for %s on topic %s", msg.ID, s.topic) + s.logger.Debugf("Message context done for %s on topic %s", msg.MessageID, s.topic) return case <-s.handleChan: // Take or wait on a free handle before getting a new message. - s.logger.Debugf("Taken message handle for %s on topic %s", msg.ID, s.topic) + s.logger.Debugf("Taken message handle for %s on topic %s", msg.MessageID, s.topic) } defer func() { - s.logger.Debugf("Releasing message handle for %s on topic %s", msg.ID, s.topic) + s.logger.Debugf("Releasing message handle for %s on topic %s", msg.MessageID, s.topic) s.handleChan <- handle{} // Release a handle when complete. - s.logger.Debugf("Released message handle for %s on topic %s", msg.ID, s.topic) + s.logger.Debugf("Released message handle for %s on topic %s", msg.MessageID, s.topic) }() } err := handlerFunc(ctx, msg) if err != nil { - s.logger.Errorf("%s error handling message %s on topic '%s', %s", errorMessagePrefix, msg.ID, s.topic, err) + s.logger.Errorf("%s error handling message %s on topic '%s', %s", errorMessagePrefix, msg.MessageID, s.topic, err) } }() @@ -198,7 +218,7 @@ func (s *subscription) tryRenewLocks() { } // Snapshot the messages to try to renew locks for. - msgs := make([]*azservicebus.Message, 0) + msgs := make([]*azservicebus.ReceivedMessage, 0) s.mu.RLock() for _, m := range s.activeMessages { msgs = append(msgs, m) @@ -207,37 +227,48 @@ func (s *subscription) tryRenewLocks() { // Lock renewal is best effort and not guaranteed to succeed, warnings are expected. s.logger.Debugf("Trying to renew %d active message lock(s) for topic %s", len(msgs), s.topic) - err := s.entity.RenewLocks(context.Background(), msgs...) - if err != nil { - s.logger.Debugf("Couldn't renew all active message lock(s) for topic %s, ", s.topic, err) + for _, msg := range msgs { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + err := s.receiver.RenewMessageLock(ctx, msg, nil) + if err != nil { + s.logger.Debugf("Couldn't renew all active message lock(s) for topic %s, ", s.topic, err) + } + cancel() } } -func (s *subscription) receiveMessage(ctx context.Context, handler azservicebus.HandlerFunc) error { +func (s *subscription) receiveMessage(ctx context.Context, handler func(ctx context.Context, asbMsg *azservicebus.ReceivedMessage) error) error { s.logger.Debugf("Waiting to receive message on topic %s", s.topic) - if err := s.entity.ReceiveOne(ctx, handler); err != nil { - return fmt.Errorf("%s error receiving message on topic %s, %w", errorMessagePrefix, s.topic, err) + msg := <-s.prefetchChan + + s.logger.Infof("Process received message: %s", msg.MessageID) + body, _ := msg.Body() + s.logger.Infof("Message body: %s", string(body)) + + err := handler(ctx, msg) + if err != nil { + return fmt.Errorf("%s error processing message on topic %s, %w", errorMessagePrefix, s.topic, err) } return nil } -func (s *subscription) abandonMessage(ctx context.Context, m *azservicebus.Message) error { - s.logger.Debugf("Abandoning message %s on topic %s", m.ID, s.topic) +func (s *subscription) abandonMessage(ctx context.Context, m *azservicebus.ReceivedMessage) error { + s.logger.Debugf("Abandoning message %s on topic %s", m.MessageID, s.topic) - return m.Abandon(ctx) + return s.receiver.AbandonMessage(ctx, m, nil) } -func (s *subscription) completeMessage(ctx context.Context, m *azservicebus.Message) error { - s.logger.Debugf("Completing message %s on topic %s", m.ID, s.topic) +func (s *subscription) completeMessage(ctx context.Context, m *azservicebus.ReceivedMessage) error { + s.logger.Debugf("Completing message %s on topic %s", m.MessageID, s.topic) - return m.Complete(ctx) + return s.receiver.CompleteMessage(ctx, m, nil) } -func (s *subscription) addActiveMessage(m *azservicebus.Message) { - s.logger.Debugf("Adding message %s to active messages on topic %s", m.ID, s.topic) +func (s *subscription) addActiveMessage(m *azservicebus.ReceivedMessage) { + s.logger.Debugf("Adding message %s to active messages on topic %s", m.MessageID, s.topic) s.mu.Lock() - s.activeMessages[m.ID] = m + s.activeMessages[m.MessageID] = m s.mu.Unlock() } diff --git a/tests/certification/bindings/azure/servicebusqueues/go.mod b/tests/certification/bindings/azure/servicebusqueues/go.mod index f7a17bec3f..7cba735994 100644 --- a/tests/certification/bindings/azure/servicebusqueues/go.mod +++ b/tests/certification/bindings/azure/servicebusqueues/go.mod @@ -21,7 +21,8 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.14.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 // indirect - github.com/Azure/azure-service-bus-go v0.11.5 // indirect + github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v0.4.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.1.0 // indirect github.com/Azure/azure-storage-blob-go v0.15.0 // indirect github.com/Azure/go-amqp v0.17.4 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect @@ -30,7 +31,6 @@ require ( github.com/Azure/go-autorest/autorest/azure/auth v0.5.11 // indirect github.com/Azure/go-autorest/autorest/azure/cli v0.4.5 // indirect github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect - github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0 // indirect @@ -134,7 +134,6 @@ require ( k8s.io/klog/v2 v2.30.0 // indirect k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b // indirect - nhooyr.io/websocket v1.8.7 // indirect sigs.k8s.io/controller-runtime v0.11.0 // indirect sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.0 // indirect diff --git a/tests/certification/bindings/azure/servicebusqueues/go.sum b/tests/certification/bindings/azure/servicebusqueues/go.sum index 128a9583f7..506a503404 100644 --- a/tests/certification/bindings/azure/servicebusqueues/go.sum +++ b/tests/certification/bindings/azure/servicebusqueues/go.sum @@ -43,25 +43,24 @@ contrib.go.opencensus.io/exporter/zipkin v0.1.1/go.mod h1:GMvdSl3eJ2gapOaLKzTKE3 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a h1:XVdatQFSP2YhJGjqLLIfW8QBk4loz/SCe/PxkXDiW+s= github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a/go.mod h1:C0A1KeiVHs+trY6gUTPhhGammbrZ30ZfXRW/nuT7HLw= -github.com/Azure/azure-amqp-common-go/v3 v3.2.1/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI= github.com/Azure/azure-amqp-common-go/v3 v3.2.3 h1:uDF62mbd9bypXWi19V1bN5NZEO84JqgmI5G73ibAmrk= github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE4ehlXQZHpMja2OtxC2Tas= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= -github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go v59.3.0+incompatible h1:dPIm0BO4jsMXFcCI/sLTPkBtE7mk8WMuRHA0JeWhlcQ= github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1 h1:3CVsSo4mp8NDWO11tHzN/mdo2zP0CtaSK5IcwBjfqRA= github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.1/go.mod h1:w5pDIZuawUmY3Bj4tVx3Xb8KS96ToB0j315w9rqpAg0= github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.14.0 h1:NVS/4LOQfkBpk+B1VopIzv1ptmYeEskA8w/3K/w7vjo= github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.14.0/go.mod h1:RG0cZndeZM17StwohYclmcXSr4oOJ8b1I5hB8llIc6Y= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.2/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I= github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 h1:Px2KVERcYEg2Lv25AqC2hVr0xUWaq94wuEObLIkYzmA= github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2/go.mod h1:CdSJQNNzZhCkwDaV27XV1w48ZBPtxe7mlrZAsPNxD5g= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v0.4.0 h1:ALpdyAqAOhrHiTMWOkIjB7hEiG7KTvyx7CZ9Qel+ZYQ= +github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v0.4.0/go.mod h1:E3RSrZI5ub9zlZeobR+UUuuuUv4rz6JcVjVpfAmZ65Y= +github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.1.0 h1:mvQhIGmKI3vmdNDUMG0ZHaZ1p+JcHE3m0q1RMOyKxRo= +github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.1.0/go.mod h1:7hMUlcqiMXDUJtU1EWQlhhkC4BfIr6pEsiyuRYq4xLQ= github.com/Azure/azure-service-bus-go v0.11.5 h1:EVMicXGNrSX+rHRCBgm/TRQ4VUZ1m3yAYM/AB2R/SOs= -github.com/Azure/azure-service-bus-go v0.11.5/go.mod h1:MI6ge2CuQWBVq+ly456MY7XqNLJip5LO1iSFodbNLbU= github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk= github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58= -github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= -github.com/Azure/go-amqp v0.16.4/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= github.com/Azure/go-amqp v0.17.4 h1:6t9wEiwA4uXMRoUj3Cd3K2gmH8cW8ylizmBnSeF0bzM= github.com/Azure/go-amqp v0.17.4/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= @@ -70,10 +69,13 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg6 github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA= +github.com/Azure/go-autorest/autorest v0.11.22/go.mod h1:BAWYUWGPEtKPzjVkp0Q6an0MJcJDsoh5Z1BFAEFs4Xs= github.com/Azure/go-autorest/autorest v0.11.24/go.mod h1:G6kyRlFnTuSbEYkQGawPfsCswgme4iYf6rfSKUDzbCc= github.com/Azure/go-autorest/autorest v0.11.27 h1:F3R3q42aWytozkV8ihzcgMO4OA4cuqr3bNlsEuF6//A= github.com/Azure/go-autorest/autorest v0.11.27/go.mod h1:7l8ybrIdUmGqZMTD0sRtAr8NvbHjfofbf8RSP2q7w7U= github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= +github.com/Azure/go-autorest/autorest/adal v0.9.14/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= +github.com/Azure/go-autorest/autorest/adal v0.9.17/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ= github.com/Azure/go-autorest/autorest/adal v0.9.18 h1:kLnPsRjzZZUF3K5REu/Kc+qMQrvuza2bwSnNdhmzLfQ= github.com/Azure/go-autorest/autorest/adal v0.9.18/go.mod h1:XVVeme+LZwABT8K5Lc3hA4nAe8LDBVle26gTrguhhPQ= github.com/Azure/go-autorest/autorest/azure/auth v0.5.11 h1:P6bYXFoao05z5uhOQzbC3Qd8JqF3jUoocoTeIxkp2cA= @@ -86,9 +88,6 @@ github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935 github.com/Azure/go-autorest/autorest/mocks v0.4.2 h1:PGN4EDXnuQbojHbU0UWoNvmu9AGVwYHG9/fkDYhtAfw= github.com/Azure/go-autorest/autorest/mocks v0.4.2/go.mod h1:Vy7OitM9Kei0i1Oj+LvyAWMXJHeKH1MVlzFugfVrmyU= github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk= -github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= -github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac= -github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E= github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg= github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo= @@ -228,11 +227,6 @@ github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSy github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= -github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= -github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= -github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= -github.com/gin-gonic/gin v1.7.3/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= -github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -257,22 +251,8 @@ github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL9 github.com/go-openapi/jsonreference v0.19.5/go.mod h1:RdybgQwPxbL4UEjuAruzK1x3nE69AqPYEJeo/TWfEeg= github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= -github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= -github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= -github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= -github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= -github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= -github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= -github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE= -github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= -github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= -github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= -github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= -github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= -github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -377,8 +357,6 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51 github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grandcat/zeroconf v0.0.0-20190424104450-85eadb44205c h1:svzQzfVE9t7Y1CGULS5PsMWs4/H4Au/ZTJzU/0CKgqc= github.com/grandcat/zeroconf v0.0.0-20190424104450-85eadb44205c/go.mod h1:YjKB0WsLXlMkO9p+wGTCoPIDGRJH0mz7E526PxkQVxI= @@ -445,13 +423,11 @@ github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= -github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= -github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -465,7 +441,6 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.14.4 h1:eijASRJcobkVtSt81Olfh7JX43osYLwy5krOJo6YEu4= @@ -484,8 +459,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= -github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= -github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= @@ -526,7 +499,6 @@ github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS4 github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= -github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.1 h1:CpVNEelQCZBooIPDn+AR3NpivK/TIKU8bDxdASFVQag= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= @@ -680,10 +652,6 @@ github.com/trusch/grpc-proxy v0.0.0-20190529073533-02b64529f274 h1:ChAMVBRng5Dsv github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/tylertreat/comcast v1.0.1 h1:+B8add2s9PrhX4lx5gGqOKUTebGD7lzdfwKZHYoF98Y= github.com/tylertreat/comcast v1.0.1/go.mod h1:8mA9mMCnmAGjTnrWNKQ7PXsBy6FfguO+U9pSxifaka8= -github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= -github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= -github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= -github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.21.0/go.mod h1:jjraHZVbKOXftJfsOYoAjaeygpj5hr8ermTRJNroD7A= @@ -756,7 +724,6 @@ golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20211115234514-b4de73f9ece8/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE= golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= @@ -948,7 +915,6 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a h1:ppl5mZgokTT8uPkmYOyEUmPTr3ypaKkg5eFOGrAmxxE= golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -1220,7 +1186,6 @@ k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b h1:wxEMGetGMur3J1xuGLQY7GEQYg9bZxKn3tKo5k/eYcs= k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g= -nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=