riak_kv_pipe_get's failover has trouble with fast-input+slow-output rates #290

Closed
beerriot opened this issue Feb 23, 2012 · 11 comments

@beerriot

commented Feb 23, 2012

A problem was uncovered with riak_kv_pipe_get's failover strategy.

Description

When the KV vnode that the pipe worker talks to returns an error, "not
found" for example, the worker attempts to have another worker for
this fitting try on a different vnode by returning the
forward_preflist result. The last worker in the fallback list is
then supposed to forward the error to the next phase if it also fails.

This strategy does not work when the kvget workers are unable to keep
up with the rate of incoming inputs. The forward_preflist strategy
has to use a non-blocking enqueue to prevent workers in a fitting from
mutually blocking on one another. If the workers' queues are already
full of other inputs, forwarding retry inputs to them will never
succeed. When forwarding retry inputs fails, the worker does not get
a chance to send the error to the next phase; Pipe's internal code
sends an error trace message and drops the input instead.
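The failure mode described above can be modeled in a few lines. This is only a simplified sketch, not riak_pipe's actual code: the module name, the list-based queues, and the `forwarded`/`dropped` return shapes are all assumptions made for illustration. The `timeout` atom mirrors the `{error,[timeout]}` value seen in the trace further down.

```erlang
-module(noblock_forward_sketch).
-export([enqueue_noblock/3, forward/3]).

%% Hypothetical model: a worker queue is a plain list bounded by QLimit.
%% A non-blocking enqueue never waits; it fails at once if the queue is full.
enqueue_noblock(Input, Queue, QLimit) when length(Queue) < QLimit ->
    {ok, Queue ++ [Input]};
enqueue_noblock(_Input, _Queue, _QLimit) ->
    {error, timeout}.

%% Try each remaining fallback queue in order, collecting one error per
%% full queue. Fallbacks is a list of {Name, Queue} pairs.
forward(Input, Fallbacks, QLimit) ->
    forward(Input, Fallbacks, QLimit, []).

forward(Input, [], _QLimit, Errors) ->
    %% Nobody could accept the input. In this model the input is dropped,
    %% standing in for the "error trace + dropped input" behavior.
    {dropped, Input, lists:reverse(Errors)};
forward(Input, [{Name, Queue} | Rest], QLimit, Errors) ->
    case enqueue_noblock(Input, Queue, QLimit) of
        {ok, NewQueue} -> {forwarded, Name, NewQueue};
        {error, Reason} -> forward(Input, Rest, QLimit, [Reason | Errors])
    end.
```

With a `q_limit` of 1, forwarding succeeds while the fallback's queue is empty and fails once it holds a single pending input, which is the situation the fifth input hits in the reproduction below.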

This bug manifests itself as MapReduce queries failing with a
timeout-like error before their configured timeout, since both the
HTTP and PB mapreduce endpoints treat error trace messages as reasons
to terminate the query. The issue was raised on the mailing list:
http://lists.basho.com/pipermail/riak-users_lists.basho.com/2012-February/007590.html

Possible fixes

One quick and incomplete fix may be to simply set the q_limit much
higher on the riak_kv_pipe_get fittings in the riak_kv_mrc_pipe
module. This would raise the level/rate of inputs necessary to
trigger the bug. Raising the following riak_kv_mrc_map fitting's
q_limit may help as well.
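As a rough illustration of this quick fix, a fitting spec with a larger queue might look like the fragment below. The field names are the ones used in the reproduction in this issue; the value 1024 is an arbitrary choice for the example, not a recommendation, and the fragment assumes the #fitting_spec{} record definition from riak_pipe.hrl is already loaded.

```erlang
%% Sketch only: same shape as the kvget spec in the reproduction,
%% with q_limit raised. 1024 is an illustrative value.
KVGet = #fitting_spec{name = kvget,
                      module = riak_kv_pipe_get,
                      chashfun = chash:key_of(now()),
                      nval = 2,
                      q_limit = 1024},
```

This only moves the threshold: a fast enough input rate against a slow enough downstream phase would still fill the larger queue.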

One long-term fix is likely to change riak_kv_pipe_get such that it
does not use the forward_preflist return value. Either it will need
to talk directly to the fallback KV vnodes without forwarding the
input, or it will need to be able to call
riak_pipe_vnode_worker:recurse_input/5 directly (which will also
require passing the used-preflist to the worker) to be able to handle
full-queue errors.
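The first option, talking to the fallback vnodes directly instead of forwarding the input, could be sketched roughly as follows. This is a hypothetical outline and not the riak_kv implementation: the module name is invented, and `GetFun` is a stand-in for whatever call actually reads from a KV vnode.

```erlang
-module(direct_fallback_sketch).
-export([get_from_preflist/3]).

%% GetFun is a hypothetical stand-in for a KV vnode read:
%%   fun(Vnode, Key) -> {ok, Object} | {error, Reason}
%% The worker walks the preflist itself, so no other worker's queue
%% is involved and the final error is always available to send on.
get_from_preflist(Key, Preflist, GetFun) ->
    get_from_preflist(Key, Preflist, GetFun, {error, empty_preflist}).

get_from_preflist(_Key, [], _GetFun, LastError) ->
    %% Every vnode failed: return the last error so the caller can
    %% pass it to the next phase.
    LastError;
get_from_preflist(Key, [Vnode | Rest], GetFun, _LastError) ->
    case GetFun(Vnode, Key) of
        {ok, _Object} = Found -> Found;
        {error, _Reason} = Error -> get_from_preflist(Key, Rest, GetFun, Error)
    end.
```

Because the retry loop stays inside one worker, a full queue on a sibling worker can no longer cause the input to be dropped.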

Another long-term fix might be to alter the behavior of
forward_preflist such that instead of producing a trace error, it
forwards some standard message to the downstream fitting.
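One way to picture this alternative is a function that turns a forwarding failure into an ordinary output for the next fitting. Everything here is an assumption made for illustration: the module and function names are invented, and the `{forward_preflist, Errors}` reason is a made-up convention. The three-element output shape simply copies the `{{error,notfound}, Input, undefined}` outputs that kvget produces in the trace below.

```erlang
-module(forward_error_sketch).
-export([on_forward_failure/2]).

%% Hypothetical: instead of emitting an error trace and dropping the
%% input, build an error output that the downstream fitting can handle
%% like any other kvget error.
on_forward_failure(Input, Errors) ->
    {output, {{error, {forward_preflist, Errors}}, Input, undefined}}.
```

The downstream fitting would then need to recognize this error form, which is why a standard, documented message shape would matter.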

Reproduction

Below is a basic method for reproducing the problem. The method is to
follow a riak_kv_pipe_get fitting with a fitting that takes forever
to process its first input. This in turn causes the
riak_kv_pipe_get fitting's queues to back up. The nval, q_limit
and chashfun parameters have been tuned to provide a minimal case.
The sleeps are not needed, but they make the trace easier to read.
Each call to riak_pipe:queue_work/3 is preceded by a description of
how the worker (W), input queue (Q), and blocking queue (B) look
after each input settles in its final position.

KVGet = #fitting_spec{name = kvget,
                      module = riak_kv_pipe_get,
                      chashfun = chash:key_of(now()),
                      nval = 2,
                      q_limit = 1},
SlowMap = #fitting_spec{name = slowmap,
                        module = riak_pipe_w_xform,
                        chashfun = chash:key_of(now()),
                        %% blocks forever: no 'never' message is ever sent
                        arg = fun(_Input, _Partition, _FittingDetails) ->
                                 receive never -> ok end
                              end,
                        q_limit = 1},
{ok, Pipe} = riak_pipe:exec([KVGet, SlowMap],
                            [{log, sink}, {trace, all}]),

%% 1: makes it all the way to the slowmap worker
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W={notfound,1}, Q=[], B=[]
riak_pipe:queue_work(Pipe, {<<"notthere">>, <<"1">>}, noblock),
timer:sleep(1000),

%% 2: makes it into the slowmap worker's queue
%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[]
riak_pipe:queue_work(Pipe, {<<"notthere">>, <<"2">>}, noblock),
timer:sleep(1000),

%% 3: makes it to the fallback kvget worker,
%%    blocking that worker on the slowmap worker's queue
%% K1: W=idle, Q=[], B=[]
%% K2: W={notthere 3}, Q=[], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
riak_pipe:queue_work(Pipe, {<<"notthere">>, <<"3">>}, noblock),
timer:sleep(1000),

%% 4: makes it to the fallback worker's queue
%% K1: W=idle, Q=[], B=[]
%% K2: W={notthere 3}, Q=[{notthere, 4}], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
riak_pipe:queue_work(Pipe, {<<"notthere">>, <<"4">>}, noblock),
timer:sleep(1000),

%% 5: fails because preflist forwarding would block on the
%%    fallback worker's queue
%% K1: W={notthere 5}, Q=[], B=[]
%% K2: W={notthere 3}, Q=[{notthere, 4}], B=[]
%% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
riak_pipe:queue_work(Pipe, {<<"notthere">>, <<"5">>}, noblock),
timer:sleep(1000),

{timeout, [], Trace} = riak_pipe:collect_results(Pipe),
rp(Trace).

Below is the value captured in the Trace variable, with additional
comments about what is happening at each step.

%% K1: W=idle, Q=[], B=[]
%% K2: W=idle, Q=[], B=[]
%% M1: W=idle, Q=[], B=[]

[{slowmap,{trace,all,{fitting,init_started}}},
 {slowmap,{trace,all,{fitting,init_finished}}},
 {kvget,{trace,all,{fitting,init_started}}},
 {kvget,{trace,all,{fitting,init_finished}}},

 %% 1: send first input to primary kvget vnode
 %% K1: W=idle, Q=[{notthere,1}], B=[]
 %% K2: W=idle, Q=[], B=[]
 %% M1: W=idle, Q=[], B=[]
 {kvget,
     {trace,all,
         {fitting,
             {get_details,#Ref<0.0.0.34313>,
                 1438665674247607560106752257205091097473808596992,
                 <11776.302.0>}}}},
 {kvget,
     {trace,all,
         {vnode,
             {start,1438665674247607560106752257205091097473808596992}}}},
 {kvget,
     {trace,all,
         {vnode,
             {queued,1438665674247607560106752257205091097473808596992,
                 {<<"notthere">>,<<"1">>}}}}},
 %% 1: primary kvget worker processes first input
 %% K1: W={notthere,1}, Q=[], B=[]
 %% K2: W=idle, Q=[], B=[]
 %% M1: W=idle, Q=[], B=[]
 {kvget,
     {trace,all,
         {vnode,
             {dequeue,
                 1438665674247607560106752257205091097473808596992}}}},
 %% 1: primary kvget worker forwards first input to fallback vnode
 %% K1: W=idle, Q=[], B=[]
 %% K2: W=idle, Q=[{notthere,1}], B=[]
 %% M1: W=idle, Q=[], B=[]
 {kvget,
     {trace,all,
         {fitting,{get_details,#Ref<0.0.0.34313>,0,<0.282.0>}}}},
 {kvget,{trace,all,{vnode,{start,0}}}},
 {kvget,
     {trace,all,{vnode,{queued,0,{<<"notthere">>,<<"1">>}}}}},
 %% 1: fallback kvget vnode processes first input
 %% K1: W=idle, Q=[], B=[]
 %% K2: W={notthere,1}, Q=[], B=[]
 %% M1: W=idle, Q=[], B=[]
 {kvget,{trace,all,{vnode,{dequeue,0}}}},
 {kvget,
     {trace,all,
         {vnode,
             {waiting,
                 1438665674247607560106752257205091097473808596992}}}},
 %% 1: fallback kvget worker sends notfound output to slowmap vnode
 %% K1: W=idle, Q=[], B=[]
 %% K2: W=idle, Q=[], B=[]
 %% M1: W=idle, Q=[{notfound,1}], B=[]
 {slowmap,
     {trace,all,
         {fitting,
             {get_details,#Ref<0.0.0.34313>,
                 593735040165679310520246963290989976735222595584,
                 <11775.290.0>}}}},
 {slowmap,
     {trace,all,
         {vnode,
             {start,593735040165679310520246963290989976735222595584}}}},
 {slowmap,
     {trace,all,
         {vnode,
             {queued,593735040165679310520246963290989976735222595584,
                 {{error,notfound},{<<"notthere">>,<<"1">>},undefined}}}}},
 {kvget,{trace,all,{vnode,{waiting,0}}}},
 %% 1: slowmap vnode picks up first input
 %% K1: W=idle, Q=[], B=[]
 %% K2: W=idle, Q=[], B=[]
 %% M1: W={notfound,1}, Q=[], B=[]
 {slowmap,
     {trace,all,
         {vnode,
             {dequeue,
                 593735040165679310520246963290989976735222595584}}}},
 %% 2: send second input to primary kvget vnode
 %% K1: W=idle, Q=[{notthere,2}], B=[]
 %% K2: W=idle, Q=[], B=[]
 %% M1: W={notfound,1}, Q=[], B=[]
 {kvget,
     {trace,all,
         {vnode,
             {queued,1438665674247607560106752257205091097473808596992,
                 {<<"notthere">>,<<"2">>}}}}},
 %% 2: primary kvget worker forwards second input to fallback vnode
 %% K1: W=idle, Q=[], B=[]
 %% K2: W=idle, Q=[{notthere,2}], B=[]
 %% M1: W={notfound,1}, Q=[], B=[]
 {kvget,
     {trace,all,{vnode,{queued,0,{<<"notthere">>,<<"2">>}}}}},
 {kvget,
     {trace,all,
         {vnode,
             {waiting,
                 1438665674247607560106752257205091097473808596992}}}},
 %% 2: fallback worker sends notfound output to slowmap vnode
 %% K1: W=idle, Q=[], B=[]
 %% K2: W=idle, Q=[], B=[]
 %% M1: W={notfound,1}, Q=[{notfound,2}], B=[]
 {slowmap,
     {trace,all,
         {vnode,
             {queued,593735040165679310520246963290989976735222595584,
                 {{error,notfound},{<<"notthere">>,<<"2">>},undefined}}}}},
 {kvget,{trace,all,{vnode,{waiting,0}}}},
 %% 3: send third input to primary kvget vnode
 %% K1: W=idle, Q=[{notthere 3}], B=[]
 %% K2: W=idle, Q=[], B=[]
 %% M1: W={notfound,1}, Q=[{notfound,2}], B=[]
 {kvget,
     {trace,all,
         {vnode,
             {queued,1438665674247607560106752257205091097473808596992,
                 {<<"notthere">>,<<"3">>}}}}},
 %% 3: primary kvget worker forwards third input to fallback vnode
 %% K1: W=idle, Q=[], B=[]
 %% K2: W=idle, Q=[{notthere 3}], B=[]
 %% M1: W={notfound,1}, Q=[{notfound,2}], B=[]
 {kvget,
     {trace,all,{vnode,{queued,0,{<<"notthere">>,<<"3">>}}}}},
 {kvget,
     {trace,all,
         {vnode,
             {waiting,
                 1438665674247607560106752257205091097473808596992}}}},
 %% 3: fallback worker blocks on sending third notfound to slowmap vnode
 %% K1: W=idle, Q=[], B=[]
 %% K2: W={notthere 3}, Q=[], B=[]
 %% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
 {slowmap,
     {trace,all,
         {vnode,
             {queue_full,
                 593735040165679310520246963290989976735222595584,
                 {{error,notfound},{<<"notthere">>,<<"3">>},undefined}}}}},
 %% 4: send fourth input to primary kvget vnode
 %% K1: W=idle, Q=[{notthere, 4}], B=[]
 %% K2: W={notthere 3}, Q=[], B=[]
 %% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
 {kvget,
     {trace,all,
         {vnode,
             {queued,1438665674247607560106752257205091097473808596992,
                 {<<"notthere">>,<<"4">>}}}}},
 %% 4: primary kvget worker forwards fourth input to fallback vnode
 %% K1: W=idle, Q=[], B=[]
 %% K2: W={notthere 3}, Q=[{notthere, 4}], B=[]
 %% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
 {kvget,
     {trace,all,{vnode,{queued,0,{<<"notthere">>,<<"4">>}}}}},
 {kvget,
     {trace,all,
         {vnode,
             {waiting,
                 1438665674247607560106752257205091097473808596992}}}},
 %% 5: send fifth input to primary kvget vnode
 %% K1: W=idle, Q=[{notthere 5}], B=[]
 %% K2: W={notthere 3}, Q=[{notthere, 4}], B=[]
 %% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
 {kvget,
     {trace,all,
         {vnode,
             {queued,1438665674247607560106752257205091097473808596992,
                 {<<"notthere">>,<<"5">>}}}}},
 %% 5: primary kvget worker fails to forward input to fallback
 %%    because fallback's queue is full
 %% K1: W={notthere 5}, Q=[], B=[]
 %% K2: W={notthere 3}, Q=[{notthere, 4}], B=[]
 %% M1: W={notfound,1}, Q=[{notfound,2}], B=[{notfound, 3}]
 {kvget,
     {trace,all,
         {error,
             [{module,riak_kv_pipe_get},
              {partition,
                  1438665674247607560106752257205091097473808596992},
              {details,
                  [{fitting,
                       #fitting{
                           pid = <0.1434.0>,ref = #Ref<0.0.0.34313>,
                           chashfun = 
                               <<249,252,247,9,220,75,44,145,58,167,246,118,61,
                                 183,57,33,176,79,180,32>>,
                           nval = 2}},
                   {name,kvget},
                   {module,riak_kv_pipe_get},
                   {arg,undefined},
                   {output,
                       #fitting{
                           pid = <0.1433.0>,ref = #Ref<0.0.0.34313>,
                           chashfun = 
                               <<102,4,176,137,137,246,153,214,136,172,94,97,
                                 152,118,215,52,233,8,52,4>>,
                           nval = 1}},
                   {options,
                       [{sink,
                            #fitting{
                                pid = <0.1223.0>,ref = #Ref<0.0.0.34313>,
                                chashfun = sink,nval = undefined}},
                        {log,sink},
                        {trace,all}]},
                   {q_limit,1}]},
              {type,forward_preflist},
              {error,[timeout]},
              {input,{<<"notthere">>,<<"5">>}},
              {modstate,
                  {state,1438665674247607560106752257205091097473808596992,
                      #fitting_details{
                          fitting = 
                              #fitting{
                                  pid = <0.1434.0>,ref = #Ref<0.0.0.34313>,
                                  chashfun = 
                                      <<249,252,247,9,220,75,44,145,58,167,246,
                                        118,61,183,57,33,176,79,180,32>>,
                                  nval = 2},
                          name = kvget,module = riak_kv_pipe_get,
                          arg = undefined,
                          output = 
                              #fitting{
                                  pid = <0.1433.0>,ref = #Ref<0.0.0.34313>,
                                  chashfun = 
                                      <<102,4,176,137,137,246,153,214,136,172,
                                        94,97,152,118,215,52,233,8,52,4>>,
                                  nval = 1},
                          options = 
                              [{sink,
                                   #fitting{
                                       pid = <0.1223.0>,
                                       ref = #Ref<0.0.0.34313>,
                                       chashfun = sink,nval = undefined}},
                               {log,sink},
                               {trace,all}],
                          q_limit = 1}}},
              {stack,[]}]}}},
 {kvget,
     {trace,all,
         {vnode,
             {waiting,
                 1438665674247607560106752257205091097473808596992}}}}]
@outoftime

commented Feb 23, 2012

Thanks for looking into this!

@tovbinm

commented Aug 31, 2012

Recently I started to experience the same problem in Riak 1.1.2:
{"phase":0,"error":"[timeout,timeout]","input":"{<<"test">>,<<"123">>}","type":"forward_preflist","stack":"[]"}

What value should I assign to 'q_limit'?

Should I modify it in ALL of the following files?
riak/deps/riak_pipe/include/riak_pipe.hrl
riak/deps/riak_kv/src/riak_kv_w_reduce.erl

Is there any better solution available?

@beerriot

Author

commented Aug 31, 2012

@tovbinm The default value for q_limit is 64. The default maximum is 4096. You should be able to set it anywhere between those two without too much danger.

Modifying the definition of #fitting_spec{} in riak_pipe.hrl will be the quickest way to change the default. If you want to set it to different things for different types of phases, the best place to modify is riak_kv_mrc_pipe.erl.

I do not know of a better solution at this time.

@tovbinm

commented Aug 31, 2012

OK, I'll try that.

BTW, my riak/deps/riak_kv/src/riak_kv_mrc_pipe.erl does not contain a q_limit setting.
My riak_kv version is as follows: {riak_kv, "1.1.4", {git, "git://github.com/basho/riak_kv", {tag, "1.1.4"}}}

@beerriot

Author

commented Aug 31, 2012

It's a field of #fitting_spec{}. I was trying to say that each place that record appears in riak_kv_mrc_pipe.erl is an opportunity to alter q_limit.

@Narayanan-Nachiappan

commented Oct 2, 2012

By any chance, is this fixed in Riak 1.2?

@beerriot

Author

commented Oct 3, 2012

No, this issue still exists in Riak 1.2.

@Narayanan-Nachiappan

commented Oct 3, 2012

Thanks. I am very new to Riak and curious to know if there is any chance to fix this without recompiling the source. Just to clarify, does this forward_preflist error have anything to do with the way my MapReduce query is built and structured?

@beerriot

Author

commented Oct 4, 2012

Sadly, no. The riak_kv_mrc_pipe module uses the queue limit that was set in riak_pipe.hrl, which is 64. The riak_pipe_vnode module chooses the lesser of this value and the riak_pipe environment variable queue_limit. So, it's not possible to raise the limit without recompiling.
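The "lesser of the two" rule can be illustrated with a small sketch. This is not the riak_pipe_vnode code: the module and function names are invented, and falling back to 4096 when the environment variable is unset is an assumption based on the default maximum mentioned earlier in this thread.

```erlang
-module(qlimit_sketch).
-export([effective_q_limit/2, env_limit/0]).

%% SpecLimit: the q_limit from the #fitting_spec{}.
%% EnvLimit:  the riak_pipe queue_limit application variable.
%% The smaller of the two wins.
effective_q_limit(SpecLimit, EnvLimit) ->
    erlang:min(SpecLimit, EnvLimit).

%% Read the riak_pipe queue_limit setting. The 4096 fallback is an
%% assumption taken from the "default maximum" quoted in this thread.
env_limit() ->
    case application:get_env(riak_pipe, queue_limit) of
        {ok, Limit} when is_integer(Limit) -> Limit;
        _ -> 4096
    end.
```

With a compiled-in spec value of 64, raising only the environment variable leaves the effective limit at 64, which is why the header has to change too.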

Riak KV should expose setting the default queue limit on a per-phase-type basis in the app config, as well as setting it on a per-phase basis through the query definition in its MapReduce interface. Perhaps someone's eager to hack this for the next release?

@Narayanan-Nachiappan

commented Oct 5, 2012

Thanks beerriot :)

@beerriot

Author

commented Nov 1, 2012

This should now be fixed as of the merge of PR #413, and will be released in the next major version of Riak.

@beerriot beerriot closed this Nov 1, 2012
