Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add erlang:statistic(async_queue) to get the queue length of async queue #327

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 23 additions & 0 deletions erts/doc/src/erlang.xml
Expand Up @@ -5050,6 +5050,29 @@ ok
opposed to runtime or CPU time.</p>
</desc>
</func>
<func>
<name name="statistics" arity="1" clause_i="10"/>
<name name="statistics" arity="1" clause_i="11"/>
<fsummary>Information about async_queue</fsummary>
<desc>
<p>Returns the length of the async queues.</p>
<taglist>
<tag><c>async_queue</c></tag>
<item>
<p>Returns the total length of the async queues, that is, the number
of asynchronous works that are ready to run on all available async
queues.</p>
</item>
<tag><c>{async_queue, AsyncId}</c></tag>
<item>
<p>Returns the length of Nth async queue where N equals to
<c><anno>AsyncId</anno></c>. If <c><anno>AsyncId</anno></c>
is less than 1 or greater than the number of async threads, this
function returns 0.</p>
</item>
</taglist>
</desc>
</func>
<func>
<name name="suspend_process" arity="2"/>
<fsummary>Suspend a process</fsummary>
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/atom.names
Expand Up @@ -95,6 +95,7 @@ atom arg0
atom arity
atom asn1
atom asynchronous
atom async_queue
atom atom
atom atom_used
atom attributes
Expand Down
10 changes: 10 additions & 0 deletions erts/emulator/beam/erl_async.c
Expand Up @@ -702,3 +702,13 @@ long driver_async(ErlDrvPort ix, unsigned int* key,

return id;
}

Uint erts_async_queue_len(Uint qix)
{
#ifndef USE_THREADS
return 0;
#else
ErtsAsyncQ *aq = async_q(qix - 1);
return (Uint) erts_thr_q_queue_len(&aq->thr_q);
#endif
}
2 changes: 1 addition & 1 deletion erts/emulator/beam/erl_async.h
Expand Up @@ -61,6 +61,6 @@ void *erts_get_async_ready_queue(Uint sched_id);

void erts_init_async(void);
void erts_exit_flush_async(void);

Uint erts_async_queue_len(Uint qix);

#endif /* ERL_ASYNC_H__ */
35 changes: 34 additions & 1 deletion erts/emulator/beam/erl_bif_info.c
Expand Up @@ -61,6 +61,7 @@ static Export* alloc_sizes_trap = NULL;

static Export *gather_sched_wall_time_res_trap;
static Export *gather_gc_info_res_trap;
static Export *gather_async_queue_len_trap;

#define DECL_AM(S) Eterm AM_ ## S = am_atom_put(#S, sizeof(#S) - 1)

Expand Down Expand Up @@ -3115,6 +3116,30 @@ BIF_RETTYPE process_display_2(BIF_ALIST_2)
BIF_RET(am_true);
}

/*
* This function takes care of calls to erlang:statistics/1 when the argument
* is a tuple.
*/
static BIF_RETTYPE
statistics_1_tuple(Process* BIF_P, /* Pointer to current process. */
Eterm* tp, /* Pointer to first element in tuple */
int arity) /* Arity of tuple (untagged). */
{
Eterm ret;
Eterm sel;

sel = *tp++;
if (sel == am_async_queue && arity == 2) {
Uint qix;
qix = unsigned_val(*tp);
ret = erts_async_queue_len(qix);
BUMP_REDS(BIF_P, ret);
BIF_RET(make_small(ret));
}

ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
return ret;
}

/* this is a general call which return some possibly useful information */

Expand All @@ -3123,7 +3148,11 @@ BIF_RETTYPE statistics_1(BIF_ALIST_1)
Eterm res;
Eterm* hp;

