-
Notifications
You must be signed in to change notification settings - Fork 15
/
producer_plugin_impl.go
341 lines (280 loc) · 10.9 KB
/
producer_plugin_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
package producer_plugin
import (
"github.com/eosspark/eos-go/chain"
"github.com/eosspark/eos-go/chain/types"
. "github.com/eosspark/eos-go/chain/types/generated_containers"
"github.com/eosspark/eos-go/common"
"github.com/eosspark/eos-go/crypto"
"github.com/eosspark/eos-go/crypto/ecc"
. "github.com/eosspark/eos-go/exception"
. "github.com/eosspark/eos-go/exception/try"
"github.com/eosspark/eos-go/log"
"github.com/eosspark/eos-go/plugins/appbase/app"
"github.com/eosspark/eos-go/plugins/appbase/app/include"
"github.com/eosspark/eos-go/libraries/asio"
"github.com/eosspark/eos-go/plugins/chain_interface"
. "github.com/eosspark/eos-go/plugins/producer_plugin/multi_index"
)
// ProducerPluginImpl holds the mutable state of the producer plugin: the chain
// controller it drives, production flags and limits, the signing providers,
// and the bookkeeping updated by the block and transaction handlers.
type ProducerPluginImpl struct {
	// Chain is the controller that incoming blocks and transactions are pushed to.
	Chain *chain.Controller
	// ProductionEnabled is switched on by OnIncomingBlock once the head block is
	// no older than one block interval; ProductionDisabledByPolicy reads it.
	ProductionEnabled bool
	// ProductionPaused, when true, makes ProductionDisabledByPolicy report true.
	ProductionPaused bool
	ProductionSkipFlags uint32
	// SignatureProviders maps a block signing key to the function that signs a
	// digest with it; OnBlock looks providers up by a producer's BlockSigningKey.
	SignatureProviders map[ecc.PublicKey]signatureProviderType
	// Producers is the account-name set that OnBlock intersects with the active
	// schedule (presumably the producers configured on this node; confirm with
	// the plugin's option handling).
	Producers AccountNameSet
	// Timer is created from the io context handed to NewProducerPluginImpl.
	Timer *common.Timer
	// ProducerWatermarks records, per producer, the block number stored by
	// OnBlock when that producer newly enters the active schedule.
	ProducerWatermarks map[common.AccountName]uint32
	// PendingBlockMode tells whether the pending block is being produced or is
	// only speculative; the transaction handler branches on it.
	PendingBlockMode PendingBlockMode
	// PersistentTransactions stores id/expiry pairs of transactions accepted
	// with the persist-until-expired flag.
	PersistentTransactions *TransactionIdWithExpiryIndex
	BlacklistedTransactions *TransactionIdWithExpiryIndex
	// MaxTransactionTimeMs bounds transaction execution time; a negative value
	// makes the pending block time the deadline instead.
	MaxTransactionTimeMs int32
	// MaxIrreversibleBlockAgeUs is the age limit checked by
	// ProductionDisabledByPolicy; a negative value disables that check.
	MaxIrreversibleBlockAgeUs common.Microseconds
	ProduceTimeOffsetUs int32
	LastBlockTimeOffsetUs int32
	// IrreversibleBlockTime is the timestamp of the block last passed to
	// OnIrreversibleBlock.
	IrreversibleBlockTime common.TimePoint
	KeosdProviderTimeoutUs common.Microseconds
	// LastSignedBlockTime and LastSignedBlockNum are updated by OnBlock and let
	// it skip blocks at or before the last one it handled for signing.
	LastSignedBlockTime common.TimePoint
	// StartTime is a lower bound: OnBlock ignores blocks stamped at or before it.
	StartTime common.TimePoint
	LastSignedBlockNum uint32
	Self *ProducerPlugin
	// PendingIncomingTransactions queues transactions that arrived while no
	// pending block existed or that failed for subjective reasons, together
	// with their response callbacks.
	PendingIncomingTransactions []pendingIncomingTransaction
	/*
	 * HACK ALERT
	 * Boost timers can be in a state where a handler has not yet executed but is not abortable.
	 * As this method needs to mutate state that handlers depend on for proper functioning, and to
	 * maintain invariants for other code (namely accepting incoming transactions in a nearly full
	 * block), the handlers capture a correlation ID at the time they are set. When they are executed
	 * they must check that correlation ID against the global ordinal. If it does not match, that
	 * implies that this method has been called while the handler was in the state where it should
	 * have been cancelled but could not be.
	 */
	timerCorelationId uint32
	// Keep an expected ratio between deferred transactions and incoming transactions.
	IncomingTrxWeight float64
	IncomingDeferRadio float64
	// TransactionAckChannel receives an (exception-or-nil, transaction) pair for
	// every transaction the handler responds to.
	TransactionAckChannel *include.Channel
}
// StartBlockResult is an int-backed enumeration of result codes; judging by
// its name it reports the outcome of starting a block (the consumer is not in
// this part of the file).
type StartBlockResult int

