Add erlang:statistic(async_queue) to get the queue length of async queue #327

Closed
wants to merge 3 commits into
from

Projects

None yet

4 participants

@Etsukata
Contributor

Add erlang:statistic(async_queue) and erlang:statistics(async_queues).

erlang:statistics(async_queue) returns the total length of the
async queues, that is, the number of asynchronous works that are ready to run on
all async queues like erlang:statistics(run_queue).

Previous pull request: #295

Updates:

  • rebased to the latest master.
  • delays deallocation of elements in lock-free queue by using reference counter while counting.
    • delays finalize_dequeue by setting max operation count to zero while the reference count is greater than 0.
Etsukata added some commits Mar 28, 2014
@Etsukata Etsukata async: Add erlang:statistics(async_queue)
Add new statistics feature erlang:statistic(async_queue).

erlang:statistics(async_queue) returns the total length of the
async queues, that is, the number of asynchronous works that are
ready to run on all async queues like erlang:statistics(run_queue).
3e7b844
@Etsukata Etsukata test: add a test of erlang:statistics(async_queue)
Tests that statistics(async_queue) is callable.
24f2541
@Etsukata Etsukata add documentation of erlang:statistics(async_queue)
Add documentation of erlang:statistics(async_queue) and
erlang:statistics({async_queue, 1..N}). Also added spec of these to
erts/preloaded/src/erlang.erl.
d932c44
@OTP-Maintainer

Patch has passed first testings and has been assigned to be reviewed

@garazdawi
Contributor

Hello,

Just letting you know that I've seen this. I haven't had the time yet to look at it yet. I hope to be able to next week.

@rickard-green
Contributor

The new reference counter only prevents deallocation, but it does not prevent the chain of elements between head.first and head.unref_end from being modified. The thread consuming from the queue may move head.head past head.unref_end and then remove all elements between head.first and head.unref_end while there still are other threads traversing those elements.

In the original implementation other threads than the consumer can only access the list via the tail.data.last field. That is, they can only have references to the elements referenced by tail.data.last or later at some point in time. The head.unref_end pointer will be moved forward to tail.data.last when thread progress has been made in order to track elements that other threads may refer to.

Since other threads than the consumer now can enter via head.head we don't want head.unref_end to move past head.head. Unfortunately head.head can pass tail.data.last which head.unref_end isn't allowed to pass either. That is, we now want to move head.unref_end the one of head.head and tail.data.last nearest head.first.

If the consumer thread clears head->data.ptr when dequeuing it can determine if head.head points before or after tail.data.last by inspecting the element referred to by tail.data.last (or the element after if it refers to the marker element). Doing it this way the overhead for the consumer thread will be small.

I'm not to fond of traversing a potentially huge queue in a scheduler thread either (Sorry; I know we've previously suggested a one queue at a time approach). When dirty schedulers are supported one would typically want to put this work on a dirty scheduler. But until then I think one wants this work to be done in a specialized count-async-q-lengths-thread created at emulator boot.

Using an ordinary double linked list together with a mutex-condvar pair would be good enough for sending requests to the count-async-q-lengths-thread (similar to how messages are sent to the sys_msg_dispatcher thread in erl_trace.c). Sending the result back to the requesting process can be done by scheduling a call to a function on scheduler 1 using erts_schedule_misc_aux_work(1, void (*func)(void *), void *arg). The function scheduled on scheduler 1 implements sending of the result message to the requesting process. This function needs to be called in a scheduler thread since in the non-smp case it is not possible to send a message from a thread (it is also preferred in the smp case, but not strictly needed).

The requesting process would queue the request to the count-async-q-lengths-thread in the system_info() BIF, then trap out to a function in the erts_internal module waiting for the reply in an ordinary receive statement. If you want to have an example of how to trap to erlang code, have a look at how the distribution code in dist.c traps to erlang code in erlang.erl (for example dsend()) in order to set up a connection.

Regards,
Rickard Green

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment