-
Notifications
You must be signed in to change notification settings - Fork 3
/
xrpl_tx_observer.go
350 lines (310 loc) · 11.7 KB
/
xrpl_tx_observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
package processes
import (
"context"
"strings"
sdkmath "cosmossdk.io/math"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/pkg/errors"
rippledata "github.com/rubblelabs/ripple/data"
"github.com/samber/lo"
"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/coreum"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/tracing"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl"
)
// XRPLTxObserverConfig is XRPLTxObserver config.
type XRPLTxObserverConfig struct {
BridgeXRPLAddress rippledata.Account
RelayerCoreumAddress sdk.AccAddress
}
// XRPLTxObserver is process which observes the XRPL txs and register the evidences in the contract.
type XRPLTxObserver struct {
cfg XRPLTxObserverConfig
log logger.Logger
txScanner XRPLAccountTxScanner
contractClient ContractClient
}
// NewXRPLTxObserver returns a new instance of the XRPLTxObserver.
func NewXRPLTxObserver(
cfg XRPLTxObserverConfig,
log logger.Logger,
txScanner XRPLAccountTxScanner,
contractClient ContractClient,
) *XRPLTxObserver {
return &XRPLTxObserver{
cfg: cfg,
log: log,
txScanner: txScanner,
contractClient: contractClient,
}
}
// Init validates the process state.
func (o *XRPLTxObserver) Init(ctx context.Context) error {
o.log.Debug(ctx, "Initializing process")
if o.cfg.RelayerCoreumAddress.Empty() {
return errors.Errorf("failed to init process, relayer address is nil or empty")
}
if !o.contractClient.IsInitialized() {
return errors.Errorf("failed to init process, contract client is not initialized")
}
return nil
}
// Start starts the process.
func (o *XRPLTxObserver) Start(ctx context.Context) error {
txCh := make(chan rippledata.TransactionWithMetaData)
return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
spawn("tx-scanner", parallel.Continue, func(ctx context.Context) error {
defer close(txCh)
return o.txScanner.ScanTxs(ctx, txCh)
})
spawn("tx-processor", parallel.Continue, func(ctx context.Context) error {
for tx := range txCh {
if err := o.processTx(ctx, tx); err != nil {
if errors.Is(err, context.Canceled) {
o.log.Warn(ctx, "Context canceled during the XRPL tx processing", logger.StringField("error", err.Error()))
} else {
return errors.Wrapf(err, "failed to process XRPL tx, txHash:%s", tx.GetHash().String())
}
}
}
return nil
})
return nil
}, parallel.WithGroupLogger(o.log.ParallelLogger(ctx)))
}
func (o *XRPLTxObserver) processTx(ctx context.Context, tx rippledata.TransactionWithMetaData) error {
ctx = tracing.WithTracingXRPLTxHash(tracing.WithTracingID(ctx), tx.GetHash().String())
if !txIsFinal(tx) {
o.log.Debug(ctx, "Transaction is not final", logger.StringField("txStatus", tx.MetaData.TransactionResult.String()))
return nil
}
if o.cfg.BridgeXRPLAddress == tx.GetBase().Account {
return o.processOutgoingTx(ctx, tx)
}
return o.processIncomingTx(ctx, tx)
}
func (o *XRPLTxObserver) processIncomingTx(ctx context.Context, tx rippledata.TransactionWithMetaData) error {
txType := tx.GetType()
if !tx.MetaData.TransactionResult.Success() {
o.log.Debug(
ctx,
"Skipping not successful transaction",
logger.StringField("type", txType),
logger.StringField("txResult", tx.MetaData.TransactionResult.String()),
)
return nil
}
o.log.Debug(ctx, "Start processing of XRPL incoming tx", logger.StringField("type", txType))
// we process only incoming payment transactions, other transactions are ignored
if txType != rippledata.PAYMENT.String() {
o.log.Debug(ctx, "Skipping not payment transaction", logger.StringField("type", txType))
return nil
}
paymentTx, ok := tx.Transaction.(*rippledata.Payment)
if !ok {
return errors.Errorf("failed to cast tx to Payment, data:%+v", tx)
}
coreumRecipient := xrpl.DecodeCoreumRecipientFromMemo(paymentTx.Memos)
if coreumRecipient == nil {
o.log.Info(ctx, "Bridge memo does not include expected structure", logger.AnyField("memos", paymentTx.Memos))
return nil
}
deliveredXRPLAmount := tx.MetaData.DeliveredAmount
stringCurrency := xrpl.ConvertCurrencyToString(deliveredXRPLAmount.Currency)
var coreumAmount sdkmath.Int
// for the coreum originated tokens we need to use toke decimals, but for the XRPL they are static
if o.cfg.BridgeXRPLAddress.String() == deliveredXRPLAmount.Issuer.String() {
coreumToken, err := o.contractClient.GetCoreumTokenByXRPLCurrency(ctx, stringCurrency)
if err != nil {
return errors.Wrapf(err, "faild to get XRPL token for the XRPL to coreum transfer")
}
coreumAmount, err = ConvertCoreumOriginatedTokenXRPLAmountToCoreumAmount(*deliveredXRPLAmount, coreumToken.Decimals)
if err != nil {
return err
}
} else {
var err error
coreumAmount, err = ConvertXRPLOriginatedTokenXRPLAmountToCoreumAmount(*deliveredXRPLAmount)
if err != nil {
return err
}
}
if coreumAmount.IsZero() {
o.log.Info(ctx, "Nothing to send, amount is zero")
return nil
}
evidence := coreum.XRPLToCoreumTransferEvidence{
TxHash: paymentTx.GetHash().String(),
Issuer: deliveredXRPLAmount.Issuer.String(),
Currency: stringCurrency,
Amount: coreumAmount,
Recipient: coreumRecipient,
}
_, err := o.contractClient.SendXRPLToCoreumTransferEvidence(ctx, o.cfg.RelayerCoreumAddress, evidence)
if err == nil {
return nil
}
if coreum.IsTokenNotRegisteredError(err) {
o.log.Info(ctx, "Token not registered")
return nil
}
if IsEvidenceErrorCausedByResubmission(err) {
o.log.Debug(ctx, "Received expected send evidence error caused by re-submission")
return nil
}
if coreum.IsAssetFTWhitelistedLimitExceededError(err) {
o.log.Info(ctx, "The evidence saving is failed because of the asset FT rules, the evidence is skipped", logger.AnyField("evidence", evidence))
return nil
}
return err
}
func (o *XRPLTxObserver) processOutgoingTx(ctx context.Context, tx rippledata.TransactionWithMetaData) error {
txType := tx.GetType()
o.log.Debug(ctx, "Start processing of XRPL outgoing tx",
logger.StringField("type", txType),
)
switch txType {
case rippledata.TICKET_CREATE.String():
return o.sendXRPLTicketsAllocationTransactionResultEvidence(ctx, tx)
case rippledata.TRUST_SET.String():
return o.sendXRPLTrustSetTransactionResultEvidence(ctx, tx)
case rippledata.PAYMENT.String():
return o.sendCoreumToXRPLTransferTransactionResultEvidence(ctx, tx)
default:
// TODO(dzmitryhil) replace with the error once we integrate all supported types
o.log.Warn(ctx, "Found unsupported transaction type", logger.AnyField("tx", tx))
return nil
}
}
func (o *XRPLTxObserver) sendXRPLTicketsAllocationTransactionResultEvidence(ctx context.Context, tx rippledata.TransactionWithMetaData) error {
tickets := extractTicketSequencesFromMetaData(tx.MetaData)
txResult := getTransactionResult(tx)
if txResult == coreum.TransactionResultRejected {
tickets = nil
}
evidence := coreum.XRPLTransactionResultTicketsAllocationEvidence{
XRPLTransactionResultEvidence: coreum.XRPLTransactionResultEvidence{
TxHash: tx.GetHash().String(),
TransactionResult: txResult,
},
Tickets: tickets,
}
ticketCreateTx, ok := tx.Transaction.(*rippledata.TicketCreate)
if !ok {
return errors.Errorf("failed to cast tx to TicketCreate, data:%+v", tx)
}
if ticketCreateTx.Sequence != 0 {
evidence.AccountSequence = lo.ToPtr(ticketCreateTx.Sequence)
}
if ticketCreateTx.TicketSequence != nil && *ticketCreateTx.TicketSequence != 0 {
evidence.TicketSequence = lo.ToPtr(*ticketCreateTx.TicketSequence)
}
_, err := o.contractClient.SendXRPLTicketsAllocationTransactionResultEvidence(
ctx,
o.cfg.RelayerCoreumAddress,
evidence,
)
return o.handleEvidenceSubmissionError(ctx, err, tx, evidence.XRPLTransactionResultEvidence)
}
func (o *XRPLTxObserver) sendXRPLTrustSetTransactionResultEvidence(ctx context.Context, tx rippledata.TransactionWithMetaData) error {
trustSetTx, ok := tx.Transaction.(*rippledata.TrustSet)
if !ok {
return errors.Errorf("failed to cast tx to TrustSet, data:%+v", tx)
}
evidence := coreum.XRPLTransactionResultTrustSetEvidence{
XRPLTransactionResultEvidence: coreum.XRPLTransactionResultEvidence{
TxHash: tx.GetHash().String(),
TransactionResult: getTransactionResult(tx),
TicketSequence: trustSetTx.TicketSequence,
},
Issuer: trustSetTx.LimitAmount.Issuer.String(),
Currency: xrpl.ConvertCurrencyToString(trustSetTx.LimitAmount.Currency),
}
_, err := o.contractClient.SendXRPLTrustSetTransactionResultEvidence(
ctx,
o.cfg.RelayerCoreumAddress,
evidence,
)
return o.handleEvidenceSubmissionError(ctx, err, tx, evidence.XRPLTransactionResultEvidence)
}
func (o *XRPLTxObserver) sendCoreumToXRPLTransferTransactionResultEvidence(ctx context.Context, tx rippledata.TransactionWithMetaData) error {
paymentTx, ok := tx.Transaction.(*rippledata.Payment)
if !ok {
return errors.Errorf("failed to cast tx to Payment, data:%+v", tx)
}
evidence := coreum.XRPLTransactionResultCoreumToXRPLTransferEvidence{
XRPLTransactionResultEvidence: coreum.XRPLTransactionResultEvidence{
TxHash: tx.GetHash().String(),
TransactionResult: getTransactionResult(tx),
TicketSequence: paymentTx.TicketSequence,
},
}
_, err := o.contractClient.SendCoreumToXRPLTransferTransactionResultEvidence(
ctx,
o.cfg.RelayerCoreumAddress,
evidence,
)
return o.handleEvidenceSubmissionError(ctx, err, tx, evidence.XRPLTransactionResultEvidence)
}
func (o *XRPLTxObserver) handleEvidenceSubmissionError(
ctx context.Context,
err error,
tx rippledata.TransactionWithMetaData,
evidence coreum.XRPLTransactionResultEvidence,
) error {
if err == nil {
if evidence.TransactionResult != coreum.TransactionResultAccepted {
o.log.Warn(ctx, "Transaction was rejected", logger.StringField("txResult", tx.MetaData.TransactionResult.String()))
}
return nil
}
if IsEvidenceErrorCausedByResubmission(err) {
o.log.Debug(ctx, "Received expected send evidence error")
return nil
}
return err
}
// txIsFinal returns value which indicates whether the transaction if final and can be used.
// Result Code Finality.
// tesSUCCESS Final when included in a validated ledger.
// Any tec code Final when included in a validated ledger.
// Any tem code Final unless the protocol changes to make the transaction valid.
// tefPAST_SEQ Final when another transaction with the same sequence number is included in a validated ledger.
// tefMAX_LEDGER Final when a validated ledger has a ledger index higher than the transaction's LastLedgerSequence field, and no validated ledger includes the transaction.
func txIsFinal(tx rippledata.TransactionWithMetaData) bool {
txResult := tx.MetaData.TransactionResult
return tx.MetaData.TransactionResult.Success() ||
strings.HasPrefix(txResult.String(), xrpl.TecTxResultPrefix) ||
strings.HasPrefix(txResult.String(), xrpl.TemTxResultPrefix) ||
txResult.String() == xrpl.TefPastSeqTxResult ||
txResult.String() == xrpl.TefMaxLedgerTxResult
}
func getTransactionResult(tx rippledata.TransactionWithMetaData) coreum.TransactionResult {
if tx.MetaData.TransactionResult.Success() {
return coreum.TransactionResultAccepted
}
return coreum.TransactionResultRejected
}
func extractTicketSequencesFromMetaData(metaData rippledata.MetaData) []uint32 {
ticketSequences := make([]uint32, 0)
for _, node := range metaData.AffectedNodes {
createdNode := node.CreatedNode
if createdNode == nil {
continue
}
newFields := createdNode.NewFields
if newFields == nil {
continue
}
if rippledata.TICKET.String() != newFields.GetType() {
continue
}
ticket, ok := newFields.(*rippledata.Ticket)
if !ok {
continue
}
ticketSequences = append(ticketSequences, *ticket.TicketSequence)
}
return ticketSequences
}