-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(buffer_worker): avoid setting flush timer when inflight is full #10717
fix(buffer_worker): avoid setting flush timer when inflight is full #10717
Conversation
a77f6e5
to
617b032
Compare
Fixes https://emqx.atlassian.net/browse/EMQX-9902 When the buffer worker inflight window is full, we don’t need to set a timer to flush the messages again because there’s no more room, and one of the inflight windows will flush the buffer worker by calling `flush_worker`. Currently, we do set the timer on such situation, and this fact combined with the default batch time of 0 yields a busy loop situation where the CPU spins a lot while inflight messages do not return.
617b032
to
657df05
Compare
…table room is made The previous commit uncovered another bug that was hidden by it: `maybe_flush_after_async_reply` was sending a message to the wrong PID. It was sending a message to `self()` meaning to target a buffer worker, but `self()` in that context is never the buffer worker, it's the connector's worker. This change also revealed a race condition where the buffer workers could stop flushing messages. So we piggy-backed on the atomic update of the table size count to check if the buffer worker should be poked to continue flushing. This allows us to get rid of `maybe_flush_after_async_reply` altogether.
@@ -337,7 +325,8 @@ resume_from_blocked(Data) -> | |||
{next_state, running, Data} | |||
end; | |||
{expired, Ref, Batch} -> | |||
IsAcked = ack_inflight(InflightTID, Ref, Id, Index), | |||
WorkerPid = self(), | |||
IsAcked = ack_inflight(InflightTID, Ref, Id, Index, WorkerPid), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
m.b. an inline fun
ack_inflight_from_worker(InflightTID, Ref, Id, Index) ->
ack_inflight(InflightTID, Ref, Id, Index, self()).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have another PR underway that will also touch this function, I'll try to do it in that PR.
FlushCheck = dec_inflight_remove(InflightTID, Count, Removed), | ||
case FlushCheck of | ||
continue -> ok; | ||
flush -> ?MODULE:flush_worker(WorkerPid) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: not quite clear how continue
opposes to flush
😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would no_flush
or dont_flush
be better? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed here: #10740
true -> | ||
ok; | ||
false -> | ||
ack_inflight(InflightTID, Ref, Id, Index) | ||
ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) | ||
end, | ||
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: ensure_async_worker_monitored
we also have WorkerPid
s, couldn't be this confusing?
true -> | ||
ok; | ||
false -> | ||
ack_inflight(InflightTID, Ref, Id, Index) | ||
ack_inflight(InflightTID, Ref, Id, Index, WorkerPid) | ||
end, | ||
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: In ensure_async_worker_monitored
we also have WorkerPid
s, couldn't be that confusing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, there are workers all around 😅
I think the differentiating point is that we have buffer workers (the current module) and async workers (from the connectors). I'll add the Async
modifier to the variable name to help with that differentiation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed here: #10740
Targeting
release-50
Fixes https://emqx.atlassian.net/browse/EMQX-9902
When the buffer worker inflight window is full, we don’t need to set a timer to flush the messages again because there’s no more room, and one of the inflight windows will flush the buffer worker by calling
flush_worker
.Currently, we do set the timer on such situation, and this fact combined with the default batch time of 0 yields a busy loop situation where the CPU spins a lot while inflight messages do not return.
This original change uncovered another bug that was hidden by it:
maybe_flush_after_async_reply
was sending a message to the wrongPID. It was sending a message to
self()
meaning to target a bufferworker, but
self()
in that context is never the buffer worker, it'sthe connector's worker.
This change also revealed a race condition where the buffer workers
could stop flushing messages. So we piggy-backed on the atomic update
of the table size count to check if the buffer worker should be poked
to continue flushing. This allows us to get rid of
maybe_flush_after_async_reply
altogether.Summary
🤖 Generated by Copilot at a77f6e5
Optimize resource buffer worker performance by removing redundant timer calls. This affects the file
emqx_resource_buffer_worker.erl
and the functionensure_flush_timer/1
.PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
changes/{ce,ee}/(feat|perf|fix)-<PR-id>.en.md
filesChecklist for CI (.github/workflows) changes
changes/
dir for user-facing artifacts update