From b40628f9573f8e1caec29cfb1287de31f692c145 Mon Sep 17 00:00:00 2001 From: Fred Dushin Date: Fri, 26 Feb 2016 18:07:41 -0500 Subject: [PATCH] Added a version of update that takes a Callback, which will get called after the hashtree has been snapshotted, but before the hash tree update is initiated. This callback is currently being used by the batching feature of Yokozuna, to minimize differences between the YZ and KV AAE hash trees. --- src/riak_kv_index_hashtree.erl | 54 +++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/src/riak_kv_index_hashtree.erl b/src/riak_kv_index_hashtree.erl index 916fe57104..52de698f21 100644 --- a/src/riak_kv_index_hashtree.erl +++ b/src/riak_kv_index_hashtree.erl @@ -48,6 +48,7 @@ hash_index_data/1, hash_object/2, update/2, + update/3, start_exchange_remote/4, delete/2, async_delete/2, @@ -70,6 +71,7 @@ -type proplist() :: proplists:proplist(). -type riak_object_t2b() :: binary(). -type hashtree() :: hashtree:hashtree(). +-type update_callback() :: fun(() -> term()). -record(state, {index, vnode_pid, @@ -145,7 +147,14 @@ start_exchange_remote(FsmPid, From, IndexN, Tree) -> %% @doc Update all hashtrees managed by the provided index_hashtree pid. -spec update(index_n(), pid()) -> ok | not_responsible. update(Id, Tree) -> - gen_server:call(Tree, {update_tree, Id}, infinity). + update(Id, Tree, undefined). + +%% @doc Update all hashtrees managed by the provided index_hashtree pid. +-spec update(index_n(), pid(), undefined | update_callback()) -> ok | not_responsible. +update(Id, Tree, undefined) -> + gen_server:call(Tree, {update_tree, Id, undefined}, infinity); +update(Id, Tree, Callback) when is_function(Callback) -> + gen_server:call(Tree, {update_tree, Id, Callback}, infinity). %% @doc Return a hash bucket from the tree identified by the given tree id %% that is managed by the provided index_hashtree. @@ -296,21 +305,15 @@ handle_call({delete, Items}, _From, State) -> handle_call(get_trees, _From, #state{trees=Trees}=State) -> {reply, Trees, State}; -handle_call({update_tree, Id}, From, State) -> +handle_call({update_tree, Id, Callback}, From, State) -> lager:debug("Updating tree: (vnode)=~p (preflist)=~p", [State#state.index, Id]), apply_tree(Id, - fun(Tree) -> - {SnapTree, Tree2} = hashtree:update_snapshot(Tree), - Self = self(), - spawn_link( - fun() -> - _ = hashtree:update_perform(SnapTree), - gen_server:cast(Self, {updated, Id}), - gen_server:reply(From, ok) - end), - {noreply, Tree2} - end, - State); + fun(Tree) -> + NewTree = snapshot_and_async_update_tree(Tree, Id, From, Callback), + {noreply, NewTree} + end, + State + ); handle_call({exchange_bucket, Id, Level, Bucket}, _From, State) -> apply_tree(Id, @@ -1029,3 +1032,26 @@ maybe_get_vnode_lock(SrcPartition, Pid) -> false -> ok end. + +snapshot_and_async_update_tree(Tree, Id, From, Callback) -> + {SnapTree, Tree2} = hashtree:update_snapshot(Tree), + Self = self(), + spawn_link( + fun() -> + try maybe_callback(Callback) + catch + _:E -> + lager:error( + "An error occurred in update callback: ~p. " + "Ignoring error and proceeding with update.", [E]) + end, + _ = hashtree:update_perform(SnapTree), + gen_server:cast(Self, {updated, Id}), + gen_server:reply(From, ok) + end), + Tree2. + +maybe_callback(undefined) -> + ok; +maybe_callback(Callback) -> + Callback(). \ No newline at end of file