Permalink
Browse files

Merge branch '1.1'

  • Loading branch information...
2 parents 7965c87 + 4d867bb commit f987b73a43113914f3bc2fbdff4cf0726fa22d61 Jared Morrow committed Mar 1, 2012
Showing with 84 additions and 40 deletions.
  1. +1 −1 src/riak_kv.app.src
  2. +11 −6 src/riak_kv_mrc_map.erl
  3. +18 −8 src/riak_kv_pipe_get.erl
  4. +23 −11 src/riak_kv_pipe_index.erl
  5. +31 −14 src/riak_kv_pipe_listkeys.erl
View
@@ -3,7 +3,7 @@
{application, riak_kv,
[
{description, "Riak Key/Value Store"},
- {vsn, "1.1.0"},
+ {vsn, "1.1.1"},
{applications, [
kernel,
stdlib,
View
@@ -140,8 +140,7 @@ process(Input, _Last,
case map(Phase, Arg, Input) of
{ok, Results} when is_list(Results) ->
?T(_FittingDetails, [map], {produced, Results}),
- send_results(Results, State),
- {ok, State};
+ send_results(Results, State);
{ok, _NonListResults} ->
?T(_FittingDetails, [map, error],
{error, {non_list_result, Input}}),
@@ -226,10 +225,16 @@ bucket_linkfun(Bucket) ->
erlang:make_fun(Module, Function, 3).
%% @doc Send results to the next fitting.
--spec send_results([term()], state()) -> ok.
-send_results(Results, #state{p=P, fd=FD}) ->
- [ riak_pipe_vnode_worker:send_output(R, P, FD) || R <- Results],
- ok.
+-spec send_results([term()], state()) -> {ok | {error, term()}, state()}.
+send_results([], State) ->
+ {ok, State};
+send_results([Result | Results], #state{p=P, fd=FD} = State) ->
+ case riak_pipe_vnode_worker:send_output(Result, P, FD) of
+ ok ->
+ send_results(Results, State);
+ ER ->
+ {ER, State}
+ end.
%% @doc Unused.
-spec done(state()) -> ok.
View
@@ -54,6 +54,8 @@
bkey_nval/1]).
-include("riak_kv_vnode.hrl").
+-include_lib("riak_pipe/include/riak_pipe.hrl").
+-include_lib("riak_pipe/include/riak_pipe_log.hrl").
-export_type([input/0]).
@@ -77,7 +79,7 @@ init(Partition, FittingDetails) ->
%% @doc Lookup the bucket/key pair on the Riak KV vnode, and send it
%% downstream.
-spec process(riak_kv_mrc_pipe:key_input(), boolean(), state())
- -> {ok | forward_preflist, state()}.
+ -> {ok | forward_preflist | {error, term()}, state()}.
process(Input, Last, #state{partition=Partition, fd=FittingDetails}=State) ->
ReqId = make_req_id(),
riak_core_vnode_master:command(
@@ -87,15 +89,23 @@ process(Input, Last, #state{partition=Partition, fd=FittingDetails}=State) ->
riak_kv_vnode_master),
receive
{ReqId, {r, {ok, Obj}, _, _}} ->
- riak_pipe_vnode_worker:send_output(
- {ok, Obj, keydata(Input)}, Partition, FittingDetails),
- {ok, State};
+ case riak_pipe_vnode_worker:send_output(
+ {ok, Obj, keydata(Input)}, Partition, FittingDetails) of
+ ok ->
+ {ok, State};
+ ER ->
+ {ER, State}
+ end;
{ReqId, {r, {error, _} = Error, _, _}} ->
if Last ->
- riak_pipe_vnode_worker:send_output(
- {Error, bkey(Input), keydata(Input)},
- Partition, FittingDetails),
- {ok, State};
+ case riak_pipe_vnode_worker:send_output(
+ {Error, bkey(Input), keydata(Input)},
+ Partition, FittingDetails) of
+ ok ->
+ {ok, State};
+ ER ->
+ {ER, State}
+ end;
true ->
{forward_preflist, State}
end
View
@@ -60,7 +60,7 @@ init(Partition, FittingDetails) ->
%% @doc Process queries indexes on the KV vnode, according to the
%% input bucket and query.
--spec process(term(), boolean(), state()) -> {ok, state()}.
+-spec process(term(), boolean(), state()) -> {ok | {error, term()}, state()}.
process(Input, _Last, #state{p=Partition, fd=FittingDetails}=State) ->
case Input of
{cover, FilterVNodes, {Bucket, Query}} ->
@@ -77,23 +77,33 @@ process(Input, _Last, #state{p=Partition, fd=FittingDetails}=State) ->
FilterVNodes,
{raw, ReqId, self()},
riak_kv_vnode_master),
- keysend_loop(ReqId, Partition, FittingDetails),
- {ok, State}.
+ {keysend_loop(ReqId, Partition, FittingDetails), State}.
keysend_loop(ReqId, Partition, FittingDetails) ->
receive
+ {ReqId, {error, _Reason} = ER} ->
+ ER;
{ReqId, {Bucket, Keys}} ->
- keysend(Bucket, Keys, Partition, FittingDetails),
- keysend_loop(ReqId, Partition, FittingDetails);
+ case keysend(Bucket, Keys, Partition, FittingDetails) of
+ ok ->
+ keysend_loop(ReqId, Partition, FittingDetails);
+ ER ->
+ ER
+ end;
{ReqId, done} ->
ok
end.
-keysend(Bucket, Keys, Partition, FittingDetails) ->
- [ riak_pipe_vnode_worker:send_output(
- {Bucket, Key}, Partition, FittingDetails)
- || Key <- Keys ],
- ok.
+keysend(_Bucket, [], _Partition, _FittingDetails) ->
+ ok;
+keysend(Bucket, [Key | Keys], Partition, FittingDetails) ->
+ case riak_pipe_vnode_worker:send_output(
+ {Bucket, Key}, Partition, FittingDetails) of
+ ok ->
+ keysend(Bucket, Keys, Partition, FittingDetails);
+ ER ->
+ ER
+ end.
%% @doc Unused.
-spec done(state()) -> ok.
@@ -124,7 +134,9 @@ queue_existing_pipe(Pipe, Bucket, Query, Timeout) ->
{ok, LKP} = riak_pipe:exec([#fitting_spec{name=index,
module=?MODULE,
nval=1}],
- [{sink, Head}]),
+ [{sink, Head},
+ {trace, [error]},
+ {log, {sink, Pipe#pipe.sink}}]),
%% setup the cover operation
ReqId = erlang:phash2(erlang:now()), %% stolen from riak_client
@@ -60,7 +60,7 @@ init(Partition, FittingDetails) ->
%% @doc Process lists keys from the KV vnode, according to the input
%% bucket +/- filter.
--spec process(term(), boolean(), state()) -> {ok, state()}.
+-spec process(term(), boolean(), state()) -> {ok | {error, term()}, state()}.
process(Input, _Last, #state{p=Partition, fd=FittingDetails}=State) ->
case Input of
{cover, FilterVNodes, {Bucket, Filters}} ->
@@ -80,27 +80,42 @@ process(Input, _Last, #state{p=Partition, fd=FittingDetails}=State) ->
FilterVNodes,
{raw, ReqId, self()},
riak_kv_vnode_master),
- keysend_loop(ReqId, Partition, FittingDetails),
- {ok, State}.
+ Result = keysend_loop(ReqId, Partition, FittingDetails),
+ {Result, State}.
keysend_loop(ReqId, Partition, FittingDetails) ->
receive
{ReqId, {From, Bucket, Keys}} ->
- keysend(Bucket, Keys, Partition, FittingDetails),
- riak_kv_vnode:ack_keys(From),
- keysend_loop(ReqId, Partition, FittingDetails);
+ Result = keysend(Bucket, Keys, Partition, FittingDetails),
+ case Result of
+ ok ->
+ riak_kv_vnode:ack_keys(From),
+ keysend_loop(ReqId, Partition, FittingDetails);
+ Error ->
+ Error
+ end;
{ReqId, {Bucket, Keys}} ->
- keysend(Bucket, Keys, Partition, FittingDetails),
- keysend_loop(ReqId, Partition, FittingDetails);
+ Result = keysend(Bucket, Keys, Partition, FittingDetails),
+ case Result of
+ ok ->
+ keysend_loop(ReqId, Partition, FittingDetails);
+ Error ->
+ Error
+ end;
{ReqId, done} ->
ok
end.
-keysend(Bucket, Keys, Partition, FittingDetails) ->
- [ riak_pipe_vnode_worker:send_output(
- {Bucket, Key}, Partition, FittingDetails)
- || Key <- Keys ],
- ok.
+keysend(_Bucket, [], _Partition, _FittingDetails) ->
+ ok;
+keysend(Bucket, [Key | Keys], Partition, FittingDetails) ->
+ case riak_pipe_vnode_worker:send_output(
+ {Bucket, Key}, Partition, FittingDetails) of
+ ok ->
+ keysend(Bucket, Keys, Partition, FittingDetails);
+ ER ->
+ ER
+ end.
%% @doc Unused.
-spec done(state()) -> ok.
@@ -128,7 +143,9 @@ queue_existing_pipe(Pipe, Bucket, Timeout) ->
{ok, LKP} = riak_pipe:exec([#fitting_spec{name=listkeys,
module=?MODULE,
nval=1}],
- [{sink, Head}]),
+ [{sink, Head},
+ {trace, [error]},
+ {log, {sink, Pipe#pipe.sink}}]),
%% setup the cover operation
ReqId = erlang:phash2(erlang:now()), %% stolen from riak_client

0 comments on commit f987b73

Please sign in to comment.