feat(buffer_worker): refactor buffer/resource workers to always use queue and use offload mode #9642
Conversation
Based on / depends on #9619
@@ -80,11 +98,13 @@ start_link(Id, Index, Opts) ->
sync_query(Id, Request, Opts) ->
    PickKey = maps:get(pick_key, Opts, self()),
    Timeout = maps:get(timeout, Opts, infinity),
    emqx_resource_metrics:matched_inc(Id),
Matched moved here because otherwise retries would bump the matched
metric as if they were new requests.
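The intent of moving the counter can be sketched in Python (hypothetical names; the actual code uses the `emqx_resource_metrics` counters in Erlang): `matched` is bumped once when a request enters, so retries of the same request don't inflate it.

```python
class Metrics:
    """Sketch: bump 'matched' once at entry, never on retries."""

    def __init__(self):
        self.matched = 0
        self.retried = 0

    def on_new_request(self):
        # counted here, when the request first arrives
        self.matched += 1

    def on_retry(self):
        # retries must NOT bump 'matched' again
        self.retried += 1

m = Metrics()
m.on_new_request()
m.on_retry()
m.on_retry()
assert (m.matched, m.retried) == (1, 2)
```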
get_first_n_from_queue(_Q, 0, Acc) ->
    lists:reverse(Acc);
get_first_n_from_queue(Q, N, Acc) when N > 0 ->
    case replayq:peek(Q) of
Changed to pop because, previously, the head of the queue would be duplicated N times.
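The bug is easy to see in a minimal Python sketch (the real code is Erlang over `replayq`; the function names here are hypothetical): peeking inside the loop returns the same head every iteration, so the result is the head repeated N times, whereas popping advances the queue.

```python
from collections import deque

def get_first_n_buggy(q: deque, n: int) -> list:
    """Peek without popping: the head is returned n times (the bug)."""
    acc = []
    for _ in range(n):
        if not q:
            break
        acc.append(q[0])  # peek only; the head is never removed
    return acc

def get_first_n_fixed(q: deque, n: int) -> list:
    """Pop each item, so each iteration sees the next element (the fix)."""
    acc = []
    for _ in range(n):
        if not q:
            break
        acc.append(q.popleft())  # pop, as in the fix
    return acc

assert get_first_n_buggy(deque([1, 2, 3]), 3) == [1, 1, 1]
assert get_first_n_fixed(deque([1, 2, 3]), 3) == [1, 2, 3]
```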
@@ -110,7 +129,9 @@ simple_async_query(Id, Request, ReplyFun) ->
    %% would mess up the metrics anyway. `undefined' is ignored by
    %% `emqx_resource_metrics:*_shift/3'.
    Index = undefined,
-   Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), #{}),
+   QueryOpts = #{},
+   emqx_resource_metrics:matched_inc(Id),
Maybe `block/2` also needs to add this line: `emqx_resource_metrics:matched_inc(Id, length(Query))`?
Makes sense. But searching the code base, I could not find any usage of `block/2` 🤔
I think I'll just remove it. 😺
true ->
-    {keep_state, St, {state_timeout, ResumeT, resume}};
+    {keep_state, Data0, {state_timeout, ResumeT, resume}};
There may be a bug: if data is in the replayq, its `HasBeenSent` will always be `false`, even though it may have been sent N times.
Before this refactoring, the only requests that bumped the `retried.*` counters were the async requests, and they are the only ones that get inserted into the inflight table and marked as sent.
I'm not entirely sure this was the intended behavior. But I think this behavior is kept because, after this refactoring, the path an async request takes is:
- The request gets enqueued into replayq.
- It's then popped (possibly in a batch).
- It's appended to the inflight table (if there's room).
- In the first ("outer") `handle_query_result`, since it'll return `{async_return, ok}`, it'll be ack'ed in replayq, removing it from there.
- Eventually, when `{batch_,}reply_after_query` is called, it'll bump either `retried.*` counter.

For sync requests, you are right: they won't bump the `retried.*` counters. Do you think that was the original intention? We would need to track what has been retried somehow.
We can't change the replayq items without re-appending them to the queue, and that would change the order of the requests, unfortunately. One way could be to keep a table of hashes of requests and then check whether those have been sent or not. 🤔
I'm assuming that we want to keep the order in replayq to prevent things like re-ordering messages from the same client.
What do you think?
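The hash-table idea floated above could look roughly like this Python sketch (purely illustrative of the suggestion, not something the PR implements): a side table of request hashes lets a worker mark a queued request as "sent" without mutating the queue itself, preserving ordering.

```python
import hashlib

def req_hash(request: bytes) -> str:
    """Stable content hash of a queued request."""
    return hashlib.sha256(request).hexdigest()

class RetryTracker:
    """Side table of request hashes that have been sent at least once.
    The queue itself is never rewritten, so ordering is preserved."""

    def __init__(self):
        self.sent = set()

    def mark_sent(self, request: bytes) -> None:
        self.sent.add(req_hash(request))

    def has_been_sent(self, request: bytes) -> bool:
        return req_hash(request) in self.sent

tracker = RetryTracker()
tracker.mark_sent(b"msg-1")
assert tracker.has_been_sent(b"msg-1")
assert not tracker.has_been_sent(b"msg-2")
```

A real implementation would also need to evict entries once requests are acked, which is part of why the discussion was deferred.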
Personally, I think we can just ignore this here, then create a tracking ticket for the product owner and leave the discussion to them.
Sounds good to me: we can revisit this point in a future PR, if required. 😺
To avoid confusion for the users as to what persistence guarantees we offer when buffering bridges/resources, we will always enable offload mode for `replayq`. With this, when the buffer size is above the max segment size, it'll flush the queue to disk, but on recovery after a restart it'll clean the existing segments rather than resuming from them.
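The offload-mode semantics described above can be sketched in Python (a hypothetical toy class, not the `replayq` API): items stay in memory until the buffer exceeds the segment size, then spill to a disk segment; on restart, leftover segments are cleaned up instead of being replayed.

```python
import os
import pickle

class OffloadQueue:
    """Toy sketch of offload mode: spill to disk above max_seg_bytes,
    and on (re)start delete existing segments rather than resuming."""

    def __init__(self, dir_: str, max_seg_bytes: int):
        self.dir = dir_
        self.max_seg_bytes = max_seg_bytes
        self.mem = []
        self.mem_bytes = 0
        self.seg_no = 0
        # offload mode: clean leftover segments instead of recovering them
        for name in os.listdir(self.dir):
            if name.endswith(".seg"):
                os.remove(os.path.join(self.dir, name))

    def push(self, item: bytes) -> None:
        self.mem.append(item)
        self.mem_bytes += len(item)
        if self.mem_bytes > self.max_seg_bytes:
            self._flush_segment()

    def _flush_segment(self) -> None:
        # buffer grew past the segment size: flush it to a disk segment
        path = os.path.join(self.dir, f"{self.seg_no}.seg")
        with open(path, "wb") as f:
            pickle.dump(self.mem, f)
        self.seg_no += 1
        self.mem, self.mem_bytes = [], 0

import tempfile
d = tempfile.mkdtemp()
q = OffloadQueue(d, max_seg_bytes=10)
q.push(b"12345678901")  # 11 bytes > 10: spilled to a segment
assert any(n.endswith(".seg") for n in os.listdir(d))
q2 = OffloadQueue(d, max_seg_bytes=10)  # "restart": segments are cleaned
assert not any(n.endswith(".seg") for n in os.listdir(d))
```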
Co-authored-by: Zaiming (Stone) Shi <zmstone@gmail.com>
Thanks to @qzhuyan for the corrections.
https://emqx.atlassian.net/browse/EMQX-8623
Currently, we face several issues trying to keep resource metrics reasonable. For example, when a resource is re-created, it has its metrics reset, but then its durable queue resumes its previous work, which leads to strange (often negative) metrics.
Instead of using `counters` that are shared by more than one worker to manage gauges, we introduce an ETS table whose key is not only scoped by the Resource ID as before, but also by the worker ID. This way, when a worker starts/terminates, it sets its own gauges to their initial values (often 0, or `replayq:count` when resuming off a queue).
With this scoping and initialization procedure, we'll hopefully avoid hitting those strange metrics scenarios and have better control over the gauges.
This makes the buffer/resource workers always use `replayq` for queuing, along with collecting multiple requests in a single call. This is done to avoid long message queues for the buffer workers and to rely on `replayq`'s capabilities of offloading to disk and detecting overflow.
Also, this deprecates the `enable_batch` and `enable_queue` resource creation options, as: i) queuing is now always enabled; ii) batch_size > 1 <=> batch_enabled. The corresponding metric `dropped.queue_not_enabled` is dropped, along with `batching`. The batching is too ephemeral, especially considering a default batch time of 20 ms, and is not shown in the dashboard, so it was removed.
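The "batch_size > 1 <=> batch_enabled" equivalence can be sketched in Python (hypothetical function; the real worker collects from `replayq` in Erlang): a single collection routine with `batch_size == 1` degenerates to unbatched operation, which is why a separate `enable_batch` flag is redundant.

```python
from collections import deque

def collect_batch(q: deque, batch_size: int) -> list:
    """Collect up to batch_size pending requests in a single call.
    batch_size == 1 is exactly the unbatched case."""
    batch = []
    while q and len(batch) < batch_size:
        batch.append(q.popleft())
    return batch

q = deque(["a", "b", "c"])
assert collect_batch(q, batch_size=2) == ["a", "b"]
assert collect_batch(q, batch_size=2) == ["c"]

q = deque(["a", "b"])
assert collect_batch(q, batch_size=1) == ["a"]  # behaves as if batching were off
```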
Also, this fixes a bug related to message loss in the Kafka producer when the connection is down. Currently, the Kafka producer bridge tests the connection to Kafka itself to decide whether the resource is connected. However, if Kafka or its connection is down, this causes messages to be lost: there are no buffer workers for this bridge, and, being "down", the resource won't call the wolff producers, leading to both message loss and wrong metrics (as no failed counters are bumped).