Skip to content

Commit

Permalink
Make multi backend work with batched 2i reformats
Browse files Browse the repository at this point in the history
This is WIP. Testing is not conclusive on whether it works, but it runs
to completion now.
  • Loading branch information
engelsanchez committed Mar 30, 2013
1 parent a03f2e9 commit 371f480
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 31 deletions.
53 changes: 39 additions & 14 deletions src/riak_kv_console.erl
Expand Up @@ -422,26 +422,51 @@ format_timestamp(_Now, undefined) ->
format_timestamp(Now, TS) ->
riak_core_format:human_time_fmt("~.1f", timer:now_diff(Now, TS)).

parse_int(IntStr) ->
try
list_to_integer(IntStr)
catch
error:badarg ->
undefined
end.

index_reformat_options([], Opts) ->
Opts;
index_reformat_options(["--downgrade" | Rest], Opts) ->
index_reformat_options(Rest, [{downgrade, true} | Opts]);
index_reformat_options(["--downgrade"], Opts) ->
[{downgrade, true} | Opts];
index_reformat_options(["--downgrade" | More], _Opts) ->
io:format("Invalid arguments after downgrade switch : ~p~n", [More]),
undefined;
index_reformat_options([IntStr | Rest], Opts) ->
%% Letting it crash here if not an integer
IntVal = list_to_integer(IntStr),
case lists:keymember(concurrency, 1, Opts) of
true ->
[{batch_size, IntVal} | Opts];
false ->
index_reformat_options(Rest, [{concurrency, IntVal} | Opts])
end.
HasConcurrency = lists:keymember(concurrency, 1, Opts),
HasBatchSize = lists:keymember(batch_size, 1, Opts),
case {parse_int(IntStr), HasConcurrency, HasBatchSize} of
{_, true, true} ->
io:format("Expected --downgrade instead of ~p~n", [IntStr]),
undefined;
{undefined, _, _ } ->
io:format("Expected integer parameter instead of ~p~n", [IntStr]),
undefined;
{IntVal, false, false} ->
index_reformat_options(Rest, [{concurrency, IntVal} | Opts]);
{IntVal, true, false} ->
index_reformat_options(Rest, [{batch_size, IntVal} | Opts])
end;
index_reformat_options(_, _) ->
undefined.

reformat_indexes(Args) ->
Opts = index_reformat_options(Args, []),
start_index_reformat(Opts),
io:format("index reformat started with options ~p ~n", [Opts]),
io:format("check console.log for status information~n"),
ok.
case Opts of
undefined ->
io:format("Expected options: <concurrency> <batch size> [--downgrade]~n"),
ok;
_ ->
start_index_reformat(Opts),
io:format("index reformat started with options ~p ~n", [Opts]),
io:format("check console.log for status information~n"),
ok
end.

start_index_reformat(Opts) ->
spawn(fun() -> run_index_reformat(Opts) end).
Expand Down
8 changes: 4 additions & 4 deletions src/riak_kv_eleveldb_backend.erl
Expand Up @@ -33,7 +33,7 @@
put/5,
delete/4,
drop/1,
fix_index/4,
fix_index/3,
mark_indexes_fixed/2,
set_legacy_indexes/2,
fold_buckets/4,
Expand Down Expand Up @@ -211,7 +211,7 @@ index_deletes(FixedIndexes, Bucket, PrimaryKey, Field, Value) ->
|| FixedIndexes =:= false andalso IndexKey =/= LegacyKey],
KeyDelete ++ LegacyDelete.

fix_index(_Bucket, IndexKeys, ForUpgrade, #state{ref=Ref,
fix_index(IndexKeys, ForUpgrade, #state{ref=Ref,
read_opts=ReadOpts,
write_opts=WriteOpts} = State)
when is_list(IndexKeys) ->
Expand All @@ -226,9 +226,9 @@ fix_index(_Bucket, IndexKeys, ForUpgrade, #state{ref=Ref,
Totals =
lists:foldl(FoldFun, {0,0,0},
[fix_index(IndexKey, ForUpgrade, Ref, ReadOpts, WriteOpts)
|| IndexKey <- IndexKeys]),
|| {_Bucket, IndexKey} <- IndexKeys]),
{reply, Totals, State};
fix_index(_Bucket, IndexKey, ForUpgrade, #state{ref=Ref,
fix_index(IndexKey, ForUpgrade, #state{ref=Ref,
read_opts=ReadOpts,
write_opts=WriteOpts} = State) ->
case fix_index(IndexKey, ForUpgrade, Ref, ReadOpts, WriteOpts) of
Expand Down
38 changes: 27 additions & 11 deletions src/riak_kv_multi_backend.erl
Expand Up @@ -39,7 +39,7 @@
is_empty/1,
status/1,
callback/3,
fix_index/4,
fix_index/3,
set_legacy_indexes/2,
mark_indexes_fixed/2]).

