diff --git a/Gopkg.lock b/Gopkg.lock
index 3c6fad7dc1..105ac5ba58 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -925,6 +925,7 @@
     "github.com/coreos/etcd/raft",
     "github.com/coreos/etcd/raft/raftpb",
     "github.com/coreos/etcd/wal",
+    "github.com/coreos/etcd/wal/walpb",
     "github.com/davecgh/go-spew/spew",
     "github.com/fsouza/go-dockerclient",
     "github.com/go-kit/kit/log",
diff --git a/Gopkg.toml b/Gopkg.toml
index 38ccc000e5..5788e4e21b 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -6,7 +6,6 @@ required = [
     "golang.org/x/lint/golint",
     "golang.org/x/tools/cmd/goimports",
     "github.com/golang/protobuf/protoc-gen-go",
-    "github.com/coreos/etcd/wal"
 ]
 
 ignored = [
diff --git a/integration/nwo/orderer_template.go b/integration/nwo/orderer_template.go
index d51233d602..dc0aa74fa4 100644
--- a/integration/nwo/orderer_template.go
+++ b/integration/nwo/orderer_template.go
@@ -89,5 +89,7 @@ Kafka:
 Debug:
     BroadcastTraceDir:
     DeliverTraceDir:
+Consensus:
+    WALDir: {{ .OrdererDir Orderer }}/etcdraft/wal
 {{- end }}
 `
diff --git a/orderer/consensus/etcdraft/chain.go b/orderer/consensus/etcdraft/chain.go
index 5f242cebc0..5640e7e83a 100644
--- a/orderer/consensus/etcdraft/chain.go
+++ b/orderer/consensus/etcdraft/chain.go
@@ -10,12 +10,16 @@ import (
     "context"
     "encoding/pem"
     "fmt"
+    "os"
+    "reflect"
     "sync/atomic"
     "time"
 
     "code.cloudfoundry.org/clock"
     "github.com/coreos/etcd/raft"
     "github.com/coreos/etcd/raft/raftpb"
+    "github.com/coreos/etcd/wal"
+    "github.com/coreos/etcd/wal/walpb"
     "github.com/golang/protobuf/proto"
     "github.com/hyperledger/fabric/common/configtx"
     "github.com/hyperledger/fabric/common/flogging"
@@ -35,6 +39,7 @@ import (
 type Storage interface {
     raft.Storage
     Append(entries []raftpb.Entry) error
+    SetHardState(st raftpb.HardState) error
 }
 
 //go:generate mockery -dir . -name Configurator -case underscore -output ./mocks/
@@ -53,12 +58,22 @@ type RPC interface {
     SendSubmit(dest uint64, request *orderer.SubmitRequest) error
 }
 
+type block struct {
+    b *common.Block
+
+    // i is the etcd/raft entry Index associated with this block.
+    // It is persisted as block metadata so we know where
+    // to continue rafting upon reboot.
+    i uint64
+}
+
 // Options contains all the configurations relevant to the chain.
 type Options struct {
     RaftID uint64
 
     Clock clock.Clock
 
+    WALDir  string
     Storage Storage
     Logger  *flogging.FabricLogger
 
@@ -67,7 +82,8 @@ type Options struct {
     HeartbeatTick   int
     MaxSizePerMsg   uint64
     MaxInflightMsgs int
-    RaftMetadata    *etcdraft.RaftMetadata
+
+    RaftMetadata *etcdraft.RaftMetadata
 }
 
 // Chain implements consensus.Chain interface.
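Note: the Storage interface above now requires SetHardState in addition to raft.Storage and Append, because replayWAL pushes the recovered HardState and entries back into the in-memory store. etcd's raft.MemoryStorage already provides both methods, so the orderer's existing in-memory storage keeps satisfying the interface. A minimal standalone compile-time check, as an illustrative sketch rather than part of this patch:

    package storagecheck

    import (
        "github.com/coreos/etcd/raft"
        "github.com/coreos/etcd/raft/raftpb"
    )

    // Storage mirrors the interface declared in chain.go: raft's read-side Storage
    // plus the two mutating methods the chain needs when replaying a WAL.
    type Storage interface {
        raft.Storage
        Append(entries []raftpb.Entry) error
        SetHardState(st raftpb.HardState) error
    }

    // Compile-time assertion that etcd's MemoryStorage satisfies the extended interface.
    var _ Storage = (*raft.MemoryStorage)(nil)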
@@ -79,7 +95,7 @@ type Chain struct {
     channelID string
 
     submitC  chan *orderer.SubmitRequest
-    commitC  chan *common.Block
+    commitC  chan block
     observeC chan<- uint64 // Notifies external observer on leader change (passed in optionally as an argument for tests)
     haltC    chan struct{} // Signals to goroutines that the chain is halting
     doneC    chan struct{} // Closes when the chain halts
@@ -93,8 +109,11 @@ type Chain struct {
     leader       uint64
     appliedIndex uint64
 
+    hasWAL bool // indicates whether WAL data was found on disk, i.e. this is not a fresh raft node
+
     node    raft.Node
     storage Storage
+    wal     *wal.WAL
     opts    Options
     logger  *flogging.FabricLogger
@@ -107,22 +126,34 @@ func NewChain(
     conf Configurator,
     rpc RPC,
     observeC chan<- uint64) (*Chain, error) {
+
+    lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID)
+
+    applied := opts.RaftMetadata.RaftIndex
+    w, hasWAL, err := replayWAL(lg, applied, opts.WALDir, opts.Storage)
+    if err != nil {
+        return nil, errors.Errorf("failed to create chain: %s", err)
+    }
+
     return &Chain{
         configurator: conf,
         rpc:          rpc,
         channelID:    support.ChainID(),
         raftID:       opts.RaftID,
         submitC:      make(chan *orderer.SubmitRequest),
-        commitC:      make(chan *common.Block),
+        commitC:      make(chan block),
         haltC:        make(chan struct{}),
         doneC:        make(chan struct{}),
         resignC:      make(chan struct{}),
         startC:       make(chan struct{}),
         observeC:     observeC,
         support:      support,
+        hasWAL:       hasWAL,
+        appliedIndex: applied,
         clock:        opts.Clock,
-        logger:       opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID),
+        logger:       lg,
         storage:      opts.Storage,
+        wal:          w,
         opts:         opts,
     }, nil
 }
@@ -130,6 +161,9 @@ func NewChain(
 // Start instructs the orderer to begin serving the chain and keep it current.
 func (c *Chain) Start() {
     c.logger.Infof("Starting Raft node")
+
+    // DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217
+    // We guard against replay of written blocks with `appliedIndex` in `apply` instead.
     config := &raft.Config{
         ID:              c.raftID,
         ElectionTick:    c.opts.ElectionTick,
@@ -149,7 +183,13 @@ func (c *Chain) Start() {
     raftPeers := RaftPeers(c.opts.RaftMetadata.Consenters)
 
-    c.node = raft.StartNode(config, raftPeers)
+    if !c.hasWAL {
+        c.logger.Infof("starting new raft node %d", c.raftID)
+        c.node = raft.StartNode(config, raftPeers)
+    } else {
+        c.logger.Infof("restarting raft node %d", c.raftID)
+        c.node = raft.RestartNode(config)
+    }
 
     close(c.startC)
@@ -372,14 +412,16 @@ func (c *Chain) serveRequest() {
     }
 }
 
-func (c *Chain) writeBlock(b *common.Block) {
-    metadata := utils.MarshalOrPanic(c.opts.RaftMetadata)
-    if utils.IsConfigBlock(b) {
-        c.support.WriteConfigBlock(b, metadata)
+func (c *Chain) writeBlock(b block) {
+    c.opts.RaftMetadata.RaftIndex = b.i
+    m := utils.MarshalOrPanic(c.opts.RaftMetadata)
+
+    if utils.IsConfigBlock(b.b) {
+        c.support.WriteConfigBlock(b.b, m)
         return
     }
 
-    c.support.WriteBlock(b, metadata)
+    c.support.WriteBlock(b.b, m)
 }
 
 // Orders the envelope in the `msg` content. SubmitRequest.
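Note: writeBlock above is what ties the ledger back to the raft log. Every block is written together with a RaftMetadata blob whose RaftIndex records the raft entry that produced it, and NewChain seeds appliedIndex from that value so replay after a restart does not write the same block twice. A hedged sketch of reading that index back; the helper name is hypothetical and the protos import path is assumed to be protos/orderer/etcdraft:

    package metadatasketch

    import (
        "github.com/golang/protobuf/proto"
        "github.com/hyperledger/fabric/protos/orderer/etcdraft"
    )

    // lastAppliedIndex is a hypothetical helper: it unmarshals the RaftMetadata that
    // writeBlock stored with the latest block and returns the raft entry index
    // recorded there. On restart this value becomes the chain's initial appliedIndex,
    // so apply() skips committed entries at or below it instead of re-writing blocks.
    func lastAppliedIndex(blockMetadataValue []byte) (uint64, error) {
        m := &etcdraft.RaftMetadata{}
        if err := proto.Unmarshal(blockMetadataValue, m); err != nil {
            return 0, err
        }
        return m.RaftIndex, nil
    }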
@@ -450,7 +492,10 @@ func (c *Chain) serveRaft() {
         case rd := <-c.node.Ready():
             c.storage.Append(rd.Entries)
-            c.apply(c.entriesToApply(rd.CommittedEntries))
+            if err := c.wal.Save(rd.HardState, rd.Entries); err != nil {
+                c.logger.Panicf("failed to persist hardstate and entries to wal: %s", err)
+            }
+            c.apply(rd.CommittedEntries)
             c.node.Advance()
             c.send(rd.Messages)
@@ -474,24 +519,35 @@ func (c *Chain) serveRaft() {
             }
 
         case <-c.haltC:
-            close(c.doneC)
             ticker.Stop()
             c.node.Stop()
+            c.wal.Close()
             c.logger.Infof("Raft node %x stopped", c.raftID)
+            close(c.doneC) // close after all the artifacts are closed
             return
         }
     }
 }
 
 func (c *Chain) apply(ents []raftpb.Entry) {
+    if len(ents) == 0 {
+        return
+    }
+
+    if ents[0].Index > c.appliedIndex+1 {
+        c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
+    }
+
     for i := range ents {
         switch ents[i].Type {
         case raftpb.EntryNormal:
-            if len(ents[i].Data) == 0 {
+            // We need to strictly avoid re-applying normal entries,
+            // otherwise we are writing the same block twice.
+            if len(ents[i].Data) == 0 || ents[i].Index <= c.appliedIndex {
                 break
             }
 
-            c.commitC <- utils.UnmarshalBlockOrPanic(ents[i].Data)
+            c.commitC <- block{utils.UnmarshalBlockOrPanic(ents[i].Data), ents[i].Index}
 
         case raftpb.EntryConfChange:
             var cc raftpb.ConfChange
@@ -503,7 +559,9 @@ func (c *Chain) apply(ents []raftpb.Entry) {
             c.node.ApplyConfChange(cc)
         }
 
-        c.appliedIndex = ents[i].Index
+        if ents[i].Index > c.appliedIndex {
+            c.appliedIndex = ents[i].Index
+        }
     }
 }
 
@@ -522,27 +580,6 @@ func (c *Chain) send(msgs []raftpb.Message) {
     }
 }
 
-// this is taken from coreos/contrib/raftexample/raft.go
-func (c *Chain) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) {
-    if len(ents) == 0 {
-        return
-    }
-
-    firstIdx := ents[0].Index
-    if firstIdx > c.appliedIndex+1 {
-        c.logger.Panicf("first index of committed entry[%d] should <= progress.appliedIndex[%d]+1", firstIdx, c.appliedIndex)
-    }
-
-    // If we do have unapplied entries in nents.
-    //  | applied  | unapplied |
-    //  |----------------|----------------------|
-    //  firstIdx        appliedIndex        last
-    if c.appliedIndex-firstIdx+1 < uint64(len(ents)) {
-        nents = ents[c.appliedIndex-firstIdx+1:]
-    }
-    return nents
-}
-
 func (c *Chain) isConfig(env *common.Envelope) bool {
     h, err := utils.ChannelHeader(env)
     if err != nil {
@@ -613,3 +650,88 @@ func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {
 
     return nil
 }
+
+func (c *Chain) consentersChanged(newConsenters []*etcdraft.Consenter) bool {
+    if len(c.opts.RaftMetadata.Consenters) != len(newConsenters) {
+        return false
+    }
+
+    consentersSet1 := c.membershipByCert()
+    consentersSet2 := c.consentersToMap(newConsenters)
+
+    return reflect.DeepEqual(consentersSet1, consentersSet2)
+}
+
+func (c *Chain) membershipByCert() map[string]struct{} {
+    set := map[string]struct{}{}
+    for _, c := range c.opts.RaftMetadata.Consenters {
+        set[string(c.ClientTlsCert)] = struct{}{}
+    }
+    return set
+}
+
+func (c *Chain) consentersToMap(consenters []*etcdraft.Consenter) map[string]struct{} {
+    set := map[string]struct{}{}
+    for _, c := range consenters {
+        set[string(c.ClientTlsCert)] = struct{}{}
+    }
+    return set
+}
+
+func (c *Chain) membershipToRaftPeers() []raft.Peer {
+    var peers []raft.Peer
+
+    for raftID := range c.opts.RaftMetadata.Consenters {
+        peers = append(peers, raft.Peer{ID: raftID})
+    }
+    return peers
+}
+
+func replayWAL(lg *flogging.FabricLogger, applied uint64, walDir string, storage Storage) (*wal.WAL, bool, error) {
+    hasWAL := wal.Exist(walDir)
+    if !hasWAL && applied != 0 {
+        return nil, hasWAL, errors.Errorf("applied index is not zero but no WAL data found")
+    }
+
+    if !hasWAL {
+        // wal.Create takes care of the following cases by creating a temp dir and atomically renaming it:
+        // - wal dir is a file
+        // - wal dir is not readable/writeable
+        //
+        // TODO(jay_guo) store channel-related information in metadata when needed.
+        // potential use case could be data dump and restore
+        lg.Infof("No WAL data found, creating new WAL at path '%s'", walDir)
+        w, err := wal.Create(walDir, nil)
+        if err == os.ErrExist {
+            lg.Fatalf("programming error, we've just checked that WAL does not exist")
+        }
+
+        if err != nil {
+            return nil, hasWAL, errors.Errorf("failed to initialize WAL: %s", err)
+        }
+
+        if err = w.Close(); err != nil {
+            return nil, hasWAL, errors.Errorf("failed to close the WAL just created: %s", err)
+        }
+    } else {
+        lg.Infof("Found WAL data at path '%s', replaying it", walDir)
+    }
+
+    w, err := wal.Open(walDir, walpb.Snapshot{})
+    if err != nil {
+        return nil, hasWAL, errors.Errorf("failed to open existing WAL: %s", err)
+    }
+
+    _, st, ents, err := w.ReadAll()
+    if err != nil {
+        return nil, hasWAL, errors.Errorf("failed to read WAL: %s", err)
+    }
+
+    lg.Debugf("Setting HardState to {Term: %d, Commit: %d}", st.Term, st.Commit)
+    storage.SetHardState(st) // MemoryStorage.SetHardState always returns nil
+
+    lg.Debugf("Appending %d entries to memory storage", len(ents))
+    storage.Append(ents) // MemoryStorage.Append always returns nil
+
+    return w, hasWAL, nil
+}
diff --git a/orderer/consensus/etcdraft/chain_test.go b/orderer/consensus/etcdraft/chain_test.go
index 3cbc250f3e..de85189781 100644
--- a/orderer/consensus/etcdraft/chain_test.go
+++ b/orderer/consensus/etcdraft/chain_test.go
@@ -8,6 +8,11 @@ package etcdraft_test
 
 import (
     "encoding/pem"
+    "fmt"
+    "io/ioutil"
+    "os"
+    "os/user"
+    "path"
     "sync"
     "time"
@@ -38,6 +43,16 @@ const (
     HEARTBEAT_TICK = 1
 )
 
+// For some test cases we chmod files/dirs to test failures caused by exotic permissions.
+// However, this does not work if tests are run as root, i.e. in a container.
+func skipIfRoot() {
+    u, err := user.Current()
+    Expect(err).NotTo(HaveOccurred())
+    if u.Uid == "0" {
+        Skip("you are running test as root, there's no way to make files unreadable")
+    }
+}
+
 var _ = Describe("Chain", func() {
     var (
         env *common.Envelope
@@ -71,13 +86,18 @@ var _ = Describe("Chain", func() {
         storage  *raft.MemoryStorage
         observeC chan uint64
         chain    *etcdraft.Chain
+        walDir   string
     )
 
     BeforeEach(func() {
+        var err error
+
         configurator = &mocks.Configurator{}
         configurator.On("Configure", mock.Anything, mock.Anything)
         clock = fakeclock.NewFakeClock(time.Now())
         storage = raft.NewMemoryStorage()
+        walDir, err = ioutil.TempDir("", "wal-")
+        Expect(err).NotTo(HaveOccurred())
        observeC = make(chan uint64, 1)
 
         support = &consensusmocks.FakeConsenterSupport{}
@@ -111,13 +131,25 @@ var _ = Describe("Chain", func() {
             RaftMetadata: membership,
             Logger:       logger,
             Storage:      storage,
+            WALDir:       walDir,
         }
 
-        var err error
         chain, err = etcdraft.NewChain(support, opts, configurator, nil, observeC)
         Expect(err).NotTo(HaveOccurred())
     })
 
+    campaign := func(clock *fakeclock.FakeClock, observeC <-chan uint64) {
+        Eventually(func() bool {
+            clock.Increment(interval)
+            select {
+            case <-observeC:
+                return true
+            default:
+                return false
+            }
+        }).Should(BeTrue())
+    }
+
     JustBeforeEach(func() {
         chain.Start()
@@ -141,6 +173,7 @@ var _ = Describe("Chain", func() {
     AfterEach(func() {
         chain.Halt()
+        os.RemoveAll(walDir)
     })
 
     Context("when a node starts up", func() {
@@ -159,15 +192,7 @@ var _ = Describe("Chain", func() {
     Context("when Raft leader is elected", func() {
         JustBeforeEach(func() {
-            Eventually(func() bool {
-                clock.Increment(interval)
-                select {
-                case <-observeC:
-                    return true
-                default:
-                    return false
-                }
-            }).Should(BeTrue())
+            campaign(clock, observeC)
         })
 
         It("fails to order envelope if chain is halted", func() {
@@ -546,7 +571,216 @@ var _ = Describe("Chain", func() {
             })
         })
     })
+
+    Describe("Crash Fault Tolerance", func() {
+        Describe("when a chain is started with existing WAL", func() {
+            var (
+                m1      *raftprotos.RaftMetadata
+                m2      *raftprotos.RaftMetadata
+                newOpts etcdraft.Options
+            )
+
+            BeforeEach(func() {
+                newOpts = opts                            // make a copy of Options
+                newOpts.Storage = raft.NewMemoryStorage() // create a fresh MemoryStorage
+            })
+
+            JustBeforeEach(func() {
+                // To generate WAL data, we start a chain,
+                // order several envelopes and then halt the chain.
+                close(cutter.Block)
+                cutter.CutNext = true
+                support.CreateNextBlockReturns(normalBlock)
+
+                // enqueue some data to be persisted on disk by raft
+                err := chain.Order(env, uint64(0))
+                Expect(err).NotTo(HaveOccurred())
+                Eventually(support.WriteBlockCallCount).Should(Equal(1))
+
+                _, metadata := support.WriteBlockArgsForCall(0)
+                m1 = &raftprotos.RaftMetadata{}
+                proto.Unmarshal(metadata, m1)
+
+                err = chain.Order(env, uint64(0))
+                Expect(err).NotTo(HaveOccurred())
+                Eventually(support.WriteBlockCallCount).Should(Equal(2))
+
+                _, metadata = support.WriteBlockArgsForCall(1)
+                m2 = &raftprotos.RaftMetadata{}
+                proto.Unmarshal(metadata, m2)
+
+                chain.Halt()
+            })
+
+            It("replays blocks from committed entries", func() {
+                c := newChain(10*time.Second, channelID, walDir, 0, 1, []uint64{1})
+                c.Start()
+                defer c.Halt()
+
+                Eventually(c.support.WriteBlockCallCount).Should(Equal(2))
+
+                _, metadata := c.support.WriteBlockArgsForCall(0)
+                m := &raftprotos.RaftMetadata{}
+                proto.Unmarshal(metadata, m)
+                Expect(m.RaftIndex).To(Equal(m1.RaftIndex))
+
+                _, metadata = c.support.WriteBlockArgsForCall(1)
+                m = &raftprotos.RaftMetadata{}
+                proto.Unmarshal(metadata, m)
+                Expect(m.RaftIndex).To(Equal(m2.RaftIndex))
+
+                // chain should keep functioning
+                campaign(c.clock, c.observe)
+
+                c.cutter.CutNext = true
+                c.support.CreateNextBlockReturns(normalBlock)
+
+                err := c.Order(env, uint64(0))
+                Expect(err).NotTo(HaveOccurred())
+                Eventually(c.support.WriteBlockCallCount).Should(Equal(3))
+            })
+
+            It("only replays blocks after Applied index", func() {
+                c := newChain(10*time.Second, channelID, walDir, m1.RaftIndex, 1, []uint64{1})
+                c.Start()
+                defer c.Halt()
+
+                Eventually(c.support.WriteBlockCallCount).Should(Equal(1))
+
+                _, metadata := c.support.WriteBlockArgsForCall(0)
+                m := &raftprotos.RaftMetadata{}
+                proto.Unmarshal(metadata, m)
+                Expect(m.RaftIndex).To(Equal(m2.RaftIndex))
+
+                // chain should keep functioning
+                campaign(c.clock, c.observe)
+
+                c.cutter.CutNext = true
+                c.support.CreateNextBlockReturns(normalBlock)
+
+                err := c.Order(env, uint64(0))
+                Expect(err).NotTo(HaveOccurred())
+                Eventually(c.support.WriteBlockCallCount).Should(Equal(2))
+            })
+
+            It("does not replay any block if already in sync", func() {
+                c := newChain(10*time.Second, channelID, walDir, m2.RaftIndex, 1, []uint64{1})
+                c.Start()
+                defer c.Halt()
+
+                Consistently(c.support.WriteBlockCallCount).Should(Equal(0))
+
+                // chain should keep functioning
+                campaign(c.clock, c.observe)
+
+                c.cutter.CutNext = true
+                c.support.CreateNextBlockReturns(normalBlock)
+
+                err := c.Order(env, uint64(0))
+                Expect(err).NotTo(HaveOccurred())
+                Eventually(c.support.WriteBlockCallCount).Should(Equal(1))
+            })
+
+            Context("WAL file is not readable", func() {
+                It("fails to load wal", func() {
+                    skipIfRoot()
+
+                    files, err := ioutil.ReadDir(walDir)
+                    Expect(err).NotTo(HaveOccurred())
+                    for _, f := range files {
+                        os.Chmod(path.Join(walDir, f.Name()), 0300)
+                    }
+
+                    c, err := etcdraft.NewChain(support, opts, configurator, nil, observeC)
+                    Expect(c).To(BeNil())
+                    Expect(err).To(MatchError(ContainSubstring("failed to open existing WAL")))
+                })
+            })
+        })
+    })
+
+    Context("Invalid WAL dir", func() {
+        var support = &consensusmocks.FakeConsenterSupport{}
+
+        When("WAL dir is a file", func() {
+            It("replaces file with fresh WAL dir", func() {
+                f, err := ioutil.TempFile("", "wal-")
+                Expect(err).NotTo(HaveOccurred())
+                defer os.RemoveAll(f.Name())
+
+                chain, err := etcdraft.NewChain(
+                    support,
+                    etcdraft.Options{
+                        WALDir:       f.Name(),
+                        Logger:       logger,
+                        Storage:      storage,
+                        RaftMetadata: &raftprotos.RaftMetadata{},
+                    },
+                    configurator,
+                    nil,
+                    observeC)
+                Expect(chain).NotTo(BeNil())
+                Expect(err).NotTo(HaveOccurred())
+
+                info, err := os.Stat(f.Name())
+                Expect(err).NotTo(HaveOccurred())
+                Expect(info.IsDir()).To(BeTrue())
+            })
+        })
+
+        When("WAL dir is not writeable", func() {
+            It("replaces it with fresh WAL dir", func() {
+                d, err := ioutil.TempDir("", "wal-")
+                Expect(err).NotTo(HaveOccurred())
+                defer os.RemoveAll(d)
+
+                err = os.Chmod(d, 0500)
+                Expect(err).NotTo(HaveOccurred())
+
+                chain, err := etcdraft.NewChain(
+                    support,
+                    etcdraft.Options{
+                        WALDir:       d,
+                        Logger:       logger,
+                        Storage:      storage,
+                        RaftMetadata: &raftprotos.RaftMetadata{},
+                    },
+                    nil,
+                    nil,
+                    nil)
+                Expect(chain).NotTo(BeNil())
+                Expect(err).ToNot(HaveOccurred())
+            })
+        })
+
+        When("WAL parent dir is not writeable", func() {
+            It("fails to bootstrap fresh raft node", func() {
+                skipIfRoot()
+
+                d, err := ioutil.TempDir("", "wal-")
+                Expect(err).NotTo(HaveOccurred())
+                defer os.RemoveAll(d)
+
+                err = os.Chmod(d, 0500)
+                Expect(err).NotTo(HaveOccurred())
+
+                chain, err := etcdraft.NewChain(
+                    support,
+                    etcdraft.Options{
+                        WALDir:       path.Join(d, "wal-dir"),
+                        Logger:       logger,
+                        RaftMetadata: &raftprotos.RaftMetadata{},
+                    },
+                    nil,
+                    nil,
+                    nil)
+                Expect(chain).To(BeNil())
+                Expect(err).To(MatchError(ContainSubstring("failed to initialize WAL: mkdir")))
+            })
+        })
+    })
     })
+
     })
 
     Describe("Multiple Raft nodes", func() {
@@ -554,19 +788,29 @@
         var (
             network   *network
             channelID string
             timeout   time.Duration
+            dataDir   string
             c1, c2, c3 *chain
         )
 
         BeforeEach(func() {
+            var err error
+
             channelID = "multi-node-channel"
             timeout = 10 * time.Second
 
-            network = createNetwork(timeout, channelID, []uint64{1, 2, 3})
+            dataDir, err = ioutil.TempDir("", "raft-test-")
+            Expect(err).NotTo(HaveOccurred())
+
+            network = createNetwork(timeout, channelID, dataDir, []uint64{1, 2, 3})
             c1 = network.chains[1]
             c2 = network.chains[2]
             c3 = network.chains[3]
         })
 
+        AfterEach(func() {
+            os.RemoveAll(dataDir)
+        })
+
         When("2/3 nodes are running", func() {
             It("late node can catch up", func() {
                 network.start(1, 2)
@@ -772,7 +1016,7 @@
                 Eventually(func() int { return c1.support.WriteBlockCallCount() }).Should(Equal(0))
                 Eventually(func() int { return c3.support.WriteBlockCallCount() }).Should(Equal(0))
 
-                c1.clock.Increment(3 * interval)
+                c1.clock.Increment(time.Duration(n * int(interval/time.Millisecond)))
                 Eventually(func() int { return c1.support.WriteBlockCallCount() }).Should(Equal(1))
                 Eventually(func() int { return c3.support.WriteBlockCallCount() }).Should(Equal(1))
             })
@@ -884,6 +1128,7 @@ type chain struct {
     configurator *mocks.Configurator
     rpc          *mocks.FakeRPC
     storage      *raft.MemoryStorage
+    walDir       string
     clock        *fakeclock.FakeClock
 
     opts etcdraft.Options
@@ -893,29 +1138,30 @@ type chain struct {
     support *consensusmocks.FakeConsenterSupport
     *etcdraft.Chain
 }
 
-func newChain(timeout time.Duration, channel string, id uint64, all []uint64) *chain {
+func newChain(timeout time.Duration, channel string, walDir string, applied uint64, id uint64, all []uint64) *chain {
     rpc := &mocks.FakeRPC{}
     clock := fakeclock.NewFakeClock(time.Now())
     storage := raft.NewMemoryStorage()
     tlsCA, _ := tlsgen.NewCA()
 
-    membership := &raftprotos.RaftMetadata{
+    meta := &raftprotos.RaftMetadata{
         Consenters:      map[uint64]*raftprotos.Consenter{},
         NextConsenterID: 1,
+        RaftIndex:       applied,
     }
 
     for _, raftID := range all {
-        membership.Consenters[uint64(raftID)] = &raftprotos.Consenter{
+        meta.Consenters[uint64(raftID)] = &raftprotos.Consenter{
             Host:          "localhost",
             Port:          7051,
             ClientTlsCert: clientTLSCert(tlsCA),
             ServerTlsCert: serverTLSCert(tlsCA),
         }
-        if uint64(raftID) > membership.NextConsenterID {
-            membership.NextConsenterID = uint64(raftID)
+        if uint64(raftID) > meta.NextConsenterID {
+            meta.NextConsenterID = uint64(raftID)
         }
     }
-    membership.NextConsenterID++
+    meta.NextConsenterID++
 
     opts := etcdraft.Options{
         RaftID: uint64(id),
@@ -925,9 +1171,10 @@ func newChain(timeout time.Duration, channel string, id uint64, all []uint64) *c
         HeartbeatTick:   HEARTBEAT_TICK,
         MaxSizePerMsg:   1024 * 1024,
         MaxInflightMsgs: 256,
-        RaftMetadata:    membership,
+        RaftMetadata:    meta,
         Logger:          flogging.NewFabricLogger(zap.NewNop()),
         Storage:         storage,
+        WALDir:          walDir,
     }
 
     support := &consensusmocks.FakeConsenterSupport{}
@@ -979,7 +1226,7 @@ type network struct {
     connectivity map[uint64]chan struct{}
 }
 
-func createNetwork(timeout time.Duration, channel string, ids []uint64) *network {
+func createNetwork(timeout time.Duration, channel string, dataDir string, ids []uint64) *network {
     n := &network{
         chains:       make(map[uint64]*chain),
         connectivity: make(map[uint64]chan struct{}),
@@ -988,7 +1235,10 @@ func createNetwork(timeout time.Duration, channel string, ids []uint64) *network
     for _, i := range ids {
         n.connectivity[i] = make(chan struct{})
 
-        c := newChain(timeout, channel, i, ids)
+        dir, err := ioutil.TempDir(dataDir, fmt.Sprintf("node-%d-", i))
+        Expect(err).NotTo(HaveOccurred())
+
+        c := newChain(timeout, channel, dir, 0, i, ids)
 
         c.rpc.StepStub = func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
             n.connLock.RLock()
@@ -1118,7 +1368,7 @@ func (n *network) elect(id uint64) (tick int) {
     // results in undeterministic behavior. Therefore
     // we are going to wait for enough time after each
     // tick so it could take effect.
-    t := 100 * time.Millisecond
+    t := 1000 * time.Millisecond
 
     c := n.chains[id]
 
diff --git a/orderer/consensus/etcdraft/consenter.go b/orderer/consensus/etcdraft/consenter.go
index 44b2122da7..a29432ee0c 100644
--- a/orderer/consensus/etcdraft/consenter.go
+++ b/orderer/consensus/etcdraft/consenter.go
@@ -8,6 +8,7 @@ package etcdraft
 
 import (
     "bytes"
+    "path"
     "reflect"
     "time"
 
@@ -15,6 +16,7 @@ import (
     "github.com/coreos/etcd/raft"
     "github.com/golang/protobuf/proto"
     "github.com/hyperledger/fabric/common/flogging"
+    "github.com/hyperledger/fabric/common/viperutil"
     "github.com/hyperledger/fabric/core/comm"
     "github.com/hyperledger/fabric/orderer/common/cluster"
     "github.com/hyperledger/fabric/orderer/common/localconfig"
@@ -36,12 +38,18 @@ type ChainGetter interface {
     GetChain(chainID string) *multichannel.ChainSupport
 }
 
+// Config contains etcdraft configurations
+type Config struct {
+    WALDir string // WAL data of each channel is stored in WALDir/<channelID>
+}
+
 // Consenter implements etddraft consenter
 type Consenter struct {
     Communication cluster.Communicator
     *Dispatcher
     Chains ChainGetter
     Logger *flogging.FabricLogger
+    Config Config
     Cert   []byte
 }
 
@@ -125,6 +133,7 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
         MaxSizePerMsg:   m.Options.MaxSizePerMsg,
 
         RaftMetadata: raftMetadata,
+        WALDir:       path.Join(c.Config.WALDir, support.ChainID()),
     }
 
     rpc := &cluster.RPC{Channel: support.ChainID(), Comm: c.Communication}
@@ -132,33 +141,41 @@
 }
 
 func raftMetadata(blockMetadata *common.Metadata, configMetadata *etcdraft.Metadata) (*etcdraft.RaftMetadata, error) {
-    membership := &etcdraft.RaftMetadata{
+    m := &etcdraft.RaftMetadata{
         Consenters:      map[uint64]*etcdraft.Consenter{},
         NextConsenterID: 1,
     }
 
     if blockMetadata != nil && len(blockMetadata.Value) != 0 { // we have consenters mapping from block
-        if err := proto.Unmarshal(blockMetadata.Value, membership); err != nil {
+        if err := proto.Unmarshal(blockMetadata.Value, m); err != nil {
             return nil, errors.Wrap(err, "failed to unmarshal block's metadata")
         }
-        return membership, nil
+        return m, nil
     }
 
     // need to read consenters from the configuration
     for _, consenter := range configMetadata.Consenters {
-        membership.Consenters[membership.NextConsenterID] = consenter
-        membership.NextConsenterID++
+        m.Consenters[m.NextConsenterID] = consenter
+        m.NextConsenterID++
     }
 
-    return membership, nil
+    return m, nil
 }
 
+// New creates an etcdraft Consenter
 func New(clusterDialer *cluster.PredicateDialer, conf *localconfig.TopLevel,
     srvConf comm.ServerConfig, srv *comm.GRPCServer, r *multichannel.Registrar) *Consenter {
     logger := flogging.MustGetLogger("orderer.consensus.etcdraft")
+
+    var config Config
+    if err := viperutil.Decode(conf.Consensus, &config); err != nil {
+        logger.Panicf("Failed to decode etcdraft configuration: %s", err)
+    }
+
     consenter := &Consenter{
         Cert:   srvConf.SecOpts.Certificate,
         Logger: logger,
         Chains: r,
+        Config: config,
     }
     consenter.Dispatcher = &Dispatcher{
         Logger: logger,
diff --git a/orderer/consensus/etcdraft/consenter_test.go b/orderer/consensus/etcdraft/consenter_test.go
index 58953a1d58..23b719e73d 100644
--- a/orderer/consensus/etcdraft/consenter_test.go
+++ b/orderer/consensus/etcdraft/consenter_test.go
@@ -7,6 +7,9 @@ SPDX-License-Identifier: Apache-2.0
 package etcdraft_test
 
 import (
+    "io/ioutil"
+    "os"
+
     "github.com/hyperledger/fabric/common/flogging"
     mockconfig "github.com/hyperledger/fabric/common/mocks/config"
     clustermocks "github.com/hyperledger/fabric/orderer/common/cluster/mocks"
@@ -114,7 +117,12 @@ var _ = Describe("Consenter", func() {
             metadata := utils.MarshalOrPanic(m)
             support.SharedConfigReturns(&mockconfig.Orderer{ConsensusMetadataVal: metadata})
 
+            dir, err := ioutil.TempDir("", "wal-")
+            Expect(err).NotTo(HaveOccurred())
+            defer os.RemoveAll(dir)
+
             consenter := newConsenter(chainGetter)
+            consenter.Config.WALDir = dir
 
             chain, err := consenter.HandleChain(support, nil)
             Expect(err).NotTo(HaveOccurred())
@@ -141,11 +149,43 @@ var _ = Describe("Consenter", func() {
             consenter := newConsenter(chainGetter)
 
-            chain, err := consenter.HandleChain(support, nil)
+            chain, err := consenter.HandleChain(support, &common.Metadata{})
             Expect(chain).To(BeNil())
             Expect(err).To(MatchError("failed to detect own Raft ID because no matching certificate found"))
         })
 
+        It("fails to handle chain if WAL is expected but no data found", func() {
+            c := &etcdraftproto.Consenter{ServerTlsCert: []byte("cert.orderer0.org0")}
+            m := &etcdraftproto.Metadata{
+                Consenters: []*etcdraftproto.Consenter{c},
+                Options: &etcdraftproto.Options{
+                    TickInterval:    100,
+                    ElectionTick:    10,
+                    HeartbeatTick:   1,
+                    MaxInflightMsgs: 256,
+                    MaxSizePerMsg:   1048576,
+                },
+            }
+            metadata := utils.MarshalOrPanic(m)
+            support.SharedConfigReturns(&mockconfig.Orderer{ConsensusMetadataVal: metadata})
+
+            dir, err := ioutil.TempDir("", "wal-")
+            Expect(err).NotTo(HaveOccurred())
+            defer os.RemoveAll(dir)
+
+            consenter := newConsenter(chainGetter)
+            consenter.Config.WALDir = dir
+
+            d := &etcdraftproto.RaftMetadata{
+                Consenters: map[uint64]*etcdraftproto.Consenter{1: c},
+                RaftIndex:  uint64(2),
+            }
+            chain, err := consenter.HandleChain(support, &common.Metadata{Value: utils.MarshalOrPanic(d)})
+
+            Expect(chain).To(BeNil())
+            Expect(err).To(MatchError(ContainSubstring("no WAL data found")))
+        })
+
         It("fails to handle chain if etcdraft options have not been provided", func() {
             m := &etcdraftproto.Metadata{
                 Consenters: []*etcdraftproto.Consenter{