Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Support send and receiving custom annotations in System Properties (#169
Browse files Browse the repository at this point in the history
)

* Support send and receiving custom annotations in System Properties

* update changelog and version number

Co-authored-by: Joel Hendrix <jhendrix@microsoft.com>
  • Loading branch information
princjef and jhendrixMSFT committed Jun 3, 2020
1 parent a123c25 commit dfc6364
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 22 deletions.
8 changes: 4 additions & 4 deletions azuredeploy.tf
Expand Up @@ -63,9 +63,9 @@ resource "random_string" "secret" {
resource "azuread_application" "test" {
count = data.azurerm_client_config.current.service_principal_application_id == "" ? 1 : 0
name = "servicebustest"
homepage = "https://servicebustest"
identifier_uris = ["https://servicebustest"]
reply_urls = ["https://servicebustest"]
homepage = "https://servicebustest-${random_string.name.result}"
identifier_uris = ["https://servicebustest-${random_string.name.result}"]
reply_urls = ["https://servicebustest-${random_string.name.result}"]
available_to_other_tenants = false
oauth2_allow_implicit_flow = true
}
Expand Down Expand Up @@ -160,4 +160,4 @@ output "AZURE_CLIENT_ID" {
output "AZURE_CLIENT_SECRET" {
value = compact(concat(azuread_service_principal_password.test.*.value, list(var.azure_client_secret)))[0]
sensitive = true
}
}
8 changes: 7 additions & 1 deletion changelog.md
@@ -1,5 +1,11 @@
# Change Log

## `v0.10.2`
- add support for sending and receiving custom annotations
- added some missing AMQP span attributes
- fixed propagation of sender/receiver close context
- don't panic on empty AMQP payloads

## `v0.10.1`
- fix nil pointer dereference for concurrent uses of Send() [issue #149](https://github.com/Azure/azure-service-bus-go/issues/149)
- fix nil pointer dereference when there are no listeners [PR #151](https://github.com/Azure/azure-service-bus-go/pull/151)
Expand Down Expand Up @@ -59,4 +65,4 @@
- Ensure senders wait for message disposition before returning

## `v0.1.0`
- initial tag for Service Bus which includes Queues, Topics and Subscriptions using AMQP
- initial tag for Service Bus which includes Queues, Topics and Subscriptions using AMQP
59 changes: 47 additions & 12 deletions message.go
Expand Up @@ -68,15 +68,16 @@ type (

// SystemProperties are used to store properties that are set by the system.
SystemProperties struct {
LockedUntil *time.Time `mapstructure:"x-opt-locked-until"`
SequenceNumber *int64 `mapstructure:"x-opt-sequence-number"`
PartitionID *int16 `mapstructure:"x-opt-partition-id"`
PartitionKey *string `mapstructure:"x-opt-partition-key"`
EnqueuedTime *time.Time `mapstructure:"x-opt-enqueued-time"`
DeadLetterSource *string `mapstructure:"x-opt-deadletter-source"`
ScheduledEnqueueTime *time.Time `mapstructure:"x-opt-scheduled-enqueue-time"`
EnqueuedSequenceNumber *int64 `mapstructure:"x-opt-enqueue-sequence-number"`
ViaPartitionKey *string `mapstructure:"x-opt-via-partition-key"`
LockedUntil *time.Time `mapstructure:"x-opt-locked-until"`
SequenceNumber *int64 `mapstructure:"x-opt-sequence-number"`
PartitionID *int16 `mapstructure:"x-opt-partition-id"`
PartitionKey *string `mapstructure:"x-opt-partition-key"`
EnqueuedTime *time.Time `mapstructure:"x-opt-enqueued-time"`
DeadLetterSource *string `mapstructure:"x-opt-deadletter-source"`
ScheduledEnqueueTime *time.Time `mapstructure:"x-opt-scheduled-enqueue-time"`
EnqueuedSequenceNumber *int64 `mapstructure:"x-opt-enqueue-sequence-number"`
ViaPartitionKey *string `mapstructure:"x-opt-via-partition-key"`
Annotations map[string]interface{} `mapstructure:"-"`
}

mapStructureTag struct {
Expand Down Expand Up @@ -364,11 +365,15 @@ func (m *Message) toMsg() (*amqp.Message, error) {
}

if m.SystemProperties != nil {
// Set the raw annotations first (they may be nil) and add the explicit
// system properties second to ensure they're set properly.
amqpMsg.Annotations = addMapToAnnotations(amqpMsg.Annotations, m.SystemProperties.Annotations)

sysPropMap, err := encodeStructureToMap(m.SystemProperties)
if err != nil {
return nil, err
}
amqpMsg.Annotations = annotationsFromMap(sysPropMap)
amqpMsg.Annotations = addMapToAnnotations(amqpMsg.Annotations, sysPropMap)
}

if m.LockToken != nil {
Expand All @@ -381,8 +386,11 @@ func (m *Message) toMsg() (*amqp.Message, error) {
return amqpMsg, nil
}

func annotationsFromMap(m map[string]interface{}) amqp.Annotations {
a := make(amqp.Annotations)
func addMapToAnnotations(a amqp.Annotations, m map[string]interface{}) amqp.Annotations {
if a == nil && len(m) > 0 {
a = make(amqp.Annotations)
}

for key, val := range m {
a[key] = val
}
Expand Down Expand Up @@ -434,6 +442,28 @@ func newMessage(data []byte, amqpMsg *amqp.Message) (*Message, error) {
if err := mapstructure.Decode(amqpMsg.Annotations, &msg.SystemProperties); err != nil {
return msg, err
}

// If we didn't populate any system properties, set up the struct so we
// can put the annotations in it
if msg.SystemProperties == nil {
msg.SystemProperties = new(SystemProperties)
}

// Take all string-keyed annotations because the protocol reserves all
// numeric keys for itself and there are no numeric keys defined in the
// protocol today:
//
// http://www.amqp.org/sites/amqp.org/files/amqp.pdf (section 3.2.10)
//
// This approach is also consistent with the behavior of .NET:
//
// https://docs.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventdata.systemproperties?view=azure-dotnet#Azure_Messaging_EventHubs_EventData_SystemProperties
msg.SystemProperties.Annotations = make(map[string]interface{})
for key, val := range amqpMsg.Annotations {
if s, ok := key.(string); ok {
msg.SystemProperties.Annotations[s] = val
}
}
}

if amqpMsg.DeliveryTag != nil && len(amqpMsg.DeliveryTag) > 0 {
Expand Down Expand Up @@ -500,6 +530,11 @@ func encodeStructureToMap(structPointer interface{}) (map[string]interface{}, er
return nil, err
}

// Skip any entries with an exclude tag
if tag.Name == "-" {
continue
}

if tag != nil {
switch f.Kind() {
case reflect.Ptr:
Expand Down
2 changes: 1 addition & 1 deletion message_session.go
Expand Up @@ -8,8 +8,8 @@ import (
"time"

"github.com/Azure/azure-amqp-common-go/v3/rpc"
"github.com/devigned/tab"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
)

// MessageSession represents and allows for interaction with a Service Bus Session.
Expand Down
22 changes: 21 additions & 1 deletion message_test.go
Expand Up @@ -4,9 +4,9 @@ import (
"time"

"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/go-amqp"
"github.com/Azure/go-autorest/autorest/to"
"github.com/mitchellh/mapstructure"
"github.com/Azure/go-amqp"
)

func (suite *serviceBusSuite) TestMapStructureEncode() {
Expand Down Expand Up @@ -74,6 +74,10 @@ func (suite *serviceBusSuite) TestMessageToAMQPMessage() {
ScheduledEnqueueTime: &until,
EnqueuedSequenceNumber: to.Int64Ptr(1),
ViaPartitionKey: to.StringPtr("via"),
Annotations: map[string]interface{}{
"custom": "annotation",
"x-opt-partition-key": "other value",
},
},
UserProperties: map[string]interface{}{
"test": "foo",
Expand All @@ -100,6 +104,17 @@ func (suite *serviceBusSuite) TestMessageToAMQPMessage() {
for key, val := range sysPropMap {
suite.Equal(val, aMsg.Annotations[key], key)
}

for key, val := range msg.SystemProperties.Annotations {
// The partition key should be overridden by the value in the
// base system properties
if key == "x-opt-partition-key" {
suite.Equal(*msg.SystemProperties.PartitionKey, aMsg.Annotations[key], key)
continue
}

suite.Equal(val, aMsg.Annotations[key], key)
}
}

for key, val := range msg.UserProperties {
Expand Down Expand Up @@ -147,6 +162,7 @@ func (suite *serviceBusSuite) TestAMQPMessageToMessage() {
"x-opt-scheduled-enqueue-time": until,
"x-opt-enqueue-sequence-number": int64(1),
"x-opt-via-partition-key": "via",
"custom-annotation": "value",
},
ApplicationProperties: map[string]interface{}{
"test": "foo",
Expand Down Expand Up @@ -177,6 +193,10 @@ func (suite *serviceBusSuite) TestAMQPMessageToMessage() {
for key, val := range sysPropMap {
suite.Equal(val, aMsg.Annotations[key], key)
}

for key, val := range msg.SystemProperties.Annotations {
suite.Equal(val, aMsg.Annotations[key], key)
}
}

for key, val := range aMsg.ApplicationProperties {
Expand Down
2 changes: 1 addition & 1 deletion namespace.go
Expand Up @@ -49,7 +49,7 @@ const (
//`

// Version is the semantic version number
Version = "0.10.1"
Version = "0.10.2"

rootUserAgent = "/golang-service-bus"
)
Expand Down
2 changes: 1 addition & 1 deletion rpc.go
Expand Up @@ -32,8 +32,8 @@ import (

"github.com/Azure/azure-amqp-common-go/v3/rpc"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/devigned/tab"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
)

type (
Expand Down
2 changes: 1 addition & 1 deletion session.go
Expand Up @@ -28,8 +28,8 @@ import (
"sync/atomic"

"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/devigned/tab"
"github.com/Azure/go-amqp"
"github.com/devigned/tab"
)

type (
Expand Down

0 comments on commit dfc6364

Please sign in to comment.