/
cp2fp.go
268 lines (226 loc) · 6.89 KB
/
cp2fp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// Copyright 2018 The go-fractal Authors
// This file is part of the go-fractal library.
// This file (cp2fp.go) implements fractal's checkpoint-to-fixpoint (CP2FP) sync.
package sync
import (
"errors"
"github.com/fractal-platform/fractal/core/types"
"github.com/fractal-platform/fractal/ftl/downloader"
"github.com/fractal-platform/fractal/ftl/protocol"
"github.com/fractal-platform/fractal/utils/log"
"sync"
)
// taskBackTrackLength is the maximum number of ancestors a CP2FP task steps
// back from the computed "from" break point before it starts syncing.
const taskBackTrackLength = 10
// CP2FPSync drives checkpoint-to-fixpoint (CP2FP) sync tasks. Tasks are
// submitted through taskCh to a single background goroutine (loop), which
// runs them one at a time.
type CP2FPSync struct {
	// task is the task loop is currently processing, or nil when idle.
	// Writes to it are made while holding taskLock.
	task     *CP2FPTask
	taskLock sync.RWMutex
	// taskCh carries new tasks to loop; newCP2FPSync creates it unbuffered,
	// so a send blocks until loop is ready to receive.
	taskCh chan *CP2FPTask
	// for skeleton hash process: forwarded to every task and from there to
	// the long hash fetcher.
	peerSkeletonCh  chan PeerHashElemList
	timeoutSkeleton int
	// sync is the owning Synchronizer (chain access, peer set).
	sync *Synchronizer
	// removePeerFn is invoked by tasks/fetchers to drop a misbehaving peer.
	removePeerFn removePeerCallback
	logger       log.Logger
}
// newCP2FPSync builds a CP2FPSync bound to the given Synchronizer and starts
// its task loop in a background goroutine.
//
// peerSkeletonCh and timeoutSkeleton are handed to every task for the
// skeleton (long hash list) fetch. The peer-removal callback and the logger
// are taken from sync.
func newCP2FPSync(peerSkeletonCh chan PeerHashElemList, timeoutSkeleton int, sync *Synchronizer) *CP2FPSync {
	cs := new(CP2FPSync)
	cs.taskCh = make(chan *CP2FPTask)
	cs.peerSkeletonCh = peerSkeletonCh
	cs.timeoutSkeleton = timeoutSkeleton
	cs.sync = sync
	cs.removePeerFn = sync.removePeerCallback
	cs.logger = sync.log

	// The loop goroutine lives as long as taskCh stays open.
	go cs.loop()
	return cs
}
// loop receives tasks from taskCh and processes them one at a time,
// publishing the running task in s.task (under taskLock) for the duration of
// its processing.
//
// process is invoked on the local task value, not on s.task. stopAll may set
// s.task to nil at any moment from another goroutine, and re-reading the
// field after releasing the lock could call process on a nil task.
func (s *CP2FPSync) loop() {
	for task := range s.taskCh {
		s.taskLock.Lock()
		s.task = task
		s.taskLock.Unlock()

		task.process()

		s.taskLock.Lock()
		// stopAll may already have cleared the slot; only reset it if it
		// still refers to the task that just finished.
		if s.task == task {
			s.task = nil
		}
		s.taskLock.Unlock()
	}
}
// startTask asks any running task to quit and then queues a new CP2FP task
// syncing from blockFrom to blockTo using the given peers.
//
// taskCh is unbuffered, so this call blocks until loop receives the new
// task, i.e. until the previously running task's process has returned.
func (s *CP2FPSync) startTask(blockFrom *types.Block, blockTo *types.Block, peers []peer) {
	// Signal the current task (if any) before handing over the next one.
	s.stopAll()

	next := &CP2FPTask{
		from:            blockFrom,
		to:              blockTo,
		peers:           peers,
		quitCh:          make(chan struct{}),
		sync:            s.sync,
		peerSkeletonCh:  s.peerSkeletonCh,
		timeoutSkeleton: s.timeoutSkeleton,
		removePeerFn:    s.removePeerFn,
		logger:          s.logger,
	}
	s.taskCh <- next
}
// isRunning reports whether a task is currently published in s.task.
func (s *CP2FPSync) isRunning() bool {
	s.taskLock.RLock()
	running := s.task != nil
	s.taskLock.RUnlock()
	return running
}
// stopAll signals the current task (if any) to quit and clears s.task.
// It does not wait for the task's process method to return.
func (s *CP2FPSync) stopAll() {
	s.taskLock.Lock()
	defer s.taskLock.Unlock()

	if s.task == nil {
		return
	}
	s.task.stop()
	s.task = nil
}
// registerPeer registers p with the running task's block fetcher, if a task
// is running and is currently downloading blocks. Otherwise it is a no-op.
//
// The task and its blockSync are each read exactly once into locals. The
// task goroutine sets blockSync to nil when downloading ends without holding
// taskLock, so checking the field and then dereferencing it again could
// call Register on a nil fetcher.
// NOTE(review): that unsynchronized write still makes the blockSync read a
// data race under the Go memory model; fully fixing it needs a lock on the
// task side.
func (s *CP2FPSync) registerPeer(p peer) {
	s.taskLock.RLock()
	task := s.task
	s.taskLock.RUnlock()
	if task == nil {
		return
	}

	if fetcher := task.blockSync; fetcher != nil {
		fetcher.Register(p)
	}
}
// deliverData forwards data received from peer id (of the given kind) to the
// running task's block fetcher. It returns the fetcher's error, or nil when
// no task is running or the task is not currently downloading blocks.
//
// s.task is read under taskLock, consistent with registerPeer, because
// stopAll and loop may reset it concurrently. Both s.task and blockSync are
// read once into locals, so a concurrent reset between the nil check and the
// call cannot cause a nil dereference.
// NOTE(review): the task goroutine writes blockSync without taskLock, so the
// blockSync read remains a data race under the Go memory model.
func (s *CP2FPSync) deliverData(id string, data interface{}, kind int) error {
	s.taskLock.RLock()
	task := s.task
	s.taskLock.RUnlock()
	if task == nil {
		return nil
	}

	if fetcher := task.blockSync; fetcher != nil {
		return fetcher.DeliverData(id, data, kind)
	}
	return nil
}
// CP2FPTask is a single checkpoint-to-fixpoint sync job, executed by its
// process method on CP2FPSync's loop goroutine.
type CP2FPTask struct {
	// quitCh is closed by stop (at most once, guarded by quitOnce) to ask
	// process to abort.
	quitCh   chan struct{}
	quitOnce sync.Once
	// task info: the requested sync range (passed to the chain's
	// GetBreakPoint) and the candidate peers for fetching the long hash list.
	from  *types.Block
	to    *types.Block
	peers []peer
	// for block sync: set by process while blocks are being downloaded and
	// reset to nil when downloading ends.
	blockSync *downloader.BlockFetcher
	// for skeleton hash process: passed to the long hash fetcher.
	peerSkeletonCh  chan PeerHashElemList
	timeoutSkeleton int
	// sync is the owning Synchronizer (chain, packer, peer set).
	sync *Synchronizer
	// removePeerFn drops a misbehaving peer (used by the fetchers).
	removePeerFn removePeerCallback
	logger       log.Logger
}
// process runs the checkpoint-to-fixpoint synchronization for this task.
//
// It proceeds in these steps:
//   - Locate the break points between t.from and t.to on the local chain.
//   - Step the start point back by up to taskBackTrackLength known ancestors.
//   - Fetch the long hash list for that range from the best remaining peer.
//   - Download and execute the blocks along that list through a Cursor.
//
// A peer whose hash list fails to fetch, is missing, or is empty is dropped
// from this task's private peer list, and the next best peer is tried.
// process returns early whenever quitCh is closed.
func (t *CP2FPTask) process() {
	// Look-back applied to the starting round handed to the block fetcher
	// (value kept from the original implementation).
	const roundLookBack = 400

	// find break point
	fromBlock, toBlock, err := t.sync.chain.GetBreakPoint(t.from, t.to)
	if err != nil {
		t.logger.Error("can't find from or to break point", "err", err)
		return
	}
	if fromBlock == nil || toBlock == nil {
		t.logger.Info("break point is nil, no need to sync")
		return
	}
	// Walk back from the break point; stop at the first parent that is not
	// available locally.
	for i := 0; i < taskBackTrackLength; i++ {
		parent := t.sync.chain.GetBlock(fromBlock.Header.ParentFullHash)
		if parent == nil {
			break
		}
		fromBlock = parent
	}
	from := protocol.HashElem{Height: fromBlock.Header.Height, Hash: fromBlock.FullHash(), Round: fromBlock.Header.Round}
	to := protocol.HashElem{Height: toBlock.Header.Height, Hash: toBlock.FullHash(), Round: toBlock.Header.Round}

	// Private copy: peers are removed in place below, and doing that on
	// t.peers directly would rewrite the caller's slice via the shared array.
	peers := append([]peer(nil), t.peers...)
	t.logger.Info("start cp2fp task", "fromHashElem", from, "toHashElem", to, "honestPeers", peers)
	select {
	case <-t.quitCh:
		t.logger.Info("cp2fp task quit")
		return
	default:
	}

	//if to is below from, no need to sync
	if (from.Height == 0 && to.Height <= from.Height+2) || (from.Height != 0 && to.Height <= from.Height) {
		t.logger.Info("no need to sync blocks", "from.Height", from.Height, "to.Height", to.Height)
		return
	}
	t.logger.Info("start fulfill from point to point", "from", from, "to", to, "peers", len(peers))

	// dropPeer removes target from this task's peer list so it is not retried.
	dropPeer := func(target peer) {
		for i, p := range peers {
			if p.GetID() == target.GetID() {
				peers = append(peers[:i], peers[i+1:]...)
				return
			}
		}
	}

	// from is only modified after a successful fetch, so length is invariant
	// across retries.
	length := int(to.Height - from.Height + 1)
	for len(peers) > 0 {
		select {
		case <-t.quitCh:
			t.logger.Info("cp2fp task quit")
			return
		default:
		}

		bestPeer := getBestPeerByHead(peers)
		// invoke hash fetcher
		fetcher := newLongHashFetcher([]peer{bestPeer}, protocol.SyncStageCP2FP, length, from, to, t.peerSkeletonCh, t.timeoutSkeleton, t.removePeerFn, t.logger)
		if err := fetcher.fetch(); err != nil {
			dropPeer(bestPeer)
			t.logger.Error("sync long list for full fill failed", "err", err, "peer", bestPeer.GetID())
			continue
		}

		// get long hash list. Without dropping the peer on a missing/empty
		// result, the same best peer would be selected again forever.
		longHashList, ok := fetcher.fetchResult.hashes[bestPeer.GetID()]
		if !ok {
			dropPeer(bestPeer)
			t.logger.Error("sync long list for full fill failed", "err", errors.New("can't find long hash list"), "peer", bestPeer.GetID())
			continue
		}
		if len(longHashList) == 0 {
			dropPeer(bestPeer)
			t.logger.Error("sync long list for full fill failed", "err", errors.New("empty long hash list"), "peer", bestPeer.GetID())
			continue
		}
		t.logger.Info("fetch long hash list ok", "longHashListSize", len(longHashList))
		//TODO: fork branch

		// Reverse the list so that index 0 is the "from" end and the last
		// element is the "to" end.
		hashes := make(protocol.HashElems, 0, len(longHashList))
		for i := len(longHashList) - 1; i >= 0; i-- {
			hashes = append(hashes, longHashList[i])
		}

		//remove genesis hash
		if hashes[0].Hash == t.sync.chain.Genesis().FullHash() {
			t.logger.Info("remove genesis from long list", "genesis", hashes[0])
			hashes = hashes[1:]
			if len(hashes) == 0 {
				// A list holding only genesis cannot cover a range we already
				// know needs syncing; treat it as a bad response.
				dropPeer(bestPeer)
				t.logger.Error("sync long list for full fill failed", "err", errors.New("long hash list only contains genesis"), "peer", bestPeer.GetID())
				continue
			}
			from = *hashes[0]
		}

		peerMap := make(map[string]downloader.FetcherPeer)
		for _, p := range t.sync.getPeers() {
			peerMap[p.GetID()] = p
		}
		t.logger.Info("getBlocksFromCheckpointToFixPoint", "len(longHashListReverse)", len(hashes), "hashFrom", hashes[0],
			"hashTo", hashes[len(hashes)-1], "allPeersForDownloader", peerMap, "genesisRound", t.sync.chain.Genesis().Header.Round)

		// Clamp at zero instead of subtracting blindly: if Round is an
		// unsigned type (presumably uint64 - confirm), from.Round-400 would
		// wrap around for small rounds.
		startRound := from.Round
		if startRound > roundLookBack {
			startRound -= roundLookBack
		} else {
			startRound = 0
		}

		blockCh := make(chan *types.Block)
		blockSync := downloader.StartFetchBlocks(startRound, to.Round, peerMap, func(id string, addBlack bool) {
			t.removePeerFn(id, addBlack)
		}, false, protocol.SyncStageCP2FP, t.sync.chain, blockCh)
		// NOTE(review): blockSync is published without CP2FPSync.taskLock
		// while registerPeer/deliverData read it from other goroutines.
		t.blockSync = blockSync
		cursor := NewCursor(hashes, t.sync.chain, t.sync.packer, false, t.sync.lengthForStatesSync())

		aborted := false
	ForLoop:
		for {
			select {
			case block := <-blockCh:
				if err := cursor.ProcessBlock(block); err == ErrMainBlockCheckAndExecFailed {
					t.logger.Error("main hash list is wrong, it is impossible")
				}
				if cursor.IsFinished() {
					break ForLoop
				}
			case <-t.quitCh:
				t.logger.Info("cp2fp task stopCh force closed")
				aborted = true
				break ForLoop
			}
		}
		blockSync.Finish()
		t.blockSync = nil
		if !aborted {
			t.logger.Info("check point to fix point blocks received and executed", "blockFrom", from, "blockTo", to)
		}
		return
	}
	t.logger.Error("cp2fp task failed, no peer left", "from", from, "to", to)
}
// stop asks the task to quit by closing quitCh. It is safe to call more than
// once: quitOnce ensures the channel is closed only on the first call, since
// a second close would panic.
func (t *CP2FPTask) stop() {
	t.logger.Info("force stop cp2fp tasks")
	closeQuit := func() { close(t.quitCh) }
	t.quitOnce.Do(closeQuit)
}