diff --git a/rafthttp/peer.go b/rafthttp/peer.go index a82d7beed74..b8de635aa83 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "golang.org/x/net/context" + "golang.org/x/time/rate" ) const ( @@ -188,6 +189,7 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats status: status, recvc: p.recvc, propc: p.propc, + rl: rate.NewLimiter(transport.DialRetryFrequency, 1), } p.msgAppReader = &streamReader{ peerID: peerID, @@ -197,7 +199,9 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats status: status, recvc: p.recvc, propc: p.propc, + rl: rate.NewLimiter(transport.DialRetryFrequency, 1), } + p.msgAppV2Reader.start() p.msgAppReader.start() diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 2a6c620f56d..9dfe22148a3 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "golang.org/x/time/rate" + "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/httputil" "github.com/coreos/etcd/pkg/transport" @@ -243,7 +245,9 @@ func (cw *streamWriter) closeUnlocked() bool { if !cw.working { return false } - cw.closer.Close() + if err := cw.closer.Close(); err != nil { + plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err) + } if len(cw.msgc) > 0 { cw.r.ReportUnreachable(uint64(cw.peerID)) } @@ -278,25 +282,28 @@ type streamReader struct { recvc chan<- raftpb.Message propc chan<- raftpb.Message + rl *rate.Limiter // alters the frequency of dial retrial attempts + errorc chan<- error mu sync.Mutex paused bool - cancel func() closer io.Closer - stopc chan struct{} - done chan struct{} + ctx context.Context + cancel context.CancelFunc + done chan struct{} } -func (r *streamReader) start() { - r.stopc = make(chan struct{}) - r.done = make(chan struct{}) - if r.errorc == nil { - r.errorc = r.tr.ErrorC +func (cr *streamReader) start() { + cr.done = make(chan struct{}) + if cr.errorc == nil { + cr.errorc = cr.tr.ErrorC } - - go r.run() + if cr.ctx == nil { + cr.ctx, cr.cancel = context.WithCancel(context.Background()) + } + go cr.run() } func (cr *streamReader) run() { @@ -311,7 +318,7 @@ func (cr *streamReader) run() { } else { cr.status.activate() plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) - err := cr.decodeLoop(rc, t) + err = cr.decodeLoop(rc, t) plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) switch { // all data is read out @@ -322,15 +329,16 @@ func (cr *streamReader) run() { cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error()) } } - select { - // Wait 100ms to create a new stream, so it doesn't bring too much - // overhead when retry. - case <-time.After(100 * time.Millisecond): - case <-cr.stopc: + // Wait for a while before new dial attempt + err = cr.rl.Wait(cr.ctx) + if cr.ctx.Err() != nil { plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) close(cr.done) return } + if err != nil { + plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err) + } } } @@ -346,7 +354,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { plog.Panicf("unhandled stream type %s", t) } select { - case <-cr.stopc: + case <-cr.ctx.Done(): cr.mu.Unlock() if err := rc.Close(); err != nil { return err @@ -401,11 +409,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { } func (cr *streamReader) stop() { - close(cr.stopc) cr.mu.Lock() - if cr.cancel != nil { - cr.cancel() - } + cr.cancel() cr.close() cr.mu.Unlock() <-cr.done @@ -429,13 +434,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { setPeerURLsHeader(req, cr.tr.URLs) - ctx, cancel := context.WithCancel(context.Background()) - req = req.WithContext(ctx) + req = req.WithContext(cr.ctx) cr.mu.Lock() - cr.cancel = cancel select { - case <-cr.stopc: + case <-cr.ctx.Done(): cr.mu.Unlock() return nil, fmt.Errorf("stream reader is stopped") default: @@ -497,7 +500,9 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { func (cr *streamReader) close() { if cr.closer != nil { - cr.closer.Close() + if err := cr.closer.Close(); err != nil { + plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err) + } } cr.closer = nil } diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index f48714e7c57..c9cd2b3d69c 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -15,6 +15,7 @@ package rafthttp import ( + "context" "errors" "fmt" "io" @@ -25,6 +26,8 @@ import ( "testing" "time" + "golang.org/x/time/rate" + "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" @@ -113,6 +116,7 @@ func TestStreamReaderDialRequest(t *testing.T) { peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + ctx: context.Background(), } sr.dial(tt) @@ -167,6 +171,7 @@ func TestStreamReaderDialResult(t *testing.T) { tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), errorc: make(chan error, 1), + ctx: context.Background(), } _, err := sr.dial(streamTypeMessage) @@ -192,6 +197,7 @@ func TestStreamReaderStopOnDial(t *testing.T) { errorc: make(chan error, 1), typ: streamTypeMessage, status: newPeerStatus(types.ID(2)), + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } tr.onResp = func() { // stop() waits for the run() goroutine to exit, but that exit @@ -246,6 +252,7 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) { peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + ctx: context.Background(), } _, err := sr.dial(typ) @@ -311,6 +318,7 @@ func TestStream(t *testing.T) { status: newPeerStatus(types.ID(2)), recvc: recvc, propc: propc, + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } sr.start() diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 1f0b46836e6..50219db71b9 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -29,6 +29,7 @@ import ( "github.com/coreos/pkg/capnslog" "github.com/xiang90/probing" "golang.org/x/net/context" + "golang.org/x/time/rate" ) var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp")) @@ -94,8 +95,12 @@ type Transporter interface { // User needs to call Start before calling other functions, and call // Stop when the Transport is no longer used. type Transport struct { - DialTimeout time.Duration // maximum duration before timing out dial of the request - TLSInfo transport.TLSInfo // TLS information used when creating connection + DialTimeout time.Duration // maximum duration before timing out dial of the request + // DialRetryFrequency defines the frequency of streamReader dial retrial attempts; + // a distinct rate limiter is created per every peer (default value: 10 events/sec) + DialRetryFrequency rate.Limit + + TLSInfo transport.TLSInfo // TLS information used when creating connection ID types.ID // local member ID URLs types.URLs // local peer URLs @@ -135,6 +140,13 @@ func (t *Transport) Start() error { t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) t.prober = probing.NewProber(t.pipelineRt) + + // If client didn't provide dial retry frequence, use the default + // (100ms backoff between attempts to create a new stream), + // so it doesn't bring too much overhead when retry. + if t.DialRetryFrequency == 0 { + t.DialRetryFrequency = rate.Every(100 * time.Millisecond) + } return nil }