Skip to content

Commit

Permalink
rafthttp: configurable stream reader retry timeout
Browse files Browse the repository at this point in the history
rafthttp.Transport.DialRetryTimeout field alters the frequency of dial attempts
+ minor changes after code review
  • Loading branch information
Vitaly Isaev authored and gyuho committed Jun 2, 2017
1 parent c578ac4 commit 4301f49
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 29 deletions.
4 changes: 4 additions & 0 deletions rafthttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -188,6 +189,7 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
status: status,
recvc: p.recvc,
propc: p.propc,
rl: rate.NewLimiter(transport.DialRetryFrequency, 1),
}
p.msgAppReader = &streamReader{
peerID: peerID,
Expand All @@ -197,7 +199,9 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
status: status,
recvc: p.recvc,
propc: p.propc,
rl: rate.NewLimiter(transport.DialRetryFrequency, 1),
}

p.msgAppV2Reader.start()
p.msgAppReader.start()

Expand Down
59 changes: 32 additions & 27 deletions rafthttp/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/transport"
Expand Down Expand Up @@ -243,7 +245,9 @@ func (cw *streamWriter) closeUnlocked() bool {
if !cw.working {
return false
}
cw.closer.Close()
if err := cw.closer.Close(); err != nil {
plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err)
}
if len(cw.msgc) > 0 {
cw.r.ReportUnreachable(uint64(cw.peerID))
}
Expand Down Expand Up @@ -278,25 +282,28 @@ type streamReader struct {
recvc chan<- raftpb.Message
propc chan<- raftpb.Message

rl *rate.Limiter // alters the frequency of dial retrial attempts

errorc chan<- error

mu sync.Mutex
paused bool
cancel func()
closer io.Closer

stopc chan struct{}
done chan struct{}
ctx context.Context
cancel context.CancelFunc
done chan struct{}
}

func (r *streamReader) start() {
r.stopc = make(chan struct{})
r.done = make(chan struct{})
if r.errorc == nil {
r.errorc = r.tr.ErrorC
func (cr *streamReader) start() {
cr.done = make(chan struct{})
if cr.errorc == nil {
cr.errorc = cr.tr.ErrorC
}

go r.run()
if cr.ctx == nil {
cr.ctx, cr.cancel = context.WithCancel(context.Background())
}
go cr.run()
}

func (cr *streamReader) run() {
Expand All @@ -311,7 +318,7 @@ func (cr *streamReader) run() {
} else {
cr.status.activate()
plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
err := cr.decodeLoop(rc, t)
err = cr.decodeLoop(rc, t)
plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
switch {
// all data is read out
Expand All @@ -322,15 +329,16 @@ func (cr *streamReader) run() {
cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
}
}
select {
// Wait 100ms to create a new stream, so it doesn't bring too much
// overhead when retry.
case <-time.After(100 * time.Millisecond):
case <-cr.stopc:
// Wait for a while before new dial attempt
err = cr.rl.Wait(cr.ctx)
if cr.ctx.Err() != nil {
plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
close(cr.done)
return
}
if err != nil {
plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
}
}
}

Expand All @@ -346,7 +354,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
plog.Panicf("unhandled stream type %s", t)
}
select {
case <-cr.stopc:
case <-cr.ctx.Done():
cr.mu.Unlock()
if err := rc.Close(); err != nil {
return err
Expand Down Expand Up @@ -401,11 +409,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
}

func (cr *streamReader) stop() {
close(cr.stopc)
cr.mu.Lock()
if cr.cancel != nil {
cr.cancel()
}
cr.cancel()
cr.close()
cr.mu.Unlock()
<-cr.done
Expand All @@ -429,13 +434,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {

setPeerURLsHeader(req, cr.tr.URLs)

ctx, cancel := context.WithCancel(context.Background())
req = req.WithContext(ctx)
req = req.WithContext(cr.ctx)

cr.mu.Lock()
cr.cancel = cancel
select {
case <-cr.stopc:
case <-cr.ctx.Done():
cr.mu.Unlock()
return nil, fmt.Errorf("stream reader is stopped")
default:
Expand Down Expand Up @@ -497,7 +500,9 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {

func (cr *streamReader) close() {
if cr.closer != nil {
cr.closer.Close()
if err := cr.closer.Close(); err != nil {
plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err)
}
}
cr.closer = nil
}
Expand Down
8 changes: 8 additions & 0 deletions rafthttp/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package rafthttp

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -25,6 +26,8 @@ import (
"testing"
"time"

"golang.org/x/time/rate"

"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
Expand Down Expand Up @@ -113,6 +116,7 @@ func TestStreamReaderDialRequest(t *testing.T) {
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
ctx: context.Background(),
}
sr.dial(tt)

Expand Down Expand Up @@ -167,6 +171,7 @@ func TestStreamReaderDialResult(t *testing.T) {
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
errorc: make(chan error, 1),
ctx: context.Background(),
}

_, err := sr.dial(streamTypeMessage)
Expand All @@ -192,6 +197,7 @@ func TestStreamReaderStopOnDial(t *testing.T) {
errorc: make(chan error, 1),
typ: streamTypeMessage,
status: newPeerStatus(types.ID(2)),
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}
tr.onResp = func() {
// stop() waits for the run() goroutine to exit, but that exit
Expand Down Expand Up @@ -246,6 +252,7 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) {
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
ctx: context.Background(),
}

_, err := sr.dial(typ)
Expand Down Expand Up @@ -311,6 +318,7 @@ func TestStream(t *testing.T) {
status: newPeerStatus(types.ID(2)),
recvc: recvc,
propc: propc,
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}
sr.start()

Expand Down
16 changes: 14 additions & 2 deletions rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/coreos/pkg/capnslog"
"github.com/xiang90/probing"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)

var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp"))
Expand Down Expand Up @@ -94,8 +95,12 @@ type Transporter interface {
// User needs to call Start before calling other functions, and call
// Stop when the Transport is no longer used.
type Transport struct {
DialTimeout time.Duration // maximum duration before timing out dial of the request
TLSInfo transport.TLSInfo // TLS information used when creating connection
DialTimeout time.Duration // maximum duration before timing out dial of the request
// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
// a distinct rate limiter is created per every peer (default value: 10 events/sec)
DialRetryFrequency rate.Limit

TLSInfo transport.TLSInfo // TLS information used when creating connection

ID types.ID // local member ID
URLs types.URLs // local peer URLs
Expand Down Expand Up @@ -135,6 +140,13 @@ func (t *Transport) Start() error {
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)

// If client didn't provide dial retry frequence, use the default
// (100ms backoff between attempts to create a new stream),
// so it doesn't bring too much overhead when retry.
if t.DialRetryFrequency == 0 {
t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
}
return nil
}

Expand Down

0 comments on commit 4301f49

Please sign in to comment.