Skip to content

Commit

Permalink
address client wallet hanging issues
Browse files Browse the repository at this point in the history
client/{asset,core}: more request context passing and checking

Add some more context passing to avoid things getting hung and unable
to cancel them. Some sensible timeouts are also established.

dexc: timeout Logout in promptShutdown

The second commit is a bit of a hack to prevent shutdown from becoming
impossible if an RPC or something hangs and causes deadlock on the
trackedTrade mutex.
  • Loading branch information
chappjc committed Aug 4, 2022
1 parent 139eb16 commit 66f5f9c
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 56 deletions.
4 changes: 2 additions & 2 deletions client/asset/btc/btc.go
Expand Up @@ -3321,13 +3321,13 @@ func (btc *baseWallet) AuditContract(coinID, contract, txData dex.Bytes, rebroad

// LocktimeExpired returns true if the specified contract's locktime has
// expired, making it possible to issue a Refund.
func (btc *baseWallet) LocktimeExpired(contract dex.Bytes) (bool, time.Time, error) {
func (btc *baseWallet) LocktimeExpired(_ context.Context, contract dex.Bytes) (bool, time.Time, error) {
_, _, locktime, _, err := dexbtc.ExtractSwapDetails(contract, btc.segwit, btc.chainParams)
if err != nil {
return false, time.Time{}, fmt.Errorf("error extracting contract locktime: %w", err)
}
contractExpiry := time.Unix(int64(locktime), 0).UTC()
medianTime, err := btc.node.medianTime()
medianTime, err := btc.node.medianTime() // TODO: pass ctx
if err != nil {
return false, time.Time{}, fmt.Errorf("error getting median time: %w", err)
}
Expand Down
35 changes: 20 additions & 15 deletions client/asset/dcr/dcr.go
Expand Up @@ -2449,7 +2449,7 @@ func (dcr *ExchangeWallet) lookupTxOutput(ctx context.Context, txHash *chainhash

// LocktimeExpired returns true if the specified contract's locktime has
// expired, making it possible to issue a Refund.
func (dcr *ExchangeWallet) LocktimeExpired(contract dex.Bytes) (bool, time.Time, error) {
func (dcr *ExchangeWallet) LocktimeExpired(ctx context.Context, contract dex.Bytes) (bool, time.Time, error) {
_, _, locktime, _, err := dexdcr.ExtractSwapDetails(contract, dcr.chainParams)
if err != nil {
return false, time.Time{}, fmt.Errorf("error extracting contract locktime: %w", err)
Expand All @@ -2458,7 +2458,7 @@ func (dcr *ExchangeWallet) LocktimeExpired(contract dex.Bytes) (bool, time.Time,
dcr.tipMtx.RLock()
blockHash := dcr.currentTip.hash
dcr.tipMtx.RUnlock()
hdr, err := dcr.wallet.GetBlockHeader(dcr.ctx, blockHash)
hdr, err := dcr.wallet.GetBlockHeader(ctx, blockHash)
if err != nil {
return false, time.Time{}, fmt.Errorf("unable to retrieve the block header: %w", err)
}
Expand Down Expand Up @@ -3628,12 +3628,12 @@ func (dcr *ExchangeWallet) monitorBlocks(ctx context.Context) {

// checkTip captures queuedBlock and walletBlock.
checkTip := func() {
ctx, cancel := context.WithTimeout(dcr.ctx, 4*time.Second)
defer cancel()
ctxInternal, cancel0 := context.WithTimeout(ctx, 4*time.Second)
defer cancel0()

newTip, err := dcr.getBestBlock(ctx)
newTip, err := dcr.getBestBlock(ctxInternal)
if err != nil {
dcr.handleTipChange(nil, 0, fmt.Errorf("failed to get best block: %w", err))
dcr.handleTipChange(ctx, nil, 0, fmt.Errorf("failed to get best block: %w", err))
return
}

Expand All @@ -3646,7 +3646,7 @@ func (dcr *ExchangeWallet) monitorBlocks(ctx context.Context) {
}

if walletBlock == nil {
dcr.handleTipChange(newTip.hash, newTip.height, nil)
dcr.handleTipChange(ctx, newTip.hash, newTip.height, nil)
return
}

Expand All @@ -3658,9 +3658,9 @@ func (dcr *ExchangeWallet) monitorBlocks(ctx context.Context) {
queuedBlock.queue.Stop()
}
blockAllowance := walletBlockAllowance
ctx, cancel = context.WithTimeout(dcr.ctx, 4*time.Second)
synced, _, err := dcr.wallet.SyncStatus(ctx)
cancel()
ctxInternal, cancel1 := context.WithTimeout(ctx, 4*time.Second)
defer cancel1()
synced, _, err := dcr.wallet.SyncStatus(ctxInternal)
if err != nil {
dcr.log.Errorf("Error retrieving sync status before queuing polled block: %v", err)
} else if !synced {
Expand All @@ -3672,7 +3672,7 @@ func (dcr *ExchangeWallet) monitorBlocks(ctx context.Context) {
dcr.log.Warnf("Reporting a block found in polling that the wallet apparently "+
"never reported: %s (%d). If you see this message repeatedly, it may indicate "+
"an issue with the wallet.", newTip.hash, newTip.height)
dcr.handleTipChange(newTip.hash, newTip.height, nil)
dcr.handleTipChange(ctx, newTip.hash, newTip.height, nil)
}),
}
}
Expand All @@ -3689,15 +3689,20 @@ func (dcr *ExchangeWallet) monitorBlocks(ctx context.Context) {
}
queuedBlock = nil
}
dcr.handleTipChange(walletTip.hash, walletTip.height, nil)
dcr.handleTipChange(ctx, walletTip.hash, walletTip.height, nil)

case <-ctx.Done():
return
}

// Ensure context cancellation takes priority before the next iteration.
if ctx.Err() != nil {
return
}
}
}

func (dcr *ExchangeWallet) handleTipChange(newTipHash *chainhash.Hash, newTipHeight int64, err error) {
func (dcr *ExchangeWallet) handleTipChange(ctx context.Context, newTipHash *chainhash.Hash, newTipHeight int64, err error) {
if err != nil {
go dcr.tipChange(err)
return
Expand Down Expand Up @@ -3732,7 +3737,7 @@ func (dcr *ExchangeWallet) handleTipChange(newTipHash *chainhash.Hash, newTipHei
// Redemption search would typically resume from prevTipHeight + 1 unless the
// previous tip was re-orged out of the mainchain, in which case redemption
// search will resume from the mainchain ancestor of the previous tip.
prevTipHeader, isMainchain, _, err := dcr.blockHeader(dcr.ctx, prevTip.hash)
prevTipHeader, isMainchain, _, err := dcr.blockHeader(ctx, prevTip.hash)
if err != nil {
// Redemption search cannot continue reliably without knowing if there
// was a reorg, cancel all find redemption requests in queue.
Expand All @@ -3748,7 +3753,7 @@ func (dcr *ExchangeWallet) handleTipChange(newTipHash *chainhash.Hash, newTipHei
// that is the immediate ancestor to the previous tip.
ancestorBlockHash := &prevTipHeader.PrevBlock
for {
aBlock, isMainchain, _, err := dcr.blockHeader(dcr.ctx, ancestorBlockHash)
aBlock, isMainchain, _, err := dcr.blockHeader(ctx, ancestorBlockHash)
if err != nil {
notifyFatalFindRedemptionError("Error getting block header %s: %w", ancestorBlockHash, err)
return
Expand Down
2 changes: 1 addition & 1 deletion client/asset/dcr/wallet.go
Expand Up @@ -43,7 +43,7 @@ func RegisterCustomWallet(constructor WalletConstructor, def *asset.WalletDefini
return nil
}

type TipChangeCallback func(*chainhash.Hash, int64, error)
type TipChangeCallback func(context.Context, *chainhash.Hash, int64, error)

// BlockHeader is a wire.BlockHeader with the addition of a MedianTime field.
// Implementations must fill in the MedianTime field when returning a
Expand Down
6 changes: 3 additions & 3 deletions client/asset/eth/eth.go
Expand Up @@ -1827,13 +1827,13 @@ func (eth *baseWallet) AuditContract(coinID, contract, serializedTx dex.Bytes, r

// LocktimeExpired returns true if the specified contract's locktime has
// expired, making it possible to issue a Refund.
func (w *assetWallet) LocktimeExpired(contract dex.Bytes) (bool, time.Time, error) {
func (w *assetWallet) LocktimeExpired(ctx context.Context, contract dex.Bytes) (bool, time.Time, error) {
contractVer, secretHash, err := dexeth.DecodeContractData(contract)
if err != nil {
return false, time.Time{}, err
}

swap, err := w.swap(w.ctx, secretHash, contractVer)
swap, err := w.swap(ctx, secretHash, contractVer)
if err != nil {
return false, time.Time{}, err
}
Expand All @@ -1843,7 +1843,7 @@ func (w *assetWallet) LocktimeExpired(contract dex.Bytes) (bool, time.Time, erro
return false, time.Time{}, asset.ErrSwapNotInitiated
}

header, err := w.node.bestHeader(w.ctx)
header, err := w.node.bestHeader(ctx)
if err != nil {
return false, time.Time{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion client/asset/eth/eth_test.go
Expand Up @@ -3007,7 +3007,7 @@ func TestLocktimeExpired(t *testing.T) {

ensureResult := func(tag string, expErr, expExpired bool) {
t.Helper()
expired, _, err := eth.LocktimeExpired(contract)
expired, _, err := eth.LocktimeExpired(context.Background(), contract)
switch {
case err != nil:
if !expErr {
Expand Down
2 changes: 1 addition & 1 deletion client/asset/interface.go
Expand Up @@ -309,7 +309,7 @@ type Wallet interface {
// contract can be refunded since assets have different rules to satisfy the
// lock. For example, in Bitcoin the median of the last 11 blocks must be
// past the expiry time, not the current time.
LocktimeExpired(contract dex.Bytes) (bool, time.Time, error)
LocktimeExpired(ctx context.Context, contract dex.Bytes) (bool, time.Time, error)
// FindRedemption watches for the input that spends the specified
// coin and contract, and returns the spending input and the
// secret key when it finds a spender.
Expand Down
31 changes: 24 additions & 7 deletions client/cmd/dexc/main.go
Expand Up @@ -228,13 +228,30 @@ func mainCore() error {
// shutdown if there are. The return value indicates if it is safe to stop Core
// or if the user has confirmed they want to shutdown with active orders.
func promptShutdown(clientCore *core.Core) bool {
err := clientCore.Logout()
if err == nil {
return true
}
if !errors.Is(err, core.ActiveOrdersLogoutErr) {
log.Errorf("unable to logout: %v", err)
return true
log.Infof("Attempting to logout...")
// Do not allow Logout hanging to prevent shutdown.
res := make(chan bool, 1)
go func() {
// Only block logout if err is ActiveOrdersLogoutErr.
var ok bool
err := clientCore.Logout()
if err == nil {
ok = true
} else if !errors.Is(err, core.ActiveOrdersLogoutErr) {
log.Errorf("Unexpected logout error: %v", err)
ok = true
} // else not ok => prompt
res <- ok
}()

select {
case <-time.After(10 * time.Second):
log.Errorf("Timeout waiting for Logout. Allowing shutdown, but you likely have active orders!")
return true // cancel all the contexts, hopefully breaking whatever deadlock
case ok := <-res:
if ok {
return true
}
}

fmt.Print("You have active orders. Shutting down now may result in failed swaps and account penalization.\n" +
Expand Down
8 changes: 7 additions & 1 deletion client/core/core.go
Expand Up @@ -925,8 +925,12 @@ func (c *Core) tickAsset(dc *dexConnection, assetID uint32) assetMap {
}
dc.tradeMtx.RUnlock()

updated := make(assetMap)
updateChan := make(chan assetMap)
for _, trade := range assetTrades {
if c.ctx.Err() != nil { // don't fail each one in sequence if shutting down
return updated
}
trade := trade // bad go, bad
go func() {
newUpdates, err := c.tick(trade)
Expand All @@ -937,7 +941,6 @@ func (c *Core) tickAsset(dc *dexConnection, assetID uint32) assetMap {
}()
}

updated := make(assetMap)
for range assetTrades {
updated.merge(<-updateChan)
}
Expand Down Expand Up @@ -6514,6 +6517,9 @@ func (c *Core) listen(dc *dexConnection) {
}

for _, trade := range activeTrades {
if c.ctx.Err() != nil { // don't fail each one in sequence if shutting down
return
}
newUpdates, err := c.tick(trade)
if err != nil {
c.log.Error(err)
Expand Down
6 changes: 3 additions & 3 deletions client/core/core_test.go
Expand Up @@ -801,7 +801,7 @@ func (w *TXCWallet) AuditContract(coinID, contract, txData dex.Bytes, rebroadcas
return w.auditInfo, w.auditErr
}

func (w *TXCWallet) LocktimeExpired(contract dex.Bytes) (bool, time.Time, error) {
func (w *TXCWallet) LocktimeExpired(_ context.Context, contract dex.Bytes) (bool, time.Time, error) {
return w.contractExpired, w.contractLockTime, nil
}

Expand Down Expand Up @@ -5052,7 +5052,7 @@ func TestRefunds(t *testing.T) {
checkStatus("taker swapped", match, order.TakerSwapCast)
}
// Confirm isRefundable = true.
if !tracker.isRefundable(match) {
if !tracker.isRefundable(tCore.ctx, match) {
t.Fatalf("%s's swap not refundable", match.Side)
}
// Check refund.
Expand All @@ -5065,7 +5065,7 @@ func TestRefunds(t *testing.T) {
t.Fatalf("expected %d refund amount, got %d", expectAmt, amtRefunded)
}
// Confirm isRefundable = false.
if tracker.isRefundable(match) {
if tracker.isRefundable(tCore.ctx, match) {
t.Fatalf("%s's swap refundable after being refunded", match.Side)
}
// Expect refund re-attempt to not refund any coin.
Expand Down

0 comments on commit 66f5f9c

Please sign in to comment.