Skip to content

Commit

Permalink
Add repair ability
Browse files Browse the repository at this point in the history
Add the ability to repair a Search partition.  If a partition's data
is lost or corrupted, this command may be used to rebuild it from
adjacent partitions.  This is preferable to a list-keys + re-index
because it is more targeted, should complete more quickly, and should
put less strain on the cluster.
  • Loading branch information
rzezeski committed Jun 11, 2012
1 parent 9c23eb6 commit c0b43d3
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 37 deletions.
55 changes: 32 additions & 23 deletions src/merge_index_backend.erl
Expand Up @@ -91,10 +91,38 @@ is_empty(State) ->
Pid = State#state.pid, Pid = State#state.pid,
merge_index:is_empty(Pid). merge_index:is_empty(Pid).


%% Note: Always async folding
fold(FoldFun, Acc, State) -> fold(FoldFun, Acc, State) ->
%% Copied almost verbatim from riak_search_ets_backend. AsyncFold = async_fold_fun(FoldFun, Acc, State),
{ok, FoldBatchSize} = application:get_env(riak_search, fold_batch_size), {async, AsyncFold}.
Fun = fun
%% @doc Drop the index behind this backend by handing the merge_index
%%      pid stored in `State' to `merge_index:drop/1'. The result of
%%      that call is returned unchanged.
drop(State) ->
    merge_index:drop(State#state.pid).

%%%===================================================================
%%% Internal Functions
%%%===================================================================

%% @private
%% @doc Build the zero-arity closure that performs the fold.
%%
%%      Nothing is read or folded until the returned fun is called. When
%%      it runs it:
%%        1. reads `fold_batch_size' from the `riak_search' application
%%           env (crashes with badmatch if it is not set),
%%        2. wraps `FoldFun' with `batch_fold/2',
%%        3. folds over the merge_index pid held in `State', starting
%%           from `{Acc, undefined, 0}',
%%        4. flushes the batch that was still being built when the fold
%%           ended, if there is one.
%%
%%      The closure returns the final outer accumulator.
async_fold_fun(FoldFun, Acc, State) ->
    fun() ->
            {ok, BatchSize} = application:get_env(riak_search, fold_batch_size),
            Batcher = batch_fold(FoldFun, BatchSize),
            InitAcc = {Acc, undefined, 0},
            {ok, {FoldedAcc, Pending, _Count}} =
                merge_index:fold(State#state.pid, Batcher, InitAcc),
            case Pending of
                undefined ->
                    %% nothing was ever batched: the partition was empty
                    FoldedAcc;
                {FoldKey, VPKList} ->
                    %% a last, partially filled batch still has to be sent
                    FoldFun(FoldKey, VPKList, FoldedAcc)
            end
    end.

batch_fold(FoldFun, FoldBatchSize) ->
fun
(I,F,T,V,P,K, {OuterAcc, {FoldKey = {I,{F,T}}, VPKList}, Count}) -> (I,F,T,V,P,K, {OuterAcc, {FoldKey = {I,{F,T}}, VPKList}, Count}) ->
%% same IFT. If we have reached the fold_batch_size, then %% same IFT. If we have reached the fold_batch_size, then
%% call FoldFun/3 on the batch and start the next %% call FoldFun/3 on the batch and start the next
Expand All @@ -114,26 +142,7 @@ fold(FoldFun, Acc, State) ->
(I,F,T,V,P,K, {OuterAcc, undefined, _Count}) -> (I,F,T,V,P,K, {OuterAcc, undefined, _Count}) ->
%% first round through the fold - just start building %% first round through the fold - just start building
{OuterAcc, {{I,{F,T}},[{V,P,K}]}, 1} {OuterAcc, {{I,{F,T}},[{V,P,K}]}, 1}
end, end.
Pid = State#state.pid,
{ok, {OuterAcc0, Final, _Count}} = merge_index:fold(Pid, Fun, {Acc, undefined, 0}),
OuterAcc = case Final of
{FoldKey, VPKList} ->
%% one last IFT to send off
FoldFun(FoldKey, VPKList, OuterAcc0);
undefined ->
%% this partition was empty
OuterAcc0
end,
{reply, OuterAcc, State}.

%% @doc Drop the index behind this backend by handing the merge_index
%%      pid stored in `State' to `merge_index:drop/1'. The result of
%%      that call is returned unchanged.
drop(State) ->
    merge_index:drop(State#state.pid).

%%%===================================================================
%%% Internal Functions
%%%===================================================================


-spec stream_worker(pid(), index(), field(), s_term(), fun(), sender()) -> -spec stream_worker(pid(), index(), field(), s_term(), fun(), sender()) ->
any(). any().
Expand Down
20 changes: 14 additions & 6 deletions src/riak_search_config.erl
@@ -1,6 +1,6 @@
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% %%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved. %% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
%% %%
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------


Expand All @@ -22,11 +22,12 @@


%% API %% API
-export([ -export([
start_link/0, clear/0,
clear/0, get_all_schemas/0,
get_schema/1, get_raw_schema/1,
get_raw_schema/1, get_schema/1,
put_raw_schema/2 put_raw_schema/2,
start_link/0
]). ]).


%% gen_server callbacks %% gen_server callbacks
Expand All @@ -44,6 +45,13 @@ start_link() ->
clear() -> clear() ->
gen_server:call(?SERVER, clear, infinity). gen_server:call(?SERVER, clear, infinity).


%% @doc Return the list of all `Schemas' that can be loaded.
%%
%%      Every key currently listed in `?SCHEMA_BUCKET' (via the local
%%      Riak client) is passed to `get_schema/1'. Only results of the
%%      form `{ok, Schema}' contribute to the returned list; any other
%%      result is silently left out.
-spec get_all_schemas() -> Schemas::[any()].
get_all_schemas() ->
    {ok, Client} = riak:local_client(),
    {ok, SchemaKeys} = Client:list_keys(?SCHEMA_BUCKET),
    [Schema || Key <- SchemaKeys, {ok, Schema} <- [get_schema(Key)]].

%% Get schema information for the provided index name. %% Get schema information for the provided index name.
%% @param Schema - Either the name of an index, or a schema record. %% @param Schema - Either the name of an index, or a schema record.
%% %%
Expand Down
63 changes: 57 additions & 6 deletions src/riak_search_vnode.erl
Expand Up @@ -9,7 +9,10 @@
delete/2, delete/2,
info/5, info/5,
stream/6, stream/6,
range/8 range/8,
repair/1,
repair_status/1,
repair_filter/1
]). ]).
-export([start_vnode/1, init/1, handle_command/3, -export([start_vnode/1, init/1, handle_command/3,
handle_handoff_command/3, handle_handoff_data/2, handle_handoff_command/3, handle_handoff_data/2,
Expand Down Expand Up @@ -91,6 +94,21 @@ sync_command(IndexNode, Msg) ->
riak_core_vnode_master:sync_command(IndexNode, Msg, riak_core_vnode_master:sync_command(IndexNode, Msg,
riak_search_vnode_master, infinity). riak_search_vnode_master, infinity).


%% @doc Repair the index at the given `Partition'.
%%
%%      The work is delegated to `riak_core_vnode_manager:repair/3',
%%      which is given the `riak_search' service name, this module paired
%%      with `Partition', and a `{Module, Function}' reference to
%%      `repair_filter/1'. Its result is returned unchanged.
%%      NOTE(review): per the spec, `{down, Down}' presumably lists
%%      unavailable {partition, node} pairs -- confirm against riak_core.
-spec repair(partition()) ->
                    {ok, Pairs::[{partition(), node()}]} |
                    {down, Down::[{partition(), node()}]}.
repair(Partition) ->
    riak_core_vnode_manager:repair(riak_search,
                                   {?MODULE, Partition},
                                   {?MODULE, repair_filter}).

%% @doc Get the status of the repair process for the given `Partition'.
%%
%%      Asks `riak_core_vnode_manager:repair_status/1' about the
%%      `{Module, Partition}' pair for this vnode module and returns its
%%      answer unchanged.
-spec repair_status(partition()) -> no_repair | repair_in_progress.
repair_status(Partition) ->
    ModPartition = {?MODULE, Partition},
    riak_core_vnode_manager:repair_status(ModPartition).

%% %%
%% Callbacks for riak_core_vnode %% Callbacks for riak_core_vnode
%% %%
Expand All @@ -103,10 +121,10 @@ init([VNodeIndex]) ->
BMod = app_helper:get_env(riak_search, search_backend), BMod = app_helper:get_env(riak_search, search_backend),
Configuration = app_helper:get_env(riak_search), Configuration = app_helper:get_env(riak_search),
{ok, BState} = BMod:start(VNodeIndex, Configuration), {ok, BState} = BMod:start(VNodeIndex, Configuration),
State = #vstate{idx=VNodeIndex, bmod=BMod, bstate=BState},
Pool = {pool, riak_search_worker, 2, []},


{ok, #vstate{idx=VNodeIndex, {ok, State, [Pool]}.
bmod=BMod,
bstate=BState}}.


handle_command(#index_v1{iftvp_list = IFTVPList}, handle_command(#index_v1{iftvp_list = IFTVPList},
_Sender, #vstate{bmod=BMod,bstate=BState}=VState) -> _Sender, #vstate{bmod=BMod,bstate=BState}=VState) ->
Expand Down Expand Up @@ -140,9 +158,15 @@ handle_command(#range_v1{index = Index,


%% Request from core_vnode_handoff_sender - fold function %% Request from core_vnode_handoff_sender - fold function
%% expects to be called with {{Bucket,Key},Value} %% expects to be called with {{Bucket,Key},Value}
handle_command(?FOLD_REQ{foldfun=Fun, acc0=Acc},_Sender, handle_command(?FOLD_REQ{foldfun=Fun, acc0=Acc},
Sender,
#vstate{bmod=BMod,bstate=BState}=VState) -> #vstate{bmod=BMod,bstate=BState}=VState) ->
bmod_response(BMod:fold(Fun, Acc, BState), VState). {async, AsyncFoldFun} = BMod:fold(Fun, Acc, BState),
FinishFun =
fun(FinalAcc) ->
riak_core_vnode:reply(Sender, FinalAcc)
end,
{async, {fold, AsyncFoldFun, FinishFun}, Sender, VState}.


%% Handle a command during handoff - if it's a fold then %% Handle a command during handoff - if it's a fold then
%% make sure it runs locally, otherwise forward it on to the %% make sure it runs locally, otherwise forward it on to the
Expand Down Expand Up @@ -194,3 +218,30 @@ bmod_response({noreply, NewBState}, VState) ->
{noreply, VState#vstate{bstate=NewBState}}; {noreply, VState#vstate{bstate=NewBState}};
bmod_response({reply, Reply, NewBState}, VState) -> bmod_response({reply, Reply, NewBState}, VState) ->
{reply, Reply, VState#vstate{bstate=NewBState}}. {reply, Reply, VState#vstate{bstate=NewBState}}.

%% @private
%%
%% @doc Generate the `Filter' fun used while repairing the `Target'
%%      partition.
%%
%%      The ring is fetched from `riak_core_ring_manager' and a map from
%%      index name to n_val is built from every schema returned by
%%      `riak_search_config:get_all_schemas/0'. Index names missing from
%%      that map fall back to the range derived from the default bucket
%%      n_val.
%%
%%      The returned fun takes `{Index, {Field, Term}}', hashes it with
%%      `riak_search_ring_utils:calc_partition/3' and returns `true' when
%%      the hash falls inside the range selected for `Index':
%%        `{nowrap, GTE, LTE}' -- GTE =< Hash =< LTE
%%        `{wrap, GTE, LTE}'   -- Hash >= GTE or Hash =< LTE
-spec repair_filter(partition()) -> Filter::function().
repair_filter(Target) ->
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    %% NOTE(review): reads the ring tuple by position; assumes the
    %% consistent-hash structure is its 4th element -- confirm against
    %% the riak_core ring record.
    CH = element(4, Ring),
    Schemas = riak_search_config:get_all_schemas(),
    NValMap = [{Schema:name(), Schema:n_val()} || Schema <- Schemas],
    RangeMap = riak_core_repair:gen_range_map(Target, CH, NValMap),
    DefaultProps = riak_core_config:default_bucket_props(),
    DefaultN = riak_core_bucket:n_val(DefaultProps),
    DefaultRange = riak_core_repair:gen_range(Target, CH, DefaultN),
    RangeFun = riak_core_repair:gen_range_fun(RangeMap, DefaultRange),
    fun({Index, {Field, Term}}) ->
            Hash = riak_search_ring_utils:calc_partition(Index, Field, Term),
            case RangeFun(Index) of
                {nowrap, Low, High} ->
                    (Low =< Hash) andalso (Hash =< High);
                {wrap, Low, High} ->
                    (Low =< Hash) orelse (Hash =< High)
            end
    end.
46 changes: 46 additions & 0 deletions src/riak_search_worker.erl
@@ -0,0 +1,46 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(riak_search_worker).
-behaviour(riak_core_vnode_worker).

-export([init_worker/3,
handle_work/3]).

-include_lib("riak_core/include/riak_core_vnode.hrl").
-record(state, {index :: partition()}).

%% ===================================================================
%% Public API
%% ===================================================================

%% @doc Initialize the worker state. Only the vnode index is recorded;
%%      the `_Args' and `_Props' arguments are ignored.
init_worker(VNodeIndex, _Args, _Props) ->
    WorkerState = #state{index = VNodeIndex},
    {ok, WorkerState}.

%% @doc Perform the asynchronous fold operation.
%%
%%      `FoldFun' is called with no arguments and its result is passed
%%      to `FinishFun'. A `receiver_down' throw raised by either call is
%%      swallowed; any other exception propagates. The worker state is
%%      returned untouched.
%%      NOTE(review): `receiver_down' presumably signals that the party
%%      consuming the fold has gone away -- confirm with the thrower.
handle_work({fold, FoldFun, FinishFun}, _Sender, State) ->
    try
        FoldResult = FoldFun(),
        FinishFun(FoldResult)
    catch
        throw:receiver_down ->
            ok
    end,
    {noreply, State}.
3 changes: 1 addition & 2 deletions src/search.erl
Expand Up @@ -161,7 +161,7 @@ index_docs(Docs) ->
riak_indexed_doc:new(Index, ID, Fields, Props) riak_indexed_doc:new(Index, ID, Fields, Props)
end, end,
IdxDocs = [F(X) || X <- Docs], IdxDocs = [F(X) || X <- Docs],

%% Index the IdxDocs... %% Index the IdxDocs...
{ok, Client} = riak_search:local_client(), {ok, Client} = riak_search:local_client(),
Client:index_docs(IdxDocs), Client:index_docs(IdxDocs),
Expand Down Expand Up @@ -200,7 +200,6 @@ delete_dir(Index, Directory) ->
Client:delete_docs(IdxDocs) Client:delete_docs(IdxDocs)
end, end,
riak_search_dir_indexer:index(Index, Directory, Fun). riak_search_dir_indexer:index(Index, Directory, Fun).



delete_doc(DocIndex, DocID) -> delete_doc(DocIndex, DocID) ->
delete_docs([{DocIndex, DocID}]). delete_docs([{DocIndex, DocID}]).
Expand Down

0 comments on commit c0b43d3

Please sign in to comment.