Change process_results functions in keys and buckets fsms to return a completion tuple when last results from a VNode have been received.
kellymclaughlin committed Jul 18, 2011
1 parent 4f18dc9 commit 12dcd5a1792dc2ab5584eda047832c4fdaa57fda
Showing with 54 additions and 24 deletions.
  1. +10 −0 src/riak_kv_buckets_fsm.erl
  2. +23 −1 src/riak_kv_keys_fsm.erl
  3. +21 −23 src/riak_kv_vnode.erl
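The change replaces the old two-step reply protocol, in which a vnode streamed {results, VNode, Data} messages and then a separate {done, VNode} marker, with a single {final_results, VNode, Data} reply for the last batch; each fsm's process_results/3 now signals completion by returning {done, VNode, NewStateData} instead of only the updated state. Below is a minimal sketch of how a caller of process_results/3 could act on that return shape. The real dispatch lives in riak_core's coverage fsm and is not part of this commit, so the module and function names here are illustrative assumptions.

-module(coverage_dispatch_sketch).
-export([handle_reply/4]).

%% Reply is the term a vnode sends via riak_core_vnode:reply/2:
%% {results, VNode, Data} for an intermediate batch, or
%% {final_results, VNode, Data} for the last batch from that vnode.
handle_reply(Mod, Reply, CoverageVNodes, StateData) ->
    case Mod:process_results(Reply, CoverageVNodes, StateData) of
        {done, VNode, NewStateData} ->
            %% Final results from VNode received; stop waiting on it.
            {lists:delete(VNode, CoverageVNodes), NewStateData};
        NewStateData ->
            %% Intermediate batch (or a reply from a vnode outside the
            %% coverage plan); keep waiting on the same set of vnodes.
            {CoverageVNodes, NewStateData}
    end.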
src/riak_kv_buckets_fsm.erl
@@ -67,6 +67,16 @@ process_results({results, VNode, Buckets},
false -> % Ignore a response from a VNode that
% is not part of the coverage plan
StateData
+ end;
+process_results({final_results, VNode, Buckets},
+ CoverageVNodes,
+ StateData=#state{buckets=BucketAcc}) ->
+ case lists:member(VNode, CoverageVNodes) of
+ true -> % Received an expected response from a Vnode
+ {done, VNode, StateData#state{buckets=(Buckets ++ BucketAcc)}};
+ false -> % Ignore a response from a VNode that
+ % is not part of the coverage plan
+ StateData
end.
finish({error, Error},
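The new final_results clause above appends the vnode's last bucket batch with Buckets ++ BucketAcc and flags the vnode as done. Several coverage vnodes can legitimately report the same bucket name, so the accumulated list still needs sorting and deduplication before it reaches the client; that happens outside this hunk, and the sketch below only illustrates the assumed shape of that finishing step.

-module(bucket_acc_sketch).
-export([merge_batch/2, finalize/1]).

%% Mirrors Buckets ++ BucketAcc above: a cheap prepend per final batch.
merge_batch(Buckets, BucketAcc) ->
    Buckets ++ BucketAcc.

%% Assumed finishing step: sort and drop duplicate bucket names before
%% replying to the client.
finalize(BucketAcc) ->
    lists:usort(BucketAcc).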
src/riak_kv_keys_fsm.erl
@@ -91,7 +91,29 @@ process_results({results, VNode, {Bucket, Keys}},
% is not part of the coverage plan
ignore
end,
- StateData.
+ StateData;
+process_results({final_results, VNode, {Bucket, Keys}},
+ CoverageVNodes,
+ StateData=#state{client_type=ClientType,
+ from={raw, ReqId, ClientPid}}) ->
+ case lists:member(VNode, CoverageVNodes) of
+ true -> % Received an expected response from a Vnode
+ case ClientType of
+ mapred ->
+ try
+ luke_flow:add_inputs(ClientPid, [{Bucket, Key} || Key <- Keys])
+ catch _:_ ->
+ exit(self(), normal)
+ end;
+ plain -> ClientPid ! {ReqId, {keys, Keys}}
+ end,
+ %% Inform the coverage fsm that all results
+ %% are in for this vnode.
+ {done, VNode, StateData};
+ false -> % Ignore a response from a VNode that
+ % is not part of the coverage plan
+ StateData
+ end.
finish({error, Error},
StateData=#state{from={raw, ReqId, ClientPid},
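For plain clients the final_results clause above forwards the vnode's last key batch straight to ClientPid as {ReqId, {keys, Keys}} and returns {done, VNode, StateData} so the coverage machinery knows that vnode is finished. A minimal sketch of a client-side receive loop follows; the {ReqId, done} and {ReqId, {error, Reason}} shapes are assumptions about the fsm's finish step, which is only partially visible in this hunk.

-module(plain_keys_client_sketch).
-export([collect_keys/1]).

collect_keys(ReqId) ->
    collect_keys(ReqId, []).

collect_keys(ReqId, Acc) ->
    receive
        {ReqId, {keys, Keys}} ->
            %% One batch of keys from one vnode; keep listening.
            collect_keys(ReqId, Keys ++ Acc);
        {ReqId, done} ->
            %% Assumed completion message from the fsm's finish step.
            {ok, Acc};
        {ReqId, {error, Reason}} ->
            %% Assumed error shape for the finish({error, Error}, ...) path.
            {error, Reason}
    after 60000 ->
            {error, timeout}
    end.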
src/riak_kv_vnode.erl
@@ -196,7 +196,7 @@ handle_command(?FOLD_REQ{foldfun=Fun, acc0=Acc},_Sender,State) ->
{reply, Reply, State};
handle_command({?KV_LISTBUCKETS_REQ{item_filter=ItemFilter}, _FilterVNodes},
Sender,
- State=#state{mod=Mod,
+ State=#state{mod=Mod,
modstate=ModState,
idx=Index}) ->
%% Construct the filter function
@@ -207,15 +207,15 @@ handle_command({?KV_LISTKEYS_REQ{bucket=Bucket,
item_filter=ItemFilter},
FilterVNodes},
Sender,
- State=#state{mod=Mod,
+ State=#state{mod=Mod,
modstate=ModState,
idx=Index}) ->
%% Construct the filter function
FilterVNode = proplists:get_value(Index, FilterVNodes),
Filter = riak_kv_coverage_filter:build_filter(Bucket, ItemFilter, FilterVNode),
list_keys(Sender, Bucket, Filter, Index, Mod, ModState),
{noreply, State};
-
+
%% Commands originating from inside this vnode
handle_command({backend_callback, Ref, Msg}, _Sender,
State=#state{mod=Mod, modstate=ModState}) ->
@@ -286,12 +286,12 @@ terminate(_Reason, #state{mod=Mod, modstate=ModState}) ->
Mod:stop(ModState),
ok.
-handle_exit(_Pid, _Reason, State) ->
+handle_exit(_Pid, _Reason, State) ->
%% A linked processes has died so the vnode
%% process should take appropriate action here.
- %% The default behavior is to crash the vnode
+ %% The default behavior is to crash the vnode
%% process so that it can be respawned
- %% by riak_core_vnode_master to prevent
+ %% by riak_core_vnode_master to prevent
%% messages from stacking up on the process message
%% queue and never being processed.
{stop, linked_process_crash, State}.
@@ -409,7 +409,7 @@ syntactic_put_merge(Mod, ModState, BKey, Obj1, ReqId, StartTime) ->
{ok, Val0} ->
Obj0 = binary_to_term(Val0),
ResObj = riak_object:syntactic_merge(
- Obj0,Obj1,term_to_binary(ReqId), StartTime),
+ Obj0,Obj1,term_to_binary(ReqId), StartTime),
case riak_object:vclock(ResObj) =:= riak_object:vclock(Obj0) of
true -> {oldobj, ResObj};
false -> {newobj, ResObj}
@@ -453,18 +453,17 @@ do_get_binary(BKey, Mod, ModState) ->
list_buckets(Caller, Filter, Index, Mod, ModState) ->
%% TODO: Decide if we want to continue to allow key filters
%% to be used to filter the list of buckets. I think it is
- %% more useful to move all filtering out of the backend and
+ %% more useful to move all filtering out of the backend and
%% not have to force all backends to fold over all keys
%% to generate a list of buckets.
Buckets = Mod:list_bucket(ModState, '_'),
case Filter of
none ->
- riak_core_vnode:reply(Caller, {results, {Index, node()}, Buckets});
+ riak_core_vnode:reply(Caller, {final_results, {Index, node()}, Buckets});
_ ->
FilteredBuckets = lists:foldl(Filter, [], Buckets),
- riak_core_vnode:reply(Caller, {results, {Index, node()}, FilteredBuckets})
- end,
- riak_core_vnode:reply(Caller, {done, {Index, node()}}).
+ riak_core_vnode:reply(Caller, {final_results, {Index, node()}, FilteredBuckets})
+ end.
%% @private
list_keys(Caller, Bucket, Filter, Index, Mod, ModState) ->
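list_buckets/5 now answers with a single {final_results, ...} reply instead of a {results, ...} reply followed by a separate {done, ...}. When a filter is present it is applied with lists:foldl(Filter, [], Buckets), so the filter must be an arity-2 fun that returns the updated accumulator. Real filters are built by riak_kv_coverage_filter:build_filter/3; the fun below is only an assumed, illustrative stand-in with the required shape.

-module(bucket_filter_sketch).
-export([prefix_filter/1]).

%% Returns a foldl-compatible fun that keeps binary bucket names starting
%% with Prefix and drops everything else.
prefix_filter(Prefix) when is_binary(Prefix) ->
    Size = byte_size(Prefix),
    fun(Bucket, Acc) when is_binary(Bucket), byte_size(Bucket) >= Size ->
            case binary:part(Bucket, 0, Size) of
                Prefix -> [Bucket | Acc];
                _ -> Acc
            end;
       (_Bucket, Acc) ->
            Acc
    end.

For example, lists:foldl(bucket_filter_sketch:prefix_filter(<<"user_">>), [], Buckets) keeps only buckets whose names start with user_.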
@@ -498,12 +497,11 @@ list_keys(Caller, Bucket, Filter, Index, Mod, ModState) ->
end, try_next, TryFuns),
case Filter of
none ->
- riak_core_vnode:reply(Caller, {results, {Index, node()}, {Bucket, Keys}});
+ riak_core_vnode:reply(Caller, {final_results, {Index, node()}, {Bucket, Keys}});
_ ->
FilteredKeys = lists:foldl(Filter, [], Keys),
- riak_core_vnode:reply(Caller, {results, {Index, node()}, {Bucket, FilteredKeys}})
- end,
- riak_core_vnode:reply(Caller, {done, {Index, node()}}).
+ riak_core_vnode:reply(Caller, {final_results, {Index, node()}, {Bucket, FilteredKeys}})
+ end.
%% @private
do_delete(BKey, Mod, ModState) ->
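Taken together with the 100-key buffer in buffer_key_result (next hunk), a vnode now answers a key listing with zero or more {results, ...} batches followed by exactly one {final_results, ...} reply carrying whatever remained in the buffer. The sketch below only simulates that sequence of reply terms to make the protocol concrete; the real replies go through riak_core_vnode:reply/2, and key filters are ignored here for brevity.

-module(keylist_reply_sketch).
-export([replies/3]).

%% Build the reply terms a vnode at Index would emit for Keys in Bucket:
%% full batches of 100 as {results, ...}, the (possibly empty) remainder
%% as the single closing {final_results, ...}.
replies(Index, Bucket, Keys) ->
    {Batches, Remainder} = split_batches(Keys, 100, []),
    [{results, {Index, node()}, {Bucket, Batch}} || Batch <- Batches]
        ++ [{final_results, {Index, node()}, {Bucket, Remainder}}].

split_batches(Keys, Size, Acc) when length(Keys) >= Size ->
    {Batch, Rest} = lists:split(Size, Keys),
    split_batches(Rest, Size, [Batch | Acc]);
split_batches(Keys, _Size, Acc) ->
    {lists:reverse(Acc), Keys}.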
@@ -521,14 +519,14 @@ process_keys(_Caller, _Index, _Bucket, _Filter, {_B, _K}, Acc) ->
Acc.
buffer_key_result(Caller, Bucket, Filter, Index, Acc) ->
- %% Use arbitrary fixed buffer size of 100. Not
+ %% Use arbitrary fixed buffer size of 100. Not
%% sure there is a good 'why' for that number.
case length(Acc) >= 100 of
true ->
%% Filter the buffer keys as needed
- case Filter of
+ case Filter of
none ->
- riak_core_vnode:reply(Caller, {results, {Index, node()}, {Bucket, Acc}});
+ riak_core_vnode:reply(Caller, {results, {Index, node()}, {Bucket, Acc}});
_ ->
FilteredKeys = lists:foldl(Filter, [], Acc),
case FilteredKeys of
@@ -744,8 +742,8 @@ result_listener_buckets(Acc) ->
receive
{'$gen_event', {_,{results,_,Results}}} ->
result_listener_keys(Results ++ Acc);
- {'$gen_event', {_,{done,_}}} ->
- result_listener_done(Acc)
+ {'$gen_event', {_,{final_results,_,Results}}} ->
+ result_listener_done(Results ++ Acc)
after 5000 ->
result_listener_done({timeout, Acc})
end.
@@ -754,8 +752,8 @@ result_listener_keys(Acc) ->
receive
{'$gen_event', {_,{results,_,{_Bucket, Results}}}} ->
result_listener_keys(Results ++ Acc);
- {'$gen_event', {_,{done,_}}} ->
- result_listener_done(Acc)
+ {'$gen_event', {_,{final_results,_,{_Bucket, Results}}}} ->
+ result_listener_done(Results ++ Acc)
after 5000 ->
result_listener_done({timeout, Acc})
end.
