From ecd48541d4ede2d7d9202f74eb7b25a4d657b857 Mon Sep 17 00:00:00 2001 From: Dzmitry Hil Date: Thu, 30 Nov 2023 17:18:05 +0300 Subject: [PATCH 1/6] Integrate coreum parallel lib --- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 +- integration-tests/processes/env_test.go | 91 ++++++++----------- integration-tests/processes/send_test.go | 14 +-- .../processes/ticket_allocation_test.go | 10 +- relayer/go.mod | 2 +- relayer/go.sum | 4 +- relayer/logger/logger.go | 15 ++- relayer/logger/mock.go | 64 ++++++++++++- relayer/logger/zap.go | 74 ++++++++++++++- relayer/processes/processor.go | 48 +++++----- relayer/processes/processor_test.go | 8 ++ relayer/processes/xrpl_tx_observer.go | 2 +- relayer/processes/xrpl_tx_submitter.go | 2 +- relayer/xrpl/scanner.go | 22 +++-- 15 files changed, 248 insertions(+), 114 deletions(-) diff --git a/integration-tests/go.mod b/integration-tests/go.mod index a1248819..e4f6ad2e 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -20,7 +20,7 @@ replace ( ) require ( - github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b + github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd github.com/CoreumFoundation/coreumbridge-xrpl/relayer v1.0.0 github.com/rubblelabs/ripple v0.0.0-20230908201244-7f73b1fe5e22 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 9f1dc0a4..86b05c68 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U= github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY= github.com/CosmWasm/wasmd v0.41.0 h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg= diff --git a/integration-tests/processes/env_test.go b/integration-tests/processes/env_test.go index 3cc81fce..532e9723 100644 --- a/integration-tests/processes/env_test.go +++ b/integration-tests/processes/env_test.go @@ -5,7 +5,8 @@ package processes_test import ( "context" - "sync" + "fmt" + "strings" "testing" "time" @@ -18,6 +19,7 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/require" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreum-tools/pkg/retry" coreumapp "github.com/CoreumFoundation/coreum/v3/app" coreumconfig "github.com/CoreumFoundation/coreum/v3/pkg/config" @@ -56,18 +58,18 @@ func 
DefaultRunnerEnvConfig() RunnerEnvConfig {
 
 // RunnerEnv is the runner environment used for the integration tests.
 type RunnerEnv struct {
-	Cfg               RunnerEnvConfig
-	bridgeXRPLAddress rippledata.Account
-	ContractClient    *coreum.ContractClient
-	Chains            integrationtests.Chains
-	ContractOwner     sdk.AccAddress
-	Runners           []*runner.Runner
-	ProcessErrorsMu   sync.RWMutex
-	ProcessErrors     []error
+	Cfg                  RunnerEnvConfig
+	bridgeXRPLAddress    rippledata.Account
+	ContractClient       *coreum.ContractClient
+	Chains               integrationtests.Chains
+	ContractOwner        sdk.AccAddress
+	RunnersParallelGroup *parallel.Group
+	Runners              []*runner.Runner
 }
 
 // NewRunnerEnv returns a new instance of the RunnerEnv.
 func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains integrationtests.Chains) *RunnerEnv {
+	ctx, cancel := context.WithCancel(ctx)
 	relayerCoreumAddresses := genCoreumRelayers(
 		ctx,
 		t,
@@ -136,57 +138,45 @@ func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains
 	}
 
 	runnerEnv := &RunnerEnv{
-		Cfg:               cfg,
-		bridgeXRPLAddress: bridgeXRPLAddress,
-		ContractClient:    contractClient,
-		Chains:            chains,
-		ContractOwner:     contractOwner,
-		Runners:           runners,
-		ProcessErrorsMu:   sync.RWMutex{},
-		ProcessErrors:     make([]error, 0),
+		Cfg:                  cfg,
+		bridgeXRPLAddress:    bridgeXRPLAddress,
+		ContractClient:       contractClient,
+		Chains:               chains,
+		ContractOwner:        contractOwner,
+		RunnersParallelGroup: parallel.NewGroup(ctx),
+		Runners:              runners,
 	}
 	t.Cleanup(func() {
-		runnerEnv.RequireNoErrors(t)
+		// cancel the context now and wait for the runners to stop gracefully
+		cancel()
+		err := runnerEnv.RunnersParallelGroup.Wait()
+		if err == nil || errors.Is(err, context.Canceled) {
+			return
+		}
+		// the client returns that error if the context is canceled at the time of the request,
+		// and the error is defined in an internal package, so we can't check its type
+		if strings.Contains(err.Error(), "context canceled") {
+			return
+		}
+
+		require.NoError(t, err, "Found unexpected runner process errors after the execution")
 	})
 
 	return runnerEnv
 }
 
 // StartAllRunnerProcesses starts all relayer processes.
-func (r *RunnerEnv) StartAllRunnerProcesses(ctx context.Context, t *testing.T) {
-	errCh := make(chan error, len(r.Runners))
-	go func() {
-		for {
-			select {
-			case <-ctx.Done():
-				if !errors.Is(ctx.Err(), context.Canceled) {
-					r.ProcessErrorsMu.Lock()
-					r.ProcessErrors = append(r.ProcessErrors, ctx.Err())
-					r.ProcessErrorsMu.Unlock()
-				}
-				return
-			case err := <-errCh:
-				r.ProcessErrorsMu.Lock()
-				r.ProcessErrors = append(r.ProcessErrors, err)
-				r.ProcessErrorsMu.Unlock()
-			}
-		}
-	}()
-
-	for _, relayerRunner := range r.Runners {
-		go func(relayerRunner *runner.Runner) {
+func (r *RunnerEnv) StartAllRunnerProcesses() {
+	for i := range r.Runners {
+		relayerRunner := r.Runners[i]
+		r.RunnersParallelGroup.Spawn(fmt.Sprintf("runner-%d", i), parallel.Exit, func(ctx context.Context) error {
 			// disable restart on error to handle unexpected errors
 			xrplTxObserverProcess := relayerRunner.Processes.XRPLTxObserver
 			xrplTxObserverProcess.IsRestartableOnError = false
 			xrplTxSubmitterProcess := relayerRunner.Processes.XRPLTxSubmitter
 			xrplTxSubmitterProcess.IsRestartableOnError = false
-
-			err := relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess)
-			if err != nil && !errors.Is(err, context.Canceled) {
-				t.Logf("Unexpected error on process start:%s", err)
-				errCh <- err
-			}
-		}(relayerRunner)
+			return relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess)
+		})
 	}
 }
 
@@ -276,13 +266,6 @@ func (r *RunnerEnv) RegisterXRPLOriginatedToken(
 	return registeredXRPLToken
 }
 
-// RequireNoErrors check whether the runner err received runner errors.
-func (r *RunnerEnv) RequireNoErrors(t *testing.T) {
-	r.ProcessErrorsMu.RLock()
-	defer r.ProcessErrorsMu.RUnlock()
-	require.Empty(t, r.ProcessErrors, "Found unexpected process errors after the execution")
-}
-
 // SendXRPLPaymentTx sends Payment transaction.
 func (r *RunnerEnv) SendXRPLPaymentTx(
 	ctx context.Context,
diff --git a/integration-tests/processes/send_test.go b/integration-tests/processes/send_test.go
index fb4c46e6..c573a383 100644
--- a/integration-tests/processes/send_test.go
+++ b/integration-tests/processes/send_test.go
@@ -31,7 +31,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumAndBack(t *testing.T) {
 
 	envCfg := DefaultRunnerEnvConfig()
 	runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
-	runnerEnv.StartAllRunnerProcesses(ctx, t)
+	runnerEnv.StartAllRunnerProcesses()
 	runnerEnv.AllocateTickets(ctx, t, uint32(200))
 
 	coreumSender := chains.Coreum.GenAccount()
@@ -95,7 +95,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithMaliciousRelayer(t *testing.
envCfg := DefaultRunnerEnvConfig() envCfg.MaliciousRelayerNumber = 1 runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(200)) coreumSender := chains.Coreum.GenAccount() @@ -148,7 +148,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithTicketsReallocation(t *testi envCfg := DefaultRunnerEnvConfig() envCfg.UsedTicketSequenceThreshold = 3 runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(5)) sendingCount := 10 @@ -241,7 +241,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumWithDifferentAmountAndPartialAm }) // start relayers - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // recover tickets so we can register tokens runnerEnv.AllocateTickets(ctx, t, 200) @@ -314,7 +314,7 @@ func TestRecoverXRPLOriginatedTokenRegistrationAndSendFromXRPLToCoreumAndBack(t envCfg := DefaultRunnerEnvConfig() runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() runnerEnv.AllocateTickets(ctx, t, uint32(200)) coreumSender := chains.Coreum.GenAccount() @@ -422,7 +422,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithDifferentAmountsAnd runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) // start relayers - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // recover tickets so we can register tokens runnerEnv.AllocateTickets(ctx, t, 200) @@ -546,7 +546,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithMaliciousRelayer(t runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) // start relayers - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // recover tickets so we can register tokens runnerEnv.AllocateTickets(ctx, t, 200) diff --git a/integration-tests/processes/ticket_allocation_test.go b/integration-tests/processes/ticket_allocation_test.go index 3168cb8c..dac41133 100644 --- a/integration-tests/processes/ticket_allocation_test.go +++ b/integration-tests/processes/ticket_allocation_test.go @@ -25,7 +25,7 @@ func TestTicketsAllocationRecoveryWithAccountSequence(t *testing.T) { require.NoError(t, err) require.Empty(t, availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate) bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress) @@ -57,7 +57,7 @@ func TestTicketsAllocationRecoveryWithRejection(t *testing.T) { require.NoError(t, err) require.Empty(t, availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // we don't fund the contract for the tickets allocation to let the chain reject the allocation transaction bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress) @@ -90,7 +90,7 @@ func TestTicketsAllocationRecoveryWithInvalidAccountSequence(t *testing.T) { require.NoError(t, err) require.Empty(t, availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate) bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, 
runnerEnv.bridgeXRPLAddress) @@ -153,7 +153,7 @@ func TestTicketsAllocationRecoveryWithMaliciousRelayers(t *testing.T) { require.NoError(t, err) require.Empty(t, availableTickets) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate) @@ -184,7 +184,7 @@ func TestTicketsReAllocationByTheXRPLTokenRegistration(t *testing.T) { envCfg.UsedTicketSequenceThreshold = 3 runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains) - runnerEnv.StartAllRunnerProcesses(ctx, t) + runnerEnv.StartAllRunnerProcesses() // allocate first five tickets numberOfTicketsToAllocate := uint32(5) diff --git a/relayer/go.mod b/relayer/go.mod index 7130d3b4..6cd8d9a3 100644 --- a/relayer/go.mod +++ b/relayer/go.mod @@ -20,7 +20,7 @@ replace ( require ( cosmossdk.io/math v1.1.2 - github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b + github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd github.com/CosmWasm/wasmd v0.41.0 github.com/cosmos/cosmos-sdk v0.47.5 diff --git a/relayer/go.sum b/relayer/go.sum index 9f1dc0a4..86b05c68 100644 --- a/relayer/go.sum +++ b/relayer/go.sum @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg= github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI= -github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98= +github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0= github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U= github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY= github.com/CosmWasm/wasmd v0.41.0 h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg= diff --git a/relayer/logger/logger.go b/relayer/logger/logger.go index c85405b5..0abff8b9 100644 --- a/relayer/logger/logger.go +++ b/relayer/logger/logger.go @@ -4,9 +4,16 @@ import ( "context" "go.uber.org/zap" + + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" ) -//go:generate mockgen -destination=mock.go -package=logger . Logger +//go:generate mockgen -destination=mock.go -package=logger . Logger,ParallelLogger + +// ParallelLogger is parallel logger interface used mostly for mocks. +type ParallelLogger interface { + parallel.Logger +} // A Field is a marshaling operation used to add a key-value pair to a logger's context. Most fields are lazily // marshaled, so it's inexpensive to add fields to disabled debug-level log statements. 
@@ -27,6 +34,7 @@ type Logger interface { Info(ctx context.Context, msg string, fields ...Field) Warn(ctx context.Context, msg string, fields ...Field) Error(ctx context.Context, msg string, fields ...Field) + ParallelLogger(ctx context.Context) ParallelLogger } // AnyField takes a key and an arbitrary value and chooses the best way to represent them as a field, falling back to a @@ -55,6 +63,11 @@ func Uint64Field(key string, value uint64) Field { return convertZapFieldToField(zap.Uint64(key, value)) } +// ByteStringField constructs a field with the given key and value. +func ByteStringField(key string, value []byte) Field { + return convertZapFieldToField(zap.ByteString(key, value)) +} + // Error is shorthand for the common idiom NamedError("error", err). func Error(err error) Field { return convertZapFieldToField(zap.Error(err)) diff --git a/relayer/logger/mock.go b/relayer/logger/mock.go index 25a7fe0d..4639e892 100644 --- a/relayer/logger/mock.go +++ b/relayer/logger/mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger (interfaces: Logger) +// Source: github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger (interfaces: Logger,ParallelLogger) // Package logger is a generated GoMock package. package logger @@ -8,6 +8,7 @@ import ( context "context" reflect "reflect" + parallel "github.com/CoreumFoundation/coreum-tools/pkg/parallel" gomock "github.com/golang/mock/gomock" ) @@ -85,6 +86,20 @@ func (mr *MockLoggerMockRecorder) Info(arg0, arg1 interface{}, arg2 ...interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockLogger)(nil).Info), varargs...) } +// ParallelLogger mocks base method. +func (m *MockLogger) ParallelLogger(arg0 context.Context) ParallelLogger { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ParallelLogger", arg0) + ret0, _ := ret[0].(ParallelLogger) + return ret0 +} + +// ParallelLogger indicates an expected call of ParallelLogger. +func (mr *MockLoggerMockRecorder) ParallelLogger(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParallelLogger", reflect.TypeOf((*MockLogger)(nil).ParallelLogger), arg0) +} + // Warn mocks base method. func (m *MockLogger) Warn(arg0 context.Context, arg1 string, arg2 ...Field) { m.ctrl.T.Helper() @@ -101,3 +116,50 @@ func (mr *MockLoggerMockRecorder) Warn(arg0, arg1 interface{}, arg2 ...interface varargs := append([]interface{}{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Warn", reflect.TypeOf((*MockLogger)(nil).Warn), varargs...) } + +// MockParallelLogger is a mock of ParallelLogger interface. +type MockParallelLogger struct { + ctrl *gomock.Controller + recorder *MockParallelLoggerMockRecorder +} + +// MockParallelLoggerMockRecorder is the mock recorder for MockParallelLogger. +type MockParallelLoggerMockRecorder struct { + mock *MockParallelLogger +} + +// NewMockParallelLogger creates a new mock instance. +func NewMockParallelLogger(ctrl *gomock.Controller) *MockParallelLogger { + mock := &MockParallelLogger{ctrl: ctrl} + mock.recorder = &MockParallelLoggerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockParallelLogger) EXPECT() *MockParallelLoggerMockRecorder { + return m.recorder +} + +// Debug mocks base method. 
+func (m *MockParallelLogger) Debug(arg0 string, arg1 int64, arg2 parallel.OnExit, arg3 string) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "Debug", arg0, arg1, arg2, arg3)
+}
+
+// Debug indicates an expected call of Debug.
+func (mr *MockParallelLoggerMockRecorder) Debug(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Debug", reflect.TypeOf((*MockParallelLogger)(nil).Debug), arg0, arg1, arg2, arg3)
+}
+
+// Error mocks base method.
+func (m *MockParallelLogger) Error(arg0 string, arg1 int64, arg2 parallel.OnExit, arg3 string, arg4 error) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "Error", arg0, arg1, arg2, arg3, arg4)
+}
+
+// Error indicates an expected call of Error.
+func (mr *MockParallelLoggerMockRecorder) Error(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockParallelLogger)(nil).Error), arg0, arg1, arg2, arg3, arg4)
+}
diff --git a/relayer/logger/zap.go b/relayer/logger/zap.go
index c6807190..d142e391 100644
--- a/relayer/logger/zap.go
+++ b/relayer/logger/zap.go
@@ -2,6 +2,7 @@ package logger
 
 import (
 	"context"
+	"fmt"
 	"strings"
 
 	"github.com/pkg/errors"
@@ -9,6 +10,7 @@ import (
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 
+	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/tracing"
 )
 
@@ -20,6 +22,59 @@ const (
 	tracingProcessFieldName = "process"
 )
 
+var _ ParallelLogger = &ParallelZapLogger{}
+
+// ParallelZapLogger is a parallel zap logger.
+type ParallelZapLogger struct {
+	ctx    context.Context //nolint:containedctx // the design depends on the parallel logger design where the ctx is set similarly
+	zapLog *ZapLogger
+}
+
+// NewParallelZapLogger returns a new instance of the ParallelZapLogger.
+func NewParallelZapLogger(ctx context.Context, zapLog *ZapLogger) *ParallelZapLogger {
+	return &ParallelZapLogger{
+		ctx:    ctx,
+		zapLog: zapLog,
+	}
+}
+
+// Debug prints a debug log.
+func (p *ParallelZapLogger) Debug(name string, id int64, onExit parallel.OnExit, message string) {
+	p.zapLog.Named(name).Debug(
+		p.ctx, message,
+		Int64Field("id", id),
+		StringField("onExit", onExit.String()),
+	)
+}
+
+// Error prints an error log.
+func (p *ParallelZapLogger) Error(name string, id int64, onExit parallel.OnExit, message string, err error) {
+	// a canceled context is not an error
+	if errors.Is(err, context.Canceled) {
+		return
+	}
+	var panicErr parallel.ErrPanic
+	if errors.As(err, &panicErr) {
+		p.zapLog.Named(name).Error(
+			p.ctx,
+			message,
+			Int64Field("id", id),
+			StringField("onExit", onExit.String()),
+			StringField("value", fmt.Sprint(panicErr.Value)),
+			ByteStringField("stack", panicErr.Stack),
+			Error(err),
+		)
+		return
+	}
+	p.zapLog.Named(name).Error(
+		p.ctx,
+		message,
+		Int64Field("id", id),
+		StringField("onExit", onExit.String()),
+		Error(err),
+	)
+}
+
 // ZapLoggerConfig is ZapLogger config.
 type ZapLoggerConfig struct {
 	Level string
@@ -77,28 +132,39 @@ func NewZapLogger(cfg ZapLoggerConfig) (*ZapLogger, error) {
 
 // Debug logs a message at DebugLevel. The message includes any fields passed at the log site, as well as any fields
 // accumulated on the logger.
-func (z ZapLogger) Debug(ctx context.Context, msg string, fields ...Field) {
+func (z *ZapLogger) Debug(ctx context.Context, msg string, fields ...Field) {
 	z.zapLogger.Debug(msg, filedToZapField(ctx, fields...)...)
 }
 
 // Info logs a message at InfoLevel.
The message includes any fields passed at the log site, as well as any fields // accumulated on the logger. -func (z ZapLogger) Info(ctx context.Context, msg string, fields ...Field) { +func (z *ZapLogger) Info(ctx context.Context, msg string, fields ...Field) { z.zapLogger.Info(msg, filedToZapField(ctx, fields...)...) } // Warn logs a message at WarnLevel. The message includes any fields passed at the log site, as well as any fields // accumulated on the logger. -func (z ZapLogger) Warn(ctx context.Context, msg string, fields ...Field) { +func (z *ZapLogger) Warn(ctx context.Context, msg string, fields ...Field) { z.zapLogger.Warn(msg, filedToZapField(ctx, fields...)...) } // Error logs a message at ErrorLevel. The message includes any fields passed at the log site, as well as any fields // accumulated on the logger. -func (z ZapLogger) Error(ctx context.Context, msg string, fields ...Field) { +func (z *ZapLogger) Error(ctx context.Context, msg string, fields ...Field) { z.zapLogger.Error(msg, filedToZapField(ctx, fields...)...) } +// Named adds a new path segment to the logger's name. Segments are joined by +// periods. By default, Loggers are unnamed. +func (z *ZapLogger) Named(name string) *ZapLogger { + return NewZapLoggerFromLogger(z.zapLogger.Named(name)) +} + +// ParallelLogger returns parallel zap logger. +func (z *ZapLogger) ParallelLogger(ctx context.Context) ParallelLogger { + return NewParallelZapLogger(ctx, z) +} + func filedToZapField(ctx context.Context, fields ...Field) []zap.Field { zapFields := lo.Map(fields, func(filed Field, _ int) zap.Field { return zap.Field{ diff --git a/relayer/processes/processor.go b/relayer/processes/processor.go index 79f763c1..7d5b226d 100644 --- a/relayer/processes/processor.go +++ b/relayer/processes/processor.go @@ -2,10 +2,10 @@ package processes import ( "context" - "sync" "github.com/pkg/errors" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/tracing" ) @@ -47,46 +47,40 @@ func (p *Processor) StartProcesses(ctx context.Context, processes ...ProcessWith } } - wg := sync.WaitGroup{} - wg.Add(len(processes)) - for _, process := range processes { - go func(process ProcessWithOptions) { - // set process name to the context + pg := parallel.NewGroup(ctx, parallel.WithGroupLogger(p.log.ParallelLogger(ctx))) + for i := range processes { + process := processes[i] + pg.Spawn(process.Name, parallel.Continue, func(ctx context.Context) error { ctx = tracing.WithTracingProcess(ctx, process.Name) - defer wg.Done() - defer func() { - if r := recover(); r != nil { - p.log.Error(ctx, "Received panic during the process execution", logger.Error(errors.Errorf("%s", r))) - if !process.IsRestartableOnError { - p.log.Warn(ctx, "The process is not auto-restartable on error") - return - } - p.log.Info(ctx, "Restarting process after the panic") - p.startProcessWithRestartOnError(ctx, process) - } - }() - p.startProcessWithRestartOnError(ctx, process) - }(process) + return p.startProcessWithRestartOnError(ctx, process) + }) } - wg.Wait() - return nil + return pg.Wait() } -func (p *Processor) startProcessWithRestartOnError(ctx context.Context, process ProcessWithOptions) { +func (p *Processor) startProcessWithRestartOnError(ctx context.Context, process ProcessWithOptions) error { for { - if err := process.Process.Start(ctx); err != nil { + // spawn one independent task to handle the panics properly + err := parallel.Run(ctx, func(ctx 
context.Context, spawnFn parallel.SpawnFn) error { + spawnFn(process.Name, parallel.Continue, func(ctx context.Context) error { + return process.Process.Start(ctx) + }) + return nil + }, parallel.WithGroupLogger(p.log.ParallelLogger(ctx))) + + if err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return + return nil } p.log.Error(ctx, "Received unexpected error from the process", logger.Error(err)) if !process.IsRestartableOnError { p.log.Warn(ctx, "The process is not auto-restartable on error") - break + return err } p.log.Info(ctx, "Restarting process after the error") } else { - return + return nil } } } diff --git a/relayer/processes/processor_test.go b/relayer/processes/processor_test.go index 27d66a92..f3b96f76 100644 --- a/relayer/processes/processor_test.go +++ b/relayer/processes/processor_test.go @@ -92,6 +92,7 @@ func TestProcessor_StartProcesses(t *testing.T) { } }, logErrorsCount: 1, + wantErr: true, }, { name: "singe_process_with_error_restartable", @@ -157,6 +158,7 @@ func TestProcessor_StartProcesses(t *testing.T) { } }, logErrorsCount: 1, + wantErr: true, }, { name: "singe_process_with_panic_restartable", @@ -234,6 +236,12 @@ func TestProcessor_StartProcesses(t *testing.T) { ctrl := gomock.NewController(t) logMock := logger.NewAnyLogMock(ctrl) + + parallelLoggerMock := logger.NewMockParallelLogger(ctrl) + logMock.EXPECT().ParallelLogger(gomock.Any()).Return(parallelLoggerMock).AnyTimes() + parallelLoggerMock.EXPECT().Debug(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + parallelLoggerMock.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + if tt.logErrorsCount > 0 { logMock.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()).Times(tt.logErrorsCount) } diff --git a/relayer/processes/xrpl_tx_observer.go b/relayer/processes/xrpl_tx_observer.go index 8dfdce23..cde0ddcb 100644 --- a/relayer/processes/xrpl_tx_observer.go +++ b/relayer/processes/xrpl_tx_observer.go @@ -75,7 +75,7 @@ func (o *XRPLTxObserver) Start(ctx context.Context) error { if errors.Is(err, context.Canceled) { o.log.Warn(ctx, "Context canceled during the XRPL tx processing", logger.StringField("error", err.Error())) } else { - o.log.Error(ctx, "Failed to process XRPL tx", logger.Error(err)) + return errors.Wrapf(err, "failed to process XRPL tx, txHash:%s", tx.GetHash().String()) } } } diff --git a/relayer/processes/xrpl_tx_submitter.go b/relayer/processes/xrpl_tx_submitter.go index a603b923..cc8b4726 100644 --- a/relayer/processes/xrpl_tx_submitter.go +++ b/relayer/processes/xrpl_tx_submitter.go @@ -98,7 +98,7 @@ func (s *XRPLTxSubmitter) Start(ctx context.Context) error { return errors.WithStack(ctx.Err()) default: if err := s.processPendingOperations(ctx); err != nil && !errors.Is(err, context.Canceled) { - s.log.Error(ctx, "Failed to process pending operations", logger.Error(err)) + return errors.Wrap(err, "failed to process pending operations") } if !s.cfg.RepeatRecentScan { s.log.Info(ctx, "Process repeating is disabled, process is finished") diff --git a/relayer/xrpl/scanner.go b/relayer/xrpl/scanner.go index 97ee40b4..1f7cb252 100644 --- a/relayer/xrpl/scanner.go +++ b/relayer/xrpl/scanner.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" rippledata "github.com/rubblelabs/ripple/data" + "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreum-tools/pkg/retry" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger" ) @@ -67,21 +68,28 @@ func 
NewAccountScanner(cfg AccountScannerConfig, log logger.Logger, rpcTxProvide
 
 // ScanTxs subscribes to rpc account transactions and continuously scans the recent and historical transactions.
 func (s *AccountScanner) ScanTxs(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error {
 	s.log.Info(ctx, "Subscribing xrpl scanner", logger.AnyField("config", s.cfg))
+
+	if !s.cfg.RecentScanEnabled && !s.cfg.FullScanEnabled {
+		return errors.Errorf("both recent and full scans are disabled")
+	}
+
+	pg := parallel.NewGroup(ctx, parallel.WithGroupLogger(s.log.ParallelLogger(ctx)))
 	if s.cfg.RecentScanEnabled {
 		currentLedgerRes, err := s.rpcTxProvider.LedgerCurrent(ctx)
 		if err != nil {
 			return err
 		}
 		currentLedger := currentLedgerRes.LedgerCurrentIndex
-		go s.scanRecentHistory(ctx, currentLedger, ch)
+		pg.Spawn("recent-history-scanner", parallel.Continue, func(ctx context.Context) error {
+			s.scanRecentHistory(ctx, currentLedger, ch)
+			return nil
+		})
 	}
 
-	if s.cfg.FullScanEnabled {
-		go s.scanFullHistory(ctx, ch)
-	}
-
-	if !s.cfg.RecentScanEnabled && !s.cfg.FullScanEnabled {
-		return errors.Errorf("both recent and full scans are disabled")
+		pg.Spawn("full-history-scanner", parallel.Continue, func(ctx context.Context) error {
+			s.scanFullHistory(ctx, ch)
+			return nil
+		})
 	}
 
 	return nil

From 48f41cc47ff9d8be258ff14cc3cfbbf1ee7e12b0 Mon Sep 17 00:00:00 2001
From: Dzmitry Hil
Date: Mon, 4 Dec 2023 08:48:16 +0300
Subject: [PATCH 2/6] Refactor process restart panic handling

---
 relayer/processes/processor.go | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/relayer/processes/processor.go b/relayer/processes/processor.go
index 7d5b226d..fcebdb8c 100644
--- a/relayer/processes/processor.go
+++ b/relayer/processes/processor.go
@@ -2,6 +2,7 @@ package processes
 
 import (
 	"context"
+	"runtime/debug"
 
 	"github.com/pkg/errors"
 
@@ -61,14 +62,19 @@ func (p *Processor) StartProcesses(ctx context.Context, processes ...ProcessWith
 
 func (p *Processor) startProcessWithRestartOnError(ctx context.Context, process ProcessWithOptions) error {
 	for {
-		// spawn one independent task to handle the panics properly
-		err := parallel.Run(ctx, func(ctx context.Context, spawnFn parallel.SpawnFn) error {
-			spawnFn(process.Name, parallel.Continue, func(ctx context.Context) error {
-				return process.Process.Start(ctx)
-			})
-			return nil
-		}, parallel.WithGroupLogger(p.log.ParallelLogger(ctx)))
-
+		// start the process and handle the panic
+		err := func() (err error) {
+			defer func() {
+				if p := recover(); p != nil {
+					err = errors.Wrapf(
+						parallel.ErrPanic{Value: p, Stack: debug.Stack()},
+						"handled panic on process:%s", process.Name,
+					)
+				}
+			}()
+			return process.Process.Start(ctx)
+		}()
+		// restart the process if it is restartable
 		if err != nil {
 			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
 				return nil

From 6f90f07819eb3323d833b7a662a49924a5c8d33f Mon Sep 17 00:00:00 2001
From: Dzmitry Hil
Date: Tue, 5 Dec 2023 12:55:17 +0300
Subject: [PATCH 3/6] Improve parallel processor handling

---
 relayer/processes/processor.go | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/relayer/processes/processor.go b/relayer/processes/processor.go
index fcebdb8c..820bcfb0 100644
--- a/relayer/processes/processor.go
+++ b/relayer/processes/processor.go
@@ -48,16 +48,16 @@ func (p *Processor) StartProcesses(ctx context.Context, processes ...ProcessWith
 		}
 	}
 
-	pg := parallel.NewGroup(ctx, 
parallel.WithGroupLogger(p.log.ParallelLogger(ctx))) - for i := range processes { - process := processes[i] - pg.Spawn(process.Name, parallel.Continue, func(ctx context.Context) error { - ctx = tracing.WithTracingProcess(ctx, process.Name) - return p.startProcessWithRestartOnError(ctx, process) - }) - } - - return pg.Wait() + return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error { + for i := range processes { + process := processes[i] + spawn(process.Name, parallel.Continue, func(ctx context.Context) error { + ctx = tracing.WithTracingProcess(ctx, process.Name) + return p.startProcessWithRestartOnError(ctx, process) + }) + } + return nil + }, parallel.WithGroupLogger(p.log.ParallelLogger(ctx))) } func (p *Processor) startProcessWithRestartOnError(ctx context.Context, process ProcessWithOptions) error { From ebba217e6c9a90e806b38fad52f202a118705f1a Mon Sep 17 00:00:00 2001 From: Dzmitry Hil Date: Thu, 7 Dec 2023 10:44:00 +0300 Subject: [PATCH 4/6] Fix tests --- Makefile | 4 +- integration-tests/xrpl/scanner_test.go | 83 +++---- relayer/processes/xrpl_tx_observer.go | 35 +-- relayer/processes/xrpl_tx_observer_test.go | 261 ++++++++++----------- relayer/xrpl/scanner.go | 45 ++-- relayer/xrpl/scanner_test.go | 38 +-- 6 files changed, 232 insertions(+), 234 deletions(-) diff --git a/Makefile b/Makefile index a02abaa7..618bd3ba 100644 --- a/Makefile +++ b/Makefile @@ -73,12 +73,12 @@ lint-contract: test-integration: # test each directory separately to prevent faucet concurrent access for d in $(INTEGRATION_TESTS_DIR)/*/; \ - do cd $$d && go clean -testcache && go test -v --tags=integrationtests -mod=readonly -parallel=4 ./... || exit 1; \ + do cd $$d && go clean -testcache && go test -v --tags=integrationtests -mod=readonly -parallel=10 -timeout 5m ./... || exit 1; \ done .PHONY: test-relayer test-relayer: - cd $(RELAYER_DIR) && go clean -testcache && go test -v -mod=readonly -parallel=4 -timeout 500ms ./... + cd $(RELAYER_DIR) && go clean -testcache && go test -v -mod=readonly -parallel=10 -timeout 500ms ./... 
.PHONY: test-contract
test-contract:
diff --git a/integration-tests/xrpl/scanner_test.go b/integration-tests/xrpl/scanner_test.go
index a627e43d..4a6eac46 100644
--- a/integration-tests/xrpl/scanner_test.go
+++ b/integration-tests/xrpl/scanner_test.go
@@ -8,10 +8,12 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pkg/errors"
 	rippledata "github.com/rubblelabs/ripple/data"
 	"github.com/stretchr/testify/require"
 
 	"github.com/CoreumFoundation/coreum-tools/pkg/http"
+	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
 	integrationtests "github.com/CoreumFoundation/coreumbridge-xrpl/integration-tests"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl"
 )
@@ -52,9 +54,18 @@ func TestFullHistoryScanAccountTx(t *testing.T) {
 
 	// add timeout to finish the tests in case of error
 	txsCh := make(chan rippledata.TransactionWithMetaData, txsCount)
-	require.NoError(t, scanner.ScanTxs(ctx, txsCh))
+
 	t.Logf("Waiting for %d transactions to be scanned by the historical scanner", len(writtenTxHashes))
-	validateTxsHashesInChannel(ctx, t, writtenTxHashes, txsCh)
+
+	require.NoError(t, parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
+		spawn("scan", parallel.Continue, func(ctx context.Context) error {
+			return scanner.ScanTxs(ctx, txsCh)
+		})
+		spawn("read", parallel.Exit, func(ctx context.Context) error {
+			return validateTxsHashesInChannel(ctx, writtenTxHashes, txsCh)
+		})
+		return nil
+	}))
 }
 
 func TestRecentHistoryScanAccountTx(t *testing.T) {
@@ -93,29 +104,33 @@ func TestRecentHistoryScanAccountTx(t *testing.T) {
 	// await for the state when the current ledger is valid to run the scanner
 	currentLedger, err := chains.XRPL.RPCClient().LedgerCurrent(ctx)
 	require.NoError(t, err)
-	chains.XRPL.AwaitLedger(ctx, t,
-		currentLedger.LedgerCurrentIndex+
-			scannerCfg.RecentScanWindow)
+	chains.XRPL.AwaitLedger(ctx, t, currentLedger.LedgerCurrentIndex+scannerCfg.RecentScanWindow)
 
 	var writtenTxHashes map[string]struct{}
-	writeDone := make(chan struct{})
-	go func() {
-		defer close(writeDone)
-		writtenTxHashes = sendMultipleTxs(ctx, t, chains.XRPL, 20, senderAcc, recipientAcc)
-	}()
+	receivedTxHashes := make(map[string]struct{})
 
 	txsCh := make(chan rippledata.TransactionWithMetaData, txsCount)
-	require.NoError(t, scanner.ScanTxs(ctx, txsCh))
-
-	t.Logf("Waiting for %d transactions to be scanned by the recent scanner", len(writtenTxHashes))
-	receivedTxHashes := getTxHashesFromChannel(ctx, t, txsCh, txsCount)
+	require.NoError(t, parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
+		spawn("scan", parallel.Continue, func(ctx context.Context) error {
+			return scanner.ScanTxs(ctx, txsCh)
+		})
+		spawn("write", parallel.Continue, func(ctx context.Context) error {
+			writtenTxHashes = sendMultipleTxs(ctx, t, chains.XRPL, txsCount, senderAcc, recipientAcc)
+			return nil
+		})
+		spawn("wait", parallel.Exit, func(ctx context.Context) error {
+			t.Logf("Waiting for %d transactions to be scanned by the scanner", txsCount)
+			for tx := range txsCh {
+				receivedTxHashes[tx.GetHash().String()] = struct{}{}
+				if len(receivedTxHashes) == txsCount {
+					return nil
+				}
+			}
+			return nil
+		})
+		return nil
+	}))
 
-	// wait for the writing to be done
-	select {
-	case <-ctx.Done():
-		t.FailNow()
-	case <-writeDone:
-	}
 	require.Equal(t, writtenTxHashes, receivedTxHashes)
 }
 
@@ -144,7 +159,7 @@ func sendMultipleTxs(
 	return writtenTxHashes
 }
 
-func validateTxsHashesInChannel(ctx context.Context, t *testing.T, writtenTxHashes map[string]struct{}, txsCh chan rippledata.TransactionWithMetaData) {
+func 
validateTxsHashesInChannel(ctx context.Context, writtenTxHashes map[string]struct{}, txsCh chan rippledata.TransactionWithMetaData) error {
 	scanCtx, scanCtxCancel := context.WithTimeout(ctx, time.Minute)
 	defer scanCtxCancel()
 	// copy the original map
 	for {
 		select {
 		case <-scanCtx.Done():
-			t.Fail()
+			return scanCtx.Err()
 		case tx := <-txsCh:
 			// validate that we have all sent hashes and no duplicates
 			hash := tx.GetHash().String()
 			_, found := expectedHashes[hash]
-			require.True(t, found)
-			delete(expectedHashes, hash)
-			if len(expectedHashes) == 0 {
-				return
+			if !found {
+				return errors.Errorf("not found expected tx hash:%s", hash)
 			}
-		}
-	}
-}
 
-func getTxHashesFromChannel(ctx context.Context, t *testing.T, txsCh chan rippledata.TransactionWithMetaData, count int) map[string]struct{} {
-	scanCtx, scanCtxCancel := context.WithTimeout(ctx, time.Minute)
-	defer scanCtxCancel()
-	txHashes := make(map[string]struct{}, count)
-	for {
-		select {
-		case <-scanCtx.Done():
-			t.Fail()
-		case tx := <-txsCh:
-			txHashes[tx.GetHash().String()] = struct{}{}
-			if len(txHashes) == count {
-				return txHashes
+			delete(expectedHashes, hash)
+			if len(expectedHashes) == 0 {
+				return nil
 			}
 		}
 	}
}
diff --git a/relayer/processes/xrpl_tx_observer.go b/relayer/processes/xrpl_tx_observer.go
index 8dfdce23..cde0ddcb 100644
--- a/relayer/processes/xrpl_tx_observer.go
+++ b/relayer/processes/xrpl_tx_observer.go
@@ -75,7 +75,7 @@ func (o *XRPLTxObserver) Start(ctx context.Context) error {
 				if errors.Is(err, context.Canceled) {
 					o.log.Warn(ctx, "Context canceled during the XRPL tx processing", logger.StringField("error", err.Error()))
 				} else {
-					o.log.Error(ctx, "Failed to process XRPL tx", logger.Error(err))
+					return errors.Wrapf(err, "failed to process XRPL tx, txHash:%s", tx.GetHash().String())
 				}
 			}
 		}
diff --git a/relayer/processes/xrpl_tx_observer.go b/relayer/processes/xrpl_tx_observer.go
index cde0ddcb..a0848369 100644
--- a/relayer/processes/xrpl_tx_observer.go
+++ b/relayer/processes/xrpl_tx_observer.go
@@ -10,6 +10,7 @@ import (
 	rippledata "github.com/rubblelabs/ripple/data"
 	"github.com/samber/lo"
 
+	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/coreum"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/tracing"
@@ -62,24 +63,26 @@ func (o *XRPLTxObserver) Init(ctx context.Context) error {
 
 // Start starts the process.
func (o *XRPLTxObserver) Start(ctx context.Context) error { txCh := make(chan rippledata.TransactionWithMetaData) - if err := o.txScanner.ScanTxs(ctx, txCh); err != nil { - return err - } - - for { - select { - case <-ctx.Done(): - return errors.WithStack(ctx.Err()) - case tx := <-txCh: - if err := o.processTx(ctx, tx); err != nil { - if errors.Is(err, context.Canceled) { - o.log.Warn(ctx, "Context canceled during the XRPL tx processing", logger.StringField("error", err.Error())) - } else { - return errors.Wrapf(err, "failed to process XRPL tx, txHash:%s", tx.GetHash().String()) + return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error { + spawn("tx-scanner", parallel.Continue, func(ctx context.Context) error { + defer close(txCh) + return o.txScanner.ScanTxs(ctx, txCh) + }) + spawn("tx-processor", parallel.Continue, func(ctx context.Context) error { + for tx := range txCh { + if err := o.processTx(ctx, tx); err != nil { + if errors.Is(err, context.Canceled) { + o.log.Warn(ctx, "Context canceled during the XRPL tx processing", logger.StringField("error", err.Error())) + } else { + return errors.Wrapf(err, "failed to process XRPL tx, txHash:%s", tx.GetHash().String()) + } } } - } - } + return nil + }) + + return nil + }, parallel.WithGroupLogger(o.log.ParallelLogger(ctx))) } func (o *XRPLTxObserver) processTx(ctx context.Context, tx rippledata.TransactionWithMetaData) error { diff --git a/relayer/processes/xrpl_tx_observer_test.go b/relayer/processes/xrpl_tx_observer_test.go index 9ba24780..0440dfa4 100644 --- a/relayer/processes/xrpl_tx_observer_test.go +++ b/relayer/processes/xrpl_tx_observer_test.go @@ -100,10 +100,8 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- xrplOriginatedTokenPaymentWithMetadataTx - cancel() - }() + ch <- xrplOriginatedTokenPaymentWithMetadataTx + cancel() return nil }) @@ -132,10 +130,8 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- coreumOriginatedTokenPaymentWithMetadataTx - cancel() - }() + ch <- coreumOriginatedTokenPaymentWithMetadataTx + cancel() return nil }) @@ -172,15 +168,13 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.Payment{}, - MetaData: rippledata.MetaData{ - TransactionResult: failTxResult, - }, - } - cancel() - }() + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.Payment{}, + MetaData: rippledata.MetaData{ + TransactionResult: failTxResult, + }, + } + cancel() return nil }) @@ -193,15 +187,13 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- 
rippledata.TransactionWithMetaData{ - Transaction: &rippledata.Payment{}, - MetaData: rippledata.MetaData{ - TransactionResult: notTxResult, - }, - } - cancel() - }() + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.Payment{}, + MetaData: rippledata.MetaData{ + TransactionResult: notTxResult, + }, + } + cancel() return nil }) @@ -214,16 +206,14 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TrustSet{ - TxBase: rippledata.TxBase{ - TransactionType: rippledata.TRUST_SET, - }, + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TrustSet{ + TxBase: rippledata.TxBase{ + TransactionType: rippledata.TRUST_SET, }, - } - cancel() - }() + }, + } + cancel() return nil }) @@ -236,19 +226,17 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TicketCreate{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - Sequence: 5, - TransactionType: rippledata.TICKET_CREATE, - }, + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TicketCreate{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + Sequence: 5, + TransactionType: rippledata.TICKET_CREATE, }, - MetaData: createAllocatedTicketsMetaData([]uint32{3, 5, 7}), - } - cancel() - }() + }, + MetaData: createAllocatedTicketsMetaData([]uint32{3, 5, 7}), + } + cancel() return nil }) @@ -278,19 +266,17 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TicketCreate{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.TICKET_CREATE, - }, - TicketSequence: lo.ToPtr(uint32(11)), + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TicketCreate{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.TICKET_CREATE, }, - MetaData: createAllocatedTicketsMetaData([]uint32{3, 5, 7}), - } - cancel() - }() + TicketSequence: lo.ToPtr(uint32(11)), + }, + MetaData: createAllocatedTicketsMetaData([]uint32{3, 5, 7}), + } + cancel() return nil }) @@ -320,21 +306,19 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TicketCreate{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - Sequence: 5, - TransactionType: rippledata.TICKET_CREATE, - }, + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TicketCreate{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + Sequence: 5, 
+ TransactionType: rippledata.TICKET_CREATE, }, - MetaData: rippledata.MetaData{ - TransactionResult: failTxResult, - }, - } - cancel() - }() + }, + MetaData: rippledata.MetaData{ + TransactionResult: failTxResult, + }, + } + cancel() return nil }) @@ -364,19 +348,17 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TrustSet{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.TRUST_SET, - }, - LimitAmount: xrplOriginatedTokenXRPLAmount, - TicketSequence: lo.ToPtr(uint32(11)), + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TrustSet{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.TRUST_SET, }, - } - cancel() - }() + LimitAmount: xrplOriginatedTokenXRPLAmount, + TicketSequence: lo.ToPtr(uint32(11)), + }, + } + cancel() return nil }) @@ -407,22 +389,20 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TrustSet{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.TRUST_SET, - }, - LimitAmount: xrplOriginatedTokenXRPLAmount, - TicketSequence: lo.ToPtr(uint32(11)), - }, - MetaData: rippledata.MetaData{ - TransactionResult: failTxResult, + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TrustSet{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.TRUST_SET, }, - } - cancel() - }() + LimitAmount: xrplOriginatedTokenXRPLAmount, + TicketSequence: lo.ToPtr(uint32(11)), + }, + MetaData: rippledata.MetaData{ + TransactionResult: failTxResult, + }, + } + cancel() return nil }) @@ -453,20 +433,18 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.Payment{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.PAYMENT, - }, - Destination: recipientXRPLAddress, - Amount: xrplOriginatedTokenXRPLAmount, - TicketSequence: lo.ToPtr(uint32(11)), + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.Payment{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.PAYMENT, }, - } - cancel() - }() + Destination: recipientXRPLAddress, + Amount: xrplOriginatedTokenXRPLAmount, + TicketSequence: lo.ToPtr(uint32(11)), + }, + } + cancel() return nil }) @@ -495,23 +473,21 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.Payment{ - 
TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.PAYMENT, - }, - Destination: recipientXRPLAddress, - Amount: xrplOriginatedTokenXRPLAmount, - TicketSequence: lo.ToPtr(uint32(11)), + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.Payment{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.PAYMENT, }, - MetaData: rippledata.MetaData{ - TransactionResult: failTxResult, - }, - } - cancel() - }() + Destination: recipientXRPLAddress, + Amount: xrplOriginatedTokenXRPLAmount, + TicketSequence: lo.ToPtr(uint32(11)), + }, + MetaData: rippledata.MetaData{ + TransactionResult: failTxResult, + }, + } + cancel() return nil }) @@ -540,17 +516,15 @@ func TestXRPLTxObserver_Start(t *testing.T) { xrplAccountTxScannerMock := NewMockXRPLAccountTxScanner(ctrl) xrplAccountTxScannerMock.EXPECT().ScanTxs(gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) error { - go func() { - ch <- rippledata.TransactionWithMetaData{ - Transaction: &rippledata.TrustSet{ - TxBase: rippledata.TxBase{ - Account: bridgeXRPLAddress, - TransactionType: rippledata.NFTOKEN_CREATE_OFFER, - }, + ch <- rippledata.TransactionWithMetaData{ + Transaction: &rippledata.TrustSet{ + TxBase: rippledata.TxBase{ + Account: bridgeXRPLAddress, + TransactionType: rippledata.NFTOKEN_CREATE_OFFER, }, - } - cancel() - }() + }, + } + cancel() return nil }) @@ -567,7 +541,14 @@ func TestXRPLTxObserver_Start(t *testing.T) { t.Cleanup(cancel) ctrl := gomock.NewController(t) + logMock := logger.NewAnyLogMock(ctrl) + + parallelLoggerMock := logger.NewMockParallelLogger(ctrl) + logMock.EXPECT().ParallelLogger(gomock.Any()).Return(parallelLoggerMock).AnyTimes() + parallelLoggerMock.EXPECT().Debug(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + parallelLoggerMock.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + var contractClient processes.ContractClient if tt.contractClientBuilder != nil { contractClient = tt.contractClientBuilder(ctrl) @@ -581,7 +562,7 @@ func TestXRPLTxObserver_Start(t *testing.T) { tt.txScannerBuilder(ctrl, cancel), contractClient, ) - require.ErrorIs(t, o.Start(ctx), context.Canceled) + require.NoError(t, o.Start(ctx)) }) } } diff --git a/relayer/xrpl/scanner.go b/relayer/xrpl/scanner.go index 1f7cb252..e7f869e1 100644 --- a/relayer/xrpl/scanner.go +++ b/relayer/xrpl/scanner.go @@ -73,26 +73,28 @@ func (s *AccountScanner) ScanTxs(ctx context.Context, ch chan<- rippledata.Trans return errors.Errorf("both recent and full scans are disabled") } - pg := parallel.NewGroup(ctx, parallel.WithGroupLogger(s.log.ParallelLogger(ctx))) - if s.cfg.RecentScanEnabled { - currentLedgerRes, err := s.rpcTxProvider.LedgerCurrent(ctx) - if err != nil { - return err + return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error { + if s.cfg.RecentScanEnabled { + currentLedgerRes, err := s.rpcTxProvider.LedgerCurrent(ctx) + if err != nil { + return err + } + currentLedger := currentLedgerRes.LedgerCurrentIndex + spawn("recent-history-scanner", parallel.Continue, func(ctx context.Context) error { + s.scanRecentHistory(ctx, currentLedger, ch) + return nil + }) } - currentLedger := currentLedgerRes.LedgerCurrentIndex - pg.Spawn("recent-history-scanner", parallel.Continue, func(ctx context.Context) error { - s.scanRecentHistory(ctx, currentLedger, ch) - return nil - }) - } - if s.cfg.FullScanEnabled { - 
pg.Spawn("full-history-scanner", parallel.Continue, func(ctx context.Context) error {
-		s.scanFullHistory(ctx, ch)
-		return nil
-	})
-	}
-
-	return nil
+		if s.cfg.FullScanEnabled {
+			spawn("full-history-scanner", parallel.Continue, func(ctx context.Context) error {
+				s.scanFullHistory(ctx, ch)
+				return nil
+			})
+		}
+
+		return nil
+	}, parallel.WithGroupLogger(s.log.ParallelLogger(ctx)))
 }
 
 func (s *AccountScanner) scanRecentHistory(ctx context.Context, currentLedger int64, ch chan<- rippledata.TransactionWithMetaData) {
@@ -163,7 +165,12 @@ func (s *AccountScanner) scanTransactions(ctx context.Context, minLedger int64,
 		if tx == nil {
 			continue
 		}
-		ch <- *tx
+		select {
+		case <-ctx.Done():
+			return lastLedger
+		default:
+			ch <- *tx
+		}
 	}
 }
 		if len(accountTxResult.Marker) == 0 {
diff --git a/relayer/xrpl/scanner_test.go b/relayer/xrpl/scanner_test.go
index 829c3cd0..567abdc2 100644
--- a/relayer/xrpl/scanner_test.go
+++ b/relayer/xrpl/scanner_test.go
@@ -3,6 +3,7 @@ package xrpl_test
 import (
 	"context"
 	"encoding/hex"
+	"reflect"
 	"strings"
 	"testing"
 	"time"
@@ -14,6 +15,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 
+	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger"
 	"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl"
 )
@@ -21,10 +23,6 @@ import (
 func TestAccountScanner_ScanTxs(t *testing.T) {
 	t.Parallel()
 
-	// set the time to prevent infinite test
-	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
-	t.Cleanup(cancel)
-
 	account := xrpl.GenPrivKeyTxSigner().Account()
 
 	notEmptyMarker := map[string]any{"key": "val"}
@@ -93,7 +91,7 @@
 			rpcTxProvider: func(ctrl *gomock.Controller) xrpl.RPCTxProvider {
 				mockedProvider := NewMockRPCTxProvider(ctrl)
 
-				mockedProvider.EXPECT().LedgerCurrent(ctx).Return(xrpl.LedgerCurrentResult{
+				mockedProvider.EXPECT().LedgerCurrent(gomock.Any()).Return(xrpl.LedgerCurrentResult{
 					LedgerCurrentIndex: 100,
 				}, nil)
@@ -170,17 +168,25 @@
 			s := xrpl.NewAccountScanner(tt.cfg, logger.NewZapLoggerFromLogger(zapDevLogger), rpcTxProvider)
 
 			txsCh := make(chan rippledata.TransactionWithMetaData)
-			if err := s.ScanTxs(ctx, txsCh); (err != nil) != tt.wantErr {
-				t.Errorf("ScanTxs() error = %+v, wantErr %+v", err, tt.wantErr)
-			}
-			if len(tt.wantTxHashes) == 0 {
-				return
-			}
-			// validate that we have received expected hashes
-			gotTxHashes := readTxHashesFromChannels(ctx, t, txsCh, len(tt.wantTxHashes))
-			require.Equal(t, lo.SliceToMap(tt.wantTxHashes, func(hash string) (string, struct{}) {
-				return hash, struct{}{}
-			}), gotTxHashes)
+
+			ctx := context.Background()
+			require.NoError(t, parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
+				spawn("scan", parallel.Continue, func(ctx context.Context) error {
+					return s.ScanTxs(ctx, txsCh)
+				})
+				spawn("read", parallel.Exit, func(ctx context.Context) error {
+					// validate that we have received expected hashes
+					gotTxHashes := readTxHashesFromChannels(ctx, t, txsCh, len(tt.wantTxHashes))
+					expectedTxHashes := lo.SliceToMap(tt.wantTxHashes, func(hash string) (string, struct{}) {
+						return hash, struct{}{}
+					})
+					if !reflect.DeepEqual(expectedTxHashes, gotTxHashes) {
+						return errors.Errorf("expected tx hashes:%v, got:%v", expectedTxHashes, gotTxHashes)
+					}
+					return nil
+				})
+				return nil
+			}))
 		})
 	}
 }

From d0ed1cd8adecc7e2cac4571c882a0ac4431b9fd2 Mon Sep 17 00:00:00 2001
From: Dzmitry Hil
Date: Fri, 8 Dec 2023 
15:07:40 +0300
Subject: [PATCH 5/6] Address comments

---
 integration-tests/xrpl/scanner_test.go | 3 +--
 relayer/processes/xrpl_tx_observer.go  | 4 ++--
 relayer/xrpl/scanner.go                | 3 +--
 3 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/integration-tests/xrpl/scanner_test.go b/integration-tests/xrpl/scanner_test.go
index 4a6eac46..f2eace59 100644
--- a/integration-tests/xrpl/scanner_test.go
+++ b/integration-tests/xrpl/scanner_test.go
@@ -174,8 +174,7 @@ func validateTxsHashesInChannel(ctx context.Context, writtenTxHashes map[string]
 		case tx := <-txsCh:
 			// validate that we have all sent hashes and no duplicates
 			hash := tx.GetHash().String()
-			_, found := expectedHashes[hash]
-			if !found {
+			if _, found := expectedHashes[hash]; !found {
 				return errors.Errorf("not found expected tx hash:%s", hash)
 			}
 
diff --git a/relayer/processes/xrpl_tx_observer.go b/relayer/processes/xrpl_tx_observer.go
index a0848369..8eaa2ea4 100644
--- a/relayer/processes/xrpl_tx_observer.go
+++ b/relayer/processes/xrpl_tx_observer.go
@@ -68,7 +68,7 @@ func (o *XRPLTxObserver) Start(ctx context.Context) error {
 			defer close(txCh)
 			return o.txScanner.ScanTxs(ctx, txCh)
 		})
-		spawn("tx-processor", parallel.Continue, func(ctx context.Context) error {
+		spawn("tx-processor", parallel.Fail, func(ctx context.Context) error {
 			for tx := range txCh {
 				if err := o.processTx(ctx, tx); err != nil {
 					if errors.Is(err, context.Canceled) {
@@ -78,7 +78,7 @@ func (o *XRPLTxObserver) Start(ctx context.Context) error {
 				}
 			}
 		}
-		return nil
+		return errors.WithStack(ctx.Err())
 	})
 
diff --git a/relayer/xrpl/scanner.go b/relayer/xrpl/scanner.go
index e7f869e1..bcd80538 100644
--- a/relayer/xrpl/scanner.go
+++ b/relayer/xrpl/scanner.go
@@ -168,8 +168,7 @@ func (s *AccountScanner) scanTransactions(ctx context.Context, minLedger int64,
 		select {
 		case <-ctx.Done():
 			return lastLedger
-		default:
-			ch <- *tx
+		case ch <- *tx:
 		}
 	}
 
From 5710fa21af1ebe109194e112e46400d545a96ab6 Mon Sep 17 00:00:00 2001
From: Dzmitry Hil
Date: Mon, 11 Dec 2023 14:39:06 +0300
Subject: [PATCH 6/6] Fix master merge conflicts

---
 contract/src/operation.rs                  | 2 +-
 integration-tests/processes/send_test.go   | 2 +-
 integration-tests/xrpl/scanner_test.go     | 5 +++--
 relayer/logger/zap.go                      | 3 ++-
 relayer/processes/xrpl_tx_observer_test.go | 2 +-
 5 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/contract/src/operation.rs b/contract/src/operation.rs
index 3550ac25..798a3820 100644
--- a/contract/src/operation.rs
+++ b/contract/src/operation.rs
@@ -143,7 +143,7 @@ pub fn handle_coreum_to_xrpl_transfer_confirmation(
     match COREUM_TOKENS
         .idx
         .xrpl_currency
-        .item(storage, currency.to_owned())?
+        .item(storage, currency)?
.map(|(_, ct)| ct)
     {
         Some(token) => {
diff --git a/integration-tests/processes/send_test.go b/integration-tests/processes/send_test.go
index 1f340e92..860f575c 100644
--- a/integration-tests/processes/send_test.go
+++ b/integration-tests/processes/send_test.go
@@ -138,7 +138,7 @@ func TestSendXRPTokenFromXRPLToCoreumAndBack(t *testing.T) {
 	envCfg := DefaultRunnerEnvConfig()
 	runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
 
-	runnerEnv.StartAllRunnerProcesses(ctx, t)
+	runnerEnv.StartAllRunnerProcesses()
 	runnerEnv.AllocateTickets(ctx, t, uint32(200))
 
 	coreumSender := chains.Coreum.GenAccount()
diff --git a/integration-tests/xrpl/scanner_test.go b/integration-tests/xrpl/scanner_test.go
index 6c14edff..a6847789 100644
--- a/integration-tests/xrpl/scanner_test.go
+++ b/integration-tests/xrpl/scanner_test.go
@@ -114,11 +114,12 @@ func TestRecentHistoryScanAccountTx(t *testing.T) {
 		spawn("scan", parallel.Continue, func(ctx context.Context) error {
 			return scanner.ScanTxs(ctx, txsCh)
 		})
-		spawn("write", parallel.Continue, func(ctx context.Context) error {
+		// write and exit
+		spawn("write", parallel.Exit, func(ctx context.Context) error {
 			writtenTxHashes = sendMultipleTxs(ctx, t, chains.XRPL, txsCount, senderAcc, recipientAcc)
 			return nil
 		})
-		spawn("wait", parallel.Exit, func(ctx context.Context) error {
+		spawn("wait", parallel.Continue, func(ctx context.Context) error {
 			t.Logf("Waiting for %d transactions to be scanned by the scanner", txsCount)
 			for tx := range txsCh {
 				receivedTxHashes[tx.GetHash().String()] = struct{}{}
diff --git a/relayer/logger/zap.go b/relayer/logger/zap.go
index d142e391..5e39ce6e 100644
--- a/relayer/logger/zap.go
+++ b/relayer/logger/zap.go
@@ -26,7 +26,8 @@ var _ ParallelLogger = &ParallelZapLogger{}
 
 // ParallelZapLogger is a parallel zap logger.
 type ParallelZapLogger struct {
-	ctx    context.Context //nolint:containedctx // the design depends on the parallel logger design where the ctx is set similarly
+	//nolint:containedctx // the design depends on the parallel logger design where the ctx is set similarly
+	ctx    context.Context
 	zapLog *ZapLogger
 }
 
diff --git a/relayer/processes/xrpl_tx_observer_test.go b/relayer/processes/xrpl_tx_observer_test.go
index 6a27af81..f95bace4 100644
--- a/relayer/processes/xrpl_tx_observer_test.go
+++ b/relayer/processes/xrpl_tx_observer_test.go
@@ -556,7 +556,7 @@ func TestXRPLTxObserver_Start(t *testing.T) {
 				tt.txScannerBuilder(ctrl, cancel),
 				contractClient,
 			)
-			require.NoError(t, o.Start(ctx))
+			require.ErrorIs(t, o.Start(ctx), context.Canceled)
 		})
 	}
 }
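
A note for reviewers, not part of the patch series itself: the series replaces hand-rolled goroutines, error channels, and recover blocks with the coreum-tools parallel package. Below is a minimal, self-contained sketch of the pattern the patches converge on, assuming only the API surface visible in the diffs above (parallel.Run, parallel.SpawnFn, and the parallel.Continue/parallel.Exit OnExit policies); the task names "producer" and "consumer" and the integer channel are hypothetical.

package main

import (
	"context"
	"fmt"

	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
)

func main() {
	// parallel.Run spawns sibling tasks that share one group context and waits for all of them.
	// The OnExit policy declared per task decides what a task's return does to the group:
	// parallel.Continue lets the group keep running, parallel.Exit shuts the group down
	// gracefully (canceling the siblings' context); the diffs also use parallel.Fail for
	// tasks whose return should be treated as a group failure.
	err := parallel.Run(context.Background(), func(ctx context.Context, spawn parallel.SpawnFn) error {
		ch := make(chan int)
		spawn("producer", parallel.Continue, func(ctx context.Context) error {
			defer close(ch) // closing the channel lets the consumer's range loop end
			for i := 0; i < 3; i++ {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case ch <- i:
				}
			}
			return nil
		})
		spawn("consumer", parallel.Exit, func(ctx context.Context) error {
			for v := range ch {
				fmt.Println("received", v)
			}
			return nil // parallel.Exit: returning here ends the whole group
		})
		return nil
	})
	fmt.Println("group finished with:", err) // <nil> on a clean exit
}

This is the same producer/consumer shape the observer adopts in PATCH 4 (tx-scanner feeding tx-processor through a channel closed on exit), and the select wrapped around the channel send mirrors the scanner fix in PATCH 4/5 that prevents a goroutine leak when the group context is canceled while the reader is already gone.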