Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't stop XRPL txs scanning if one tx in the result contains unexpected json structure. #206

Merged
merged 2 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions infra/composer/prometheus/alert.rules
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,11 @@ groups:
severity: "critical"
annotations:
description: "The bridge is halted"

- alert: XRPL RPC decoding error
expr: xrpl_rpc_decoding_errors_total > 0
for: 1s
labels:
severity: major
annotations:
description: "Found XRPL RPC decoding error"
1 change: 1 addition & 0 deletions integration-tests/xrpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewXRPLChain(cfg XRPLChainConfig, log logger.Logger) (XRPLChain, error) {
xrpl.DefaultRPCClientConfig(cfg.RPCAddress),
log,
http.NewRetryableClient(http.DefaultClientConfig()),
nil,
)

signer := xrpl.NewKeyringTxSigner(kr)
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/xrpl/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestFullHistoryScanAccountTx(t *testing.T) {
rpcClientConfig,
chains.Log,
http.NewRetryableClient(http.DefaultClientConfig()),
nil,
)

// enable just historical scan
Expand Down Expand Up @@ -97,6 +98,7 @@ func TestRecentHistoryScanAccountTx(t *testing.T) {
rpcClientConfig,
chains.Log,
http.NewRetryableClient(http.DefaultClientConfig()),
nil,
)

// update config to use recent scan only
Expand Down
12 changes: 12 additions & 0 deletions relayer/metrics/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
relayerActivityMetricName = "relayer_activity"
xrplTokensCoreumSupplyMetricName = "xrpl_tokens_coreum_supply"
xrplBridgeAccountReservesMetricName = "xrpl_bridge_account_reserves"
xrplRPCDecodingErrorCounterMetricName = "xrpl_rpc_decoding_errors_total"

// XRPLCurrencyIssuerLabel is XRPL currency issuer label.
XRPLCurrencyIssuerLabel = "xrpl_currency_issuer"
Expand Down Expand Up @@ -59,6 +60,7 @@ type Registry struct {
RelayerActivityGaugeVec *prometheus.GaugeVec
XRPLTokensCoreumSupplyGaugeVec *prometheus.GaugeVec
XRPLBridgeAccountReservesGauge prometheus.Gauge
XRPLRPCDecodingErrorCounter prometheus.Counter
}

// NewRegistry returns new metric registry.
Expand Down Expand Up @@ -167,6 +169,10 @@ func NewRegistry() *Registry {
Name: xrplBridgeAccountReservesMetricName,
Help: "XRPL bridge account reserves",
}),
XRPLRPCDecodingErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
Name: xrplRPCDecodingErrorCounterMetricName,
Help: "XRPL RPC decoding error counter",
}),
}
}

Expand All @@ -190,6 +196,7 @@ func (m *Registry) Register(registry prometheus.Registerer) error {
m.RelayerActivityGaugeVec,
m.XRPLTokensCoreumSupplyGaugeVec,
m.XRPLBridgeAccountReservesGauge,
m.XRPLRPCDecodingErrorCounter,
}

