diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 973b1a1fed9..ce3eb4698fa 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -482,6 +482,33 @@ ssl_certificate_max_depth = 3 ; or 403 response this setting is not needed. ;session_refresh_interval_sec = 550 +; Usage coefficient decays historic fair share usage every scheduling +; cycle. The value must be between 0.0 and 1.0. Lower values will +; ensure historic usage decays more quickly and higher values mean it +; will be remembered longer. +;usage_coeff = 0.5 + +; Priority coefficient decays all the fair share job priorities such +; that they uniformly drift towards the front of the run queue. At the +; default value of 0.98 it will take about 430 scheduler cycles (7 +; hours) for a single job which ran for 1 minute to drift towards the +; front of the queue (get assigned priority 0). 7 hours was picked as +; it is close to the maximum error backoff interval of about 8 hours. +; The value must be between 0.0 and 1.0. Too low a value, coupled +; with a low max_jobs or max_churn parameter, could end up making the +; majority of jobs priority 0 too quickly and canceling the effect of +; the fair share algorithm. +;priority_coeff = 0.98 + + +[replicator.shares] +; Fair share configuration section. More shares result in a higher +; chance that jobs from that db get to run. The default value is 100, +; the minimum is 1 and the maximum is 1000. The configuration may be +; set even if the database hasn't been created yet. +;_replicator = 100 + + [log] ; Possible log levels: ; debug diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index 641443a7c72..451ad4fc8be 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -55,15 +55,10 @@ stats_updater_loop/1 ]). --include("couch_replicator_scheduler.hrl"). -include("couch_replicator.hrl"). -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). -include_lib("couch/include/couch_db.hrl"). -%% types --type event_type() :: added | started | stopped | {crashed, any()}. --type event() :: {Type:: event_type(), When :: erlang:timestamp()}. --type history() :: nonempty_list(event()). %% definitions -define(MAX_BACKOFF_EXPONENT, 10). @@ -78,13 +73,13 @@ -define(DEFAULT_SCHEDULER_INTERVAL, 60000). --record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}). --record(job, { - id :: job_id() | '$1' | '_', - rep :: #rep{} | '_', - pid :: undefined | pid() | '$1' | '_', - monitor :: undefined | reference() | '_', - history :: history() | '_' +-record(state, { + interval = ?DEFAULT_SCHEDULER_INTERVAL, + timer, + max_jobs, + max_churn, + max_history, + stats_pid }). 
-record(stats_acc, { @@ -229,6 +224,7 @@ init(_) -> EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true}, {write_concurrency, true}], ?MODULE = ets:new(?MODULE, EtsOpts), + ok = couch_replicator_share:init(), ok = config:listen_for_changes(?MODULE, nil), Interval = config:get_integer("replicator", "interval", ?DEFAULT_SCHEDULER_INTERVAL), @@ -290,6 +286,17 @@ handle_cast({set_interval, Interval}, State) when is_integer(Interval), couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]), {noreply, State#state{interval = Interval}}; +handle_cast({update_shares, Key, Shares}, State) when is_binary(Key), + is_integer(Shares), Shares >= 0 -> + couch_log:notice("~p: shares for ~s set to ~B", [?MODULE, Key, Shares]), + couch_replicator_share:update_shares(Key, Shares), + {noreply, State}; + +handle_cast({reset_shares, Key}, State) when is_binary(Key) -> + couch_log:notice("~p: shares for ~s reset to default", [?MODULE, Key]), + couch_replicator_share:reset_shares(Key), + {noreply, State}; + handle_cast({update_job_stats, JobId, Stats}, State) -> case rep_state(JobId) of nil -> @@ -314,6 +321,8 @@ handle_info(reschedule, State) -> handle_info({'DOWN', _Ref, process, Pid, normal}, State) -> {ok, Job} = job_by_pid(Pid), couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]), + Interval = State#state.interval, + couch_replicator_share:charge(Job, Interval, os:timestamp()), remove_job_int(Job), update_running_jobs_stats(State#state.stats_pid), {noreply, State}; @@ -324,6 +333,8 @@ handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) -> {shutdown, ShutdownReason} -> ShutdownReason; Other -> Other end, + Interval = State#state.interval, + couch_replicator_share:charge(Job, Interval, os:timestamp()), ok = handle_crashed_job(Job, Reason, State), {noreply, State}; @@ -340,6 +351,7 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, _State) -> + couch_replicator_share:clear(), ok. @@ -369,6 +381,15 @@ handle_config_change("replicator", "max_history", V, _, S) -> ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}), {ok, S}; +handle_config_change("replicator.shares", Key, deleted, _, S) -> + ok = gen_server:cast(?MODULE, {reset_shares, list_to_binary(Key)}), + {ok, S}; + +handle_config_change("replicator.shares", Key, V, _, S) -> + ok = gen_server:cast(?MODULE, {update_shares, list_to_binary(Key), + list_to_integer(V)}), + {ok, S}; + handle_config_change(_, _, _, _, S) -> {ok, S}. @@ -449,19 +470,19 @@ pending_jobs(0) -> []; pending_jobs(Count) when is_integer(Count), Count > 0 -> - Set0 = gb_sets:new(), % [{LastStart, Job},...] + Set0 = gb_sets:new(), % [{{Priority, LastStart}, Job},...] Now = os:timestamp(), Acc0 = {Set0, Now, Count, health_threshold()}, {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE), - [Job || {_Started, Job} <- gb_sets:to_list(Set1)]. + [Job || {_PriorityKey, Job} <- gb_sets:to_list(Set1)]. 
pending_fold(#job{pid = Pid}, Acc) when is_pid(Pid) -> Acc; pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> - Set1 = case {not_recently_crashed(Job, Now, HealthThreshold), - gb_sets:size(Set) >= Count} of + Healthy = not_recently_crashed(Job, Now, HealthThreshold), + Set1 = case {Healthy, gb_sets:size(Set) >= Count} of {true, true} -> % Job is healthy but already reached accumulated limit, so might % have to replace one of the accumulated jobs @@ -469,7 +490,7 @@ pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> {true, false} -> % Job is healthy and we haven't reached the limit, so add job % to accumulator - gb_sets:add_element({last_started(Job), Job}, Set); + gb_sets:add_element({start_priority_key(Job), Job}, Set); {false, _} -> % This job is not healthy (has crashed too recently), so skip it. Set @@ -477,24 +498,34 @@ pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> {Set1, Now, Count, HealthThreshold}. -% Replace Job in the accumulator if it is older than youngest job there. -% "oldest" here means one which has been waiting to run the longest. "youngest" -% means the one with most recent activity. The goal is to keep up to Count -% oldest jobs during iteration. For example if there are jobs with these times -% accumulated so far [5, 7, 11], and start time of current job is 6. Then -% 6 < 11 is true, so 11 (youngest) is dropped and 6 inserted resulting in -% [5, 6, 7]. In the end the result might look like [1, 2, 5], for example. +% Replace Job in the accumulator if it has a higher priority (lower priority +% value) than the lowest priority there. Jobs are indexed by +% {FairSharePriority, LastStarted} tuples. If the FairSharePriority is the same +% then the last started timestamp is used to break the tie. The goal is to keep +% up to Count of the highest priority jobs during the iteration. For example, +% if there are jobs with these priorities accumulated so far [5, 7, 11], and +% the priority of the current job is 6, then 6 < 11 is true, so 11 (the lowest +% priority) is dropped and 6 is inserted resulting in [5, 6, 7]. In the end the +% result might look like [1, 2, 5], for example. +% pending_maybe_replace(Job, Set) -> - Started = last_started(Job), - {Youngest, YoungestJob} = gb_sets:largest(Set), - case Started < Youngest of + Key = start_priority_key(Job), + {LowestPKey, LowestPJob} = gb_sets:largest(Set), + case Key < LowestPKey of true -> - Set1 = gb_sets:delete({Youngest, YoungestJob}, Set), - gb_sets:add_element({Started, Job}, Set1); + Set1 = gb_sets:delete({LowestPKey, LowestPJob}, Set), + gb_sets:add_element({Key, Job}, Set1); false -> Set end. +% The start priority key is used to order pending jobs such that the ones with +% a lower priority value and an earlier start time sort first, so they would be +% the first to run. +% +start_priority_key(#job{} = Job) -> + {couch_replicator_share:priority(Job#job.id), last_started(Job)}. + start_jobs(Count, State) -> [start_job_int(Job, State) || Job <- pending_jobs(Count)], @@ -509,13 +540,18 @@ stop_jobs(Count, IsContinuous, State) when is_integer(Count) -> Running0 = running_jobs(), ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end, Running1 = lists:filter(ContinuousPred, Running0), - Running2 = lists:sort(fun longest_running/2, Running1), - Running3 = lists:sublist(Running2, Count), - length([stop_job_int(Job, State) || Job <- Running3]). 
+ Running2 = [{stop_priority_key(Job), Job} || Job <- Running1], + Running3 = lists:sublist(lists:sort(Running2), Count), + length([stop_job_int(Job, State) || {_SortKey, Job} <- Running3]). -longest_running(#job{} = A, #job{} = B) -> - last_started(A) =< last_started(B). +% Lower priority jobs have higher priority values, so we negate them, that way +% when sorted, they'll come up first. If priorities are equal, jobs are sorted +% by the lowest starting times as jobs with lowest start time have been running +% the longest. +% +stop_priority_key(#job{} = Job) -> + {-couch_replicator_share:priority(Job#job.id), last_started(Job)}. not_recently_crashed(#job{history = History}, Now, HealthThreshold) -> @@ -600,6 +636,9 @@ add_job_int(#job{} = Job) -> maybe_remove_job_int(JobId, State) -> case job_by_id(JobId) of {ok, Job} -> + Now = os:timestamp(), + Interval = State#state.interval, + couch_replicator_share:charge(Job, Interval, Now), ok = stop_job_int(Job, State), true = remove_job_int(Job), couch_stats:increment_counter([couch_replicator, jobs, removes]), @@ -734,6 +773,24 @@ reset_job_process(#job{} = Job) -> -spec reschedule(#state{}) -> ok. reschedule(State) -> + % Charge all running jobs for the current interval + RunningJobs = running_jobs(), + Now = os:timestamp(), + lists:foreach(fun(Job) -> + couch_replicator_share:charge(Job, State#state.interval, Now) + end, RunningJobs), + + % Update usage table + couch_replicator_share:update_usage(), + + % Decay all the process priorities + couch_replicator_share:decay_priorities(), + + % Adjust running process priorities + lists:foreach(fun(Job) -> + couch_replicator_share:update_priority(Job) + end, RunningJobs), + StopCount = stop_excess_jobs(State, running_job_count()), rotate_jobs(State, StopCount), update_running_jobs_stats(State#state.stats_pid). 
@@ -1035,7 +1092,8 @@ longest_running_test() -> J0 = testjob([crashed()]), J1 = testjob([started(1)]), J2 = testjob([started(2)]), - Sort = fun(Jobs) -> lists:sort(fun longest_running/2, Jobs) end, + SortFun = fun(A, B) -> last_started(A) =< last_started(B) end, + Sort = fun(Jobs) -> lists:sort(SortFun, Jobs) end, ?assertEqual([], Sort([])), ?assertEqual([J1], Sort([J1])), ?assertEqual([J1, J2], Sort([J2, J1])), @@ -1381,11 +1439,12 @@ t_if_excess_is_trimmed_rotation_still_happens() -> t_if_transient_job_crashes_it_gets_removed() -> ?_test(begin Pid = mock_pid(), + Rep = continuous_rep(), Job = #job{ id = job1, pid = Pid, history = [added()], - rep = #rep{db_name = null, options = [{continuous, true}]} + rep = Rep#rep{db_name = null} }, setup_jobs([Job]), ?assertEqual(1, ets:info(?MODULE, size)), @@ -1399,15 +1458,20 @@ t_if_transient_job_crashes_it_gets_removed() -> t_if_permanent_job_crashes_it_stays_in_ets() -> ?_test(begin Pid = mock_pid(), + Rep = continuous_rep(), Job = #job{ id = job1, pid = Pid, history = [added()], - rep = #rep{db_name = <<"db1">>, options = [{continuous, true}]} + rep = Rep#rep{db_name = <<"db1">>} }, setup_jobs([Job]), ?assertEqual(1, ets:info(?MODULE, size)), - State = #state{max_jobs =1, max_history = 3, stats_pid = self()}, + State = #state{ + max_jobs = 1, + max_history = 3, + stats_pid = self() + }, {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed}, State), ?assertEqual(1, ets:info(?MODULE, size)), @@ -1419,21 +1483,11 @@ t_if_permanent_job_crashes_it_stays_in_ets() -> t_existing_jobs() -> ?_test(begin - Rep = #rep{ - id = job1, - db_name = <<"db">>, - source = <<"s">>, - target = <<"t">>, - options = [{continuous, true}] - }, + Rep0 = continuous_rep(<<"s">>, <<"t">>), + Rep = Rep0#rep{id = job1, db_name = <<"db">>}, setup_jobs([#job{id = Rep#rep.id, rep = Rep}]), - NewRep = #rep{ - id = Rep#rep.id, - db_name = <<"db">>, - source = <<"s">>, - target = <<"t">>, - options = [{continuous, true}] - }, + NewRep0 = continuous_rep(<<"s">>, <<"t">>), + NewRep = NewRep0#rep{id = Rep#rep.id, db_name = <<"db">>}, ?assert(existing_replication(NewRep)), ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})), ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})), @@ -1443,15 +1497,12 @@ t_existing_jobs() -> t_job_summary_running() -> ?_test(begin + Rep = rep(<<"s">>, <<"t">>), Job = #job{ id = job1, pid = mock_pid(), history = [added()], - rep = #rep{ - db_name = <<"db1">>, - source = <<"s">>, - target = <<"t">> - } + rep = Rep#rep{db_name = <<"db1">>} }, setup_jobs([Job]), Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), @@ -1472,7 +1523,7 @@ t_job_summary_pending() -> id = job1, pid = undefined, history = [stopped(20), started(10), added()], - rep = #rep{source = <<"s">>, target = <<"t">>} + rep = rep(<<"s">>, <<"t">>) }, setup_jobs([Job]), Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), @@ -1492,7 +1543,7 @@ t_job_summary_crashing_once() -> Job = #job{ id = job1, history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)], - rep = #rep{source = <<"s">>, target = <<"t">>} + rep = rep(<<"s">>, <<"t">>) }, setup_jobs([Job]), Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), @@ -1508,7 +1559,7 @@ t_job_summary_crashing_many_times() -> Job = #job{ id = job1, history = [crashed(4), started(3), crashed(2), started(1)], - rep = #rep{source = <<"s">>, target = <<"t">>} + rep = rep(<<"s">>, <<"t">>) }, setup_jobs([Job]), Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), @@ -1521,19 +1572,18 @@ 
t_job_summary_crashing_many_times() -> t_job_summary_proxy_fields() -> ?_test(begin + Src = #httpdb{ + url = "https://s", + proxy_url = "http://u:p@sproxy:12" + }, + Tgt = #httpdb{ + url = "http://t", + proxy_url = "socks5://u:p@tproxy:34" + }, Job = #job{ id = job1, history = [started(10), added()], - rep = #rep{ - source = #httpdb{ - url = "https://s", - proxy_url = "http://u:p@sproxy:12" - }, - target = #httpdb{ - url = "http://t", - proxy_url = "socks5://u:p@tproxy:34" - } - } + rep = rep(Src, Tgt) }, setup_jobs([Job]), Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), @@ -1548,6 +1598,8 @@ t_job_summary_proxy_fields() -> setup_all() -> catch ets:delete(?MODULE), + meck:expect(config, get, 1, []), + meck:expect(config, get, 2, undefined), meck:expect(couch_log, notice, 2, ok), meck:expect(couch_log, warning, 2, ok), meck:expect(couch_log, error, 2, ok), @@ -1555,10 +1607,13 @@ setup_all() -> meck:expect(couch_stats, increment_counter, 1, ok), meck:expect(couch_stats, update_gauge, 2, ok), Pid = mock_pid(), - meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}). + meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}), + couch_replicator_share:init(). + teardown_all(_) -> + couch_replicator_share:clear(), catch ets:delete(?MODULE), meck:unload(). @@ -1567,7 +1622,8 @@ setup() -> meck:reset([ couch_log, couch_replicator_scheduler_sup, - couch_stats + couch_stats, + config ]). @@ -1621,13 +1677,32 @@ mock_state(MaxJobs, MaxChurn) -> }. +rep() -> + #rep{options = [], user_ctx = #user_ctx{}}. + + +rep(Src, Tgt) -> + Rep = rep(), + Rep#rep{source = Src, target = Tgt}. + + +continuous_rep() -> + #rep{options = [{continuous, true}], user_ctx = #user_ctx{}}. + + +continuous_rep(Src, Tgt) -> + Rep = continuous_rep(), + Rep#rep{source = Src, target = Tgt}. + + + continuous(Id) when is_integer(Id) -> Started = Id, Hist = [stopped(Started+1), started(Started), added()], #job{ id = Id, history = Hist, - rep = #rep{options = [{continuous, true}]} + rep = continuous_rep() }. @@ -1637,7 +1712,7 @@ continuous_running(Id) when is_integer(Id) -> #job{ id = Id, history = [started(Started), added()], - rep = #rep{options = [{continuous, true}]}, + rep = continuous_rep(), pid = Pid, monitor = monitor(process, Pid) }. @@ -1646,7 +1721,7 @@ continuous_running(Id) when is_integer(Id) -> oneshot(Id) when is_integer(Id) -> Started = Id, Hist = [stopped(Started + 1), started(Started), added()], - #job{id = Id, history = Hist, rep = #rep{options = []}}. + #job{id = Id, history = Hist, rep = rep()}. oneshot_running(Id) when is_integer(Id) -> @@ -1655,7 +1730,7 @@ oneshot_running(Id) when is_integer(Id) -> #job{ id = Id, history = [started(Started), added()], - rep = #rep{options = []}, + rep = rep(), pid = Pid, monitor = monitor(process, Pid) }. diff --git a/src/couch_replicator/src/couch_replicator_share.erl b/src/couch_replicator/src/couch_replicator_share.erl new file mode 100644 index 00000000000..1f28d514829 --- /dev/null +++ b/src/couch_replicator/src/couch_replicator_share.erl @@ -0,0 +1,271 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +% License for the specific language governing permissions and limitations under +% the License. + +% The algorithm implemented here is based on "A Fair Share Scheduler" by Judy +% Kay and Piers Lauder [1]. +% +% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf +% + +-module(couch_replicator_share). + +-export([ + init/0, + clear/0, + + update_shares/2, + reset_shares/1, + + job_added/1, + job_removed/1, + + priority/1, + usage/1, + num_jobs/1, + shares/1, + + charge/3, + + decay_priorities/0, + update_priority/1, + update_usage/0 +]). + + +-include_lib("couch/include/couch_db.hrl"). +-include("couch_replicator.hrl"). + + +% Usage coefficient decays historic usage every scheduling cycle. For example, +% the usage value for a job running 1 minute is 60000000 (i.e. microseconds +% per minute). If the job stops running it will take about 26 cycles (minutes) for +% it to decay to 0 and the system to "forget" about it completely: +% +% trunc(60000000 * math:pow(0.5, 26)) = 0 +% +-define(DEFAULT_USAGE_COEFF, 0.5). + +% Priority coefficient decays all the job priorities such that they slowly +% drift towards the front of the run queue. The priority value for a single job +% which ran for one 1-minute scheduler cycle and has the default number of 100 +% shares is 60000000 / (100 * 100) = 6000. If the coefficient is 0.98 it will +% take about 430 cycles, i.e. about 7 hours, for the job to drift towards the +% front of the queue: +% +% trunc(6000 * math:pow(0.98, 431)) = 0 +% 430 / 60 = 7.2 hrs +% +-define(DEFAULT_PRIORITY_COEFF, 0.98). + + +-define(MIN_SHARES, 1). +-define(MAX_SHARES, 1000). +-define(DEFAULT_SHARES, 100). + +-define(SHARES, couch_replicator_shares). +-define(PRIORITIES, couch_replicator_priorities). +-define(USAGE, couch_replicator_usage). +-define(CHARGES, couch_replicator_stopped_usage). +-define(NUM_JOBS, couch_replicator_num_jobs). + + +init() -> + EtsOpts = [named_table, public], + ?SHARES = ets:new(?SHARES, EtsOpts), % {Key, Shares} + ?PRIORITIES = ets:new(?PRIORITIES, EtsOpts), % {JobId, Priority} + ?USAGE = ets:new(?USAGE, EtsOpts), % {Key, Usage} + ?CHARGES = ets:new(?CHARGES, EtsOpts), % {Key, Charges} + ?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts), % {Key, NumJobs} + lists:foreach(fun({K, V}) -> + update_shares(list_to_binary(K), list_to_integer(V)) + end, config:get("replicator.shares")). + + +clear() -> + Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS], + lists:foreach(fun(T) -> catch ets:delete(T) end, Tables). + + +% This should be called when the user updates the replicator.shares config section +% +update_shares(Key, Shares) when is_integer(Shares) -> + ets:insert(?SHARES, {Key, min(?MAX_SHARES, max(?MIN_SHARES, Shares))}). + + +% Called when the config value is deleted and shares are reset to the default +% value. +reset_shares(Key) -> + ets:delete(?SHARES, Key). + + +job_added(#job{} = Job) -> + Key = key(Job), + ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}), + % Update the job's priority as if it ran during one scheduler cycle. This is + % so new jobs don't get to be at priority 0 (highest). + update_priority(Job). + + +job_removed(#job{} = Job) -> + Key = key(Job), + ets:delete(?PRIORITIES, Job#job.id), + case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of + N when is_integer(N), N =< 0 -> + ets:delete(?NUM_JOBS, Key); + N when is_integer(N), N > 0 -> + ok + end, + ok. 
+ + +priority(JobId) -> + % Not found means it was removed because its value was 0 + case ets:lookup(?PRIORITIES, JobId) of + [{_, Priority}] -> Priority; + [] -> 0 + end. + + +usage(Key) -> + case ets:lookup(?USAGE, Key) of + [{_, Usage}] -> Usage; + [] -> 0 + end. + + +num_jobs(Key) -> + case ets:lookup(?NUM_JOBS, Key) of + [{_, NumJobs}] -> NumJobs; + [] -> 0 + end. + + +shares(Key) -> + case ets:lookup(?SHARES, Key) of + [{_, Shares}] -> Shares; + [] -> ?DEFAULT_SHARES + end. + + +charge(#job{pid = undefined}, _, _) -> + 0; + +charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) -> + Key = key(Job), + Charges = job_charges(Job, Interval, Now), + ets:update_counter(?CHARGES, Key, Charges, {Key, 0}). + + +% In [1] this is described in the "Decay of Process Priorities" section +% +decay_priorities() -> + decay(?PRIORITIES, priority_coeff()), + % If priority becomes 0, it's removed. When looking it up, if it + % is missing we assume it is 0 + clear_zero(?PRIORITIES). + + +% This is the main part of the algorithm. In [1] it is described in the +% "Priority Adjustment" section. +% +update_priority(#job{} = Job) -> + Id = Job#job.id, + Key = key(Job), + Shares = shares(Key), + Priority = (usage(Key) * num_jobs(Key)) / (Shares * Shares), + ets:update_counter(?PRIORITIES, Id, trunc(Priority), {Id, 0}). + + +% This is the "User-Level Scheduling" part from [1] +% +update_usage() -> + decay(?USAGE, usage_coeff()), + clear_zero(?USAGE), + ets:foldl(fun({Key, Charges}, _) -> + ets:update_counter(?USAGE, Key, Charges, {Key, 0}) + end, 0, ?CHARGES), + % Start each interval with a fresh charges table + ets:delete_all_objects(?CHARGES). + + +% Private helper functions + +decay(Ets, Coeff) when is_atom(Ets) -> + Head = {'$1', '$2'}, + Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}}, + ets:select_replace(Ets, [{Head, [], [Result]}]). + + +clear_zero(Ets) when is_atom(Ets) -> + ets:select_delete(Ets, [{{'_', 0}, [], [true]}]). + + +key(#job{} = Job) -> + Rep = Job#job.rep, + case is_binary(Rep#rep.db_name) of + true -> Rep#rep.db_name; + false -> (Rep#rep.user_ctx)#user_ctx.name + end. + + +% Jobs are charged based on the amount of time the job was running during the +% last scheduling interval. The time units used are microseconds in order to +% have large enough usage values so that when the priority is calculated the +% rounded value won't be rounded off to 0 easily. The formula for the priority +% calculation is: +% +% Priority = (Usage * NumJobs) / Shares^2 +% +% Then in the worst case of a single job in the db, running for only one +% second, with 1000 (max) shares, the priority would be: +% +% 1000000 * 1 / (1000^2) = 1 +% +job_charges(#job{} = Job, IntervalMSec, {_, _, _} = Now) -> + TimeRunning = timer:now_diff(Now, last_started(Job)), + IntervalUSec = IntervalMSec * 1000, + min(IntervalUSec, max(0, TimeRunning)). + + +last_started(#job{} = Job) -> + case lists:keyfind(started, 1, Job#job.history) of + false -> {0, 0, 0}; % In case the user set too low a max history + {started, When} -> When + end. + + +% Config helper functions + +priority_coeff() -> + % This is the K2 coefficient from [1] + Default = ?DEFAULT_PRIORITY_COEFF, + Val = float_val(config:get("replicator", "priority_coeff"), Default), + max(0.0, min(1.0, Val)). + + +usage_coeff() -> + % This is the K1 coefficient from [1] + Default = ?DEFAULT_USAGE_COEFF, + Val = float_val(config:get("replicator", "usage_coeff"), Default), + max(0.0, min(1.0, Val)). 
+ + +float_val(undefined, Default) -> + Default; + +float_val(Str, Default) when is_list(Str) -> + try list_to_float(Str) of + Val -> Val + catch + error:badarg -> + Default + end. diff --git a/src/couch_replicator/test/eunit/couch_replicator_share_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_share_tests.erl new file mode 100644 index 00000000000..fb752b6c260 --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_share_tests.erl @@ -0,0 +1,492 @@ +-module(couch_replicator_share_tests). + + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_replicator/src/couch_replicator.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). +-include("couch_replicator_test.hrl"). + + +-define(SHARES, couch_replicator_shares). +-define(PRIORITIES, couch_replicator_priorities). +-define(USAGE, couch_replicator_usage). +-define(CHARGES, couch_replicator_stopped_usage). +-define(NUM_JOBS, couch_replicator_num_jobs). + +-define(DB1, <<"db1">>). +-define(DB2, <<"db2">>). +-define(DB3, <<"db3">>). +-define(J1, <<"j1">>). +-define(J2, <<"j2">>). +-define(J3, <<"j3">>). + + +fair_share_test_() -> + { + setup, + fun setup_all/0, + fun teardown_all/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(init_works), + ?TDEF_FE(shares_are_updated_and_reset), + ?TDEF_FE(jobs_are_added_and_removed), + ?TDEF_FE(can_fetch_job_priority), + ?TDEF_FE(jobs_are_charged), + ?TDEF_FE(usage_is_updated), + ?TDEF_FE(priority_coefficient_works), + ?TDEF_FE(priority_decays_when_jobs_stop_running), + ?TDEF_FE(priority_increases_when_jobs_run), + ?TDEF_FE(two_dbs_equal_shares_equal_number_of_jobs), + ?TDEF_FE(two_dbs_unequal_shares_equal_number_of_jobs), + ?TDEF_FE(two_dbs_equal_shares_unequal_number_of_jobs), + ?TDEF_FE(two_dbs_unequal_shares_unequal_number_of_jobs), + ?TDEF_FE(three_dbs_equal_shares_equal_number_of_jobs), + ?TDEF_FE(three_dbs_unequal_shares_equal_number_of_jobs), + ?TDEF_FE(three_dbs_equal_shares_unequal_number_of_jobs), + ?TDEF_FE(three_dbs_unequal_shares_unequal_number_of_jobs) + ] + } + }. + + +setup_all() -> + test_util:start_applications([config]). + + +teardown_all(Ctx) -> + config_delete("priority_coeff"), + config_delete("usage_coeff"), + config_shares_delete(), + test_util:stop_applications(Ctx). + + +setup() -> + couch_replicator_share:init(), + ok. + + +teardown(_) -> + couch_replicator_share:clear(), + config_delete("priority_coeff"), + config_delete("usage_coeff"), + config_shares_delete(). + + +init_works(_)-> + Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS], + [?assert(is_list(ets:info(T))) || T <- Tables], + ?assertEqual(#{}, tab2map(?SHARES)), + + couch_replicator_share:clear(), + [?assertEqual(undefined, ets:info(T)) || T <- Tables], + + config_share_set("db1", "200"), + couch_replicator_share:init(), + ?assertEqual(200, couch_replicator_share:shares(?DB1)), + ?assertEqual(#{?DB1 => 200}, tab2map(?SHARES)). + + +shares_are_updated_and_reset(_) -> + ?assertEqual(#{}, tab2map(?SHARES)), + + couch_replicator_share:update_shares(?DB1, 42), + ?assertEqual(42, couch_replicator_share:shares(?DB1)), + + couch_replicator_share:reset_shares(?DB1), + ?assertEqual(100, couch_replicator_share:shares(?DB1)), + ?assertEqual(#{}, tab2map(?SHARES)), + + % min shares + couch_replicator_share:update_shares(?DB1, 0), + ?assertEqual(1, couch_replicator_share:shares(?DB1)), + + % max shares + couch_replicator_share:update_shares(?DB1, 1001), + ?assertEqual(1000, couch_replicator_share:shares(?DB1)). 
+ + +jobs_are_added_and_removed(_) -> + couch_replicator_share:job_added(job(?J1, ?DB1)), + ?assertEqual(1, couch_replicator_share:num_jobs(?DB1)), + ?assertEqual(#{?J1 => 0}, tab2map(?PRIORITIES)), + + couch_replicator_share:job_added(job(?J2, ?DB1)), + ?assertEqual(2, couch_replicator_share:num_jobs(?DB1)), + ?assertEqual(#{?J1 => 0, ?J2 => 0}, tab2map(?PRIORITIES)), + + couch_replicator_share:job_added(job(?J3, ?DB2)), + ?assertEqual(1, couch_replicator_share:num_jobs(?DB2)), + ?assertEqual(#{?J1 => 0, ?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)), + + couch_replicator_share:job_removed(job(?J1, ?DB1)), + ?assertEqual(1, couch_replicator_share:num_jobs(?DB1)), + ?assertEqual(#{?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)), + + couch_replicator_share:job_removed(job(?J3, ?DB2)), + ?assertEqual(0, couch_replicator_share:num_jobs(?DB2)), + ?assertEqual(0, couch_replicator_share:priority(?J3)), + + couch_replicator_share:job_removed(job(?J2, ?DB1)), + ?assertEqual(0, couch_replicator_share:num_jobs(?DB2)), + ?assertEqual(#{}, tab2map(?NUM_JOBS)), + ?assertEqual(0, couch_replicator_share:priority(?J2)), + ?assertEqual(#{}, tab2map(?PRIORITIES)). + + +can_fetch_job_priority(_) -> + couch_replicator_share:job_added(job(?J1, ?DB1)), + ?assertEqual(0, couch_replicator_share:priority(?J1)), + + ets:insert(?PRIORITIES, {?J1, 42}), + ?assertEqual(42, couch_replicator_share:priority(?J1)), + + ets:delete(?PRIORITIES, ?J1), + ?assertEqual(0, couch_replicator_share:priority(?J1)). + + +jobs_are_charged(_) -> + Job1 = running_job(?J1, ?DB1), + couch_replicator_share:job_added(Job1), + ?assertEqual(#{}, tab2map(?CHARGES)), + + couch_replicator_share:charge(Job1, 1000, {0, 1, 0}), + ?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)), + + % Stopped jobs are not charged + couch_replicator_share:charge(stop(Job1), 1000, {0, 1, 0}), + ?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)), + + % Only charge up to one interval's worth even if job ran longer + couch_replicator_share:charge(Job1, 1000, {0, 5, 0}), + ?assertEqual(#{?DB1 => 2000000}, tab2map(?CHARGES)), + + % Charges are accumulated from jobs in same db + Job2 = running_job(?J2, ?DB1), + couch_replicator_share:job_added(Job2), + couch_replicator_share:charge(Job2, 1000, {0, 0, 1}), + ?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)), + + % Charges are not cleared if jobs are removed + couch_replicator_share:job_removed(Job1), + couch_replicator_share:job_removed(Job2), + ?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)). 
+ + +usage_is_updated(_) -> + Job = running_job(?J1, ?DB1), + couch_replicator_share:job_added(Job), + + couch_replicator_share:charge(Job, 60000, {0, 60, 0}), + couch_replicator_share:update_usage(), + ?assertEqual(60000000, couch_replicator_share:usage(?DB1)), + + % Charges table is cleared after usage is updated + ?assertEqual(#{}, tab2map(?CHARGES)), + + % Check that usage decay works + config_set("usage_coeff", "0.2"), + couch_replicator_share:update_usage(), + ?assertEqual(12000000, couch_replicator_share:usage(?DB1)), + + config_set("usage_coeff", "0.5"), + couch_replicator_share:update_usage(), + ?assertEqual(6000000, couch_replicator_share:usage(?DB1)), + + % Check that function both decays and updates from charges + couch_replicator_share:charge(Job, 60000, {0, 60, 0}), + couch_replicator_share:update_usage(), + ?assertEqual(63000000, couch_replicator_share:usage(?DB1)), + + % Usage eventually decays to 0 and is removed from the table + [couch_replicator_share:update_usage() || _ <- lists:seq(1, 100)], + ?assertEqual(0, couch_replicator_share:usage(?DB1)), + ?assertEqual(#{}, tab2map(?USAGE)). + + +priority_coefficient_works(_) -> + couch_replicator_share:job_added(job(?J1, ?DB1)), + ets:insert(?PRIORITIES, {?J1, 1000}), + + config_set("priority_coeff", "0.8"), + couch_replicator_share:decay_priorities(), + ?assertEqual(800, couch_replicator_share:priority(?J1)), + + config_set("priority_coeff", "0.5"), + couch_replicator_share:decay_priorities(), + ?assertEqual(400, couch_replicator_share:priority(?J1)), + + % If non-float junk value is set then the default is used + config_set("priority_coeff", "junk"), + couch_replicator_share:decay_priorities(), + ?assertEqual(392, couch_replicator_share:priority(?J1)), + + % Clipped to 1.0 max + config_set("priority_coeff", "1.1"), + couch_replicator_share:decay_priorities(), + ?assertEqual(392, couch_replicator_share:priority(?J1)), + + % Clipped to 0.0 min and removed when =< 0 + config_set("priority_coeff", "-0.1"), + couch_replicator_share:decay_priorities(), + ?assertEqual(0, couch_replicator_share:priority(?J1)), + ?assertEqual(#{}, tab2map(?PRIORITIES)). + + +priority_decays_when_jobs_stop_running(_) -> + Job = running_job(?J1, ?DB1), + couch_replicator_share:job_added(Job), + + % Ran for one cycle then stop + {[], Pending} = reschedule(1, {[Job], []}), + + % Priority is non-0 initially + ?assert(couch_replicator_share:priority(?J1) > 0), + + % Priority decays to 0 after some cycles + [reschedule(0, {[], Pending}) || _ <- lists:seq(1, 500)], + ?assertEqual(0, couch_replicator_share:priority(?J1)). + + +priority_increases_when_jobs_run(_) -> + Job = running_job(?J1, ?DB1), + couch_replicator_share:job_added(Job), + + Running = [Job], + reschedule(0, {Running, []}), + P1 = couch_replicator_share:priority(?J1), + ?assert(P1 > 0), + + % Priority increases + reschedule(0, {Running, []}), + P2 = couch_replicator_share:priority(?J1), + ?assert(P2 > P1), + + % Additive priority increase is balanced out by priority decay + [reschedule(0, {Running, []}) || _ <- lists:seq(1, 500)], + Pn = couch_replicator_share:priority(?J1), + ?assert(Pn > P2), + + reschedule(0, {Running, []}), + Pm = couch_replicator_share:priority(?J1), + ?assertEqual(Pn, Pm). + + +two_dbs_equal_shares_equal_number_of_jobs(_) -> + couch_replicator_share:update_shares(?DB1, 100), + couch_replicator_share:update_shares(?DB2, 100), + Sched = setup_jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}), + ?assertEqual(#{?DB1 => 50, ?DB2 => 50}, run_scheduler(1000, 10, Sched)). 
+ + +two_dbs_unequal_shares_equal_number_of_jobs(_) -> + couch_replicator_share:update_shares(?DB1, 900), + couch_replicator_share:update_shares(?DB2, 100), + Sched = setup_jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}), + ?assertEqual(#{?DB1 => 90, ?DB2 => 10}, run_scheduler(1000, 10, Sched)). + + +two_dbs_equal_shares_unequal_number_of_jobs(_) -> + couch_replicator_share:update_shares(?DB1, 100), + couch_replicator_share:update_shares(?DB2, 100), + Sched = setup_jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}), + ?assertEqual(#{?DB1 => 50, ?DB2 => 50}, run_scheduler(1000, 10, Sched)). + + +two_dbs_unequal_shares_unequal_number_of_jobs(_) -> + couch_replicator_share:update_shares(?DB1, 1), + couch_replicator_share:update_shares(?DB2, 100), + Sched = setup_jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}), + ?assertEqual(#{?DB1 => 1, ?DB2 => 99}, run_scheduler(1000, 10, Sched)). + + +three_dbs_equal_shares_equal_number_of_jobs(_) -> + couch_replicator_share:update_shares(?DB1, 100), + couch_replicator_share:update_shares(?DB2, 100), + couch_replicator_share:update_shares(?DB3, 100), + Sched = setup_jobs(#{ + ?DB1 => {25, 75}, + ?DB2 => {25, 75}, + ?DB3 => {25, 75} + }), + #{ + ?DB1 := Db1, + ?DB2 := Db2, + ?DB3 := Db3 + } = run_scheduler(1000, 10, Sched), + ?assert(32 =< Db1 andalso Db1 =< 34), + ?assert(32 =< Db2 andalso Db2 =< 34), + ?assert(32 =< Db3 andalso Db3 =< 34). + + +three_dbs_unequal_shares_equal_number_of_jobs(_) -> + couch_replicator_share:update_shares(?DB1, 100), + couch_replicator_share:update_shares(?DB2, 700), + couch_replicator_share:update_shares(?DB3, 200), + Sched = setup_jobs(#{ + ?DB1 => {25, 75}, + ?DB2 => {25, 75}, + ?DB3 => {25, 75} + }), + #{ + ?DB1 := Db1, + ?DB2 := Db2, + ?DB3 := Db3 + } = run_scheduler(1000, 10, Sched), + ?assert(9 =< Db1 andalso Db1 =< 11), + ?assert(69 =< Db2 andalso Db2 =< 71), + ?assert(19 =< Db3 andalso Db3 =< 21). + + +three_dbs_equal_shares_unequal_number_of_jobs(_) -> + couch_replicator_share:update_shares(?DB1, 100), + couch_replicator_share:update_shares(?DB2, 100), + couch_replicator_share:update_shares(?DB3, 100), + Sched = setup_jobs(#{ + ?DB1 => {25, 25}, + ?DB2 => {25, 100}, + ?DB3 => {25, 75} + }), + #{ + ?DB1 := Db1, + ?DB2 := Db2, + ?DB3 := Db3 + } = run_scheduler(1000, 10, Sched), + ?assert(32 =< Db1 andalso Db1 =< 34), + ?assert(32 =< Db2 andalso Db2 =< 34), + ?assert(32 =< Db3 andalso Db3 =< 34). + + +three_dbs_unequal_shares_unequal_number_of_jobs(_) -> + couch_replicator_share:update_shares(?DB1, 1000), + couch_replicator_share:update_shares(?DB2, 100), + couch_replicator_share:update_shares(?DB3, 1), + Sched = setup_jobs(#{ + ?DB1 => {25, 100}, + ?DB2 => {25, 125}, + ?DB3 => {25, 875} + }), + #{ + ?DB1 := Db1, + ?DB2 := Db2, + ?DB3 := Db3 + } = run_scheduler(1000, 10, Sched), + ?assert(87 =< Db1 andalso Db1 =< 89), + ?assert(9 =< Db2 andalso Db2 =< 11), + ?assert(2 =< Db3 andalso Db3 =< 4). + + +config_set(K, V) -> + config:set("replicator", K, V, _Persist = false). + + +config_delete(K) -> + config:delete("replicator", K, _Persist = false). + + +config_share_set(K, V) -> + config:set("replicator.shares", K, V, _Persist = false). + + +config_shares_delete() -> + [config:delete("replicator.shares", K, _Persist = false) || + {K, _} <- config:get("replicator.shares")]. + + +tab2map(T) when is_atom(T) -> + maps:from_list(ets:tab2list(T)). + + +job(rand, Db) -> + job(rand:uniform(1 bsl 59), Db); + +job(Id, Db) -> + Job = #job{ + id = Id, + rep = #rep{ + db_name = Db, + user_ctx = #user_ctx{} + } + }, + stop(Job). 
+ + +running_job(Id, Db) -> + run(job(Id, Db)). + + +run(#job{} = Job) -> + Job#job{ + pid = list_to_pid("<0.9999.999>"), + history = [{started, {0, 0, 0}}, {added, {0, 0, 0}}] + }. + + +stop(#job{} = Job) -> + Job#job{ + pid = undefined, + history = [{added, {0, 0, 0}}] + }. + + +% Simple scheduler simulator. Start and stop N jobs and do the +% accounting steps. Return a new list of running and pending jobs. If +% N is 0 then jobs which were running stay running and jobs were +% pending stay pending. +% +reschedule(N, {Running, Pending}) -> + [couch_replicator_share:charge(Job, 60000, {0, 60, 0}) || Job <- Running], + couch_replicator_share:update_usage(), + couch_replicator_share:decay_priorities(), + [couch_replicator_share:update_priority(Job) || Job <- Running], + + RunPr = [{couch_replicator_share:priority(Job#job.id), Job} || + Job <- Running], + StopPr = [{couch_replicator_share:priority(Job#job.id), Job} || + Job <- Pending], + + {_, Running1} = lists:unzip(lists:reverse(lists:sort(RunPr))), + {_, Pending1} = lists:unzip(lists:sort(StopPr)), + + ToStop = lists:sublist(Running1, N), + ToStart = lists:sublist(Pending1, N), + + Running2 = [run(Job) || Job <- ToStart] ++ Running1 -- ToStop, + Pending2 = [stop(Job) || Job <- ToStop] ++ Pending1 -- ToStart, + + {Running2, Pending2}. + + +% Run a few scheduling cycles and calculate usage percentage for each db +run_scheduler(Cycles, Churn, Sched0) -> + Acc0 = {#{}, Sched0}, + + {Sum, _} = lists:foldl(fun(_CycleCnt, {UsageAcc, Sched}) -> + %% UsageAcc1 = maps:fold(fun(Db, Usage, Acc) -> + %% maps:update_with(Db, fun(V) -> V + Usage end, 0, Acc) + %% end, UsageAcc, tab2map(?USAGE)), + {Running, _Pending} = Sched, + UsageAcc1 = lists:foldl(fun(#job{} = Job, Acc) -> + Db = Job#job.rep#rep.db_name, + maps:update_with(Db, fun(V) -> V + 1 end, 0, Acc) + end, UsageAcc, Running), + + {UsageAcc1, reschedule(Churn, Sched)} + end, Acc0, lists:seq(1, Cycles)), + + Total = maps:fold(fun(_, V, Acc) -> Acc + V end, 0, Sum), + maps:map(fun(Db, V) -> round(V / Total * 100) end, Sum). + + +% Dbs = #{Db => {RunningCount, PendingCount} +% +setup_jobs(#{} = Dbs) -> + maps:fold(fun(Db, {RCnt, PCnt}, {Running, Pending}) -> + RJobs = [running_job(rand, Db) || _ <- lists:seq(1, RCnt)], + PJobs = [job(rand, Db) || _ <- lists:seq(1, PCnt)], + [couch_replicator_share:job_added(Job) || Job <- RJobs ++ PJobs], + {Running ++ RJobs, Pending ++ PJobs} + end, {[], []}, Dbs). diff --git a/src/couch_replicator/test/eunit/couch_replicator_test.hrl b/src/couch_replicator/test/eunit/couch_replicator_test.hrl new file mode 100644 index 00000000000..6db97ec2b9a --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_test.hrl @@ -0,0 +1,35 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +% Borrowed from fabric2_test.hrl + +% Some test modules do not use with, so squash the unused fun compiler warning +-compile([{nowarn_unused_function, [{with, 1}]}]). + + +-define(TDEF(Name), {atom_to_list(Name), fun Name/1}). 
+-define(TDEF(Name, Timeout), {atom_to_list(Name), Timeout, fun Name/1}). + +-define(TDEF_FE(Name), fun(Arg) -> {atom_to_list(Name), ?_test(Name(Arg))} end). +-define(TDEF_FE(Name, Timeout), fun(Arg) -> {atom_to_list(Name), {timeout, Timeout, ?_test(Name(Arg))}} end). + + +with(Tests) -> + fun(ArgsTuple) -> + lists:map(fun + ({Name, Fun}) -> + {Name, ?_test(Fun(ArgsTuple))}; + ({Name, Timeout, Fun}) -> + {Name, {timeout, Timeout, ?_test(Fun(ArgsTuple))}} + end, Tests) + end.
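
For reference, below is a minimal standalone Erlang sketch of the fair share arithmetic documented in the couch_replicator_share comments (the Priority = (Usage * NumJobs) / Shares^2 formula and the coefficient decay). It is illustrative only and not part of the patch; the module name, helper functions, and demo values are assumptions chosen to mirror the figures quoted in those comments (60000000 microseconds of usage per 60-second cycle, 100 shares, coefficients 0.5 and 0.98).

%% Illustrative sketch (not part of the patch): fair share arithmetic from the
%% couch_replicator_share comments. Module name and values are assumptions.
-module(fair_share_sketch).
-export([demo/0]).

%% Priority formula from the "Priority Adjustment" step:
%%   Priority = (Usage * NumJobs) / Shares^2
priority(UsageUSec, NumJobs, Shares) ->
    trunc((UsageUSec * NumJobs) / (Shares * Shares)).

%% Repeatedly apply a decay coefficient, truncating each cycle the way the
%% ets:select_replace/2 based decay does.
decay(Value, _Coeff, 0) ->
    Value;
decay(Value, Coeff, N) when N > 0 ->
    decay(trunc(Value * Coeff), Coeff, N - 1).

demo() ->
    UsageOneCycle = 60000000,                % one 60 second cycle, in microseconds
    6000 = priority(UsageOneCycle, 1, 100),  % single job with the default 100 shares
    0 = decay(6000, 0.98, 431),              % ~430 cycles (~7 hours) to reach priority 0
    0 = decay(UsageOneCycle, 0.5, 26),       % usage is "forgotten" after ~26 cycles
    ok.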