Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

With a ServiceBusTrigger, how to go from string/byte[] to Message? #384

Closed
ModernRonin opened this issue Apr 8, 2021 · 21 comments
Closed

Comments

@ModernRonin
Copy link

Hi,

using Service Bus and the new functions model, I need to integrate with MassTransit. MT's integration library for Service Bus expects that I pass in a Microsoft.Azure.ServiceBus.Message.

The new functions model doesn't allow this type to directly be bound, the relevant samples all act as if the message was a regular string.
I checked out Message and noted it has a constructor taking a byte[]. So I changed the type of the incoming message to byte[] - which is one of the supported types, as far as I understand - and then just construct a Message from that.

However, if I pass that to MassTransit, I get an InvalidOperationException that actually comes from Microsoft.Azure.ServiceBus:

---> System.InvalidOperationException: Operation is not valid due to the current state of the object.
[2021-04-08T18:05:51.756Z]    at Microsoft.Azure.ServiceBus.Message.SystemPropertiesCollection.ThrowIfNotReceived()
[2021-04-08T18:05:51.756Z]    at Microsoft.Azure.ServiceBus.Message.SystemPropertiesCollection.get_DeliveryCount()
[2021-04-08T18:05:51.757Z]    at MassTransit.Azure.ServiceBus.Core.Contexts.ServiceBusReceiveContext..ctor(Message message, ReceiveEndpointContext receiveEndpointContext)
...

So it seems that my idea of constructing a Message from the byte[] is not the right way to go. When I break into the function right after the Message instance has been constructed, it looks kinda incomplete, too. It seems only the Body is filled. Further investigation shows that the c'tor of Message taking an array does exactly this - it just fills the Body property from the array and initializes SystemProperties and UserProperties as empty collections. Later in the process when MassTransit tries to read Message.SystemProperties.DeliveryCount, the exception is thrown because SystemProperties is empty.

The question is: what do I need to do to create a complete Message instance from within my trigger function?

This is really critical for me, unless I find a way to make this work, we will have to switch away from Azure Functions to dockerized services, which would be a shame because the autoscaling of Azure Functions is a very cool feature.

Any help would be greatly appreciated!

Many thanks,
MR

@SeanFeldman
Copy link
Contributor

The question is: what do I need to do to create a complete Message instance from within my trigger function?

At this point in time, it will be tricky. Functions runtime passes the context which contains serialized system and user properties (headers). Just constructing a message from the body byte[] is not taking those values into consideration. I suspect there will be some hackery and reflection involved to construct a Message MassTransit requires.

@ModernRonin
Copy link
Author

Well, if there was some documentation on how to fill these properties from the passed FunctionContext, I'd be fine.
Seeing as Message is actually a type created and managed by Microsoft, as is FunctionContext, the required knowledge must be available within Microsoft.

So, if there is some documentation for this, please provide the link. If there isn't, please create it :-)

