diff --git a/Makefile b/Makefile index a02abaa7..618bd3ba 100644 --- a/Makefile +++ b/Makefile @@ -73,12 +73,12 @@ lint-contract: test-integration: # test each directory separately to prevent faucet concurrent access for d in $(INTEGRATION_TESTS_DIR)/*/; \ - do cd $$d && go clean -testcache && go test -v --tags=integrationtests -mod=readonly -parallel=4 ./... || exit 1; \ + do cd $$d && go clean -testcache && go test -v --tags=integrationtests -mod=readonly -parallel=10 -timeout 5m ./... || exit 1; \ done .PHONY: test-relayer test-relayer: - cd $(RELAYER_DIR) && go clean -testcache && go test -v -mod=readonly -parallel=4 -timeout 500ms ./... + cd $(RELAYER_DIR) && go clean -testcache && go test -v -mod=readonly -parallel=10 -timeout 500ms ./... .PHONY: test-contract test-contract: diff --git a/contract/src/operation.rs b/contract/src/operation.rs index 3550ac25..798a3820 100644 --- a/contract/src/operation.rs +++ b/contract/src/operation.rs @@ -143,7 +143,7 @@ pub fn handle_coreum_to_xrpl_transfer_confirmation( match COREUM_TOKENS .idx .xrpl_currency - .item(storage, currency.to_owned())? + .item(storage, currency)? .map(|(_, ct)| ct) { Some(token) => { diff --git a/integration-tests/go.mod b/integration-tests/go.mod index a1248819..e4f6ad2e 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -20,7 +20,7 @@ replace ( ) require ( - github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b + github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd github.com/CoreumFoundation/coreumbridge-xrpl/relayer v1.0.0 github.com/rubblelabs/ripple v0.0.0-20230908201244-7f73b1fe5e22 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 9f1dc0a4..86b05c68 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U= github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY= github.com/CosmWasm/wasmd v0.41.0 h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg= diff --git a/integration-tests/processes/env_test.go b/integration-tests/processes/env_test.go index 26f92b11..37073b89 100644 --- a/integration-tests/processes/env_test.go +++ b/integration-tests/processes/env_test.go @@ -5,7 +5,8 @@ package processes_test import ( "context" - "sync" + "fmt" + "strings" "testing" "time" @@ -18,6 +19,7 @@ import ( 
"github.com/samber/lo" "github.com/stretchr/testify/require" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreum-tools/pkg/retry" coreumapp "github.com/CoreumFoundation/coreum/v3/app" coreumconfig "github.com/CoreumFoundation/coreum/v3/pkg/config" @@ -54,18 +56,18 @@ func DefaultRunnerEnvConfig() RunnerEnvConfig { // RunnerEnv is runner environment used for the integration tests. type RunnerEnv struct { - Cfg RunnerEnvConfig - bridgeXRPLAddress rippledata.Account - ContractClient *coreum.ContractClient - Chains integrationtests.Chains - ContractOwner sdk.AccAddress - Runners []*runner.Runner - ProcessErrorsMu sync.RWMutex - ProcessErrors []error + Cfg RunnerEnvConfig + bridgeXRPLAddress rippledata.Account + ContractClient *coreum.ContractClient + Chains integrationtests.Chains + ContractOwner sdk.AccAddress + RunnersParallelGroup *parallel.Group + Runners []*runner.Runner } // NewRunnerEnv returns new instance of the RunnerEnv. func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains integrationtests.Chains) *RunnerEnv { + ctx, cancel := context.WithCancel(ctx) relayerCoreumAddresses := genCoreumRelayers( ctx, t, @@ -134,57 +136,45 @@ func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains } runnerEnv := &RunnerEnv{ - Cfg: cfg, - bridgeXRPLAddress: bridgeXRPLAddress, - ContractClient: contractClient, - Chains: chains, - ContractOwner: contractOwner, - Runners: runners, - ProcessErrorsMu: sync.RWMutex{}, - ProcessErrors: make([]error, 0), + Cfg: cfg, + bridgeXRPLAddress: bridgeXRPLAddress, + ContractClient: contractClient, + Chains: chains, + ContractOwner: contractOwner, + RunnersParallelGroup: parallel.NewGroup(ctx), + Runners: runners, } t.Cleanup(func() { - runnerEnv.RequireNoErrors(t) + // we can cancel the context now and wait for the runner to stop gracefully + cancel() + err := runnerEnv.RunnersParallelGroup.Wait() + if err == nil || errors.Is(err, context.Canceled) { + return + } + // the client replies with that error in if the context is canceled at the time of the request, + // and the error is in the internal package, so we can't check the type + if strings.Contains(err.Error(), "context canceled") { + return + } + + require.NoError(t, err, "Found unexpected runner process errors after the execution") }) return runnerEnv } // StartAllRunnerProcesses starts all relayer processes. 
-func (r *RunnerEnv) StartAllRunnerProcesses(ctx context.Context, t *testing.T) { - errCh := make(chan error, len(r.Runners)) - go func() { - for { - select { - case <-ctx.Done(): - if !errors.Is(ctx.Err(), context.Canceled) { - r.ProcessErrorsMu.Lock() - r.ProcessErrors = append(r.ProcessErrors, ctx.Err()) - r.ProcessErrorsMu.Unlock() - } - return - case err := <-errCh: - r.ProcessErrorsMu.Lock() - r.ProcessErrors = append(r.ProcessErrors, err) - r.ProcessErrorsMu.Unlock() - } - } - }() - - for _, relayerRunner := range r.Runners { - go func(relayerRunner *runner.Runner) { +func (r *RunnerEnv) StartAllRunnerProcesses() { + for i := range r.Runners { + relayerRunner := r.Runners[i] + r.RunnersParallelGroup.Spawn(fmt.Sprintf("runner-%d", i), parallel.Exit, func(ctx context.Context) error { // disable restart on error to handler unexpected errors xrplTxObserverProcess := relayerRunner.Processes.XRPLTxObserver xrplTxObserverProcess.IsRestartableOnError = false xrplTxSubmitterProcess := relayerRunner.Processes.XRPLTxSubmitter xrplTxSubmitterProcess.IsRestartableOnError = false - - err := relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess) - if err != nil && !errors.Is(err, context.Canceled) { - t.Logf("Unexpected error on process start:%s", err) - errCh <- err - } - }(relayerRunner) + return relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess) + }) } } @@ -285,13 +275,6 @@ func (r *RunnerEnv) RegisterXRPLOriginatedToken( return registeredXRPLToken } -// RequireNoErrors check whether the runner err received runner errors. -func (r *RunnerEnv) RequireNoErrors(t *testing.T) { - r.ProcessErrorsMu.RLock() - defer r.ProcessErrorsMu.RUnlock() - require.Empty(t, r.ProcessErrors, "Found unexpected process errors after the execution") -} - // SendXRPLPaymentTx sends Payment transaction. func (r *RunnerEnv) SendXRPLPaymentTx( ctx context.Context, diff --git a/integration-tests/processes/send_test.go b/integration-tests/processes/send_test.go index a67cc366..860f575c 100644 --- a/integration-tests/processes/send_test.go +++ b/integration-tests/processes/send_test.go @@ -31,7 +31,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumAndBack(t *testing.T) { envCfg := DefaultRunnerEnvConfig() runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(200)) coreumSender := chains.Coreum.GenAccount() @@ -138,7 +138,7 @@ func TestSendXRPTokenFromXRPLToCoreumAndBack(t *testing.T) { envCfg := DefaultRunnerEnvConfig() runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(200)) coreumSender := chains.Coreum.GenAccount() @@ -220,7 +220,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithMaliciousRelayer(t *testing. 
envCfg := DefaultRunnerEnvConfig() envCfg.MaliciousRelayerNumber = 1 runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(200)) coreumSender := chains.Coreum.GenAccount() @@ -302,7 +302,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithTicketsReallocation(t *testi envCfg := DefaultRunnerEnvConfig() envCfg.UsedTicketSequenceThreshold = 3 runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(5)) sendingCount := 10 @@ -430,7 +430,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumWithDifferentAmountAndPartialAm }) // start relayers - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // recover tickets so we can register tokens runnerEnv.AllocateTickets(ctx, t, 200) @@ -561,7 +561,7 @@ func TestRecoverXRPLOriginatedTokenRegistrationAndSendFromXRPLToCoreumAndBack(t envCfg := DefaultRunnerEnvConfig() runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(200)) coreumSender := chains.Coreum.GenAccount() @@ -712,7 +712,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithDifferentAmountsAnd runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) // start relayers - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // recover tickets so we can register tokens runnerEnv.AllocateTickets(ctx, t, 200) @@ -883,7 +883,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithMaliciousRelayer(t runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) // start relayers - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // recover tickets so we can register tokens runnerEnv.AllocateTickets(ctx, t, 200) diff --git a/integration-tests/processes/ticket_allocation_test.go b/integration-tests/processes/ticket_allocation_test.go index 76eb6595..ac001391 100644 --- a/integration-tests/processes/ticket_allocation_test.go +++ b/integration-tests/processes/ticket_allocation_test.go @@ -25,7 +25,7 @@ func TestTicketsAllocationRecoveryWithAccountSequence(t *testing.T) { require.NoError(t, err) require.Empty(t, availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate) bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress) @@ -57,7 +57,7 @@ func TestTicketsAllocationRecoveryWithRejection(t *testing.T) { require.NoError(t, err) require.Empty(t, availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // we don't fund the contract for the tickets allocation to let the chain reject the allocation transaction bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress) @@ -90,7 +90,7 @@ func TestTicketsAllocationRecoveryWithInvalidAccountSequence(t *testing.T) { require.NoError(t, err) require.Empty(t, availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate) bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, 
runnerEnv.bridgeXRPLAddress)
@@ -153,7 +153,7 @@ func TestTicketsAllocationRecoveryWithMaliciousRelayers(t *testing.T) {
 	require.NoError(t, err)
 	require.Empty(t, availableTickets)
 
-	runnerEnv.StartAllRunnerProcesses(ctx, t)
+	runnerEnv.StartAllRunnerProcesses()
 
 	chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate)
 
@@ -184,7 +184,7 @@ func TestTicketsReAllocationByTheXRPLTokenRegistration(t *testing.T) {
 	envCfg.UsedTicketSequenceThreshold = 3
 	runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
 
-	runnerEnv.StartAllRunnerProcesses(ctx, t)
+	runnerEnv.StartAllRunnerProcesses()
 
 	// allocate first five tickets
 	numberOfTicketsToAllocate := uint32(5)
diff --git a/integration-tests/xrpl/scanner_test.go b/integration-tests/xrpl/scanner_test.go
index 844e18d0..a6847789 100644
--- a/integration-tests/xrpl/scanner_test.go
+++ b/integration-tests/xrpl/scanner_test.go
@@ -8,10 +8,12 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pkg/errors"
 	rippledata "github.com/rubblelabs/ripple/data"
 	"github.com/stretchr/testify/require"
 
 	"github.com/CoreumFoundation/coreum-tools/pkg/http"
+	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
 	integrationtests "github.com/CoreumFoundation/coreumbridge-xrpl/integration-tests"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl"
 )
@@ -52,9 +54,18 @@ func TestFullHistoryScanAccountTx(t *testing.T) {
 	// add timeout to finish the tests in case of error
 	txsCh := make(chan rippledata.TransactionWithMetaData, txsCount)
-	require.NoError(t, scanner.ScanTxs(ctx, txsCh))
+	t.Logf("Waiting for %d transactions to be scanned by the historical scanner", len(writtenTxHashes))
 
-	validateTxsHashesInChannel(ctx, t, writtenTxHashes, txsCh)
+
+	require.NoError(t, parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
+		spawn("scan", parallel.Continue, func(ctx context.Context) error {
+			return scanner.ScanTxs(ctx, txsCh)
+		})
+		spawn("read", parallel.Exit, func(ctx context.Context) error {
+			return validateTxsHashesInChannel(ctx, writtenTxHashes, txsCh)
+		})
+		return nil
+	}))
 }
 
 func TestRecentHistoryScanAccountTx(t *testing.T) {
@@ -93,29 +104,34 @@ func TestRecentHistoryScanAccountTx(t *testing.T) {
 	// await for the state when the current ledger is valid to run the scanner
 	currentLedger, err := chains.XRPL.RPCClient().LedgerCurrent(ctx)
 	require.NoError(t, err)
-	chains.XRPL.AwaitLedger(ctx, t,
-		currentLedger.LedgerCurrentIndex+
-			scannerCfg.RecentScanWindow)
+	chains.XRPL.AwaitLedger(ctx, t, currentLedger.LedgerCurrentIndex+scannerCfg.RecentScanWindow)
 
 	var writtenTxHashes map[string]struct{}
-	writeDone := make(chan struct{})
-	go func() {
-		defer close(writeDone)
-		writtenTxHashes = sendMultipleTxs(ctx, t, chains.XRPL, 20, senderAcc, recipientAcc)
-	}()
+	receivedTxHashes := make(map[string]struct{})
 
 	txsCh := make(chan rippledata.TransactionWithMetaData, txsCount)
-	require.NoError(t, scanner.ScanTxs(ctx, txsCh))
-
-	t.Logf("Waiting for %d transactions to be scanned by the recent scanner", len(writtenTxHashes))
-	receivedTxHashes := getTxHashesFromChannel(ctx, t, txsCh, txsCount)
+	require.NoError(t, parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
+		spawn("scan", parallel.Continue, func(ctx context.Context) error {
+			return scanner.ScanTxs(ctx, txsCh)
+		})
+		// write and exit
+		spawn("write", parallel.Exit, func(ctx context.Context) error {
+			writtenTxHashes = sendMultipleTxs(ctx, t, chains.XRPL, txsCount, senderAcc, recipientAcc)
+			return nil
+		})
+		spawn("wait", 
parallel.Continue, func(ctx context.Context) error { + t.Logf("Waiting for %d transactions to be scanned by the scanner", txsCount) + for tx := range txsCh { + receivedTxHashes[tx.GetHash().String()] = struct{}{} + if len(receivedTxHashes) == txsCount { + return nil + } + } + return nil + }) + return nil + })) - // wait for the writing to be done - select { - case <-ctx.Done(): - t.FailNow() - case <-writeDone: - } require.Equal(t, writtenTxHashes, receivedTxHashes) } @@ -145,8 +161,8 @@ func sendMultipleTxs( } func validateTxsHashesInChannel( - ctx context.Context, t *testing.T, writtenTxHashes map[string]struct{}, txsCh chan rippledata.TransactionWithMetaData, -) { + ctx context.Context, writtenTxHashes map[string]struct{}, txsCh chan rippledata.TransactionWithMetaData, +) error { scanCtx, scanCtxCancel := context.WithTimeout(ctx, time.Minute) defer scanCtxCancel() // copy the original map @@ -157,34 +173,17 @@ func validateTxsHashesInChannel( for { select { case <-scanCtx.Done(): - t.Fail() + return scanCtx.Err() case tx := <-txsCh: // validate that we have all sent hashed and no duplicated hash := tx.GetHash().String() - _, found := expectedHashes[hash] - require.True(t, found) - delete(expectedHashes, hash) - if len(expectedHashes) == 0 { - return + if _, found := expectedHashes[hash]; !found { + return errors.Errorf("not found expected tx hash:%s", hash) } - } - } -} -func getTxHashesFromChannel( - ctx context.Context, t *testing.T, txsCh chan rippledata.TransactionWithMetaData, count int, -) map[string]struct{} { - scanCtx, scanCtxCancel := context.WithTimeout(ctx, time.Minute) - defer scanCtxCancel() - txHashes := make(map[string]struct{}, count) - for { - select { - case <-scanCtx.Done(): - t.Fail() - case tx := <-txsCh: - txHashes[tx.GetHash().String()] = struct{}{} - if len(txHashes) == count { - return txHashes + delete(expectedHashes, hash) + if len(expectedHashes) == 0 { + return nil } } } diff --git a/relayer/go.mod b/relayer/go.mod index 7130d3b4..6cd8d9a3 100644 --- a/relayer/go.mod +++ b/relayer/go.mod @@ -20,7 +20,7 @@ replace ( require ( cosmossdk.io/math v1.1.2 - github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b + github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd github.com/CosmWasm/wasmd v0.41.0 github.com/cosmos/cosmos-sdk v0.47.5 diff --git a/relayer/go.sum b/relayer/go.sum index 9f1dc0a4..86b05c68 100644 --- a/relayer/go.sum +++ b/relayer/go.sum @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= 
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY=
github.com/CosmWasm/wasmd v0.41.0 h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg=
diff --git a/relayer/logger/logger.go b/relayer/logger/logger.go
index c85405b5..0abff8b9 100644
--- a/relayer/logger/logger.go
+++ b/relayer/logger/logger.go
@@ -4,9 +4,16 @@ import (
 	"context"
 
 	"go.uber.org/zap"
+
+	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
 )
 
-//go:generate mockgen -destination=mock.go -package=logger . Logger
+//go:generate mockgen -destination=mock.go -package=logger . Logger,ParallelLogger
+
+// ParallelLogger is the parallel logger interface, used mostly for mocks.
+type ParallelLogger interface {
+	parallel.Logger
+}
 
 // A Field is a marshaling operation used to add a key-value pair to a logger's context. Most fields are lazily
 // marshaled, so it's inexpensive to add fields to disabled debug-level log statements.
@@ -27,6 +34,7 @@ type Logger interface {
 	Info(ctx context.Context, msg string, fields ...Field)
 	Warn(ctx context.Context, msg string, fields ...Field)
 	Error(ctx context.Context, msg string, fields ...Field)
+	ParallelLogger(ctx context.Context) ParallelLogger
 }
 
 // AnyField takes a key and an arbitrary value and chooses the best way to represent them as a field, falling back to a
@@ -55,6 +63,11 @@ func Uint64Field(key string, value uint64) Field {
 	return convertZapFieldToField(zap.Uint64(key, value))
 }
 
+// ByteStringField constructs a field with the given key and value.
+func ByteStringField(key string, value []byte) Field {
+	return convertZapFieldToField(zap.ByteString(key, value))
+}
+
 // Error is shorthand for the common idiom NamedError("error", err).
 func Error(err error) Field {
 	return convertZapFieldToField(zap.Error(err))
diff --git a/relayer/logger/mock.go b/relayer/logger/mock.go
index 25a7fe0d..4639e892 100644
--- a/relayer/logger/mock.go
+++ b/relayer/logger/mock.go
@@ -1,5 +1,5 @@
 // Code generated by MockGen. DO NOT EDIT.
-// Source: github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger (interfaces: Logger)
+// Source: github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger (interfaces: Logger,ParallelLogger)
 
 // Package logger is a generated GoMock package.
 package logger
@@ -8,6 +8,7 @@ import (
 	context "context"
 	reflect "reflect"
 
+	parallel "github.com/CoreumFoundation/coreum-tools/pkg/parallel"
 	gomock "github.com/golang/mock/gomock"
 )
 
@@ -85,6 +86,20 @@ func (mr *MockLoggerMockRecorder) Info(arg0, arg1 interface{}, arg2 ...interface
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockLogger)(nil).Info), varargs...)
 }
 
+// ParallelLogger mocks base method.
+func (m *MockLogger) ParallelLogger(arg0 context.Context) ParallelLogger {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "ParallelLogger", arg0)
+	ret0, _ := ret[0].(ParallelLogger)
+	return ret0
+}
+
+// ParallelLogger indicates an expected call of ParallelLogger.
+func (mr *MockLoggerMockRecorder) ParallelLogger(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParallelLogger", reflect.TypeOf((*MockLogger)(nil).ParallelLogger), arg0)
+}
+
 // Warn mocks base method.
func (m *MockLogger) Warn(arg0 context.Context, arg1 string, arg2 ...Field) {
 	m.ctrl.T.Helper()
@@ -101,3 +116,50 @@ func (mr *MockLoggerMockRecorder) Warn(arg0, arg1 interface{}, arg2 ...interface
 	varargs := append([]interface{}{arg0, arg1}, arg2...)
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Warn", reflect.TypeOf((*MockLogger)(nil).Warn), varargs...)
 }
+
+// MockParallelLogger is a mock of ParallelLogger interface.
+type MockParallelLogger struct {
+	ctrl     *gomock.Controller
+	recorder *MockParallelLoggerMockRecorder
+}
+
+// MockParallelLoggerMockRecorder is the mock recorder for MockParallelLogger.
+type MockParallelLoggerMockRecorder struct {
+	mock *MockParallelLogger
+}
+
+// NewMockParallelLogger creates a new mock instance.
+func NewMockParallelLogger(ctrl *gomock.Controller) *MockParallelLogger {
+	mock := &MockParallelLogger{ctrl: ctrl}
+	mock.recorder = &MockParallelLoggerMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockParallelLogger) EXPECT() *MockParallelLoggerMockRecorder {
+	return m.recorder
+}
+
+// Debug mocks base method.
+func (m *MockParallelLogger) Debug(arg0 string, arg1 int64, arg2 parallel.OnExit, arg3 string) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "Debug", arg0, arg1, arg2, arg3)
+}
+
+// Debug indicates an expected call of Debug.
+func (mr *MockParallelLoggerMockRecorder) Debug(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Debug", reflect.TypeOf((*MockParallelLogger)(nil).Debug), arg0, arg1, arg2, arg3)
+}
+
+// Error mocks base method.
+func (m *MockParallelLogger) Error(arg0 string, arg1 int64, arg2 parallel.OnExit, arg3 string, arg4 error) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "Error", arg0, arg1, arg2, arg3, arg4)
+}
+
+// Error indicates an expected call of Error.
+func (mr *MockParallelLoggerMockRecorder) Error(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockParallelLogger)(nil).Error), arg0, arg1, arg2, arg3, arg4)
+}
diff --git a/relayer/logger/zap.go b/relayer/logger/zap.go
index c6807190..5e39ce6e 100644
--- a/relayer/logger/zap.go
+++ b/relayer/logger/zap.go
@@ -2,6 +2,7 @@ package logger
 
 import (
 	"context"
+	"fmt"
 	"strings"
 
 	"github.com/pkg/errors"
@@ -9,6 +10,7 @@ import (
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 
+	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/tracing"
 )
 
@@ -20,6 +22,60 @@ const (
 	tracingProcessFieldName = "process"
 )
 
+var _ ParallelLogger = &ParallelZapLogger{}
+
+// ParallelZapLogger is a parallel zap logger.
+type ParallelZapLogger struct {
+	//nolint:containedctx // the parallel logger interface does not accept a context, so it is captured here by design
+	ctx    context.Context
+	zapLog *ZapLogger
+}
+
+// NewParallelZapLogger returns a new instance of the ParallelZapLogger.
+func NewParallelZapLogger(ctx context.Context, zapLog *ZapLogger) *ParallelZapLogger {
+	return &ParallelZapLogger{
+		ctx:    ctx,
+		zapLog: zapLog,
+	}
+}
+
+// Debug prints debug log.
+func (p *ParallelZapLogger) Debug(name string, id int64, onExit parallel.OnExit, message string) {
+	p.zapLog.Named(name).Debug(
+		p.ctx, message,
+		Int64Field("id", id),
+		StringField("onExit", onExit.String()),
+	)
+}
+
+// Error prints error log.
+func (p *ParallelZapLogger) Error(name string, id int64, onExit parallel.OnExit, message string, err error) {
+	// a canceled context is not an error
+	if errors.Is(err, context.Canceled) {
+		return
+	}
+	var panicErr parallel.ErrPanic
+	if errors.As(err, &panicErr) {
+		p.zapLog.Named(name).Error(
+			p.ctx,
+			message,
+			Int64Field("id", id),
+			StringField("onExit", onExit.String()),
+			StringField("value", fmt.Sprint(panicErr.Value)),
+			ByteStringField("stack", panicErr.Stack),
+			Error(err),
+		)
+		return
+	}
+	p.zapLog.Named(name).Error(
+		p.ctx,
+		message,
+		Int64Field("id", id),
+		StringField("onExit", onExit.String()),
+		Error(err),
+	)
+}
+
 // ZapLoggerConfig is ZapLogger config.
 type ZapLoggerConfig struct {
 	Level string
@@ -77,28 +133,39 @@ func NewZapLogger(cfg ZapLoggerConfig) (*ZapLogger, error) {
 
 // Debug logs a message at DebugLevel. The message includes any fields passed at the log site, as well as any fields
 // accumulated on the logger.
-func (z ZapLogger) Debug(ctx context.Context, msg string, fields ...Field) {
+func (z *ZapLogger) Debug(ctx context.Context, msg string, fields ...Field) {
 	z.zapLogger.Debug(msg, filedToZapField(ctx, fields...)...)
 }
 
 // Info logs a message at InfoLevel. The message includes any fields passed at the log site, as well as any fields
 // accumulated on the logger.
-func (z ZapLogger) Info(ctx context.Context, msg string, fields ...Field) {
+func (z *ZapLogger) Info(ctx context.Context, msg string, fields ...Field) {
 	z.zapLogger.Info(msg, filedToZapField(ctx, fields...)...)
 }
 
 // Warn logs a message at WarnLevel. The message includes any fields passed at the log site, as well as any fields
 // accumulated on the logger.
-func (z ZapLogger) Warn(ctx context.Context, msg string, fields ...Field) {
+func (z *ZapLogger) Warn(ctx context.Context, msg string, fields ...Field) {
 	z.zapLogger.Warn(msg, filedToZapField(ctx, fields...)...)
 }
 
 // Error logs a message at ErrorLevel. The message includes any fields passed at the log site, as well as any fields
 // accumulated on the logger.
-func (z ZapLogger) Error(ctx context.Context, msg string, fields ...Field) {
+func (z *ZapLogger) Error(ctx context.Context, msg string, fields ...Field) {
 	z.zapLogger.Error(msg, filedToZapField(ctx, fields...)...)
 }
 
+// Named adds a new path segment to the logger's name. Segments are joined by
+// periods. By default, Loggers are unnamed.
+func (z *ZapLogger) Named(name string) *ZapLogger {
+	return NewZapLoggerFromLogger(z.zapLogger.Named(name))
+}
+
+// ParallelLogger returns a parallel zap logger.
+func (z *ZapLogger) ParallelLogger(ctx context.Context) ParallelLogger {
+	return NewParallelZapLogger(ctx, z)
+}
+
 func filedToZapField(ctx context.Context, fields ...Field) []zap.Field {
 	zapFields := lo.Map(fields, func(filed Field, _ int) zap.Field {
 		return zap.Field{
diff --git a/relayer/processes/processor.go b/relayer/processes/processor.go
index 79f763c1..820bcfb0 100644
--- a/relayer/processes/processor.go
+++ b/relayer/processes/processor.go
@@ -2,10 +2,11 @@ package processes
 
 import (
 	"context"
-	"sync"
+	"runtime/debug"
 
 	"github.com/pkg/errors"
 
+	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/tracing"
 )
@@ -47,46 +48,45 @@ func (p *Processor) StartProcesses(ctx context.Context, processes ...ProcessWith
 		}
 	}
 
-	wg := sync.WaitGroup{}
-	wg.Add(len(processes))
-	for _, process := range processes {
-		go func(process ProcessWithOptions) {
-			// set process name to the context
-			ctx = tracing.WithTracingProcess(ctx, process.Name)
-			defer wg.Done()
-			defer func() {
-				if r := recover(); r != nil {
-					p.log.Error(ctx, "Received panic during the process execution", logger.Error(errors.Errorf("%s", r)))
-					if !process.IsRestartableOnError {
-						p.log.Warn(ctx, "The process is not auto-restartable on error")
-						return
-					}
-					p.log.Info(ctx, "Restarting process after the panic")
-					p.startProcessWithRestartOnError(ctx, process)
-				}
-			}()
-			p.startProcessWithRestartOnError(ctx, process)
-		}(process)
-	}
-	wg.Wait()
-
-	return nil
+	return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
+		for i := range processes {
+			process := processes[i]
+			spawn(process.Name, parallel.Continue, func(ctx context.Context) error {
+				ctx = tracing.WithTracingProcess(ctx, process.Name)
+				return p.startProcessWithRestartOnError(ctx, process)
+			})
+		}
+		return nil
+	}, parallel.WithGroupLogger(p.log.ParallelLogger(ctx)))
 }
 
-func (p *Processor) startProcessWithRestartOnError(ctx context.Context, process ProcessWithOptions) {
+func (p *Processor) startProcessWithRestartOnError(ctx context.Context, process ProcessWithOptions) error {
 	for {
-		if err := process.Process.Start(ctx); err != nil {
+		// start the process and handle the panic
+		err := func() (err error) {
+			defer func() {
+				if p := recover(); p != nil {
+					err = errors.Wrapf(
+						parallel.ErrPanic{Value: p, Stack: debug.Stack()},
+						"handled panic on process:%s", process.Name,
+					)
+				}
+			}()
+			return process.Process.Start(ctx)
+		}()
+		// restart the process if it is restartable
+		if err != nil {
 			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-				return
+				return nil
 			}
 			p.log.Error(ctx, "Received unexpected error from the process", logger.Error(err))
 			if !process.IsRestartableOnError {
 				p.log.Warn(ctx, "The process is not auto-restartable on error")
-				break
+				return err
 			}
 			p.log.Info(ctx, "Restarting process after the error")
 		} else {
-			return
+			return nil
 		}
 	}
 }
diff --git a/relayer/processes/processor_test.go b/relayer/processes/processor_test.go
index 27d66a92..f3b96f76 100644
--- a/relayer/processes/processor_test.go
+++ b/relayer/processes/processor_test.go
@@ -92,6 +92,7 @@ func TestProcessor_StartProcesses(t *testing.T) {
 				}
 			},
 			logErrorsCount: 1,
+			wantErr:        true,
 		},
 		{
 			name: "singe_process_with_error_restartable",
@@ -157,6 +158,7 @@ func TestProcessor_StartProcesses(t *testing.T) {
 				}
 			},
 			logErrorsCount: 1,
+			wantErr:        true,
 		},
 		{
 			name: "singe_process_with_panic_restartable",
@@ -234,6 +236,12 @@ 
func TestProcessor_StartProcesses(t *testing.T) { ctrl := gomock.NewController(t) logMock := logger.NewAnyLogMock(ctrl) + + parallelLoggerMock := logger.NewMockParallelLogger(ctrl) + logMock.EXPECT().ParallelLogger(gomock.Any()).Return(parallelLoggerMock).AnyTimes() + parallelLoggerMock.EXPECT().Debug(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + parallelLoggerMock.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + if tt.logErrorsCount > 0 { logMock.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).Times(tt.logErrorsCount) } diff --git a/relayer/processes/xrpl_tx_observer.go b/relayer/processes/xrpl_tx_observer.go index 006e4e13..e5f65fa5 100644 --- a/relayer/processes/xrpl_tx_observer.go +++ b/relayer/processes/xrpl_tx_observer.go @@ -9,6 +9,7 @@ import ( rippledata "github.com/rubblelabs/ripple/data" "github.com/samber/lo" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/coreum" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/tracing" @@ -61,24 +62,26 @@ func (o *XRPLTxObserver) Init(ctx context.Context) error { // Start starts the process. func (o *XRPLTxObserver) Start(ctx context.Context) error { txCh := make(chan rippledata.TransactionWithMetaData) - if err := o.txScanner.ScanTxs(ctx, txCh); err != nil { - return err - } - - for { - select { - case <-ctx.Done(): - return errors.WithStack(ctx.Err()) - case tx := <-txCh: - if err := o.processTx(ctx, tx); err != nil { - if errors.Is(err, context.Canceled) { - o.log.Warn(ctx, "Context canceled during the XRPL tx processing", logger.StringField("error", err.Error())) - } else { - o.log.Error(ctx, "Failed to process XRPL tx", logger.Error(err)) + return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error { + spawn("tx-scanner", parallel.Continue, func(ctx context.Context) error { + defer close(txCh) + return o.txScanner.ScanTxs(ctx, txCh) + }) + spawn("tx-processor", parallel.Fail, func(ctx context.Context) error { + for tx := range txCh { + if err := o.processTx(ctx, tx); err != nil { + if errors.Is(err, context.Canceled) { + o.log.Warn(ctx, "Context canceled during the XRPL tx processing", logger.StringField("error", err.Error())) + } else { + return errors.Wrapf(err, "failed to process XRPL tx, txHash:%s", tx.GetHash().String()) + } } } - } - } + return errors.WithStack(ctx.Err()) + }) + + return nil + }, parallel.WithGroupLogger(o.log.ParallelLogger(ctx))) } func (o *XRPLTxObserver) processTx(ctx context.Context, tx rippledata.TransactionWithMetaData) error { diff --git a/relayer/processes/xrpl_tx_observer_test.go b/relayer/processes/xrpl_tx_observer_test.go index b1f49eff..f95bace4 100644 --- a/relayer/processes/xrpl_tx_observer_test.go +++ b/relayer/processes/xrpl_tx_observer_test.go @@ -100,10 +100,8 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- xrplOriginatedTokenPaymentWithMetadataTx - cancel() - }() + ch <- xrplOriginatedTokenPaymentWithMetadataTx + cancel() return nil }) @@ -132,10 +130,8 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), 
gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- coreumOriginatedTokenPaymentWithMetadataTx - cancel() - }() + ch <- coreumOriginatedTokenPaymentWithMetadataTx + cancel() return nil }) @@ -166,15 +162,13 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.Payment{}, - MetaData: rippledata.MetaData{ - TransactionResult: failTxResult, - }, - } - cancel() - }() + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.Payment{}, + MetaData: rippledata.MetaData{ + TransactionResult: failTxResult, + }, + } + cancel() return nil }) @@ -187,15 +181,13 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.Payment{}, - MetaData: rippledata.MetaData{ - TransactionResult: notTxResult, - }, - } - cancel() - }() + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.Payment{}, + MetaData: rippledata.MetaData{ + TransactionResult: notTxResult, + }, + } + cancel() return nil }) @@ -208,16 +200,14 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TrustSet{ - TxBase: rippledata.TxBase{ - TransactionType: rippledata.TRUST_SET, - }, + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TrustSet{ + TxBase: rippledata.TxBase{ + TransactionType: rippledata.TRUST_SET, }, - } - cancel() - }() + }, + } + cancel() return nil }) @@ -230,19 +220,17 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TicketCreate{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - Sequence: 5, - TransactionType: rippledata.TICKET_CREATE, - }, + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TicketCreate{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + Sequence: 5, + TransactionType: rippledata.TICKET_CREATE, }, - MetaData: createAllocatedTicketsMetaData([]uint32{3, 5, 7}), - } - cancel() - }() + }, + MetaData: createAllocatedTicketsMetaData([]uint32{3, 5, 7}), + } + cancel() return nil }) @@ -272,19 +260,17 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: 
&rippledata.TicketCreate{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.TICKET_CREATE, - }, - TicketSequence: lo.ToPtr(uint32(11)), + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TicketCreate{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.TICKET_CREATE, }, - MetaData: createAllocatedTicketsMetaData([]uint32{3, 5, 7}), - } - cancel() - }() + TicketSequence: lo.ToPtr(uint32(11)), + }, + MetaData: createAllocatedTicketsMetaData([]uint32{3, 5, 7}), + } + cancel() return nil }) @@ -314,21 +300,19 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TicketCreate{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - Sequence: 5, - TransactionType: rippledata.TICKET_CREATE, - }, + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TicketCreate{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + Sequence: 5, + TransactionType: rippledata.TICKET_CREATE, }, - MetaData: rippledata.MetaData{ - TransactionResult: failTxResult, - }, - } - cancel() - }() + }, + MetaData: rippledata.MetaData{ + TransactionResult: failTxResult, + }, + } + cancel() return nil }) @@ -358,19 +342,17 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TrustSet{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.TRUST_SET, - }, - LimitAmount: xrplOriginatedTokenXRPLAmount, - TicketSequence: lo.ToPtr(uint32(11)), + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TrustSet{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.TRUST_SET, }, - } - cancel() - }() + LimitAmount: xrplOriginatedTokenXRPLAmount, + TicketSequence: lo.ToPtr(uint32(11)), + }, + } + cancel() return nil }) @@ -401,22 +383,20 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TrustSet{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.TRUST_SET, - }, - LimitAmount: xrplOriginatedTokenXRPLAmount, - TicketSequence: lo.ToPtr(uint32(11)), - }, - MetaData: rippledata.MetaData{ - TransactionResult: failTxResult, + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TrustSet{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.TRUST_SET, }, - } - cancel() - }() + LimitAmount: xrplOriginatedTokenXRPLAmount, + TicketSequence: lo.ToPtr(uint32(11)), + }, + MetaData: rippledata.MetaData{ + TransactionResult: failTxResult, + }, + } + cancel() return nil }) @@ -447,20 +427,18 @@ func TestXRPLTxObserver_Start(t *testing.T) { 
xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.Payment{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.PAYMENT, - }, - Destination: recipientXRPLAddress, - Amount: xrplOriginatedTokenXRPLAmount, - TicketSequence: lo.ToPtr(uint32(11)), + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.Payment{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.PAYMENT, }, - } - cancel() - }() + Destination: recipientXRPLAddress, + Amount: xrplOriginatedTokenXRPLAmount, + TicketSequence: lo.ToPtr(uint32(11)), + }, + } + cancel() return nil }) @@ -489,23 +467,21 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.Payment{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.PAYMENT, - }, - Destination: recipientXRPLAddress, - Amount: xrplOriginatedTokenXRPLAmount, - TicketSequence: lo.ToPtr(uint32(11)), + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.Payment{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.PAYMENT, }, - MetaData: rippledata.MetaData{ - TransactionResult: failTxResult, - }, - } - cancel() - }() + Destination: recipientXRPLAddress, + Amount: xrplOriginatedTokenXRPLAmount, + TicketSequence: lo.ToPtr(uint32(11)), + }, + MetaData: rippledata.MetaData{ + TransactionResult: failTxResult, + }, + } + cancel() return nil }) @@ -534,17 +510,15 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TrustSet{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.NFTOKEN_CREATE_OFFER, - }, + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TrustSet{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.NFTOKEN_CREATE_OFFER, }, - } - cancel() - }() + }, + } + cancel() return nil }) @@ -561,7 +535,14 @@ func TestXRPLTxObserver_Start(t *testing.T) { t.Cleanup(cancel) ctrl := gomock.NewController(t) + logMock := logger.NewAnyLogMock(ctrl) + + parallelLoggerMock := logger.NewMockParallelLogger(ctrl) + logMock.EXPECT().ParallelLogger(gomock.Any()).Return(parallelLoggerMock).AnyTimes() + parallelLoggerMock.EXPECT().Debug(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + parallelLoggerMock.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + var contractClient processes.ContractClient if tt.contractClientBuilder != nil { contractClient = tt.contractClientBuilder(ctrl) diff --git a/relayer/processes/xrpl_tx_submitter.go b/relayer/processes/xrpl_tx_submitter.go index c86a6e2e..6d912850 100644 --- 
a/relayer/processes/xrpl_tx_submitter.go +++ b/relayer/processes/xrpl_tx_submitter.go @@ -101,7 +101,7 @@ func (s *XRPLTxSubmitter) Start(ctx context.Context) error { return errors.WithStack(ctx.Err()) default: if err := s.processPendingOperations(ctx); err != nil && !errors.Is(err, context.Canceled) { - s.log.Error(ctx, "Failed to process pending operations", logger.Error(err)) + return errors.Wrap(err, "failed to process pending operations") } if !s.cfg.RepeatRecentScan { s.log.Info(ctx, "Process repeating is disabled, process is finished") diff --git a/relayer/xrpl/scanner.go b/relayer/xrpl/scanner.go index e23be613..63fd00fc 100644 --- a/relayer/xrpl/scanner.go +++ b/relayer/xrpl/scanner.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" rippledata "github.com/rubblelabs/ripple/data" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreum-tools/pkg/retry" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger" ) @@ -72,24 +73,33 @@ func NewAccountScanner(cfg AccountScannerConfig, log logger.Logger, rpcTxProvide // ScanTxs subscribes on rpc account transactions and continuously scans the recent and historical transactions. func (s *AccountScanner) ScanTxs(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { s.log.Info(ctx, "Subscribing xrpl scanner", logger.AnyField("config", s.cfg)) - if s.cfg.RecentScanEnabled { - currentLedgerRes, err := s.rpcTxProvider.LedgerCurrent(ctx) - if err != nil { - return err - } - currentLedger := currentLedgerRes.LedgerCurrentIndex - go s.scanRecentHistory(ctx, currentLedger, ch) - } - - if s.cfg.FullScanEnabled { - go s.scanFullHistory(ctx, ch) - } if !s.cfg.RecentScanEnabled && !s.cfg.FullScanEnabled { return errors.Errorf("both recent and full scans are disabled") } - return nil + return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error { + if s.cfg.RecentScanEnabled { + currentLedgerRes, err := s.rpcTxProvider.LedgerCurrent(ctx) + if err != nil { + return err + } + currentLedger := currentLedgerRes.LedgerCurrentIndex + spawn("recent-history-scanner", parallel.Continue, func(ctx context.Context) error { + s.scanRecentHistory(ctx, currentLedger, ch) + return nil + }) + } + + if s.cfg.FullScanEnabled { + spawn("full-history-scanner", parallel.Continue, func(ctx context.Context) error { + s.scanFullHistory(ctx, ch) + return nil + }) + } + + return nil + }, parallel.WithGroupLogger(s.log.ParallelLogger(ctx))) } func (s *AccountScanner) scanRecentHistory( @@ -172,7 +182,11 @@ func (s *AccountScanner) scanTransactions( if tx == nil { continue } - ch <- *tx + select { + case <-ctx.Done(): + return lastLedger + case ch <- *tx: + } } } if len(accountTxResult.Marker) == 0 { diff --git a/relayer/xrpl/scanner_test.go b/relayer/xrpl/scanner_test.go index 9f7c915d..b848b2fb 100644 --- a/relayer/xrpl/scanner_test.go +++ b/relayer/xrpl/scanner_test.go @@ -3,6 +3,7 @@ package xrpl_test import ( "context" "encoding/hex" + "reflect" "strings" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl" ) @@ -21,10 +23,6 @@ import ( func TestAccountScanner_ScanTxs(t *testing.T) { t.Parallel() - // set the time to prevent infinite test - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - t.Cleanup(cancel) - account := 
xrpl.GenPrivKeyTxSigner().Account()
 	notEmptyMarker := map[string]any{"key": "val"}
@@ -98,7 +96,7 @@ func TestAccountScanner_ScanTxs(t *testing.T) {
 			rpcTxProvider: func(ctrl *gomock.Controller) xrpl.RPCTxProvider {
 				mockedProvider := NewMockRPCTxProvider(ctrl)
-				mockedProvider.EXPECT().LedgerCurrent(ctx).Return(xrpl.LedgerCurrentResult{
+				mockedProvider.EXPECT().LedgerCurrent(gomock.Any()).Return(xrpl.LedgerCurrentResult{
 					LedgerCurrentIndex: 100,
 				}, nil)
 
@@ -180,17 +178,25 @@ func TestAccountScanner_ScanTxs(t *testing.T) {
 			s := xrpl.NewAccountScanner(tt.cfg, logger.NewZapLoggerFromLogger(zapDevLogger), rpcTxProvider)
 
 			txsCh := make(chan rippledata.TransactionWithMetaData)
-			if err := s.ScanTxs(ctx, txsCh); (err != nil) != tt.wantErr {
-				t.Errorf("ScanTxs() error = %+v, wantErr %+v", err, tt.wantErr)
-			}
-			if len(tt.wantTxHashes) == 0 {
-				return
-			}
-			// validate that we have received expected hashes
-			gotTxHashes := readTxHashesFromChannels(ctx, t, txsCh, len(tt.wantTxHashes))
-			require.Equal(t, lo.SliceToMap(tt.wantTxHashes, func(hash string) (string, struct{}) {
-				return hash, struct{}{}
-			}), gotTxHashes)
+
+			ctx := context.Background()
+			require.NoError(t, parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
+				spawn("scan", parallel.Continue, func(ctx context.Context) error {
+					return s.ScanTxs(ctx, txsCh)
+				})
+				spawn("read", parallel.Exit, func(ctx context.Context) error {
+					// validate that we have received expected hashes
+					gotTxHashes := readTxHashesFromChannels(ctx, t, txsCh, len(tt.wantTxHashes))
+					expectedTxHashes := lo.SliceToMap(tt.wantTxHashes, func(hash string) (string, struct{}) {
+						return hash, struct{}{}
+					})
+					if !reflect.DeepEqual(expectedTxHashes, gotTxHashes) {
+						return errors.Errorf("expected tx hashes:%v, got:%v", expectedTxHashes, gotTxHashes)
+					}
+					return nil
+				})
+				return nil
+			}))
 		})
 	}
 }
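
Note on the pattern adopted throughout this diff: hand-rolled goroutines, sync.WaitGroup, and error channels are replaced with the coreum-tools parallel package. Below is a minimal, self-contained sketch of how parallel.Run and the spawn exit policies fit together. It uses only identifiers visible in this diff (parallel.Run, parallel.SpawnFn, parallel.Continue/Exit/Fail); the task names are illustrative, and the OnExit semantics described in the comments are inferred from how the policies are used above, not taken from the package documentation.

package main

import (
	"context"
	"fmt"

	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
)

func main() {
	ctx := context.Background()
	txCh := make(chan int)

	// parallel.Run blocks until the group is done. Each spawned task carries an
	// exit policy; as used in this diff:
	//   parallel.Continue - the task may return nil without stopping the group
	//                       (e.g. the "scan" tasks above),
	//   parallel.Exit     - a clean return shuts the whole group down
	//                       (e.g. the "read"/"write" test tasks above),
	//   parallel.Fail     - the task is expected to run forever, so any return
	//                       fails the group (e.g. "tx-processor" above).
	// A task returning a non-nil error cancels the group context in every case.
	err := parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
		spawn("producer", parallel.Continue, func(ctx context.Context) error {
			defer close(txCh) // closing lets the consumer's range loop finish
			for i := 0; i < 3; i++ {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case txCh <- i:
				}
			}
			return nil
		})
		spawn("consumer", parallel.Exit, func(ctx context.Context) error {
			for v := range txCh {
				fmt.Println("got:", v)
			}
			return nil // Exit policy: the group stops once consumption is done
		})
		return nil
	})
	if err != nil {
		fmt.Println("group finished with error:", err)
	}
}

The long-lived variant used by RunnerEnv is the same idea with an explicit handle: parallel.NewGroup(ctx) returns a group whose Spawn works like spawn above and whose Wait() blocks until all tasks finish, which is what the t.Cleanup hook relies on to shut the relayers down deterministically.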