Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate coreum parallel lib #55

Merged
merged 7 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ replace (
)

require (
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd
github.com/CoreumFoundation/coreumbridge-xrpl/relayer v1.0.0
github.com/rubblelabs/ripple v0.0.0-20230908201244-7f73b1fe5e22
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY=
github.com/CosmWasm/wasmd v0.41.0 h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg=
Expand Down
91 changes: 37 additions & 54 deletions integration-tests/processes/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ package processes_test

import (
"context"
"sync"
"fmt"
"strings"
"testing"
"time"

Expand All @@ -18,6 +19,7 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/require"

"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
"github.com/CoreumFoundation/coreum-tools/pkg/retry"
coreumapp "github.com/CoreumFoundation/coreum/v3/app"
coreumconfig "github.com/CoreumFoundation/coreum/v3/pkg/config"
Expand Down Expand Up @@ -56,18 +58,18 @@ func DefaultRunnerEnvConfig() RunnerEnvConfig {

// RunnerEnv is runner environment used for the integration tests.
type RunnerEnv struct {
Cfg RunnerEnvConfig
bridgeXRPLAddress rippledata.Account
ContractClient *coreum.ContractClient
Chains integrationtests.Chains
ContractOwner sdk.AccAddress
Runners []*runner.Runner
ProcessErrorsMu sync.RWMutex
ProcessErrors []error
Cfg RunnerEnvConfig
bridgeXRPLAddress rippledata.Account
ContractClient *coreum.ContractClient
Chains integrationtests.Chains
ContractOwner sdk.AccAddress
RunnersParallelGroup *parallel.Group
Runners []*runner.Runner
}

// NewRunnerEnv returns new instance of the RunnerEnv.
func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains integrationtests.Chains) *RunnerEnv {
ctx, cancel := context.WithCancel(ctx)
relayerCoreumAddresses := genCoreumRelayers(
ctx,
t,
Expand Down Expand Up @@ -136,57 +138,45 @@ func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains
}

runnerEnv := &RunnerEnv{
Cfg: cfg,
bridgeXRPLAddress: bridgeXRPLAddress,
ContractClient: contractClient,
Chains: chains,
ContractOwner: contractOwner,
Runners: runners,
ProcessErrorsMu: sync.RWMutex{},
ProcessErrors: make([]error, 0),
Cfg: cfg,
bridgeXRPLAddress: bridgeXRPLAddress,
ContractClient: contractClient,
Chains: chains,
ContractOwner: contractOwner,
RunnersParallelGroup: parallel.NewGroup(ctx),
Runners: runners,
}
t.Cleanup(func() {
runnerEnv.RequireNoErrors(t)
// we can cancel the context now and wait for the runner to stop gracefully
cancel()
err := runnerEnv.RunnersParallelGroup.Wait()
if err == nil || errors.Is(err, context.Canceled) {
return
}
// the client replies with that error in if the context is canceled at the time of the request,
// and the error is in the internal package, so we can't check the type
if strings.Contains(err.Error(), "context canceled") {
return
}

require.NoError(t, err, "Found unexpected runner process errors after the execution")
})

return runnerEnv
}

// StartAllRunnerProcesses starts all relayer processes.
func (r *RunnerEnv) StartAllRunnerProcesses(ctx context.Context, t *testing.T) {
errCh := make(chan error, len(r.Runners))
go func() {
for {
select {
case <-ctx.Done():
if !errors.Is(ctx.Err(), context.Canceled) {
r.ProcessErrorsMu.Lock()
r.ProcessErrors = append(r.ProcessErrors, ctx.Err())
r.ProcessErrorsMu.Unlock()
}
return
case err := <-errCh:
r.ProcessErrorsMu.Lock()
r.ProcessErrors = append(r.ProcessErrors, err)
r.ProcessErrorsMu.Unlock()
}
}
}()

for _, relayerRunner := range r.Runners {
go func(relayerRunner *runner.Runner) {
func (r *RunnerEnv) StartAllRunnerProcesses() {
for i := range r.Runners {
relayerRunner := r.Runners[i]
r.RunnersParallelGroup.Spawn(fmt.Sprintf("runner-%d", i), parallel.Exit, func(ctx context.Context) error {
// disable restart on error to handler unexpected errors
xrplTxObserverProcess := relayerRunner.Processes.XRPLTxObserver
xrplTxObserverProcess.IsRestartableOnError = false
xrplTxSubmitterProcess := relayerRunner.Processes.XRPLTxSubmitter
xrplTxSubmitterProcess.IsRestartableOnError = false

err := relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess)
if err != nil && !errors.Is(err, context.Canceled) {
t.Logf("Unexpected error on process start:%s", err)
errCh <- err
}
}(relayerRunner)
return relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess)
})
}
}

Expand Down Expand Up @@ -276,13 +266,6 @@ func (r *RunnerEnv) RegisterXRPLOriginatedToken(
return registeredXRPLToken
}

// RequireNoErrors check whether the runner err received runner errors.
func (r *RunnerEnv) RequireNoErrors(t *testing.T) {
r.ProcessErrorsMu.RLock()
defer r.ProcessErrorsMu.RUnlock()
require.Empty(t, r.ProcessErrors, "Found unexpected process errors after the execution")
}

// SendXRPLPaymentTx sends Payment transaction.
func (r *RunnerEnv) SendXRPLPaymentTx(
ctx context.Context,
Expand Down
14 changes: 7 additions & 7 deletions integration-tests/processes/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumAndBack(t *testing.T) {

envCfg := DefaultRunnerEnvConfig()
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(200))

coreumSender := chains.Coreum.GenAccount()
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithMaliciousRelayer(t *testing.
envCfg := DefaultRunnerEnvConfig()
envCfg.MaliciousRelayerNumber = 1
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(200))

coreumSender := chains.Coreum.GenAccount()
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithTicketsReallocation(t *testi
envCfg := DefaultRunnerEnvConfig()
envCfg.UsedTicketSequenceThreshold = 3
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(5))
sendingCount := 10

Expand Down Expand Up @@ -241,7 +241,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumWithDifferentAmountAndPartialAm
})

// start relayers
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// recover tickets so we can register tokens
runnerEnv.AllocateTickets(ctx, t, 200)

Expand Down Expand Up @@ -314,7 +314,7 @@ func TestRecoverXRPLOriginatedTokenRegistrationAndSendFromXRPLToCoreumAndBack(t

envCfg := DefaultRunnerEnvConfig()
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(200))

coreumSender := chains.Coreum.GenAccount()
Expand Down Expand Up @@ -422,7 +422,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithDifferentAmountsAnd
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)