for _, c := range collectors {
Expand Down Expand Up @@ -221,3 +228,8 @@ func (m *Registry) SetXRPLAccountFullHistoryScanLedgerIndex(index float64) {
func (m *Registry) SetMaliciousBehaviourKey(key string) {
m.MaliciousBehaviourGaugeVec.WithLabelValues(key).Set(1)
}

// IncrementXRPLRPCDecodingErrorCounter increments XRPLRPCDecodingErrorCounter.
func (m *Registry) IncrementXRPLRPCDecodingErrorCounter() {
m.XRPLRPCDecodingErrorCounter.Inc()
}
2 changes: 1 addition & 1 deletion relayer/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func NewComponents(
retryableXRPLRPCHTTPClient := toolshttp.NewRetryableClient(toolshttp.RetryableClientConfig(cfg.XRPL.HTTPClient))

xrplRPCClientCfg := xrpl.RPCClientConfig(cfg.XRPL.RPC)
xrplRPCClient := xrpl.NewRPCClient(xrplRPCClientCfg, log, retryableXRPLRPCHTTPClient)
xrplRPCClient := xrpl.NewRPCClient(xrplRPCClientCfg, log, retryableXRPLRPCHTTPClient, metricsRegistry)

coreumClientContextCfg := coreumchainclient.DefaultContextConfig()
coreumClientContextCfg.TimeoutConfig.RequestTimeout = cfg.Coreum.Contract.RequestTimeout
Expand Down
84 changes: 70 additions & 14 deletions relayer/xrpl/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger"
)

//go:generate mockgen -destination=rpc_mocks_test.go -package=xrpl_test . HTTPClient,RPCMetricRegistry

// UnknownTransactionResultErrorText error text for the unexpected tx code.
const UnknownTransactionResultErrorText = "Unknown TransactionResult"

// ******************** RPC command request objects ********************

// RPCError is RPC error result.
Expand Down Expand Up @@ -137,6 +142,13 @@ type AccountTxResult struct {
Validated bool `json:"validated"`
}

// AccountTxWithRawTxsResult is `account_tx` method result with json.RawMessage transactions.
type AccountTxWithRawTxsResult struct {
Marker map[string]any `json:"marker,omitempty"`
Transactions []json.RawMessage `json:"transactions,omitempty"`
Validated bool `json:"validated"`
}

// ServerStateValidatedLedger is the latest validated ledger from the server state.
type ServerStateValidatedLedger struct {
BaseFee uint32 `json:"base_fee"`
Expand Down Expand Up @@ -194,12 +206,14 @@ type RipplePathFindResult struct {

// ******************** RPC transport objects ********************

type rpcRequest struct {
// RPCRequest is general RPC request.
type RPCRequest struct {
Method string `json:"method"`
Params []any `json:"params,omitempty"`
}

type rpcResponse struct {
// RPCResponse is general RPC response.
type RPCResponse struct {
Result any `json:"result"`
}

Expand All @@ -210,6 +224,11 @@ type HTTPClient interface {
DoJSON(ctx context.Context, method, url string, reqBody any, resDecoder func([]byte) error) error
}

// RPCMetricRegistry is rpc metric registry.
type RPCMetricRegistry interface {
IncrementXRPLRPCDecodingErrorCounter()
}

// RPCClientConfig defines the config for the RPCClient.
type RPCClientConfig struct {
URL string
Expand All @@ -226,17 +245,24 @@ func DefaultRPCClientConfig(url string) RPCClientConfig {

// RPCClient implement the XRPL RPC client.
type RPCClient struct {
cfg RPCClientConfig
log logger.Logger
httpClient HTTPClient
cfg RPCClientConfig
log logger.Logger
httpClient HTTPClient
metricRegistry RPCMetricRegistry
}

// NewRPCClient returns new instance of the RPCClient.
func NewRPCClient(cfg RPCClientConfig, log logger.Logger, httpClient HTTPClient) *RPCClient {
func NewRPCClient(
cfg RPCClientConfig,
log logger.Logger,
httpClient HTTPClient,
metricRegistry RPCMetricRegistry,
) *RPCClient {
return &RPCClient{
cfg: cfg,
log: log,
httpClient: httpClient,
cfg: cfg,
log: log,
httpClient: httpClient,
metricRegistry: metricRegistry,
}
}

Expand Down Expand Up @@ -353,6 +379,11 @@ func (c *RPCClient) Submit(ctx context.Context, tx rippledata.Transaction) (Subm
}
var result SubmitResult
if err := c.callRPC(ctx, "submit", params, &result); err != nil {
if strings.Contains(err.Error(), UnknownTransactionResultErrorText) {
c.log.Error(ctx, "Failed to decode XRPL transaction result", zap.Error(err))
c.metricRegistry.IncrementXRPLRPCDecodingErrorCounter()
}

return SubmitResult{}, err
}

Expand Down Expand Up @@ -400,12 +431,37 @@ func (c *RPCClient) AccountTx(
Limit: c.cfg.PageLimit,
Marker: marker,
}
var result AccountTxResult
var result AccountTxWithRawTxsResult
if err := c.callRPC(ctx, "account_tx", params, &result); err != nil {
return AccountTxResult{}, err
}

return result, nil
txs := make(rippledata.TransactionSlice, 0)
for i, rawTx := range result.Transactions {
var tx rippledata.TransactionWithMetaData
if err := json.Unmarshal(rawTx, &tx); err != nil {
c.log.Error(
ctx,
"Failed to decode json tx to rippledata.TransactionWithMetaData",
zap.Error(err),
zap.String("tx", string(rawTx)),
zap.Int("txIndex", i),
zap.String("account", account.String()),
zap.Int64("minLedger", minLedger),
zap.Int64("maxLedger", maxLedger),
zap.Any("marker", marker),
)
c.metricRegistry.IncrementXRPLRPCDecodingErrorCounter()
continue
}
txs = append(txs, &tx)
}

return AccountTxResult{
Marker: result.Marker,
Transactions: txs,
Validated: result.Validated,
}, nil
}

// ServerState returns the server state information.
Expand Down Expand Up @@ -455,7 +511,7 @@ func (c *RPCClient) RipplePathFind(
}

func (c *RPCClient) callRPC(ctx context.Context, method string, params, result any) error {
request := rpcRequest{
request := RPCRequest{
Method: method,
Params: []any{
params,
Expand All @@ -465,7 +521,7 @@ func (c *RPCClient) callRPC(ctx context.Context, method string, params, result a

err := c.httpClient.DoJSON(ctx, http.MethodPost, c.cfg.URL, request, func(resBytes []byte) error {
c.log.Debug(ctx, "Received XRPL RPC result", zap.String("result", string(resBytes)))
errResponse := rpcResponse{
errResponse := RPCResponse{
Result: &RPCError{},
}
if err := json.Unmarshal(resBytes, &errResponse); err != nil {
Expand All @@ -478,7 +534,7 @@ func (c *RPCClient) callRPC(ctx context.Context, method string, params, result a
if errResult.Code != 0 || strings.TrimSpace(errResult.Name) != "" {
return errResult
}
response := rpcResponse{
response := RPCResponse{
Result: result,
}
if err := json.Unmarshal(resBytes, &response); err != nil {
Expand Down
84 changes: 84 additions & 0 deletions relayer/xrpl/rpc_mocks_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading