Skip to content

Commit

Permalink
Upgrade all throttle calls to use only explicit config maps
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Jul 1, 2024
1 parent df4b9e4 commit a2f225d
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 121 deletions.
76 changes: 27 additions & 49 deletions src/throttle/amoc_throttle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,44 @@
-module(amoc_throttle).

%% API
-export([start/2, start/3, start/4, stop/1,
send/2, send/3, send_and_wait/2, wait/1,
-export([start/2, stop/1,
send/2, send/3, wait/1,
run/2, pause/1, resume/1,
change_rate/2, change_rate/3,
change_rate_gradually/2, change_rate_gradually/6]).

-deprecated([
{start, 3, "use start/2 with a config"},
{start, 4, "use start/2 with a config"},
{send_and_wait, 2, "use wait/1 instead"}
]).

-type name() :: atom().
-type rate() :: pos_integer().
%% Atom representing the name of the throttle.
-type rate() :: infinity | non_neg_integer().
%% Number of events per given `t:interval/0', or infinity for effectively unlocking all throttling.
%% Note that a rate of zero means effectively pausing the throttle.
-type interarrival() :: infinity | non_neg_integer().
%% Time in milliseconds between two events, or infinity for effectively pausing the throttle. Note
%% that an interarrival of zero means effectively unlocking all throttling.
-type interval() :: non_neg_integer().
%% In milliseconds, defaults to 60000 (one minute) when not given.
%% An interval of 0 means no delay at all, only the number of simultaneous executions will be
%% controlled, which corresponds to the number of processes started
-type throttle() :: #{rate := rate(),
interval := interval()}.
-type interarrival() :: #{interarrival := non_neg_integer()}.
%% In milliseconds, defaults to 60000 (one minute).
-type throttle() :: #{rate := rate(), interval := interval()} |
#{interarrival := interarrival()}.
%% Throttle unit of measurement
-type config() :: #{rate := rate(),
interval => interval(),
parallelism => non_neg_integer()}
| #{interarrival := non_neg_integer(),
| #{interarrival := interarrival(),
parallelism => non_neg_integer()}.
%% Literal throttle configuration. It can state `interarrival', in milliseconds,
%% in which case the rate per interval is calculated to allow one event every given milliseconds,
%% or, literally give the rate per interval.
%% Literal throttle configuration.

-type gradual_rate_config() :: #{from_rate := rate(),
to_rate := rate(),
-type gradual_rate_config() :: #{from_rate := non_neg_integer(),
to_rate := non_neg_integer(),
interval => interval(),
step_interval => pos_integer(),
step_size => pos_integer(),
step_count => pos_integer(),
duration => pos_integer()} |
#{from_interarrival := interarrival(),
to_interarrival := interarrival(),
step_interval => pos_integer(),
step_size => pos_integer(),
step_count => pos_integer(),
duration => pos_integer()}.
%% Configuration for a gradual throttle rate change
%%
Expand All @@ -50,31 +51,17 @@

-export_type([name/0, rate/0, interval/0, throttle/0, config/0, gradual_rate_config/0]).