// start relayers
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// recover tickets so we can register tokens
runnerEnv.AllocateTickets(ctx, t, 200)

Expand Down Expand Up @@ -546,7 +546,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithMaliciousRelayer(t
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)

// start relayers
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// recover tickets so we can register tokens
runnerEnv.AllocateTickets(ctx, t, 200)

Expand Down
10 changes: 5 additions & 5 deletions integration-tests/processes/ticket_allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestTicketsAllocationRecoveryWithAccountSequence(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate)

bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress)
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestTicketsAllocationRecoveryWithRejection(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// we don't fund the contract for the tickets allocation to let the chain reject the allocation transaction

bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestTicketsAllocationRecoveryWithInvalidAccountSequence(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate)

bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress)
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestTicketsAllocationRecoveryWithMaliciousRelayers(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()

chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate)

Expand Down Expand Up @@ -184,7 +184,7 @@ func TestTicketsReAllocationByTheXRPLTokenRegistration(t *testing.T) {
envCfg.UsedTicketSequenceThreshold = 3
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()

// allocate first five tickets
numberOfTicketsToAllocate := uint32(5)
Expand Down
2 changes: 1 addition & 1 deletion relayer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ replace (

require (
cosmossdk.io/math v1.1.2
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd
github.com/CosmWasm/wasmd v0.41.0
github.com/cosmos/cosmos-sdk v0.47.5
Expand Down
4 changes: 2 additions & 2 deletions relayer/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY=
github.com/CosmWasm/wasmd v0.41.0 h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg=
Expand Down
15 changes: 14 additions & 1 deletion relayer/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@ import (
"context"

"go.uber.org/zap"

"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
)

//go:generate mockgen -destination=mock.go -package=logger . Logger
//go:generate mockgen -destination=mock.go -package=logger . Logger,ParallelLogger

// ParallelLogger is parallel logger interface used mostly for mocks.
type ParallelLogger interface {
parallel.Logger
}

// A Field is a marshaling operation used to add a key-value pair to a logger's context. Most fields are lazily
// marshaled, so it's inexpensive to add fields to disabled debug-level log statements.
Expand All @@ -27,6 +34,7 @@ type Logger interface {
Info(ctx context.Context, msg string, fields ...Field)
Warn(ctx context.Context, msg string, fields ...Field)
Error(ctx context.Context, msg string, fields ...Field)
ParallelLogger(ctx context.Context) ParallelLogger
}

// AnyField takes a key and an arbitrary value and chooses the best way to represent them as a field, falling back to a
Expand Down Expand Up @@ -55,6 +63,11 @@ func Uint64Field(key string, value uint64) Field {
return convertZapFieldToField(zap.Uint64(key, value))
}

// ByteStringField constructs a field with the given key and value.
func ByteStringField(key string, value []byte) Field {
return convertZapFieldToField(zap.ByteString(key, value))
}

// Error is shorthand for the common idiom NamedError("error", err).
func Error(err error) Field {
return convertZapFieldToField(zap.Error(err))
Expand Down
Loading
Loading