Skip to content

Commit

Permalink
WIP: Add clustered db info endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
chewbranca committed Apr 11, 2018
1 parent 5b74e66 commit 29e5ea9
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 14 deletions.
10 changes: 9 additions & 1 deletion src/chttpd/src/chttpd_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,19 @@ do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
% measure the time required to generate the etag, see if it's worth it
T0 = os:timestamp(),
{ok, DbInfo} = fabric:get_db_info(DbName),
{ok, DbInfo} = fabric:get_db_info(DbName, aggregate),
DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
send_json(Req, {DbInfo});

db_req(#httpd{method='GET',path_parts=[DbName, <<"_info">>]}=Req, _Db) ->
% measure the time required to generate the etag, see if it's worth it
%%T0 = os:timestamp(),
{ok, DbInfo} = fabric:get_db_info(DbName, set),
%%DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
%%couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
send_json(Req, {DbInfo});

db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) ->
chttpd:validate_ctype(Req, "application/json"),

Expand Down
24 changes: 21 additions & 3 deletions src/fabric/src/fabric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

% DBs
-export([all_dbs/0, all_dbs/1, create_db/1, create_db/2, delete_db/1,
delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3,
set_security/2, set_security/3, get_revs_limit/1, get_security/1,
get_security/2, get_all_security/1, get_all_security/2,
delete_db/2, get_db_info/1, get_db_info/2, get_doc_count/1,
set_revs_limit/3, set_security/2, set_security/3, get_revs_limit/1,
get_security/1, get_security/2, get_all_security/1, get_all_security/2,
compact/1, compact/2]).

% Documents
Expand All @@ -44,6 +44,7 @@
-type callback() :: fun((any(), any()) -> {ok | stop, any()}).
-type json_obj() :: {[{binary() | atom(), any()}]}.
-type option() :: atom() | {atom(), any()}.
-type db_info_type() :: set | aggregate.

%% db operations
%% @equiv all_dbs(<<>>)
Expand Down Expand Up @@ -84,6 +85,23 @@ all_dbs(Prefix) when is_list(Prefix) ->
get_db_info(DbName) ->
fabric_db_info:go(dbname(DbName)).

%% @doc returns a property list of interesting properties
%% about the database such as `doc_count', `disk_size',
%% etc.
%% TODO: fix return type def
-spec get_db_info(dbname(), db_info_type()) ->
{ok, [
{instance_start_time, binary()} |
{doc_count, non_neg_integer()} |
{doc_del_count, non_neg_integer()} |
{purge_seq, non_neg_integer()} |
{compact_running, boolean()} |
{disk_size, non_neg_integer()} |
{disk_format_version, pos_integer()}
]}.
get_db_info(DbName, Type) ->
fabric_db_info:go(dbname(DbName), Type).

%% @doc the number of docs in a database
-spec get_doc_count(dbname()) ->
{ok, non_neg_integer()} |
Expand Down
57 changes: 47 additions & 10 deletions src/fabric/src/fabric_db_info.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,21 @@

-module(fabric_db_info).

-export([go/1]).
-export([go/1, go/2]).

-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").

go(DbName) ->
go(DbName, aggregate).

go(DbName, Type) ->
Shards = mem3:shards(DbName),
Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
RexiMon = fabric_util:create_monitors(Shards),
Fun = fun handle_message/3,
{ok, ClusterInfo} = get_cluster_info(Shards),
Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}]},
Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}], Type},
try
case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
{ok, Acc} -> {ok, Acc};
Expand All @@ -43,35 +46,35 @@ go(DbName) ->
rexi_monitor:stop(RexiMon)
end.

handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc, Type}) ->
case fabric_util:remove_down_workers(Counters, NodeRef) of
{ok, NewCounters} ->
{ok, {NewCounters, Acc}};
{ok, {NewCounters, Acc, Type}};
error ->
{error, {nodedown, <<"progress not possible">>}}
end;

handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc, Type}) ->
NewCounters = fabric_dict:erase(Shard, Counters),
case fabric_view:is_progress_possible(NewCounters) of
true ->
{ok, {NewCounters, Acc}};
{ok, {NewCounters, Acc, Type}};
false ->
{error, Reason}
end;

handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc, aggregate}) ->
case fabric_dict:lookup_element(Shard, Counters) of
undefined ->
% already heard from someone else in this range
{ok, {Counters, Acc}};
{ok, {Counters, Acc, aggregate}};
nil ->
Seq = couch_util:get_value(update_seq, Info),
C1 = fabric_dict:store(Shard, Seq, Counters),
C2 = fabric_view:remove_overlapping_shards(Shard, C1),
case fabric_dict:any(nil, C2) of
true ->
{ok, {C2, [Info|Acc]}};
{ok, {C2, [Info|Acc], aggregate}};
false ->
{stop, [
{db_name,Name},
Expand All @@ -80,7 +83,27 @@ handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) ->
]}
end
end;
handle_message(_, _, Acc) ->
handle_message({ok, Info0}, Shard, {Counters, Acc, set}) ->
#shard{
range = [B, E],
node = Node
} = Shard,
HexBeg = couch_util:to_hex(<<B:32/integer>>),
HexEnd = couch_util:to_hex(<<E:32/integer>>),
Range = list_to_binary(HexBeg ++ "-" ++ HexEnd),
Seq = couch_util:get_value(update_seq, Info0),
Info = {Range, [{node, Node} | Info0]},
C1 = fabric_dict:store(Shard, Seq, Counters),
Acc1 = [Info|Acc],
case fabric_dict:any(nil, C1) of
true ->
{ok, {C1, Acc1, set}};
false ->
{stop, format_results(Acc1)}
end;
handle_message(_Msg, _Shard, {_, _, aggregate}=Acc) ->
{ok, Acc};
handle_message(_Msg, _Shard, {_, _, set}=Acc) ->
{ok, Acc}.

merge_results(Info) ->
Expand Down Expand Up @@ -111,6 +134,20 @@ merge_results(Info) ->
Acc
end, [{instance_start_time, <<"0">>}], Dict).

format_results(Infos) ->
dict:to_list(lists:foldl(
fun({R,I}, D) ->
case dict:find(R, D) of
{ok, L} ->
dict:store(R, [{I} | L], D);
error ->
dict:store(R, [{I}], D)
end
end,
dict:new(),
Infos
)).

merge_other_results(Results) ->
Dict = lists:foldl(fun({Props}, D) ->
lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, D, Props)
Expand Down

0 comments on commit 29e5ea9

Please sign in to comment.