"Sink type" that can provide backpressure #59

beerriot commented Nov 3, 2012

At the head of a pipe, and between its fittings, passing inputs/outputs is a blocking operation, which allows it to provide backpressure to keep downstream processes from being overwhelmed. Before this commit, there was no such behavior between the tail of the pipe and its sink. For very large Riak KV MapReduce jobs, it was common for the sink to become overwhelmed either with results or with log messages during failures.

This PR introduces blocking sink-delivery. It does this by providing the ability to specify the "type" of the sink, which indicates the method by which messages are delivered.

At this time there are two types:

  • raw: The behavior that existed before this commit, and the default if none is specified. Messages are delivered by simply sending them to the sink process (Sink ! Message. in Erlang).
  • {fsm_sync, Timeout}: Messages are delivered as synchronous gen_fsm events (gen_fsm:sync_send_event(Sink, Message, Timeout)).

The type is set in the Options parameter of riak_pipe:exec/2 via the sink_type property: riak_pipe:exec(Spec, [{sink_type, {fsm_sync, infinity}}|OtherOptions]).

The spec of the riak_pipe_sink:* functions has changed to facilitate error detection. Instead of returning the message that was sent, each now returns the atom ok on success (always, for raw sink types) or {error, Reason} on failure. The current Reasons are timeout, if the request times out, and sink_died, if the sink was gone before the call or vanished before replying.
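
The delivery and error-mapping rules described above can be sketched roughly as follows. This is a minimal illustration, not the riak_pipe source: the module name `sink_send_sketch` and the function `send/3` are hypothetical, and treating every non-timeout exit from the call as `sink_died` is an assumption about how failures would be classified.

```erlang
-module(sink_send_sketch).
-compile(nowarn_deprecated_function). % gen_fsm is deprecated in newer OTP
-export([send/3]).

%% Hypothetical helper: deliver Msg to Sink according to the sink type.
-spec send(pid(), term(), raw | {fsm_sync, timeout()}) ->
          ok | {error, timeout | sink_died}.
send(Sink, Msg, raw) ->
    %% Plain message send: never blocks, so it always reports ok.
    Sink ! Msg,
    ok;
send(Sink, Msg, {fsm_sync, Timeout}) ->
    %% Synchronous gen_fsm event: the caller blocks until the sink
    %% replies, which is what provides the backpressure.
    try gen_fsm:sync_send_event(Sink, Msg, Timeout) of
        _Reply -> ok
    catch
        %% gen_fsm exits with {timeout, _} when no reply arrives in time.
        exit:{timeout, _} -> {error, timeout};
        %% Assumption: any other exit (e.g. noproc) means the sink is gone.
        exit:_ -> {error, sink_died}
    end.
```

Under these assumptions, a caller can match on the return value to decide whether to keep producing output or to shut down.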

New tests for this facility are in riak_pipe:sink_type_test_/0. A mock sink FSM for testing is included as test/riak_pipe_test_sink_fsm.erl.
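
For readers unfamiliar with how a sink would acknowledge synchronous events, here is a minimal sketch of an accumulating gen_fsm sink. It is not the riak_pipe_test_sink_fsm module from this PR: the module name `sink_fsm_sketch`, its `get_results/1` helper, and the choice to accept any term as a message are all simplifying assumptions. Note that newer OTP releases deprecate gen_fsm in favor of gen_statem.

```erlang
-module(sink_fsm_sketch).
-behaviour(gen_fsm).
-compile(nowarn_deprecated_function). % gen_fsm is deprecated in newer OTP

-export([start_link/0, get_results/1]).
-export([init/1, acc/2, acc/3, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

start_link() ->
    gen_fsm:start_link(?MODULE, [], []).

%% Hypothetical helper: fetch everything accumulated so far, oldest first.
get_results(Sink) ->
    gen_fsm:sync_send_all_state_event(Sink, get_results).

%% Start in the single state `acc' with an empty accumulator.
init([]) ->
    {ok, acc, []}.

%% Un-acked delivery (gen_fsm:send_event/2): just accumulate.
acc(Msg, Acc) ->
    {next_state, acc, [Msg | Acc]}.

%% Acked delivery (gen_fsm:sync_send_event/3): accumulate, then reply.
%% The sender stays blocked until this reply is returned.
acc(Msg, _From, Acc) ->
    {reply, ok, acc, [Msg | Acc]}.

handle_event(_Event, StateName, Acc) ->
    {next_state, StateName, Acc}.

handle_sync_event(get_results, _From, StateName, Acc) ->
    {reply, lists:reverse(Acc), StateName, Acc}.

handle_info(_Info, StateName, Acc) ->
    {next_state, StateName, Acc}.

terminate(_Reason, _StateName, _Acc) ->
    ok.

code_change(_OldVsn, StateName, Acc, _Extra) ->
    {ok, StateName, Acc}.
```

Because the reply to a synchronous event is only produced once the sink has processed the message, a slow sink naturally slows its producers.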

Notes:

  • Yes, synchronous delivery of each output is likely to introduce additional latency. This latency is likely more desirable than the alternative of unbounded memory consumption. There is also ongoing work (in the bwf-pmi branch) to deliver multiple outputs in a single message.
  • Yes, synchronous log delivery is a hindrance to logging lots of messages. Again, it's likely more desirable than blowing up a sink with messages from many concurrent log producers. Additionally, the synchronicity is only encountered if the trace is actually delivered, and not if it is filtered out. Finally, the only current use of logging is in Riak KV's MapReduce, where it is used to deliver errors that cause the pipe to tear down immediately. Once one log is delivered there, the rate at which others can be delivered is not relevant.
  • Synchronous delivery of the single eoi message seems unnecessary, but it makes the API more predictable.

Bryan Fink added some commits Nov 2, 2012

sink backpressure via gen_fsm sync events

This patch allows a Pipe user to specify the "type" of the sink process.
This type is used to decide how to send it #pipe_result{} messages.

If the sink type is 'raw' or unspecified, result messages are delivered
as regular erlang messages, as they have been before this commit.

If the sink type is '{fsm_sync, Timeout}', result messages are delivered
as calls to gen_fsm:sync_send_event/3. The biggest benefit of this
sink type is that it will provide backpressure to the pipe, to prevent
the sink's mailbox from being overwhelmed.

deliver logs and eoi as gen_fsm sync events as well

Logs have overwhelmed our sinks too, and doing eoi the same way just
makes the interface more consistent.

@beerriot beerriot referenced this pull request in basho/riak_kv Nov 3, 2012

Closed

Apply backpressure from the MR sink #425

@ghost ghost assigned beerriot Nov 3, 2012

@beerriot beerriot referenced this pull request in basho/riak_kv Nov 6, 2012

Closed

Apply backpressure from the MR sink #429

test/riak_pipe_test_sink_fsm.erl
+%%
+%% The FSM starts out in `acc' simply accumulating results and logs.
+%%
+%% If a `get_results' arrives during live, the FSM stores the `From',
joedevivo Nov 6, 2012

not sure what you mean by "live"

beerriot Nov 6, 2012

:P a missed doc update during a rewrite. should be acc

@ghost ghost assigned joedevivo Nov 6, 2012

joedevivo commented Nov 6, 2012

I like it, especially how it's all backwards compatible. The eunit tests pass, as do the Java client tests with a version of kv that doesn't yet support this feature.

Tentative +1, the +1 will be official when riak_kv#429 gets plus oned, as that will be the integration test of the {fsm_sync, Timeout} code path. Hang in there @beerriot, we're close.

joedevivo commented Nov 7, 2012

+1 official now

Bryan Fink added some commits Nov 11, 2012

only use gen_fsm:sync_send_event periodically

Waiting for an ack on *every* message to the sink adds a lot of
overhead. Change {fsm_sync, Timeout} to {fsm_sync, Period, Timeout},
where Period represents the number of times to use gen_fsm:send_event
before using gen_fsm:sync_send_event once.

send first fsm sink result synchronously, unless Period==infinity

Sending the first result synchronously prevents worker restart from
overwhelming the sink with "first" result messages (because the period
counter is kept in the process dictionary, which will clear during
worker restart).

rename sink type fsm_sync to fsm

Using Period=='infinity' is exactly what an 'fsm_async' sink type would
have been, so use the shorter name (and stop the confusing "sync sink"
typos).

beerriot commented Nov 11, 2012

Sorry to negate your +1, but performance testing of basho/riak_kv#429 showed the naive approach here to be too expensive. I've just added three more commits to address this.

The first commit allows workers to use gen_fsm:send_event/2 for several sink messages, and only occasionally use gen_fsm:sync_send_event/3. The number of un-acked sends (send_event/2) that may be used before an acked send (sync_send_event/3) is described by a new element of the sink-type tuple, Period. It is specified as an integer number of un-acked messages to send before sending one acked message. Alternatively, it can be set to the atom infinity, which causes all sends to be un-acked.

The second commit forces the first message to be sent acked, unless the Period is infinity. This is to prevent worker restart from overwhelming the sink with non-acked messages, because the ack-counter is stored in the worker's process dictionary.

The third commit renames the sink type from fsm_sync to fsm because using Period==infinity is the same as what fsm_async might some day have been. Furthermore, the original definition of fsm_sync (acking every message) is equivalent to Period==0.

So, the sink type tuple is now {fsm, Period::integer() | infinity, Timeout::timeout()}.
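
The periodic-ack behavior described in the three commits can be sketched as below. This is an illustration under stated assumptions, not the code from this PR: the module name `sink_period_sketch`, the function `send/3`, and the process-dictionary key are hypothetical, and the error mapping is assumed to follow the ok / {error, timeout | sink_died} convention described earlier in this thread.

```erlang
-module(sink_period_sketch).
-compile(nowarn_deprecated_function). % gen_fsm is deprecated in newer OTP
-export([send/3]).

%% Hypothetical process-dictionary key for the un-acked send counter.
-define(COUNT_KEY, sink_sketch_unacked).

-spec send(pid(), term(), {fsm, non_neg_integer() | infinity, timeout()}) ->
          ok | {error, timeout | sink_died}.
send(Sink, Msg, {fsm, infinity, _Timeout}) ->
    %% Period == infinity: every send is un-acked.
    gen_fsm:send_event(Sink, Msg);
send(Sink, Msg, {fsm, Period, Timeout}) ->
    case get(?COUNT_KEY) of
        Count when is_integer(Count), Count < Period ->
            %% Still within the period: un-acked send, bump the counter.
            put(?COUNT_KEY, Count + 1),
            gen_fsm:send_event(Sink, Msg);
        _ ->
            %% Counter missing (fresh or restarted worker) or period
            %% used up: send acked and restart the count.
            put(?COUNT_KEY, 0),
            try gen_fsm:sync_send_event(Sink, Msg, Timeout) of
                _Reply -> ok
            catch
                exit:{timeout, _} -> {error, timeout};
                exit:_ -> {error, sink_died}
            end
    end.
```

With Period set to 2, this sketch sends the first message acked, the next two un-acked, the fourth acked, and so on. With Period set to 0 the counter check never passes, so every message is acked, matching the original fsm_sync behavior.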

joedevivo commented Nov 12, 2012

+1 and thanks for all the 🐟

Tested with riak_test. Looks good.

beerriot pushed a commit that referenced this pull request Nov 12, 2012

Merge pull request #59 from bwf-sink-backpressure
Allows the sink to receive result/log/eoi messages as gen_fsm events,
and to choose how often they should be synchronous events, allowing the
sink to provide backpressure to the pipe.

Squashed commit of the following:

commit 300088a
Author: Bryan Fink <bryan@basho.com>
Date:   Sun Nov 11 12:00:42 2012 -0500

    rename sink type fsm_sync to fsm

    Using Period=='infinity' is exactly what an 'fsm_async' sink type would
    have been, so use the shorter name (and stop the confusing "sync sink"
    typos).

commit ce90a17
Author: Bryan Fink <bryan@basho.com>
Date:   Sun Nov 11 11:50:44 2012 -0500

    send first fsm sink result synchronously, unless Period==infinity

    Sending the first result synchronously prevents worker restart from
    overwhelming the sink with "first" result messages (because the period
    counter is kept in the process dictionary, which will clear during
    worker restart).

commit e9819b8
Author: Bryan Fink <bryan@basho.com>
Date:   Sat Nov 10 22:36:59 2012 -0500

    only use gen_fsm:sync_send_event periodically

    Waiting for an ack on *every* message to the sink adds a lot of
    overhead. Change {fsm_sync, Timeout} to {fsm_sync, Period, Timeout},
    where Period represents the number of times to use gen_fsm:send_event
    before using gen_fsm:sync_send_event once.

commit eb80d30
Author: Bryan Fink <bryan@basho.com>
Date:   Tue Nov 6 13:56:28 2012 -0500

    fix state name typo in test sink doc

commit cbd15b5
Author: Bryan Fink <bryan@basho.com>
Date:   Fri Nov 2 23:39:12 2012 -0400

    deliver logs and eoi as gen_fsm sync events as well

    Logs have overwhelmed our sinks too, and doing eoi the same way just
    makes the interface more consistent.

commit ae0e1fe
Author: Bryan Fink <bryan@basho.com>
Date:   Fri Nov 2 10:43:59 2012 -0400

    sink backpressure via gen_fsm sync events

    This patch allows a Pipe user to specify the "type" of the sink process.
    This type is used to decide how to send it #pipe_result{} messages.

    If the sink type is 'raw' or unspecified, result messages are delivered
    as regular erlang messages, as they have been before this commit.

    If the sink type is '{fsm_sync, Timeout}', result messages are delivered
    as calls to gen_fsm:sync_send_event/3. The biggest benefit of this
    sink type is that it will provide backpressure to the pipe, to prevent
    the sink's mailbox from being overwhelmed.

@ghost ghost assigned beerriot Nov 12, 2012

beerriot commented Nov 12, 2012

Squashed to master as 807c632.

@beerriot beerriot closed this Nov 12, 2012
