Skip to content

Commit

Permalink
Merge pull request #2408 from yichengq/335
Browse files Browse the repository at this point in the history
rafthttp: report MsgSnap status
  • Loading branch information
yichengq committed Mar 2, 2015
2 parents 9989bf1 + b4b9b91 commit 31666cd
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 5 deletions.
4 changes: 4 additions & 0 deletions etcdserver/server.go
Expand Up @@ -328,6 +328,10 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {

func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }

func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
s.r.ReportSnapshot(id, status)
}

func (s *EtcdServer) run() {
var syncC <-chan time.Time
var shouldstop bool
Expand Down
2 changes: 1 addition & 1 deletion raft/raft.go
Expand Up @@ -582,7 +582,7 @@ func stepLeader(r *raft, m pb.Message) {
log.Printf("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
pr.snapshotFinish()
log.Printf("raft: %x snapshot succeeded resumed sending replication messages to %x [%s]", r.id, m.From, pr)
log.Printf("raft: %x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// wait for the msgAppResp from the remote node before sending
// out the next msgApp
pr.waitSet(r.electionTimeout)
Expand Down
2 changes: 2 additions & 0 deletions rafthttp/http.go
Expand Up @@ -101,6 +101,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
return
}
// Write StatusNoContet header after the message has been processed by
// raft, which faciliates the client to report MsgSnap status.
w.WriteHeader(http.StatusNoContent)
}

Expand Down
11 changes: 7 additions & 4 deletions rafthttp/http_test.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)

Expand Down Expand Up @@ -161,15 +162,17 @@ func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some er

type nopProcessor struct{}

func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil }
func (p *nopProcessor) ReportUnreachable(id uint64) {}
func (p *nopProcessor) Process(ctx context.Context, m raftpb.Message) error { return nil }
func (p *nopProcessor) ReportUnreachable(id uint64) {}
func (p *nopProcessor) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}

type errProcessor struct {
err error
}

func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err }
func (p *errProcessor) ReportUnreachable(id uint64) {}
func (p *errProcessor) Process(ctx context.Context, m raftpb.Message) error { return p.err }
func (p *errProcessor) ReportUnreachable(id uint64) {}
func (p *errProcessor) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}

type resWriterToError struct {
code int
Expand Down
7 changes: 7 additions & 0 deletions rafthttp/peer.go
Expand Up @@ -180,6 +180,11 @@ func (p *peer) Stop() {

func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, name string, size int) {
switch {
// Considering MsgSnap may have a big size, e.g., 1G, and will block
// stream for a long time, only use one of the N pipelines to send MsgSnap.
case isMsgSnap(m):
writec = p.pipeline.msgc
name, size = "pipeline", pipelineBufSize
case p.msgAppWriter.isWorking() && canUseMsgAppStream(m):
writec = p.msgAppWriter.msgc
name, size = "msgapp stream", streamBufSize
Expand All @@ -192,3 +197,5 @@ func (p *peer) pick(m raftpb.Message) (writec chan raftpb.Message, name string,
}
return
}

func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }
7 changes: 7 additions & 0 deletions rafthttp/pipeline.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)

Expand Down Expand Up @@ -105,6 +106,9 @@ func (p *pipeline) handle() {
p.fs.Fail()
}
p.r.ReportUnreachable(m.To)
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
}
} else {
if !p.active {
log.Printf("pipeline: the connection with %s became active", p.id)
Expand All @@ -114,6 +118,9 @@ func (p *pipeline) handle() {
if m.Type == raftpb.MsgApp {
p.fs.Succ(end.Sub(start))
}
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
}
}
p.Unlock()
}
Expand Down
2 changes: 2 additions & 0 deletions rafthttp/transport.go
Expand Up @@ -24,12 +24,14 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
)

type Raft interface {
Process(ctx context.Context, m raftpb.Message) error
ReportUnreachable(id uint64)
ReportSnapshot(id uint64, status raft.SnapshotStatus)
}

type Transporter interface {
Expand Down

0 comments on commit 31666cd

Please sign in to comment.