Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry/timestamps and spans #162

Merged
merged 9 commits into from
Dec 14, 2023
46 changes: 25 additions & 21 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,67 @@ Amoc also exposes the following telemetry events:

## Scenario

A telemetry span of a full scenario execution
A telemetry span of a full scenario execution globally (i.e. the exported `init/0` function):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e. the exported init/0 function - this is incorrect, it's rather a span between init/1 and terminate/1 call

also I don't think it's a good idea to manually emulate a span here. because scenario doesn't end itself, it finishes per user request. so duration doesn't indicate real scenario execution here. I would rather rework it into 2 events [amoc, scenario, start|stop] with duration reported for stop event. also reporting error during Scenario:termination/1 call as scenario exeption is incorrect. I think it makes sense to report every scenario callback as telemetry span (and we should do it from amoc_scenario module), and amoc_contorller should only issue telemetry events.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree to everything except the idea of running Scenario:start/ as a telemetry span in the amoc_scenario module, as this will run in the controller process and measure the time it takes to spawn an erlang process only, instead of measuring the lifespan of the entire amoc user. The current span must stay where it is already and it is the most useful one.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean here, I propose to measure exactly scenario callback time:
https://github.com/esl/amoc/blob/master/src/amoc_scenario.erl#L64

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, all of the callbacks.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oooooh, I see, my bad! Ok, one moment.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and amoc_scenario module works as a synchronous wrapper for the callbacks. it doesn't spawn anything.

```erlang
event_name: [amoc, scenario, user]
measurements: #{}
metadata: #{}
event_name: [amoc, scenario, run, _]
measurements: #{} %% As described in `telemetry:span/3`
metadata: #{scenario := module()} %% Plus as described in `telemetry:span/3`
```

A telemetry span of a full scenario execution for a user (i.e. the exported `start/1,2` function):
```erlang
event_name: [amoc, scenario, user, _]
measurements: #{} %% As described in `telemetry:span/3`
metadata: #{scenario := module(),
user_id := non_neg_integer()} %% Plus as described in `telemetry:span/3`
```

## Controller

Indicates the number of users added or removed
Indicates the number of users manually added or removed
```erlang
event_name: [amoc, controller, users]
measurements: #{count => non_neg_integer()}
metadata: #{type => add | remove}
measurements: #{count := non_neg_integer()}
metadata: #{monotonic_time := integer(), scenario := module(), type := add | remove}
```

## Throttle

### Init

Raised when a throttle mechanism is initialised.

```erlang
event_name: [amoc, throttle, init]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

### Rate

Raised when a throttle mechanism is initialised or its configured rate is changed.
This event is raised only on the master node.

```erlang
event_name: [amoc, throttle, rate]
measurements: #{rate => non_neg_integer()}
metadata: #{name => atom()}
measurements: #{rate := non_neg_integer()}
metadata: #{monotonic_time := integer(), name := atom(), msg => binary()}
```

### Request

Raised when a process client requests to be allowed pass through a throttled mechanism.

```erlang
event_name: [amoc, throttle, request]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

### Execute

Raised when a process client is allowed to execute after a throttled mechanism.