Expand Down Expand Up @@ -388,17 +388,33 @@ maybe_mark_indexes_fixed(Mod, ModState, ForUpgrade) ->
false -> {ok, ModState}
end.

fix_index(Bucket, StorageKey, ForUpgrade, State) ->
{_, Mod, ModState} = Backend = get_backend(Bucket, State),
case backend_can_index_reformat(Mod, ModState) of
true -> backend_fix_index(Backend, Bucket, StorageKey, ForUpgrade, State);
false -> {ok, State}
end.
fix_index(BKeys, ForUpgrade, State) ->
% Group keys per bucket
PerBucket = lists:foldl(fun({B,K},D) -> dict:append(B,K,D) end, dict:new(),
dict:new(), BKeys),
Result =
dict:fold(
fun(Bucket, StorageKey, Acc = {Success, Ignore, Errors}) ->
{_, Mod, ModState} = Backend = get_backend(Bucket, State),
case backend_can_index_reformat(Mod, ModState) of
true ->
{S, I, E} = backend_fix_index(Backend, Bucket,
StorageKey, ForUpgrade),
{Success + S, Ignore + I, Errors + E};
false ->
Acc
end
end, {0, 0, 0}, PerBucket),
{reply, Result, State}.

backend_fix_index({_, Mod, ModState}, Bucket, StorageKey, ForUpgrade, State) ->
backend_fix_index({_, Mod, ModState}, Bucket, StorageKey, ForUpgrade) ->
case Mod:fix_index(Bucket, StorageKey, ForUpgrade, ModState) of
{ok, _UpModState} -> {ok, State};
{error, Reason} -> {error, Reason}
{reply, Reply, _UpModState} ->
Reply;
{error, Reason} ->
lager:error("Failed to fix index for bucket ~p, key ~p, backend ~p: ~p",
[Bucket, StorageKey, Mod, Reason]),
{0, 0, length(StorageKey)}
end.

%% ===================================================================
Expand Down Expand Up @@ -479,7 +495,7 @@ backend_fold_fun(ModFun, FoldFun, Opts, AsyncFold) ->
%% if it supports asynchronous folding.
{ok, ModCaps} = Module:capabilities(SubState),
DoAsync = AsyncFold andalso lists:member(async_fold, ModCaps),
Indexes = proplists:get_value(indexes, Opts),
Indexes = lists:keyfind(index, 1, Opts),
case Indexes of
{index, incorrect_format, _ForUpgrade} ->
case lists:member(index_reformat, ModCaps) of
Expand Down
6 changes: 4 additions & 2 deletions src/riak_kv_vnode.erl
Expand Up @@ -529,7 +529,7 @@ handle_command({fix_incorrect_index_entry, Keys, ForUpgrade},
State=#state{mod=Mod,
modstate=ModState}) ->
Reply =
case Mod:fix_index(undefined, Keys, ForUpgrade, ModState) of
case Mod:fix_index(Keys, ForUpgrade, ModState) of
{ok, _UpModState} ->
ok;
{ignore, _UpModState} ->
Expand Down Expand Up @@ -557,6 +557,8 @@ handle_command({get_index_entries, Opts},
BufferMod = riak_kv_fold_buffer,
ResultFun =
fun(Results) ->
% Send result batch and wait for acknowledgement
% before moving on (backpressure to avoid flooding caller).
BatchRef = make_ref(),
riak_core_vnode:reply(Sender, {self(), BatchRef, Results}),
Monitor = riak_core_vnode:monitor(Sender),
Expand All @@ -568,7 +570,7 @@ handle_command({get_index_entries, Opts},
end
end,
Buffer = BufferMod:new(BufferSize, ResultFun),
FoldFun = fold_fun(keys, BufferMod, none),
FoldFun = fun(B, K, Buf) -> BufferMod:add({B, K}, Buf) end,
FinishFun = fun(_) -> riak_core_vnode:reply(Sender, done) end,
FoldOpts = [{index, incorrect_format, ForUpgrade}, async_fold],
case list(FoldFun, FinishFun, Mod, fold_keys, ModState, FoldOpts, Buffer) of
Expand Down

0 comments on commit 371f480

Please sign in to comment.