From 66b98b011ebfc032ec67136c7bf4f2f806df4fe5 Mon Sep 17 00:00:00 2001 From: ILYA Khlopotov Date: Mon, 22 Dec 2014 13:56:56 -0800 Subject: [PATCH 1/3] Fix incomplete spec of fabric:get_view_group_info COUCHDB-2526 --- src/fabric.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/fabric.erl b/src/fabric.erl index ea07fbe..13cc910 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -321,7 +321,12 @@ query_view(DbName, DDoc, ViewName, Callback, Acc0, QueryArgs0) -> {waiting_commit, boolean()} | {waiting_clients, non_neg_integer()} | {update_seq, pos_integer()} | - {purge_seq, non_neg_integer()} + {purge_seq, non_neg_integer()} | + {sizes, [ + {active, non_neg_integer()} | + {external, non_neg_integer()} | + {file, non_neg_integer()} + ]} ]}. get_view_group_info(DbName, DesignId) -> fabric_group_info:go(dbname(DbName), design_doc(DesignId)). From 6351140d4f8d0621fc4d33da2767d4f09ecb2ad7 Mon Sep 17 00:00:00 2001 From: ILYA Khlopotov Date: Mon, 22 Dec 2014 13:57:38 -0800 Subject: [PATCH 2/3] Add updates_pending field to get_view_group_info We also change the beheviour of this function to wait response from all shards. COUCHDB-2526 --- src/fabric.erl | 5 +++ src/fabric_group_info.erl | 81 ++++++++++++++++++++++++++++----------- 2 files changed, 63 insertions(+), 23 deletions(-) diff --git a/src/fabric.erl b/src/fabric.erl index 13cc910..aff0485 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -326,6 +326,11 @@ query_view(DbName, DDoc, ViewName, Callback, Acc0, QueryArgs0) -> {active, non_neg_integer()} | {external, non_neg_integer()} | {file, non_neg_integer()} + ]} | + {updates_pending, [ + {minimum, non_neg_integer()} | + {preferred, non_neg_integer()} | + {total, non_neg_integer()} ]} ]}. get_view_group_info(DbName, DesignId) -> diff --git a/src/fabric_group_info.erl b/src/fabric_group_info.erl index 85719b6..4d81200 100644 --- a/src/fabric_group_info.erl +++ b/src/fabric_group_info.erl @@ -24,11 +24,12 @@ go(DbName, GroupId) when is_binary(GroupId) -> go(DbName, #doc{id=DDocId}) -> Shards = mem3:shards(DbName), + Ushards = mem3:ushards(DbName), Workers = fabric_util:submit_jobs(Shards, group_info, [DDocId]), RexiMon = fabric_util:create_monitors(Shards), - Acc0 = {fabric_dict:init(Workers, nil), []}, - try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of - {timeout, {WorkersDict, _}} -> + Acc = acc_init(Workers, Ushards), + try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc) of + {timeout, {WorkersDict, _, _}} -> DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil), fabric_util:log_timeout(DefunctWorkers, "group_info"), {error, timeout}; @@ -38,48 +39,82 @@ go(DbName, #doc{id=DDocId}) -> rexi_monitor:stop(RexiMon) end. -handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc, U}) -> case fabric_util:remove_down_workers(Counters, NodeRef) of {ok, NewCounters} -> - {ok, {NewCounters, Acc}}; + {ok, {NewCounters, Acc, U}}; error -> {error, {nodedown, <<"progress not possible">>}} end; -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> +handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc, U}) -> NewCounters = lists:keydelete(Shard, #shard.ref, Counters), case fabric_view:is_progress_possible(NewCounters) of true -> - {ok, {NewCounters, Acc}}; + {ok, {NewCounters, Acc, U}}; false -> {error, Reason} end; -handle_message({ok, Info}, Shard, {Counters, Acc}) -> - case fabric_dict:lookup_element(Shard, Counters) of - undefined -> - % already heard from someone else in this range - {ok, {Counters, Acc}}; - nil -> - C1 = fabric_dict:store(Shard, ok, Counters), - C2 = fabric_view:remove_overlapping_shards(Shard, C1), - case fabric_dict:any(nil, C2) of - true -> - {ok, {C2, [Info|Acc]}}; - false -> - {stop, merge_results(lists:flatten([Info|Acc]))} - end +handle_message({ok, Info}, Shard, {Counters0, Acc, U}) -> + NewAcc = append_result(Info, Shard, Acc, U), + Counters = fabric_dict:store(Shard, ok, Counters0), + case is_complete(Counters) of + false -> + {ok, {Counters, NewAcc, U}}; + true -> + Pending = aggregate_pending(NewAcc), + Infos = get_infos(NewAcc), + Results = [{updates_pending, {Pending}} | merge_results(Infos)], + {stop, Results} end; handle_message(_, _, Acc) -> {ok, Acc}. +acc_init(Workers, Ushards) -> + Set = sets:from_list([{Id, N} || #shard{name = Id, node = N} <- Ushards]), + {fabric_dict:init(Workers, nil), dict:new(), Set}. + +is_complete(Counters) -> + not fabric_dict:any(nil, Counters). + +append_result(Info, #shard{name = Name, node = Node}, Acc, Ushards) -> + IsPreferred = sets:is_element({Name, Node}, Ushards), + dict:append(Name, {Node, IsPreferred, Info}, Acc). + +get_infos(Acc) -> + Values = [V || {_, V} <- dict:to_list(Acc)], + lists:flatten([Info || {_Node, _Pref, Info} <- lists:flatten(Values)]). + +aggregate_pending(Dict) -> + {Preferred, Total, Minimum} = + dict:fold(fun(_Name, Results, {P, T, M}) -> + {Preferred, Total, Minimum} = calculate_pending(Results), + {P + Preferred, T + Total, M + Minimum} + end, {0, 0, 0}, Dict), + [ + {minimum, Minimum}, + {preferred, Preferred}, + {total, Total} + ]. + +calculate_pending(Results) -> + lists:foldl(fun + ({_Node, true, Info}, {P, T, V}) -> + Pending = couch_util:get_value(pending_updates, Info), + {P + Pending, T + Pending, min(Pending, V)}; + ({_Node, false, Info}, {P, T, V}) -> + Pending = couch_util:get_value(pending_updates, Info), + {P, T + Pending, min(Pending, V)} + end, {0, 0, infinity}, Results). + merge_results(Info) -> Dict = lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, orddict:new(), Info), orddict:fold(fun - (signature, [X|_], Acc) -> + (signature, [X | _], Acc) -> [{signature, X} | Acc]; - (language, [X|_], Acc) -> + (language, [X | _], Acc) -> [{language, X} | Acc]; (disk_size, X, Acc) -> % legacy [{disk_size, lists:sum(X)} | Acc]; From 06d1efc0b0a083b75d691079f7da752c989d0c84 Mon Sep 17 00:00:00 2001 From: ILYA Khlopotov Date: Tue, 23 Dec 2014 07:54:02 -0800 Subject: [PATCH 3/3] Rename variables for clarification of intend COUCHDB-2526 --- src/fabric_group_info.erl | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/fabric_group_info.erl b/src/fabric_group_info.erl index 4d81200..20047c5 100644 --- a/src/fabric_group_info.erl +++ b/src/fabric_group_info.erl @@ -39,29 +39,30 @@ go(DbName, #doc{id=DDocId}) -> rexi_monitor:stop(RexiMon) end. -handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc, U}) -> +handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, + {Counters, Acc, Ushards}) -> case fabric_util:remove_down_workers(Counters, NodeRef) of {ok, NewCounters} -> - {ok, {NewCounters, Acc, U}}; + {ok, {NewCounters, Acc, Ushards}}; error -> {error, {nodedown, <<"progress not possible">>}} end; -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc, U}) -> +handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc, Ushards}) -> NewCounters = lists:keydelete(Shard, #shard.ref, Counters), case fabric_view:is_progress_possible(NewCounters) of true -> - {ok, {NewCounters, Acc, U}}; + {ok, {NewCounters, Acc, Ushards}}; false -> {error, Reason} end; -handle_message({ok, Info}, Shard, {Counters0, Acc, U}) -> - NewAcc = append_result(Info, Shard, Acc, U), +handle_message({ok, Info}, Shard, {Counters0, Acc, Ushards}) -> + NewAcc = append_result(Info, Shard, Acc, Ushards), Counters = fabric_dict:store(Shard, ok, Counters0), case is_complete(Counters) of false -> - {ok, {Counters, NewAcc, U}}; + {ok, {Counters, NewAcc, Ushards}}; true -> Pending = aggregate_pending(NewAcc), Infos = get_infos(NewAcc),