(Of course, it would be much better yet to add the conversion functionality to a nuget we can reference, but if that would take more than just a few days, I'd prefer the documentation because I really need to solve this like yesterday.)

@SeanFeldman
Copy link
Contributor

SeanFeldman commented Apr 9, 2021

I don't represent the Functions team but If I understand correctly the intention, the idea is that the Isolated Worker SDK will not take any dependency on the specific SDK packages to avoid the mess that was there with BrokeredMessage and Message in the past that would be repeated with with Message and ServiceBusMessage (the latest ASB SDK). For that, some sort of abstraction will be needed and it will likely be coming later. As to documenting a workaround in the official documentation - not sure that's a good idea. That kind of "documentation" would be more suitable for an issue such as this one.

I didn't need to fully construct a Message so didn't bother with the reflection. As to getting the custom properties, this is what I've done

if (functionContext.BindingContext.BindingData.TryGetValue("UserProperties", out var customProperties) && customProperties != null)
{
  var customHeaders = System.Text.Json.JsonSerializer.Deserialize<Dictionary<string, string>>(customProperties.ToString()
        ?? throw new InvalidOperationException());
  // ...
}

For system set properties such as Label or MessageId, accessing BindingData using the property name as a key will give your the serialized values.

@ModernRonin
Copy link
Author

thank you, I'll see if this is enough to get me running :-)

@ModernRonin
Copy link
Author

ModernRonin commented Apr 10, 2021

Hi,

so I'm now able to receive messages using the info @SeanFeldman provided.
While it works, it's horrible because I rely on internal implementation details of Microsoft.Azure.ServiceBus.Message.

I will have to discuss internally whether we as a company want to build the infrastructure of our product on this rather shaky foundation or whether it's maybe the better course of action to steer away from Azure Functions.

However, maybe it'll be helpful for someone to see the complete code for creating a Message object from a byte[] and a FunctionContext:

public static class MessageFactory
{
    public static Message CreateMessage(byte[] body, FunctionContext context)
    {
        var result = new Message(body);
        result.SetPrimitiveProperty(m => m.MessageId, context);
        result.SetPrimitiveProperty(m => m.ContentType, context);
        result.SetJsonProperty(m => m.UserProperties, context);

        var sysProperties = result.SystemProperties;
        sysProperties.SetPrimitiveProperty(s => s.DeliveryCount, context);
        sysProperties.SetPrimitiveProperty(s => s.SequenceNumber, context);
        sysProperties.SetPrimitiveProperty(s => s.EnqueuedTimeUtc, context);

        // this one we cannot directly set because it's computed from a field
        var lockToken =
            context.GetPrimitiveValue<Message.SystemPropertiesCollection, string>(s => s.LockToken);
        var lockTokenGuid = Guid.Parse(lockToken);
        sysProperties.SetField("lockTokenGuid", lockTokenGuid);

        // this one we need to do indirectly because  ExpiresAtUtc is computed
        // while TTL is settable
        var expiresAtUtc = context.GetPrimitiveValue<Message, DateTime>(m => m.ExpiresAtUtc);
        result.TimeToLive = expiresAtUtc.Subtract(sysProperties.EnqueuedTimeUtc);

        return result;
    }

    static TProperty GetPrimitiveValue<TOwner, TProperty>(this FunctionContext self,
        Expression<Func<TOwner, TProperty>> accessor)
    {
        var rawValue = self.GetRawValue(accessor);
        if (typeof(TProperty) == typeof(DateTime)) rawValue = rawValue?.ToString()?.Trim('"');
        return (TProperty) Convert.ChangeType(rawValue, typeof(TProperty));
    }

    static object GetRawValue<TOwner, TProperty>(this FunctionContext self,
        Expression<Func<TOwner, TProperty>> accessor)
    {
        var property = PropertyInfo(accessor);
        var name = property.Name;
        if (self.BindingContext.BindingData.TryGetValue(name, out var rawValue)) return rawValue;
        return default;
    }

    static PropertyInfo PropertyInfo<TOwner, TProperty>(Expression<Func<TOwner, TProperty>> accessor)
    {
        var result = accessor.GetMemberExpression().Member as PropertyInfo;
        if (result == default)
            throw new ArgumentException("The accessor doesn't access a property", nameof(accessor));
        return result;
    }

    static void SetField(this object self, string fieldName, object value)
    {
        var field = self.GetType()
            .GetField(fieldName,
                BindingFlags.NonPublic |
                BindingFlags.Instance);
        if (field == default) throw new ArgumentException("There is no such field", fieldName);
        field.SetValue(self, value);
    }

    static void SetJsonProperty<TOwner, TProperty>(this TOwner self,
        Expression<Func<TOwner, TProperty>> accessor,
        FunctionContext context)
    {
        var json = (string) context.GetRawValue(accessor);
        var value = JsonConvert.DeserializeObject(json, typeof(TProperty));
        self.SetProperty(accessor, value);
    }

    static void SetPrimitiveProperty<TOwner, TProperty>(this TOwner self,
        Expression<Func<TOwner, TProperty>> accessor,
        FunctionContext context)
    {
        var value = context.GetPrimitiveValue(accessor);
        self.SetProperty(accessor, value);
    }

    static void SetProperty<TOwner, TProperty>(this TOwner self,
        Expression<Func<TOwner, TProperty>> accessor,
        object value)
    {
        var property = PropertyInfo(accessor);
        var setter = property.SetMethod;
        if (setter == default)
            throw new ArgumentException("The property does not have any setter", nameof(accessor));

        setter.Invoke(self, new[] {value});
    }
}

Note that in my tests all the properties this passage sets were always present in FunctionContext. Other properties, like for example PartitionKey were never present. This might or might not have to do with my using MassTransit or just my usage in my tests, I cannot say - yet. If I find out anything more about this, I'll try to remember and update this thread, though :-)

@repo-owners: While technically the code passage above answers my question, I don't think this issue should be closed because this simply is not a good solution, it's a hack.

@ModernRonin ModernRonin changed the title Question: with a ServiceBusTrigger, how to go from string/byte[] to Message? With a ServiceBusTrigger, how to go from string/byte[] to Message? Apr 10, 2021
@fabiocav fabiocav added this to the Functions Sprint 100 milestone Apr 13, 2021
@fabiocav fabiocav removed this from the Functions Sprint 100 milestone Apr 13, 2021
@gertjvr
Copy link

gertjvr commented May 7, 2021

Been looking at this as well, hidden in the documentation .net 5 ServiceBusTrigger does support ServiceBusReceivedMessage from the new Azure.Messaging.ServiceBus library. https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=csharp#usage

MassTransit is still using Microsoft.Azure.ServiceBus which only supports Message

If MassTransit upgrades to support the new Azure.Messaging.ServiceBus client library then upgrading the Azure Function integrations would be simple, but the transport upgrade is the tricky one.

@gertjvr
Copy link

gertjvr commented May 7, 2021

I have started a discussion under the MassTransit repo MassTransit/MassTransit#2501

@optiks
Copy link

optiks commented May 31, 2021

Been looking at this as well, hidden in the documentation .net 5 ServiceBusTrigger does support ServiceBusReceivedMessage from the new Azure.Messaging.ServiceBus library. https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus-trigger?tabs=csharp#usage

I believe this is only for the old (non-isolated) model. Have you tried it on .NET 5?

@optiks
Copy link

optiks commented May 31, 2021

As to getting the custom properties, this is what I've done ...

This shouldn't be necessary; you should be able to inject them directly, e.g.

public async Task<Message> Run( 
   [ServiceBusTrigger(queueName: "queueName", Connection = "AzureServiceBus")]
   string body,
   string correlationId,
   string messageId,
   string sessionId,
   IDictionary<string, object> userProperties,
   int deliveryCount,
   FunctionContext functionContext
)

@SeanFeldman
Copy link
Contributor

@optiks those are not custom properties 🙂

@optiks
Copy link

optiks commented May 31, 2021

@optiks those are not custom properties 🙂

D'oh! My bad, thanks.

@jeroenbeuz
Copy link

jeroenbeuz commented Aug 4, 2021

Is there any update on this? I want to upgrade my Azure Function with a ServiceBus trigger to .Net 5 but this is blocking for me. I need the Message, just like it was possible in .Net core 3.1 functions. I've also tried the ServiceBusReceivedMessage like
@gertjvr suggested but then the function throws an exception because it can't bind that argument.

I've now spend hours to try and get this to work. Hope someone can help me out here.

Oh and the reason I need the message is so I can re-send the message once it's been handled successfully. So if getting the Message in the function trigger is not possible, maybe there's another way to send the exact same message to a different topic when and if the message was completed.

@marcelgrilo
Copy link

Hello there!!!

In my case, our team is needing to use the autocomplete=false settings, because we need to control when to complete the message or not manually.
And without the MessageReceiver we dont know where to find or call the CompleteAsync(lockToken)...

Does anyone here needing this functionality?

We are currently using autocomplete=true, but our application will scale in the very near future and we can have critical problems without this extra controll...

@jeroenbeuz
Copy link

jeroenbeuz commented Sep 14, 2021

Still no update? Please, we really want to upgrade our function to .Net 5 but this is blocking.

@SeanFeldman
Copy link
Contributor

Still no update? Please, we really want to upgrade our function to .Net 5 but this is blocking.

You are better off upgrading to .NET 6 (October) and staying with in-proc SDK as you won't be able to use SDK types with the new Isolated Worker Functions SDK at this point in time.

@digitalkirkley
Copy link

When will it be possible to use 'ServiceBusReceivedMessage' on the ServiceBusTrigger binding with .NET 5 out-of-process (isolated)? Or is this .NET 6? Or 7? (Have various of existing 3.1 in-proc functions and need to make some decisions)

@thomhurst
Copy link

For getting MessageHeaders in isolated functions I've done this:

public static class FunctionContextExtensions
{
    public static IReadOnlyDictionary<string, object> GetServiceBusMessageHeaders(this FunctionContext functionContext)
    {
        if (!functionContext.BindingContext.BindingData.TryGetValue("ApplicationProperties", out var applicationPropertiesString))
        {
            return new Dictionary<string, object>();
        }

        return JsonSerializer.Deserialize<Dictionary<string, object>>(applicationPropertiesString.ToString(), new JsonSerializerOptions
        {
            Converters = { new ObjectToInferredTypesConverter() }
        });
    }
}

public class ObjectToInferredTypesConverter : JsonConverter<object>
{
    public override object Read(
        ref Utf8JsonReader reader,
        Type typeToConvert,
        JsonSerializerOptions options) => reader.TokenType switch
    {
        JsonTokenType.True => true,
        JsonTokenType.False => false,
        JsonTokenType.Number when reader.TryGetInt64(out long l) => l,
        JsonTokenType.Number => reader.GetDouble(),
        JsonTokenType.String when reader.TryGetDateTime(out DateTime datetime) => datetime,
        JsonTokenType.String => reader.GetString()!,
        _ => JsonDocument.ParseValue(ref reader).RootElement.Clone()
    };

    public override void Write(
        Utf8JsonWriter writer,
        object objectToWrite,
        JsonSerializerOptions options) =>
        JsonSerializer.Serialize(writer, objectToWrite, objectToWrite.GetType(), options);
}

Then in the function, inject in FunctionContext and use that extension:

    [Function("MyFunction")]
    public static void Run([ServiceBusTrigger("test", Connection = "ConnectionString")] string myQueueItem, FunctionContext context)
    {
        ...
        var messageHeaders = context.GetServiceBusMessageHeaders();
        ...
    }

@Banchio
Copy link

Banchio commented Feb 12, 2023

+1, I also need to access system/custom properties as well as perform actions on service bus messages like complete/abandon/scheduleAsync and so on, thanks

@JayAtSherweb
Copy link

Is there any kind of ETA at all on this? This has been on going for years now. I think this deserves some attention by now.

@SeanFeldman
Copy link
Contributor

The work is in progress: #1313

@liliankasem
Copy link
Member

Closing as ServiceBusReceivedMessage support has been released: https://github.com/Azure/azure-functions-dotnet-worker/releases/tag/servicebus-extension-5.12.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests