/
fastsync.go
227 lines (198 loc) · 8.25 KB
/
fastsync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package sync
import (
"errors"
"github.com/fractal-platform/fractal/core/config"
"github.com/fractal-platform/fractal/ftl/protocol"
)
// doFastSync runs one fast-sync attempt and reports its outcome on exactly one
// of two channels: s.fastSyncFinishedCh when a fix point was found and the
// cp2fp task was started, or s.fastSyncErrCh on any failure.
//
// Steps, in order:
//  1. stop the miner (if one is set) and all cp2fp tasks;
//  2. close and re-create s.fastSyncQuitCh to shut down the goroutines of a
//     previous fast sync;
//  3. fetch the short hash lists from the current peers;
//  4. loop while honest peers remain: try to sync a fix point from the short
//     hash lists; if the short lists share no common prefix, switch to complex
//     mode, narrow the peer set via findFixpoint and findAndCheckMainChain,
//     and try again.
//
// Errors returned by syncFixPoint are classified by message, not by identity.
func (s *Synchronizer) doFastSync() {
	s.log.Info("start fast sync")
	if s.miner != nil {
		s.miner.Stop()
	}

	// stop cp2fp
	s.cp2fp.stopAll()

	peers := s.getPeers()

	// close all fast sync goroutines
	close(s.fastSyncQuitCh)
	s.fastSyncQuitCh = make(chan struct{})

	// sync short hashLists
	honestPeers, peerShortHashListMap, err := s.syncShortHashListsForFastSync(peers, s.config.MinFastSyncPeerCount)
	if err != nil {
		s.fastSyncErrCh <- struct{}{}
		return
	}

	checkPoint := s.getLatestCheckPoint()
	s.log.Info("checkpoint", "checkPoint", checkPoint)

	complexSync := false
	hasSkeleton := false
	peerLongHashListMap := make(map[string]protocol.HashElems)

	// if diff is not in the top Blocks, do the loop
	for len(honestPeers) > 0 {
		var fixPoint protocol.HashElem
		honestPeers, fixPoint, err = s.syncFixPoint(honestPeers, peerShortHashListMap)

		switch {
		case err == nil:
			s.log.Info("doFastSync syncFixPoint succeed")
			s.fixPoint = fixPoint

			// sync from break point or checkpoint
			go s.cp2fp.startTask(s.chain.Genesis(), s.chain.GetBlock(fixPoint.Hash), honestPeers)
			s.fastSyncFinishedCh <- struct{}{}
			return

		case err.Error() == errNoCommonPrefixInShortHashLists.Error():
			s.log.Info("fast sync get into complex sync")
			complexSync = true
			s.changeFastSyncMode(FastSyncModeComplex)

			// find fix point using long hash list first, then interval hash list
			interHashesMap, peerIndexIntervalMap, findErr := s.findFixpoint(hasSkeleton, honestPeers, checkPoint, peerLongHashListMap)
			if findErr != nil {
				s.log.Error("find fix point failed", "err", findErr)
				s.fastSyncErrCh <- struct{}{}
				return
			}
			// The skeleton lists are now cached in peerLongHashListMap, so
			// later rounds must not fetch them again.
			hasSkeleton = true

			// get main chain peers
			s.changeFastSyncStatus(FastSyncStatusCheckMainChain)
			leftPeers, checkErr := s.findAndCheckMainChain(interHashesMap, honestPeers, s.config.CommonPrefixCount, peerIndexIntervalMap)
			if checkErr != nil {
				s.log.Error("find and check main chain failed", "err", checkErr)
				s.fastSyncErrCh <- struct{}{}
				return
			}

			// refresh honestPeers
			honestPeers = leftPeers
			s.log.Info("finish complex round", "len(leftPeers)", len(leftPeers), "len(honestPeers)", len(honestPeers))

		case err.Error() == errPeer.Error():
			if !complexSync {
				s.fastSyncErrCh <- struct{}{}
				s.log.Error("not enough peer,need to stop", "err", err)
				return
			}
			// Complex mode: retry with the honest peers that are left.
			s.log.Error("complexSync have identified who are honest,left honest and connect peer should go on", "err", err)

		default:
			// force quit
			s.fastSyncErrCh <- struct{}{}
			s.log.Error("need to force quit", "err", err)
			return
		}
	}

	// The loop only falls through when no honest peers remain. Every other
	// exit signals one of the result channels, so signal the failure here too
	// instead of returning silently.
	s.log.Error("fast sync failed, no honest peers left")
	s.fastSyncErrCh <- struct{}{}
}
// findFixpoint looks for the common prefix of the peers' skeleton (long) hash
// lists, starting from the given checkpoint, by delegating to
// findCommonFromSkeletonLists.
//
// It returns the long hash list used for each peer (keyed by peer ID), the
// common-prefix index into that list for each peer, and the error of the
// underlying lookup, in which case both maps are nil.
func (s *Synchronizer) findFixpoint(hasSkeleton bool, peers []peer, checkpoint config.CheckPoint, peerSkeletonMap map[string]protocol.HashElems) (map[string]protocol.HashElems, map[string]int, error) {
	s.log.Info("start to find fix point", "hasSkeleton", hasSkeleton, "peers", peers, "checkpoint", checkpoint, "len(peerSkeletonMap)", len(peerSkeletonMap))

	// The checkpoint is the element the skeleton lists are requested from.
	start := protocol.HashElem{Height: checkpoint.Height, Hash: checkpoint.Hash, Round: checkpoint.Round}

	indexByPeer, longListByPeer, err := s.findCommonFromSkeletonLists(hasSkeleton, peers, start, peerSkeletonMap)
	if err != nil {
		s.log.Error("get common skeleton lists failed")
		return nil, nil, err
	}

	for id, idx := range indexByPeer {
		s.log.Info("find common prefix for long hash lists", "peerId", id, "index", idx, "hashElem", longListByPeer[id][idx])
	}

	return longListByPeer, indexByPeer, nil
}
// syncShortHashListsForFastSync fetches the short hash lists from peers for
// fast sync.
//
// If fewer than peerThreshold peers are supplied it returns errNotEnoughPeers
// without changing the sync mode or status. Otherwise it switches to easy mode
// with the short-hash-list status, runs a short hash fetcher, and returns the
// fetcher's honest peers, its hash lists, and the error reported by fetch.
func (s *Synchronizer) syncShortHashListsForFastSync(peers []peer, peerThreshold int) ([]peer, map[string]protocol.HashElems, error) {
	s.log.Info("start to sync short hash list")

	if peerCount := len(peers); peerCount < peerThreshold {
		s.log.Error("not enough peers for sync short hash list", "peerCount", peerCount, "MinFastSyncPeerCount", peerThreshold)
		return nil, nil, errNotEnoughPeers
	}

	// set mode & status for fast sync
	s.changeFastSyncMode(FastSyncModeEasy)
	s.changeFastSyncStatus(FastSyncStatusShortHashList)

	shortFetcher := newShortHashFetcher(
		peers, protocol.SyncStageFastSync, s.config.ShortHashListLength, true, peerThreshold, true,
		s.syncHashListChForFastSync, s.config.ShortTimeOutOfShortLists, s.removePeerCallback, s.log,
	)

	// fetch must run before fetchResult is read.
	fetchErr := shortFetcher.fetch()
	return shortFetcher.fetchResult.honestPeers, shortFetcher.fetchResult.hashes, fetchErr
}
// findCommonFromSkeletonLists computes the common prefix of the long
// (skeleton) hash lists of the given peers.
//
// When hasSkeleton is false, the skeleton lists are first fetched via
// syncSkeletonLists starting at from, and stored into peerLongHashListMap (the
// caller's map is modified). When it is true, the lists already present in
// peerLongHashListMap are used as they are.
//
// It returns the common-prefix index per peer ID, the long hash list per peer
// ID restricted to the given peers, and an error if fetching the lists or
// computing the common prefix failed.
func (s *Synchronizer) findCommonFromSkeletonLists(hasSkeleton bool, peers []peer, from protocol.HashElem, peerLongHashListMap map[string]protocol.HashElems) (map[string]int, map[string]protocol.HashElems, error) {
	if !hasSkeleton {
		// sync long HashList with Interval
		fetched, err := s.syncSkeletonLists(peers, from)
		if err != nil {
			s.log.Error("find fix point failed", "err", err)
			return nil, nil, err
		}
		for _, p := range peers {
			id := p.GetID()
			peerLongHashListMap[id] = fetched[id]
		}
	}

	// Keep only the lists of the peers we were asked about.
	selected := make(map[string]protocol.HashElems)
	for _, p := range peers {
		id := p.GetID()
		selected[id] = peerLongHashListMap[id]
	}

	// get common prefix of honest peers long hashList
	indexByPeer, err := s.getCommPreFromLongList(selected, uint64(s.config.Interval))
	if err != nil {
		s.log.Error("find fix point failed", "err", err)
		return nil, nil, err
	}

	return indexByPeer, selected, nil
}
// syncFixPoint finds the fix point from the short hash lists of honestPeers
// and then syncs and checks it.
//
// It returns:
//   - the honest peers that are left: a copy of honestPeers with any peers
//     reported as bad by doSyncAndCheckFixPoint removed when that call fails
//     with an errPeer-message error;
//   - the lowest common hash element as returned by getCommPreFromShortList;
//   - an error: whatever getCommPreFromShortList returned (doFastSync
//     recognizes errNoCommonPrefixInShortHashLists by message), the
//     errPeer-message error from doSyncAndCheckFixPoint, or a generic
//     "need to stop fast sync" error for every other failure, including a
//     failed check that came back without an error.
func (s *Synchronizer) syncFixPoint(honestPeers []peer, peerShortHashListMap map[string]protocol.HashElems) ([]peer, protocol.HashElem, error) {
	s.log.Info("start to find and sync fix point", "honestPeerCount", len(honestPeers), "peerShortHashListMapSize", len(peerShortHashListMap))

	// Copy the peers so that removing bad peers below never touches the
	// caller's slice, and collect each peer's short hash list alongside.
	leftHonestPeers := make([]peer, 0, len(honestPeers))
	honestShortHashLists := make([]protocol.HashElems, 0, len(honestPeers))
	for _, p := range honestPeers {
		leftHonestPeers = append(leftHonestPeers, p)
		honestShortHashLists = append(honestShortHashLists, peerShortHashListMap[p.GetID()])
	}

	// get common prefix
	lowestCommonHash, highestCommonHash, err := getCommPreFromShortList(honestShortHashLists)
	s.log.Info("get common hash from short hash list", "lowestCommonHash", lowestCommonHash, "highestCommonHash", highestCommonHash, "error", err)
	if err != nil {
		return leftHonestPeers, lowestCommonHash, err
	}

	// get positive seq of common hashList
	blockSyncHashList := getHashListPositiveSeqByHashFromHashTo(honestShortHashLists[0], lowestCommonHash, highestCommonHash)

	// sync fixPoint
	bestPeer, bestHashElem := getBestPeerByHashes(leftHonestPeers, peerShortHashListMap)
	check, badPeers, err := s.doSyncAndCheckFixPoint(honestPeers, bestPeer, blockSyncHashList, bestHashElem, true)
	if err == nil && check {
		return leftHonestPeers, lowestCommonHash, nil
	}

	s.log.Error("sync fix point failed", "err", err, "check", check, "badPeers", badPeers)

	// err may be nil here (check failed without an error); calling Error() on
	// it would panic, so test for nil before comparing messages.
	if err == nil || err.Error() != errPeer.Error() {
		return leftHonestPeers, lowestCommonHash, errors.New("need to stop fast sync")
	}

	// Peer error: drop every bad peer and hand the remaining ones back.
	for _, bad := range badPeers {
		s.removePeerCallback(bad.GetID(), false)
		for i, hPeer := range leftHonestPeers {
			if hPeer.GetID() == bad.GetID() {
				leftHonestPeers = append(leftHonestPeers[:i], leftHonestPeers[i+1:]...)
				break
			}
		}
	}
	return leftHonestPeers, lowestCommonHash, err
}
// getHashListPositiveSeqByHashFromHashTo returns the elements of hashList
// between hashEFrom and hashETo (both inclusive) in reverse list order: it
// starts at the position of hashEFrom and walks back to the position of
// hashETo.
//
// Elements are matched by their String() form; if an element occurs more than
// once, its last occurrence is used. The result is nil when hashList is empty,
// when either endpoint is not present in hashList, or when hashEFrom is
// located before hashETo.
func getHashListPositiveSeqByHashFromHashTo(hashList protocol.HashElems, hashEFrom protocol.HashElem, hashETo protocol.HashElem) protocol.HashElems {
	// The endpoint keys do not change during the scan, so compute them once.
	fromKey := hashEFrom.String()
	toKey := hashETo.String()

	// -1 means "not found", which must not be confused with index 0.
	hashFromIndex, hashToIndex := -1, -1
	for index, hashE := range hashList {
		key := hashE.String()
		if key == fromKey {
			hashFromIndex = index
		}
		if key == toKey {
			hashToIndex = index
		}
	}

	if hashFromIndex < 0 || hashToIndex < 0 || hashFromIndex < hashToIndex {
		return nil
	}

	result := make(protocol.HashElems, 0, hashFromIndex-hashToIndex+1)
	for i := hashFromIndex; i >= hashToIndex; i-- {
		result = append(result, hashList[i])
	}
	return result
}