-
Notifications
You must be signed in to change notification settings - Fork 202
/
historyRepository.go
494 lines (415 loc) · 18.5 KB
/
historyRepository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
//go:generate protoc -I=. -I=$GOPATH/src -I=$GOPATH/src/github.com/ElrondNetwork/protobuf/protobuf --gogoslick_out=. miniblockMetadata.proto
package dblookupext
import (
"fmt"
"sync"
"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go-core/core/container"
"github.com/ElrondNetwork/elrond-go-core/data"
"github.com/ElrondNetwork/elrond-go-core/data/block"
"github.com/ElrondNetwork/elrond-go-core/data/typeConverters"
"github.com/ElrondNetwork/elrond-go-core/hashing"
"github.com/ElrondNetwork/elrond-go-core/marshal"
logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/common/logging"
"github.com/ElrondNetwork/elrond-go/dblookupext/esdtSupply"
"github.com/ElrondNetwork/elrond-go/process"
"github.com/ElrondNetwork/elrond-go/storage"
"github.com/ElrondNetwork/elrond-go/storage/lrucache"
)
var log = logger.GetOrCreate("dblookupext")
const sizeOfDeduplicationCache = 1000
// HistoryRepositoryArguments is a structure that stores all components that are needed to a history processor
type HistoryRepositoryArguments struct {
SelfShardID uint32
MiniblocksMetadataStorer storage.Storer
MiniblockHashByTxHashStorer storage.Storer
BlockHashByRound storage.Storer
Uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter
EpochByHashStorer storage.Storer
EventsHashesByTxHashStorer storage.Storer
Marshalizer marshal.Marshalizer
Hasher hashing.Hasher
ESDTSuppliesHandler SuppliesHandler
}
type historyRepository struct {
selfShardID uint32
miniblocksMetadataStorer storage.Storer
miniblockHashByTxHashIndex storage.Storer
blockHashByRound storage.Storer
uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter
epochByHashIndex *epochByHashIndex
eventsHashesByTxHashIndex *eventsHashesByTxHash
marshalizer marshal.Marshalizer
hasher hashing.Hasher
esdtSuppliesHandler SuppliesHandler
// These maps temporarily hold notifications of "notarized at source or destination", to deal with unwanted concurrency effects
// The unwanted concurrency effects could be accentuated by the fast db-replay-validate mechanism.
pendingNotarizedAtSourceNotifications *container.MutexMap
pendingNotarizedAtDestinationNotifications *container.MutexMap
pendingNotarizedAtBothNotifications *container.MutexMap
// This cache will hold hashes of already inserted miniblock metadata records, so that we avoid repeated "put()" operations,
// that could mistakenly override the "patch()" operations performed when consuming notarization notifications.
deduplicationCacheForInsertMiniblockMetadata storage.Cacher
recordBlockMutex sync.Mutex
consumePendingNotificationsMutex sync.Mutex
}
type notarizedNotification struct {
metaNonce uint64
metaHash []byte
}
// NewHistoryRepository will create a new instance of HistoryRepository
func NewHistoryRepository(arguments HistoryRepositoryArguments) (*historyRepository, error) {
if check.IfNil(arguments.MiniblocksMetadataStorer) {
return nil, core.ErrNilStore
}
if check.IfNil(arguments.MiniblockHashByTxHashStorer) {
return nil, core.ErrNilStore
}
if check.IfNil(arguments.EpochByHashStorer) {
return nil, core.ErrNilStore
}
if check.IfNil(arguments.Marshalizer) {
return nil, core.ErrNilMarshalizer
}
if check.IfNil(arguments.Hasher) {
return nil, core.ErrNilHasher
}
if check.IfNil(arguments.EventsHashesByTxHashStorer) {
return nil, core.ErrNilStore
}
if check.IfNil(arguments.ESDTSuppliesHandler) {
return nil, errNilESDTSuppliesHandler
}
if check.IfNil(arguments.Uint64ByteSliceConverter) {
return nil, process.ErrNilUint64Converter
}
hashToEpochIndex := newHashToEpochIndex(arguments.EpochByHashStorer, arguments.Marshalizer)
deduplicationCacheForInsertMiniblockMetadata, _ := lrucache.NewCache(sizeOfDeduplicationCache)
eventsHashesToTxHashIndex := newEventsHashesByTxHash(arguments.EventsHashesByTxHashStorer, arguments.Marshalizer)
return &historyRepository{
selfShardID: arguments.SelfShardID,
miniblocksMetadataStorer: arguments.MiniblocksMetadataStorer,
blockHashByRound: arguments.BlockHashByRound,
marshalizer: arguments.Marshalizer,
hasher: arguments.Hasher,
epochByHashIndex: hashToEpochIndex,
miniblockHashByTxHashIndex: arguments.MiniblockHashByTxHashStorer,
pendingNotarizedAtSourceNotifications: container.NewMutexMap(),
pendingNotarizedAtDestinationNotifications: container.NewMutexMap(),
pendingNotarizedAtBothNotifications: container.NewMutexMap(),
deduplicationCacheForInsertMiniblockMetadata: deduplicationCacheForInsertMiniblockMetadata,
eventsHashesByTxHashIndex: eventsHashesToTxHashIndex,
esdtSuppliesHandler: arguments.ESDTSuppliesHandler,
uint64ByteSliceConverter: arguments.Uint64ByteSliceConverter,
}, nil
}
// RecordBlock records a block
// This function is not called on a goroutine, but synchronously instead, right after committing a block
func (hr *historyRepository) RecordBlock(blockHeaderHash []byte,
blockHeader data.HeaderHandler,
blockBody data.BodyHandler,
scrResultsFromPool map[string]data.TransactionHandler,
receiptsFromPool map[string]data.TransactionHandler,
createdIntraShardMiniBlocks []*block.MiniBlock,
logs []*data.LogData) error {
hr.recordBlockMutex.Lock()
defer hr.recordBlockMutex.Unlock()
log.Debug("RecordBlock()", "nonce", blockHeader.GetNonce(), "blockHeaderHash", blockHeaderHash, "header type", fmt.Sprintf("%T", blockHeader))
body, ok := blockBody.(*block.Body)
if !ok {
return errCannotCastToBlockBody
}
epoch := blockHeader.GetEpoch()
err := hr.epochByHashIndex.saveEpochByHash(blockHeaderHash, epoch)
if err != nil {
return newErrCannotSaveEpochByHash("block header", blockHeaderHash, err)
}
for _, miniblock := range body.MiniBlocks {
if miniblock.Type == block.PeerBlock {
continue
}
err = hr.recordMiniblock(blockHeaderHash, blockHeader, miniblock, epoch)
if err != nil {
logging.LogErrAsErrorExceptAsDebugIfClosingError(log, err, "cannot record miniblock",
"type", miniblock.Type, "error", err)
continue
}
}
for _, miniBlock := range createdIntraShardMiniBlocks {
err = hr.recordMiniblock(blockHeaderHash, blockHeader, miniBlock, epoch)
if err != nil {
logging.LogErrAsErrorExceptAsDebugIfClosingError(log, err, "cannot record in shard miniblock",
"type", miniBlock.Type, "error", err)
}
}
err = hr.eventsHashesByTxHashIndex.saveResultsHashes(epoch, scrResultsFromPool, receiptsFromPool)
if err != nil {
return err
}
err = hr.esdtSuppliesHandler.ProcessLogs(blockHeader.GetNonce(), logs)
if err != nil {
return err
}
err = hr.putHashByRound(blockHeaderHash, blockHeader)
if err != nil {
return err
}
return nil
}
func (hr *historyRepository) putHashByRound(blockHeaderHash []byte, header data.HeaderHandler) error {
roundToByteSlice := hr.uint64ByteSliceConverter.ToByteSlice(header.GetRound())
return hr.blockHashByRound.Put(roundToByteSlice, blockHeaderHash)
}
func (hr *historyRepository) recordMiniblock(blockHeaderHash []byte, blockHeader data.HeaderHandler, miniblock *block.MiniBlock, epoch uint32) error {
miniblockHash, err := hr.computeMiniblockHash(miniblock)
if err != nil {
return err
}
if hr.hasRecentlyInsertedMiniblockMetadata(miniblockHash, epoch) {
return nil
}
err = hr.epochByHashIndex.saveEpochByHash(miniblockHash, epoch)
if err != nil {
return newErrCannotSaveEpochByHash("miniblock", miniblockHash, err)
}
miniblockMetadata := &MiniblockMetadata{
Type: int32(miniblock.Type),
Epoch: epoch,
HeaderHash: blockHeaderHash,
MiniblockHash: miniblockHash,
Round: blockHeader.GetRound(),
HeaderNonce: blockHeader.GetNonce(),
SourceShardID: miniblock.GetSenderShardID(),
DestinationShardID: miniblock.GetReceiverShardID(),
}
err = hr.putMiniblockMetadata(miniblockHash, miniblockMetadata)
if err != nil {
return err
}
hr.markMiniblockMetadataAsRecentlyInserted(miniblockHash, epoch)
for _, txHash := range miniblock.TxHashes {
errPut := hr.miniblockHashByTxHashIndex.Put(txHash, miniblockHash)
if errPut != nil {
logging.LogErrAsWarnExceptAsDebugIfClosingError(log, errPut, "miniblockHashByTxHashIndex.Put()",
"txHash", txHash, "err", errPut)
continue
}
}
return nil
}
func (hr *historyRepository) computeMiniblockHash(miniblock *block.MiniBlock) ([]byte, error) {
return core.CalculateHash(hr.marshalizer, hr.hasher, miniblock)
}
func (hr *historyRepository) hasRecentlyInsertedMiniblockMetadata(miniblockHash []byte, epoch uint32) bool {
key := hr.buildKeyOfDeduplicationCacheForInsertMiniblockMetadata(miniblockHash, epoch)
return hr.deduplicationCacheForInsertMiniblockMetadata.Has(key)
}
// When building the key for the deduplication cache, we must take into account the epoch as well, in order to handle this case:
// - miniblock M added in a fork at the end of epoch E,
// - miniblock M re-added, on the canonical chain this time, in the next epoch E + 1.
// This way we do not mistakenly ignore to update the "epochByHashIndex".
func (hr *historyRepository) buildKeyOfDeduplicationCacheForInsertMiniblockMetadata(miniblockHash []byte, epoch uint32) []byte {
return []byte(fmt.Sprintf("%d_%x", epoch, miniblockHash))
}
func (hr *historyRepository) markMiniblockMetadataAsRecentlyInserted(miniblockHash []byte, epoch uint32) {
key := hr.buildKeyOfDeduplicationCacheForInsertMiniblockMetadata(miniblockHash, epoch)
_ = hr.deduplicationCacheForInsertMiniblockMetadata.Put(key, nil, 0)
}
// GetMiniblockMetadataByTxHash will return a history transaction for the given hash from storage
func (hr *historyRepository) GetMiniblockMetadataByTxHash(hash []byte) (*MiniblockMetadata, error) {
miniblockHash, err := hr.miniblockHashByTxHashIndex.Get(hash)
if err != nil {
return nil, err
}
return hr.getMiniblockMetadataByMiniblockHash(miniblockHash)
}
func (hr *historyRepository) putMiniblockMetadata(hash []byte, metadata *MiniblockMetadata) error {
metadataBytes, err := hr.marshalizer.Marshal(metadata)
if err != nil {
return err
}
err = hr.miniblocksMetadataStorer.PutInEpoch(hash, metadataBytes, metadata.Epoch)
if err != nil {
return newErrCannotSaveMiniblockMetadata(hash, err)
}
return nil
}
func (hr *historyRepository) getMiniblockMetadataByMiniblockHash(hash []byte) (*MiniblockMetadata, error) {
epoch, err := hr.epochByHashIndex.getEpochByHash(hash)
if err != nil {
return nil, err
}
metadataBytes, err := hr.miniblocksMetadataStorer.GetFromEpoch(hash, epoch)
if err != nil {
return nil, err
}
metadata := &MiniblockMetadata{}
err = hr.marshalizer.Unmarshal(metadata, metadataBytes)
if err != nil {
return nil, err
}
return metadata, nil
}
// GetEpochByHash will return epoch for a given hash
// This works for Blocks, Miniblocks
// It doesn't work for transactions (not needed, there we have a static storer for "miniblockHashByTxHashIndex" as well)!
func (hr *historyRepository) GetEpochByHash(hash []byte) (uint32, error) {
return hr.epochByHashIndex.getEpochByHash(hash)
}
// OnNotarizedBlocks notifies the history repository about notarized blocks
func (hr *historyRepository) OnNotarizedBlocks(shardID uint32, headers []data.HeaderHandler, headersHashes [][]byte) {
for i, headerHandler := range headers {
headerHash := headersHashes[i]
log.Trace("onNotarizedBlocks():", "shardID", shardID, "nonce", headerHandler.GetNonce(), "headerHash", headerHash, "type", fmt.Sprintf("%T", headerHandler))
metaBlock, isMetaBlock := headerHandler.(*block.MetaBlock)
if isMetaBlock {
for _, miniBlock := range metaBlock.MiniBlockHeaders {
hr.onNotarizedMiniblock(headerHandler.GetNonce(), headerHash, headerHandler.GetShardID(), miniBlock)
}
for _, shardData := range metaBlock.ShardInfo {
shardDataCopy := shardData
hr.onNotarizedInMetaBlock(headerHandler.GetNonce(), headerHash, &shardDataCopy)
}
} else {
log.Error("onNotarizedBlocks(): unexpected type of header", "type", fmt.Sprintf("%T", headerHandler))
}
}
hr.consumePendingNotificationsWithLock()
}
func (hr *historyRepository) onNotarizedInMetaBlock(metaBlockNonce uint64, metaBlockHash []byte, shardData *block.ShardData) {
if metaBlockNonce < 1 {
return
}
for _, miniblockHeader := range shardData.GetShardMiniBlockHeaders() {
hr.onNotarizedMiniblock(metaBlockNonce, metaBlockHash, shardData.GetShardID(), miniblockHeader)
}
}
func (hr *historyRepository) onNotarizedMiniblock(metaBlockNonce uint64, metaBlockHash []byte, shardOfContainingBlock uint32, miniblockHeader block.MiniBlockHeader) {
miniblockHash := miniblockHeader.Hash
isIntra := miniblockHeader.SenderShardID == miniblockHeader.ReceiverShardID
isToMeta := miniblockHeader.ReceiverShardID == core.MetachainShardId
isNotarizedAtSource := miniblockHeader.SenderShardID == shardOfContainingBlock
isNotarizedAtDestination := miniblockHeader.ReceiverShardID == shardOfContainingBlock
isNotarizedAtBoth := isIntra || isToMeta
notFromMe := miniblockHeader.SenderShardID != hr.selfShardID
notToMe := miniblockHeader.ReceiverShardID != hr.selfShardID
isPeerMiniblock := miniblockHeader.Type == block.PeerBlock
iDontCare := (notFromMe && notToMe) || isPeerMiniblock
if iDontCare {
return
}
log.Trace("onNotarizedMiniblock()",
"metaBlockNonce", metaBlockNonce,
"metaBlockHash", metaBlockHash,
"shardOfContainingBlock", shardOfContainingBlock,
"miniblock", miniblockHash,
"direction", fmt.Sprintf("[%d -> %d]", miniblockHeader.SenderShardID, miniblockHeader.ReceiverShardID),
)
if isNotarizedAtBoth {
hr.pendingNotarizedAtBothNotifications.Set(string(miniblockHash), ¬arizedNotification{
metaNonce: metaBlockNonce,
metaHash: metaBlockHash,
})
} else if isNotarizedAtSource {
hr.pendingNotarizedAtSourceNotifications.Set(string(miniblockHash), ¬arizedNotification{
metaNonce: metaBlockNonce,
metaHash: metaBlockHash,
})
} else if isNotarizedAtDestination {
hr.pendingNotarizedAtDestinationNotifications.Set(string(miniblockHash), ¬arizedNotification{
metaNonce: metaBlockNonce,
metaHash: metaBlockHash,
})
} else {
log.Error("onNotarizedMiniblock(): unexpected condition, notification not understood")
}
}
// Notifications are consumed within a critical section so that we don't have competing put() operations for the same miniblock metadata,
// which could have resulted in mistakenly overriding the "notarization (hyperblock) coordinates".
func (hr *historyRepository) consumePendingNotificationsWithLock() {
hr.consumePendingNotificationsMutex.Lock()
defer hr.consumePendingNotificationsMutex.Unlock()
if hr.pendingNotarizedAtSourceNotifications.Len() == 0 &&
hr.pendingNotarizedAtDestinationNotifications.Len() == 0 &&
hr.pendingNotarizedAtBothNotifications.Len() == 0 {
return
}
log.Trace("consumePendingNotificationsWithLock() begin",
"len(source)", hr.pendingNotarizedAtSourceNotifications.Len(),
"len(destination)", hr.pendingNotarizedAtDestinationNotifications.Len(),
"len(both)", hr.pendingNotarizedAtBothNotifications.Len(),
)
hr.consumePendingNotificationsNoLock(hr.pendingNotarizedAtSourceNotifications, func(metadata *MiniblockMetadata, notification *notarizedNotification) {
metadata.NotarizedAtSourceInMetaNonce = notification.metaNonce
metadata.NotarizedAtSourceInMetaHash = notification.metaHash
})
hr.consumePendingNotificationsNoLock(hr.pendingNotarizedAtDestinationNotifications, func(metadata *MiniblockMetadata, notification *notarizedNotification) {
metadata.NotarizedAtDestinationInMetaNonce = notification.metaNonce
metadata.NotarizedAtDestinationInMetaHash = notification.metaHash
})
hr.consumePendingNotificationsNoLock(hr.pendingNotarizedAtBothNotifications, func(metadata *MiniblockMetadata, notification *notarizedNotification) {
metadata.NotarizedAtSourceInMetaNonce = notification.metaNonce
metadata.NotarizedAtSourceInMetaHash = notification.metaHash
metadata.NotarizedAtDestinationInMetaNonce = notification.metaNonce
metadata.NotarizedAtDestinationInMetaHash = notification.metaHash
})
log.Trace("consumePendingNotificationsWithLock() end",
"len(source)", hr.pendingNotarizedAtSourceNotifications.Len(),
"len(destination)", hr.pendingNotarizedAtDestinationNotifications.Len(),
"len(both)", hr.pendingNotarizedAtBothNotifications.Len(),
)
}
func (hr *historyRepository) consumePendingNotificationsNoLock(pendingMap *container.MutexMap, patchMetadataFunc func(*MiniblockMetadata, *notarizedNotification)) {
for _, key := range pendingMap.Keys() {
notification, ok := pendingMap.Get(key)
if !ok {
continue
}
keyTyped, ok := key.(string)
if !ok {
log.Error("consumePendingNotificationsNoLock(): bad key", "key", key)
continue
}
notificationTyped, ok := notification.(*notarizedNotification)
if !ok {
log.Error("consumePendingNotificationsNoLock(): bad value", "value", fmt.Sprintf("%T", notification))
continue
}
miniblockHash := []byte(keyTyped)
metadata, err := hr.getMiniblockMetadataByMiniblockHash(miniblockHash)
if err != nil {
// Maybe not yet committed / saved in storer
continue
}
patchMetadataFunc(metadata, notificationTyped)
err = hr.putMiniblockMetadata(miniblockHash, metadata)
if err != nil {
logging.LogErrAsErrorExceptAsDebugIfClosingError(log, err, "consumePendingNotificationsNoLock(): cannot put miniblock metadata",
"miniblockHash", miniblockHash, "err", err)
continue
}
pendingMap.Remove(key)
}
}
// GetResultsHashesByTxHash will return results hashes by transaction hash
func (hr *historyRepository) GetResultsHashesByTxHash(txHash []byte, epoch uint32) (*ResultsHashesByTxHash, error) {
return hr.eventsHashesByTxHashIndex.getEventsHashesByTxHash(txHash, epoch)
}
// IsEnabled will always return true
func (hr *historyRepository) IsEnabled() bool {
return true
}
// RevertBlock will return the modification for the current block header
func (hr *historyRepository) RevertBlock(blockHeader data.HeaderHandler, blockBody data.BodyHandler) error {
return hr.esdtSuppliesHandler.RevertChanges(blockHeader, blockBody)
}
// GetESDTSupply will return the supply from the storage for the given token
func (hr *historyRepository) GetESDTSupply(token string) (*esdtSupply.SupplyESDT, error) {
return hr.esdtSuppliesHandler.GetESDTSupply(token)
}
// IsInterfaceNil returns true if there is no value under the interface
func (hr *historyRepository) IsInterfaceNil() bool {
return hr == nil
}