Skip to content

Commit 384996f

Browse files
jongkwon.lee authored and mastersingh24 committed
Split evictionSuspector from util.go
This CR splits evictionSuspector and PeriodicCheck out of util.go into a new eviction.go file, and moves the related tests to eviction_test.go. Signed-off-by: jongkwon.lee <jongkwon.lee@navercorp.com> Change-Id: Ib0770d63ae495a54987a3b8dfdbcd208248e5e42
1 parent b063326 commit 384996f

File tree

4 files changed

+392
-360
lines changed

4 files changed

+392
-360
lines changed
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package etcdraft
8+
9+
import (
10+
"fmt"
11+
"sync"
12+
"sync/atomic"
13+
"time"
14+
15+
"github.com/hyperledger/fabric-protos-go/common"
16+
"github.com/hyperledger/fabric/common/flogging"
17+
"github.com/hyperledger/fabric/orderer/common/cluster"
18+
"github.com/hyperledger/fabric/protoutil"
19+
"go.etcd.io/etcd/raft/raftpb"
20+
)
21+
22+
// PeriodicCheck checks periodically a condition, and reports
23+
// the cumulative consecutive period the condition was fulfilled.
24+
type PeriodicCheck struct {
25+
Logger *flogging.FabricLogger
26+
CheckInterval time.Duration
27+
Condition func() bool
28+
Report func(cumulativePeriod time.Duration)
29+
conditionHoldsSince time.Time
30+
once sync.Once // Used to prevent double initialization
31+
stopped uint32
32+
}
33+
34+
// Run runs the PeriodicCheck
35+
func (pc *PeriodicCheck) Run() {
36+
pc.once.Do(pc.check)
37+
}
38+
39+
// Stop stops the periodic checks
40+
func (pc *PeriodicCheck) Stop() {
41+
pc.Logger.Info("Periodic check is stopping.")
42+
atomic.AddUint32(&pc.stopped, 1)
43+
}
44+
45+
func (pc *PeriodicCheck) shouldRun() bool {
46+
return atomic.LoadUint32(&pc.stopped) == 0
47+
}
48+
49+
func (pc *PeriodicCheck) check() {
50+
if pc.Condition() {
51+
pc.conditionFulfilled()
52+
} else {
53+
pc.conditionNotFulfilled()
54+
}
55+
56+
if !pc.shouldRun() {
57+
return
58+
}
59+
time.AfterFunc(pc.CheckInterval, pc.check)
60+
}
61+
62+
func (pc *PeriodicCheck) conditionNotFulfilled() {
63+
pc.conditionHoldsSince = time.Time{}
64+
}
65+
66+
func (pc *PeriodicCheck) conditionFulfilled() {
67+
if pc.conditionHoldsSince.IsZero() {
68+
pc.conditionHoldsSince = time.Now()
69+
}
70+
71+
pc.Report(time.Since(pc.conditionHoldsSince))
72+
}
73+
74+
type evictionSuspector struct {
75+
evictionSuspicionThreshold time.Duration
76+
logger *flogging.FabricLogger
77+
createPuller CreateBlockPuller
78+
height func() uint64
79+
amIInChannel cluster.SelfMembershipPredicate
80+
halt func()
81+
writeBlock func(block *common.Block) error
82+
triggerCatchUp func(sn *raftpb.Snapshot)
83+
halted bool
84+
}
85+
86+
func (es *evictionSuspector) confirmSuspicion(cumulativeSuspicion time.Duration) {
87+
if es.evictionSuspicionThreshold > cumulativeSuspicion || es.halted {
88+
return
89+
}
90+
es.logger.Infof("Suspecting our own eviction from the channel for %v", cumulativeSuspicion)
91+
puller, err := es.createPuller()
92+
if err != nil {
93+
es.logger.Panicf("Failed creating a block puller: %v", err)
94+
}
95+
96+
lastConfigBlock, err := cluster.PullLastConfigBlock(puller)
97+
if err != nil {
98+
es.logger.Errorf("Failed pulling the last config block: %v", err)
99+
return
100+
}
101+
102+
es.logger.Infof("Last config block was found to be block [%d]", lastConfigBlock.Header.Number)
103+
104+
height := es.height()
105+
106+
if lastConfigBlock.Header.Number+1 <= height {
107+
es.logger.Infof("Our height is higher or equal than the height of the orderer we pulled the last block from, aborting.")
108+
return
109+
}
110+
111+
err = es.amIInChannel(lastConfigBlock)
112+
if err != cluster.ErrNotInChannel && err != cluster.ErrForbidden {
113+
details := fmt.Sprintf(", our certificate was found in config block with sequence %d", lastConfigBlock.Header.Number)
114+
if err != nil {
115+
details = fmt.Sprintf(": %s", err.Error())
116+
}
117+
es.logger.Infof("Cannot confirm our own eviction from the channel%s", details)
118+
119+
es.triggerCatchUp(&raftpb.Snapshot{Data: protoutil.MarshalOrPanic(lastConfigBlock)})
120+
return
121+
}
122+
123+
es.logger.Warningf("Detected our own eviction from the channel in block [%d]", lastConfigBlock.Header.Number)
124+
125+
es.logger.Infof("Waiting for chain to halt")
126+
es.halt()
127+
es.halted = true
128+
es.logger.Infof("Chain has been halted, pulling remaining blocks up to (and including) eviction block.")
129+
130+
nextBlock := height
131+
es.logger.Infof("Will now pull blocks %d to %d", nextBlock, lastConfigBlock.Header.Number)
132+
for seq := nextBlock; seq <= lastConfigBlock.Header.Number; seq++ {
133+
es.logger.Infof("Pulling block [%d]", seq)
134+
block := puller.PullBlock(seq)
135+
err := es.writeBlock(block)
136+
if err != nil {
137+
es.logger.Panicf("Failed writing block [%d] to the ledger: %v", block.Header.Number, err)
138+
}
139+
}
140+
141+
es.logger.Infof("Pulled all blocks up to eviction block.")
142+
}
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package etcdraft
8+
9+
import (
10+
"strings"
11+
"sync/atomic"
12+
"testing"
13+
"time"
14+
15+
"github.com/hyperledger/fabric-protos-go/common"
16+
"github.com/hyperledger/fabric/common/flogging"
17+
"github.com/hyperledger/fabric/orderer/common/cluster"
18+
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
19+
"github.com/hyperledger/fabric/protoutil"
20+
"github.com/onsi/gomega"
21+
"github.com/pkg/errors"
22+
"github.com/stretchr/testify/assert"
23+
"go.etcd.io/etcd/raft/raftpb"
24+
"go.uber.org/zap"
25+
"go.uber.org/zap/zapcore"
26+
)
27+
28+
func TestPeriodicCheck(t *testing.T) {
29+
t.Parallel()
30+
31+
g := gomega.NewGomegaWithT(t)
32+
33+
var cond uint32
34+
var checkNum uint32
35+
36+
fiveChecks := func() bool {
37+
return atomic.LoadUint32(&checkNum) > uint32(5)
38+
}
39+
40+
condition := func() bool {
41+
atomic.AddUint32(&checkNum, 1)
42+
return atomic.LoadUint32(&cond) == uint32(1)
43+
}
44+
45+
reports := make(chan time.Duration, 1000)
46+
47+
report := func(duration time.Duration) {
48+
reports <- duration
49+
}
50+
51+
check := &PeriodicCheck{
52+
Logger: flogging.MustGetLogger("test"),
53+
Condition: condition,
54+
CheckInterval: time.Millisecond,
55+
Report: report,
56+
}
57+
58+
go check.Run()
59+
60+
g.Eventually(fiveChecks, time.Minute, time.Millisecond).Should(gomega.BeTrue())
61+
// trigger condition to be true
62+
atomic.StoreUint32(&cond, 1)
63+
g.Eventually(reports, time.Minute, time.Millisecond).Should(gomega.Not(gomega.BeEmpty()))
64+
// read first report
65+
firstReport := <-reports
66+
g.Eventually(reports, time.Minute, time.Millisecond).Should(gomega.Not(gomega.BeEmpty()))
67+
// read second report
68+
secondReport := <-reports
69+
// time increases between reports
70+
g.Expect(secondReport).To(gomega.BeNumerically(">", firstReport))
71+
// wait for the reports channel to be full
72+
g.Eventually(func() int { return len(reports) }, time.Minute, time.Millisecond).Should(gomega.BeNumerically("==", 1000))
73+
74+
// trigger condition to be false
75+
atomic.StoreUint32(&cond, 0)
76+
77+
var lastReport time.Duration
78+
// drain the reports channel
79+
for len(reports) > 0 {
80+
select {
81+
case report := <-reports:
82+
lastReport = report
83+
default:
84+
break
85+
}
86+
}
87+
88+
// ensure the checks have been made
89+
checksDoneSoFar := atomic.LoadUint32(&checkNum)
90+
g.Consistently(reports, time.Second*2, time.Millisecond).Should(gomega.BeEmpty())
91+
checksDoneAfter := atomic.LoadUint32(&checkNum)
92+
g.Expect(checksDoneAfter).To(gomega.BeNumerically(">", checksDoneSoFar))
93+
// but nothing has been reported
94+
g.Expect(reports).To(gomega.BeEmpty())
95+
96+
// trigger the condition again
97+
atomic.StoreUint32(&cond, 1)
98+
g.Eventually(reports, time.Minute, time.Millisecond).Should(gomega.Not(gomega.BeEmpty()))
99+
// The first report is smaller than the last report,
100+
// so the countdown has been reset when the condition was reset
101+
firstReport = <-reports
102+
g.Expect(lastReport).To(gomega.BeNumerically(">", firstReport))
103+
// Stop the periodic check.
104+
check.Stop()
105+
checkCountAfterStop := atomic.LoadUint32(&checkNum)
106+
// Wait 50 times the check interval.
107+
time.Sleep(check.CheckInterval * 50)
108+
// Ensure that we cease checking the condition, hence the PeriodicCheck is stopped.
109+
g.Expect(atomic.LoadUint32(&checkNum)).To(gomega.BeNumerically("<", checkCountAfterStop+2))
110+
}
111+
112+
func TestEvictionSuspector(t *testing.T) {
113+
configBlock := &common.Block{
114+
Header: &common.BlockHeader{Number: 9},
115+
Metadata: &common.BlockMetadata{
116+
Metadata: [][]byte{{}, {}, {}, {}},
117+
},
118+
}
119+
configBlock.Metadata.Metadata[common.BlockMetadataIndex_LAST_CONFIG] = protoutil.MarshalOrPanic(&common.Metadata{
120+
Value: protoutil.MarshalOrPanic(&common.LastConfig{Index: 9}),
121+
})
122+
123+
puller := &mocks.ChainPuller{}
124+
puller.On("Close")
125+
puller.On("HeightsByEndpoints").Return(map[string]uint64{"foo": 10}, nil)
126+
puller.On("PullBlock", uint64(9)).Return(configBlock)
127+
128+
for _, testCase := range []struct {
129+
description string
130+
expectedPanic string
131+
expectedLog string
132+
expectedCommittedBlockCount int
133+
amIInChannelReturns error
134+
evictionSuspicionThreshold time.Duration
135+
blockPuller BlockPuller
136+
blockPullerErr error
137+
height uint64
138+
halt func()
139+
}{
140+
{
141+
description: "suspected time is lower than threshold",
142+
evictionSuspicionThreshold: 11 * time.Minute,
143+
halt: t.Fail,
144+
},
145+
{
146+
description: "puller creation fails",
147+
evictionSuspicionThreshold: 10*time.Minute - time.Second,
148+
blockPullerErr: errors.New("oops"),
149+
expectedPanic: "Failed creating a block puller: oops",
150+
halt: t.Fail,
151+
},
152+
{
153+
description: "our height is the highest",
154+
expectedLog: "Our height is higher or equal than the height of the orderer we pulled the last block from, aborting",
155+
evictionSuspicionThreshold: 10*time.Minute - time.Second,
156+
blockPuller: puller,
157+
height: 10,
158+
halt: t.Fail,
159+
},
160+
{
161+
description: "failed pulling the block",
162+
expectedLog: "Cannot confirm our own eviction from the channel: bad block",
163+
evictionSuspicionThreshold: 10*time.Minute - time.Second,
164+
amIInChannelReturns: errors.New("bad block"),
165+
blockPuller: puller,
166+
height: 9,
167+
halt: t.Fail,
168+
},
169+
{
170+
description: "we are still in the channel",
171+
expectedLog: "Cannot confirm our own eviction from the channel, our certificate was found in config block with sequence 9",
172+
evictionSuspicionThreshold: 10*time.Minute - time.Second,
173+
amIInChannelReturns: nil,
174+
blockPuller: puller,
175+
height: 9,
176+
halt: t.Fail,
177+
},
178+
{
179+
description: "we are not in the channel",
180+
expectedLog: "Detected our own eviction from the channel in block [9]",
181+
evictionSuspicionThreshold: 10*time.Minute - time.Second,
182+
amIInChannelReturns: cluster.ErrNotInChannel,
183+
blockPuller: puller,
184+
height: 8,
185+
expectedCommittedBlockCount: 2,
186+
halt: func() {
187+
puller.On("PullBlock", uint64(8)).Return(&common.Block{
188+
Header: &common.BlockHeader{Number: 8},
189+
Metadata: &common.BlockMetadata{
190+
Metadata: [][]byte{{}, {}, {}, {}},
191+
},
192+
})
193+
},
194+
},
195+
} {
196+
testCase := testCase
197+
t.Run(testCase.description, func(t *testing.T) {
198+
committedBlocks := make(chan *common.Block, 2)
199+
200+
commitBlock := func(block *common.Block) error {
201+
committedBlocks <- block
202+
return nil
203+
}
204+
205+
es := &evictionSuspector{
206+
halt: testCase.halt,
207+
amIInChannel: func(_ *common.Block) error {
208+
return testCase.amIInChannelReturns
209+
},
210+
evictionSuspicionThreshold: testCase.evictionSuspicionThreshold,
211+
createPuller: func() (BlockPuller, error) {
212+
return testCase.blockPuller, testCase.blockPullerErr
213+
},
214+
writeBlock: commitBlock,
215+
height: func() uint64 {
216+
return testCase.height
217+
},
218+
logger: flogging.MustGetLogger("test"),
219+
triggerCatchUp: func(sn *raftpb.Snapshot) { return },
220+
}
221+
222+
foundExpectedLog := testCase.expectedLog == ""
223+
es.logger = es.logger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
224+
if strings.Contains(entry.Message, testCase.expectedLog) {
225+
foundExpectedLog = true
226+
}
227+
return nil
228+
}))
229+
230+
runTestCase := func() {
231+
es.confirmSuspicion(time.Minute * 10)
232+
}
233+
234+
if testCase.expectedPanic != "" {
235+
assert.PanicsWithValue(t, testCase.expectedPanic, runTestCase)
236+
} else {
237+
runTestCase()
238+
// Run the test case again.
239+
// Conditions that do not lead to a conclusion of a chain eviction
240+
// should be idempotent.
241+
// Conditions that do lead to conclusion of a chain eviction
242+
// in the second time - should result in a no-op.
243+
runTestCase()
244+
}
245+
246+
assert.True(t, foundExpectedLog, "expected to find %s but didn't", testCase.expectedLog)
247+
assert.Equal(t, testCase.expectedCommittedBlockCount, len(committedBlocks))
248+
})
249+
}
250+
}

0 commit comments

Comments
 (0)