.NET Core v2 Azure Function EventHub PartitionKey doesn't seem to work #28245

Closed
josh-endries opened this issue Mar 26, 2018 · 32 comments
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. Event Hubs feature-request This issue requires a new behavior in the product in order be resolved. Functions issue-addressed The Azure SDK team member assisting with this issue believes it to be addressed and ready to close.

Comments

@josh-endries

I'm unable to effectively set the PartitionKey property when using an Azure Function v2 (.NET Core).

First, I am unsure how I am supposed to set the PartitionKey property correctly. I initially used the new Microsoft.Azure.WebJobs.Extensions.EventHubs package and its Microsoft.Azure.EventHubs.EventData class; however, there is no PartitionKey property on that class. There is a PartitionKey property in the SystemProperties member on EventData, but SystemProperties and its PartitionKey property are both read-only, so I can't figure out how to set it from that package. Using the "old" Microsoft.ServiceBus.Messaging.EventData class, I can set PartitionKey on the object, but it doesn't seem to have any effect.

I'm assuming that I am doing something wrong but I can't figure out what. I am expecting that when I set the same PartitionKey value, it gets hashed and modded or something and should cause the message to be assigned to the same partition every time (as described in the documentation).
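To make the expectation above concrete, here is a minimal sketch of a "hash and mod" mapping. The FNV-1a hash and the names PartitionMappingSketch and PartitionFor are illustrative assumptions, not the algorithm Event Hubs actually uses; the point is only that a deterministic function of the key always yields the same partition for the same key and partition count.

```csharp
using System;
using System.Text;

public static class PartitionMappingSketch
{
    // Hypothetical stand-in for the service-side hash: FNV-1a over the UTF-8 bytes of the key.
    public static uint Fnv1a(string key)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Maps a key to a partition index in [0, partitionCount).
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        return (int)(Fnv1a(key) % (uint)partitionCount);
    }
}
```

With a mapping like this, the partition for a key does not have to equal the key, but repeated sends with the same key should never alternate between partitions.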

This is the Function code (using the "old" EventData class):

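using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.ServiceBus.Messaging;

public static class Function1
{
    [FunctionName("Function1")]
    public static async Task Run(
        [TimerTrigger("0 */1 * * * *")] TimerInfo myTimer,
        [EventHub("events", Connection = "ConnectionString")] IAsyncCollector<EventData> results,
        CancellationToken token,
        TraceWriter log
    )
    {
        // "Old" Microsoft.ServiceBus.Messaging.EventData, which has a settable PartitionKey.
        var ed1 = new EventData(Encoding.UTF8.GetBytes("test"));
        ed1.PartitionKey = "0";
        await results.AddAsync(ed1);
    }
}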

I've tried various partition keys (non-numeric, long, short, ...) but that didn't make a difference. The function runs every minute, as expected, but this is what I see in my client, which just dumps out the metadata:

[Image: untitled]

This shows two Function invocations a minute apart. Note that the first line of each list item includes the partition on which the message was received, and that it alternates from partition 1 to partition 0, even though the PartitionKey property is always "0". If I change PartitionKey to "1", it updates in the data and stays consistent, but the reader-partition reporting the event keeps alternating the same as it does in the image. I would expect these partition IDs to remain the same if the messages were received on the same partition.

This is the code that displays the messages:

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) {
    await processor.ProcessEventsAsync(context, messages);

    foreach (EventData eventData in messages) {
        string data = Encoding.UTF8.GetString(eventData.GetBytes());
        var obj = JsonConvert.DeserializeObject(data);
        var c = JsonConvert.SerializeObject(obj, Formatting.Indented);
        await window.Dispatcher.InvokeAsync(() => window.viewModel.log.Add($"Message received (partition {context.Lease.PartitionId}): {c}"));
    }
}

This is currently displaying the lease partition, but I later added the runtime partition and they were always the same (both partition IDs 1 or both 0) and kept alternating with subsequent Function invocations.

@pragnagopa
Member

This is by design. This thread on SO has some details on partition keys and how the data is distributed. Also, please take a look at the sample: in-order-event-processing-with-azure-functions. This will help you understand how partition keys are used by Azure Functions.

@paulbatum
Member

@pragnagopa I am not sure I agree. Unless I'm missing something, the example included in the bug report shows that messages with the same partition key are being written to different partitions. That is not expected.

@pragnagopa
Member

Ahh, I was looking at SystemProperties, saw that both messages have PartitionKey set to "0", and mistakenly concluded that they had the expected value as set in the code.

@josh-endries
Author

That's correct: the ID assigned to PartitionKey in code (e.g. 0) doesn't match the partition ID from which the message arrives (e.g. 1). I probably should have circled the numbers in red in my screenshot.

@paulbatum
Member

Ahh, I do want to clarify: the partition key is not expected to match the partition ID. But it's expected that there is a consistent mapping from a given partition key to a given partition ID (and your example shows that the mapping does not seem consistent right now).

We'll need to investigate this - first thing we'll want to do on our side is try to repro and then see if we can root cause to a functions bug or an event hubs sdk bug.

@josh-endries
Author

Right, sorry, bad examples using 1 and 0. I didn't expect the key and ID to match, but rather that the ID would be consistent when the key is consistent, as you said, and not alternate. Thanks for checking into it.

@j-alexander

I ran into this, as well, migrating from the WindowsAzure.ServiceBus package to the Microsoft.Azure.EventHubs for dotnet standard. Although I wasn't using WebJobs, this post was the only one that discussed the issue. In my case SendAsync is overloaded to optionally accept a partition key.

So, before I would have had:

var ed1 = new EventData(Encoding.UTF8.GetBytes("test"));
ed1.PartitionKey = "0";
await hub.SendAsync(ed1);

And afterward it was a fairly straightforward change:

var ed1 = new EventData(Encoding.UTF8.GetBytes("test"));
var partitionKey = "0";
await hub.SendAsync(ed1, partitionKey);

Not sure how it maps into your results data structure, but it definitely seems like a similar sort of change would be necessary in the way results is handled.

@mabzd

mabzd commented Dec 15, 2018

Any progress on this?

@paulbatum
Member

Thank you for the ping @mabzd and thank you @j-alexander for the helpful comments. I now have the full context and I understand why @josh-endries was getting the behavior he described above. I'll try to summarize.

The Event Hubs team changed the way their SDK works with regard to partition keys. EventData no longer has a settable partition key. Instead, you specify the partition key when you send the data. Now @josh-endries tried to work around this problem by referencing the older Event Hubs SDK, but that won't work because the Event Hubs SDK is the component that is doing the send, and it's not going to check the send payload for a partition key (I suspect it also doesn't work because the older EventData is not recognized by the system and is just treated as a generic user type).

This change is somewhat difficult to resolve with the binding model of webjobs/functions, where the user never really calls "send".

Today, the most straightforward workaround is to avoid using the Event Hubs output binding in webjobs/functions when you need to specify a partition key. Use the Event Hubs SDK directly, by calling EventHubClient.CreateFromConnectionString(...) and then client.SendAsync(data, partitionKey).

Obviously this is not an ideal solution. To get this scenario working with output bindings again, I can think of a few possible designs:

  1. Introduce a PartitionedEventData type that looks something like this:
namespace Microsoft.Azure.WebJobs.Extensions.EventHubs
{
  public class PartitionedEventData
  {
    public string PartitionKey { get; set; }
    public EventData Data { get; set; }
  }
}

The output binding would know how to recognize this datatype and call the SendAsync(data, key) overload appropriately.

  2. Introduce IPartitionedAsyncCollector<T> (and its twin IPartitionedCollector<T>). Instead of having AddAsync<T>(T item) it would have AddAsync<T>(T item, string partitionKey). If we updated the code from @josh-endries to use this approach, it would look like this:
public static class Function1
{
    [FunctionName("Function1")]
    public static async Task Run(
        [TimerTrigger("0 */1 * * * *")] TimerInfo myTimer,
        [EventHub("events", Connection = "ConnectionString")] IPartitionedAsyncCollector<EventData> results,
        CancellationToken token,
        TraceWriter log
    )
    {
        var ed1 = new EventData(Encoding.UTF8.GetBytes("test"));
        await results.AddAsync(ed1, "0");
    }
}

I am uncomfortable with the first option because it is essentially an undo of the change made by the Event Hubs SDK team.

The second option seems more consistent with the design choices made by the Event Hubs SDK team. I'm not sure yet of how much work this would be to implement as I'm not very familiar with the code for managing collectors.

That's all I have right now. Until we are able to prioritize the work involved in evaluating and implementing one of these designs (or another if we think of one), please use the workaround I mentioned above.
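As a rough illustration of the second option, the sketch below defines the proposed interface and an in-memory stand-in for it. Everything here is hypothetical: IPartitionedAsyncCollector<T> does not exist in the WebJobs SDK, InMemoryPartitionedCollector and its send delegate are made up for illustration, and a real binding would call the Event Hubs SendAsync(data, key) overload where the delegate is invoked.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical interface from the proposal; not part of any released SDK.
public interface IPartitionedAsyncCollector<T>
{
    Task AddAsync(T item, string partitionKey, CancellationToken cancellationToken = default);
    Task FlushAsync(CancellationToken cancellationToken = default);
}

// In-memory stand-in: buffers items per partition key and hands each group
// to a send delegate on flush. Not thread-safe; for illustration only.
public sealed class InMemoryPartitionedCollector<T> : IPartitionedAsyncCollector<T>
{
    private readonly Dictionary<string, List<T>> _pending = new Dictionary<string, List<T>>();
    private readonly Func<IReadOnlyList<T>, string, Task> _send;

    public InMemoryPartitionedCollector(Func<IReadOnlyList<T>, string, Task> send)
    {
        _send = send ?? throw new ArgumentNullException(nameof(send));
    }

    public Task AddAsync(T item, string partitionKey, CancellationToken cancellationToken = default)
    {
        if (!_pending.TryGetValue(partitionKey, out var items))
        {
            items = new List<T>();
            _pending[partitionKey] = items;
        }
        items.Add(item);
        return Task.CompletedTask;
    }

    public async Task FlushAsync(CancellationToken cancellationToken = default)
    {
        // One send per key, so every item in a group shares the same partition key.
        foreach (var pair in _pending.ToList())
        {
            await _send(pair.Value, pair.Key);
        }
        _pending.Clear();
    }
}
```

Grouping by key at flush time keeps the per-item AddAsync(item, key) surface from the proposal while still allowing one batched send per key.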

@jeffhollan

@paulbatum I'm wondering if there is a 3rd option where we allow you to bind directly to the Event Hub client? That's how we enable more advanced Service Bus scenarios (MessageSender and MessageReceiver). Either way the gap here makes sense.

@Sudhanshu1987

Regarding the workaround suggested above (using the Event Hubs SDK directly, by calling EventHubClient.CreateFromConnectionString(...) and then client.SendAsync(data, partitionKey)): won't that involve creating many clients? Can I use this in my production app without performance concerns until this is properly fixed?

@jeffhollan

Just be sure to create the EventHubClient as a singleton (private static of the class) so the connection will safely be reused across executions.
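A minimal sketch of that singleton pattern, assuming the Microsoft.Azure.EventHubs package and a connection string that includes the EntityPath. The class name PartitionedSender and the "EventHubConnectionString" setting name are placeholders chosen for illustration.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

public static class PartitionedSender
{
    // Created once per process and reused across function executions.
    // "EventHubConnectionString" is a hypothetical app setting name.
    private static readonly Lazy<EventHubClient> Client = new Lazy<EventHubClient>(() =>
        EventHubClient.CreateFromConnectionString(
            Environment.GetEnvironmentVariable("EventHubConnectionString")));

    // Sends one event; events that share a partition key land on the same partition.
    public static Task SendAsync(string body, string partitionKey)
    {
        var data = new EventData(Encoding.UTF8.GetBytes(body));
        return Client.Value.SendAsync(data, partitionKey);
    }
}
```

A function body would then call something like await PartitionedSender.SendAsync("test", "0") instead of adding to the output binding. Lazy<T> defers creating the client until the first send and is thread-safe by default.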

@paulbatum
Member

Adding another option to consider, that is a scoped down version of IPartitionedAsyncCollector - we could make an EventHubAsyncCollector type that has: .AddAsync(EventData d, string partitionKey) and .AddAsync(IEnumerable<EventData> d, string partitionKey).

@fabiocav
Member

Assigning this to sprint 50 so we can investigate the options and decide how we want to move forward. The actual fix would likely happen in a later sprint once we have a design/plan.

@slechta

slechta commented May 22, 2019

Can you please tell me a date when this will be fixed?
We are dependent on this fix. It is currently a blocker for us and we cannot go to production.

@paulbatum
Member

@slechta We have not landed on a design for this yet so I can't provide an ETA. We are currently working on a round of updates for Event Hubs, so hopefully we can provide an ETA in a month or so.

My reply above provided a workaround - can you explain why this workaround does not work for you, preventing you from going into production?

@paulbatum
Member

Reopening this issue, as Azure/azure-webjobs-sdk#2225 helps make this scenario easier, but it does not unblock usage of the output binding specifically. Let's explore the idea of a PartitionedAsyncCollector or EventHubAsyncCollector a little more.

@alrod alrod transferred this issue from Azure/azure-webjobs-sdk Aug 5, 2019
@alrod alrod transferred this issue from Azure/azure-functions-eventhubs-extension Aug 5, 2019
@pragnagopa
Member

cc @amamounelsayed / @jeffhollan fyi

@alrod alrod transferred this issue from Azure/azure-functions-eventgrid-extension Aug 28, 2019
@pragnagopa
Member

cc @mathewc

@fabiocav
Member

Moving this back to "Triaged" so we can discuss the options

@wolszakp

Hi guys,
Solution with

IPartitionedAsyncCollector<EventData> results

sounds very good. After migration to azure v2 we hit the same issue.
Is there any update when we can use it?

@alrod
Member

alrod commented Jan 4, 2020

You can use [EventHub(TestHubName)] EventHubClient client as workaround for now
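For illustration, the original timer function might look like this with the client binding. This is a sketch that assumes an extension version that supports binding to EventHubClient and the Microsoft.Azure.EventHubs package; the hub name "events" and the "ConnectionString" setting are carried over from the original report, and the class name is made up.

```csharp
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;

public static class SendWithPartitionKey
{
    [FunctionName("SendWithPartitionKey")]
    public static async Task Run(
        [TimerTrigger("0 */1 * * * *")] TimerInfo myTimer,
        [EventHub("events", Connection = "ConnectionString")] EventHubClient client)
    {
        // The partition key is passed to the send call, not set on the EventData.
        var data = new EventData(Encoding.UTF8.GetBytes("test"));
        await client.SendAsync(data, "0");
    }
}
```

Because the binding supplies the client, the function does not need to manage its own EventHubClient instance.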

@daviburg
Member

daviburg commented Mar 2, 2020

IAsyncCollector is an abstraction for outputting n elements to a service, with internal batching. The partition key as a method parameter may be specific to Event Hubs. As something specific to one underlying service, maybe it should remain abstracted away from an interface that serves multiple services.

The Event Hubs SDK has separated the partition key and other send or batch options from the event data itself. One can send a single event to a specific partition, and in that case the reason for the change is not clear. The change makes the most sense when you look at the Event Hubs SendAsync overload for a batch of events (IEnumerable), where the partition key is a single argument. That means the whole batch is sent to one partition, so the partition key is a parameter at the send level, not at the event data level. This doc explains why a client may need to do that: https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.eventhubs.eventhubclient.sendasync?view=azure-dotnet-legacy#Microsoft_Azure_EventHubs_EventHubClient_SendAsync_System_Collections_Generic_IEnumerable_Microsoft_Azure_EventHubs_EventData__System_String_
IAsyncCollector does not have a method to send or to add a batch. Its design is built around collecting events to form a batch (each implementation may decide internally the element count at which it sends the batch). So the method is Add (not Send), and the batch is sent when the count threshold is met, when the client calls the Flush method explicitly, or by other logic (for example, the collector goes out of scope, so it knows no more entities will be added).

So, as IAsyncCollector is built around the concept of an underlying batch, the partition key should be a concept at the batch level: either a value supplied when building the instance that implements the collector (the collector then always sends to the same partition), or a settable property (the partition a ready batch is sent to then matches the current value of the partition key property). To make this generic, rather than adding a specific partition key property, it should be a batch options object whose schema each binding gets to define.
Then, for outputting a single event (the non-collector approach), the partition key again needs to be a property set by the caller. To keep it abstracted from any specific service, the Event Hubs input needs to be wrapped in a container object holding both the event data and the partition key. Together they form the input parameters for the Event Hubs client SDK call that sends a single event to a specific partition. And yes, it would mean that the type for a single output entity and the type for a collection of output entities differ. Alternatively, revise the single-output contract to have both a type for the entity sent and a type for send options.
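The batch-level idea could be sketched roughly as follows. All names here (EventHubBatchOptions, BatchingCollector, the send delegate) are hypothetical and exist only to illustrate a collector that reads a settable partition key when a batch is ready; this is not how the WebJobs SDK implements collectors.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical batch options object; each binding would define its own schema.
public sealed class EventHubBatchOptions
{
    public string PartitionKey { get; set; }
}

// Collects items and sends them as one batch when the count threshold is met
// or when FlushAsync is called. Not thread-safe; for illustration only.
public sealed class BatchingCollector<T>
{
    private readonly List<T> _batch = new List<T>();
    private readonly int _maxBatchSize;
    private readonly Func<IReadOnlyList<T>, EventHubBatchOptions, Task> _send;

    public BatchingCollector(int maxBatchSize, Func<IReadOnlyList<T>, EventHubBatchOptions, Task> send)
    {
        if (maxBatchSize <= 0) throw new ArgumentOutOfRangeException(nameof(maxBatchSize));
        _maxBatchSize = maxBatchSize;
        _send = send ?? throw new ArgumentNullException(nameof(send));
    }

    // Options apply to the whole batch; the key is read at the moment the batch is sent.
    public EventHubBatchOptions Options { get; } = new EventHubBatchOptions();

    public async Task AddAsync(T item)
    {
        _batch.Add(item);
        if (_batch.Count >= _maxBatchSize)
        {
            await FlushAsync();
        }
    }

    public async Task FlushAsync()
    {
        if (_batch.Count == 0) return;
        var ready = _batch.ToArray();
        _batch.Clear();
        await _send(ready, Options);
    }
}
```

One consequence of this shape is that changing Options.PartitionKey between adds affects every item still buffered, which is the trade-off of treating the key as a batch-level rather than an event-level concept.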

@azure-sdk azure-sdk added Event Hubs needs-team-triage This issue needs the team to triage. labels Apr 18, 2022
@ghost ghost removed the needs-triage This is a new issue that needs to be triaged to the appropriate team. label Apr 18, 2022
@jsquire jsquire added feature-request This issue requires a new behavior in the product in order be resolved. and removed Track2 needs-team-triage This issue needs the team to triage. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Apr 22, 2022
@ghost ghost added the needs-team-attention This issue needs attention from Azure service team or SDK team label Apr 22, 2022
@jsquire jsquire added Functions and removed needs-team-attention This issue needs attention from Azure service team or SDK team labels Apr 22, 2022
@ghost ghost added the needs-team-attention This issue needs attention from Azure service team or SDK team label Apr 22, 2022
@jsquire
Member

jsquire commented Apr 22, 2022

//fyi: @JoshLove-msft

@jsquire
Member

jsquire commented Feb 6, 2023

This has been addressed via #33858. A new overload for IAsyncCollector<EventData> has been added to allow specifying a partition key to accompany the event.

[FunctionName("BindingToCollector")]
public static async Task Run(
    [TimerTrigger("0 */5 * * * *")] TimerInfo myTimer,
    [EventHub("<event_hub_name>", Connection = "<connection_name>")] IAsyncCollector<EventData> collector)
{
    // When no partition key is used, partitions will be assigned via round-robin.
    await collector.AddAsync(new EventData($"Event 1 added at: {DateTime.Now}"));
    await collector.AddAsync(new EventData($"Event 2 added at: {DateTime.Now}"));

    // Using a partition key will help group events together; events with the same key will always be
    // assigned to the same partition.
    await collector.AddAsync(new EventData($"Event 3 added at: {DateTime.Now}"), "sample-key");
    await collector.AddAsync(new EventData($"Event 4 added at: {DateTime.Now}"), "sample-key");
}

The release for this is tracked in #33919 and has been tentatively scheduled for our March window. Assuming that nothing requires the release to be rescheduled, the updated package should be available in the second week of March.

@jsquire jsquire modified the milestones: Backlog, 2023-03 Feb 6, 2023
@jsquire jsquire added the issue-addressed The Azure SDK team member assisting with this issue believes it to be addressed and ready to close. label Feb 6, 2023
@ghost

ghost commented Feb 6, 2023

Hi @josh-endries. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text “/unresolve” to remove the “issue-addressed” label and continue the conversation.

@ghost

ghost commented Feb 13, 2023

Hi @josh-endries, since you haven’t asked that we “/unresolve” the issue, we’ll close this out. If you believe further discussion is needed, please add a comment “/unresolve” to reopen the issue.

@ghost ghost closed this as completed Feb 13, 2023
@github-actions github-actions bot locked and limited conversation to collaborators Jun 25, 2023