Skip to content

Expose Backend Size to Handoff for Progress Tracking #526

Merged
merged 5 commits into from Apr 25, 2013
View
12 src/riak_kv_bitcask_backend.erl
@@ -42,6 +42,8 @@
status/1,
callback/3]).
+-export([data_size/1]).
+
%% Helper API
-export([key_counts/0,
key_counts/1]).
@@ -54,7 +56,7 @@
-define(MERGE_CHECK_INTERVAL, timer:minutes(3)).
-define(API_VERSION, 1).
--define(CAPABILITIES, [async_fold]).
+-define(CAPABILITIES, [async_fold,size]).
-record(state, {ref :: reference(),
data_dir :: string(),
@@ -353,6 +355,14 @@ status(#state{ref=Ref}) ->
{KeyCount, Status} = bitcask:status(Ref),
[{key_count, KeyCount}, {status, Status}].
+%% @doc Get the size of the bitcask backend (in number of keys)
+-spec data_size(state()) -> undefined | {non_neg_integer(), objects}.
+data_size(State) ->
+ Status = status(State),
+ case proplists:get_value(key_count, Status) of
+ undefined -> undefined;
+ KeyCount -> {KeyCount, objects}
+ end.
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
View
134 src/riak_kv_console.erl
@@ -32,17 +32,12 @@
vnode_status/1,
reip/1,
ringready/1,
- transfers/1,
cluster_info/1,
down/1,
aae_status/1,
reformat_indexes/1,
reload_code/1]).
-%% Arrow is 24 chars wide
--define(ARROW, "=======================>").
-
-
join([NodeStr]) ->
join(NodeStr, fun riak_core:join/1,
"Sent join request to ~s~n", [NodeStr]).
@@ -249,77 +244,6 @@ ringready([]) ->
error
end.
-%% Provide a list of nodes with pending partition transfers (i.e. any secondary vnodes)
-%% and list any owned vnodes that are *not* running
--spec(transfers([]) -> ok).
-transfers([]) ->
- try
- {DownNodes, Pending} = riak_core_status:transfers(),
- case DownNodes of
- [] -> ok;
- _ -> io:format("Nodes ~p are currently down.\n", [DownNodes])
- end,
- F = fun({waiting_to_handoff, Node, Count}, Acc) ->
- io:format("~p waiting to handoff ~p partitions\n", [Node, Count]),
- Acc + 1;
- ({stopped, Node, Count}, Acc) ->
- io:format("~p does not have ~p primary partitions running\n", [Node, Count]),
- Acc + 1
- end,
- case lists:foldl(F, 0, Pending) of
- 0 ->
- io:format("No transfers active\n"),
- ok;
- _ ->
- error
- end
- catch
- Exception:Reason ->
- lager:error("Transfers failed ~p:~p", [Exception,
- Reason]),
- io:format("Transfers failed, see log for details~n"),
- error
- end,
-
- %% Now display active transfers
- {Xfers, Down} = riak_core_status:all_active_transfers(),
-
- DisplayXfer =
- fun({{Mod, Partition}, Node, outbound, active, _Status}) ->
- print_v1_status(Mod, Partition, Node);
-
- ({status_v2, Status}) ->
- %% Display base status
- Type = proplists:get_value(type, Status),
- Mod = proplists:get_value(mod, Status),
- SrcPartition = proplists:get_value(src_partition, Status),
- TargetPartition = proplists:get_value(target_partition, Status),
- StartTS = proplists:get_value(start_ts, Status),
- SrcNode = proplists:get_value(src_node, Status),
- TargetNode = proplists:get_value(target_node, Status),
-
- print_v2_status(Type, Mod, {SrcPartition, TargetPartition}, StartTS),
-
- %% Get info about stats if there is any yet
- Stats = proplists:get_value(stats, Status),
-
- print_stats(SrcNode, TargetNode, Stats),
- io:format("~n");
-
- (_) ->
- ignore
- end,
- DisplayDown =
- fun(Node) ->
- io:format("Node ~p could not be contacted~n", [Node])
- end,
-
- io:format("~nActive Transfers:~n~n", []),
- [DisplayXfer(Xfer) || Xfer <- lists:flatten(Xfers)],
-
- io:format("~n"),
- [DisplayDown(Node) || Node <- Down].
-
cluster_info([OutFile|Rest]) ->
try
case lists:reverse(atomify_nodestrs(Rest)) of
@@ -492,13 +416,6 @@ run_index_reformat(Opts) ->
%%%===================================================================
%%% Private
%%%===================================================================
-
-datetime_str({_Mega, _Secs, _Micro}=Now) ->
- datetime_str(calendar:now_to_datetime(Now));
-datetime_str({{Year, Month, Day}, {Hour, Min, Sec}}) ->
- riak_core_format:fmt("~4..0B-~2..0B-~2..0B ~2..0B:~2..0B:~2..0B",
- [Year,Month,Day,Hour,Min,Sec]).
-
format_stats([], Acc) ->
lists:reverse(Acc);
format_stats([{Stat, V}|T], Acc) ->
@@ -545,54 +462,3 @@ print_vnode_status([StatusItem | RestStatusItems]) ->
io:format("Status: ~n~p~n", [StatusItem])
end,
print_vnode_status(RestStatusItems).
-
-print_v2_status(Type, Mod, {SrcPartition, TargetPartition}, StartTS) ->
- StartTSStr = datetime_str(StartTS),
- Running = timer:now_diff(os:timestamp(), StartTS),
- RunningStr = riak_core_format:human_time_fmt("~.2f", Running),
-
- io:format("transfer type: ~s~n", [Type]),
- io:format("vnode type: ~p~n", [Mod]),
- case Type of
- repair ->
- io:format("source partition: ~p~n", [SrcPartition]),
- io:format("target partition: ~p~n", [TargetPartition]);
- _ ->
- io:format("partition: ~p~n", [TargetPartition])
- end,
- io:format("started: ~s [~s ago]~n", [StartTSStr, RunningStr]).
-
-print_v1_status(Mod, Partition, Node) ->
- io:format("vnode type: ~p~n", [Mod]),
- io:format("partition: ~p~n", [Partition]),
- io:format("target node: ~p~n~n", [Node]).
-
-print_stats(SrcNode, TargetNode, no_stats) ->
- ToFrom = riak_core_format:fmt("~16s ~s ~16s",
- [SrcNode, ?ARROW, TargetNode]),
- Width = length(ToFrom),
-
- io:format("last update: no updates seen~n"),
- io:format("objects transferred: unknown~n~n"),
- io:format("~s~n", [string:centre("unknown", Width)]),
- io:format("~s~n", [ToFrom]),
- io:format("~s~n", [string:centre("unknown", Width)]);
-print_stats(SrcNode, TargetNode, Stats) ->
- ObjsS = proplists:get_value(objs_per_s, Stats),
- BytesS = proplists:get_value(bytes_per_s, Stats),
- LastUpdate = proplists:get_value(last_update, Stats),
- Diff = timer:now_diff(os:timestamp(), LastUpdate),
- DiffStr = riak_core_format:human_time_fmt("~.2f", Diff),
- Objs = proplists:get_value(objs_total, Stats),
- ObjsSStr = riak_core_format:fmt("~p Objs/s", [ObjsS]),
- ByteStr = riak_core_format:human_size_fmt("~.2f", BytesS) ++ "/s",
- TS = datetime_str(LastUpdate),
- ToFrom = riak_core_format:fmt("~16s ~s ~16s",
- [SrcNode, ?ARROW, TargetNode]),
- Width = length(ToFrom),
-
- io:format("last update: ~s [~s ago]~n", [TS, DiffStr]),
- io:format("objects transferred: ~p~n~n", [Objs]),
- io:format("~s~n", [string:centre(ObjsSStr, Width)]),
- io:format("~s~n", [ToFrom]),
- io:format("~s~n", [string:centre(ByteStr, Width)]).
View
14 src/riak_kv_eleveldb_backend.erl
@@ -44,6 +44,8 @@
status/1,
callback/3]).
+-export([data_size/1]).
+
-compile({inline, [
to_object_key/2, from_object_key/1,
to_index_key/4, from_index_key/1
@@ -54,7 +56,7 @@
-endif.
-define(API_VERSION, 1).
--define(CAPABILITIES, [async_fold, indexes, index_reformat]).
+-define(CAPABILITIES, [async_fold, indexes, index_reformat, size]).
-define(FIXED_INDEXES_KEY, fixed_indexes).
-record(state, {ref :: reference(),
@@ -461,6 +463,16 @@ status(State=#state{fixed_indexes=FixedIndexes}) ->
callback(_Ref, _Msg, State) ->
{ok, State}.
+%% @doc Get the size of the eleveldb backend in bytes
+-spec data_size(state()) -> undefined | {non_neg_integer(), bytes}.
+data_size(State) ->
+ try {ok, <<SizeStr/binary>>} = eleveldb:status(State#state.ref, <<"leveldb.total-bytes">>),
+ list_to_integer(binary_to_list(SizeStr)) of
+ Size -> {Size, bytes}
+ catch
+ error:_ -> undefined
+ end.
+
%% ===================================================================
%% Internal functions
%% ===================================================================
View
17 src/riak_kv_memory_backend.erl
@@ -56,6 +56,8 @@
status/1,
callback/3]).
+-export([data_size/1]).
+
%% "Testing" backend API
-export([reset/0]).
@@ -65,7 +67,7 @@
-endif.
-define(API_VERSION, 1).
--define(CAPABILITIES, [async_fold, indexes]).
+-define(CAPABILITIES, [async_fold, indexes, size]).
%% Macros for working with indexes
-define(DELETE_PTN(B,K), {{B,'_','_',K},'_'}).
@@ -356,6 +358,19 @@ status(#state{data_ref=DataRef,
{time_table_status, TimeStatus}]
end.
+%% @doc Get the size of the memory backend. Returns a dynamic size
+%% since new writes may appear in an ets fold
+-spec data_size(state()) -> undefined | {function(), dynamic}.
+data_size(#state{data_ref=DataRef}) ->
+ F = fun() ->
+ DataStatus = ets:info(DataRef),
+ case proplists:get_value(size, DataStatus) of
+ undefined -> undefined;
+ ObjCount -> {ObjCount, objects}
+ end
+ end,
+ {F, dynamic}.
+
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
callback(_Ref, _Msg, State) ->
View
16 src/riak_kv_vnode.erl
@@ -760,7 +760,21 @@ encode_handoff_item({B, K}, V) ->
#riakobject_pb{bucket=B, key=K, val=Val})).
is_empty(State=#state{mod=Mod, modstate=ModState}) ->
- {Mod:is_empty(ModState), State}.
+ IsEmpty = Mod:is_empty(ModState),
+ case IsEmpty of
+ true ->
+ {true, State};
+ false ->
+ Size = maybe_calc_handoff_size(State),
+ {false, Size, State}
+ end.
+
+maybe_calc_handoff_size(#state{mod=Mod,modstate=ModState}) ->
+ {ok, Capabilities} = Mod:capabilities(ModState),
+ case lists:member(size, Capabilities) of
+ true -> Mod:data_size(ModState);
+ false -> undefined
+ end.
delete(State=#state{idx=Index,mod=Mod, modstate=ModState}) ->
%% clear vnodeid first, if drop removes data but fails
Something went wrong with that request. Please try again.