// StartBlockResult values, numbered from zero in declaration order.
const (
	succeeded StartBlockResult = iota
	failed
	waiting
	exhausted
)
// PendingBlockMode distinguishes a pending block that is being produced from
// one that is only executed speculatively.
type PendingBlockMode int

// PendingBlockMode values, numbered from zero in declaration order.
const (
	producing PendingBlockMode = iota
	speculating
)
// EnumTxCategory is an int-backed enumeration used to classify transactions.
type EnumTxCategory int

// EnumTxCategory values, numbered from zero in declaration order.
const (
	PERSISTED EnumTxCategory = iota
	UNEXPIRED_UNPERSISTED
	EXPIRED
)
// signatureProviderType is the shape of a signing callback: it receives a
// SHA-256 digest and returns a signature for it.
type signatureProviderType = func(digest crypto.Sha256) *ecc.Signature
// NewProducerPluginImpl builds a ProducerPluginImpl whose timer is bound to io.
// Maps, sets and transaction indexes are allocated empty, the incoming/deferred
// ratio starts at 1:1, and the transaction-ack channel is fetched from the
// application. All other fields keep their zero values.
func NewProducerPluginImpl(io *asio.IoContext) *ProducerPluginImpl {
	impl := new(ProducerPluginImpl)

	impl.Timer = common.NewTimer(io)
	impl.SignatureProviders = map[ecc.PublicKey]signatureProviderType{}
	impl.Producers = *NewAccountNameSet()
	impl.ProducerWatermarks = map[common.AccountName]uint32{}
	impl.PersistentTransactions = NewTransactionIdWithExpiryIndex()
	impl.BlacklistedTransactions = NewTransactionIdWithExpiryIndex()
	impl.IncomingTrxWeight = 0.0
	impl.IncomingDeferRadio = 1.0 // 1:1
	impl.TransactionAckChannel = app.App().GetChannel(chain_interface.TransactionAck)

	return impl
}
// OnBlock reacts to a block state accepted by the chain.
//
// The block is ignored when its timestamp is not later than both
// LastSignedBlockTime and StartTime, or when its number is not greater than
// LastSignedBlockNum. Otherwise, for every producer that is both in
// impl.Producers and in the block's active schedule (other than the block's own
// producer), the producer's signing key is looked up; if a signature provider
// is registered for that key, the block is recorded as the last signed one.
//
// Finally the next block state is generated to look one block ahead: if that
// promotes a pending schedule with a new version, every local producer that
// appears in the new schedule but not in the current one gets its watermark
// set to this block's number.
func (impl *ProducerPluginImpl) OnBlock(bsp *types.BlockState) {
	blockTime := bsp.Header.Timestamp.ToTimePoint()
	if blockTime <= impl.LastSignedBlockTime {
		return
	}
	if blockTime <= impl.StartTime {
		return
	}
	if bsp.BlockNum <= impl.LastSignedBlockNum {
		return
	}

	activeProducerToSigningKey := bsp.ActiveSchedule.Producers

	activeProducers := NewAccountNameSet()
	for _, p := range bsp.ActiveSchedule.Producers {
		activeProducers.Add(p.ProducerName)
	}

	AccountNameSetIntersection(&impl.Producers, activeProducers, func(producer common.AccountName) {
		if producer == bsp.Header.Producer {
			return
		}

		// Locate the schedule entry by index and stop at the first match.
		// Taking the address of the range variable instead would, before
		// Go 1.22, leave the pointer aimed at whatever entry was iterated
		// last rather than at the matching producer.
		var signingKey *types.ProducerKey
		for i := range activeProducerToSigningKey {
			if activeProducerToSigningKey[i].ProducerName == producer {
				signingKey = &activeProducerToSigningKey[i]
				break
			}
		}
		if signingKey == nil {
			return
		}

		privateKeyItr := impl.SignatureProviders[signingKey.BlockSigningKey]
		if privateKeyItr == nil {
			return
		}

		//TODO signal ConfirmedBlock
		//d := bsp.SigDigest()
		//sig := privateKeyItr(d)
		impl.LastSignedBlockTime = bsp.Header.Timestamp.ToTimePoint()
		impl.LastSignedBlockNum = bsp.BlockNum
		//impl.Self.ConfirmedBlock
	})

	// since the watermark has to be set before a block is created, we are looking into the future to
	// determine the new schedule to identify producers that have become active
	hbn := bsp.BlockNum
	newBlockHeader := bsp.Header
	newBlockHeader.Timestamp = newBlockHeader.Timestamp.Next()
	newBlockHeader.Previous = bsp.BlockId
	newBs := bsp.GenerateNext(newBlockHeader.Timestamp)

	// for newly installed producers we can set their watermarks to the block they became active
	if newBs.MaybePromotePending() && bsp.ActiveSchedule.Version != newBs.ActiveSchedule.Version {
		newProducers := NewAccountNameSet()
		for _, p := range newBs.ActiveSchedule.Producers {
			if impl.Producers.Contains(p.ProducerName) {
				newProducers.Add(p.ProducerName)
			}
		}

		// Producers already in the current schedule are not new.
		for _, p := range bsp.ActiveSchedule.Producers {
			newProducers.Remove(p.ProducerName)
		}

		newProducers.Each(func(newProducer common.AccountName) {
			impl.ProducerWatermarks[newProducer] = hbn
		})
	}
}
// OnIrreversibleBlock stores the timestamp of the given irreversible block so
// that GetIrreversibleBlockAge can measure against it.
func (impl *ProducerPluginImpl) OnIrreversibleBlock(lib *types.SignedBlock) {
	libTime := lib.Timestamp.ToTimePoint()
	impl.IrreversibleBlockTime = libTime
}
// OnIncomingBlock processes a block received from outside the node.
//
// It asserts that the block is not stamped more than seven seconds ahead of the
// current time and returns immediately if the chain already has the block.
// Otherwise the pending block is aborted and the new block is pushed. The
// production loop is rescheduled on every exit path after the abort. A guard
// exception ends processing silently; any other exception is logged and the
// block is published on the rejected-block channel. After a successful push,
// production is enabled when the head block is current, and an informational
// line is logged for recent blocks and for every 1000th block.
func (impl *ProducerPluginImpl) OnIncomingBlock(block *types.SignedBlock) {
	ppLog.Debug("received incoming block %s", block.BlockID())

	EosAssert(block.Timestamp.ToTimePoint() < common.Now().AddUs(common.Seconds(7)), &BlockFromTheFuture{}, "received a block from the future, ignoring it")

	ctrl := impl.Chain

	/* de-dupe here... no point in aborting block if we already know the block */
	blockID := block.BlockID()
	if known := ctrl.FetchBlockById(blockID); known != nil {
		return
	}

	// abort the pending block
	ctrl.AbortBlock()

	// exceptions throw out, make sure we restart our loop
	defer impl.ScheduleProductionLoop()

	// push the new block, remembering which handler (if any) fired
	var guarded, rejected bool
	Try(func() {
		ctrl.PushBlock(block, types.BlockStatus(types.Complete))
	}).Catch(func(e GuardExceptions) {
		//TODO: handle_guard_exception
		guarded = true
	}).Catch(func(e Exception) {
		log.Error(e.DetailMessage())
		rejected = true
	}).End()

	switch {
	case guarded:
		return
	case rejected:
		app.App().GetChannel(chain_interface.RejectedBlock).Publish(block)
		return
	}

	// The head block is current once its successor's slot is not in the past.
	if ctrl.HeadBlockState().Header.Timestamp.Next().ToTimePoint() >= common.Now() {
		impl.ProductionEnabled = true
	}

	isRecent := common.Now().Sub(block.Timestamp.ToTimePoint()) < common.Minutes(5)
	if isRecent || block.BlockNumber()%1000 == 0 {
		latencyMs := (common.Now().Sub(block.Timestamp.ToTimePoint())).Count() / 1000
		log.Info("Received block %s... #%d @ %s signed by %s [trxs: %d, lib: %d, conf: %d, lantency: %d ms]\n",
			block.BlockID().String()[8:16], block.BlockNumber(), block.Timestamp, block.Producer,
			len(block.Transactions), ctrl.LastIrreversibleBlockNum(), block.Confirmed, latencyMs)
	}
}
// pendingIncomingTransaction is one queued call to OnIncomingTransactionAsync:
// the three fields mirror that method's arguments, in the same order, and the
// struct is built with unkeyed literals, so the field order must not change.
type pendingIncomingTransaction struct {
	// packedTransaction is the transaction waiting to be processed.
	packedTransaction *types.PackedTransaction
	// persistUntilExpired is the caller's persist flag, kept for the retry.
	persistUntilExpired bool
	// next is the response callback supplied by the original caller.
	next func(interface{})
}
// OnIncomingTransactionAsync applies an incoming transaction to the pending
// block and reports the outcome through next.
//
// If no pending block exists, the transaction is queued in
// PendingIncomingTransactions and nothing is reported yet. Expired and already
// known transactions are answered with an exception. Otherwise the transaction
// is pushed with a deadline of now + MaxTransactionTimeMs; the pending block
// time is used instead when MaxTransactionTimeMs is negative, or when producing
// and the block time comes first. A failure judged subjective re-queues the
// transaction for a retry; any other failure is reported. On success the
// transaction id is recorded in PersistentTransactions when persistUntilExpired
// is set, and the trace is reported. Every reported response is also published
// on TransactionAckChannel.
func (impl *ProducerPluginImpl) OnIncomingTransactionAsync(trx *types.PackedTransaction, persistUntilExpired bool, next func(interface{})) {
	ctrl := impl.Chain

	if ctrl.PendingBlockState() == nil {
		impl.PendingIncomingTransactions = append(impl.PendingIncomingTransactions, pendingIncomingTransaction{trx, persistUntilExpired, next})
		return
	}

	pendingBlockTime := ctrl.PendingBlockState().Header.Timestamp.ToTimePoint()

	// isProducing is evaluated at call time because the mode is read again
	// whenever a response is sent.
	isProducing := func() bool {
		return impl.PendingBlockMode == producing
	}

	// respond forwards the result to the caller, publishes the acknowledgement
	// and writes the matching trace log line.
	respond := func(response interface{}) {
		next(response)

		if failure, isFailure := response.(Exception); isFailure {
			impl.TransactionAckChannel.Publish(common.Pair{failure, trx})
			if isProducing() {
				trxTraceLog.Debug("[TRX_TRACE] Block %d for producer %s is REJECTING tx: %s : %s ",
					ctrl.HeadBlockNum()+1, ctrl.PendingBlockState().Header.Producer, trx.ID(), failure.What())
				return
			}
			trxTraceLog.Debug("[TRX_TRACE] Speculative execution is REJECTING tx: %s : %s ",
				trx.ID(), failure.What())
			return
		}

		impl.TransactionAckChannel.Publish(common.Pair{nil, trx})
		if isProducing() {
			trxTraceLog.Debug("[TRX_TRACE] Block %d for producer %s is ACCEPTING tx: %s",
				ctrl.HeadBlockNum()+1, ctrl.PendingBlockState().Header.Producer, trx.ID())
			return
		}
		trxTraceLog.Debug("[TRX_TRACE] Speculative execution is ACCEPTING tx: %s", trx.ID())
	}

	id := trx.ID()
	if trx.Expiration().ToTimePoint() < pendingBlockTime {
		respond(&ExpiredTxException{Elog: log.Messages{log.FcLogMessage(log.LvlError, "expired transaction %s", id)}})
		return
	}

	if ctrl.IsKnownUnexpiredTransaction(&id) {
		respond(&TxDuplicate{Elog: log.Messages{log.FcLogMessage(log.LvlError, "duplicate transaction %s", id)}})
		return
	}

	// A deadline taken from the block time is "subjective": hitting it says the
	// block ran out of time, not that the transaction itself is too slow.
	deadline := common.Now().AddUs(common.Milliseconds(int64(impl.MaxTransactionTimeMs)))
	deadlineIsSubjective := impl.MaxTransactionTimeMs < 0 || (isProducing() && pendingBlockTime < deadline)
	if deadlineIsSubjective {
		deadline = pendingBlockTime
	}

	Try(func() {
		trace := ctrl.PushTransaction(types.NewTransactionMetadata(trx), deadline, 0)

		switch {
		case trace.Except == nil:
			if persistUntilExpired {
				// if this trx didnt fail/soft-fail and the persist flag is set, store its ID so that we can
				// ensure its applied to all future speculative blocks as well.
				impl.PersistentTransactions.Insert(TransactionIdWithExpiry{TrxId: trx.ID(), Expiry: trx.Expiration().ToTimePoint()})
			}
			respond(trace)

		case failureIsSubjective(trace.Except, deadlineIsSubjective):
			// Keep the transaction for a later attempt instead of rejecting it.
			impl.PendingIncomingTransactions = append(impl.PendingIncomingTransactions, pendingIncomingTransaction{trx, persistUntilExpired, next})
			if isProducing() {
				trxTraceLog.Debug("[TRX_TRACE] Block %d for producer %s COULD NOT FIT, tx: %s RETRYING ",
					ctrl.HeadBlockNum()+1, ctrl.PendingBlockState().Header.Producer, trx.ID())
			} else {
				trxTraceLog.Debug("[TRX_TRACE] Speculative execution COULD NOT FIT tx: %s} RETRYING", trx.ID())
			}

		default:
			respond(trace.Except)
		}
	}).Catch(func(e GuardExceptions) {
		//TODO: app().get_plugin<chain_plugin>().handle_guard_exception(e);
	}).CatchAndCall(respond).End()
}
// GetIrreversibleBlockAge returns how much time has passed since
// IrreversibleBlockTime, or zero when that time lies in the future.
func (impl *ProducerPluginImpl) GetIrreversibleBlockAge() common.Microseconds {
	current := common.Now()
	libTime := impl.IrreversibleBlockTime

	age := common.Microseconds(0)
	if !(current < libTime) {
		age = current.Sub(libTime)
	}
	return age
}
// ProductionDisabledByPolicy reports whether block production must not happen
// right now: production is not enabled, it is paused, or the irreversible block
// age limit is active (non-negative) and has been reached.
func (impl *ProducerPluginImpl) ProductionDisabledByPolicy() bool {
	switch {
	case !impl.ProductionEnabled:
		return true
	case impl.ProductionPaused:
		return true
	case impl.MaxIrreversibleBlockAgeUs < 0:
		// A negative limit switches the age check off.
		return false
	default:
		return impl.GetIrreversibleBlockAge() >= impl.MaxIrreversibleBlockAgeUs
	}
}