fix: Respect flush_queue_size in Worker.flush() #64
Conversation
bugbot run
Skipping Bugbot: Bugbot is disabled for this repository
Mercy811
left a comment
Thanks @sojingle, LGTM! Let's wait for bugbot run. I'm requesting access at https://amplitude.slack.com/archives/C08PQE45N68/p1763513616629679
bugbot run
Comment @cursor review or bugbot run to trigger another review on this PR
igor-amp
left a comment
lgtm
Hi @dsaxton-1password, are you able to test this branch in a real environment? If testing is difficult, I will just proceed with the release.
I could try, but it would likely take a while to actually trigger the issue (it has been happening maybe once a week) and you may not want to wait that long to release. If you're confident this doesn't introduce breaking changes I would go ahead and release it and we can start running the new version in a staging environment (and then in production if everything runs smoothly).
Summary
Fix: Respect flush_queue_size in Worker.flush() to prevent payload size errors
Problem
The current flush() implementation uses pull_all() and sends all events in a single HTTP request, completely ignoring the flush_queue_size configuration. This causes payload-size errors: the SDK automatically reduces flush_queue_size when payloads are too large, but flush() bypassed this mechanism entirely.
Solution
Thanks to @dsaxton-1password for raising this issue and submitting PR #63. Based on that, I made further optimizations and fixed some problems caused by that modification.
Updated Worker.flush() to batch events: using pull_all to fetch all the data first and then splitting it reduces the number and duration of locks.
Added a _create_combined_future() helper: changing the return type of flush() to a list of Futures would be an API change, so this helper merges the batch futures back into a single Future.
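A rough sketch of how these two pieces could fit together, assuming a worker that exposes storage with pull_all(), a configuration with flush_queue_size, a ThreadPoolExecutor, and a send() callable; the method names follow the PR description, but the bodies below are illustrative rather than the SDK's actual implementation:

```python
from concurrent.futures import Future, wait


class WorkerSketch:
    """Illustrative stand-in for the worker; attribute names are assumptions."""

    def __init__(self, storage, configuration, threads_pool, send):
        self.storage = storage              # assumed to expose pull_all()
        self.configuration = configuration  # assumed to expose flush_queue_size
        self.threads_pool = threads_pool    # a concurrent.futures.ThreadPoolExecutor
        self.send = send                    # callable that posts one batch of events

    def flush(self):
        if not self.storage:
            return None
        events = self.storage.pull_all()    # one pull keeps the storage lock short
        if not events:
            return None
        size = self.configuration.flush_queue_size
        # Split the pulled events into flush_queue_size-sized batches and
        # submit each batch to the thread pool as its own request.
        batch_futures = [
            self.threads_pool.submit(self.send, events[i:i + size])
            for i in range(0, len(events), size)
        ]
        return self._create_combined_future(batch_futures)

    def _create_combined_future(self, batch_futures):
        # flush() still returns a single future, so its public API is unchanged.
        combined = Future()

        def wait_for_batches():
            done, _ = wait(batch_futures)
            errors = [f.exception() for f in done if f.exception() is not None]
            if errors:
                combined.set_exception(errors[0])  # surface the first failure
            else:
                combined.set_result(None)

        self.threads_pool.submit(wait_for_batches)
        return combined
```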
Guarded the flush_queue_size reduction: when multiple batches from the same flush() fail, the old code would reduce flush_queue_size for EVERY failure, causing exponential over-reduction.
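The guard itself might look roughly like this inside the response handling; _increase_flush_divider() and flush_queue_size come from the PR description, while the standalone handle_payload_too_large() helper, the requeue callable, and the string status check are simplifications for illustration:

```python
def handle_payload_too_large(response, events, configuration, requeue):
    # Hypothetical helper: the real SDK performs this inside its response
    # processor rather than a standalone function.
    if response.status == "PAYLOAD_TOO_LARGE":
        if len(events) <= configuration.flush_queue_size:
            # Only the first failing batch of a flush shrinks the queue size.
            # Later batches from the same flush were built with the old, larger
            # size, so len(events) now exceeds the reduced flush_queue_size and
            # they skip further reductions.
            configuration._increase_flush_divider()
        requeue(events)  # re-queue the events for retry, as before
```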
Note
Batch flushes by flush_queue_size with a combined future and guard flush-divider increases on payload-too-large; update tests accordingly.
Worker (src/amplitude/worker.py): batch events in flush() by configuration.flush_queue_size, submitting each batch to the thread pool; add _create_combined_future() that waits for all batch futures, logs failures, and raises on error; early-return when there is no storage or no events.
Processor (src/amplitude/processor.py): on PAYLOAD_TOO_LARGE, only call configuration._increase_flush_divider() when len(events) <= configuration.flush_queue_size to avoid multiple reductions; re-queue events as before.
Tests (src/test/test_worker.py): update the stop test to verify storage is flushed and HTTP sends occur, and cover batched flush() and the guarded divider increase under PAYLOAD_TOO_LARGE.
Written by Cursor Bugbot for commit 954e288.
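As a loose illustration of the batching rule the test updates aim to verify (not the SDK's actual test code), no single send should ever receive more than flush_queue_size events:

```python
import unittest
from unittest.mock import MagicMock


class BatchedFlushSketchTest(unittest.TestCase):
    # Illustrative only: exercises the batching rule from this PR against a
    # mocked send(), not the real Worker from src/amplitude/worker.py.
    def test_no_send_exceeds_flush_queue_size(self):
        flush_queue_size = 10
        send = MagicMock()
        events = [{"event_type": f"event_{i}"} for i in range(25)]

        # Batch exactly the way the PR describes flush() doing it.
        for i in range(0, len(events), flush_queue_size):
            send(events[i:i + flush_queue_size])

        batch_sizes = [len(call.args[0]) for call in send.call_args_list]
        self.assertEqual(batch_sizes, [10, 10, 5])
        self.assertTrue(all(size <= flush_queue_size for size in batch_sizes))


if __name__ == "__main__":
    unittest.main()
```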