Improve Kafka output binding throughput: async send blocks on each output message. Move to await Task.WhenAll() #300

Closed
ivanthelad opened this issue Mar 15, 2022 · 6 comments

ivanthelad commented Mar 15, 2022

When outputting a batch/list of events, the current throughput of the Kafka output binding maxes out at around 500 events per minute. This is because the implementation blocks on each message sent to the underlying C++ librdkafka library, using an await on each individual message. As librdkafka already batches messages by default, this is in my opinion incorrect usage of the library, and it affects the overall throughput of the Kafka output binding.

Issue description

  • The Kafka input trigger receives messages in batches (librdkafka).

  • When an output binding is defined, the trigger blocks for n seconds, depending on the size of the trigger batch (around 8 seconds for a batch of 100).

  • Each event in an output binding list/array is expected to be produced to another Kafka topic as an individual event.

  • The binding achieves a maximum of around 400-500 messages per minute.

  • The implementation is essentially synchronous: it blocks and waits on each individual event in the output list.
    > https://github.com/Azure/azure-functions-kafka-extension/blob/dev/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerEntity.cs#L32
    > https://github.com/Azure/azure-functions-kafka-extension/blob/dev/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducer.cs#L89

  • Each of these calls waits for the preconfigured batch time, “linger.ms” (librdkafka), before the message is sent to the Kafka backend.

  • Until the output binding has completed, the function is blocked and does not accept additional events.

  • All of this means the output binding is always constrained to a limited throughput.
    o As the trigger message batches grow, function throughput decreases (all events must be sent before the next batch is triggered).
    o Modifying or tuning the librdkafka library has no effect, because we always wait for a successful produce response on each message (bypassing librdkafka's batching mechanisms). I have run a modified version of the Kafka extension with additional tuning attributes, but the synchronous nature of the output binding makes the tuning redundant.
    o The output binding does not batch the produced messages (librdkafka batching).
    o This scenario applies to all out-of-process function implementations, such as Java, Node, etc.
    o The output binding implementation is not designed for high throughput. Our configuration seems to be geared toward low latency rather than high throughput. See the librdkafka docs.
    Tuning underlying attributes:

  • https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#high-throughput
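
To illustrate the effect described in the list above, here is a minimal sketch that involves no Kafka at all. It assumes that each produce call completes only after a fixed linger-style delay, and models that with `Task.Delay`. The names `LingerSimulation` and `FakeProduceAsync` are hypothetical and are not part of the extension. It compares awaiting each send in turn against starting all sends and awaiting them together.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class LingerSimulation
{
    // Hypothetical stand-in for a produce call: completes after a simulated linger delay.
    public static Task FakeProduceAsync(int lingerMs) => Task.Delay(lingerMs);

    // Awaits each send before starting the next one, so the delays add up.
    public static async Task<TimeSpan> SendSequentiallyAsync(int count, int lingerMs)
    {
        var stopwatch = Stopwatch.StartNew();
        for (int i = 0; i < count; i++)
        {
            await FakeProduceAsync(lingerMs);
        }
        return stopwatch.Elapsed;
    }

    // Starts every send first, then awaits them together, so the delays overlap.
    public static async Task<TimeSpan> SendConcurrentlyAsync(int count, int lingerMs)
    {
        var stopwatch = Stopwatch.StartNew();
        var tasks = Enumerable.Range(0, count)
            .Select(_ => FakeProduceAsync(lingerMs))
            .ToList();
        await Task.WhenAll(tasks);
        return stopwatch.Elapsed;
    }

    public static async Task Main()
    {
        var sequential = await SendSequentiallyAsync(20, 50);
        var concurrent = await SendConcurrentlyAsync(20, 50);
        Console.WriteLine($"sequential: {sequential.TotalMilliseconds:F0} ms");
        Console.WriteLine($"concurrent: {concurrent.TotalMilliseconds:F0} ms");
    }
}
```

In this model the sequential time grows with the number of messages, while the concurrent time stays close to a single delay. Real numbers will depend on the broker, the network, and the librdkafka settings.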

A potential solution would be to modify KafkaProducerEntity to perform an await Task.WhenAll() on the list of items sent:

    internal async Task SendAndCreateEntityIfNotExistsAsync<T>(T item, Guid functionInstanceId, CancellationToken cancellationToken)
    {
        var listOfTasks = new List<Task>();
        var kafkaProducer = this.KafkaProducerFactory.Create(this);

        // Start every produce call without awaiting it, then await them all together.
        // The loop no longer awaits each task one by one; this method's Task completes
        // only once every send has finished.
        if (item is ICollection collection)
        {
            foreach (var collectionItem in collection)
            {
                // Previously: await kafkaProducer.ProduceAsync(...) for each item.
                listOfTasks.Add(kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(collectionItem)));
            }
        }
        else
        {
            listOfTasks.Add(kafkaProducer.ProduceAsync(this.Topic, this.GetItemToProduce(item)));
        }

        await Task.WhenAll(listOfTasks);
    }
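
One behaviour worth keeping in mind with this approach: when several tasks fail, `await Task.WhenAll(...)` rethrows only the first exception, while the combined task's `Exception` property holds all of them. The sketch below shows this with a hypothetical `FakeProduceAsync` that fails for even ids. It is not the extension's code, and how failed sends should be reported by the binding is left open.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAllFailures
{
    // Hypothetical stand-in for a produce call that fails for even ids.
    public static async Task FakeProduceAsync(int id)
    {
        await Task.Yield();
        if (id % 2 == 0)
        {
            throw new InvalidOperationException($"send {id} failed");
        }
    }

    // Awaits the whole batch and returns how many sends failed.
    public static async Task<int> CountFailuresAsync(int count)
    {
        var tasks = new List<Task>();
        for (int i = 0; i < count; i++)
        {
            tasks.Add(FakeProduceAsync(i));
        }

        Task all = Task.WhenAll(tasks);
        try
        {
            await all; // rethrows only the first failure
            return 0;
        }
        catch (InvalidOperationException)
        {
            // The combined task's AggregateException carries every failure.
            return all.Exception?.InnerExceptions.Count ?? 0;
        }
    }

    public static async Task Main()
    {
        // Ids 0 and 2 fail, so this prints 2.
        Console.WriteLine(await CountFailuresAsync(4));
    }
}
```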

@ivanthelad
Author

Any feedback on this issue?

@raorugan
Contributor

We want to provide an easy way to perform transformation and filtering without having to stand up a container and runtime constellation. Beyond this issue, there is also a requirement that all users be able to tune the underlying librdkafka C++ library more finely; see [1].
[1] https://developers.redhat.com/articles/2022/05/03/fine-tune-kafka-performance-kafka-optimization-theorem#kafka_primitives

Consider #11 below while looking into this issue.
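
As a rough sketch of the kind of tuning meant here, the snippet below collects a few librdkafka producer properties that influence batching. The property names (`linger.ms`, `batch.num.messages`, `compression.codec`) are librdkafka configuration keys; the values are illustrative assumptions rather than recommendations, and `ThroughputTuning` is a hypothetical helper, not something the extension exposes.

```csharp
using System;
using System.Collections.Generic;

public static class ThroughputTuning
{
    // Property names are librdkafka keys; the values are illustrative assumptions only.
    public static Dictionary<string, string> HighThroughputSettings() =>
        new Dictionary<string, string>
        {
            ["linger.ms"] = "100",            // wait longer so more messages fill each batch
            ["batch.num.messages"] = "10000", // upper bound on messages per batch
            ["compression.codec"] = "lz4",    // compress batches before sending
        };

    public static void Main()
    {
        foreach (var setting in HighThroughputSettings())
        {
            Console.WriteLine($"{setting.Key}={setting.Value}");
        }
    }
}
```

Settings like these only pay off once sends are no longer awaited one at a time, as described at the top of this issue.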

@raorugan
Contributor

raorugan commented Aug 11, 2022

8K in and 8K out throughput is ideal. C# in-proc throughput for the output binding is pretty good. It would be good to achieve similar throughput for Java/Python as well.

@ivanthelad
Author

@raorugan any update on this item?

@shrohilla
Contributor

@ivanthelad let me know if you are happy with this change -- #403

@shrohilla
Contributor

@ivanthelad we have released this in the 3.7.0 NuGet package.

@shrohilla shrohilla self-assigned this Dec 15, 2022