forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eviction.go
142 lines (117 loc) · 4.09 KB
/
eviction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/protoutil"
"go.etcd.io/etcd/raft/raftpb"
)
// PeriodicCheck periodically evaluates a condition and reports for how long
// the condition has been holding without interruption.
type PeriodicCheck struct {
// Logger is used to log that the periodic check is stopping.
Logger *flogging.FabricLogger
// CheckInterval is the delay between the end of one check and the next one.
CheckInterval time.Duration
// Condition is evaluated on every check.
Condition func() bool
// Report is invoked on every check in which Condition returned true, with
// the time elapsed since Condition first returned true in the current streak.
Report func(cumulativePeriod time.Duration)
// conditionHoldsSince is the time the current streak started; it is the
// zero time while the condition does not hold.
conditionHoldsSince time.Time
once sync.Once // Used to prevent double initialization
// stopped is non-zero once Stop has been called; it is accessed atomically.
stopped uint32
}
// Run starts the periodic checking. The first check is executed by the
// first call to Run; any later call does not start another round of checks.
func (pc *PeriodicCheck) Run() {
	start := func() {
		pc.check()
	}
	pc.once.Do(start)
}
// Stop stops the periodic checks.
// It only raises the stop flag: a timer that was already armed is not
// cancelled, the flag is simply observed the next time the check runs.
// Stop may be called any number of times.
func (pc *PeriodicCheck) Stop() {
	pc.Logger.Info("Periodic check is stopping.")
	// Store instead of Add: the field is a boolean flag, so the stored value
	// must not depend on how many times Stop was invoked.
	atomic.StoreUint32(&pc.stopped, 1)
}
// shouldRun reports whether the stop flag is still unset,
// i.e. whether the periodic checks should carry on.
func (pc *PeriodicCheck) shouldRun() bool {
	stopped := atomic.LoadUint32(&pc.stopped) != 0
	return !stopped
}
// check performs a single round: it evaluates Condition, updates the streak
// bookkeeping (reporting if the condition holds), and then arms a timer that
// runs the next round after CheckInterval. Nothing is evaluated, reported or
// scheduled once the PeriodicCheck has been stopped.
func (pc *PeriodicCheck) check() {
	// A timer armed before Stop was called still fires; bail out here so that
	// Condition and Report are not invoked after the check has been stopped.
	if !pc.shouldRun() {
		return
	}

	if pc.Condition() {
		pc.conditionFulfilled()
	} else {
		pc.conditionNotFulfilled()
	}

	// Stop may have been called while the condition was being evaluated or
	// reported, in which case no further round is scheduled.
	if !pc.shouldRun() {
		return
	}
	time.AfterFunc(pc.CheckInterval, pc.check)
}
// conditionNotFulfilled ends the current streak by resetting its start
// time to the zero time.
func (pc *PeriodicCheck) conditionNotFulfilled() {
	var never time.Time
	pc.conditionHoldsSince = never
}
// conditionFulfilled records the start of a streak if none is in progress,
// and reports how long the current streak has lasted so far.
func (pc *PeriodicCheck) conditionFulfilled() {
	streakStart := pc.conditionHoldsSince
	if streakStart.IsZero() {
		// First fulfilled check of a new streak.
		streakStart = time.Now()
		pc.conditionHoldsSince = streakStart
	}
	elapsed := time.Since(streakStart)
	pc.Report(elapsed)
}
// evictionSuspector holds the dependencies confirmSuspicion needs in order to
// find out whether this node was evicted from the channel, and to react to it.
type evictionSuspector struct {
// evictionSuspicionThreshold is the minimal cumulative suspicion period
// below which confirmSuspicion does nothing.
evictionSuspicionThreshold time.Duration
logger *flogging.FabricLogger
// createPuller creates the puller used to fetch the last config block and
// the blocks up to it.
createPuller CreateBlockPuller
// height returns our own height, which is compared against the number of
// the last config block that was pulled.
height func() uint64
// amIInChannel inspects a config block; cluster.ErrNotInChannel and
// cluster.ErrForbidden are the results treated as a confirmed eviction.
amIInChannel cluster.SelfMembershipPredicate
// halt is invoked once the eviction is confirmed, before the remaining
// blocks are pulled.
halt func()
// writeBlock is called for every block pulled after the halt; a non-nil
// error causes a panic.
writeBlock func(block *common.Block) error
// triggerCatchUp is invoked with a snapshot wrapping the last config block
// when the eviction could not be confirmed.
triggerCatchUp func(sn *raftpb.Snapshot)
// halted is set after halt returned, and turns every subsequent
// confirmSuspicion call into a no-op.
halted bool
}
// confirmSuspicion checks whether this node was evicted from the channel,
// given that an eviction has been suspected for cumulativeSuspicion.
//
// It does nothing while cumulativeSuspicion is below the configured threshold
// or after an eviction was already handled. Otherwise it pulls the last config
// block and:
//   - returns if pulling failed, or if our height already covers that block;
//   - triggers a catch-up with that block if the membership check does not
//     report cluster.ErrNotInChannel or cluster.ErrForbidden;
//   - else halts the chain, then pulls and writes every block from our height
//     up to (and including) the eviction block.
//
// It panics (through the logger) if the puller cannot be created, if a block
// to be committed is nil, or if writing a block fails.
func (es *evictionSuspector) confirmSuspicion(cumulativeSuspicion time.Duration) {
	if es.evictionSuspicionThreshold > cumulativeSuspicion || es.halted {
		return
	}
	es.logger.Infof("Suspecting our own eviction from the channel for %v", cumulativeSuspicion)

	// NOTE(review): the puller is never released in this function; if its
	// type exposes a Close method, consider deferring it here — confirm.
	puller, err := es.createPuller()
	if err != nil {
		es.logger.Panicf("Failed creating a block puller: %v", err)
	}

	lastConfigBlock, err := cluster.PullLastConfigBlock(puller)
	if err != nil {
		es.logger.Errorf("Failed pulling the last config block: %v", err)
		return
	}
	es.logger.Infof("Last config block was found to be block [%d]", lastConfigBlock.Header.Number)

	// Block numbers start at a lower value than heights, hence the +1:
	// we already have the config block if our height exceeds its number.
	height := es.height()
	if lastConfigBlock.Header.Number+1 <= height {
		es.logger.Infof("Our height is higher or equal than the height of the orderer we pulled the last block from, aborting.")
		return
	}

	err = es.amIInChannel(lastConfigBlock)
	if err != cluster.ErrNotInChannel && err != cluster.ErrForbidden {
		// Either we are still a member (nil error), or the check itself
		// failed; in both cases the eviction is not confirmed.
		details := fmt.Sprintf(", our certificate was found in config block with sequence %d", lastConfigBlock.Header.Number)
		if err != nil {
			details = fmt.Sprintf(": %s", err.Error())
		}
		es.logger.Infof("Cannot confirm our own eviction from the channel%s", details)
		es.triggerCatchUp(&raftpb.Snapshot{Data: protoutil.MarshalOrPanic(lastConfigBlock)})
		return
	}

	es.logger.Warningf("Detected our own eviction from the channel in block [%d]", lastConfigBlock.Header.Number)
	es.logger.Infof("Waiting for chain to halt")
	es.halt()
	// From now on, further invocations return immediately.
	es.halted = true
	es.logger.Infof("Chain has been halted, pulling remaining blocks up to (and including) eviction block.")

	nextBlock := height
	es.logger.Infof("Will now pull blocks %d to %d", nextBlock, lastConfigBlock.Header.Number)
	for seq := nextBlock; seq <= lastConfigBlock.Header.Number; seq++ {
		es.logger.Infof("Pulling block [%d]", seq)
		block := puller.PullBlock(seq)
		// The puller's contract is not visible here; fail with a clear message
		// rather than handing a nil block over to the ledger.
		if block == nil {
			es.logger.Panicf("Failed pulling block [%d]", seq)
		}
		if err := es.writeBlock(block); err != nil {
			// Report seq rather than block.Header.Number: a block with a nil
			// header would otherwise turn this report into a nil dereference
			// and hide the actual write error.
			es.logger.Panicf("Failed writing block [%d] to the ledger: %v", seq, err)
		}
	}
	es.logger.Infof("Pulled all blocks up to eviction block.")
}