Skip to content

Commit

Permalink
Integrate coreum parallel lib (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
dzmitryhil authored Dec 11, 2023
1 parent 81d44ff commit 3094ba0
Show file tree
Hide file tree
Showing 20 changed files with 479 additions and 343 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ lint-contract:
test-integration:
# test each directory separately to prevent faucet concurrent access
for d in $(INTEGRATION_TESTS_DIR)/*/; \
do cd $$d && go clean -testcache && go test -v --tags=integrationtests -mod=readonly -parallel=4 ./... || exit 1; \
do cd $$d && go clean -testcache && go test -v --tags=integrationtests -mod=readonly -parallel=10 -timeout 5m ./... || exit 1; \
done

.PHONY: test-relayer
test-relayer:
cd $(RELAYER_DIR) && go clean -testcache && go test -v -mod=readonly -parallel=4 -timeout 500ms ./...
cd $(RELAYER_DIR) && go clean -testcache && go test -v -mod=readonly -parallel=10 -timeout 500ms ./...

.PHONY: test-contract
test-contract:
Expand Down
2 changes: 1 addition & 1 deletion contract/src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub fn handle_coreum_to_xrpl_transfer_confirmation(
match COREUM_TOKENS
.idx
.xrpl_currency
.item(storage, currency.to_owned())?
.item(storage, currency)?
.map(|(_, ct)| ct)
{
Some(token) => {
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ replace (
)

require (
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd
github.com/CoreumFoundation/coreumbridge-xrpl/relayer v1.0.0
github.com/rubblelabs/ripple v0.0.0-20230908201244-7f73b1fe5e22
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d h1:nalkkPQcITbvhmL4+C4cKA87NW0tfm3Kl9VXRoPywFg=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b h1:nSNvOe9oRVl0Ijph3u/e1nZi7j4vGhYyTI3NVbPLdXI=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20230920110418-b30366f1b19b/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872 h1:cEkMNqZamgrLwUAqjhRcoF0h3Z/AZY6mp20H8N0P/98=
github.com/CoreumFoundation/coreum-tools v0.4.1-0.20231130105442-8b79a25db872/go.mod h1:VD93vCHkxYaT/RhOesXTFgd/GQDW54tr0BqGi5JU1c0=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd h1:NEwCGG9i6yjPY/avFTcrCDF16zvzIcvldnUR8AZtA7U=
github.com/CoreumFoundation/coreum/v3 v3.0.0-20231107070602-2ae43ed1f7cd/go.mod h1:XTqILFqH1e0GF1bYEnu/I0mElsfwH5OWfu2F5DACIjY=
github.com/CosmWasm/wasmd v0.41.0 h1:fmwxSbwb50zZDcBaayYFRLIaSFca+EFld1WOaQi49jg=
Expand Down
91 changes: 37 additions & 54 deletions integration-tests/processes/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ package processes_test

import (
"context"
"sync"
"fmt"
"strings"
"testing"
"time"

Expand All @@ -18,6 +19,7 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/require"

"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
"github.com/CoreumFoundation/coreum-tools/pkg/retry"
coreumapp "github.com/CoreumFoundation/coreum/v3/app"
coreumconfig "github.com/CoreumFoundation/coreum/v3/pkg/config"
Expand Down Expand Up @@ -54,18 +56,18 @@ func DefaultRunnerEnvConfig() RunnerEnvConfig {

// RunnerEnv is runner environment used for the integration tests.
type RunnerEnv struct {
Cfg RunnerEnvConfig
bridgeXRPLAddress rippledata.Account
ContractClient *coreum.ContractClient
Chains integrationtests.Chains
ContractOwner sdk.AccAddress
Runners []*runner.Runner
ProcessErrorsMu sync.RWMutex
ProcessErrors []error
Cfg RunnerEnvConfig
bridgeXRPLAddress rippledata.Account
ContractClient *coreum.ContractClient
Chains integrationtests.Chains
ContractOwner sdk.AccAddress
RunnersParallelGroup *parallel.Group
Runners []*runner.Runner
}

// NewRunnerEnv returns new instance of the RunnerEnv.
func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains integrationtests.Chains) *RunnerEnv {
ctx, cancel := context.WithCancel(ctx)
relayerCoreumAddresses := genCoreumRelayers(
ctx,
t,
Expand Down Expand Up @@ -134,57 +136,45 @@ func NewRunnerEnv(ctx context.Context, t *testing.T, cfg RunnerEnvConfig, chains
}

runnerEnv := &RunnerEnv{
Cfg: cfg,
bridgeXRPLAddress: bridgeXRPLAddress,
ContractClient: contractClient,
Chains: chains,
ContractOwner: contractOwner,
Runners: runners,
ProcessErrorsMu: sync.RWMutex{},
ProcessErrors: make([]error, 0),
Cfg: cfg,
bridgeXRPLAddress: bridgeXRPLAddress,
ContractClient: contractClient,
Chains: chains,
ContractOwner: contractOwner,
RunnersParallelGroup: parallel.NewGroup(ctx),
Runners: runners,
}
t.Cleanup(func() {
runnerEnv.RequireNoErrors(t)
// we can cancel the context now and wait for the runner to stop gracefully
cancel()
err := runnerEnv.RunnersParallelGroup.Wait()
if err == nil || errors.Is(err, context.Canceled) {
return
}
// the client replies with that error in if the context is canceled at the time of the request,
// and the error is in the internal package, so we can't check the type
if strings.Contains(err.Error(), "context canceled") {
return
}

require.NoError(t, err, "Found unexpected runner process errors after the execution")
})

return runnerEnv
}

// StartAllRunnerProcesses starts all relayer processes.
func (r *RunnerEnv) StartAllRunnerProcesses(ctx context.Context, t *testing.T) {
errCh := make(chan error, len(r.Runners))
go func() {
for {
select {
case <-ctx.Done():
if !errors.Is(ctx.Err(), context.Canceled) {
r.ProcessErrorsMu.Lock()
r.ProcessErrors = append(r.ProcessErrors, ctx.Err())
r.ProcessErrorsMu.Unlock()
}
return
case err := <-errCh:
r.ProcessErrorsMu.Lock()
r.ProcessErrors = append(r.ProcessErrors, err)
r.ProcessErrorsMu.Unlock()
}
}
}()

for _, relayerRunner := range r.Runners {
go func(relayerRunner *runner.Runner) {
func (r *RunnerEnv) StartAllRunnerProcesses() {
for i := range r.Runners {
relayerRunner := r.Runners[i]
r.RunnersParallelGroup.Spawn(fmt.Sprintf("runner-%d", i), parallel.Exit, func(ctx context.Context) error {
// disable restart on error to handler unexpected errors
xrplTxObserverProcess := relayerRunner.Processes.XRPLTxObserver
xrplTxObserverProcess.IsRestartableOnError = false
xrplTxSubmitterProcess := relayerRunner.Processes.XRPLTxSubmitter
xrplTxSubmitterProcess.IsRestartableOnError = false

err := relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess)
if err != nil && !errors.Is(err, context.Canceled) {
t.Logf("Unexpected error on process start:%s", err)
errCh <- err
}
}(relayerRunner)
return relayerRunner.Processor.StartProcesses(ctx, xrplTxObserverProcess, xrplTxSubmitterProcess)
})
}
}

Expand Down Expand Up @@ -285,13 +275,6 @@ func (r *RunnerEnv) RegisterXRPLOriginatedToken(
return registeredXRPLToken
}

// RequireNoErrors check whether the runner err received runner errors.
func (r *RunnerEnv) RequireNoErrors(t *testing.T) {
r.ProcessErrorsMu.RLock()
defer r.ProcessErrorsMu.RUnlock()
require.Empty(t, r.ProcessErrors, "Found unexpected process errors after the execution")
}

// SendXRPLPaymentTx sends Payment transaction.
func (r *RunnerEnv) SendXRPLPaymentTx(
ctx context.Context,
Expand Down
16 changes: 8 additions & 8 deletions integration-tests/processes/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumAndBack(t *testing.T) {

envCfg := DefaultRunnerEnvConfig()
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(200))

coreumSender := chains.Coreum.GenAccount()
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestSendXRPTokenFromXRPLToCoreumAndBack(t *testing.T) {

envCfg := DefaultRunnerEnvConfig()
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(200))

coreumSender := chains.Coreum.GenAccount()
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithMaliciousRelayer(t *testing.
envCfg := DefaultRunnerEnvConfig()
envCfg.MaliciousRelayerNumber = 1
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(200))

coreumSender := chains.Coreum.GenAccount()
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestSendXRPLOriginatedTokenFromXRPLToCoreumWithTicketsReallocation(t *testi
envCfg := DefaultRunnerEnvConfig()
envCfg.UsedTicketSequenceThreshold = 3
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(5))
sendingCount := 10

Expand Down Expand Up @@ -430,7 +430,7 @@ func TestSendXRPLOriginatedTokensFromXRPLToCoreumWithDifferentAmountAndPartialAm
})

// start relayers
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// recover tickets so we can register tokens
runnerEnv.AllocateTickets(ctx, t, 200)

Expand Down Expand Up @@ -561,7 +561,7 @@ func TestRecoverXRPLOriginatedTokenRegistrationAndSendFromXRPLToCoreumAndBack(t

envCfg := DefaultRunnerEnvConfig()
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
runnerEnv.AllocateTickets(ctx, t, uint32(200))

coreumSender := chains.Coreum.GenAccount()
Expand Down Expand Up @@ -712,7 +712,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithDifferentAmountsAnd
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)

// start relayers
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// recover tickets so we can register tokens
runnerEnv.AllocateTickets(ctx, t, 200)

Expand Down Expand Up @@ -883,7 +883,7 @@ func TestSendCoreumOriginatedTokenFromCoreumToXRPLAndBackWithMaliciousRelayer(t
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)

// start relayers
runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// recover tickets so we can register tokens
runnerEnv.AllocateTickets(ctx, t, 200)

Expand Down
10 changes: 5 additions & 5 deletions integration-tests/processes/ticket_allocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestTicketsAllocationRecoveryWithAccountSequence(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate)

bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress)
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestTicketsAllocationRecoveryWithRejection(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
// we don't fund the contract for the tickets allocation to let the chain reject the allocation transaction

bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestTicketsAllocationRecoveryWithInvalidAccountSequence(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()
chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate)

bridgeXRPLAccountInfo, err := chains.XRPL.RPCClient().AccountInfo(ctx, runnerEnv.bridgeXRPLAddress)
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestTicketsAllocationRecoveryWithMaliciousRelayers(t *testing.T) {
require.NoError(t, err)
require.Empty(t, availableTickets)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()

chains.XRPL.FundAccountForTicketAllocation(ctx, t, runnerEnv.bridgeXRPLAddress, numberOfTicketsToAllocate)

Expand Down Expand Up @@ -184,7 +184,7 @@ func TestTicketsReAllocationByTheXRPLTokenRegistration(t *testing.T) {
envCfg.UsedTicketSequenceThreshold = 3
runnerEnv := NewRunnerEnv(ctx, t, envCfg, chains)

runnerEnv.StartAllRunnerProcesses(ctx, t)
runnerEnv.StartAllRunnerProcesses()

// allocate first five tickets
numberOfTicketsToAllocate := uint32(5)
Expand Down
Loading

0 comments on commit 3094ba0

Please sign in to comment.