/
sdksigner.go
461 lines (407 loc) · 14.5 KB
/
sdksigner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
package util
import (
"context"
"errors"
"fmt"
"time"
"github.com/incubus-network/fury/app/params"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
errorsmod "cosmossdk.io/errors"
sdkclient "github.com/cosmos/cosmos-sdk/client"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
"github.com/cosmos/cosmos-sdk/types/tx/signing"
authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
tmmempool "github.com/tendermint/tendermint/mempool"
)
// ErrSdkBroadcastTimeout is returned by WaitForSdkTxCommit when the tx hash is
// not found on chain before the timeout elapses.
var ErrSdkBroadcastTimeout = errors.New("timed out waiting for tx to be committed to block")
// FuryMsgRequest describes one transaction for FurySigner.Run to build, sign
// and broadcast.
type FuryMsgRequest struct {
// Msgs are the messages set on the transaction.
Msgs []sdk.Msg
// GasLimit is the gas limit set on the transaction.
GasLimit uint64
// FeeAmount is the fee set on the transaction.
FeeAmount sdk.Coins
// Memo is the memo intended for the transaction.
// NOTE(review): confirm the signing loop applies it when building the tx.
Memo string
// Arbitrary data to be referenced in the corresponding FuryMsgResponse, unused
// in signing. This is mostly useful to match FuryMsgResponses with FuryMsgRequests.
Data interface{}
}
// FuryMsgResponse reports the outcome of a single FuryMsgRequest handled by
// FurySigner.Run.
type FuryMsgResponse struct {
// Request is a copy of the request this response belongs to.
Request FuryMsgRequest
// Tx is the transaction returned by Sign for the request.
Tx authsigning.Tx
// TxBytes is the encoded transaction submitted for broadcast.
TxBytes []byte
// Result is the TxResponse returned by the node for the most recent
// broadcast attempt that reached it.
Result sdk.TxResponse
// Err is set when signing or broadcasting failed; it is cleared once a
// broadcast attempt is accepted.
Err error
}
// internal result for inner loop logic
//
// broadcastTxResult classifies one broadcast attempt so the broadcast loop in
// FurySigner.Run can decide what to do next.
type broadcastTxResult int
const (
// txOK: the tx is in the mempool (accepted now or already cached there);
// it is stored as inflight and the loop moves to the next sequence
txOK broadcastTxResult = iota
// txFailed: the tx was rejected with an unrecoverable error; it is dropped
// from the inflight set and the response is sent to the caller immediately
txFailed
// txRetry: the attempt did not succeed but may later (node unreachable,
// mempool full); the loop exits and retries after the next account update
txRetry
// txResetSequence: the node rejected the sequence (unauthorized or wrong
// sequence); the broadcast sequence is reset to the account sequence
txResetSequence
)
// FurySigner broadcasts msgs to a single fury node
//
// It signs every transaction with one private key and tracks that account's
// sequence while transactions are in flight (see Run).
type FurySigner struct {
// chainID is placed in the signer data of every signed tx
chainID string
// encodingConfig supplies the TxConfig used to build, sign and encode txs and
// the InterfaceRegistry used to unpack account query responses
encodingConfig params.EncodingConfig
// authClient queries the signer's account (account number and sequence)
authClient authtypes.QueryClient
// txClient broadcasts signed txs
txClient txtypes.ServiceClient
// privKey signs txs and determines the signer's address
privKey cryptotypes.PrivKey
// inflightTxLimit is the size of the inflight buffer in Run and bounds how far
// the signing sequence may run ahead of the account sequence
inflightTxLimit uint64
}
// NewFurySigner returns a FurySigner that signs with privKey for the chain
// identified by chainID, querying account state through authClient and
// broadcasting through txClient.
//
// inflightTxLimit is the number of signed txs that may be tracked at once. Run
// indexes its inflight buffer modulo this value, so it should be greater than
// zero.
func NewFurySigner(
	chainID string,
	encodingConfig params.EncodingConfig,
	authClient authtypes.QueryClient,
	txClient txtypes.ServiceClient,
	privKey cryptotypes.PrivKey,
	inflightTxLimit uint64,
) *FurySigner {
	signer := new(FurySigner)

	signer.chainID = chainID
	signer.encodingConfig = encodingConfig
	signer.authClient = authClient
	signer.txClient = txClient
	signer.privKey = privKey
	signer.inflightTxLimit = inflightTxLimit

	return signer
}
// pollAccountState starts a goroutine that repeatedly queries the signer's
// account and publishes every successfully decoded account on the returned
// channel.
//
// The channel is unbuffered, so each send blocks until the account is
// received. Failed queries and failed decodes are skipped without being
// reported. The goroutine sleeps one second after every attempt; it never
// exits and never closes the channel.
func (s *FurySigner) pollAccountState() <-chan authtypes.AccountI {
	updates := make(chan authtypes.AccountI)

	// fetch performs one account query and decodes the packed account
	fetch := func() (authtypes.AccountI, error) {
		query := authtypes.QueryAccountRequest{
			Address: GetAccAddress(s.privKey).String(),
		}

		reply, err := s.authClient.Account(context.Background(), &query)
		if err != nil {
			return nil, err
		}

		var decoded authtypes.AccountI
		if err := s.encodingConfig.InterfaceRegistry.UnpackAny(reply.Account, &decoded); err != nil {
			return nil, err
		}

		return decoded, nil
	}

	go func() {
		for {
			if current, err := fetch(); err == nil {
				updates <- current
			}

			time.Sleep(1 * time.Second)
		}
	}()

	return updates
}
// Run starts the signing loop and returns the channel on which responses are
// delivered.
//
// Each request received on requests is signed with the next available account
// sequence and broadcast in sync mode. A response is sent on the returned
// channel when:
//   - the account sequence has advanced past the tx's sequence (the tx is no
//     longer pending), or
//   - the request could not be built or signed, or the broadcast failed with an
//     unrecoverable error (sent immediately, with Err set).
//
// The returned channel is unbuffered, so the caller must keep receiving from it
// or the signing loop blocks. If requests is closed, no further requests are
// accepted, but already broadcast txs are still tracked and reported. The
// goroutines started here never exit and the returned channel is never closed.
//
// An error is returned only when the signer was created with an inflight tx
// limit of zero.
func (s *FurySigner) Run(requests <-chan FuryMsgRequest) (<-chan FuryMsgResponse, error) {
	// the inflight buffer is indexed by sequence % inflightTxLimit, so a zero
	// limit would panic with an integer divide by zero inside the signing goroutine
	if s.inflightTxLimit == 0 {
		return nil, errors.New("inflight tx limit must be greater than zero")
	}

	// poll account state in its own goroutine
	// and send status updates to the signing goroutine
	//
	// TODO: instead of polling, we can wait for block
	// websocket events with a fallback to polling
	accountState := s.pollAccountState()

	responses := make(chan FuryMsgResponse)

	go func() {
		// wait until account is loaded to start signing
		account := <-accountState
		// store current request waiting to be broadcasted
		var currentRequest *FuryMsgRequest
		// keep track of all successfully broadcasted txs
		// index is sequence % inflightTxLimit
		inflight := make([]*FuryMsgResponse, s.inflightTxLimit)
		// used for confirming sent txs only
		prevDeliverTxSeq := account.GetSequence()
		// tx sequence of already signed messages
		checkTxSeq := account.GetSequence()
		// tx sequence of broadcast queue, is reset upon
		// unauthorized errors to recheck/refill mempool
		broadcastTxSeq := account.GetSequence()

		for {
			// the inflight limit includes the current request
			//
			// account.GetSequence() represents the first tx in the mempool (at the last known state)
			// or the next sequence to sign with if checkTxSeq == account.GetSequence() (zero msgs in flight)
			//
			// checkTxSeq always represents the next available mempool sequence to sign with
			//
			// if currentRequest is nil, then it will be used for the next request received
			// if currentRequest is not nil, then checkTxSeq will be used to sign that request
			//
			// therefore, assuming no errors, broadcastTxSeq will be checkTxSeq-1 or checkTxSeq, dependent on
			// if the currentRequest has been successfully broadcast
			//
			// if an unauthorized error occurs, a tx in the mempool was dropped (or mempool flushed, node restart, etc)
			// and broadcastTxSeq is reset to account.GetSequence() in order to refill the mempool and ensure
			// checkTxSeq is valid
			//
			// if an unauthorized error occurs due to another process signing messages on behalf of the same
			// address, then broadcastTxSeq will continually be reset until that sequence is delivered to a block
			//
			// this results in the message we signed with the same sequence being skipped as well as
			// draining our inflight messages to 0.
			//
			// On deployments, a similar event will occur. we will continually broadcast until
			// all of the previous transactions are processed and out of the mempool.
			//
			// it's possible to increase the checkTx (up to the inflight limit) until met with a successful broadcast,
			// to fill the mempool faster, but this feature is unimplemented and would be best enabled only once
			// on startup. An unauthorized error during normal operation would be difficult or impossible to tell apart
			// from a dropped mempool tx (without further improving mempool queries). Options such as persisting inflight
			// state out of process may be better.
			inflightLimitReached := checkTxSeq-account.GetSequence() >= s.inflightTxLimit

			// if we are still processing a request or the inflight limit is reached
			// then block until the next account update without accepting new requests
			if currentRequest != nil || inflightLimitReached {
				account = <-accountState
			} else {
				// block on state update or new requests
				select {
				case account = <-accountState:
				case request, ok := <-requests:
					if !ok {
						// a closed channel yields zero-value requests forever;
						// a nil channel is never selected, so this stops
						// accepting requests while inflight txs keep being tracked
						requests = nil
						continue
					}
					currentRequest = &request
				}
			}

			// send delivered (included in block) responses to caller
			if account.GetSequence() > prevDeliverTxSeq {
				for i := prevDeliverTxSeq; i < account.GetSequence(); i++ {
					response := inflight[i%s.inflightTxLimit]
					// sequences may be skipped due to errors
					if response != nil {
						responses <- *response
					}
					// clear to prevent duplicate confirmations on errors
					inflight[i%s.inflightTxLimit] = nil
				}
				prevDeliverTxSeq = account.GetSequence()
			}

			// recover from errors due to untracked messages in mempool
			// this will happen on deploys, or if another process
			// signs a tx using the same address
			if checkTxSeq < account.GetSequence() {
				checkTxSeq = account.GetSequence()
			}

			// if currentRequest then lastRequestTxSeq == checkTxSeq
			// if not currentRequest then lastRequestTxSeq == checkTxSeq - 1
			lastRequestTxSeq := checkTxSeq
			if currentRequest == nil && lastRequestTxSeq > 0 {
				lastRequestTxSeq--
			}

			// reset broadcast seq if iterated over last request seq
			// we always want to broadcast the current or last request
			// to heartbeat the mempool
			if broadcastTxSeq > lastRequestTxSeq {
				broadcastTxSeq = lastRequestTxSeq
			}

			// loop serves three purposes
			//   - recover from dropped txs (broadcastTxSeq < lastRequestTxSeq)
			//   - send new requests (currentRequest is set)
			//   - send mempool heartbeat (currentRequest is nil)
		BROADCAST_LOOP:
			for broadcastTxSeq <= lastRequestTxSeq {
				// we have a new request that has not been successfully broadcasted
				// and are at the last broadcastTxSeq (broadcastTxSeq == checkTxSeq in this case)
				sendingCurrentRequest := broadcastTxSeq == lastRequestTxSeq && currentRequest != nil

				// check if we have a previous response to check/retry/send for the broadcastTxSeq
				response := inflight[broadcastTxSeq%s.inflightTxLimit]

				// no response -- either checkTxSeq was skipped (untracked mempool tx), or
				// we are signing a new transaction (currentRequest is not nil)
				if response == nil {
					// nothing to do if no response to retry and not sending a current request
					if !sendingCurrentRequest {
						// move onto next broadcastTxSeq or exit loop
						broadcastTxSeq++
						continue
					}

					var (
						tx      authsigning.Tx
						txBytes []byte
					)

					txBuilder := s.encodingConfig.TxConfig.NewTxBuilder()
					// a tx whose msgs could not be set must not be signed or
					// broadcast, so signing only happens when SetMsgs succeeds
					err := txBuilder.SetMsgs(currentRequest.Msgs...)
					if err == nil {
						txBuilder.SetGasLimit(currentRequest.GasLimit)
						txBuilder.SetFeeAmount(currentRequest.FeeAmount)
						txBuilder.SetMemo(currentRequest.Memo)

						signerData := authsigning.SignerData{
							ChainID:       s.chainID,
							AccountNumber: account.GetAccountNumber(),
							Sequence:      broadcastTxSeq,
						}

						tx, txBytes, err = Sign(s.encodingConfig.TxConfig, s.privKey, txBuilder, signerData)
					}

					response = &FuryMsgResponse{
						Request: *currentRequest,
						Tx:      tx,
						TxBytes: txBytes,
						Err:     err,
					}

					// could not build, sign and encode the currentRequest
					if response.Err != nil {
						// clear invalid request, since this is non-recoverable
						currentRequest = nil
						// respond immediately with error
						responses <- *response
						// exit loop
						broadcastTxSeq++
						continue
					}
				}

				// broadcast tx and get result
				//
				// there are four main types of results
				//
				// OK (tx in mempool, store response - add to inflight txs)
				// Retry (tx not in mempool, but retry - do not change inflight status)
				// Failed (tx not in mempool, not recoverable - clear inflight status, reply to channel)
				// Unauthorized (tx not in mempool - sequence not valid)
				broadcastRequest := txtypes.BroadcastTxRequest{
					TxBytes: response.TxBytes,
					Mode:    txtypes.BroadcastMode_BROADCAST_MODE_SYNC,
				}
				broadcastResponse, err := s.txClient.BroadcastTx(context.Background(), &broadcastRequest)

				// set to determine action at the end of loop
				// default is OK
				txResult := txOK

				// determine action to take when err (and no response)
				if err != nil {
					if tmmempool.IsPreCheckError(err) {
						// ErrPreCheck - not recoverable
						response.Err = err
						txResult = txFailed
					} else {
						// could not contact node (POST failed, dns errors, etc)
						// exit loop, wait for another account state update
						// TODO: are there cases here that we will never recover from?
						//       should we implement retry limit?
						response.Err = err
						txResult = txRetry
					}
				} else {
					// store rpc result in response
					response.Result = *broadcastResponse.TxResponse

					// determine action to take based on rpc result
					switch response.Result.Code {
					// 0: success, in mempool
					case errorsmod.SuccessABCICode:
						txResult = txOK
					// 4: unauthorized
					case sdkerrors.ErrUnauthorized.ABCICode():
						txResult = txResetSequence
					// 19: success, tx already in mempool
					case sdkerrors.ErrTxInMempoolCache.ABCICode():
						txResult = txOK
					// 20: mempool full
					case sdkerrors.ErrMempoolIsFull.ABCICode():
						txResult = txRetry
					// 32: wrong sequence
					case sdkerrors.ErrWrongSequence.ABCICode():
						txResult = txResetSequence
					default:
						response.Err = fmt.Errorf("message failed to broadcast, unrecoverable error code %d", response.Result.Code)
						txResult = txFailed
					}
				}

				switch txResult {
				case txOK:
					// clear any errors from previous attempts
					response.Err = nil
					// store for delivery later
					inflight[broadcastTxSeq%s.inflightTxLimit] = response
					// if this is the current/last request, then clear
					// the request and increment the checkTxSeq
					if sendingCurrentRequest {
						currentRequest = nil
						checkTxSeq++
					}
					// go to next request
					broadcastTxSeq++
				case txFailed:
					// do not store the request as inflight (it's not in the mempool)
					inflight[broadcastTxSeq%s.inflightTxLimit] = nil
					// clear current request if it failed
					if sendingCurrentRequest {
						currentRequest = nil
					}
					// immediately respond to channel
					responses <- *response
					// go to next request
					broadcastTxSeq++
				case txRetry:
					break BROADCAST_LOOP
				case txResetSequence:
					broadcastTxSeq = account.GetSequence()
					break BROADCAST_LOOP
				}
			}
		}
	}()

	return responses, nil
}
// Address returns the account address derived from the signer's private key.
func (s *FurySigner) Address() sdk.AccAddress {
	signerAddress := GetAccAddress(s.privKey)
	return signerAddress
}
// Sign signs a populated TxBuilder with privKey using SIGN_MODE_DIRECT and
// returns the signed Tx together with its encoded bytes.
//
// signerData supplies the values the sign bytes are computed from; its Sequence
// is also recorded in the signature. On any failure the builder's current Tx is
// returned with nil bytes and the error.
func Sign(
	txConfig sdkclient.TxConfig,
	privKey cryptotypes.PrivKey,
	txBuilder sdkclient.TxBuilder,
	signerData authsigning.SignerData,
) (authsigning.Tx, []byte, error) {
	const mode = signing.SignMode_SIGN_MODE_DIRECT

	// first pass: attach a signature entry with no signature bytes before the
	// sign bytes are computed (presumably so the signer info is part of what
	// gets signed)
	placeholder := signing.SignatureV2{
		PubKey:   privKey.PubKey(),
		Data:     &signing.SingleSignatureData{SignMode: mode},
		Sequence: signerData.Sequence,
	}
	if err := txBuilder.SetSignatures(placeholder); err != nil {
		return txBuilder.GetTx(), nil, err
	}

	bytesToSign, err := txConfig.SignModeHandler().GetSignBytes(mode, signerData, txBuilder.GetTx())
	if err != nil {
		return txBuilder.GetTx(), nil, err
	}

	rawSignature, err := privKey.Sign(bytesToSign)
	if err != nil {
		return txBuilder.GetTx(), nil, err
	}

	// second pass: replace the placeholder with the real signature
	final := placeholder
	final.Data = &signing.SingleSignatureData{
		SignMode:  mode,
		Signature: rawSignature,
	}
	if err := txBuilder.SetSignatures(final); err != nil {
		return txBuilder.GetTx(), nil, err
	}

	encode := txConfig.TxEncoder()
	encoded, err := encode(txBuilder.GetTx())
	if err != nil {
		return txBuilder.GetTx(), nil, err
	}

	return txBuilder.GetTx(), encoded, nil
}
// GetAccAddress returns the account address made of the address bytes of the
// public key belonging to privKey.
func GetAccAddress(privKey cryptotypes.PrivKey) sdk.AccAddress {
	pubKey := privKey.PubKey()
	return pubKey.Address().Bytes()
}
// WaitForSdkTxCommit polls the chain until the tx hash is found or times out.
// Returns an error immediately if tx hash is empty.
//
// The tx is queried through txClient every 100ms for as long as the node
// answers NotFound. Results:
//   - the tx response and a nil error once the tx is found
//   - nil and ErrSdkBroadcastTimeout when timeout elapses first
//   - nil and the query error for any error other than NotFound
//
// Every query is bound to the overall deadline, so an unresponsive node cannot
// hold the call past timeout.
func WaitForSdkTxCommit(txClient txtypes.ServiceClient, txHash string, timeout time.Duration) (*sdk.TxResponse, error) {
	if txHash == "" {
		return nil, fmt.Errorf("tx hash is empty")
	}

	const pollInterval = 100 * time.Millisecond

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	for {
		res, err := txClient.GetTx(ctx, &txtypes.GetTxRequest{Hash: txHash})
		if err == nil {
			return res.TxResponse, nil
		}

		// a query cut short by the deadline is reported as the timeout itself
		if ctx.Err() != nil {
			return nil, ErrSdkBroadcastTimeout
		}

		st, ok := grpcstatus.FromError(err)
		if !ok || st.Code() != codes.NotFound {
			return nil, err
		}

		// tx still not committed to a block. retry after a short pause,
		// giving up as soon as the deadline passes
		pause := time.NewTimer(pollInterval)
		select {
		case <-ctx.Done():
			pause.Stop()
			return nil, ErrSdkBroadcastTimeout
		case <-pause.C:
		}
	}
}