/
eviction.go
128 lines (100 loc) · 3.78 KB
/
eviction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package txcache
import (
"github.com/ElrondNetwork/elrond-go-core/core"
)
// doEviction does cache eviction
// We do not allow more evictions to start concurrently
func (cache *TxCache) doEviction() {
if cache.isEvictionInProgress.IsSet() {
return
}
if !cache.isCapacityExceeded() {
return
}
cache.evictionMutex.Lock()
defer cache.evictionMutex.Unlock()
_ = cache.isEvictionInProgress.SetReturningPrevious()
defer cache.isEvictionInProgress.Reset()
if !cache.isCapacityExceeded() {
return
}
stopWatch := cache.monitorEvictionStart()
cache.makeSnapshotOfSenders()
journal := evictionJournal{}
journal.passOneNumSteps, journal.passOneNumTxs, journal.passOneNumSenders = cache.evictSendersInLoop()
journal.evictionPerformed = true
cache.evictionJournal = journal
cache.monitorEvictionEnd(stopWatch)
cache.destroySnapshotOfSenders()
}
func (cache *TxCache) makeSnapshotOfSenders() {
cache.evictionSnapshotOfSenders = cache.txListBySender.getSnapshotAscending()
}
func (cache *TxCache) destroySnapshotOfSenders() {
cache.evictionSnapshotOfSenders = nil
}
func (cache *TxCache) isCapacityExceeded() bool {
return cache.areThereTooManyBytes() || cache.areThereTooManySenders() || cache.areThereTooManyTxs()
}
func (cache *TxCache) areThereTooManyBytes() bool {
numBytes := cache.NumBytes()
tooManyBytes := numBytes > int(cache.config.NumBytesThreshold)
return tooManyBytes
}
func (cache *TxCache) areThereTooManySenders() bool {
numSenders := cache.CountSenders()
tooManySenders := numSenders > uint64(cache.config.CountThreshold)
return tooManySenders
}
func (cache *TxCache) areThereTooManyTxs() bool {
numTxs := cache.CountTx()
tooManyTxs := numTxs > uint64(cache.config.CountThreshold)
return tooManyTxs
}
// This is called concurrently by two goroutines: the eviction one and the sweeping one
func (cache *TxCache) doEvictItems(txsToEvict [][]byte, sendersToEvict []string) (countTxs uint32, countSenders uint32) {
countTxs = cache.txByHash.RemoveTxsBulk(txsToEvict)
countSenders = cache.txListBySender.RemoveSendersBulk(sendersToEvict)
return
}
func (cache *TxCache) evictSendersInLoop() (uint32, uint32, uint32) {
return cache.evictSendersWhile(cache.isCapacityExceeded)
}
// evictSendersWhileTooManyTxs removes transactions in a loop, as long as "shouldContinue" is true
// One batch of senders is removed in each step
func (cache *TxCache) evictSendersWhile(shouldContinue func() bool) (step uint32, numTxs uint32, numSenders uint32) {
if !shouldContinue() {
return
}
snapshot := cache.evictionSnapshotOfSenders
snapshotLength := uint32(len(snapshot))
batchSize := cache.config.NumSendersToPreemptivelyEvict
batchStart := uint32(0)
for step = 0; shouldContinue(); step++ {
batchEnd := batchStart + batchSize
batchEndBounded := core.MinUint32(batchEnd, snapshotLength)
batch := snapshot[batchStart:batchEndBounded]
numTxsEvictedInStep, numSendersEvictedInStep := cache.evictSendersAndTheirTxs(batch)
numTxs += numTxsEvictedInStep
numSenders += numSendersEvictedInStep
batchStart += batchSize
reachedEnd := batchStart >= snapshotLength
noTxsEvicted := numTxsEvictedInStep == 0
incompleteBatch := numSendersEvictedInStep < batchSize
shouldBreak := noTxsEvicted || incompleteBatch || reachedEnd
if shouldBreak {
break
}
}
return
}
// This is called concurrently by two goroutines: the eviction one and the sweeping one
func (cache *TxCache) evictSendersAndTheirTxs(listsToEvict []*txListForSender) (uint32, uint32) {
sendersToEvict := make([]string, 0, len(listsToEvict))
txsToEvict := make([][]byte, 0, approximatelyCountTxInLists(listsToEvict))
for _, txList := range listsToEvict {
sendersToEvict = append(sendersToEvict, txList.sender)
txsToEvict = append(txsToEvict, txList.getTxHashes()...)
}
return cache.doEvictItems(txsToEvict, sendersToEvict)
}