forked from ava-labs/coreth
-
Notifications
You must be signed in to change notification settings - Fork 0
/
block_builder.go
318 lines (277 loc) · 9.91 KB
/
block_builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// (c) 2019-2021, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package evm
import (
"math/big"
"sync"
"time"
coreth "github.com/haowang0402/coreth/chain"
"github.com/haowang0402/coreth/params"
"github.com/haowang0402/avalanchego/snow"
commonEng "github.com/haowang0402/avalanchego/snow/engine/common"
"github.com/haowang0402/avalanchego/utils/timer"
"github.com/ethereum/go-ethereum/log"
)
// buildingBlkStatus denotes the current status of the VM in block production.
type buildingBlkStatus uint8
var (
// AP3 Params
minBlockTime = 2 * time.Second
maxBlockTime = 3 * time.Second
// AP4 Params
minBlockTimeAP4 = 500 * time.Millisecond
)
const (
batchSize = 250
// waitBlockTime is the amount of time to wait for BuildBlock to be
// called by the engine before deciding whether or not to gossip the
// transaction that triggered the PendingTxs message to the engine.
//
// This is done to reduce contention in the network when there is no
// preferred producer. If we did not wait here, we may gossip a new
// transaction to a peer while building a block that will conflict with
// whatever the peer makes.
waitBlockTime = 100 * time.Millisecond
dontBuild buildingBlkStatus = iota
conditionalBuild // Only used prior to AP4
mayBuild
building
)
type blockBuilder struct {
ctx *snow.Context
chainConfig *params.ChainConfig
chain *coreth.ETHChain
mempool *Mempool
gossiper Gossiper
shutdownChan <-chan struct{}
shutdownWg *sync.WaitGroup
// A message is sent on this channel when a new block
// is ready to be build. This notifies the consensus engine.
notifyBuildBlockChan chan<- commonEng.Message
// [buildBlockLock] must be held when accessing [buildStatus]
buildBlockLock sync.Mutex
// [buildBlockTimer] is a two stage timer handling block production.
// Stage1 build a block if the batch size has been reached.
// Stage2 build a block regardless of the size.
buildBlockTimer *timer.Timer
// buildStatus signals the phase of block building the VM is currently in.
// [dontBuild] indicates there's no need to build a block.
// [conditionalBuild] indicates build a block if the batch size has been reached.
// [mayBuild] indicates the VM should proceed to build a block.
// [building] indicates the VM has sent a request to the engine to build a block.
buildStatus buildingBlkStatus
// isAP4 is a boolean indicating if AP4 is activated. This prevents us from
// getting the current time and comparing it to the *params.chainConfig more
// than once.
isAP4 bool
}
func (vm *VM) NewBlockBuilder(notifyBuildBlockChan chan<- commonEng.Message) *blockBuilder {
b := &blockBuilder{
ctx: vm.ctx,
chainConfig: vm.chainConfig,
chain: vm.chain,
mempool: vm.mempool,
gossiper: vm.gossiper,
shutdownChan: vm.shutdownChan,
shutdownWg: &vm.shutdownWg,
notifyBuildBlockChan: notifyBuildBlockChan,
buildStatus: dontBuild,
}
b.handleBlockBuilding()
return b
}
func (b *blockBuilder) handleBlockBuilding() {
b.buildBlockTimer = timer.NewStagedTimer(b.buildBlockTwoStageTimer)
go b.ctx.Log.RecoverAndPanic(b.buildBlockTimer.Dispatch)
if !b.chainConfig.IsApricotPhase4(big.NewInt(time.Now().Unix())) {
b.shutdownWg.Add(1)
go b.ctx.Log.RecoverAndPanic(b.migrateAP4)
} else {
b.isAP4 = true
}
}
func (b *blockBuilder) migrateAP4() {
defer b.shutdownWg.Done()
// In some tests, the AP4 timestamp is not populated. If this is the case, we
// should only stop [buildBlockTwoStageTimer] on shutdown.
if b.chainConfig.ApricotPhase4BlockTimestamp == nil {
<-b.shutdownChan
b.buildBlockTimer.Stop()
return
}
timestamp := time.Unix(b.chainConfig.ApricotPhase4BlockTimestamp.Int64(), 0)
duration := time.Until(timestamp)
select {
case <-time.After(duration):
b.isAP4 = true
b.buildBlockLock.Lock()
// Flush any invalid statuses leftover from legacy block timer builder
if b.buildStatus == conditionalBuild {
b.buildStatus = mayBuild
}
b.buildBlockLock.Unlock()
case <-b.shutdownChan:
// buildBlockTimer will never be nil because we exit as soon as it is ever
// set to nil.
b.buildBlockTimer.Stop()
}
}
// handleGenerateBlock should be called immediately after [BuildBlock].
// [handleGenerateBlock] invocation could lead to quiesence, building a block with
// some delay, or attempting to build another block immediately.
func (b *blockBuilder) handleGenerateBlock() {
b.buildBlockLock.Lock()
defer b.buildBlockLock.Unlock()
if !b.isAP4 {
// Set the buildStatus before calling Cancel or Issue on
// the mempool and after generating the block.
// This prevents [needToBuild] from returning true when the
// produced block will change whether or not we need to produce
// another block and also ensures that when the mempool adds a
// new item to Pending it will be handled appropriately by [signalTxsReady]
if b.needToBuild() {
b.buildStatus = conditionalBuild
b.buildBlockTimer.SetTimeoutIn(minBlockTime)
} else {
b.buildStatus = dontBuild
}
} else {
// If we still need to build a block immediately after building, we let the
// engine know it [mayBuild] in [minBlockTimeAP4].
//
// It is often the case in AP4 that a block (with the same txs) could be built
// after a few seconds of delay as the [baseFee] and/or [blockGasCost] decrease.
if b.needToBuild() {
b.buildStatus = mayBuild
b.buildBlockTimer.SetTimeoutIn(minBlockTimeAP4)
} else {
b.buildStatus = dontBuild
}
}
}
// needToBuild returns true if there are outstanding transactions to be issued
// into a block.
func (b *blockBuilder) needToBuild() bool {
size := b.chain.PendingSize()
return size > 0 || b.mempool.Len() > 0
}
// buildEarly returns true if there are sufficient outstanding transactions to
// be issued into a block to build a block early.
//
// NOTE: Only used prior to AP4.
func (b *blockBuilder) buildEarly() bool {
size := b.chain.PendingSize()
return size > batchSize || b.mempool.Len() > 1
}
// buildBlockTwoStageTimer is a two stage timer that sends a notification
// to the engine when the VM is ready to build a block.
// If it should be called back again, it returns the timeout duration at
// which it should be called again.
func (b *blockBuilder) buildBlockTwoStageTimer() (time.Duration, bool) {
b.buildBlockLock.Lock()
defer b.buildBlockLock.Unlock()
switch b.buildStatus {
case dontBuild:
case conditionalBuild:
if !b.buildEarly() {
b.buildStatus = mayBuild
return (maxBlockTime - minBlockTime), true
}
b.markBuilding()
case mayBuild:
b.markBuilding()
case building:
// If the status has already been set to building, there is no need
// to send an additional request to the consensus engine until the call
// to BuildBlock resets the block status.
default:
// Log an error if an invalid status is found.
log.Error("Found invalid build status in build block timer", "buildStatus", b.buildStatus)
}
// No need for the timeout to fire again until BuildBlock is called.
return 0, false
}
// markBuilding assumes the [buildBlockLock] is held.
func (b *blockBuilder) markBuilding() {
select {
case b.notifyBuildBlockChan <- commonEng.PendingTxs:
b.buildStatus = building
default:
log.Error("Failed to push PendingTxs notification to the consensus engine.")
}
}
// signalTxsReady sets the initial timeout on the two stage timer if the process
// has not already begun from an earlier notification. If [buildStatus] is anything
// other than [dontBuild], then the attempt has already begun and this notification
// can be safely skipped.
func (b *blockBuilder) signalTxsReady() {
b.buildBlockLock.Lock()
defer b.buildBlockLock.Unlock()
if b.buildStatus != dontBuild {
return
}
if !b.isAP4 {
b.buildStatus = conditionalBuild
b.buildBlockTimer.SetTimeoutIn(minBlockTime)
return
}
// We take a naive approach here and signal the engine that we should build
// a block as soon as we receive at least one transaction.
//
// In the future, we may wish to add optimization here to only signal the
// engine if the sum of the projected tips in the mempool satisfies the
// required block fee.
b.markBuilding()
}
// awaitSubmittedTxs waits for new transactions to be submitted
// and notifies the VM when the tx pool has transactions to be
// put into a new block.
func (b *blockBuilder) awaitSubmittedTxs() {
b.shutdownWg.Add(1)
go b.ctx.Log.RecoverAndPanic(func() {
defer b.shutdownWg.Done()
// txSubmitChan is invoked when new transactions are issued as well as on re-orgs which
// may orphan transactions that were previously in a preferred block.
txSubmitChan := b.chain.GetTxSubmitCh()
for {
select {
case ethTxsEvent := <-txSubmitChan:
log.Trace("New tx detected, trying to generate a block")
b.signalTxsReady()
// We only attempt to invoke [GossipEthTxs] once AP4 is activated
if b.isAP4 && b.gossiper != nil && len(ethTxsEvent.Txs) > 0 {
// Give time for this node to build a block before attempting to
// gossip
time.Sleep(waitBlockTime)
// [GossipEthTxs] will block unless [gossiper.ethTxsToGossipChan] (an
// unbuffered channel) is listened on
if err := b.gossiper.GossipEthTxs(ethTxsEvent.Txs); err != nil {
log.Warn(
"failed to gossip new eth transactions",
"err", err,
)
}
}
case <-b.mempool.Pending:
log.Trace("New atomic Tx detected, trying to generate a block")
b.signalTxsReady()
// We only attempt to invoke [GossipAtomicTxs] once AP4 is activated
newTxs := b.mempool.GetNewTxs()
if b.isAP4 && b.gossiper != nil && len(newTxs) > 0 {
// Give time for this node to build a block before attempting to
// gossip
time.Sleep(waitBlockTime)
if err := b.gossiper.GossipAtomicTxs(newTxs); err != nil {
log.Warn(
"failed to gossip new atomic transactions",
"err", err,
)
}
}
case <-b.shutdownChan:
b.buildBlockTimer.Stop()
return
}
}
})
}