Apply backpressure from the MR sink #429

Closed
wants to merge 9 commits into
from

Contributor

beerriot commented Nov 6, 2012

This is a re-opening of #425. That PR closed when I deleted the branch for #416, which #425 targeted. This now targets master.

This PR uses the new ability of basho/riak_pipe#59 to provide backpressure to the MR pipe in the new MR sink introduced by #416. This PR is targeted at the bwf-sep-sink-2 branch to more clearly demonstrate the changes made here until #416 is merged to master.

In addition to using the fsm_sync sink type to synchronously receive pipe results/logs, the sink now has a capped-size buffer. Once that buffer is full, the sink delays acks to result/log senders until the sink's owner empties its buffer. This is to keep the problem of the expanding mailbox from becoming the problem of the expanding heap. The size of this buffer is configurable globally as the riak_kv app env mrc_sink_buffer or as a startup option for the FSM, buffer.
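As a rough illustration of the global setting described above, an app.config fragment might look like the following. This is only a sketch: the key name mrc_sink_buffer comes from the description, but the value 1000 is a made-up placeholder, since neither the default nor the unit of the cap is stated here.

```erlang
%% app.config fragment (sketch, not taken from the PR)
[
 {riak_kv, [
    %% Cap on the MR sink's buffer. 1000 is an illustrative
    %% placeholder, not the project's default; the unit is
    %% assumed, not documented in this thread.
    {mrc_sink_buffer, 1000}
 ]}
].
```

The per-FSM startup option named buffer would override this for a single sink, according to the description; its exact call form is not shown in this thread, so it is left out of the sketch.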

The changes to src/riak_kv_mrc_pipe.erl look more dramatic than they really are because enforcing the use of riak_kv_mrc_sink in all MR pipelines has cleaned up the logic of output collection and sorting.

This branch has not yet been tested for performance changes. It is likely that latency will take a hit, due to the extra synchronicity. The safety of not blowing up memory consumption in a sink process is likely desirable enough to counter worries. There is also work underway (in the bwf-pmi branch) to enable delivery of multiple outputs in a single message.

There are no "capabilities" or other "upgrade" facilities included here. Nodes running earlier versions of Riak will simply ignore the sink_type option, and send regular messages, as they do now, to the MR sink. This is fine - the sink will accept them, even though it cannot provide backpressure to those senders. Nodes running the new version will similarly send regular messages if they are working for a Pipe that was started by an older node (and therefore had not set sink_type).

Bryan Fink added some commits Nov 2, 2012

teach the mrc pipe sink to speak fsm_sync sink type
This will keep the sink process's mailbox from becoming overloaded from
a fast final fitting.
limit buffer size in riak_kv_mrc_sink
Since we have the ability to apply backpressure, we can also keep this
process's heap from growing unboundedly. This commit adds a configurable
buffer size cap that will be used to delay acks to result senders.
pipe log+eoi messages are also sync when sink type is fsm_sync now
This should keep error messages from overwhelming the sink's mailbox.

Sink startup has been moved inside riak_kv_mrc_pipe:mapred_stream, to
keep the whole mrc system more consistent. The riak_kv_pipe_index and
riak_kv_pipe_listkeys modules start up new pipes, but set up their
logging to message their target pipe's sink. So, they needed to be sure
that it was okay to set the sink type to fsm_sync.

This means that regular mapred/2 (non-streaming) also now uses
riak_kv_mrc_sink, which should help with its speed on large lists of
bkey input. The acks for those inputs will no longer compete with pipe
results for mailbox priority.

@ghost ghost assigned beerriot Nov 6, 2012

accumulate logs same way for sync and non-sync
Non-synchronous log messages were accumulated in a flat list. Last
commit added accumulation for sync logs, but grouped by phase id. The
grouping broke the error-message detection in the HTTP and PB endpoint
code. This commit switches synchronous logs to be accumulated in a flat
list like non-synchronous logs.

@ghost ghost assigned joedevivo Nov 6, 2012

Bryan Fink added some commits Nov 6, 2012

doc fixing: mapred_stream return val & --group_outputs/*
riak_kv_mrc_pipe:group_outputs no longer exists

riak_kv_mrc_pipe:collect_outputs is now arity-5

riak_kv_mrc_pipe:mapred_stream now returns a 4-tuple on success, not a 2-tuple
remove duplicate cleanup code while reverting _mrc_pipe:mapred_stream/1
As Joe pointed out, the code for cleaning up monitors/messages from the
async sender, sync, and timeout timer was duplicated everywhere. In
addition, recent patches had altered the contract of
riak_kv_mrc_pipe:mapred_stream.

This patch solves both problems by moving the new sink stuff to
riak_kv_mrc_pipe:mapred_stream_sink. Added to it is the unified startup
of the async input sender. The duplicate cleanup code has been moved to
riak_kv_mrc_pipe:cleanup_sink. Shared receive/aggregate code for
riak_kv_mrc_pipe:mapred and riak_kv_wm_mapred has been unified in the
riak_kv_mrc_pipe:receive_sink and :collect_sink functions.

@joedevivo joedevivo commented on the diff Nov 7, 2012

src/riak_kv_pb_mapred.erl
@@ -52,9 +52,8 @@
req_ctx,
client}).
--record(pipe_ctx, {pipe, % pipe handling mapred request
- ref, % easier-access ref/reqid
- timer, % ref() for timeout send_after
+-record(pipe_ctx, {ref, % easier-access ref/reqid
@joedevivo

joedevivo Nov 7, 2012

Contributor

ref, sink, and sender are all included in mrc_ctx{}.

@beerriot

beerriot Nov 7, 2012

Contributor

Indeed. I duplicated them to make the matches in process_stream/3 less nesty. There's a comment in pipe_mapreduce/5 about it.

@joedevivo joedevivo commented on the diff Nov 7, 2012

src/riak_kv_wm_mapred.erl
{{halt, 400},
send_error({error, [{phase, Fitting},
{error, iolist_to_binary(Reason)}]}, RD),
State}
end.
-%% Destroying the pipe via riak_pipe_builder:destroy/1 does not kill
-%% the sender immediately, because it causes the builder to exit with
-%% reason `normal', so no exit signal is sent. The sender will
-%% eventually receive `worker_startup_error's from vnodes that can no
-%% longer find the fittings, but to help the process along, we kill
-%% them immediately here.
-cleanup_sender({SenderPid, SenderRef}) ->
- erlang:demonitor(SenderRef, [flush]),
@beerriot

beerriot Nov 7, 2012

Contributor

feelsgoodman

Contributor

joedevivo commented Nov 7, 2012

+1!

give jsonify_bkeys the whole list, not one element at a time
Thank you, ruby client tests, for finding the incorrect behavior when
given an empty query.
Contributor

beerriot commented Nov 9, 2012

Simple performance check 1: a single MR query, input is listkeys of 1-million item bucket (8-byte object values), query is [{map, {modfun, riak_kv_mapreduce, map_identity}, none, true}]. Looks like up to 10-20% overhead (i.e. this sink-backpressure branch takes 10-20% longer to finish than master does). Master prints lots of the following messages, while this sink-backpressure branch prints none:

2012-11-09 09:50:50.087 [info] <0.63.0>@riak_core_sysmon_handler:handle_event:95 monitor large_heap <0.5848.48> [{initial_call,{riak_api_pb_server,init,1}},{almost_current_function,{prim_inet,send,3}},{message_queue_len,2}] [{old_heap_block_size,0},{heap_block_size,47828850},{mbuf_size,0},{stack_size,37},{old_heap_size,0},{heap_size,23898616}]
2012-11-09 09:50:50.088 [info] <0.63.0>@riak_core_sysmon_handler:handle_event:95 monitor long_gc <0.5848.48> [{initial_call,{riak_api_pb_server,init,1}},{almost_current_function,{prim_inet,send,3}},{message_queue_len,2}] [{timeout,146},{old_heap_block_size,0},{heap_block_size,47828850},{mbuf_size,0},{stack_size,37},{old_heap_size,0},{heap_size,15618308}]

Next up is a concurrent basho_bench run to see what effect the improved memory constraint has under concurrency (and to double-check the overhead estimate).

Bryan Fink added some commits Nov 11, 2012

require pipe-mr sink acks only every tenth message
This will reduce overhead and lower latency, while still providing the
possibility for backpressure. A sync-period of 10 means that worst-case
10*(number of active vnodes) messages beyond the sink's cap will be
allowed.

@beerriot beerriot referenced this pull request in basho/riak_pipe Nov 11, 2012

Closed

"Sink type" that can provide backpressure #59

Contributor

beerriot commented Nov 11, 2012

Further testing, in real distribution, with basho_bench and some concurrency showed that acking every message was, indeed, too much overhead. I made modifications to basho/riak_pipe#59 to allow specification of how often messages should be acked. Using that facility to send 10 un-acked messages before sending an acked message improved performance, such that the overhead is now only 1-2%.

Furthermore, the frequency of acked messages is tweakable by setting the riak_kv application environment variable mrc_sink_sync_period. Its default is 10, for the ten un-acked messages described above. Setting it higher will cause more un-acked messages to be used between acked messages. Setting it to the atom infinity will cause all messages to be sent to the sink un-acked, which may be useful in the case that all MR queries produce very few results, and the overhead of even a few acks is too expensive.
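For reference, a minimal app.config sketch of the setting described above. The key name and the values 10 and infinity come from this comment; the layout of the surrounding file is assumed.

```erlang
%% app.config fragment (sketch)
[
 {riak_kv, [
    %% Default described above: ten un-acked messages
    %% between acked messages.
    {mrc_sink_sync_period, 10}

    %% Alternative: never request acks (no backpressure).
    %% {mrc_sink_sync_period, infinity}
 ]}
].
```

Going by the commit message for this change, a period of 10 allows at worst 10*(number of active vnodes) messages beyond the sink's cap, so raising the period trades a looser memory bound for fewer acks.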

Contributor

joedevivo commented Nov 12, 2012

+1

@ghost ghost assigned beerriot Nov 12, 2012

beerriot pushed a commit that referenced this pull request Nov 12, 2012

Merge pull request #429 from bwf-sink-backpressure
MapReduce now configures its sink to speak FSM and apply backpressure.

Squashed commit of the following:

commit ff52338
Author: Bryan Fink <bryan@basho.com>
Date:   Sun Nov 11 12:07:31 2012 -0500

    follow rename of fsm_sync to fsm in riak_pipe

commit fe07f46
Author: Bryan Fink <bryan@basho.com>
Date:   Sat Nov 10 23:00:45 2012 -0500

    require pipe-mr sink acks only every tenth message

    This will reduce overhead and lower latency, while still providing the
    possibility for backpressure. A sync-period of 10 means that worst-case
    10*(number of active vnodes) messages beyond the sink's cap will be
    allowed.

commit 686f223
Author: Bryan Fink <bryan@basho.com>
Date:   Thu Nov 8 14:13:20 2012 -0500

    give jsonify_bkeys the whole list, not one element at a time

    Thank you, ruby client tests, for finding the incorrect behavior when
    given an empty query.

commit 273f873
Author: Bryan Fink <bryan@basho.com>
Date:   Wed Nov 7 12:35:43 2012 -0500

    remove duplicate cleanup code while reverting _mrc_pipe:mapred_stream/1

    As Joe pointed out, the code for cleaning up monitors/messages from the
    async sender, sync, and timeout timer was duplicated everywhere. In
    addition, recent patches had altered the contract of
    riak_kv_mrc_pipe:mapred_stream.

    This patch solves both problems by moving the new sink stuff to
    riak_kv_mrc_pipe:mapred_stream_sink. Added to it is the unified startup
    of the async input sender. The duplicate cleanup code has been moved to
    riak_kv_mrc_pipe:cleanup_sink. Shared receive/aggregate code for
    riak_kv_mrc_pipe:mapred and riak_kv_wm_mapred has been unified in the
    riak_kv_mrc_pipe:receive_sink and :collect_sink functions.

commit 1d36aef
Author: Bryan Fink <bryan@basho.com>
Date:   Tue Nov 6 14:57:53 2012 -0500

    doc fixing: mapred_stream return val & --group_outputs/*

    riak_kv_mrc_pipe:group_outputs no longer exists

    riak_kv_mrc_pipe:collect_outputs is now arity-5

    riak_kv_mrc_pipe:mapred_stream now returns a 4-tuple on success, not a 2-tuple

commit b567691
Author: Bryan Fink <bryan@basho.com>
Date:   Tue Nov 6 13:49:57 2012 -0500

    accumulate logs same way for sync and non-sync

    Non-synchronous log messages were accumulated in a flat list. Last
    commit added accumulation for sync logs, but grouped by phase id. The
    grouping broke the error-message detection in the HTTP and PB endpoint
    code. This commit switches synchronous logs to be accumulated in a flat
    list like non-synchronous logs.

commit 571a9f5
Author: Bryan Fink <bryan@basho.com>
Date:   Sat Nov 3 10:11:08 2012 -0400

    pipe log+eoi messages are also sync when sink type is fsm_sync now

    This should keep error messages from overwhelming the sink's mailbox.

    Sink startup has been moved inside riak_kv_mrc_pipe:mapred_stream, to
    keep the whole mrc system more consistent. The riak_kv_pipe_index and
    riak_kv_pipe_listkeys modules start up new pipes, but set up their
    logging to message their target pipe's sink. So, they needed to be sure
    that it was okay to set the sink type to fsm_sync.

    This means that regular mapred/2 (non-streaming) also now uses
    riak_kv_mrc_sink, which should help with its speed on large lists of
    bkey input. The acks for those inputs will no longer compete with pipe
    results for mailbox priority.

commit cf11baa
Author: Bryan Fink <bryan@basho.com>
Date:   Fri Nov 2 17:34:30 2012 -0400

    limit buffer size in riak_kv_mrc_sink

    Since we have the ability to apply backpressure, we can also keep this
    process's heap from growing unboundedly. This commit adds a configurable
    buffer size cap that will be used to delay acks to result senders.

commit fe23fbc
Author: Bryan Fink <bryan@basho.com>
Date:   Fri Nov 2 14:15:32 2012 -0400

    teach the mrc pipe sink to speak fsm_sync sink type

    This will keep the sink process's mailbox from becoming overloaded from
    a fast final fitting.
Contributor

beerriot commented Nov 12, 2012

Squashed to master as e639737.

@beerriot beerriot closed this Nov 12, 2012
