Skip to content

Commit

Permalink
use list pipe enqueue for mapred_2i_pipe=false inputs as well
Browse files Browse the repository at this point in the history
  • Loading branch information
beerriot committed May 16, 2012
1 parent 25c99ac commit fcfc9e7
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions src/riak_kv_mrc_pipe.erl
Original file line number Diff line number Diff line change
Expand Up @@ -511,16 +511,10 @@ send_inputs(Pipe, Inputs) ->
%% a list of keys.
-spec send_inputs(riak_pipe:pipe(), input(), timeout()) ->
ok | term().
send_inputs(Pipe, BucketKeyList, Timeout) when is_list(BucketKeyList) ->
%% TODO: use core abilities to decide whether to use list inputs
case riak_pipe:queue_work_list(Pipe, BucketKeyList) of
[] ->
riak_pipe:eoi(Pipe),
ok;
Rest ->
%% TODO: timeout, sleep?
send_inputs(Pipe, Rest, Timeout)
end;
send_inputs(Pipe, BucketKeyList, _Timeout) when is_list(BucketKeyList) ->
queue_whole_list(Pipe, BucketKeyList),
riak_pipe:eoi(Pipe),
ok;
send_inputs(Pipe, Bucket, Timeout) when is_binary(Bucket) ->
riak_kv_pipe_listkeys:queue_existing_pipe(Pipe, Bucket, Timeout);
send_inputs(Pipe, {Bucket, FilterExprs}, Timeout) ->
Expand Down Expand Up @@ -581,20 +575,21 @@ send_key_list(Pipe, Bucket, ReqId) ->
receive
{ReqId, {keys, Keys}} ->
%% Get results from list keys operation.
[riak_pipe:queue_work(Pipe, {Bucket, Key})
|| Key <- Keys],
BKeys = [ {Bucket, Key} || Key <- Keys ],
queue_whole_list(Pipe, BKeys),
send_key_list(Pipe, Bucket, ReqId);

{ReqId, {results, Results}} ->
%% Get results from 2i operation. Handle both [Keys] and [{Key,
%% Props}] formats. If props exists, use it as keydata.
F = fun
({Key, Props}) ->
riak_pipe:queue_work(Pipe, {{Bucket, Key}, Props});
{{Bucket, Key}, Props};
(Key) ->
riak_pipe:queue_work(Pipe, {Bucket, Key})
{Bucket, Key}
end,
[F(X) || X <- Results],
BKeys = [F(X) || X <- Results],
queue_whole_list(Pipe, BKeys),
send_key_list(Pipe, Bucket, ReqId);

{ReqId, {error, Reason}} ->
Expand All @@ -606,6 +601,16 @@ send_key_list(Pipe, Bucket, ReqId) ->
ok
end.

queue_whole_list(Pipe, BKeys) ->
%% TODO: use core abilities to decide whether to use list inputs
case riak_pipe:queue_work_list(Pipe, BKeys) of
[] ->
ok;
Rest ->
%% TODO: timeout, sleep?
queue_whole_list(Pipe, Rest)
end.

%% @equiv collect_outputs(Pipe, NumKeeps, 60000)
collect_outputs(Pipe, NumKeeps) ->
collect_outputs(Pipe, NumKeeps, ?DEFAULT_TIMEOUT).
Expand Down

0 comments on commit fcfc9e7

Please sign in to comment.