if (BIF_ARG_1 == am_scheduler_wall_time) {
if (is_tuple(BIF_ARG_1)) {
Eterm* tp = tuple_val(BIF_ARG_1);
Uint arity = *tp++;
return statistics_1_tuple(BIF_P, tp, arityval(arity));
} else if (BIF_ARG_1 == am_scheduler_wall_time) {
res = erts_sched_wall_time_request(BIF_P, 0, 0);
if (is_non_value(res))
BIF_RET(am_undefined);
Expand Down Expand Up @@ -3178,6 +3207,8 @@ BIF_RETTYPE statistics_1(BIF_ALIST_1)
} else if (BIF_ARG_1 == am_run_queue) {
res = erts_run_queues_len(NULL);
BIF_RET(make_small(res));
} else if (BIF_ARG_1 == am_async_queue) {
BIF_TRAP1(gather_async_queue_len_trap, BIF_P, am_async_queue);
} else if (BIF_ARG_1 == am_wall_clock) {
UWord w1, w2;
Eterm b1, b2;
Expand Down Expand Up @@ -4124,6 +4155,8 @@ erts_bif_info_init(void)
= erts_export_put(am_erlang, am_gather_sched_wall_time_result, 1);
gather_gc_info_res_trap
= erts_export_put(am_erlang, am_gather_gc_info_result, 1);
gather_async_queue_len_trap
= erts_export_put(am_erts_internal, am_statistics, 1);
process_info_init();
os_info_init();
}
45 changes: 44 additions & 1 deletion erts/emulator/beam/erl_thr_queue.c
Expand Up @@ -156,6 +156,7 @@ erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi)
q->tail.data.notify = noop_callback;

erts_atomic_init_nob(&q->head.head, (erts_aint_t) &q->tail.data.marker);
erts_atomic_init_nob(&q->head.refc, 0);
q->head.live = qi->live.objects;
q->head.first = &q->tail.data.marker;
q->head.unref_end = &q->tail.data.marker;
Expand Down Expand Up @@ -668,6 +669,11 @@ erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *q, ErtsThrQFinDeQ_t *fdp)
ErtsThrQDirtySetEl(&fdp->end->next, NULL);
q->head.deq_fini.start = NULL;
q->head.deq_fini.end = NULL;
if (erts_atomic_read_nob(&q->head.refc) > 0) {
fdp->max_ops = 0;
} else {
fdp->max_ops = ERTS_THR_Q_MAX_FINI_DEQ_OPS;
}
return fdp->start != NULL;
#endif
}
Expand All @@ -683,6 +689,7 @@ erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *fdp0,
else
fdp0->start = fdp1->start;
fdp0->end = fdp1->end;
fdp0->max_ops = fdp1->max_ops;
}
#endif
}
Expand All @@ -692,10 +699,11 @@ int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *state)
{
#ifdef USE_THREADS
ErtsThrQElement_t *start = state->start;
int max_ops = state->max_ops;
if (start) {
ErtsThrQLive_t live;
int i;
for (i = 0; i < ERTS_THR_Q_MAX_FINI_DEQ_OPS; i++) {
for (i = 0; i < max_ops; i++) {
ErtsThrQElement_t *tmp;
if (!start)
break;
Expand All @@ -719,6 +727,7 @@ erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *state)
#ifdef USE_THREADS
state->start = NULL;
state->end = NULL;
state->max_ops = ERTS_THR_Q_MAX_FINI_DEQ_OPS;
#endif
}

Expand Down Expand Up @@ -779,3 +788,37 @@ erts_thr_q_dequeue(ErtsThrQ_t *q)
return res;
#endif
}

int
erts_thr_q_queue_len(ErtsThrQ_t *q)
{
int len = 0;
#ifndef USE_THREADS
ErtsThrQElement_t *tmp;

tmp = q->first;
if (!tmp)
return 0;
while (tmp->next != NULL) {
len++;
tmp = tmp->next;
}

return len;
#else
ErtsThrQElement_t *tmp;
erts_aint_t inext;

erts_atomic_inc_acqb(&q->head.refc);
tmp = ErtsThrQDirtyReadEl(&q->head.head);
inext = erts_atomic_read_acqb(&tmp->next);
while (inext != ERTS_AINT_NULL) {
len++;
tmp = (ErtsThrQElement_t *) inext;
inext = erts_atomic_read_acqb(&tmp->next);
}
erts_atomic_dec_relb(&q->head.refc);

return len;
#endif
}
3 changes: 3 additions & 0 deletions erts/emulator/beam/erl_thr_queue.h
Expand Up @@ -91,6 +91,7 @@ struct ErtsThrQElement_t_ {
typedef struct {
ErtsThrQElement_t *start;
ErtsThrQElement_t *end;
int max_ops;
} ErtsThrQFinDeQ_t;

typedef enum {
Expand Down Expand Up @@ -150,6 +151,7 @@ struct ErtsThrQ_t_ {
int used_marker;
void *arg;
void (*notify)(void *);
erts_atomic_t refc; /* while refc > 0 delays dealloc */
} head;
struct {
int finalizing;
Expand Down Expand Up @@ -188,6 +190,7 @@ void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *,
ErtsThrQFinDeQ_t *);
int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *);
void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *);
int erts_thr_q_queue_len(ErtsThrQ_t *);

#ifdef ERTS_SMP
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q);
Expand Down
9 changes: 7 additions & 2 deletions erts/emulator/test/statistics_SUITE.erl
Expand Up @@ -31,7 +31,7 @@
run_queue_one/1,
scheduler_wall_time/1,
reductions/1, reductions_big/1, garbage_collection/1, io/1,
badarg/1]).
async_queue/1, badarg/1]).

