Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: erlang/otp
base: master
...
head fork: Etsukata/otp
Checking mergeability… Don't worry, you can still create the pull request.
  • 3 commits
  • 10 files changed
  • 0 commit comments
  • 1 contributor
Commits on Mar 28, 2014
@Etsukata Etsukata async: Add erlang:statistics(async_queue)
Add new statistics feature erlang:statistic(async_queue).

erlang:statistics(async_queue) returns the total length of the
async queues, that is, the number of asynchronous works that are
ready to run on all async queues like erlang:statistics(run_queue).
c8ea819
@Etsukata Etsukata test: add a test of erlang:statistics(async_queue)
Tests that statistics(async_queue) is callable.
fcb9bd0
@Etsukata Etsukata add documentation of erlang:statistics(async_queue)
Add documentation of erlang:statistics(async_queue) and
erlang:statistics({async_queue, 1..N). Also added spec of these to
erts/preloaded/src/erlang.erl.
96f6b01
View
23 erts/doc/src/erlang.xml
@@ -4926,6 +4926,29 @@ ok
</desc>
</func>
<func>
+ <name name="statistics" arity="1" clause_i="10"/>
+ <name name="statistics" arity="1" clause_i="11"/>
+ <fsummary>Information about async_queue</fsummary>
+ <desc>
+ <p>Returns the length of the async queues.</p>
+ <taglist>
+ <tag><c>async_queue</c></tag>
+ <item>
+ <p>Returns the total length of the async queues, that is, the number
+ of asynchronous works that are ready to run on all available async
+ queues.</p>
+ </item>
+ <tag><c>{async_queue, AsyncId}</c></tag>
+ <item>
+ <p>Returns the length of Nth async queue where N equals to
+ <c><anno>AsyncId</anno></c>. If <c><anno>AsyncId</anno></c>
+ is less than 1 or greater than the number of async threads, this
+ function returns 0.</p>
+ </item>
+ </taglist>
+ </desc>
+ </func>
+ <func>
<name name="suspend_process" arity="2"/>
<fsummary>Suspend a process</fsummary>
<desc>
View
1  erts/emulator/beam/atom.names
@@ -93,6 +93,7 @@ atom arg0
atom arity
atom asn1
atom asynchronous
+atom async_queue
atom atom
atom atom_used
atom attributes
View
13 erts/emulator/beam/erl_async.c
@@ -703,7 +703,12 @@ int driver_async_cancel(unsigned int id)
return 0;
}
-
-
-
-
+Uint erts_async_queue_len(Uint qix)
+{
+#ifndef USE_THREADS
+ return 0;
+#else
+ ErtsAsyncQ *aq = async_q(qix - 1);
+ return (Uint) erts_thr_q_queue_len(&aq->thr_q);
+#endif
+}
View
2  erts/emulator/beam/erl_async.h
@@ -61,6 +61,6 @@ void *erts_get_async_ready_queue(Uint sched_id);
void erts_init_async(void);
void erts_exit_flush_async(void);
-
+Uint erts_async_queue_len(Uint qix);
#endif /* ERL_ASYNC_H__ */
View
35 erts/emulator/beam/erl_bif_info.c
@@ -61,6 +61,7 @@ static Export* alloc_sizes_trap = NULL;
static Export *gather_sched_wall_time_res_trap;
static Export *gather_gc_info_res_trap;
+static Export *gather_async_queue_len_trap;
#define DECL_AM(S) Eterm AM_ ## S = am_atom_put(#S, sizeof(#S) - 1)
@@ -3067,6 +3068,30 @@ BIF_RETTYPE process_display_2(BIF_ALIST_2)
BIF_RET(am_true);
}
+/*
+ * This function takes care of calls to erlang:statistics/1 when the argument
+ * is a tuple.
+ */
+static BIF_RETTYPE
+statistics_1_tuple(Process* BIF_P, /* Pointer to current process. */
+ Eterm* tp, /* Pointer to first element in tuple */
+ int arity) /* Arity of tuple (untagged). */
+{
+ Eterm ret;
+ Eterm sel;
+
+ sel = *tp++;
+ if (sel == am_async_queue && arity == 2) {
+ Uint qix;
+ qix = unsigned_val(*tp);
+ ret = erts_async_queue_len(qix);
+ BUMP_REDS(BIF_P, ret);
+ BIF_RET(make_small(ret));
+ }
+
+ ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
+ return ret;
+}
/* this is a general call which return some possibly useful information */
@@ -3075,7 +3100,11 @@ BIF_RETTYPE statistics_1(BIF_ALIST_1)
Eterm res;
Eterm* hp;
- if (BIF_ARG_1 == am_scheduler_wall_time) {
+ if (is_tuple(BIF_ARG_1)) {
+ Eterm* tp = tuple_val(BIF_ARG_1);
+ Uint arity = *tp++;
+ return statistics_1_tuple(BIF_P, tp, arityval(arity));
+ } else if (BIF_ARG_1 == am_scheduler_wall_time) {
res = erts_sched_wall_time_request(BIF_P, 0, 0);
if (is_non_value(res))
BIF_RET(am_undefined);
@@ -3130,6 +3159,8 @@ BIF_RETTYPE statistics_1(BIF_ALIST_1)
} else if (BIF_ARG_1 == am_run_queue) {
res = erts_run_queues_len(NULL);
BIF_RET(make_small(res));
+ } else if (BIF_ARG_1 == am_async_queue) {
+ BIF_TRAP1(gather_async_queue_len_trap, BIF_P, am_async_queue);
} else if (BIF_ARG_1 == am_wall_clock) {
UWord w1, w2;
Eterm b1, b2;
@@ -4062,6 +4093,8 @@ erts_bif_info_init(void)
= erts_export_put(am_erlang, am_gather_sched_wall_time_result, 1);
gather_gc_info_res_trap
= erts_export_put(am_erlang, am_gather_gc_info_result, 1);
+ gather_async_queue_len_trap
+ = erts_export_put(am_erts_internal, am_statistics, 1);
process_info_init();
os_info_init();
}
View
32 erts/emulator/beam/erl_thr_queue.c
@@ -779,3 +779,35 @@ erts_thr_q_dequeue(ErtsThrQ_t *q)
return res;
#endif
}
+
+int
+erts_thr_q_queue_len(ErtsThrQ_t *q)
+{
+ int len = 0;
+#ifndef USE_THREADS
+ ErtsThrQElement_t *tmp;
+
+ tmp = q->first;
+ if (!tmp)
+ return 0;
+ while (tmp->next != NULL) {
+ len++;
+ tmp = tmp->next;
+ }
+
+ return len;
+#else
+ ErtsThrQElement_t *tmp;
+ erts_aint_t inext;
+
+ tmp = ErtsThrQDirtyReadEl(&q->head.head);
+ inext = erts_atomic_read_acqb(&tmp->next);
+ while (inext != ERTS_AINT_NULL) {
+ len++;
+ tmp = (ErtsThrQElement_t *) inext;
+ inext = erts_atomic_read_acqb(&tmp->next);
+ }
+
+ return len;
+#endif
+}
View
1  erts/emulator/beam/erl_thr_queue.h
@@ -188,6 +188,7 @@ void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *,
ErtsThrQFinDeQ_t *);
int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *);
void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *);
+int erts_thr_q_queue_len(ErtsThrQ_t *);
#ifdef ERTS_SMP
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q);
View
9 erts/emulator/test/statistics_SUITE.erl
@@ -31,7 +31,7 @@
run_queue_one/1,
scheduler_wall_time/1,
reductions/1, reductions_big/1, garbage_collection/1, io/1,
- badarg/1]).
+ async_queue/1, badarg/1]).
%% Internal exports.
@@ -53,7 +53,7 @@ suite() -> [{ct_hooks,[ts_install_cth]}].
all() ->
[{group, wall_clock}, {group, runtime}, reductions,
reductions_big, {group, run_queue}, scheduler_wall_time,
- garbage_collection, io, badarg].
+ garbage_collection, io, async_queue, badarg].
groups() ->
[{wall_clock, [],
@@ -399,6 +399,11 @@ io(Config) when is_list(Config) ->
{{input,In},{output,Out}} when is_integer(In), is_integer(Out) -> ok
end.
+async_queue(Config) when is_list(Config) ->
+ case statistics(async_queue) of
+ N when N >= 0 -> ok
+ end.
+
badarg(doc) ->
"Tests that some illegal arguments to statistics fails.";
badarg(Config) when is_list(Config) ->
View
6 erts/preloaded/src/erlang.erl
@@ -1942,7 +1942,11 @@ spawn_opt(_Tuple) ->
(wall_clock) -> {Total_Wallclock_Time,
Wallclock_Time_Since_Last_Call} when
Total_Wallclock_Time :: non_neg_integer(),
- Wallclock_Time_Since_Last_Call :: non_neg_integer().
+ Wallclock_Time_Since_Last_Call :: non_neg_integer();
+ (async_queue) -> non_neg_integer();
+ ({async_queue, AsyncId}) -> non_neg_integer() when
+ AsyncId :: non_neg_integer().
+
statistics(_Item) ->
erlang:nif_error(undefined).
View
15 erts/preloaded/src/erts_internal.erl
@@ -28,7 +28,7 @@
-module(erts_internal).
--export([await_port_send_result/3]).
+-export([await_port_send_result/3, statistics/1]).
-export([port_command/3, port_connect/2, port_close/1,
port_control/3, port_call/3, port_info/1, port_info/2]).
@@ -44,6 +44,18 @@ await_port_send_result(Ref, Busy, Ok) ->
end.
%%
+%% erlang:statistics(async_queue) traps to
+%% erts_internal:statistics(async_queue).
+%%
+
+statistics(async_queue) ->
+ total_async_queue_len(erlang:system_info(thread_pool_size), 0).
+
+total_async_queue_len(0, Acc) -> Acc;
+total_async_queue_len(N, Acc) ->
+ total_async_queue_len(N-1, Acc+erlang:statistics({async_queue, N})).
+
+%%
%% Statically linked port NIFs
%%
@@ -139,3 +151,4 @@ port_info(_Result) ->
port_info(_Result, _Item) ->
erlang:nif_error(undefined).
+

No commit comments for this range

Something went wrong with that request. Please try again.