Skip to content

Commit

Permalink
[FAB-11918] Add WAL to persist raft log
Browse files Browse the repository at this point in the history
This CR adds a WAL to etcdraft to persist log entries, so
that an etcdraft-based chain can continue from where it
left off. The index of each log entry is persisted with its block as
block metadata, so blocks that were already written are not replayed.

Change-Id: I7e4131541d90b256d767a9421ab6ae2a47a88bcd
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
  • Loading branch information
guoger authored and C0rWin committed Nov 8, 2018
1 parent cf49094 commit 858aaa9
Show file tree
Hide file tree
Showing 7 changed files with 497 additions and 66 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Gopkg.toml
Expand Up @@ -6,7 +6,6 @@ required = [
"golang.org/x/lint/golint",
"golang.org/x/tools/cmd/goimports",
"github.com/golang/protobuf/protoc-gen-go",
"github.com/coreos/etcd/wal"
]

ignored = [
Expand Down
2 changes: 2 additions & 0 deletions integration/nwo/orderer_template.go
Expand Up @@ -89,5 +89,7 @@ Kafka:
Debug:
BroadcastTraceDir:
DeliverTraceDir:
Consensus:
WALDir: {{ .OrdererDir Orderer }}/etcdraft/wal
{{- end }}
`
194 changes: 158 additions & 36 deletions orderer/consensus/etcdraft/chain.go
Expand Up @@ -10,12 +10,16 @@ import (
"context"
"encoding/pem"
"fmt"
"os"
"reflect"
"sync/atomic"
"time"

"code.cloudfoundry.org/clock"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/flogging"
Expand All @@ -35,6 +39,7 @@ import (
type Storage interface {
// The embedded raft.Storage is the interface etcd/raft itself defines
// for accessing the log.
raft.Storage
// Append adds entries to the storage. In this file it is called with the
// entries of every raft Ready and with the entries read back from the WAL.
Append(entries []raftpb.Entry) error
// SetHardState stores the given HardState. In this file it is called while
// replaying the WAL, with the HardState (term, commit) read from it.
SetHardState(st raftpb.HardState) error
}

//go:generate mockery -dir . -name Configurator -case underscore -output ./mocks/
Expand All @@ -53,12 +58,22 @@ type RPC interface {
SendSubmit(dest uint64, request *orderer.SubmitRequest) error
}

// block pairs a block with the index of the raft entry it was
// unmarshalled from, so both travel together to the block writer.
type block struct {
// b is the block carried in the raft entry's data.
b *common.Block

// i is the etcd/raft entry Index associated with block.
// It is persisted as block metadata so we know where
// to continue rafting upon reboot.
i uint64
}

// Options contains all the configurations relevant to the chain.
type Options struct {
RaftID uint64

Clock clock.Clock

WALDir string
Storage Storage
Logger *flogging.FabricLogger

Expand All @@ -67,7 +82,8 @@ type Options struct {
HeartbeatTick int
MaxSizePerMsg uint64
MaxInflightMsgs int
RaftMetadata *etcdraft.RaftMetadata

RaftMetadata *etcdraft.RaftMetadata
}

// Chain implements consensus.Chain interface.
Expand All @@ -79,7 +95,7 @@ type Chain struct {
channelID string

submitC chan *orderer.SubmitRequest
commitC chan *common.Block
commitC chan block
observeC chan<- uint64 // Notifies external observer on leader change (passed in optionally as an argument for tests)
haltC chan struct{} // Signals to goroutines that the chain is halting
doneC chan struct{} // Closes when the chain halts
Expand All @@ -93,8 +109,11 @@ type Chain struct {
leader uint64
appliedIndex uint64

hasWAL bool // indicate if this is a fresh raft node

node raft.Node
storage Storage
wal *wal.WAL
opts Options

logger *flogging.FabricLogger
Expand All @@ -107,29 +126,44 @@ func NewChain(
conf Configurator,
rpc RPC,
observeC chan<- uint64) (*Chain, error) {

lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID)

applied := opts.RaftMetadata.RaftIndex
w, hasWAL, err := replayWAL(lg, applied, opts.WALDir, opts.Storage)
if err != nil {
return nil, errors.Errorf("failed to create chain: %s", err)
}

return &Chain{
configurator: conf,
rpc: rpc,
channelID: support.ChainID(),
raftID: opts.RaftID,
submitC: make(chan *orderer.SubmitRequest),
commitC: make(chan *common.Block),
commitC: make(chan block),
haltC: make(chan struct{}),
doneC: make(chan struct{}),
resignC: make(chan struct{}),
startC: make(chan struct{}),
observeC: observeC,
support: support,
hasWAL: hasWAL,
appliedIndex: applied,
clock: opts.Clock,
logger: opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID),
logger: lg,
storage: opts.Storage,
wal: w,
opts: opts,
}, nil
}

// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
c.logger.Infof("Starting Raft node")

// DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217
// We guard against replay of written blocks in `entriesToApply` instead.
config := &raft.Config{
ID: c.raftID,
ElectionTick: c.opts.ElectionTick,
Expand All @@ -149,7 +183,13 @@ func (c *Chain) Start() {

raftPeers := RaftPeers(c.opts.RaftMetadata.Consenters)

c.node = raft.StartNode(config, raftPeers)
if !c.hasWAL {
c.logger.Infof("starting new raft node %d", c.raftID)
c.node = raft.StartNode(config, raftPeers)
} else {
c.logger.Infof("restarting raft node %d", c.raftID)
c.node = raft.RestartNode(config)
}

close(c.startC)

Expand Down Expand Up @@ -372,14 +412,16 @@ func (c *Chain) serveRequest() {
}
}

func (c *Chain) writeBlock(b *common.Block) {
metadata := utils.MarshalOrPanic(c.opts.RaftMetadata)
if utils.IsConfigBlock(b) {
c.support.WriteConfigBlock(b, metadata)
func (c *Chain) writeBlock(b block) {
c.opts.RaftMetadata.RaftIndex = b.i
m := utils.MarshalOrPanic(c.opts.RaftMetadata)

if utils.IsConfigBlock(b.b) {
c.support.WriteConfigBlock(b.b, m)
return
}

c.support.WriteBlock(b, metadata)
c.support.WriteBlock(b.b, m)
}

// Orders the envelope in the `msg` content. SubmitRequest.
Expand Down Expand Up @@ -450,7 +492,10 @@ func (c *Chain) serveRaft() {

case rd := <-c.node.Ready():
c.storage.Append(rd.Entries)
c.apply(c.entriesToApply(rd.CommittedEntries))
if err := c.wal.Save(rd.HardState, rd.Entries); err != nil {
c.logger.Panicf("failed to persist hardstate and entries to wal: %s", err)
}
c.apply(rd.CommittedEntries)
c.node.Advance()
c.send(rd.Messages)

Expand All @@ -474,24 +519,35 @@ func (c *Chain) serveRaft() {
}

case <-c.haltC:
close(c.doneC)
ticker.Stop()
c.node.Stop()
c.wal.Close()
c.logger.Infof("Raft node %x stopped", c.raftID)
close(c.doneC) // close after all the artifacts are closed
return
}
}
}

func (c *Chain) apply(ents []raftpb.Entry) {
if len(ents) == 0 {
return
}

if ents[0].Index > c.appliedIndex+1 {
c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
}

for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
// We need to strictly avoid re-applying normal entries,
// otherwise we are writing the same block twice.
if len(ents[i].Data) == 0 || ents[i].Index <= c.appliedIndex {
break
}

c.commitC <- utils.UnmarshalBlockOrPanic(ents[i].Data)
c.commitC <- block{utils.UnmarshalBlockOrPanic(ents[i].Data), ents[i].Index}

case raftpb.EntryConfChange:
var cc raftpb.ConfChange
Expand All @@ -503,7 +559,9 @@ func (c *Chain) apply(ents []raftpb.Entry) {
c.node.ApplyConfChange(cc)
}

c.appliedIndex = ents[i].Index
if ents[i].Index > c.appliedIndex {
c.appliedIndex = ents[i].Index
}
}
}

Expand All @@ -522,27 +580,6 @@ func (c *Chain) send(msgs []raftpb.Message) {
}
}

// entriesToApply drops the already-applied prefix from a batch of committed
// entries and returns the remainder; the result is nil when the batch is
// empty or every entry in it has been applied already.
//
// Adapted from coreos/contrib/raftexample/raft.go.
func (c *Chain) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
	if len(ents) == 0 {
		return nil
	}

	first := ents[0].Index
	if first > c.appliedIndex+1 {
		// The batch must start at or before appliedIndex+1; anything later
		// would leave a hole in the applied sequence.
		c.logger.Panicf("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1", first, c.appliedIndex)
	}

	// skip counts the leading entries that were applied before:
	//
	//   |    applied     |      unapplied       |
	//   |----------------|----------------------|
	//   first       appliedIndex              last
	skip := c.appliedIndex - first + 1
	if skip >= uint64(len(ents)) {
		return nil
	}
	return ents[skip:]
}

func (c *Chain) isConfig(env *common.Envelope) bool {
h, err := utils.ChannelHeader(env)
if err != nil {
Expand Down Expand Up @@ -613,3 +650,88 @@ func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {

return nil
}

// consentersChanged compares the consenters in the chain's current raft
// metadata with newConsenters, using their client TLS certificates as keys.
//
// NOTE(review): despite its name, this returns true only when both
// collections have the same length and the same set of certificates, and
// false otherwise (including when the lengths differ). Confirm what callers
// expect before changing it.
func (c *Chain) consentersChanged(newConsenters []*etcdraft.Consenter) bool {
	current := c.opts.RaftMetadata.Consenters
	if len(current) != len(newConsenters) {
		return false
	}

	return reflect.DeepEqual(c.membershipByCert(), c.consentersToMap(newConsenters))
}

// membershipByCert returns the set of client TLS certificates (as strings)
// of the consenters recorded in the chain's current raft metadata.
func (c *Chain) membershipByCert() map[string]struct{} {
	consenters := c.opts.RaftMetadata.Consenters
	certs := make(map[string]struct{}, len(consenters))
	for _, consenter := range consenters {
		certs[string(consenter.ClientTlsCert)] = struct{}{}
	}
	return certs
}

// consentersToMap returns the set of client TLS certificates (as strings)
// found in consenters. Duplicate certificates collapse into one entry.
func (c *Chain) consentersToMap(consenters []*etcdraft.Consenter) map[string]struct{} {
	certs := make(map[string]struct{}, len(consenters))
	for i := range consenters {
		certs[string(consenters[i].ClientTlsCert)] = struct{}{}
	}
	return certs
}

// membershipToRaftPeers returns one raft.Peer for every key ranged over in
// the current raft metadata's Consenters, using that key as the peer ID.
// The result is nil when there are no consenters.
//
// NOTE(review): presumably Consenters is keyed by raft ID here; confirm
// against the RaftMetadata definition.
func (c *Chain) membershipToRaftPeers() []raft.Peer {
	var raftPeers []raft.Peer
	for id := range c.opts.RaftMetadata.Consenters {
		raftPeers = append(raftPeers, raft.Peer{ID: id})
	}
	return raftPeers
}

// replayWAL opens the WAL under walDir, creating an empty one first when no
// WAL data exists, and loads the HardState and entries it contains into
// storage.
//
// applied is the raft index recorded with the last written block. It is only
// used as a sanity check: a non-zero applied index with no WAL data on disk
// is reported as an error.
//
// It returns the opened WAL, a flag telling whether WAL data already existed
// before this call, and an error. When the error is non-nil the returned WAL
// is nil and any WAL opened by this function has been closed, so no file
// handles are left behind.
func replayWAL(lg *flogging.FabricLogger, applied uint64, walDir string, storage Storage) (*wal.WAL, bool, error) {
	hasWAL := wal.Exist(walDir)
	if !hasWAL && applied != 0 {
		return nil, hasWAL, errors.Errorf("applied index is not zero but no WAL data found")
	}

	if !hasWAL {
		// wal.Create takes care of following cases by creating temp dir and atomically rename it:
		// - wal dir is a file
		// - wal dir is not readable/writeable
		//
		// TODO(jay_guo) store channel-related information in metadata when needed.
		// potential use case could be data dump and restore
		lg.Infof("No WAL data found, creating new WAL at path '%s'", walDir)
		w, err := wal.Create(walDir, nil)
		if err == os.ErrExist {
			lg.Fatalf("programming error, we've just checked that WAL does not exist")
		}

		if err != nil {
			return nil, hasWAL, errors.Errorf("failed to initialize WAL: %s", err)
		}

		// The freshly created WAL is closed and reopened below so that new
		// and existing WALs go through the same Open/ReadAll path.
		if err = w.Close(); err != nil {
			return nil, hasWAL, errors.Errorf("failed to close the WAL just created: %s", err)
		}
	} else {
		lg.Infof("Found WAL data at path '%s', replaying it", walDir)
	}

	w, err := wal.Open(walDir, walpb.Snapshot{})
	if err != nil {
		return nil, hasWAL, errors.Errorf("failed to open existing WAL: %s", err)
	}

	// fail closes the WAL opened above before reporting the error, so that a
	// failed replay does not leak it.
	fail := func(format string, cause error) (*wal.WAL, bool, error) {
		if cerr := w.Close(); cerr != nil {
			lg.Warnf("failed to close WAL after replay error: %s", cerr)
		}
		return nil, hasWAL, errors.Errorf(format, cause)
	}

	_, st, ents, err := w.ReadAll()
	if err != nil {
		return fail("failed to read WAL: %s", err)
	}

	// Storage is an interface: even though MemoryStorage never fails here,
	// another implementation may, so the errors are not discarded.
	lg.Debugf("Setting HardState to {Term: %d, Commit: %d}", st.Term, st.Commit)
	if err = storage.SetHardState(st); err != nil {
		return fail("failed to set HardState in storage: %s", err)
	}

	lg.Debugf("Appending %d entries to memory storage", len(ents))
	if err = storage.Append(ents); err != nil {
		return fail("failed to append WAL entries to storage: %s", err)
	}

	return w, hasWAL, nil
}

0 comments on commit 858aaa9

Please sign in to comment.