Skip to content

Commit

Permalink
trade_simnet_test fixes
Browse files Browse the repository at this point in the history
Allow spv wallet to sync before funding it.  Wait a bit after funding
it for it to filter the new blocks and update balance.

Add a sleepFactor that multiplies various test sleeps when the race
detector is enabled.  Unfortunately this is a very high level harness-
based tests where actual synchronization without short arbitrary
delays is very difficult.  Just modulate the delays.
  • Loading branch information
chappjc committed Nov 7, 2021
1 parent 8ac1498 commit f5835bd
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 25 deletions.
15 changes: 14 additions & 1 deletion client/asset/btc/spv.go
Expand Up @@ -780,7 +780,10 @@ func (w *spvWallet) swapConfirmations(txHash *chainhash.Hash, vout uint32, pkScr
var assumedMempool bool
switch err {
case WalletTransactionNotFound:
w.log.Tracef("swapConfirmations - WalletTransactionNotFound: %v:%d", txHash, vout)
case SpentStatusUnknown:
w.log.Tracef("swapConfirmations - SpentStatusUnknown: %v:%d (block %v, confs %d)",
txHash, vout, blockHash, confs)
if blockHash == nil {
// We generated this swap, but it probably hasn't been mined yet.
// It's SpentStatusUnknown because the wallet doesn't track the
Expand All @@ -804,13 +807,17 @@ func (w *spvWallet) swapConfirmations(txHash *chainhash.Hash, vout uint32, pkScr
}

// Our last option is neutrino.
w.log.Tracef("swapConfirmations - scanFilters: %v:%d (block %v, start time %v)",
txHash, vout, blockHash, startTime)
utxo, err := w.scanFilters(txHash, vout, pkScript, startTime, blockHash)
if err != nil {
return 0, false, err
}

if utxo.spend == nil && utxo.blockHash == nil {
if assumedMempool {
w.log.Tracef("swapConfirmations - scanFilters did not find %v:%d, assuming in mempool.",
txHash, vout)
return 0, false, nil
}
return 0, false, fmt.Errorf("output %s:%v not found with search parameters startTime = %s, pkScript = %x",
Expand Down Expand Up @@ -1273,6 +1280,8 @@ func (w *spvWallet) scanFilters(txHash *chainhash.Hash, vout uint32, pkScript []
// If we found a block, let's store a reference in our local database so we
// can maybe bypass a long search next time.
if utxo.blockHash != nil {
w.log.Debugf("cfilters scan SUCCEEDED for %v:%d. block hash: %v, spent: %v",
txHash, vout, utxo.blockHash, utxo.spend != nil)
w.storeTxBlock(*txHash, *utxo.blockHash)
}

Expand Down Expand Up @@ -1361,7 +1370,7 @@ search:
if res.spend != nil && res.blockHash == nil {
w.log.Warnf("A spending input (%s) was found during the scan but the output (%s) "+
"itself wasn't found. Was the startBlockHeight early enough?",
newOutPoint(&res.spend.blockHash, res.spend.vin),
newOutPoint(&res.spend.txHash, res.spend.vin),
newOutPoint(&txHash, vout),
)
return res, nil
Expand All @@ -1380,6 +1389,7 @@ search:
continue search
}
// Pull the block.
w.log.Tracef("Block %v matched pkScript %v. Pulling the block...", blockHash, pkScript)
block, err := w.cl.GetBlock(*blockHash)
if err != nil {
return nil, fmt.Errorf("GetBlock error: %v", err)
Expand All @@ -1400,6 +1410,8 @@ search:
blockHash: *block.Hash(),
blockHeight: uint32(height),
}
w.log.Tracef("Found txn %v spending %v in block %v (%d)", res.spend.txHash,
txHash, res.spend.blockHash, res.spend.blockHeight)
if res.blockHash != nil {
break search
}
Expand All @@ -1418,6 +1430,7 @@ search:
res.blockHash = block.Hash()
res.blockHeight = uint32(height)
res.txOut = txOut
w.log.Tracef("Found txn %v in block %v (%d)", txHash, res.blockHash, height)
if res.spend != nil {
break search
}
Expand Down
6 changes: 6 additions & 0 deletions client/core/harness_norace.go
@@ -0,0 +1,6 @@
//go:build !race && harness
// +build !race,harness

package core

const race = false
6 changes: 6 additions & 0 deletions client/core/harness_race.go
@@ -0,0 +1,6 @@
//go:build race && harness
// +build race,harness

package core

const race = true
5 changes: 5 additions & 0 deletions client/core/trade.go
Expand Up @@ -880,6 +880,9 @@ func (t *trackedTrade) isSwappable(ctx context.Context, match *matchTracker) boo
if match.Status == order.MakerSwapCast {
// Get the confirmation count on the maker's coin.
if match.Side == order.Taker {
toAssetID := t.wallets.toAsset.ID
t.dc.log.Tracef("Checking confirmations on COUNTERPARTY swap txn %v (%s)...",
coinIDString(toAssetID, match.MetaData.Proof.MakerSwap), unbip(toAssetID))
// If the maker is the counterparty, we can determine swappability
// based on the confirmations.
confs, req, changed, spent := t.counterPartyConfirms(ctx, match)
Expand All @@ -896,6 +899,8 @@ func (t *trackedTrade) isSwappable(ctx context.Context, match *matchTracker) boo
return ready
}
// If we're the maker, check the confirmations anyway so we can notify.
t.dc.log.Tracef("Checking confirmations on our OWN swap txn %v (%s)...",
coinIDString(wallet.AssetID, match.MetaData.Proof.MakerSwap), unbip(wallet.AssetID))
confs, spent, err := wallet.SwapConfirmations(ctx, match.MetaData.Proof.MakerSwap,
match.MetaData.Proof.Script, match.MetaData.Stamp)
if err != nil {
Expand Down
60 changes: 36 additions & 24 deletions client/core/trade_simnet_test.go
Expand Up @@ -66,7 +66,7 @@ var (
appPass: []byte("client2"),
wallets: map[uint32]*tWallet{
dcr.BipID: dcrWallet("trading2"),
btc.BipID: &tWallet{spv: true}, // the btc/spv startWallet method auto connects to the alpha node for spv syncing
btc.BipID: {spv: true}, // the btc/spv startWallet method auto connects to the alpha node for spv syncing
},
processedStatus: make(map[order.MatchID]order.MatchStatus),
}
Expand Down Expand Up @@ -172,9 +172,15 @@ func startClients(ctx context.Context) error {
if err != nil {
return err
}
c.log("Connected %s wallet", unbip(assetID))
c.log("Connected %s wallet (spv = %v)", unbip(assetID), wallet.spv)

if wallet.spv {
c.log("Waiting for %s wallet to sync", unbip(assetID))
for !c.core.WalletState(assetID).Synced {
time.Sleep(time.Second)
}
c.log("%s wallet synced and ready to use", unbip(assetID))

// Fund newly created wallet.
c.log("Funding newly created SPV wallet")
address := c.core.WalletState(assetID).Address
Expand All @@ -187,25 +193,15 @@ func startClients(ctx context.Context) error {
if err != nil {
return err
}
// Tip change after block filtering scan takes the wallet time.
time.Sleep(2 * time.Second * sleepFactor)
}
}

err = c.registerDEX(ctx)
if err != nil {
return err
}

// Wait for spv wallets to sync.
for assetID, wallet := range c.wallets {
if !wallet.spv {
continue
}
c.log("Waiting for %s wallet to sync", unbip(assetID))
for !c.core.WalletState(assetID).Synced {
time.Sleep(time.Second)
}
c.log("%s wallet synced and ready to use", unbip(assetID))
}
}

return nil
Expand Down Expand Up @@ -238,7 +234,12 @@ func teardown(cancelCtx context.CancelFunc) {
}
}

var sleepFactor time.Duration = 1

func TestMain(m *testing.M) {
if race {
sleepFactor = 3
}
tmpDir, _ = os.MkdirTemp("", "")
defer os.RemoveAll(tmpDir)
os.Exit(m.Run())
Expand Down Expand Up @@ -383,6 +384,11 @@ func TestMakerGhostingAfterTakerRedeem(t *testing.T) {
client.log("tick failure: %v", err)
}

// Propagation to miners can take some time after the send RPC
// completes, especially with SPV wallets, so wait a bit before mining
// blocks in monitorTrackedTrade.
time.Sleep(sleepFactor * time.Second)

return monitorTrackedTrade(ctx, client, tracker, finalStatus)
}
resumeTrades, ctx := errgroup.WithContext(context.Background())
Expand All @@ -401,7 +407,7 @@ func TestMakerGhostingAfterTakerRedeem(t *testing.T) {
// has been spent but the spending tx is still in mempool. This
// will cause the txout to be included in the wallets locked
// balance, causing a higher than actual balance report.
time.Sleep(4 * time.Second)
time.Sleep(4 * sleepFactor * time.Second)

for _, client := range clients {
if err = client.assertBalanceChanges(); err != nil {
Expand Down Expand Up @@ -681,7 +687,7 @@ func TestOrderStatusReconciliation(t *testing.T) {
client2.log("Disconnecting from the DEX server")
c2dc.connMaster.Disconnect()
// Disconnection is asynchronous, wait for confirmation of DEX disconnection.
disconnectTimeout := 10 * time.Second
disconnectTimeout := 10 * sleepFactor * time.Second
disconnected := client2.notes.find(context.Background(), disconnectTimeout, func(n Notification) bool {
connNote, ok := n.(*ConnEventNote)
return ok && connNote.Host == dexHost && !connNote.Connected
Expand All @@ -700,6 +706,7 @@ func TestOrderStatusReconciliation(t *testing.T) {
client2.log("Reconnecting DEX to trigger order status reconciliation")
// Use core.initialize to restore client 2 orders from db, and login
// to trigger dex authentication.
// TODO: cannot do this anymore with built-in wallets
client2.core.initialize()
_, err = client2.core.Login(client2.appPass)
if err != nil {
Expand Down Expand Up @@ -807,14 +814,17 @@ func TestResendPendingRequests(t *testing.T) {
// create new notification feed to catch swap-related errors from send{Init,Redeem}Async
notes := client.core.NotificationFeed()

wait := 5 * sleepFactor * time.Second
timer := time.NewTimer(wait)
defer timer.Stop()
var foundSwapErrorNote bool
for !foundSwapErrorNote {
select {
case note := <-notes:
foundSwapErrorNote = note.Severity() == db.ErrorLevel && (note.Topic() == TopicSwapSendError ||
note.Topic() == TopicInitError || note.Topic() == TopicReportRedeemError)
case <-time.After(4 * time.Second):
return fmt.Errorf("client %d: no init/redeem error note after 4 seconds", client.id)
case <-timer.C:
return fmt.Errorf("client %d: no init/redeem error note after %v", client.id, wait)
}
}

Expand Down Expand Up @@ -877,9 +887,9 @@ func TestResendPendingRequests(t *testing.T) {
// Allow some time for balance changes to be properly reported.
// There is usually a split-second window where a locked output
// has been spent but the spending tx is still in mempool. This
// will cause the txout to be included in the wallets locked
// will cause the txout to be included in the wallet's locked
// balance, causing a higher than actual balance report.
time.Sleep(4 * time.Second)
time.Sleep(4 * sleepFactor * time.Second)

for _, client := range clients {
if err = client.assertBalanceChanges(); err != nil {
Expand Down Expand Up @@ -931,7 +941,7 @@ func simpleTradeTest(t *testing.T, qty, rate uint64, finalStatus order.MatchStat
// has been spent but the spending tx is still in mempool. This
// will cause the txout to be included in the wallets locked
// balance, causing a higher than actual balance report.
time.Sleep(4 * time.Second)
time.Sleep(4 * sleepFactor * time.Second)

for _, client := range clients {
if err = client.assertBalanceChanges(); err != nil {
Expand Down Expand Up @@ -1089,6 +1099,7 @@ func monitorTrackedTrade(ctx context.Context, client *tClient, tracker *trackedT

if assetToMine != nil {
assetID, nBlocks := assetToMine.ID, assetToMine.SwapConf
time.Sleep(2 * sleepFactor * time.Second)
err := mineBlocks(assetID, nBlocks)
if err == nil {
var actor order.MatchSide
Expand All @@ -1099,7 +1110,8 @@ func monitorTrackedTrade(ctx context.Context, client *tClient, tracker *trackedT
} else {
actor = order.Maker
}
client.log("Mined %d blocks for %s's %s, match %s", nBlocks, actor, swapOrRedeem, token(match.MatchID.Bytes()))
client.log("Mined %d %s blocks for %s's %s, match %s", nBlocks, unbip(assetID),
actor, swapOrRedeem, token(match.MatchID.Bytes()))
} else {
client.log("%s mine error %v", unbip(assetID), err) // return err???
}
Expand Down Expand Up @@ -1202,7 +1214,7 @@ func checkAndWaitForRefunds(ctx context.Context, client *tClient, orderID string
// will be greater than the furthest swap locktime, thereby lifting the
// time lock on all these swaps.
mineMedian := func(assetID uint32) error {
time.Sleep(1 * time.Second)
time.Sleep(sleepFactor * time.Second)
if err := mineBlocks(assetID, 6); err != nil {
return fmt.Errorf("client %d: error mining 6 btc blocks for swap refunds: %v",
client.id, err)
Expand Down Expand Up @@ -1252,7 +1264,7 @@ func checkAndWaitForRefunds(ctx context.Context, client *tClient, orderID string
}
}
}
time.Sleep(4 * time.Second)
time.Sleep(4 * sleepFactor * time.Second)

client.expectBalanceDiffs = refundAmts
err = client.assertBalanceChanges()
Expand Down

0 comments on commit f5835bd

Please sign in to comment.