Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Controller/improvements and tests #166

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration_test/extra_code_paths/path1/dummy_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ test_amoc_dist() ->
Slaves = amoc_cluster:slave_nodes(),
%% check the status of the nodes
disabled = rpc:call(Master, amoc_controller, get_status, []),
[{running, dummy_scenario, _, _} = rpc:call(Node, amoc_controller, get_status, [])
[{running, #{scenario := dummy_scenario}} = rpc:call(Node, amoc_controller, get_status, [])
|| Node <- Slaves],
%% check user ids
{N1, Nodes1, Ids1, Max1} = get_users_info(Slaves),
Expand Down Expand Up @@ -66,7 +66,7 @@ test_amoc_dist() ->
get_users_info(SlaveNodes) ->
Users = [{Node, Id} ||
Node <- SlaveNodes,
{Id, _Pid} <- rpc:call(Node, ets, tab2list, [amoc_users])],
{_Pid, Id} <- rpc:call(Node, ets, tab2list, [amoc_users])],
Ids = lists:usort([Id || {_, Id} <- Users]),
Nodes = lists:usort([Node || {Node, _} <- Users]),
N = length(Ids),
Expand Down
2 changes: 1 addition & 1 deletion src/amoc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ do(Scenario, Count, Settings) ->
add(Count) when is_integer(Count), Count > 0 ->
case is_running_locally() of
ok ->
{running, _, _, LastUserId} = amoc_controller:get_status(),
{running, #{highest_user_id := LastUserId}} = amoc_controller:get_status(),
amoc_controller:add_users(LastUserId + 1, LastUserId + Count);
Error -> Error
end.
Expand Down
105 changes: 42 additions & 63 deletions src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
-behaviour(gen_server).

-define(SERVER, ?MODULE).
-define(USERS_TABLE, amoc_users).

-required_variable(#{name => interarrival, default_value => 50,
verification => {?MODULE, positive_integer, 1},
Expand All @@ -17,7 +16,6 @@
update => {?MODULE, maybe_update_interarrival_timer, 2}}).

-record(state, {scenario :: amoc:scenario() | undefined,
no_of_users = 0 :: user_count(),
last_user_id = 0 :: last_user_id(),
status = idle :: idle | running | terminating | finished |
{error, any()} | disabled,
Expand All @@ -28,8 +26,12 @@
-type state() :: #state{}.
%% Internal state of the node's controller
-type handle_call_res() :: ok | {ok, term()} | {error, term()}.
-type running_status() :: #{scenario := amoc:scenario(),
currently_running_users := user_count(),
highest_user_id := last_user_id()}.
%% Details about the scenario currently running
-type amoc_status() :: idle |
{running, amoc:scenario(), user_count(), last_user_id()} |
{running, running_status()} |
{terminating, amoc:scenario()} |
{finished, amoc:scenario()} |
{error, any()} |
Expand Down Expand Up @@ -65,6 +67,8 @@
%% ------------------------------------------------------------------
-export([maybe_update_interarrival_timer/2, positive_integer/1]).

-export([zero_users_running/0]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
Expand All @@ -73,6 +77,7 @@
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

%% @private
-spec start_link() -> {ok, pid()}.
start_link() ->
Expand Down Expand Up @@ -125,9 +130,16 @@ positive_integer(Interarrival) ->
-spec maybe_update_interarrival_timer(interarrival, term()) -> ok.
maybe_update_interarrival_timer(interarrival, _) ->
gen_server:cast(?SERVER, maybe_update_interarrival_timer).

%% @private
-spec zero_users_running() -> ok.
zero_users_running() ->
gen_server:cast(?SERVER, zero_users_running).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

%% @private
-spec init([]) -> {ok, state()}.
init([]) ->
Expand Down Expand Up @@ -170,6 +182,9 @@ handle_call(_Request, _From, State) ->
-spec handle_cast(any(), state()) -> {noreply, state()}.
handle_cast(maybe_update_interarrival_timer, State) ->
{noreply, maybe_update_interarrival_timer(State)};
handle_cast(zero_users_running, State) ->
NewSate = handle_zero_users_running(State),
{noreply, NewSate};
handle_cast(_Msg, State) ->
{noreply, State}.

Expand All @@ -178,9 +193,6 @@ handle_cast(_Msg, State) ->
handle_info(start_user, State) ->
NewSate = handle_start_user(State),
{noreply, NewSate};
handle_info({'DOWN', _, process, Pid, _}, State) ->
NewSate = handle_stop_user(Pid, State),
{noreply, NewSate};
handle_info(_Msg, State) ->
{noreply, State}.

Expand All @@ -205,12 +217,15 @@ handle_start_scenario(_Scenario, _Settings, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_stop_scenario(state()) -> {handle_call_res(), state()}.
handle_stop_scenario(#state{no_of_users = 0, status = running} = State) ->
terminate_scenario(State),
{ok, State#state{status = finished}};
handle_stop_scenario(#state{status = running} = State) ->
terminate_all_users(),
{ok, State#state{status = terminating}};
case amoc_users_sup:count_children() of
0 ->
terminate_scenario(State),
{ok, State#state{status = finished}};
_ ->
amoc_users_sup:terminate_all_children(),
{ok, State#state{status = terminating}}
end;
handle_stop_scenario(#state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

Expand Down Expand Up @@ -245,21 +260,18 @@ handle_add(_StartId, _EndId, #state{status = Status} = State) ->

-spec handle_remove(user_count(), boolean(), state()) -> handle_call_res().
handle_remove(Count, ForceRemove, #state{status = running, scenario = Scenario}) ->
amoc_telemetry:execute([controller, users], #{count => Count},
CountRemove = amoc_users_sup:stop_children(Count, ForceRemove),
amoc_telemetry:execute([controller, users], #{count => CountRemove},
#{scenario => Scenario, type => remove}),
Pids = case ets:match_object(?USERS_TABLE, '$1', Count) of
{Objects, _} -> [Pid || {_Id, Pid} <- Objects];
'$end_of_table' -> []
end,
amoc_users_sup:stop_children(Pids, ForceRemove),
{ok, length(Pids)};
{ok, CountRemove};
handle_remove(_Count, _ForceRemove, #state{status = Status}) ->
{error, {invalid_status, Status}}.

-spec handle_status(state()) -> amoc_status().
handle_status(#state{status = running, scenario = Scenario,
no_of_users = N, last_user_id = LastId}) ->
{running, Scenario, N, LastId};
last_user_id = LastId}) ->
N = amoc_users_sup:count_children(),
{running, #{scenario => Scenario, currently_running_users => N, highest_user_id => LastId}};
handle_status(#state{status = terminating, scenario = Scenario}) ->
{terminating, Scenario};
handle_status(#state{status = finished, scenario = Scenario}) ->
Expand All @@ -275,33 +287,18 @@ handle_disable(#state{status = Status} = State) ->

-spec handle_start_user(state()) -> state().
handle_start_user(#state{create_users = [UserId | T],
no_of_users = N,
scenario = Scenario,
scenario_state = ScenarioState} = State) ->
start_user(Scenario, UserId, ScenarioState),
State#state{create_users = T, no_of_users = N + 1};
State#state{create_users = T};
handle_start_user(#state{create_users = [], tref = TRef} = State) ->
State#state{tref = maybe_stop_timer(TRef)}.

-spec handle_stop_user(pid(), state()) -> state().
handle_stop_user(Pid, State) ->
case ets:match(?USERS_TABLE, {'$1', Pid}, 1) of
{[[UserId]], _} ->
ets:delete(?USERS_TABLE, UserId),
dec_no_of_users(State);
_ ->
State
end.

%% ------------------------------------------------------------------
%% helpers
%% ------------------------------------------------------------------
-spec start_tables() -> ok.
start_tables() -> %% ETS creation
?USERS_TABLE = ets:new(?USERS_TABLE, [named_table,
ordered_set,
protected,
{read_concurrency, true}]),
amoc_config_utils:create_amoc_config_ets(),
ok.

Expand All @@ -317,6 +314,13 @@ init_scenario(Scenario, Settings) ->
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
amoc_scenario:terminate(Scenario, ScenarioState).

-spec handle_zero_users_running(state()) -> state().
handle_zero_users_running(#state{status = terminating} = State) ->
terminate_scenario(State),
State#state{status = finished};
handle_zero_users_running(State) ->
State.

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
{ok, TRef} = timer:send_interval(interarrival(), start_user),
Expand All @@ -330,34 +334,9 @@ maybe_stop_timer(TRef) ->
{ok, cancel} = timer:cancel(TRef),
undefined.

-spec start_user(amoc:scenario(), amoc_scenario:user_id(), any()) -> ok.
-spec start_user(amoc:scenario(), amoc_scenario:user_id(), any()) -> term().
start_user(Scenario, Id, ScenarioState) ->
{ok, Pid} = supervisor:start_child(amoc_users_sup, [Scenario, Id, ScenarioState]),
ets:insert(?USERS_TABLE, {Id, Pid}),
erlang:monitor(process, Pid),
ok.

-spec terminate_all_users() -> any().
terminate_all_users() ->
%stop all the users
Match = ets:match_object(?USERS_TABLE, '$1', 200),
terminate_all_users(Match).

%% ets:continuation/0 type is unfortunately not exported from the ets module.
-spec terminate_all_users({tuple(), term()} | '$end_of_table') -> ok.
terminate_all_users({Objects, Continuation}) ->
Pids = [Pid || {_Id, Pid} <- Objects],
amoc_users_sup:stop_children(Pids, true),
Match = ets:match_object(Continuation),
terminate_all_users(Match);
terminate_all_users('$end_of_table') -> ok.

-spec dec_no_of_users(state()) -> state().
dec_no_of_users(#state{no_of_users = 1, status = terminating} = State) ->
terminate_scenario(State),
State#state{no_of_users = 0, status = finished};
dec_no_of_users(#state{no_of_users = N} = State) ->
State#state{no_of_users = N - 1}.
amoc_users_sup:start_child(Scenario, Id, ScenarioState).

-spec interarrival() -> interarrival().
interarrival() ->
Expand Down
4 changes: 2 additions & 2 deletions src/amoc_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ start_link() ->
[ChildSpec :: supervisor:child_spec()]
}}.
init([]) ->
{ok, {{one_for_one, 5, 10},
{ok, {{rest_for_one, 5, 10},
[
?CHILD(amoc_users_sup, supervisor),
?CHILD(amoc_users_sup, worker),
?CHILD(amoc_controller, worker),
?CHILD(amoc_cluster, worker),
?CHILD(amoc_code_server, worker),
Expand Down
2 changes: 1 addition & 1 deletion src/amoc_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-type state() :: term().

-spec start_link(amoc:scenario(), amoc_scenario:user_id(), state()) ->
{ok, pid()}.
{ok, pid()} | {error, term()}.
start_link(Scenario, Id, State) ->
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).

Expand Down
Loading