
Commit

Fix tests
dzmitryhil committed Dec 7, 2023
1 parent 6f90f07 commit ebba217
Showing 6 changed files with 232 additions and 234 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -73,12 +73,12 @@ lint-contract:
test-integration:
# test each directory separately to prevent faucet concurrent access
for d in $(INTEGRATION_TESTS_DIR)/*/; \
do cd $$d && go clean -testcache && go test -v --tags=integrationtests -mod=readonly -parallel=4 ./... || exit 1; \
do cd $$d && go clean -testcache && go test -v --tags=integrationtests -mod=readonly -parallel=10 -timeout 5m ./... || exit 1; \
done

.PHONY: test-relayer
test-relayer:
cd $(RELAYER_DIR) && go clean -testcache && go test -v -mod=readonly -parallel=4 -timeout 500ms ./...
cd $(RELAYER_DIR) && go clean -testcache && go test -v -mod=readonly -parallel=10 -timeout 500ms ./...

.PHONY: test-contract
test-contract:
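
Note (not part of the diff): go test's -parallel flag caps how many tests that call t.Parallel may run concurrently within a single test binary, and -timeout bounds how long a package's test run may take before it is aborted. The change therefore raises test concurrency from 4 to 10 and gives each integration-test package an explicit 5-minute budget.
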
83 changes: 42 additions & 41 deletions integration-tests/xrpl/scanner_test.go
@@ -8,10 +8,12 @@ import (
"testing"
"time"

"github.com/pkg/errors"
rippledata "github.com/rubblelabs/ripple/data"
"github.com/stretchr/testify/require"

"github.com/CoreumFoundation/coreum-tools/pkg/http"
"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
integrationtests "github.com/CoreumFoundation/coreumbridge-xrpl/integration-tests"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl"
)
@@ -52,9 +54,18 @@ func TestFullHistoryScanAccountTx(t *testing.T) {
// add timeout to finish the tests in case of error

txsCh := make(chan rippledata.TransactionWithMetaData, txsCount)
require.NoError(t, scanner.ScanTxs(ctx, txsCh))

t.Logf("Waiting for %d transactions to be scanned by the historycal scanner", len(writtenTxHashes))
validateTxsHashesInChannel(ctx, t, writtenTxHashes, txsCh)

require.NoError(t, parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
spawn("scan", parallel.Continue, func(ctx context.Context) error {
return scanner.ScanTxs(ctx, txsCh)
})
spawn("read", parallel.Exit, func(ctx context.Context) error {
return validateTxsHashesInChannel(ctx, writtenTxHashes, txsCh)
})
return nil
}))
}

func TestRecentHistoryScanAccountTx(t *testing.T) {
@@ -93,29 +104,33 @@ func TestRecentHistoryScanAccountTx(t *testing.T) {
// await for the state when the current ledger is valid to run the scanner
currentLedger, err := chains.XRPL.RPCClient().LedgerCurrent(ctx)
require.NoError(t, err)
chains.XRPL.AwaitLedger(ctx, t,
currentLedger.LedgerCurrentIndex+
scannerCfg.RecentScanWindow)
chains.XRPL.AwaitLedger(ctx, t, currentLedger.LedgerCurrentIndex+scannerCfg.RecentScanWindow)

var writtenTxHashes map[string]struct{}
writeDone := make(chan struct{})
go func() {
defer close(writeDone)
writtenTxHashes = sendMultipleTxs(ctx, t, chains.XRPL, 20, senderAcc, recipientAcc)
}()
receivedTxHashes := make(map[string]struct{})

txsCh := make(chan rippledata.TransactionWithMetaData, txsCount)
require.NoError(t, scanner.ScanTxs(ctx, txsCh))

t.Logf("Waiting for %d transactions to be scanned by the recent scanner", len(writtenTxHashes))
receivedTxHashes := getTxHashesFromChannel(ctx, t, txsCh, txsCount)
require.NoError(t, parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
spawn("scan", parallel.Continue, func(ctx context.Context) error {
return scanner.ScanTxs(ctx, txsCh)
})
spawn("write", parallel.Continue, func(ctx context.Context) error {
writtenTxHashes = sendMultipleTxs(ctx, t, chains.XRPL, txsCount, senderAcc, recipientAcc)
return nil
})
spawn("wait", parallel.Exit, func(ctx context.Context) error {
t.Logf("Waiting for %d transactions to be scanned by the scanner", txsCount)
for tx := range txsCh {
receivedTxHashes[tx.GetHash().String()] = struct{}{}
if len(receivedTxHashes) == txsCount {
return nil
}
}
return nil
})
return nil
}))

// wait for the writing to be done
select {
case <-ctx.Done():
t.FailNow()
case <-writeDone:
}
require.Equal(t, writtenTxHashes, receivedTxHashes)
}

@@ -144,7 +159,7 @@ func sendMultipleTxs(
return writtenTxHashes
}

func validateTxsHashesInChannel(ctx context.Context, t *testing.T, writtenTxHashes map[string]struct{}, txsCh chan rippledata.TransactionWithMetaData) {
func validateTxsHashesInChannel(ctx context.Context, writtenTxHashes map[string]struct{}, txsCh chan rippledata.TransactionWithMetaData) error {
scanCtx, scanCtxCancel := context.WithTimeout(ctx, time.Minute)
defer scanCtxCancel()
// copy the original map
@@ -155,32 +170,18 @@ func validateTxsHashesInChannel(ctx context.Context, t *testing.T, writtenTxHash
for {
select {
case <-scanCtx.Done():
t.Fail()
return scanCtx.Err()
case tx := <-txsCh:
// validate that we have all sent hashes and no duplicates
hash := tx.GetHash().String()
_, found := expectedHashes[hash]
require.True(t, found)
delete(expectedHashes, hash)
if len(expectedHashes) == 0 {
return
if !found {
return errors.Errorf("not found expected tx hash:%s", hash)
}
}
}
}

func getTxHashesFromChannel(ctx context.Context, t *testing.T, txsCh chan rippledata.TransactionWithMetaData, count int) map[string]struct{} {
scanCtx, scanCtxCancel := context.WithTimeout(ctx, time.Minute)
defer scanCtxCancel()
txHashes := make(map[string]struct{}, count)
for {
select {
case <-scanCtx.Done():
t.Fail()
case tx := <-txsCh:
txHashes[tx.GetHash().String()] = struct{}{}
if len(txHashes) == count {
return txHashes
delete(expectedHashes, hash)
if len(expectedHashes) == 0 {
return nil
}
}
}
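
Note (not part of this commit): both tests above now follow the same producer/consumer pattern from the coreum-tools parallel package. A minimal, hypothetical sketch of that pattern, assuming the API behaves as its use in these tests suggests: a task spawned with parallel.Continue may finish without stopping the group, while a task spawned with parallel.Exit stops the whole group when it returns.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	ch := make(chan int, 5)
	err := parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
		// Producer: spawned with parallel.Continue, so the group keeps running
		// after it returns (mirrors the "scan"/"write" tasks above).
		spawn("produce", parallel.Continue, func(ctx context.Context) error {
			for i := 0; i < 5; i++ {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case ch <- i:
				}
			}
			return nil
		})
		// Consumer: spawned with parallel.Exit, so the whole group stops
		// once it returns (mirrors the "read"/"wait" tasks above).
		spawn("consume", parallel.Exit, func(ctx context.Context) error {
			for received := 0; received < 5; received++ {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case v := <-ch:
					fmt.Println("received", v)
				}
			}
			return nil
		})
		return nil
	})
	fmt.Println("group finished, err:", err)
}
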
35 changes: 19 additions & 16 deletions relayer/processes/xrpl_tx_observer.go
@@ -10,6 +10,7 @@ import (
rippledata "github.com/rubblelabs/ripple/data"
"github.com/samber/lo"

"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/coreum"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/tracing"
@@ -62,24 +63,26 @@ func (o *XRPLTxObserver) Init(ctx context.Context) error {
// Start starts the process.
func (o *XRPLTxObserver) Start(ctx context.Context) error {
txCh := make(chan rippledata.TransactionWithMetaData)
if err := o.txScanner.ScanTxs(ctx, txCh); err != nil {
return err
}

for {
select {
case <-ctx.Done():
return errors.WithStack(ctx.Err())
case tx := <-txCh:
if err := o.processTx(ctx, tx); err != nil {
if errors.Is(err, context.Canceled) {
o.log.Warn(ctx, "Context canceled during the XRPL tx processing", logger.StringField("error", err.Error()))
} else {
return errors.Wrapf(err, "failed to process XRPL tx, txHash:%s", tx.GetHash().String())
return parallel.Run(ctx, func(ctx context.Context, spawn parallel.SpawnFn) error {
spawn("tx-scanner", parallel.Continue, func(ctx context.Context) error {
defer close(txCh)
return o.txScanner.ScanTxs(ctx, txCh)
})
spawn("tx-processor", parallel.Continue, func(ctx context.Context) error {
for tx := range txCh {
if err := o.processTx(ctx, tx); err != nil {
if errors.Is(err, context.Canceled) {
o.log.Warn(ctx, "Context canceled during the XRPL tx processing", logger.StringField("error", err.Error()))
} else {
return errors.Wrapf(err, "failed to process XRPL tx, txHash:%s", tx.GetHash().String())
}
}
}
}
}
return nil
})

return nil
}, parallel.WithGroupLogger(o.log.ParallelLogger(ctx)))
}

func (o *XRPLTxObserver) processTx(ctx context.Context, tx rippledata.TransactionWithMetaData) error {
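
Note (not part of this commit): in the reworked Start, the tx-scanner task closes txCh via defer close(txCh) once scanning stops, which lets the tx-processor's range loop over txCh terminate instead of blocking forever; parallel.WithGroupLogger(o.log.ParallelLogger(ctx)) appears to attach the relayer logger to the parallel group so task lifecycle events are logged.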