NetworkTransport make pipelining configurable and default to max 2 in flight. #541

Merged
merged 6 commits into from Apr 21, 2023

Changes from 3 commits
78 changes: 65 additions & 13 deletions net_transport.go
@@ -28,10 +28,6 @@ const (
// DefaultTimeoutScale is the default TimeoutScale in a NetworkTransport.
DefaultTimeoutScale = 256 * 1024 // 256KB

// rpcMaxPipeline controls the maximum number of outstanding
// AppendEntries RPC calls.
rpcMaxPipeline = 128

// connReceiveBufferSize is the size of the buffer we will use for reading RPC requests into
// on followers
connReceiveBufferSize = 256 * 1024 // 256KB
@@ -76,7 +72,8 @@ type NetworkTransport struct {

logger hclog.Logger

maxPool int
maxPool int
maxInFlight int

serverAddressProvider ServerAddressProvider

@@ -108,6 +105,37 @@ type NetworkTransportConfig struct {
// MaxPool controls how many connections we will pool
MaxPool int

// MaxRPCsInFlight controls the pipelining "optimization" when replicating
// entries to followers (if it is not disabled).
//
// Setting this to 1 explicitly disables pipelining since no overlapping of
// request processing is allowed. If set to 1 the pipelining code path is
// skipped entirely and every request is synchronous.
//
// The default value (if 0 or lower are specified) is 2, which overlaps the
// preparation and sending of the next request while waiting for the previous
// response, but no additional queuing.
//
// Historically this was internally fixed at (effectively) 130, however
// performance testing has shown that in practice the pipelining optimization
// combines badly with batching and actually has a very large negative impact
// on commit latency when throughput is high, whilst having very little
// benefit on latency or throughput in any other case! See [#541](https://github.com/hashicorp/raft/pull/541) for
// more analysis of the performance impacts.
//
// Increasing this beyond 2 is likely to only be beneficial in very
// high-latency network conditions. HashiCorp doesn't recommend using our own
// products this way.
//
// To maintain the old behavior exactly, set this to 130. The old internal
// constant was 128 but was used directly as a channel buffer size. Since we
// send before blocking on the channel and unblock the channel as soon as the
// receiver is done with the earliest outstanding request, even an unbuffered
// channel (buffer=0) allows one request to be sent while waiting for the
// previous one (i.e. 2 in flight). So the old buffer actually allowed 130 RPCs
// to be in flight at once.
MaxRPCsInFlight int

// Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
// the timeout by (SnapshotSize / TimeoutScale).
Timeout time.Duration
@@ -162,11 +190,18 @@ func NewNetworkTransportWithConfig(
Level: hclog.DefaultLevel,
})
}
maxInFlight := config.MaxRPCsInFlight
if maxInFlight < 1 {
// Default (zero or nonsense negative value given) to 2 (which translates to
// a zero length channel buffer)
maxInFlight = 2
Contributor

[NIT] For readability, can we use a constant for 2 that describes it as "default" and use it here and in AppendEntriesPipeline() and newNetPipeline()?

Member Author

@banks banks Mar 9, 2023

🤔 I was just doing this, and I think we should make this case a Constant like the other defaults.

But I realised that all the 2s in newNetPipeline and AppendEntriesPipeline probably should not be changed. The rationale is: they are 2 because 2 is the minimum value that is possible in our current implementation - they are guarding against panics or using code that can't work with a value specifically less than 2.

For example if we ever decided that the best default was actually 1 (disabling pipelining) or maybe a bit more than two, every usage except this one should still be 2 since that's a property of the actual code implementation not just the default value.

Do you think it's worth making two separate constants something like DefaultMaxInFlight = 2 and minValidMaxInFlight = 2 so that we can show that we are using the 2 to protect the implementation for doing something invalid in those other cases not necessarily because it's the default?

Member Author

I just pushed a commit with the approach above - two separate constants to express the semantics of the subtly-different 2 values!
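For reference, a minimal sketch of what two such constants could look like (the names here are illustrative, not necessarily what the pushed commit uses):

```go
const (
	// DefaultMaxRPCsInFlight is used when MaxRPCsInFlight is zero or negative,
	// i.e. the recommended default of 2.
	DefaultMaxRPCsInFlight = 2

	// minInFlightForPipelining is a property of the implementation rather than
	// a tunable: the pipelining code path cannot operate with fewer than 2
	// outstanding requests, so anything below this falls back to synchronous
	// AppendEntries.
	minInFlightForPipelining = 2
)
```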

}
trans := &NetworkTransport{
connPool: make(map[ServerAddress][]*netConn),
consumeCh: make(chan RPC),
logger: config.Logger,
maxPool: config.MaxPool,
maxInFlight: maxInFlight,
shutdownCh: make(chan struct{}),
stream: config.Stream,
timeout: config.Timeout,
@@ -379,14 +414,20 @@ func (n *NetworkTransport) returnConn(conn *netConn) {
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (n *NetworkTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) {
if n.maxInFlight < 2 {
// Pipelining is disabled since no more than one request can be outstanding
// at once. Skip the whole code path and use synchronous requests.
return nil, ErrPipelineReplicationNotSupported
}

Contributor

What is the implication of making a pipeline of size 1 instead of returning an error in that use case? With this, if we decide to thread this config up to Consul, would we need to add extra logic to call the right API?

Member Author

@banks banks Mar 9, 2023

A pipeline of size one doesn't make any sense with our current implementation and will panic.

This is because we only block on the buffer after sending the next request, and because the first request sent is immediately consumed by the receiving goroutine. That means even with a buffer size of zero you still allow 2 requests to be in flight: one because the receiving goroutine immediately consumes and unblocks the buffer while it waits, and the second because we send it before pushing to inflight where we block waiting for the first response.

We could change the behaviour of the pipeline to push to the channel first before sending so that a zero-buffered chan would correspond to MaxInFlight = 1... But I don't see much benefit to that - it would just make it ambiguous whether MaxInFlight = 1 means "no pipeline at all" or "use the pipelining code path for parallel decoding/encoding, but don't allow more than one outstanding request". In theory that still has some potential performance benefit but I was not able to measure one, so I don't think it's worth adding the extra complexity/risk by changing how pipelining works and making there be even more subtle config options.

Does that make sense?

The flip side to this is that it might in some ways be simpler to make the config hide the complexity less by exposing a separate DisablePipelining and PipelineBufferSize, but then operators have to work out which thing they want and we still have to document all the unintuitive stuff like "Buffer size of 0 actually allows 2 requests to be in flight". I don't have a strong opinion but chose to lean towards making the config map more directly to the behaviour it enables rather than to obscure implementation details.

Contributor

The point I was trying to make is mostly that from a UX point of view I expect a pipeline size of 1 or 0 (depending on how we design it) to work as if I call AppendEntries() without any pipelining. I guess we already missed that opportunity when we made the choice to make separate APIs for the pipelining option way before this PR. The user, in that case raft, needs to deal with making the right choice, which is already in place.

I agree hard failing by panic is a reasonable choice to avoid misconfiguration here.
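To make the buffer semantics discussed above concrete, here is a minimal, self-contained Go sketch (not the transport code itself) of the send-then-block pattern: even an unbuffered channel permits two requests to be in flight at once.

```go
package main

import "fmt"

func main() {
	// Unbuffered channel, corresponding to MaxRPCsInFlight = 2 in the PR's terms.
	inprogress := make(chan int)
	done := make(chan struct{})

	// "Decode" goroutine: it pulls the earliest outstanding request off the
	// channel as soon as it starts waiting for a response, which unblocks the
	// sender's channel send.
	go func() {
		for req := range inprogress {
			fmt.Println("waiting on response for request", req)
		}
		close(done)
	}()

	for i := 1; i <= 2; i++ {
		// The request is "sent" on the wire first...
		fmt.Println("sent request", i)
		// ...and only then do we block on the channel. So request 2 is already
		// in flight while the decoder is still waiting on request 1's response.
		inprogress <- i
	}
	close(inprogress)
	<-done
}
```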

// Get a connection
conn, err := n.getConnFromAddressProvider(id, target)
if err != nil {
return nil, err
}

// Create the pipeline
return newNetPipeline(n, conn), nil
return newNetPipeline(n, conn, n.maxInFlight), nil
}

// AppendEntries implements the Transport interface.
@@ -720,14 +761,25 @@ func sendRPC(conn *netConn, rpcType uint8, args interface{}) error {
return nil
}

// newNetPipeline is used to construct a netPipeline from a given
// transport and connection.
func newNetPipeline(trans *NetworkTransport, conn *netConn) *netPipeline {
// newNetPipeline is used to construct a netPipeline from a given transport and
// connection. It is a bug to ever call this with maxInFlight less than 2 and
// will cause a panic.
func newNetPipeline(trans *NetworkTransport, conn *netConn, maxInFlight int) *netPipeline {
if maxInFlight < 2 {
// Shouldn't happen (tm) since we validate this in the one call site and
// skip pipelining if it's lower.
panic("pipelining makes no sense if maxInFlight < 2")
}
n := &netPipeline{
conn: conn,
trans: trans,
doneCh: make(chan AppendFuture, rpcMaxPipeline),
inprogressCh: make(chan *appendFuture, rpcMaxPipeline),
conn: conn,
trans: trans,
// The buffer size is 2 less than the configured max because we send before
// waiting on the channel and the decode routine unblocks the channel as
// soon as it's waiting on the first request. So a zero-buffered channel
// still allows 1 request to be sent even while decode is still waiting for
// a response from the previous one. i.e. two are inflight at the same time.
inprogressCh: make(chan *appendFuture, maxInFlight-2),
doneCh: make(chan AppendFuture, maxInFlight-2),
shutdownCh: make(chan struct{}),
}
go n.decodeResponses()
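For callers of the package, a hedged sketch of how the new setting could be supplied through the config struct shown above (the bind address and values are illustrative):

```go
package main

import (
	"log"
	"time"

	"github.com/hashicorp/raft"
)

func main() {
	config := &raft.NetworkTransportConfig{
		MaxPool: 3,
		// 2 is the new default; 1 disables pipelining entirely, while 130
		// restores the historical behaviour described in the field docs.
		MaxRPCsInFlight: 2,
		Timeout:         10 * time.Second,
	}
	trans, err := raft.NewTCPTransportWithConfig("127.0.0.1:0", nil, config)
	if err != nil {
		log.Fatal(err)
	}
	defer trans.Close()
}
```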
204 changes: 141 additions & 63 deletions net_transport_test.go
@@ -5,6 +5,7 @@ package raft

import (
"bytes"
"context"
"fmt"
"net"
"reflect"
@@ -199,6 +200,31 @@ func TestNetworkTransport_Heartbeat_FastPath(t *testing.T) {
}
}

func makeAppendRPC() AppendEntriesRequest {
return AppendEntriesRequest{
Term: 10,
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
{
Index: 101,
Term: 4,
Type: LogNoop,
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}
}

func makeAppendRPCResponse() AppendEntriesResponse {
return AppendEntriesResponse{
Term: 4,
LastLog: 90,
Success: true,
}
}

func TestNetworkTransport_AppendEntries(t *testing.T) {

for _, useAddrProvider := range []bool{true, false} {
@@ -211,26 +237,8 @@ func TestNetworkTransport_AppendEntries(t *testing.T) {
rpcCh := trans1.Consumer()

// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
{
Index: 101,
Term: 4,
Type: LogNoop,
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Success: true,
}
args := makeAppendRPC()
resp := makeAppendRPCResponse()

// Listen for a request
go func() {
@@ -282,26 +290,8 @@ func TestNetworkTransport_AppendEntriesPipeline(t *testing.T) {
rpcCh := trans1.Consumer()

// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
{
Index: 101,
Term: 4,
Type: LogNoop,
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Success: true,
}
args := makeAppendRPC()
resp := makeAppendRPCResponse()

// Listen for a request
go func() {
@@ -368,26 +358,8 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) {
rpcCh := trans1.Consumer()

// Make the RPC request
args := AppendEntriesRequest{
Term: 10,
PrevLogEntry: 100,
PrevLogTerm: 4,
Entries: []*Log{
{
Index: 101,
Term: 4,
Type: LogNoop,
},
},
LeaderCommitIndex: 90,
RPCHeader: RPCHeader{Addr: []byte("cartman")},
}

resp := AppendEntriesResponse{
Term: 4,
LastLog: 90,
Success: true,
}
args := makeAppendRPC()
resp := makeAppendRPCResponse()

shutdownCh := make(chan struct{})
defer close(shutdownCh)
@@ -467,6 +439,105 @@ func TestNetworkTransport_AppendEntriesPipeline_CloseStreams(t *testing.T) {
}
}

func TestNetworkTransport_AppendEntriesPipeline_MaxRPCsInFlight(t *testing.T) {
// Test the important cases 0 (default to 2), 1 (disabled), 2 and "some"
for _, max := range []int{0, 1, 2, 10} {
t.Run(fmt.Sprintf("max=%d", max), func(t *testing.T) {
config := &NetworkTransportConfig{
MaxPool: 2,
MaxRPCsInFlight: max,
Timeout: time.Second,
// Don't use test logger as the transport has multiple goroutines and
// causes panics.
ServerAddressProvider: &testAddrProvider{"localhost:0"},
}

// Transport 1 is consumer
trans1, err := NewTCPTransportWithConfig("localhost:0", nil, config)
require.NoError(t, err)
defer trans1.Close()

// Make the RPC request
args := makeAppendRPC()
resp := makeAppendRPCResponse()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Transport 2 makes outbound request
config.ServerAddressProvider = &testAddrProvider{string(trans1.LocalAddr())}
trans2, err := NewTCPTransportWithConfig("localhost:0", nil, config)
require.NoError(t, err)
defer trans2.Close()

// Kill the transports on the timeout to unblock. That means things that
// shouldn't have blocked did block.
go func() {
<-ctx.Done()
trans2.Close()
trans1.Close()
}()

// Attempt to pipeline
pipeline, err := trans2.AppendEntriesPipeline("id1", trans1.LocalAddr())
if max == 1 {
// Max == 1 implies no pipelining
require.EqualError(t, err, ErrPipelineReplicationNotSupported.Error())
return
}
require.NoError(t, err)

expectedMax := max
if max == 0 {
// Should have defaulted to 2
expectedMax = 2
}

for i := 0; i < expectedMax-1; i++ {
// We should be able to send `max - 1` rpcs before `AppendEntries`
// blocks. It blocks on the `max` one because it sends before pushing
// to the chan. It will block forever when it does because nothing is
// responding yet.
out := new(AppendEntriesResponse)
_, err := pipeline.AppendEntries(&args, out)
require.NoError(t, err)
}

// Verify the next send blocks without blocking the test forever
errCh := make(chan error, 1)
go func() {
out := new(AppendEntriesResponse)
_, err := pipeline.AppendEntries(&args, out)
errCh <- err
}()

select {
case <-errCh:
Member Author

There is a bug here: we aren't capturing the error from the channel to use in the assertion below (a possible fix is sketched after this select block).

require.NoError(t, err)
t.Fatalf("AppendEntries didn't block with %d in flight", max)
case <-time.After(50 * time.Millisecond):
// OK it's probably blocked or we got _really_ unlucky with scheduling!
}
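A sketch of the fix mentioned in the comment above, reusing the surrounding test's variables; the only change is capturing the error received from errCh so the assertion actually reports it:

```go
select {
case err := <-errCh:
	// Capture the goroutine's error so the assertion is meaningful.
	require.NoError(t, err)
	t.Fatalf("AppendEntries didn't block with %d in flight", max)
case <-time.After(50 * time.Millisecond):
	// OK it's probably blocked, or we got _really_ unlucky with scheduling!
}
```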

// Verify that once we receive/respond another one can be sent.
rpc := <-trans1.Consumer()
rpc.Respond(resp, nil)

// We also need to consume the response from the pipeline in case chan is
// unbuffered (inflight is 2 or 1)
<-pipeline.Consumer()

// The last append should unblock once the response is received.
select {
case <-errCh:
// OK
case <-time.After(50 * time.Millisecond):
t.Fatalf("last append didn't unblock")
}
})
}
}

func TestNetworkTransport_RequestVote(t *testing.T) {

for _, useAddrProvider := range []bool{true, false} {
@@ -741,11 +812,18 @@ func TestNetworkTransport_PooledConn(t *testing.T) {
}

func makeTransport(t *testing.T, useAddrProvider bool, addressOverride string) (*NetworkTransport, error) {
config := &NetworkTransportConfig{
MaxPool: 2,
// Setting this because older tests for pipelining were written when this
// was a constant and block forever if it's not large enough.
MaxRPCsInFlight: 130,
Timeout: time.Second,
Logger: newTestLogger(t),
}
if useAddrProvider {
config := &NetworkTransportConfig{MaxPool: 2, Timeout: time.Second, Logger: newTestLogger(t), ServerAddressProvider: &testAddrProvider{addressOverride}}
return NewTCPTransportWithConfig("localhost:0", nil, config)
config.ServerAddressProvider = &testAddrProvider{addressOverride}
}
return NewTCPTransportWithLogger("localhost:0", nil, 2, time.Second, newTestLogger(t))
return NewTCPTransportWithConfig("localhost:0", nil, config)
}

type testCountingWriter struct {