From 9790fb1cfb6e7ed7ffadc4624979ca57341d2ca0 Mon Sep 17 00:00:00 2001 From: buck54321 Date: Mon, 26 Oct 2020 21:54:30 -0500 Subject: [PATCH] core: don't fail match on misses and redeem errors * failErr tweaks and default debug logging * de-group suspect matches. retry until bcast timeout --- client/cmd/dexc/ui/config.go | 2 +- client/core/core.go | 40 +++++- client/core/core_test.go | 240 +++++++++++++++++++++++++++++-- client/core/status.go | 6 +- client/core/trade.go | 204 +++++++++++++++++++++----- client/core/trade_simnet_test.go | 7 +- client/core/types.go | 2 +- 7 files changed, 441 insertions(+), 60 deletions(-) diff --git a/client/cmd/dexc/ui/config.go b/client/cmd/dexc/ui/config.go index 7345e0efdf..5d899b8330 100644 --- a/client/cmd/dexc/ui/config.go +++ b/client/cmd/dexc/ui/config.go @@ -24,7 +24,7 @@ const ( defaultRPCKeyFile = "rpc.key" defaultWebAddr = "localhost:5758" configFilename = "dexc.conf" - defaultLogLevel = "info" + defaultLogLevel = "debug" ) var ( diff --git a/client/core/core.go b/client/core/core.go index 3404cf1fa0..1f44ea6360 100644 --- a/client/core/core.go +++ b/client/core/core.go @@ -44,6 +44,7 @@ const ( // regConfirmationsPaid is used to indicate completed registration to // (*Core).setRegConfirms. regConfirmationsPaid uint32 = math.MaxUint32 + tickCheckDivisions = 3 ) var ( @@ -1479,6 +1480,30 @@ func (c *Core) ReconfigureWallet(appPW []byte, assetID uint32, cfg map[string]st details := fmt.Sprintf("Configuration for %s wallet has been updated.", unbip(assetID)) c.notify(newWalletConfigNote("Wallet Configuration Updated", details, db.Success, wallet.state())) + // Clear any existing tickGovernors for suspect matches. + c.connMtx.RLock() + defer c.connMtx.RUnlock() + for _, dc := range c.conns { + dc.tradeMtx.RLock() + for _, t := range dc.trades { + if t.Base() != assetID && t.Quote() != assetID { + continue + } + isFromAsset := t.wallets.fromAsset.ID == assetID + t.mtx.Lock() + for _, m := range t.matches { + if m.tickGovernor != nil && + ((m.suspectSwap && isFromAsset) || (m.suspectRedeem && !isFromAsset)) { + + m.tickGovernor.Stop() + m.tickGovernor = nil + } + } + t.mtx.Unlock() + } + dc.tradeMtx.RUnlock() + } + return nil } @@ -2733,7 +2758,6 @@ func (c *Core) authDEX(dc *dexConnection) error { for _, match := range missing { c.log.Warnf("DEX %s did not report active match %s on order %s - assuming revoked.", dc.acct.host, match.id, oid) - match.failErr = fmt.Errorf("order not reported by the server on connect") // Must have been revoked while we were gone. Flag to allow recovery // and subsequent retirement of the match and parent trade. match.MetaData.Proof.SelfRevoked = true @@ -3215,22 +3239,24 @@ func (c *Core) resumeTrades(dc *dexConnection, trackers []*trackedTrade) assetMa } } if needsAuditInfo { + // Check for unresolvable states. if len(counterSwap) == 0 { - match.failErr = fmt.Errorf("missing counter-swap, order %s, match %s", tracker.ID(), match.id) + match.swapErr = fmt.Errorf("missing counter-swap, order %s, match %s", tracker.ID(), match.id) notifyErr("Match status error", "Match %s for order %s is in state %s, but has no maker swap coin.", dbMatch.Side, tracker.token(), dbMatch.Status) continue } counterContract := metaData.Proof.CounterScript if len(counterContract) == 0 { - match.failErr = fmt.Errorf("missing counter-contract, order %s, match %s", tracker.ID(), match.id) + match.swapErr = fmt.Errorf("missing counter-contract, order %s, match %s", tracker.ID(), match.id) notifyErr("Match status error", "Match %s for order %s is in state %s, but has no maker swap contract.", dbMatch.Side, tracker.token(), dbMatch.Status) continue } auditInfo, err := wallets.toWallet.AuditContract(counterSwap, counterContract) if err != nil { - c.log.Debugf("Match %v status %v, refunded = %v, revoked = %v", match.id, match.MetaData.Status, - len(match.MetaData.Proof.RefundCoin) > 0, match.MetaData.Proof.IsRevoked()) - match.failErr = fmt.Errorf("audit error, order %s, match %s: %v", tracker.ID(), match.id, err) + c.log.Debugf("AuditContract error for match %v status %v, refunded = %v, revoked = %v: %v", + match.id, match.MetaData.Status, len(match.MetaData.Proof.RefundCoin) > 0, match.MetaData.Proof.IsRevoked(), err) + match.swapErr = fmt.Errorf("audit error, order %s, match %s: %v", tracker.ID(), match.id, err) + match.MetaData.Proof.SelfRevoked = true notifyErr("Match recovery error", "Error auditing counter-party's swap contract (%v) during swap recovery on order %s: %v", tracker.token(), coinIDString(wallets.toAsset.ID, counterSwap), err) continue @@ -3470,7 +3496,7 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) { connMaster: connMaster, assets: assets, cfg: dexCfg, - tickInterval: bTimeout / 3, + tickInterval: bTimeout / tickCheckDivisions, books: make(map[string]*bookie), acct: newDEXAccount(acctInfo), marketMap: marketMap, diff --git a/client/core/core_test.go b/client/core/core_test.go index eb00ea7cd2..9906fe0d79 100644 --- a/client/core/core_test.go +++ b/client/core/core_test.go @@ -201,11 +201,12 @@ func testDexConnection() (*dexConnection, *TWebsocket, *dexAccount) { }, Fee: tFee, }, - notify: func(Notification) {}, - marketMap: map[string]*Market{tDcrBtcMktName: mkt}, - trades: make(map[order.OrderID]*trackedTrade), - epoch: map[string]uint64{tDcrBtcMktName: 0}, - connected: true, + tickInterval: time.Millisecond * 1000 / 3, + notify: func(Notification) {}, + marketMap: map[string]*Market{tDcrBtcMktName: mkt}, + trades: make(map[order.OrderID]*trackedTrade), + epoch: map[string]uint64{tDcrBtcMktName: 0}, + connected: true, }, conn, acct } @@ -493,11 +494,15 @@ type TXCWallet struct { signCoinErr error lastSwaps *asset.Swaps swapReceipts []asset.Receipt + swapCounter int + swapErr error auditInfo asset.AuditInfo auditErr error refundCoin dex.Bytes refundErr error redeemCoins []dex.Bytes + redeemCounter int + redeemErr error badSecret bool fundedVal uint64 fundedSwaps uint64 @@ -583,11 +588,19 @@ func (w *TXCWallet) FundingCoins([]dex.Bytes) (asset.Coins, error) { } func (w *TXCWallet) Swap(swaps *asset.Swaps) ([]asset.Receipt, asset.Coin, uint64, error) { + w.swapCounter++ w.lastSwaps = swaps + if w.swapErr != nil { + return nil, nil, 0, w.swapErr + } return w.swapReceipts, w.changeCoin, tSwapFeesPaid, nil } func (w *TXCWallet) Redeem([]*asset.Redemption) ([]dex.Bytes, asset.Coin, uint64, error) { + w.redeemCounter++ + if w.redeemErr != nil { + return nil, nil, 0, w.redeemErr + } return w.redeemCoins, &tCoin{id: []byte{0x0c, 0x0d}}, tRedemptionFeesPaid, nil } @@ -1654,14 +1667,11 @@ func TestLogin(t *testing.T) { t.Fatalf("final Login error: %v", err) } - if tracker.matches[missingID].failErr == nil { - t.Errorf("failErr not set for missing match tracker") - } if !tracker.matches[missingID].MetaData.Proof.SelfRevoked { t.Errorf("SelfRevoked not true for missing match tracker") } - if tracker.matches[matchID].failErr != nil { - t.Errorf("failErr set for non-missing match tracker") + if tracker.matches[matchID].swapErr != nil { + t.Errorf("swapErr set for non-missing match tracker") } if tracker.matches[matchID].MetaData.Proof.IsRevoked() { t.Errorf("IsRevoked true for non-missing match tracker") @@ -4575,6 +4585,29 @@ func TestReconfigureWallet(t *testing.T) { } tXyzWallet.unlockErr = nil + // For the last success, make sure that we also clear any related + // tickGovernors. + match := &matchTracker{ + suspectSwap: true, + tickGovernor: time.NewTimer(time.Hour), + } + tCore.conns[tDexHost].trades[order.OrderID{}] = &trackedTrade{ + Order: &order.LimitOrder{ + P: order.Prefix{ + BaseAsset: assetID, + }, + }, + wallets: &walletSet{ + fromAsset: &dex.Asset{ID: assetID}, + fromWallet: &xcWallet{AssetID: assetID}, + toAsset: &dex.Asset{}, + toWallet: &xcWallet{}, + }, + matches: map[order.MatchID]*matchTracker{ + order.MatchID{}: match, + }, + } + // Success err = tCore.ReconfigureWallet(tPW, assetID, newSettings) if err != nil { @@ -4585,6 +4618,10 @@ func TestReconfigureWallet(t *testing.T) { if len(settings) != 1 || settings["def"] != "456" { t.Fatalf("settings not stored") } + + if match.tickGovernor != nil { + t.Fatalf("tickGovernor not removed") + } } func TestSetWalletPassword(t *testing.T) { @@ -5239,3 +5276,186 @@ func TestMatchStatusResolution(t *testing.T) { } } } + +func TestSuspectTrades(t *testing.T) { + rig := newTestRig() + dc := rig.dc + tCore := rig.core + + dcrWallet, tDcrWallet := newTWallet(tDCR.ID) + tCore.wallets[tDCR.ID] = dcrWallet + btcWallet, tBtcWallet := newTWallet(tBTC.ID) + tCore.wallets[tBTC.ID] = btcWallet + walletSet, _ := tCore.walletSet(dc, tDCR.ID, tBTC.ID, true) + + lo, dbOrder, preImg, _ := makeLimitOrder(dc, true, 0, 0) + oid := lo.ID() + mkt := dc.market(tDcrBtcMktName) + tracker := newTrackedTrade(dbOrder, preImg, dc, mkt.EpochLen, rig.core.lockTimeTaker, rig.core.lockTimeMaker, + rig.db, rig.queue, walletSet, nil, rig.core.notify) + dc.trades[oid] = tracker + + newMatch := func(side order.MatchSide, status order.MatchStatus) *matchTracker { + return &matchTracker{ + id: ordertest.RandomMatchID(), + prefix: lo.Prefix(), + trade: lo.Trade(), + MetaMatch: db.MetaMatch{ + MetaData: &db.MatchMetaData{ + Proof: db.MatchProof{ + Auth: db.MatchAuth{ + MatchStamp: encode.UnixMilliU(time.Now()), + AuditStamp: encode.UnixMilliU(time.Now()), + }, + }, + Status: status, + }, + Match: &order.UserMatch{ + Side: side, + Address: ordertest.RandomAddress(), + Status: status, + }, + }, + } + } + + var swappableMatch1, swappableMatch2 *matchTracker + setSwaps := func() { + swappableMatch1 = newMatch(order.Maker, order.NewlyMatched) + swappableMatch2 = newMatch(order.Taker, order.MakerSwapCast) + _, auditInfo := tMsgAudit(oid, swappableMatch2.id, ordertest.RandomAddress(), 1, encode.RandomBytes(32)) + auditInfo.coin.confs = tDCR.SwapConf + swappableMatch2.counterSwap = auditInfo + tDcrWallet.swapCounter = 0 + tracker.matches = map[order.MatchID]*matchTracker{ + swappableMatch1.id: swappableMatch1, + swappableMatch2.id: swappableMatch2, + } + } + setSwaps() + + // Initial success + _, err := tCore.tick(tracker) + if err != nil { + t.Fatalf("swap tick error: %v", err) + } + + setSwaps() + tDcrWallet.swapErr = tErr + _, err = tCore.tick(tracker) + if err == nil || !strings.Contains(err.Error(), "error sending swap transaction") { + t.Fatalf("swap error not propagated, err = %v", err) + } + if tDcrWallet.swapCounter != 1 { + t.Fatalf("never swapped") + } + + // Both matches should be marked as suspect and have tickGovernors in place. + tracker.mtx.Lock() + for i, m := range []*matchTracker{swappableMatch1, swappableMatch2} { + if !m.suspectSwap { + t.Fatalf("swappable match %d not suspect after failed swap", i+1) + } + if m.tickGovernor == nil { + t.Fatalf("swappable match %d has no tick meterer set", i+1) + } + } + tracker.mtx.Unlock() + + // Ticking right away again should do nothing. + tDcrWallet.swapErr = nil + _, err = tCore.tick(tracker) + if err != nil { + t.Fatalf("tick error during metered swap tick: %v", err) + } + if tDcrWallet.swapCounter != 1 { + t.Fatalf("swapped during metered tick") + } + + // But once the tickGovernors expire, we should succeed with two separate + // requests. + tracker.mtx.Lock() + swappableMatch1.tickGovernor = nil + swappableMatch2.tickGovernor = nil + tracker.mtx.Unlock() + _, err = tCore.tick(tracker) + if err != nil { + t.Fatalf("tick error while swapping suspect matches: %v", err) + } + if tDcrWallet.swapCounter != 3 { + t.Fatalf("suspect swap matches not run or not run separately. expected 2 new calls to Swap, got %d", tDcrWallet.swapCounter-1) + } + + var redeemableMatch1, redeemableMatch2 *matchTracker + setRedeems := func() { + redeemableMatch1 = newMatch(order.Maker, order.TakerSwapCast) + redeemableMatch2 = newMatch(order.Taker, order.MakerRedeemed) + _, auditInfo := tMsgAudit(oid, redeemableMatch1.id, ordertest.RandomAddress(), 1, encode.RandomBytes(32)) + auditInfo.coin.confs = tBTC.SwapConf + redeemableMatch1.counterSwap = auditInfo + tBtcWallet.redeemCounter = 0 + tracker.matches = map[order.MatchID]*matchTracker{ + redeemableMatch1.id: redeemableMatch1, + redeemableMatch2.id: redeemableMatch2, + } + rig.ws.queueResponse(msgjson.RedeemRoute, redeemAcker) + rig.ws.queueResponse(msgjson.RedeemRoute, redeemAcker) + } + setRedeems() + + // Initial success + tBtcWallet.redeemCoins = []dex.Bytes{encode.RandomBytes(36), encode.RandomBytes(36)} + _, err = tCore.tick(tracker) + if err != nil { + t.Fatalf("redeem tick error: %v", err) + } + if tBtcWallet.redeemCounter != 1 { + t.Fatalf("never redeemed") + } + + setRedeems() + tBtcWallet.redeemErr = tErr + _, err = tCore.tick(tracker) + if err == nil || !strings.Contains(err.Error(), "error sending redeem transaction") { + t.Fatalf("redeem error not propagated. err = %v", err) + } + if tBtcWallet.redeemCounter != 1 { + t.Fatalf("never redeemed") + } + + // Both matches should be marked as suspect and have tickGovernors in place. + tracker.mtx.Lock() + for i, m := range []*matchTracker{redeemableMatch1, redeemableMatch2} { + if !m.suspectRedeem { + t.Fatalf("redeemable match %d not suspect after failed swap", i+1) + } + if m.tickGovernor == nil { + t.Fatalf("redeemable match %d has no tick meterer set", i+1) + } + } + tracker.mtx.Unlock() + + // Ticking right away again should do nothing. + tBtcWallet.redeemErr = nil + _, err = tCore.tick(tracker) + if err != nil { + t.Fatalf("tick error during metered redeem tick: %v", err) + } + if tBtcWallet.redeemCounter != 1 { + t.Fatalf("redeemed during metered tick %d", tBtcWallet.redeemCounter) + } + + // But once the tickGovernors expire, we should succeed with two separate + // requests. + tracker.mtx.Lock() + redeemableMatch1.tickGovernor = nil + redeemableMatch2.tickGovernor = nil + tracker.mtx.Unlock() + _, err = tCore.tick(tracker) + if err != nil { + t.Fatalf("tick error while redeeming suspect matches: %v", err) + } + if tBtcWallet.redeemCounter != 3 { + t.Fatalf("suspect redeem matches not run or not run separately. expected 2 new calls to Redeem, got %d", tBtcWallet.redeemCounter-1) + } +} diff --git a/client/core/status.go b/client/core/status.go index b19ecf5ae2..f471ca1d3f 100644 --- a/client/core/status.go +++ b/client/core/status.go @@ -144,13 +144,13 @@ func (c *Core) resolveConflictWithServerData(dc *dexConnection, trade *trackedTr if resolver != nil { resolver(dc, trade, match, srvData) } else { - // We don't know how to handle this. Set the failErr, and self-revoke + // We don't know how to handle this. Set the swapErr, and self-revoke // the match. This condition would be virtually impossible, because it // would mean that the client and server were at least two steps out of // sync. - match.failErr = fmt.Errorf("status conflict (%s -> %s) has no handler. %s", + match.swapErr = fmt.Errorf("status conflict (%s -> %s) has no handler. %s", match.MetaData.Status, srvStatus, logID) - c.log.Error(match.failErr) + c.log.Error(match.swapErr) match.MetaData.Proof.SelfRevoked = true err := c.db.UpdateMatch(&match.MetaMatch) if err != nil { diff --git a/client/core/trade.go b/client/core/trade.go index a7601524f0..09eb8385f0 100644 --- a/client/core/trade.go +++ b/client/core/trade.go @@ -34,8 +34,32 @@ func (err ExpirationErr) Error() string { return string(err) } // A matchTracker is used to negotiate a match. type matchTracker struct { db.MetaMatch - id order.MatchID - failErr error + id order.MatchID + // swapErr is an error set when we have given up hope on broadcasting a swap + // tx for a match. This can happen if 1) the swap has been attempted + // (repeatedly), but could not be successfully broadcast before the + // broadcast timeout, or 2) a match data was found to be in a non-sensical + // state during startup. + swapErr error + // tickGovernor can be set non-nil to prevent swaps or redeems from + // being attempted for a match. Typically, the *Timer comes from an + // AfterFunc that itself nils the tickGovernor. + tickGovernor *time.Timer + // swapErrCount counts the swap attempts. It is used in recovery. + swapErrCount int + // redeemErrCount counts the redeem attempts. It is used in recovery. + redeemErrCount int + // suspectSwap is a flag to indicate that there was a problem encountered + // trying to send a swap contract for this match. If suspectSwap is true, + // the match will not be grouped when attempting future swaps. + suspectSwap bool + // suspectRedeem is a flag to indicate that there was a problem encountered + // trying to redeem this match. If suspectRedeem is true, the match will not + // be grouped when attempting future redemptions. + suspectRedeem bool + // refundErr will be set to true if we attempt a refund and get a + // CoinNotFoundError, indicating there is nothing to refund. Prevents + // retries. refundErr error prefix *order.Prefix trade *order.Trade @@ -62,12 +86,17 @@ type matchTracker struct { // parts is a getter for pointers to commonly used struct fields in the // matchTracker. -func (match *matchTracker) parts() (*order.UserMatch, *db.MatchMetaData, *db.MatchProof, *db.MatchAuth) { - dbMatch, metaData := match.Match, match.MetaData +func (m *matchTracker) parts() (*order.UserMatch, *db.MatchMetaData, *db.MatchProof, *db.MatchAuth) { + dbMatch, metaData := m.Match, m.MetaData proof, auth := &metaData.Proof, &metaData.Proof.Auth return dbMatch, metaData, proof, auth } +// matchTime returns the match's match time as a time.Time. +func (m *matchTracker) matchTime() time.Time { + return encode.UnixTimeMilli(int64(m.MetaData.Proof.Auth.MatchStamp)).UTC() +} + // trackedCancel is information necessary to track a cancel order. A // trackedCancel is always associated with a trackedTrade. type trackedCancel struct { @@ -154,6 +183,16 @@ func (t *trackedTrade) broadcastTimeout() time.Duration { return time.Millisecond * time.Duration(t.dc.cfg.BroadcastTimeout) } +// delayTicks sets the tickGovernor to prevent retrying too quickly after an +// error. +func (t *trackedTrade) delayTicks(m *matchTracker, waitTime time.Duration) { + m.tickGovernor = time.AfterFunc(waitTime, func() { + t.mtx.Lock() + defer t.mtx.Unlock() + m.tickGovernor = nil + }) +} + // coreOrder constructs a *core.Order for the tracked order.Order. If the trade // has a cancel order associated with it, the cancel order will be returned, // otherwise the second returned *Order will be nil. @@ -689,9 +728,9 @@ func (t *trackedTrade) unspentContractAmounts() (amount uint64) { // mutex lock held for reads. func (t *trackedTrade) isSwappable(match *matchTracker) bool { dbMatch, metaData, proof, _ := match.parts() - if match.failErr != nil || proof.IsRevoked() { - t.dc.log.Tracef("Match %v not swappable: failErr = %v, revoked = %v", - match.id, match.failErr, proof.IsRevoked()) + if match.swapErr != nil || proof.IsRevoked() || match.tickGovernor != nil { + t.dc.log.Tracef("Match %v not swappable: swapErr = %v, revoked = %v, metered = %t", + match.id, match.swapErr, proof.IsRevoked(), match.tickGovernor != nil) return false } @@ -725,9 +764,9 @@ func (t *trackedTrade) isSwappable(match *matchTracker) bool { // mutex lock held for reads. func (t *trackedTrade) isRedeemable(match *matchTracker) bool { dbMatch, metaData, proof, _ := match.parts() - if match.failErr != nil || len(proof.RefundCoin) != 0 { - t.dc.log.Tracef("Match %v not redeemable: failErr = %v, RefundCoin = %v", - match.id, match.failErr, proof.RefundCoin) + if match.swapErr != nil || len(proof.RefundCoin) != 0 || match.tickGovernor != nil { + t.dc.log.Tracef("Match %v not redeemable: swapErr = %v, RefundCoin = %v, metered = %t", + match.id, match.swapErr, proof.RefundCoin, match.tickGovernor != nil) return false } @@ -843,8 +882,8 @@ func (t *trackedTrade) shouldBeginFindRedemption(match *matchTracker) bool { swapCoinID := proof.TakerSwap if match.Match.Side != order.Taker || len(swapCoinID) == 0 || len(proof.MakerRedeem) > 0 || len(proof.RefundCoin) > 0 { t.dc.log.Tracef( - "Not finding redemption for match %v: side = %s, failErr = %v, TakerSwap = %v RefundCoin = %v", - match.id, match.Match.Side, match.failErr, proof.TakerSwap, proof.RefundCoin) + "Not finding redemption for match %v: side = %s, swapErr = %v, TakerSwap = %v RefundCoin = %v", + match.id, match.Match.Side, match.swapErr, proof.TakerSwap, proof.RefundCoin) return false } if match.cancelRedemptionSearch != nil { // already finding redemption @@ -993,7 +1032,7 @@ func (c *Core) resendPendingRequests(t *trackedTrade) error { // Do not resend pending requests for revoked matches. // Matches where we've refunded our swap or we auto-redeemed maker's // swap will be set to revoked and will be skipped as well. - if match.failErr != nil || proof.IsRevoked() { + if match.swapErr != nil || proof.IsRevoked() { continue } side, status := dbMatch.Side, dbMatch.Status @@ -1092,26 +1131,50 @@ func (t *trackedTrade) revokeMatch(matchID order.MatchID, fromServer bool) error return nil } -// swapMatches will send a transaction with swap outputs for the specified -// matches. +// swapMatches will send a transaction with swaps for the specified matches. +// The matches will be de-grouped so that matches marked as suspect are swapped +// individually and separate from the non-suspect group. // // This method modifies match fields and MUST be called with the trackedTrade // mutex lock held for writes. func (c *Core) swapMatches(t *trackedTrade, matches []*matchTracker) error { errs := newErrorSet("swapMatches order %s - ", t.ID()) + groupables := make([]*matchTracker, 0, len(matches)) // Over-allocating if there are suspect matches + var suspects []*matchTracker + for _, m := range matches { + if m.suspectSwap { + suspects = append(suspects, m) + } else { + groupables = append(groupables, m) + } + } + if len(groupables) > 0 { + c.swapMatchGroup(t, groupables, errs) + } + for _, m := range suspects { + c.swapMatchGroup(t, []*matchTracker{m}, errs) + } + return errs.ifAny() +} +// swapMatchGroup will send a transaction with swap outputs for the specified +// matches. +// +// This method modifies match fields and MUST be called with the trackedTrade +// mutex lock held for writes. +func (c *Core) swapMatchGroup(t *trackedTrade, matches []*matchTracker, errs *errorSet) { // Prepare the asset.Contracts. contracts := make([]*asset.Contract, len(matches)) // These matches may have different fee rates, matched in different epochs. var highestFeeRate uint64 var includesMakerSwap bool for i, match := range matches { - dbMatch, _, proof, auth := match.parts() + dbMatch, proof := match.Match, &match.MetaData.Proof value := dbMatch.Quantity if !match.trade.Sell { value = calc.BaseToQuote(dbMatch.Rate, dbMatch.Quantity) } - matchTime := encode.UnixTimeMilli(int64(auth.MatchStamp)) + matchTime := match.matchTime() lockTime := matchTime.Add(t.lockTimeTaker).UTC().Unix() if dbMatch.Side == order.Maker { proof.Secret = encode.RandomBytes(32) @@ -1167,17 +1230,19 @@ func (c *Core) swapMatches(t *trackedTrade, matches []*matchTracker) error { for i, coinID := range coinIDs { coin, found := t.coins[hex.EncodeToString(coinID)] if !found { - return errs.add("%s coin %s not found", fromAsset.Symbol, coinIDString(fromAsset.ID, coinID)) + errs.add("%s coin %s not found", fromAsset.Symbol, coinIDString(fromAsset.ID, coinID)) + return } inputs[i] = coin } if t.dc.IsDown() { - return errs.add("not broadcasting swap while DEX %s connection is down (could be revoked)", t.dc.acct.host) + errs.add("not broadcasting swap while DEX %s connection is down (could be revoked)", t.dc.acct.host) + return } // swapMatches is no longer idempotent after this point. - // Send the swap. If the swap fails, set the failErr flag for all matches. + // Send the swap. If the swap fails, set the swapErr flag for all matches. // A more sophisticated solution might involve tracking the error time too // and trying again in certain circumstances. swaps := &asset.Swaps{ @@ -1188,11 +1253,34 @@ func (c *Core) swapMatches(t *trackedTrade, matches []*matchTracker) error { } receipts, change, fees, err := t.wallets.fromWallet.Swap(swaps) if err != nil { - // Set the error on the matches. for _, match := range matches { - match.failErr = err + // Mark the matches as suspect to prevent them being grouped again. + match.suspectSwap = true + match.swapErrCount++ + // If we can still swap before the broadcast timeout, allow retries + // soon. + auditStamp := match.MetaData.Proof.Auth.AuditStamp + lastActionTime := match.matchTime() + if match.Match.Side == order.Taker { + // It is possible that AuditStamp could be zero if we're + // recovering during startup or after a DEX reconnect. In that + // case, allow three retries before giving up. + lastActionTime = encode.UnixTimeMilli(int64(auditStamp)) + } + if time.Since(lastActionTime) < t.broadcastTimeout() || + (auditStamp == 0 && match.swapErrCount < tickCheckDivisions) { + + t.delayTicks(match, t.dc.tickInterval*3/4) + } else { + // If we can't get a swap out before the broadcast timeout, just + // quit. We could also self-revoke here, but we're also + // expecting a revocation from the server, so relying on that + // one for now. + match.swapErr = err + } } - return errs.add("error sending swap transaction: %v", err) + errs.add("error sending swap transaction: %v", err) + return } c.log.Infof("Broadcasted transaction with %d swap contracts for order %v. Fee rate = %d. Receipts (%s): %v", @@ -1239,8 +1327,6 @@ func (c *Core) swapMatches(t *trackedTrade, matches []*matchTracker) error { errs.addErr(err) } } - - return errs.ifAny() } // finalizeSwapAction sends an `init` request for the specified match, waits @@ -1311,11 +1397,36 @@ func (c *Core) finalizeSwapAction(t *trackedTrade, match *matchTracker, coinID, } // redeemMatches will send a transaction redeeming the specified matches. +// The matches will be de-grouped so that matches marked as suspect are redeemed +// individually and separate from the non-suspect group. // // This method modifies match fields and MUST be called with the trackedTrade // mutex lock held for writes. func (c *Core) redeemMatches(t *trackedTrade, matches []*matchTracker) error { - errs := newErrorSet("redeemMatches - order %s - ", t.ID()) + errs := newErrorSet("redeemMatches order %s - ", t.ID()) + groupables := make([]*matchTracker, 0, len(matches)) // Over-allocating if there are suspect matches + var suspects []*matchTracker + for _, m := range matches { + if m.suspectRedeem { + suspects = append(suspects, m) + } else { + groupables = append(groupables, m) + } + } + if len(groupables) > 0 { + c.redeemMatchGroup(t, groupables, errs) + } + for _, m := range suspects { + c.redeemMatchGroup(t, []*matchTracker{m}, errs) + } + return errs.ifAny() +} + +// redeemMatchGroup will send a transaction redeeming the specified matches. +// +// This method modifies match fields and MUST be called with the trackedTrade +// mutex lock held for writes. +func (c *Core) redeemMatchGroup(t *trackedTrade, matches []*matchTracker, errs *errorSet) { // Collect a asset.Redemption for each match into a slice of redemptions that // will be grouped into a single transaction. redemptions := make([]*asset.Redemption, 0, len(matches)) @@ -1332,10 +1443,36 @@ func (c *Core) redeemMatches(t *trackedTrade, matches []*matchTracker) error { // If an error was encountered, fail all of the matches. A failed match will // not run again on during ticks. if err != nil { + // The caller will notify the user that there is a problem. We really + // have no way of knowing whether this is recoverable (so we can't set + // swapErr), but we do want to prevent redemptions every tick. for _, match := range matches { - match.failErr = err + // Mark these matches as suspect. Suspect matches will not be + // grouped for redemptions in future attempts. + match.suspectRedeem = true + match.redeemErrCount++ + // If we can still make a broadcast timeout, allow retries soon. It + // is possible for RedemptionStamp or AuditStamp to be zero if we're + // recovering during startup or after a DEX reconnect. In that case, + // allow three retries before giving up. + lastActionStamp := match.MetaData.Proof.Auth.AuditStamp + if match.Match.Side == order.Taker { + lastActionStamp = match.MetaData.Proof.Auth.RedemptionStamp + } + lastActionTime := encode.UnixTimeMilli(int64(lastActionStamp)) + // Try to wait until about the next auto-tick to try again. + waitTime := t.dc.tickInterval * 3 / 4 + if time.Since(lastActionTime) > t.broadcastTimeout() || + (lastActionStamp == 0 && match.redeemErrCount >= tickCheckDivisions) { + // If we already missed the broadcast timeout, we're not in as + // much of a hurry. but keep trying and sending errors, because + // we do want the user to recover. + waitTime = 15 * time.Minute + } + t.delayTicks(match, waitTime) } - return errs.addErr(err) + errs.add("error sending redeem transaction: %v", err) + return } c.log.Infof("Broadcasted redeem transaction spending %d contracts for order %v, paying to %s (%s)", @@ -1361,8 +1498,6 @@ func (c *Core) redeemMatches(t *trackedTrade, matches []*matchTracker) error { errs.addErr(err) } } - - return errs.ifAny() } // finalizeRedeemAction sends a `redeem` request for the specified match, @@ -1547,8 +1682,8 @@ func (t *trackedTrade) refundMatches(matches []*matchTracker) (uint64, error) { refundCoin, err := refundWallet.Refund(swapCoinID, contractToRefund) if err != nil { - match.refundErr = err - if err == asset.CoinNotFoundError { + if err == asset.CoinNotFoundError && dbMatch.Side == order.Taker { + match.refundErr = err // Could not find the contract coin, which means it has been spent. // We should have already started FindRedemption for this contract, // but let's do it again to ensure we find the secret. @@ -1556,6 +1691,7 @@ func (t *trackedTrade) refundMatches(matches []*matchTracker) (uint64, error) { refundAsset.Symbol, swapCoinString) t.findMakersRedemption(match) } else { + t.delayTicks(match, time.Minute*5) errs.add("error sending refund tx for match %s, swap coin %s: %v", match.id, swapCoinString, err) } @@ -1667,7 +1803,7 @@ func (t *trackedTrade) auditContract(match *matchTracker, coinID []byte, contrac // 1. Recipient Address // 2. Contract value // 3. Secret hash: maker compares, taker records - dbMatch, _, proof, auth := match.parts() + dbMatch, proof := match.Match, &match.MetaData.Proof match.counterSwap = auditInfo if auditInfo.Recipient() != t.Trade().Address { return fmt.Errorf("swap recipient %s is not the order address %s.", auditInfo.Recipient(), t.Trade().Address) @@ -1692,7 +1828,7 @@ func (t *trackedTrade) auditContract(match *matchTracker, coinID []byte, contrac // Check or store the secret hash and update the database. match.MetaData.Proof.CounterScript = contract - matchTime := encode.UnixTimeMilli(int64(auth.MatchStamp)) + matchTime := match.matchTime() reqLockTime := encode.DropMilliseconds(matchTime.Add(t.lockTimeMaker)) // counterparty = maker if dbMatch.Side == order.Maker { // Check that the secret hash is correct. diff --git a/client/core/trade_simnet_test.go b/client/core/trade_simnet_test.go index 0920e256d5..c4e63e46f9 100644 --- a/client/core/trade_simnet_test.go +++ b/client/core/trade_simnet_test.go @@ -301,7 +301,6 @@ func TestMakerGhostingAfterTakerRedeem(t *testing.T) { } else { client.log("%s: resuming trade negotiations to audit Maker's redeem", side) } - match.failErr = nil // remove next action blocker on match } tracker.mtx.Unlock() // force next action since trade.tick() will not be called for disconnected dcs. @@ -550,7 +549,7 @@ func TestOrderStatusReconciliation(t *testing.T) { // revocation due to match inaction. var isTaker bool for _, match := range tracker.matches { - match.failErr = fmt.Errorf("ditch match") + match.swapErr = fmt.Errorf("ditch match") isTaker = match.Match.Side == order.Taker break // only interested in first match } @@ -828,9 +827,9 @@ func monitorTrackedTrade(ctx context.Context, client *tClient, tracker *trackedT side, status := match.Match.Side, match.Match.Status if status >= finalStatus { // We've done the needful for this match, - // - prevent further action by blocking the match with a failErr + // - prevent further action by blocking the match with a swapErr // - check if this client will be suspended for inaction - match.failErr = fmt.Errorf("take no further action") + match.swapErr = fmt.Errorf("take no further action") if (side == order.Maker && makerAtFault) || (side == order.Taker && takerAtFault) { client.atFault = true } diff --git a/client/core/types.go b/client/core/types.go index 96f6178599..e4b4b5b9d9 100644 --- a/client/core/types.go +++ b/client/core/types.go @@ -43,7 +43,7 @@ func (set *errorSet) addErr(err error) *errorSet { return set } -// If any returns the error set if there are any errors, else nil. +// ifAny returns the error set if there are any errors, else nil. func (set *errorSet) ifAny() error { if len(set.errs) > 0 { return set