Skip to content

Commit

Permalink
Merge pull request #1361 from basho/feature/fd/hashtree-update-callback
Browse files Browse the repository at this point in the history
Added a version of update that takes a Callback

Reviewed-by: JeetKunDoug
  • Loading branch information
borshop committed Mar 9, 2016
2 parents 10dfd74 + b40628f commit 1ab5f5f
Showing 1 changed file with 40 additions and 14 deletions.
54 changes: 40 additions & 14 deletions src/riak_kv_index_hashtree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
hash_index_data/1,
hash_object/2,
update/2,
update/3,
start_exchange_remote/4,
delete/2,
async_delete/2,
Expand All @@ -70,6 +71,7 @@
-type proplist() :: proplists:proplist().
-type riak_object_t2b() :: binary().
-type hashtree() :: hashtree:hashtree().
-type update_callback() :: fun(() -> term()).

-record(state, {index,
vnode_pid,
Expand Down Expand Up @@ -145,7 +147,14 @@ start_exchange_remote(FsmPid, From, IndexN, Tree) ->
%% @doc Update all hashtrees managed by the provided index_hashtree pid.
-spec update(index_n(), pid()) -> ok | not_responsible.
update(Id, Tree) ->
gen_server:call(Tree, {update_tree, Id}, infinity).
update(Id, Tree, undefined).

%% @doc Update all hashtrees managed by the provided index_hashtree pid.
-spec update(index_n(), pid(), undefined | update_callback()) -> ok | not_responsible.
update(Id, Tree, undefined) ->
gen_server:call(Tree, {update_tree, Id, undefined}, infinity);
update(Id, Tree, Callback) when is_function(Callback) ->
gen_server:call(Tree, {update_tree, Id, Callback}, infinity).

%% @doc Return a hash bucket from the tree identified by the given tree id
%% that is managed by the provided index_hashtree.
Expand Down Expand Up @@ -296,21 +305,15 @@ handle_call({delete, Items}, _From, State) ->
handle_call(get_trees, _From, #state{trees=Trees}=State) ->
{reply, Trees, State};

handle_call({update_tree, Id}, From, State) ->
handle_call({update_tree, Id, Callback}, From, State) ->
lager:debug("Updating tree: (vnode)=~p (preflist)=~p", [State#state.index, Id]),
apply_tree(Id,
fun(Tree) ->
{SnapTree, Tree2} = hashtree:update_snapshot(Tree),
Self = self(),
spawn_link(
fun() ->
_ = hashtree:update_perform(SnapTree),
gen_server:cast(Self, {updated, Id}),
gen_server:reply(From, ok)
end),
{noreply, Tree2}
end,
State);
fun(Tree) ->
NewTree = snapshot_and_async_update_tree(Tree, Id, From, Callback),
{noreply, NewTree}
end,
State
);

handle_call({exchange_bucket, Id, Level, Bucket}, _From, State) ->
apply_tree(Id,
Expand Down Expand Up @@ -1029,3 +1032,26 @@ maybe_get_vnode_lock(SrcPartition, Pid) ->
false ->
ok
end.

snapshot_and_async_update_tree(Tree, Id, From, Callback) ->
{SnapTree, Tree2} = hashtree:update_snapshot(Tree),
Self = self(),
spawn_link(
fun() ->
try maybe_callback(Callback)
catch
_:E ->
lager:error(
"An error occurred in update callback: ~p. "
"Ignoring error and proceeding with update.", [E])
end,
_ = hashtree:update_perform(SnapTree),
gen_server:cast(Self, {updated, Id}),
gen_server:reply(From, ok)
end),
Tree2.

maybe_callback(undefined) ->
ok;
maybe_callback(Callback) ->
Callback().

0 comments on commit 1ab5f5f

Please sign in to comment.