
How to Zstandard compress and microbatch events with output binding? #207

Open
smitty-codes opened this issue Dec 17, 2020 · 7 comments
Labels
enhancement New feature or request

smitty-codes commented Dec 17, 2020

Setup

  • Using confluent cloud, their schema registry and Avro for the serialization
  • Confluent Cloud sets all topics to compression.type=producer (producers must compress prior to sending, kafka cluster does not compress)

Goal
After my Azure Functions code has processed events and created a new list of ISpecificRecord Avro POCOs, I want to forward those new events to another Kafka topic, but I want to send them as a Zstd-compressed microbatch.

Problem
I didn't see an argument I could pass to the KafkaAttribute output binding that lets me set the compression type (in the Confluent .NET client this is ProducerConfig.CompressionType = CompressionType.Zstd [compression.codec]), nor could I find a linger setting to "fill up" the internal librdkafka buffer (ProducerConfig.LingerMs [linger.ms]).
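For reference, a minimal standalone sketch of the two `ProducerConfig` settings in question, using the raw Confluent .NET client rather than the binding (the broker address is a placeholder):

```csharp
using Confluent.Kafka;

// The two producer settings the binding does not currently expose:
// compression.type and linger.ms.
var config = new ProducerConfig
{
    BootstrapServers = "pkc-xxxxx.confluent.cloud:9092", // placeholder
    CompressionType = CompressionType.Zstd, // compression.type=zstd
    LingerMs = 100 // wait up to 100 ms so librdkafka can fill a batch
};

using var producer = new ProducerBuilder<string, byte[]>(config).Build();
```

With a non-zero `LingerMs`, librdkafka accumulates messages into a batch before sending, and Zstd compression is applied per batch, which is where the microbatching savings come from.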

See #57; in #11 the idea of exposing all config options was shot down as too messy - maybe we could just get the CompressionType setting added into KafkaProducerFactory.cs > GetProducerConfig()?

Regarding the microbatching, is that kind of what you get with using an `out` param for the output binding?

```csharp
[Kafka("LocalBroker", "stringTopic")] out KafkaEventData<string>[] kafkaEventData
```

as opposed to using the IAsyncCollector and calling `.AddAsync()`?

```csharp
[Kafka("LocalBroker", "stringTopic")] IAsyncCollector<KafkaEventData<string>> events,
...
await events.AddAsync(forwardEvent);
```

In the meantime, I assume I need to manually use Confluent's AvroSerializer(), take that byte[], and run it through a ZStandard library (like ZstdNet) to compress it?
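A rough sketch of that manual workaround, assuming ZstdNet's `Compressor.Wrap` API; `MyRecord` is a stand-in for a generated Avro ISpecificRecord POCO, and the topic name is whatever you publish to:

```csharp
using System.Threading.Tasks;
using Confluent.Kafka; // SerializationContext, MessageComponentType
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using ZstdNet;

public static class ManualCompression
{
    // Avro-serialize a record, then Zstd-compress the resulting bytes.
    public static async Task<byte[]> SerializeAndCompressAsync(
        ISchemaRegistryClient registry, MyRecord record, string topic)
    {
        // Serialize with Confluent's schema-registry wire framing.
        var serializer = new AvroSerializer<MyRecord>(registry);
        byte[] avroBytes = await serializer.SerializeAsync(
            record, new SerializationContext(MessageComponentType.Value, topic));

        // Wrap the serialized payload in a Zstd frame.
        using var compressor = new Compressor();
        return compressor.Wrap(avroBytes);
    }
}
```

One caveat with this approach: it is payload-level compression rather than Kafka's built-in message-set compression, so consumers would have to unwrap the Zstd frame themselves before Avro-deserializing.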

I also attached a Confluent best-practices PDF; page 25 outlines which settings to tweak to optimize for different scenarios (latency, throughput, durability, and availability). You may consider exposing these settings.
confluent cloud-Best_Practices_for_Developing_Apache_Kafka_Applications_on_Confluent_Cloud.pdf

Thanks

@smitty-codes smitty-codes changed the title How to Zstandard compress events with output binding How to Zstandard compress and microbatch events with output binding? Dec 17, 2020
@TsuyoshiUshio
Contributor

Hi @smitty-codes
Thank you for bringing this up! As for `[Kafka("LocalBroker", "stringTopic")] out KafkaEventData<string>[] kafkaEventData`, it might be difficult, since that is a feature of the Azure Functions host and we need to stay consistent with other bindings. However, for the compression, we can start discussing it. Would it work for you if we added configuration for CompressionType.Zstd and ProducerConfig.LingerMs? I need to learn more about the compression scenario.

@TsuyoshiUshio TsuyoshiUshio added enhancement New feature or request and removed Needs: triage (functions) labels Feb 10, 2021
@smitty-codes
Author

smitty-codes commented Feb 11, 2021

@TsuyoshiUshio
Yes, being able to use CompressionType.Zstd and set ProducerConfig.LingerMs in the Kafka attribute is what I'm looking for.

Those are the two most important settings for optimizing throughput (see page 27 of the Confluent PDF I posted above). The document also covers other settings you may consider for other scenarios, like latency, durability, and availability.

@TsuyoshiUshio
Contributor

Hi @smitty-codes
I created a draft PR; feel free to add comments.

@smitty-codes
Author

@TsuyoshiUshio What is the status of this item? I read through the PR, and it looks like the decision was to make this an attribute we can set on the function, which is fine. I upgraded to 3.3.2 but didn't see it available yet, and I'm not seeing it listed in the 3.3.3 pre-release.

This is a pretty important item for us - it will really cut down on Confluent Cloud costs given the volume of data we're anticipating pushing through Kafka/Confluent Cloud.

@TsuyoshiUshio
Contributor

TsuyoshiUshio commented Mar 2, 2022

The ownership of this repo has changed. @lpapudippu Could you have a look? Current status: I created a PR, but before mine another contributor had already sent a pull request, which is now in review. #175

@lpapudippu
Contributor

@shrohilla Can you please take a look at it? cc: @raorugan

@smitty-codes
Author

Any update on this issue?
