Skip to content

Commit

Permalink
Rework all names and optionally add a really delayed test
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Jan 2, 2024
1 parent 02bef2e commit 554ff8b
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 43 deletions.
97 changes: 61 additions & 36 deletions src/opuntia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
-ifdef(TEST).
-export([create/2, calculate/3, convert_time_unit/3]).
-else.
-compile({inline, [create/2, calculate/3, convert_time_unit/3]}).
-compile({inline, [create/2, calculate/3, convert_time_unit/3, timediff_in_units/3,
unbounded_bucket_diff/3, exactly_available_now/2, final_state/4]}).
-endif.

-include("opuntia.hrl").
Expand Down Expand Up @@ -125,19 +126,12 @@ calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit},
available_tokens = LastAvailableTokens,
last_update = NativeLastUpdate,
debt = OverPenalisedInUnitsLastTime} = Shaper, TokensNowUsed, NativeNow) ->
NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate,

%% This is now a float and so will all below be, to preserve best rounding errors possible
TimeSinceLastUpdate = convert_time_unit(NativeTimeSinceLastUpdate, native, TimeUnit),
TimeDiffInUnits = timediff_in_units(TimeUnit, NativeLastUpdate, NativeNow),

%% How much we might have recovered since last time
AvailableAtGrowthRate = Rate * TimeSinceLastUpdate,
UnboundedTokenGrowth = LastAvailableTokens + AvailableAtGrowthRate,
UnboundedTokenGrowth = unbounded_bucket_diff(Rate, LastAvailableTokens, TimeDiffInUnits),

%% Real recovery cannot grow higher than the actual rate in the window frame
ExactlyAvailableNow0 = min(MaximumTokens, UnboundedTokenGrowth),
%% But it can't be negative either (which can happen if we were already in debt)
ExactlyAvailableNow = max(0, ExactlyAvailableNow0),
ExactlyAvailableNow = exactly_available_now(MaximumTokens, UnboundedTokenGrowth),

%% How many are available after using TokensNowUsed can't be smaller than zero
TokensAvailable = max(0, ExactlyAvailableNow - TokensNowUsed),
Expand All @@ -148,31 +142,62 @@ calculate(#token_bucket_shaper{shape = {MaximumTokens, Rate, TimeUnit},
%% And then MaybeDelay will be zero if TokensOverused was zero
OverUsedRateNow = TokensOverused / Rate,

%% We penalise rounding up, the most important contract is that rate will never exceed that
%% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but
%% don't promise at what time in the future they would arrive, nor we promise any upper bound
%% to the limits of the shaper delay.
case OverUsedRateNow - OverPenalisedInUnitsLastTime of
%% Even after paying the old debt you incurr a new debt again
Punish when Punish >= +0.0 ->
DelayMs = convert_time_unit(Punish, TimeUnit, millisecond),
RoundedDelayMs = ceil(DelayMs),
OverPenalisedNow = RoundedDelayMs - DelayMs,
OverUsedRateNowInUnits = convert_time_unit(OverPenalisedNow, millisecond, TimeUnit),
DelayNative = convert_time_unit(RoundedDelayMs, TimeUnit, native),
RoundedDelayNative = ceil(DelayNative),
NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable,
last_update = NativeNow + RoundedDelayNative,
debt = OverUsedRateNowInUnits},
{NewShaper, RoundedDelayMs};
%% I penalised you too much last time, you get off now but with a future bill
Punish when Punish < +0.0 ->
DebtInUnits = convert_time_unit(-Punish, millisecond, TimeUnit),
NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable,
last_update = NativeNow,
debt = DebtInUnits},
{NewShaper, 0}
end.
NewShaper = Shaper#token_bucket_shaper{available_tokens = TokensAvailable},
Punish = OverUsedRateNow - OverPenalisedInUnitsLastTime,
final_state(NewShaper, TimeUnit, Punish, NativeNow).

-spec timediff_in_units(time_unit(), integer(), integer()) -> float().
timediff_in_units(TimeUnit, NativeLastUpdate, NativeNow) ->
%% Time difference between now and the last update, in native
NativeTimeSinceLastUpdate = NativeNow - NativeLastUpdate,
%% This is now a float and so will all below be, to preserve best rounding errors possible
convert_time_unit(NativeTimeSinceLastUpdate, native, TimeUnit).