%% @see start/4
%% @doc Starts the throttle mechanism for a given `Name' with a given config.
%%
%% The optional arguments are an `Interval' (default is one minute) and a ` NoOfProcesses' (default is 10).
%% `Name' is needed to identify the rate as a single test can have different rates for different tasks.
%% `Interval' is given in milliseconds and can be changed to a different value for convenience or higher granularity.
-spec start(name(), config() | rate()) -> {ok, started | already_started} | {error, any()}.
start(Name, #{} = Config) ->
amoc_throttle_controller:ensure_throttle_processes_started(Name, Config);
start(Name, Rate) ->
amoc_throttle_controller:ensure_throttle_processes_started(Name, #{rate => Rate}).

%% @see start/4
-spec start(name(), rate(), non_neg_integer()) -> {ok, started | already_started} | {error, any()}.
start(Name, Rate, Interval) ->
Config = #{rate => Rate, interval => Interval},
amoc_throttle_controller:ensure_throttle_processes_started(Name, Config).

%% @doc Starts the throttle mechanism for a given `Name' with a given `Rate' per `Interval'.
%%
%% The optional arguments are an `Interval' (default is one minute) and a ` NoOfProcesses' (default is 10).
%% `Name' is needed to identify the rate as a single test can have different rates for different tasks.
%% `Interval' is given in milliseconds and can be changed to a different value for convenience or higher granularity.
%% It also accepts a special value of `0' which limits the number of parallel executions associated with `Name' to `Rate'.
-spec start(name(), rate(), interval(), pos_integer()) ->
{ok, started | already_started} | {error, any()}.
start(Name, Rate, Interval, NoOfProcesses) ->
Config = #{rate => Rate, interval => Interval, parallelism => NoOfProcesses},
amoc_throttle_controller:ensure_throttle_processes_started(Name, Config).

%% @doc Pauses executions for the given `Name' as if `Rate' was set to `0'.
%%
%% Does not stop the scheduled rate changes.
Expand Down Expand Up @@ -127,8 +114,6 @@ change_rate_gradually(Name, FromRate, ToRate, RateInterval, StepInterval, StepCo
%%
%% `Fn' is executed in the context of a new process spawned on the same node on which
%% the process executing `run/2' runs, so a call to `run/2' is non-blocking.
%% This function is used internally by both `send' and `send_and_wait/2' functions,
%% so all those actions will be limited to the same rate when called with the same `Name'.
%%
%% Diagram showing function execution flow in distributed environment,
%% generated using https://sequencediagram.org/:
Expand Down Expand Up @@ -178,13 +163,6 @@ send(Name, Msg) ->
send(Name, Pid, Msg) ->
amoc_throttle_runner:throttle(Name, {Pid, Msg}).

%% @doc Sends and receives the given message `Msg'.
%%
%% Deprecated in favour of `wait/1'
-spec send_and_wait(name(), any()) -> ok | {error, any()}.
send_and_wait(Name, _) ->
amoc_throttle_runner:throttle(Name, wait).

%% @doc Blocks the caller until the throttle mechanism allows.
-spec wait(name()) -> ok | {error, any()}.
wait(Name) ->
Expand Down
39 changes: 26 additions & 13 deletions src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
-define(DEFAULT_STEP_SIZE, 1).
-define(DEFAULT_INTERVAL, 60000). %% one minute
-define(DEFAULT_NO_PROCESSES, 10).
-define(TIMEOUT(N), (infinity =:= N orelse is_integer(N) andalso N >= 0)).
-define(NONNEG_INT(N), (is_integer(N) andalso N >= 0)).
-define(POS_INT(N), (is_integer(N) andalso N > 0)).

Expand All @@ -51,8 +52,8 @@
-type config() :: #{rate := amoc_throttle:rate(),
interval := amoc_throttle:interval(),
parallelism := non_neg_integer()}.
-type gradual_rate_change() :: #{from_rate := amoc_throttle:rate(),
to_rate := amoc_throttle:rate(),
-type gradual_rate_change() :: #{from_rate := non_neg_integer(),
to_rate := non_neg_integer(),
interval := amoc_throttle:interval(),
step_interval := pos_integer(),
step_size := pos_integer(),
Expand All @@ -71,26 +72,32 @@ start_link() ->
{ok, started | already_started} |
{error, invalid_throttle | wrong_reconfiguration | wrong_no_of_procs}.
ensure_throttle_processes_started(
Name, #{interarrival := EveryMs} = Config)
when is_atom(Name), ?NONNEG_INT(EveryMs) ->
Name, #{interarrival := Interarrival} = Config)
when is_atom(Name), ?TIMEOUT(Interarrival) ->
raise_event_on_slave_node(Name, init),
Config1 = #{rate => ?DEFAULT_INTERVAL div EveryMs, interval => ?DEFAULT_INTERVAL},
Config1 = #{rate => ?DEFAULT_INTERVAL div Interarrival, interval => ?DEFAULT_INTERVAL},
Config2 = Config1#{parallelism => maps:get(parallelism, Config, ?DEFAULT_NO_PROCESSES)},
gen_server:call(?MASTER_SERVER, {start_processes, Name, Config2});
ensure_throttle_processes_started(
Name, #{rate := Rate, interval := Interval, parallelism := NoOfProcesses} = Config)
when is_atom(Name), ?POS_INT(Rate), ?NONNEG_INT(Interval), ?POS_INT(NoOfProcesses) ->
when is_atom(Name), ?TIMEOUT(Rate), ?NONNEG_INT(Interval), ?POS_INT(NoOfProcesses) ->
raise_event_on_slave_node(Name, init),
gen_server:call(?MASTER_SERVER, {start_processes, Name, Config});
ensure_throttle_processes_started(
Name, #{rate := Rate, interval := Interval} = Config)
when is_atom(Name), ?POS_INT(Rate), ?NONNEG_INT(Interval) ->
when is_atom(Name), ?TIMEOUT(Rate), ?NONNEG_INT(Interval) ->
raise_event_on_slave_node(Name, init),
Config1 = Config#{parallelism => ?DEFAULT_NO_PROCESSES},
gen_server:call(?MASTER_SERVER, {start_processes, Name, Config1});
ensure_throttle_processes_started(
Name, #{rate := Rate, parallelism := NoOfProcesses} = Config)
when is_atom(Name), ?TIMEOUT(Rate), ?POS_INT(NoOfProcesses) ->
raise_event_on_slave_node(Name, init),
Config1 = Config#{interval => ?DEFAULT_INTERVAL},
gen_server:call(?MASTER_SERVER, {start_processes, Name, Config1});
ensure_throttle_processes_started(
Name, #{rate := Rate} = Config)
when is_atom(Name), ?POS_INT(Rate) ->
when is_atom(Name), ?TIMEOUT(Rate) ->
raise_event_on_slave_node(Name, init),
Config1 = Config#{interval => ?DEFAULT_INTERVAL, parallelism => ?DEFAULT_NO_PROCESSES},
gen_server:call(?MASTER_SERVER, {start_processes, Name, Config1});
Expand Down Expand Up @@ -147,7 +154,7 @@ init([]) ->
From :: {pid(), Tag :: term()}, state()) ->
{reply, {ok, started | already_started}, state()} |
{reply, {error, wrong_reconfiguration | wrong_no_of_procs}, state()};
({pause | resume | stop}, From :: {pid(), Tag :: term()}, state()) ->
({pause | resume | unlock | stop}, From :: {pid(), Tag :: term()}, state()) ->
{reply, ok, state()} |
{reply, Error :: any(), state()};
({change_rate, name(), amoc_throttle:rate(), amoc_throttle:interval()},
Expand Down Expand Up @@ -175,8 +182,8 @@ handle_call({pause, Name}, _From, State) ->
Error ->
{reply, Error, State}
end;
handle_call({resume, Name}, _From, State) ->
case run_in_all_processes(Name, resume) of
handle_call({Op, Name}, _From, State) when unlock =:= Op; resume =:= Op ->
case run_in_all_processes(Name, Op) of
ok ->
Info = maps:get(Name, State),
{reply, ok, State#{Name => Info#throttle_info{active = true}}};
Expand Down Expand Up @@ -266,6 +273,7 @@ continue_plan(Name, State, Info, Plan) ->
State#{Name => Info#throttle_info{rate = NewRate, change_plan = NewPlan}}.

-spec rate_per_minute(amoc_throttle:rate(), amoc_throttle:interval()) -> amoc_throttle:rate().
rate_per_minute(infinity, _) -> infinity;
rate_per_minute(_, 0) -> 0;
rate_per_minute(Rate, Interval) ->
(Rate * 60000) div Interval.
Expand All @@ -275,7 +283,7 @@ start_processes(Name, #{rate := Rate, interval := Interval, parallelism := NoOfP
raise_event(Name, init),
RatePerMinute = rate_per_minute(Rate, Interval),
report_rate(Name, RatePerMinute),
RealNoOfProcs = min(Rate, NoOfProcesses),
RealNoOfProcs = expected_no_of_processes(Rate, NoOfProcesses),
start_throttle_processes(Name, Interval, Rate, RealNoOfProcs),
#throttle_info{rate = Rate, interval = Interval, active = true, no_of_procs = RealNoOfProcs}.

Expand Down Expand Up @@ -334,7 +342,7 @@ run_in_all_processes(Name, Cmd) ->

verify_new_start_matches_running(Name, Config, Group, State) ->
#{rate := Rate, interval := Interval, parallelism := NoOfProcesses} = Config,
ExpectedNoOfProcesses = min(Rate, NoOfProcesses),
ExpectedNoOfProcesses = expected_no_of_processes(Rate, NoOfProcesses),
case {length(Group), State} of
{ExpectedNoOfProcesses, #{Name := #throttle_info{rate = Rate, interval = Interval}}} ->
{reply, {ok, already_started}, State};
Expand All @@ -344,6 +352,11 @@ verify_new_start_matches_running(Name, Config, Group, State) ->
{reply, {error, wrong_no_of_procs}, State}
end.

expected_no_of_processes(0, NoOfProcesses) ->
min(1, NoOfProcesses);
expected_no_of_processes(Rate, NoOfProcesses) ->
min(Rate, NoOfProcesses).

run_cmd(Pid, stop) ->
amoc_throttle_process:stop(Pid);
run_cmd(Pid, pause) ->
Expand Down
54 changes: 32 additions & 22 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
-define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute

-record(state, {can_run_fn = true :: boolean(),
pause = false :: boolean(),
max_n :: non_neg_integer(),
status = running :: running | paused,
max_n :: infinity | non_neg_integer(),
name :: atom(),
n :: integer(),
n :: infinity | non_neg_integer(),
interval = 0 :: amoc_throttle:interval(), %%ms
delay_between_executions = 0 :: non_neg_integer(), %%ms
delay_between_executions = 0 :: timeout(), %%ms
tref :: timer:tref() | undefined,
schedule = [] :: [AmocThrottleRunnerProcess :: pid()],
schedule_reversed = [] :: [AmocThrottleRunnerProcess :: pid()]}).
Expand Down Expand Up @@ -120,9 +120,9 @@ handle_info(timeout, State) ->
handle_cast(stop_process, State) ->
{stop, normal, State};
handle_cast(pause_process, State) ->
{noreply, State#state{pause = true}, {continue, maybe_run_fn}};
{noreply, State#state{status = paused}, {continue, maybe_run_fn}};
handle_cast(resume_process, State) ->
{noreply, State#state{pause = false}, {continue, maybe_run_fn}};
{noreply, State#state{status = running}, {continue, maybe_run_fn}};
handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) ->
amoc_throttle_controller:telemetry_event(Name, request),
{noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}};
Expand Down Expand Up @@ -155,25 +155,28 @@ format_status(#{state := #state{} = State} = FormatStatus) ->
%% internal functions
%%------------------------------------------------------------------------------

initial_state(_Name, Interval, infinity) ->
#state{interval = Interval, n = infinity, max_n = infinity, delay_between_executions = 0};
initial_state(_Name, Interval, 0) ->
#state{interval = Interval, n = 0, max_n = 0, delay_between_executions = infinity};
initial_state(Name, Interval, Rate) when Rate > 0 ->
NewRate = case Rate < 5 of
true ->
Msg = <<"too low rate, please reduce NoOfProcesses">>,
internal_error(Msg, Name, Rate, Interval),
Rate;
false ->
Rate
end,
Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of
case Rate < 5 of
true ->
Msg = <<"too low rate, please reduce NoOfProcesses">>,
internal_warning(Msg, Name, Rate, Interval);
false ->
ok
end,
Delay = case {Interval, Interval div Rate, Interval rem Rate} of
{0, _, _} -> 0; %% limit only No of simultaneous executions
{_, I, _} when I < 10 ->
Message = <<"too high rate, please increase NoOfProcesses">>,
internal_error(Message, Name, Rate, Interval),
internal_warning(Message, Name, Rate, Interval),
10;
{_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions;
{_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1
end,
#state{interval = Interval, n = NewRate, max_n = NewRate, delay_between_executions = Delay}.
#state{interval = Interval, n = Rate, max_n = Rate, delay_between_executions = Delay}.

merge_state(#state{interval = I, delay_between_executions = D, n = N, max_n = MaxN},
#state{n = OldN, max_n = OldMaxN} = OldState) ->
Expand All @@ -183,6 +186,8 @@ merge_state(#state{interval = I, delay_between_executions = D, n = N, max_n = Ma
max_n = MaxN, tref = undefined},
maybe_start_timer(NewState).

maybe_start_timer(#state{delay_between_executions = infinity, tref = undefined} = State) ->
State#state{can_run_fn = false};
maybe_start_timer(#state{delay_between_executions = 0, tref = undefined} = State) ->
State#state{can_run_fn = true};
maybe_start_timer(#state{delay_between_executions = D, tref = undefined} = State) ->
Expand All @@ -207,15 +212,20 @@ maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) ->
NewSchedule = lists:reverse(SchRev),
NewState = State#state{schedule = NewSchedule, schedule_reversed = []},
maybe_run_fn(NewState);
maybe_run_fn(#state{interval = 0, pause = false, n = N} = State) when N > 0 ->
maybe_run_fn(#state{interval = 0, status = running, n = N} = State) when N > 0 ->
NewState = run_fn(State),
maybe_run_fn(NewState);
maybe_run_fn(#state{can_run_fn = true, pause = false, n = N} = State) when N > 0 ->
maybe_run_fn(#state{can_run_fn = true, status = running, n = N} = State) when N > 0 ->
NewState = run_fn(State),
NewState#state{can_run_fn = false};
maybe_run_fn(State) ->
State.

run_fn(#state{schedule = [RunnerPid | T], name = Name, n = infinity} = State) ->
erlang:monitor(process, RunnerPid),
amoc_throttle_runner:run(RunnerPid),
amoc_throttle_controller:telemetry_event(Name, execute),
State#state{schedule = T};
run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) ->
erlang:monitor(process, RunnerPid),
amoc_throttle_runner:run(RunnerPid),
Expand Down Expand Up @@ -244,10 +254,10 @@ internal_event(Msg, #state{name = Name} = State) ->
amoc_telemetry:execute_log(
debug, [throttle, process], #{self => self(), name => Name, state => PrintableState}, Msg).

-spec internal_error(binary(), atom(), amoc_throttle:rate(), amoc_throttle:interval()) -> any().
internal_error(Msg, Name, Rate, Interval) ->
-spec internal_warning(binary(), atom(), amoc_throttle:rate(), amoc_throttle:interval()) -> any().
internal_warning(Msg, Name, Rate, Interval) ->
amoc_telemetry:execute_log(
error, [throttle, process], #{name => Name, rate => Rate, interval => Interval}, Msg).
warning, [throttle, process], #{name => Name, rate => Rate, interval => Interval}, Msg).

printable_state(#state{} = State) ->
Fields = record_info(fields, state),
Expand Down
Loading

0 comments on commit a2f225d

Please sign in to comment.