```erlang
event_name: [amoc, throttle, execute]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

## Coordinator
Expand All @@ -68,6 +72,6 @@ Indicates when a coordinating event was raised, like a process being added for c
### Event
```erlang
event_name: [amoc, coordinator, start | stop | add | reset | timeout]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```
53 changes: 44 additions & 9 deletions src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
status = idle :: idle | running | terminating | finished |
{error, any()} | disabled,
scenario_state :: any(), %% state returned from Scenario:init/0
scenario_start :: undefined | integer(),
scenario_ref :: undefined | reference(),
create_users = [] :: [amoc_scenario:user_id()],
tref :: timer:tref() | undefined}).

Expand Down Expand Up @@ -99,13 +101,11 @@ update_settings(Settings) ->
-spec add_users(amoc_scenario:user_id(), amoc_scenario:user_id()) ->
ok | {error, term()}.
add_users(StartId, EndId) ->
telemetry:execute([amoc, controller, users], #{count => EndId - StartId + 1}, #{type => add}),
%% adding the exact range of the users
gen_server:call(?SERVER, {add, StartId, EndId}).

-spec remove_users(user_count(), boolean()) -> {ok, user_count()}.
remove_users(Count, ForceRemove) ->
telemetry:execute([amoc, controller, users], #{count => Count}, #{type => remove}),
%% trying to remove Count users, this action is async!!!
gen_server:call(?SERVER, {remove, Count, ForceRemove}).

Expand Down Expand Up @@ -192,12 +192,20 @@ handle_info(_Msg, State) ->
-spec handle_start_scenario(module(), amoc_config:settings(), state()) ->
{handle_call_res(), state()}.
handle_start_scenario(Scenario, Settings, #state{status = idle} = State) ->
StartTime = erlang:monotonic_time(),
Ref = erlang:make_ref(),
case init_scenario(Scenario, Settings) of
{ok, ScenarioState} ->
NewState = State#state{last_user_id = 0,
scenario = Scenario,
scenario_state = ScenarioState,
scenario_start = StartTime,
scenario_ref = Ref,
status = running},
%% This simulates a span
telemetry:execute([amoc, scenario, run, start],
#{monotonic_time => StartTime, system_time => erlang:system_time()},
#{telemetry_span_context => Ref, scenario => Scenario}),
{ok, NewState};
{error, _} = Error ->
NewState = State#state{scenario = Scenario, status = Error},
Expand All @@ -207,9 +215,8 @@ handle_start_scenario(_Scenario, _Settings, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_stop_scenario(state()) -> {handle_call_res(), state()}.
handle_stop_scenario(#state{scenario = Scenario, scenario_state = ScenarioState,
no_of_users = 0, status = running} = State) ->
amoc_scenario:terminate(Scenario, ScenarioState),
handle_stop_scenario(#state{no_of_users = 0, status = running} = State) ->
terminate_scenario(State),
{ok, State#state{status = finished}};
handle_stop_scenario(#state{status = running} = State) ->
terminate_all_users(),
Expand All @@ -231,8 +238,11 @@ handle_update_settings(_Settings, #state{status = Status}) ->
handle_add(StartId, EndId, #state{last_user_id = LastId,
create_users = ScheduledUsers,
status = running,
scenario = Scenario,
tref = TRef} = State) when StartId =< EndId,
LastId < StartId ->
amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1},
#{scenario => Scenario, type => add}),
NewUsers = lists:seq(StartId, EndId),
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
NewTRef = maybe_start_timer(TRef),
Expand All @@ -244,7 +254,9 @@ handle_add(_StartId, _EndId, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_remove(user_count(), boolean(), state()) -> handle_call_res().
handle_remove(Count, ForceRemove, #state{status = running}) ->
handle_remove(Count, ForceRemove, #state{status = running, scenario = Scenario}) ->
amoc_telemetry:execute([controller, users], #{count => Count},
#{scenario => Scenario, type => remove}),
Pids = case ets:match_object(?USERS_TABLE, '$1', Count) of
{Objects, _} -> [Pid || {_Id, Pid} <- Objects];
'$end_of_table' -> []
Expand Down Expand Up @@ -311,6 +323,30 @@ init_scenario(Scenario, Settings) ->
{error, Type, Reason} -> {error, {Type, Reason}}
end.

-spec terminate_scenario(state()) ->
ok | {ok, any()} | {error, any()}.
terminate_scenario(#state{scenario = Scenario,
scenario_state = ScenarioState,
scenario_start = StartTime,
scenario_ref = Ref}) ->
case amoc_scenario:terminate(Scenario, ScenarioState) of
{error, {Class, Reason, Stacktrace}} = Ret ->
%% This simulates a span
StopTime = erlang:monotonic_time(),
telemetry:execute([amoc, scenario, run, exception],
#{duration => StopTime - StartTime, monotonic_time => StopTime},
#{telemetry_span_context => Ref, scenario => Scenario,
kind => Class, reason => Reason, stacktrace => Stacktrace}),
Ret;
Ret ->
%% This simulates a span
StopTime = erlang:monotonic_time(),
telemetry:execute([amoc, scenario, run, stop],
#{duration => StopTime - StartTime, monotonic_time => StopTime},
#{telemetry_span_context => Ref, scenario => Scenario}),
Ret
end.

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
{ok, TRef} = timer:send_interval(interarrival(), start_user),
Expand Down Expand Up @@ -347,9 +383,8 @@ terminate_all_users({Objects, Continuation}) ->
terminate_all_users('$end_of_table') -> ok.

-spec dec_no_of_users(state()) -> state().
dec_no_of_users(#state{scenario = Scenario, scenario_state = ScenarioState,
no_of_users = 1, status = terminating} = State) ->
amoc_scenario:terminate(Scenario, ScenarioState),
dec_no_of_users(#state{no_of_users = 1, status = terminating} = State) ->
terminate_scenario(State),
State#state{no_of_users = 0, status = finished};
dec_no_of_users(#state{no_of_users = N} = State) ->
State#state{no_of_users = N - 1}.
Expand Down
9 changes: 4 additions & 5 deletions src/amoc_coordinator/amoc_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) ->
Plan = normalize_coordination_plan(CoordinationPlan),
case gen_event:start({local, Name}) of
{ok, _} ->
telemetry:execute([amoc, coordinator, start], #{count => 1}, #{name => Name}),
amoc_telemetry:execute([coordinator, start], #{count => 1}, #{name => Name}),
%% according to gen_event documentation:
%%
%% When the event is received, the event manager calls
Expand All @@ -99,7 +99,7 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) ->
-spec stop(name()) -> ok.
stop(Name) ->
gen_event:stop(Name),
telemetry:execute([amoc, coordinator, stop], #{count => 1}, #{name => Name}).
amoc_telemetry:execute([coordinator, stop], #{count => 1}, #{name => Name}).

%% @see add/3
-spec add(name(), any()) -> ok.
Expand Down Expand Up @@ -156,15 +156,14 @@ init(CoordinationItem) ->
-spec handle_event(Event :: term(), state()) -> {ok, state()}.
handle_event(Event, {timeout, Name, Pid}) ->
%% there's only one "timeout" event handler for coordinator,
%% so calling telemetry:execute/3 here to ensure that it's
%% so calling amoc_telemetry:execute/3 here to ensure that it's
%% triggered just once per event.
TelemetryEvent = case Event of
{coordinate, _} -> add;
reset_coordinator -> reset;
coordinator_timeout -> timeout
end,
telemetry:execute([amoc, coordinator, TelemetryEvent],
#{count => 1}, #{name => Name}),
amoc_telemetry:execute([coordinator, TelemetryEvent], #{count => 1}, #{name => Name}),
erlang:send(Pid, Event),
{ok, {timeout, Name, Pid}};
handle_event(Event, {worker, WorkerPid}) ->
Expand Down
15 changes: 15 additions & 0 deletions src/amoc_telemetry.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
%% @private
%% @copyright 2023 Erlang Solutions Ltd.
-module(amoc_telemetry).

-export([execute/3]).

-spec execute(EventName, Measurements, Metadata) -> ok when
EventName :: telemetry:event_name(),
Measurements :: telemetry:event_measurements(),
Metadata :: telemetry:event_metadata().
execute(Name, Measurements, Metadata) ->
TimeStamp = erlang:monotonic_time(),
NameWithAmocPrefix = [amoc | Name],
MetadataWithTS = Metadata#{monotonic_time => TimeStamp},
telemetry:execute(NameWithAmocPrefix, Measurements, MetadataWithTS).
8 changes: 4 additions & 4 deletions src/amoc_throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ maybe_raise_event(Name, Event) ->
end.

raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init ->
telemetry:execute([amoc, throttle, Event], #{count => 1}, #{name => Name}).
amoc_telemetry:execute([throttle, Event], #{count => 1}, #{name => Name}).

report_rate(Name, RatePerMinute) ->
amoc_telemetry:execute([throttle, rate], #{rate => RatePerMinute}, #{name => Name}).

-spec change_rate_and_stop_plan(name(), state()) -> state().
change_rate_and_stop_plan(Name, State) ->
Expand Down Expand Up @@ -329,6 +332,3 @@ run_cmd(Pid, pause) ->
amoc_throttle_process:pause(Pid);
run_cmd(Pid, resume) ->
amoc_throttle_process:resume(Pid).

report_rate(Name, RatePerMinute) ->
telemetry:execute([amoc, throttle, rate], #{rate => RatePerMinute}, #{name => Name}).
5 changes: 3 additions & 2 deletions src/amoc_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ stop(Pid, Force) when is_pid(Pid) ->
init(Parent, Scenario, Id, State) ->
proc_lib:init_ack(Parent, {ok, self()}),
process_flag(trap_exit, true),
ScenarioFun = fun() -> {amoc_scenario:start(Scenario, Id, State), #{}} end,
telemetry:span([amoc, scenario, user], #{}, ScenarioFun).
Metadata = #{scenario => Scenario, user_id => Id},
ScenarioFun = fun() -> {amoc_scenario:start(Scenario, Id, State), Metadata} end,
telemetry:span([amoc, scenario, user], Metadata, ScenarioFun).
9 changes: 4 additions & 5 deletions test/amoc_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,8 @@ assert_telemetry_events(Name, [{_Pid, Call, _Ret} | History], [Event | EventList
assert_telemetry_handler_call(Name, Call, Event) ->
EventName = [amoc, coordinator, Event],
Measurements = #{count => 1},
EventMetadata = #{name => Name},
HandlerConfig = ?TELEMETRY_HANDLER_CONFIG,
ExpectedHandlerCall = {?TELEMETRY_HANDLER, handler,
[EventName, Measurements,
EventMetadata, HandlerConfig]},
?assertEqual(ExpectedHandlerCall, Call).
?assertMatch(
{?TELEMETRY_HANDLER, handler,
[EventName, Measurements,
#{name := Name, monotonic_time := _}, HandlerConfig]}, Call).