Dapr pubsub service bus topics: Add support for including ApplicationProperties in subscriber #3428

Open
vbuonagura opened this issue May 22, 2024 · 7 comments
Labels
kind/enhancement New feature or request

@vbuonagura

Hello,
We are trying to integrate Dapr into our microservices architecture, but we are having trouble accessing custom properties on messages published through Service Bus topics.

We are not using CloudEvents. To make events traceable, we rely on Elastic APM together with the default Azure Service Bus tracing, which is based on the Diagnostic-Id field. See more here.

We are publishing the message from a .NET 8 API in the following way:
`
public async Task<IActionResult> CreateNotification([FromBody] Notification notification)
{
    if (notification is not null)
    {
        ITransaction transaction = Agent.Tracer.CurrentTransaction;

        var asyncResult = await transaction.CaptureSpan("SendEvent", ApiConstants.TypeMessaging, async (s) =>
        {
            // Metadata needed to remove the CloudEvent envelope.
            var metadata = new Dictionary<string, string>
            {
                { "rawPayload", "true" },
                // Diagnostic-Id is built as "00-<trace id>-<span id>-01".
                { "Diagnostic-Id", "00-" + Agent.Tracer.CurrentTransaction.TraceId + "-" + Agent.Tracer.CurrentSpan.Id + "-" + "01" }
            };

            await _daprClient.PublishEventAsync("pubsub", "notification-topic", notification, metadata);

            return true;
        });

        return Ok("Notification Created");
    }

    return BadRequest();
}`
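
For reference, the Diagnostic-Id value built above has the same four-field shape as a W3C traceparent value: version, trace ID, span ID, and flags, separated by dashes. The following is only a minimal sketch in Go of formatting and parsing such a value; the type and function names are hypothetical, and the length checks assume a 32-character trace ID and a 16-character span ID.

```go
package main

import (
	"fmt"
	"strings"
)

// diagnosticID holds the four dash-separated fields of a Diagnostic-Id value.
// The type name is hypothetical and used only for this illustration.
type diagnosticID struct {
	Version string
	TraceID string
	SpanID  string
	Flags   string
}

// formatDiagnosticID builds "00-<trace id>-<span id>-01", mirroring the
// string concatenation in the publisher code above.
func formatDiagnosticID(traceID, spanID string) string {
	return fmt.Sprintf("00-%s-%s-01", traceID, spanID)
}

// parseDiagnosticID splits a value back into its fields. It assumes a
// 32-character trace ID and a 16-character span ID.
func parseDiagnosticID(value string) (diagnosticID, error) {
	parts := strings.Split(value, "-")
	if len(parts) != 4 {
		return diagnosticID{}, fmt.Errorf("expected 4 dash-separated fields, got %d", len(parts))
	}
	if len(parts[1]) != 32 || len(parts[2]) != 16 {
		return diagnosticID{}, fmt.Errorf("unexpected trace ID or span ID length")
	}
	return diagnosticID{
		Version: parts[0],
		TraceID: parts[1],
		SpanID:  parts[2],
		Flags:   parts[3],
	}, nil
}
```

A subscriber could use something like parseDiagnosticID once the property is actually delivered to it, which is the open question in this issue.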

The rawPayload and Diagnostic-Id metadata are correctly set in the message's custom properties, but when we receive the message in the following way, those values do not appear in the HTTP request headers:
`[Dapr.Topic("pubsub", "notification-topic")]
[HttpPost("notificationreceived")]
public async Task<IActionResult> NotificationReceived([FromBody] Notification notification)
{
    // Log every incoming header to see which metadata Dapr forwards.
    foreach (var header in Request.Headers)
    {
        _logger.LogInformation(header.Key + " = " + header.Value);
    }

    if (notification is not null)
    {
        await _notificationsService.CreateAsync(notification);

        return Ok("Notification Received");
    }

    return BadRequest();
}`

The only metadata available are:

  • deliverycount
  • enqueuedtimeutc
  • lockeduntilutc
  • locktoken
  • messageid
  • sequencenumber

Is there a way to also receive the custom properties in the metadata?

@vbuonagura vbuonagura added the kind/bug Something isn't working label May 22, 2024
@vbuonagura
Author

vbuonagura commented May 24, 2024

Looking at the implementation of the component here:

func addMessageAttributesToMetadata(metadata map[string]string, asbMsg *azservicebus.ReceivedMessage) map[string]string {

It looks like the custom metadata is missing from the mapping when a message is received from the bus.

@olitomlinson olitomlinson transferred this issue from dapr/dapr May 27, 2024
@berndverst berndverst added kind/enhancement New feature or request and removed kind/bug Something isn't working labels Jun 4, 2024
@berndverst
Member

berndverst commented Jun 4, 2024

Dapr supports pass-through of non-cloud-event properties if and only if these are sent as additional properties within a cloud event body. That is referred to as a cloud event extension. It does not work with raw events; instead, a payload of type application/cloudevents+json must be sent, with a manually built JSON dictionary body that contains both the legitimate cloud event properties and your custom properties. These properties are indeed retained and delivered to all components.

https://github.com/dapr/dapr/blob/199bcf4e198b247bf0f259562e02c97064cb378e/pkg/api/grpc/grpc.go#L182-L190

What you are requesting is not inherently Service Bus specific and this request occasionally comes up with other components.

The Dapr PubSub component interface does not currently specify that metadata properties are to be retained. These were created only to control component-specific behavior, not to be delivered to the consumer application.

Therefore, a larger proposal would be required to design an appropriate solution to store and deliver metadata in all Dapr pubsub components, even when cloud event envelopes are not used. I am not a fan of creating a component specific solution / approach here, especially given that we already support passing through custom data via cloud event extensions.

@berndverst berndverst changed the title from "Dapr pubsub service bus topics missing custom metadata" to "Dapr pubsub service bus topics: Passthrough metadata when using raw payload" Jun 4, 2024
@berndverst
Member

Perhaps there is some confusion here based on wording of the request.

The code you pointed out will take any unknown metadata properties and add them to ApplicationProperties on the message. This is the place to store custom metadata in Service Bus (at least when using the Go SDK): https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus#Message

	default:
		asbMsg.ApplicationProperties[k] = v
	}

I think you are seeing the ApplicationProperties set correctly.

When this was designed, the intention there was never for those custom properties to be delivered back to the subscriber.

We can certainly add support for this however.
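
The publish-side behavior described in this comment (recognized keys go to dedicated message fields, everything else falls through to ApplicationProperties) can be sketched roughly as follows. This is not the component's actual code: the outgoingMessage type stands in for the SDK message, and the key names MessageId and CorrelationId are hypothetical placeholders.

```go
package main

// Hypothetical metadata keys; the real component defines its own constants.
const (
	keyMessageID     = "MessageId"
	keyCorrelationID = "CorrelationId"
)

// outgoingMessage is a local stand-in for the Service Bus SDK message type.
type outgoingMessage struct {
	MessageID             *string
	CorrelationID         *string
	ApplicationProperties map[string]any
}

// applyMetadata copies recognized keys into dedicated fields and stores every
// unknown key as a custom application property.
func applyMetadata(msg *outgoingMessage, metadata map[string]string) {
	if msg.ApplicationProperties == nil {
		msg.ApplicationProperties = map[string]any{}
	}
	for k, v := range metadata {
		v := v // Copy so each pointer refers to its own value.
		switch k {
		case keyMessageID:
			msg.MessageID = &v
		case keyCorrelationID:
			msg.CorrelationID = &v
		default:
			msg.ApplicationProperties[k] = v
		}
	}
}
```

Under this sketch, rawPayload and Diagnostic-Id from the original report would both end up in ApplicationProperties, which matches what was observed on the broker.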

@berndverst berndverst changed the title Dapr pubsub service bus topics: Passthrough metadata when using raw payload Dapr pubsub service bus topics: Add support for including ApplicationProperties in subscriber Jun 4, 2024
@berndverst
Member

What needs to be done is for this function to be updated so that it also looks at the ApplicationProperties property (which is itself a map) and merges that map into the resulting metadata map.

func addMessageAttributesToMetadata(metadata map[string]string, asbMsg *azservicebus.ReceivedMessage) map[string]string {
	if metadata == nil {
		metadata = map[string]string{}
	}
	if asbMsg.MessageID != "" {
		metadata["metadata."+MessageKeyMessageID] = asbMsg.MessageID
	}
	if asbMsg.SessionID != nil {
		metadata["metadata."+MessageKeySessionID] = *asbMsg.SessionID
	}
	if asbMsg.CorrelationID != nil && *asbMsg.CorrelationID != "" {
		metadata["metadata."+MessageKeyCorrelationID] = *asbMsg.CorrelationID
	}
	if asbMsg.Subject != nil && *asbMsg.Subject != "" {
		metadata["metadata."+MessageKeyLabel] = *asbMsg.Subject
	}
	if asbMsg.ReplyTo != nil && *asbMsg.ReplyTo != "" {
		metadata["metadata."+MessageKeyReplyTo] = *asbMsg.ReplyTo
	}
	if asbMsg.To != nil && *asbMsg.To != "" {
		metadata["metadata."+MessageKeyTo] = *asbMsg.To
	}
	if asbMsg.ContentType != nil && *asbMsg.ContentType != "" {
		metadata["metadata."+MessageKeyContentType] = *asbMsg.ContentType
	}
	if asbMsg.LockToken != [16]byte{} {
		metadata["metadata."+MessageKeyLockToken] = base64.StdEncoding.EncodeToString(asbMsg.LockToken[:])
	}
	// Always set delivery count.
	metadata["metadata."+MessageKeyDeliveryCount] = strconv.FormatInt(int64(asbMsg.DeliveryCount), 10)
	if asbMsg.EnqueuedTime != nil {
		// Preserve RFC2616 time format.
		metadata["metadata."+MessageKeyEnqueuedTimeUtc] = asbMsg.EnqueuedTime.UTC().Format(http.TimeFormat)
	}
	if asbMsg.SequenceNumber != nil {
		metadata["metadata."+MessageKeySequenceNumber] = strconv.FormatInt(*asbMsg.SequenceNumber, 10)
	}
	if asbMsg.ScheduledEnqueueTime != nil {
		// Preserve RFC2616 time format.
		metadata["metadata."+MessageKeyScheduledEnqueueTimeUtc] = asbMsg.ScheduledEnqueueTime.UTC().Format(http.TimeFormat)
	}
	if asbMsg.PartitionKey != nil {
		metadata["metadata."+MessageKeyPartitionKey] = *asbMsg.PartitionKey
	}
	if asbMsg.LockedUntil != nil {
		// Preserve RFC2616 time format.
		metadata["metadata."+MessageKeyLockedUntilUtc] = asbMsg.LockedUntil.UTC().Format(http.TimeFormat)
	}
	return metadata
}

That should give you what you want, @vbuonagura, and this change would apply to all Service Bus components in Dapr.
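
As a rough illustration of the merge described above, and not the change that was actually implemented, a helper along these lines could fold ApplicationProperties into the metadata map. The receivedMessage type is a local stand-in for azservicebus.ReceivedMessage so the sketch compiles without the SDK. Reusing the "metadata." prefix, converting values with fmt.Sprint, and letting existing keys win are all assumptions.

```go
package main

import "fmt"

// receivedMessage is a local stand-in for azservicebus.ReceivedMessage,
// reduced to the one field this sketch needs.
type receivedMessage struct {
	ApplicationProperties map[string]any
}

// addApplicationProperties merges the message's custom properties into the
// metadata map. Assumptions: keys reuse the "metadata." prefix, values are
// stringified with fmt.Sprint, and keys that are already set are kept.
func addApplicationProperties(metadata map[string]string, msg *receivedMessage) map[string]string {
	if metadata == nil {
		metadata = map[string]string{}
	}
	for k, v := range msg.ApplicationProperties {
		key := "metadata." + k
		if _, exists := metadata[key]; exists {
			// Broker-provided attributes take precedence over custom ones.
			continue
		}
		metadata[key] = fmt.Sprint(v)
	}
	return metadata
}
```

Skipping keys that already exist is one possible design choice: it prevents a custom property from shadowing a broker attribute such as the message ID. Overwriting instead would be equally easy to implement.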

@berndverst
Member

I implemented this, and you will see it in Dapr 1.14.

@berndverst berndverst self-assigned this Jun 4, 2024
@vbuonagura
Author

Thanks @berndverst for picking this up. I will check it with Dapr 1.14.

@vbuonagura
Author

@berndverst is there a way to test the code change in preview mode?