%% Unbounded growth is calculated, with float precision, in the configured time units
%%
%% Note that it can be negative, if earlier we have penalised giving a larger `last_update' and now we
%% update even before we have reach the point in time where the previous `last_update' was set
%%
%% If the growth was negative, that means that it has grown the debt instead
-spec unbounded_bucket_diff(rate(), tokens(), float()) -> float().
unbounded_bucket_diff(Rate, LastAvailableTokens, TimeDiffInUnits) ->
%% How much we might have recovered since last time
AvailableAtGrowthRate = Rate * TimeDiffInUnits,
%% Unbounded growth at rate since the last update
LastAvailableTokens + AvailableAtGrowthRate.

%% This is the real growth considering the maximum bucket size.
-spec exactly_available_now(bucket_size(), float()) -> float().
exactly_available_now(MaximumTokens, UnboundedTokenGrowth) ->
%% Real recovery cannot grow higher than the actual rate in the window frame
ExactlyAvailableNow0 = min(MaximumTokens, UnboundedTokenGrowth),
%% But it can't be negative either which can happen if we were already in debt,
%% but this is a debt we will pay when we calculate the final punishment in final_state
max(+0.0, ExactlyAvailableNow0).

%% We penalise rounding up, the most important contract is that rate will never exceed that
%% requested, but the same way timeouts in Erlang promise not to arrive any time earlier but
%% don't promise at what time in the future they would arrive, nor we promise any upper bound
%% to the limits of the shaper delay.
%%
%% Two cases, either:
%% Punish is positive: even after paying the old debt you incurr a new debt again
%% Punish is negative: I overpenalised you last time, you get off now but with a future bill
final_state(Shaper, TimeUnit, Punish, NativeNow) when Punish >= +0.0 ->
DelayMs = convert_time_unit(Punish, TimeUnit, millisecond),
RoundedDelayMs = ceil(DelayMs),
OverPenalisedNow = RoundedDelayMs - DelayMs,
OverUsedRateNowInUnits = convert_time_unit(OverPenalisedNow, millisecond, TimeUnit),
DelayNative = convert_time_unit(RoundedDelayMs, TimeUnit, native),
RoundedDelayNative = ceil(DelayNative),
NewShaper = Shaper#token_bucket_shaper{last_update = NativeNow + RoundedDelayNative,
debt = OverUsedRateNowInUnits},
{NewShaper, RoundedDelayMs};
final_state(Shaper, TimeUnit, Punish, NativeNow) when Punish < +0.0 ->
DebtInUnits = convert_time_unit(-Punish, millisecond, TimeUnit),
NewShaper = Shaper#token_bucket_shaper{last_update = NativeNow,
debt = DebtInUnits},
{NewShaper, 0}.

%% Avoid rounding errors by using floats and float division,
%% erlang:convert_time_unit works only with integers
Expand Down
56 changes: 49 additions & 7 deletions test/opuntia_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ groups() ->
run_shaper_without_consuming_does_not_delay,
run_basic_shaper_property,
run_stateful_server
]}
]},
%% This group is purposefully left out because it is too slow to run on CI,
%% and uses a `timer:tc/4` only available since OTP26
%% Here for the record, can be enabled locally and checked
{delays, [sequence],
[
run_with_delays
]}
].

%%%===================================================================
Expand Down Expand Up @@ -98,6 +105,25 @@ run_shaper_without_consuming_does_not_delay(_) ->
end),
run_prop(?FUNCTION_NAME, Prop, 1000, 2).

run_with_delays(_) ->
S = "ToConsume ~p, Shape ~p, TimeItTook ~p, CalculatedDelay ~p ms, in range [~p, ~p]ms, ~nLastShaper ~p,~nHistory ~p",
Prop = ?FORALL(
{TokensToSpend, Shape},
{tokens(), shape()},
begin
Shaper = opuntia:new(Shape),
{TimeItTookUs, {LastShaper, History, CalculatedDelay}} =
timer:tc(fun run_with_sleeps/2, [Shaper, TokensToSpend], native),
TimeItTookMs = opuntia:convert_time_unit(TimeItTookUs, native, millisecond),
{CannotBeFasterThan, CannotBeSlowerThan} = should_take_in_range(Shape, TokensToSpend),
AdjustCannotBeSlowerThan = CannotBeSlowerThan + 10,
Val = value_in_range(TimeItTookMs, CannotBeFasterThan, AdjustCannotBeSlowerThan),
P = [TokensToSpend, Shape, TimeItTookMs, CalculatedDelay, CannotBeFasterThan,
AdjustCannotBeSlowerThan, LastShaper, History],
success_or_log_and_return(Val andalso is_integer(CalculatedDelay), S, P)
end),
run_prop(?FUNCTION_NAME, Prop, 100, 1).

run_basic_shaper_property(_) ->
S = "ToConsume ~p, Shape ~p, CalculatedDelay ~p ms, in range [~p, ~p]ms, ~nLastShaper ~p,~nHistory ~p",
Prop = ?FORALL(
Expand Down Expand Up @@ -207,19 +233,20 @@ key() ->
tokens() ->
integer(1, 99999).

time_unit() ->
oneof([second, millisecond, microsecond, nanosecond, native]).

config() ->
union([shape_for_server(), function(0, shape_for_server())]).

shape_for_server() ->
Unit = oneof([second, millisecond, microsecond, nanosecond, native]),
%% server is slower and proper struggles with bigger numbers, not critical
ShapeGen = {integer(1, 999), integer(1, 999), Unit, boolean()},
ShapeGen = {integer(1, 999), integer(1, 999), time_unit(), boolean()},
let_shape(ShapeGen).

shape() ->
Unit = oneof([second, millisecond, microsecond, nanosecond, native]),
Int = integer(1, 99999),
ShapeGen = {Int, Int, Unit, boolean()},
ShapeGen = {Int, Int, time_unit(), boolean()},
let_shape(ShapeGen).

let_shape(ShapeGen) ->
Expand Down Expand Up @@ -252,14 +279,28 @@ should_take_in_range(#{bucket_size := MaximumTokens,
start_full := true},
ToConsume) ->
case ToConsume < MaximumTokens of
true -> {0, 0};
true -> {0, 1};
false ->
ToThrottle = ToConsume - MaximumTokens,
Expected = ToThrottle / Rate,
ExpectedMs = opuntia:convert_time_unit(Expected, TimeUnit, millisecond),
{ExpectedMs, ceil(ExpectedMs + 1)}
end.

run_with_sleeps(Shaper, ToConsume) ->
run_with_sleeps(Shaper, [], 0, ToConsume).

run_with_sleeps(Shaper, History, AccumulatedDelay, TokensLeft) when TokensLeft =< 0 ->
{Shaper, lists:reverse(History), AccumulatedDelay};
run_with_sleeps(Shaper, History, AccumulatedDelay, TokensLeft) ->
ConsumeNow = rand:uniform(TokensLeft),
{NewShaper, DelayMs} = opuntia:update(Shaper, ConsumeNow),
timer:sleep(DelayMs),
NewEvent = #{consumed => ConsumeNow, proposed_delay => DelayMs, shaper => Shaper},
NewHistory = [NewEvent | History],
NewDelay = AccumulatedDelay + DelayMs,
run_with_sleeps(NewShaper, NewHistory, NewDelay, TokensLeft - ConsumeNow).

run_shaper(Shape, ToConsume) ->
Shaper = opuntia:create(Shape, 0),
run_shaper(Shaper, [], 0, ToConsume).
Expand All @@ -270,7 +311,8 @@ run_shaper(Shaper, History, AccumulatedDelay, TokensLeft) ->
%% Uniform distributes in [1, N], and we want [0, N], so we generate [1, N+1] and subtract 1
ConsumeNow = rand:uniform(TokensLeft + 1) - 1,
{NewShaper, DelayMs} = opuntia:calculate(Shaper, ConsumeNow, 0),
NewHistory = [{ConsumeNow, DelayMs, NewShaper} | History],
NewEvent = #{consumed => ConsumeNow, proposed_delay => DelayMs, final_shaper => NewShaper},
NewHistory = [NewEvent | History],
NewDelay = AccumulatedDelay + DelayMs,
NewToConsume = TokensLeft - ConsumeNow,
case is_integer(DelayMs) andalso DelayMs >= 0 of
Expand Down

0 comments on commit 554ff8b

Please sign in to comment.