chore(plugin-server): Improve kafka producer wrapper #10968
Conversation
We're failing to send batches of messages to Kafka on a semi-regular basis due to message sizes. It's unclear why this is the case, as we try to limit the size of each message batch. This PR adds information on these failed batches to Sentry error messages. Example error: https://sentry.io/organizations/posthog2/issues/3291755686/?project=6423401&query=is%3Aunresolved+level%3Aerror
This allows us to be much more accurate when estimating message sizes, hopefully eliminating a class of errors.
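As a rough sketch of what byte-accurate size estimation can look like (the `estimateMessageSize` helper and the message shape below are illustrative assumptions, not the actual plugin-server code):

```typescript
// Hypothetical helper: estimate the on-the-wire size of a Kafka message by
// measuring the actual byte length of its key and value instead of relying
// on string lengths, which undercount multi-byte characters.
interface PendingMessage {
    key?: string | Buffer | null
    value: string | Buffer
}

function estimateMessageSize(message: PendingMessage): number {
    const keySize = message.key ? Buffer.byteLength(message.key) : 0
    const valueSize = Buffer.byteLength(message.value)
    return keySize + valueSize
}

// Example: '🦔' is 1 character but 4 bytes, so byte-based estimation differs
// from simple character counting.
console.log(estimateMessageSize({ key: 'team_2', value: '{"event":"🦔"}' }))
```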
This helps avoid 'message too large' type errors (see #10968) by compressing in-flight messages. I would have preferred to use zstd, but the libraries did not compile cleanly on my machine.
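For context, a minimal sketch of enabling producer-side compression, assuming a kafkajs-style client (the broker address, topic, and payload here are placeholders):

```typescript
import { Kafka, CompressionTypes } from 'kafkajs'

const kafka = new Kafka({ clientId: 'plugin-server', brokers: ['localhost:9092'] })
const producer = kafka.producer()

async function produceCompressed(): Promise<void> {
    await producer.connect()
    // GZIP ships with kafkajs; snappy/lz4/zstd need an extra codec package,
    // which is why zstd was skipped when it would not compile cleanly.
    await producer.send({
        topic: 'events_example_topic',
        compression: CompressionTypes.GZIP,
        messages: [{ key: 'team_2', value: JSON.stringify({ event: 'pageview' }) }],
    })
    await producer.disconnect()
}
```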
  expect(producer.currentBatch.length).toEqual(1)
- expect(producer.currentBatchSize).toEqual(40)
+ expect(producer.currentBatchSize).toBeGreaterThan(40)
why not check for what it's now supposed to be exactly?
Because the result is kind of random (73) since it now accounts for sizes of the keys and so on.
This gets the intent across better.
yup, but we might mess up that function in the future so that it returns something way too big
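Purely as a sketch of how that concern could be addressed (not something this PR does), the assertion could be bounded on both sides:

```typescript
// Hypothetical tightened assertion: the estimate must exceed the raw payload
// size (40 bytes) but stay within a sane upper bound, so a badly broken
// estimator cannot slip through unnoticed.
expect(producer.currentBatchSize).toBeGreaterThan(40)
expect(producer.currentBatchSize).toBeLessThan(200)
```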
  const timeSinceLastFlush = Date.now() - this.lastFlushTime
- if (timeSinceLastFlush > this.flushFrequencyMs || this.currentBatch.length >= this.maxQueueSize) {
+ if (
+     this.currentBatchSize > this.maxBatchSize ||
Fix: immediately flush if enqueueing a too-large message. This way unrelated messages don't end up in the DLQ and Sentry reports have proper context.
Does this help here because this.flush() previously threw (line 55), making us miss the next message if the failure happened the next time we called queueMessage?
No. Previously, if the batch was empty and a single too-large message came in, we would call flush() before appending and then append it. When the next message came along, that flush failed, since the message already in the queue was too large.
Now we append and flush immediately. Check the new test added - it would have failed before.
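A minimal sketch of the append-then-flush ordering described above, with assumed class, field names, and defaults rather than the actual KafkaProducerWrapper code:

```typescript
import { CompressionTypes, Message, Producer } from 'kafkajs'

// Hypothetical simplified wrapper: the message is appended first, and the batch
// is flushed right away if it is now over the size limit, so an oversized
// message fails on its own instead of poisoning a later, unrelated batch.
class BatchingProducer {
    private currentBatch: Message[] = []
    private currentBatchSize = 0
    private lastFlushTime = Date.now()

    constructor(
        private producer: Producer,
        private topic: string,
        private maxBatchSize = 900_000,
        private maxQueueSize = 20,
        private flushFrequencyMs = 500
    ) {}

    async queueMessage(message: Message): Promise<void> {
        // Append first ...
        this.currentBatch.push(message)
        this.currentBatchSize += Buffer.byteLength(message.value ?? '') + Buffer.byteLength(message.key ?? '')

        // ... then flush immediately if the batch is now too large, too old, or too long.
        const timeSinceLastFlush = Date.now() - this.lastFlushTime
        if (
            this.currentBatchSize > this.maxBatchSize ||
            timeSinceLastFlush > this.flushFrequencyMs ||
            this.currentBatch.length >= this.maxQueueSize
        ) {
            await this.flush()
        }
    }

    async flush(): Promise<void> {
        if (this.currentBatch.length === 0) {
            return
        }
        const messages = this.currentBatch
        this.currentBatch = []
        this.currentBatchSize = 0
        this.lastFlushTime = Date.now()
        await this.producer.send({ topic: this.topic, compression: CompressionTypes.GZIP, messages })
    }
}
```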
ok, so that's what I thought - clearly I didn't word the question well, thanks for confirming.
Problem
Kafka produce calls are failing.
Changes
`Buffer.from` usage from plugin-server produce usage - this allows for more accurate message size estimation
How did you test this code?
Tests + manual checking things work
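For illustration, a Jest-style test along these lines could look like the following (it targets the hypothetical BatchingProducer sketched earlier in this thread, so every name and size here is an assumption):

```typescript
it('flushes immediately when a single message exceeds the batch size limit', async () => {
    const send = jest.fn().mockResolvedValue(undefined)
    const wrapper = new BatchingProducer({ send } as any, 'events', /* maxBatchSize */ 100)

    // One message larger than maxBatchSize should be flushed on its own ...
    await wrapper.queueMessage({ key: 'k', value: 'x'.repeat(500) })
    expect(send).toHaveBeenCalledTimes(1)

    // ... so a later, normal-sized message does not inherit the oversized batch.
    await wrapper.queueMessage({ key: 'k', value: 'small' })
    expect(send).toHaveBeenCalledTimes(1)
})
```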