%% Internal exports.

Expand All @@ -53,7 +53,7 @@ suite() -> [{ct_hooks,[ts_install_cth]}].
all() ->
[{group, wall_clock}, {group, runtime}, reductions,
reductions_big, {group, run_queue}, scheduler_wall_time,
garbage_collection, io, badarg].
garbage_collection, io, async_queue, badarg].

groups() ->
[{wall_clock, [],
Expand Down Expand Up @@ -399,6 +399,11 @@ io(Config) when is_list(Config) ->
{{input,In},{output,Out}} when is_integer(In), is_integer(Out) -> ok
end.

async_queue(Config) when is_list(Config) ->
case statistics(async_queue) of
N when N >= 0 -> ok
end.

badarg(doc) ->
"Tests that some illegal arguments to statistics fails.";
badarg(Config) when is_list(Config) ->
Expand Down
6 changes: 5 additions & 1 deletion erts/preloaded/src/erlang.erl
Expand Up @@ -2075,7 +2075,11 @@ spawn_opt(_Tuple) ->
(wall_clock) -> {Total_Wallclock_Time,
Wallclock_Time_Since_Last_Call} when
Total_Wallclock_Time :: non_neg_integer(),
Wallclock_Time_Since_Last_Call :: non_neg_integer().
Wallclock_Time_Since_Last_Call :: non_neg_integer();
(async_queue) -> non_neg_integer();
({async_queue, AsyncId}) -> non_neg_integer() when
AsyncId :: non_neg_integer().

statistics(_Item) ->
erlang:nif_error(undefined).

Expand Down
14 changes: 14 additions & 0 deletions erts/preloaded/src/erts_internal.erl
Expand Up @@ -31,6 +31,8 @@
-export([await_port_send_result/3]).
-export([binary_to_term/1, binary_to_term/2]).
-export([cmp_term/2]).
-export([statistics/1]).

-export([port_command/3, port_connect/2, port_close/1,
port_control/3, port_call/3, port_info/1, port_info/2]).

Expand All @@ -48,6 +50,18 @@ await_port_send_result(Ref, Busy, Ok) ->
{Ref, _} -> Ok
end.

%%
%% erlang:statistics(async_queue) traps to
%% erts_internal:statistics(async_queue).
%%

statistics(async_queue) ->
total_async_queue_len(erlang:system_info(thread_pool_size), 0).

total_async_queue_len(0, Acc) -> Acc;
total_async_queue_len(N, Acc) ->
total_async_queue_len(N-1, Acc+erlang:statistics({async_queue, N})).

%%
%% Statically linked port NIFs
%%
Expand Down