
Commit

* make sure to call hashtree:flush_buffer on all trees before build/time, meta for build_finished and persist flushed trees

* add open/close (check) meta calls from the hashtree lib (see the sketch after these notes)
* handle closing trees safely, and set next_rebuild to full when the vnode's exit reason is not normal/shutdown
* Added explicit riak_kv_index_hashtree close/sync_stop call to KV vnode.
  Without it, the node can exit before the hashtree has a chance
  to close because riak_kv_index_hashtree is not supervised inside
  riak_kv, only monitored.
* set_rebuild on close depending on vnode shutdown/normal reason
* mark empty {1,0} within clear_trees and true/false in init, depending on whether the vnode is empty; have do_new_tree mark correctly based on empty|open
* determine update or flush by next_rebuild of tree on close
* cast to explicitly set incr rebuild after update_perform
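The open/close (check) meta calls above act as a dirty-shutdown detector. The sketch below is an illustration only, not the hashtree implementation: the module name, new_empty/0, find/2, and the orddict holding the counters are assumptions made for this example, whereas the real calls used in this diff (hashtree:mark_open_empty/2, hashtree:mark_open_and_check/2, hashtree:mark_clean_close/2) persist the counters in the tree's on-disk metadata. The idea is that opening a tree bumps an `opened' counter, a clean close brings a `closed' counter back in line with it, and a mismatch found at the next open means the previous close was dirty, so the next rebuild is forced to full; a freshly created empty tree starts marked {1,0} (opened once, never closed).

%% Sketch only: models the opened/closed counters in an orddict to show the
%% clean-vs-dirty close decision; hashtree itself keeps them in tree metadata.
-module(meta_counter_sketch).
-export([new_empty/0, mark_open_and_check/1, mark_clean_close/1]).

%% A brand-new empty tree is marked {opened = 1, closed = 0}.
new_empty() ->
    orddict:from_list([{opened, 1}, {closed, 0}]).

%% On open: if the previous close was clean (opened == closed), an incremental
%% rebuild is enough; otherwise the next rebuild must be full.
mark_open_and_check(Meta0) ->
    Opened = find(opened, Meta0),
    Closed = find(closed, Meta0),
    Meta = orddict:store(opened, Opened + 1, Meta0),
    case Opened =:= Closed of
        true  -> {incremental, Meta};
        false -> {full, Meta}
    end.

%% On a clean close, bring `closed' back in line with `opened'.
mark_clean_close(Meta) ->
    orddict:store(closed, find(opened, Meta), Meta).

find(Key, Meta) ->
    case orddict:find(Key, Meta) of
        {ok, V} -> V;
        error   -> 0
    end.

Under that scheme, the update-or-flush decision on close (see the next_rebuild note above) falls out naturally: a tree already slated for a full rebuild is only flushed, while an incrementally maintained tree is updated and marked clean, which is what the new close_trees/1 below does.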
zeeshanlakhani committed Oct 30, 2015
1 parent bb9935f commit ff087b9
Showing 2 changed files with 111 additions and 41 deletions.
143 changes: 103 additions & 40 deletions src/riak_kv_index_hashtree.erl
@@ -54,6 +54,7 @@
insert/3,
async_insert/3,
stop/1,
sync_stop/1,
clear/1,
expire/1,
destroy/1,
@@ -207,6 +208,12 @@ poke(Tree) ->
stop(Tree) ->
gen_server:cast(Tree, stop).

%% @doc Terminate the specified index_hashtree and wait for exit
sync_stop(undefined) ->
ok;
sync_stop(Tree) ->
gen_server:call(Tree, stop).
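
%% Illustrative usage (not part of this diff): the owning vnode can block on
%% shutdown until its hashtree has flushed and closed, e.g.
%%
%%   ok = riak_kv_index_hashtree:sync_stop(TreePid)
%%
%% as riak_kv_vnode:terminate/2 does further down in this commit.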

%% @doc Destroy the specified index_hashtree, which will destroy all
%% associated hashtrees and terminate.
-spec destroy(pid()) -> ok.
@@ -258,7 +265,7 @@ init([Index, VNPid, Opts]) ->
use_2i=Use2i,
path=Path},
IndexNs = responsible_preflists(State),
State2 = init_trees(IndexNs, State),
State2 = init_trees(IndexNs, VNEmpty, State),
%% If vnode is empty, mark tree as built without performing fold
case VNEmpty of
true ->
@@ -271,7 +278,7 @@ init([Index, VNPid, Opts]) ->
end.

handle_call({new_tree, Id}, _From, State) ->
State2 = do_new_tree(Id, State),
State2 = do_new_tree(Id, State, mark_open),
{reply, ok, State2};

handle_call({get_lock, Type, Pid}, _From, State) ->
@@ -294,10 +301,13 @@ handle_call({update_tree, Id}, From, State) ->
apply_tree(Id,
fun(Tree) ->
{SnapTree, Tree2} = hashtree:update_snapshot(Tree),
spawn_link(fun() ->
_ = hashtree:update_perform(SnapTree),
gen_server:reply(From, ok)
end),
Self = self(),
spawn_link(
fun() ->
_ = hashtree:update_perform(SnapTree),
gen_server:cast(Self, {updated, Id}),
gen_server:reply(From, ok)
end),
{noreply, Tree2}
end,
State);
@@ -353,6 +363,10 @@ handle_call({estimate_keys, IndexN}, _From, State=#state{trees=Trees}) ->
{reply, not_responsible, State}
end;

handle_call(stop, _From, State0) ->
State1 = close_trees(State0),
{stop, normal, ok, State1};

handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
@@ -361,9 +375,9 @@ handle_cast(poke, State) ->
State2 = do_poke(State),
{noreply, State2};

handle_cast(stop, State) ->
close_trees(State),
{stop, normal, State};
handle_cast(stop, State0) ->
State1 = close_trees(State0),
{stop, normal, State1};

handle_cast({insert, Items, Options}, State) ->
State2 = do_insert(Items, Options, State),
@@ -392,13 +406,30 @@ handle_cast({start_exchange_remote, FsmPid, From, _IndexN}, State) ->
{noreply, State2}
end;

%% Cast back from the worker spawned in the update_tree call above, once
%% hashtree:update_perform/1 has completed, so the tree's next rebuild can be
%% reset to incremental.
handle_cast({updated, Id}, State) ->
Fun = fun(Tree) ->
{noreply, hashtree:set_next_rebuild(Tree, incremental)}
end,
apply_tree(Id, Fun, State);

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({'DOWN', _, _, Pid, _}, State) when Pid == State#state.vnode_pid ->
handle_info({'DOWN', _, _, Pid, Reason}, State0=#state{trees=Trees0, vnode_pid=VnodePid})
when Pid == VnodePid ->
%% vnode has terminated, exit as well
close_trees(State),
{stop, normal, State};
%% exiting as quickly as possible to reduce the window on the vnode restart
%% and force full rebuild for next restart
Trees = case Reason of
normal ->
Trees0;
shutdown ->
Trees0;
_ ->
[{IdxN, hashtree:set_next_rebuild(Tree, full)} || {IdxN, Tree} <- Trees0]
end,
State1 = close_trees(State0#state{trees=Trees}),
{stop, normal, State1};
handle_info({'DOWN', Ref, _, _, _}, State) ->
State2 = maybe_release_lock(Ref, State),
{noreply, State2};
@@ -434,11 +465,19 @@ determine_data_root() ->
end
end.

-spec init_trees([index_n()], state()) -> state().
init_trees(IndexNs, State) ->
State2 = lists:foldl(fun(Id, StateAcc) ->
do_new_tree(Id, StateAcc)
end, State, IndexNs),
%% @doc Init the trees.
%%
%% MarkEmpty is a boolean dictating whether we're marking the tree empty for
%% the first creation or just marking it open instead.
-spec init_trees([index_n()], boolean(), state()) -> state().
init_trees(IndexNs, MarkEmpty, State) ->
State2 = lists:foldl(
fun(Id, StateAcc) ->
case MarkEmpty of
true -> do_new_tree(Id, StateAcc, mark_empty);
false -> do_new_tree(Id, StateAcc, mark_open)
end
end, State, IndexNs),
State2#state{built=false, expired=false}.

-spec load_built(state()) -> boolean().
@@ -483,7 +522,7 @@ hash_index_data(IndexData) when is_list(IndexData) ->
fold_keys(Partition, Tree, HasIndexTree) ->
FoldFun = fold_fun(Tree, HasIndexTree),
Req = riak_core_util:make_fold_req(FoldFun,
0, false,
[aae_reconstruction,
{iterator_refresh, true}]),
riak_core_vnode_master:sync_command({Partition, node()},
@@ -555,22 +594,28 @@ index_fold_fun(Tree) ->
index_2i_n() ->
?INDEX_2I_N.

%% Generate a new {@link hashtree} for the specified `index_n'. If this is
%% the first hashtree created by this index_hashtree, then open/create a new
%% on-disk store at `segment_path'. Otherwise, re-use the store from the first
%% tree. In other words, all hashtrees for a given index_hashtree are stored in
%% the same on-disk store.
-spec do_new_tree(index_n(), state()) -> state().
do_new_tree(Id, State=#state{trees=Trees, path=Path}) ->
%% @doc Generate a new {@link hashtree} for the specified `index_n'. If this is
%% the first hashtree created by this index_hashtree, then open/create a new
%% on-disk store at `segment_path'. Otherwise, re-use the store from the first
%% tree. In other words, all hashtrees for a given index_hashtree are stored in
%% the same on-disk store.
%%
%% Also, write to the tree-id's `opened' meta and check if `opened' =:= `closed'.
-spec do_new_tree(index_n(), state(), mark_open|mark_empty) -> state().
do_new_tree(Id, State=#state{trees=Trees, path=Path}, MarkType) ->
Index = State#state.index,
IdBin = tree_id(Id),
NewTree = case Trees of
NewTree0 = case Trees of
[] ->
hashtree:new({Index,IdBin}, [{segment_path, Path}]);
[{_,Other}|_] ->
hashtree:new({Index,IdBin}, Other)
end,
Trees2 = orddict:store(Id, NewTree, Trees),
end,
NewTree1 = case MarkType of
mark_empty -> hashtree:mark_open_empty(Id, NewTree0);
mark_open -> hashtree:mark_open_and_check(Id, NewTree0)
end,
Trees2 = orddict:store(Id, NewTree1, Trees),
State#state{trees=Trees2}.

-spec do_get_lock(any(), pid(), state()) -> {not_built | ok | already_locked, state()}.
@@ -619,14 +664,17 @@ apply_tree(Id, Fun, State=#state{trees=Trees}) ->
end.

-spec do_build_finished(state()) -> state().
do_build_finished(State=#state{index=Index, built=_Pid}) ->
do_build_finished(State=#state{index=Index, built=_Pid, trees=Trees0}) ->
lager:debug("Finished build: ~p", [Index]),
{_,Tree0} = hd(State#state.trees),
Trees = orddict:map(fun(_Id, Tree) ->
hashtree:flush_buffer(Tree)
end, Trees0),
{_, Tree0} = hd(Trees),
BuildTime = get_build_time(Tree0),
_ = hashtree:write_meta(<<"built">>, <<1>>, Tree0),
_ = hashtree:write_meta(<<"build_time">>, term_to_binary(BuildTime), Tree0),
riak_kv_entropy_info:tree_built(Index, BuildTime),
State#state{built=true, build_time=BuildTime, expired=false}.
State#state{built=true, build_time=BuildTime, expired=false, trees=Trees}.

%% Determine the build time for all trees associated with this
%% index. The build time is stored as metadata in the on-disk file. If
@@ -683,7 +731,7 @@ expand_item(_, Item, Others) ->
do_insert_expanded([], _Opts, State) ->
State;
do_insert_expanded([{Id, Key, Hash}|Rest], Opts, State=#state{trees=Trees}) ->
State2 =
case orddict:find(Id, Trees) of
{ok, Tree} ->
Tree2 = hashtree:insert(Key, Hash, Tree, Opts),
@@ -782,7 +830,7 @@ handle_unexpected_key(Id, Key, State=#state{index=Partition}) ->
%% be resolved whenever trees are eventually rebuilt, either
%% after normal expiration or after a future unexpected value
%% triggers the alternate case clause above.
State2 = do_new_tree(Id, State),
State2 = do_new_tree(Id, State, mark_open),
State2
end
end.
@@ -842,12 +890,12 @@ clear_tree(State=#state{index=Index}) ->
lager:info("Clearing AAE tree: ~p", [Index]),
IndexNs = responsible_preflists(State),
State2 = destroy_trees(State),
State3 = init_trees(IndexNs, State2#state{trees=orddict:new()}),
State3 = init_trees(IndexNs, true, State2#state{trees=orddict:new()}),
State3#state{built=false, expired=false}.

destroy_trees(State) ->
State2 = close_trees(State),
{_,Tree0} = hd(State2#state.trees),
{_,Tree0} = hd(State#state.trees), % deliberately using state with live db ref
_ = hashtree:destroy(Tree0),
State2.

Expand Down Expand Up @@ -879,7 +927,7 @@ build_or_rehash(Self, Locked, Type, #state{index=Index, trees=Trees}) ->
{true, build} ->
lager:info("Starting AAE tree build: ~p", [Index]),
fold_keys(Index, Self, has_index_tree(Trees)),
lager:info("Finished AAE tree build: ~p", [Index]),
lager:info("Finished AAE tree build: ~p", [Index]),
gen_server:cast(Self, build_finished);
{true, rehash} ->
lager:debug("Starting AAE tree rehash: ~p", [Index]),
@@ -919,17 +967,32 @@ maybe_rebuild(State) ->
has_index_tree(Trees) ->
orddict:is_key(?INDEX_2I_N, Trees).

close_trees(State=#state{trees=undefined}) ->
State;
close_trees(State=#state{trees=Trees}) ->
Trees2 = [begin
NewTree = try
hashtree:flush_buffer(Tree)
catch _:_ ->
case hashtree:next_rebuild(Tree) of
%% Not marking close cleanly to avoid the
%% cost of a full rebuild on shutdown.
full ->
lager:info("Deliberately marking KV hashtree ~p"
++ " for full rebuild on next restart",
[IdxN]),
hashtree:flush_buffer(Tree);
incremental ->
HT = hashtree:update_tree(Tree),
hashtree:mark_clean_close(IdxN, HT)
end
catch _:Err ->
lager:warning("Failed to flush/update trees"
++ " during close | Error: ~p", [Err]),
Tree
end,
{IdxN, NewTree}
end || {IdxN, Tree} <- Trees],
Trees3 = [{IdxN, hashtree:close(Tree)} || {IdxN, Tree} <- Trees2],
State#state{trees=Trees3}.
_ = [hashtree:close(Tree) || {_IdxN, Tree} <- Trees2],
State#state{trees=undefined}.

get_all_locks(Type, Index, Pid) ->
case riak_kv_entropy_manager:get_lock(Type, Pid) of
9 changes: 8 additions & 1 deletion src/riak_kv_vnode.erl
@@ -1075,8 +1075,15 @@ delete(State=#state{idx=Index,mod=Mod, modstate=ModState}) ->
end,
{ok, State#state{modstate=UpdModState,vnodeid=undefined,hashtrees=undefined}}.

terminate(_Reason, #state{mod=Mod, modstate=ModState}) ->
terminate(_Reason, #state{mod=Mod, modstate=ModState,hashtrees=Trees}) ->
Mod:stop(ModState),

%% Explicitly stop the hashtree rather than relying on the process monitor
%% to detect the vnode exit. As riak_kv_index_hashtree is not a supervised
%% process in the riak_kv application, on graceful shutdown riak_kv and
%% riak_core can complete their shutdown before the hashtree is written
%% to disk causing the hashtree to be closed dirty.
riak_kv_index_hashtree:sync_stop(Trees),
ok.

handle_info({set_concurrency_limit, Lock, Limit}, State) ->

5 comments on commit ff087b9

@borshop (Contributor)
saw approval from jonmeredith
at ff087b9

@borshop (Contributor)
merging basho/riak_kv/bugfix/zl/hashtree-cleanup-for-meta-counter+flush = ff087b9 into borshop-integration-1242-bugfix/zl/hashtree-cleanup-for-meta-counter+flush

@borshop (Contributor)
basho/riak_kv/bugfix/zl/hashtree-cleanup-for-meta-counter+flush = ff087b9 merged ok, testing candidate = 290b35a

@borshop (Contributor)

@borshop (Contributor)

fast-forwarding 2.0 to borshop-integration-1242-bugfix/zl/hashtree-cleanup-for-meta-counter+flush = 290b35a
