From 75899395cfa0afa3b046514f7e376c493fe0f940 Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Wed, 9 Sep 2020 18:54:39 -0500 Subject: [PATCH 01/12] server/swap: inaction check fixes use Timers and AfterFunc for inaction checks more precise latencyQ expirations dcrdex: 5 min default bcast timeout fixing up broadcast timeout tests --- server/cmd/dcrdex/config.go | 2 +- server/market/orderrouter.go | 4 +- server/swap/swap.go | 398 +++++++++++++++++++++-------------- server/swap/swap_test.go | 130 +++++++++--- 4 files changed, 340 insertions(+), 194 deletions(-) diff --git a/server/cmd/dcrdex/config.go b/server/cmd/dcrdex/config.go index c99c12d774..2463f5d312 100644 --- a/server/cmd/dcrdex/config.go +++ b/server/cmd/dcrdex/config.go @@ -52,7 +52,7 @@ const ( defaultCancelThresh = 0.95 // 19 cancels : 1 success defaultRegFeeConfirms = 4 defaultRegFeeAmount = 1e8 - defaultBroadcastTimeout = time.Minute + defaultBroadcastTimeout = 5 * time.Minute ) var ( diff --git a/server/market/orderrouter.go b/server/market/orderrouter.go index f8e66c8a7b..a0db8e04fc 100644 --- a/server/market/orderrouter.go +++ b/server/market/orderrouter.go @@ -172,7 +172,7 @@ func (r *OrderRouter) handleLimit(user account.AccountID, msg *msgjson.Message) } if _, suspended := r.auth.Suspended(user); suspended { - return msgjson.NewError(msgjson.MarketNotRunningError, "suspended account may not submit trade orders") + return msgjson.NewError(msgjson.MarketNotRunningError, "suspended account %v may not submit trade orders", user) } tunnel, coins, sell, rpcErr := r.extractMarketDetails(&limit.Prefix, &limit.Trade) @@ -290,7 +290,7 @@ func (r *OrderRouter) handleMarket(user account.AccountID, msg *msgjson.Message) } if _, suspended := r.auth.Suspended(user); suspended { - return msgjson.NewError(msgjson.MarketNotRunningError, "suspended account may not submit trade orders") + return msgjson.NewError(msgjson.MarketNotRunningError, "suspended account %v may not submit trade orders", user) } tunnel, assets, sell, rpcErr := r.extractMarketDetails(&market.Prefix, &market.Trade) diff --git a/server/swap/swap.go b/server/swap/swap.go index e685345773..b3ddf25d11 100644 --- a/server/swap/swap.go +++ b/server/swap/swap.go @@ -35,7 +35,7 @@ var ( recheckInterval = time.Second * 5 // txWaitExpiration is the longest the Swapper will wait for a coin waiter. // This could be thought of as the maximum allowable backend latency. - txWaitExpiration = time.Minute + txWaitExpiration = 2 * time.Minute ) func unixMsNow() time.Time { @@ -399,6 +399,10 @@ func NewSwapper(cfg *Config) (*Swapper, error) { liveAckers: make(map[uint64]*msgAckers), } + if sensible := swapper.bTimeout; txWaitExpiration > sensible { + txWaitExpiration = sensible + } + // Load the initial state. var state *State if cfg.StatePath != "" { @@ -471,7 +475,7 @@ func (s *Swapper) restoreState(state *State, allowPartial bool) error { for _, id := range state.Assets { if s.coins[id] == nil { if !allowPartial { - return fmt.Errorf("Unable to find backend for asset %d in restore state.", id) + return fmt.Errorf("unable to find backend for asset %d in restore state", id) } log.Warnf("Unable to find backend for asset %d in restore state.", id) missingAssets[id] = true @@ -896,19 +900,56 @@ func (s *Swapper) Run(ctx context.Context) { wgHelpers.Done() }() - // bcastTriggers is used to sequence an examination of an asset's related - // matches some time (bTimeout) after a block notification is received. - bcastTriggers := make([]*blockNotification, 0, 16) - bcastTicker := time.NewTimer(s.bTimeout) - minTimeout := s.bTimeout / 10 - setTimeout := func(block *blockNotification) { - timeTil := time.Until(block.time.Add(s.bTimeout)) - if timeTil < minTimeout { - timeTil = minTimeout + log.Debugf("Swapper started with %v broadcast timeout.", s.bTimeout) + + // Block-based inaction checks are started with Timers. Track them so that + // they may be stopped on shutdown. + var timerID uint32 + var checkTimerMtx sync.Mutex + checkTimers := make(map[uint32]*time.Timer) + + // On shutdown, stop any unfired check timers. + defer func() { + checkTimerMtx.Lock() + for id, t := range checkTimers { + t.Stop() + delete(checkTimers, id) } - bcastTicker = time.NewTimer(timeTil) + checkTimerMtx.Unlock() + }() + + startCheckTimer := func(assetID uint32) { + id := timerID + timerID++ + checkTimerMtx.Lock() + checkTimers[id] = time.AfterFunc(s.bTimeout, func() { + checkTimerMtx.Lock() + if _, ok := checkTimers[id]; !ok { + checkTimerMtx.Unlock() + return // shutdown removed us first + } + // Unregister the timer and start the check. + delete(checkTimers, id) + wgHelpers.Add(1) // while locked so none can start after the shutdown defer + checkTimerMtx.Unlock() + s.checkInactionBlockBased(assetID) + wgHelpers.Done() + }) + checkTimerMtx.Unlock() + } + + // On startup, schedule an inaction check for each asset. Ideally these + // would start bTimeout after the best block times. + for assetID := range s.coins { + startCheckTimer(assetID) } + // For actions that are expected some time after other events, just have a + // ticker. Each event could make an AfterFunc, but this allows match checks + // to be batched, and it's simpler. + bcastEventTicker := time.NewTicker(s.bTimeout / 4) + defer bcastEventTicker.Stop() + // Main loop can stop on internal error via cancel(), or when the caller // cancels the parent context triggering graceful shutdown. wgMain.Add(1) @@ -930,9 +971,9 @@ func (s *Swapper) Run(ctx context.Context) { } else { log.Errorf("asset %d is reporting a block notification error: %v", block.assetID, block.err) } - break + continue } - bcastTriggers = append(bcastTriggers, block) + // processBlock will update confirmation times in the swapStatus // structs. // @@ -950,31 +991,15 @@ func (s *Swapper) Run(ctx context.Context) { // Presently, one stuck backend that hangs on Confirmations // halts the whole DEX! So timeouts on Confirmations too. s.processBlock(block) - case <-bcastTicker.C: - for { - // checkInaction will fail if the DB is failing. - if err := s.storage.LastErr(); err != nil { - return - } - if len(bcastTriggers) == 0 { - bcastTicker = time.NewTimer(s.bTimeout) - break - } - if time.Now().Before(bcastTriggers[0].time.Add(s.bTimeout)) { - setTimeout(bcastTriggers[0]) - break - } - block := bcastTriggers[0] - bcastTriggers = bcastTriggers[1:] - s.checkInaction(block.assetID) - if len(bcastTriggers) == 0 { - bcastTicker = time.NewTimer(s.bTimeout) - break - } else { - setTimeout(bcastTriggers[0]) - } - } + // Schedule an inaction check for matches that involve this + // asset, as they could be expecting user action within bTimeout + // of this event. + startCheckTimer(block.assetID) + + case <-bcastEventTicker.C: + s.checkInactionEventsBased() + case <-mainLoop: return } @@ -985,7 +1010,7 @@ func (s *Swapper) Run(ctx context.Context) { <-ctxMaster.Done() } -func (s *Swapper) tryConfirmSwap(status *swapStatus) { +func (s *Swapper) tryConfirmSwap(status *swapStatus, confTime time.Time) { status.mtx.Lock() defer status.mtx.Unlock() if status.swapTime.IsZero() || !status.swapConfirmed.IsZero() { @@ -1002,7 +1027,7 @@ func (s *Swapper) tryConfirmSwap(status *swapStatus) { if confs >= int64(swapConf) { log.Debugf("Swap %v (%s) has reached %d confirmations (%d required)", status.swap, dex.BipIDSymbol(status.swapAsset), confs, swapConf) - status.swapConfirmed = time.Now().UTC() + status.swapConfirmed = confTime.UTC() } } @@ -1035,7 +1060,7 @@ func (s *Swapper) processBlock(block *blockNotification) { } // If the maker has broadcast their transaction, the taker's broadcast // timeout starts once the maker's swap has SwapConf confs. - s.tryConfirmSwap(match.makerStatus) + s.tryConfirmSwap(match.makerStatus, block.time) case order.TakerSwapCast: if match.takerStatus.swapAsset != block.assetID { break statusSwitch @@ -1043,7 +1068,7 @@ func (s *Swapper) processBlock(block *blockNotification) { // If the taker has broadcast their transaction, the maker's broadcast // timeout (for redemption) starts once the maker's swap has SwapConf // confs. - s.tryConfirmSwap(match.takerStatus) + s.tryConfirmSwap(match.takerStatus, block.time) case order.MakerRedeemed: // It's the taker's turn to redeem. Nothing to do here. break statusSwitch @@ -1132,10 +1157,64 @@ func (s *Swapper) makerRedeemStatus(mStatus *swapStatus, tAsset uint32) (makerRe return } -// checkInaction scans the swapStatus structures relevant to the specified -// asset. If a client is found to have not acted when required, a match may be -// revoked and a penalty assigned to the user. -func (s *Swapper) checkInaction(assetID uint32) { +func (s *Swapper) failMatch(match *matchTracker, makerFault bool) { + orderAtFault, otherOrder := match.Taker, order.Order(match.Maker) // an order.Order + if makerFault { + orderAtFault, otherOrder = match.Maker, match.Taker + } + log.Debugf("failMatch: swap %v failing (maker fault = %v) at %v", + match.ID(), makerFault, match.Status) + + // Record the end of this match's processing. + s.storage.SetMatchInactive(db.MatchID(match.Match)) + + // If the at-fault order is a limit order, signal that if it is + // still on the book is should be unbooked, changed to revoked + // status, counted against the user's cancellation ratio, and a + // server-generated cancel order recorded. + if lo, isLimit := orderAtFault.(*order.LimitOrder); isLimit { + if s.unbookHook(lo) { + s.orders.canceled(orderAtFault) // set as off-book and failed + } + } + + // That's one less active swap for this order, and a failure. If + // this order has no other active swaps, it will be removed from the + // order swap tracker by decrementActiveSwapCount since it is + // off-book and no new swap negotiations can begin for this order. + s.orders.swapFailure(orderAtFault) + + // The other order now has one less active swap too. + if s.orders.swapSuccess(otherOrder) { + // TODO: We should count this as a successful swap, but should + // it only be a completed order with the extra stipulation that + // it had already completed another swap? + compTime := time.Now().UTC() + s.authMgr.RecordCompletedOrder(otherOrder.User(), otherOrder.ID(), compTime) + if err := s.storage.SetOrderCompleteTime(otherOrder, encode.UnixMilli(compTime)); err != nil { + if db.IsErrGeneralFailure(err) { + log.Errorf("fatal error with SetOrderCompleteTime for order %v: %v", otherOrder.UID(), err) + } else { + log.Warnf("SetOrderCompleteTime for %v: %v", otherOrder.UID(), err) + } + } + } + + // Penalize for failure to act. + // + // TODO: Arguably, this obviates the RecordCancel above since this + // closes the account before the possibility of a cancellation ratio + // penalty. I'm keeping it this way for now however since penalties + // may become less severe than account closure (e.g. temporary + // suspension, cool down, or order throttling), and restored + // accounts will still require a record of the revoked order. + s.authMgr.Penalize(orderAtFault.User(), account.FailureToAct) + + // Send the revoke_match messages, and solicit acks. + s.revoke(match) +} + +func (s *Swapper) checkInactionEventsBased() { // If the DB is failing, do not penalize or attempt to start revocations. if err := s.storage.LastErr(); err != nil { log.Errorf("DB in failing state.") @@ -1143,7 +1222,12 @@ func (s *Swapper) checkInaction(assetID uint32) { } var deletions []*matchTracker - oldestAllowed := time.Now().Add(-s.bTimeout).UTC() + + // Do time.Since(event) with the same now time for each match. + now := time.Now() + tooOld := func(evt time.Time) bool { + return now.Sub(evt) >= s.bTimeout + } checkMatch := func(match *matchTracker) { // Lock entire matchTracker so the following is atomic with respect to @@ -1151,126 +1235,94 @@ func (s *Swapper) checkInaction(assetID uint32) { match.mtx.RLock() defer match.mtx.RUnlock() - if match.makerStatus.swapAsset != assetID && match.takerStatus.swapAsset != assetID { - return - } + log.Tracef("checkInactionEventsBased() => checkMatch(%v, %v)", + match.ID(), match.Status) failMatch := func(makerFault bool) { - orderAtFault, otherOrder := match.Taker, order.Order(match.Maker) // an order.Order - if makerFault { - orderAtFault, otherOrder = match.Maker, match.Taker + s.failMatch(match, makerFault) + deletions = append(deletions, match) + } + + switch match.Status { + case order.NewlyMatched: + // Maker has not broadcast their swap. They have until match time + // plus bTimeout. + if tooOld(match.time) { + failMatch(true) // maker should have swapped } - log.Debugf("checkInaction(failMatch): swap %v failing (maker fault = %v) at %v", - match.ID(), makerFault, match.Status) + case order.MakerRedeemed: + // If the maker has redeemed, the taker can redeem immediately, so + // check the timeout against the time the Swapper received the + // maker's `redeem` request (and sent the taker's 'redemption'). + match.makerStatus.mtx.RLock() + defer match.makerStatus.mtx.RUnlock() + if tooOld(match.makerStatus.redeemTime) { + failMatch(false) // taker should have redeemed + } + } + } - deletions = append(deletions, match) + s.matchMtx.Lock() + defer s.matchMtx.Unlock() - // Record the end of this match's processing. - s.storage.SetMatchInactive(db.MatchID(match.Match)) + for _, match := range s.matches { + checkMatch(match) + } - // If the at-fault order is a limit order, signal that if it is - // still on the book is should be unbooked, changed to revoked - // status, counted against the user's cancellation ratio, and a - // server-generated cancel order recorded. - if lo, isLimit := orderAtFault.(*order.LimitOrder); isLimit { - if s.unbookHook(lo) { - s.orders.canceled(orderAtFault) // set as off-book and failed - } - } + for _, match := range deletions { + delete(s.matches, match.ID()) + } +} - // That's one less active swap for this order, and a failure. If - // this order has no other active swaps, it will be removed from the - // order swap tracker by decrementActiveSwapCount since it is - // off-book and no new swap negotiations can begin for this order. - s.orders.swapFailure(orderAtFault) - - // The other order now has one less active swap too. - if s.orders.swapSuccess(otherOrder) { - // TODO: We should count this as a successful swap, but should - // it only be a completed order with the extra stipulation that - // it had already completed another swap? - compTime := time.Now().UTC() - s.authMgr.RecordCompletedOrder(otherOrder.User(), otherOrder.ID(), compTime) - if err := s.storage.SetOrderCompleteTime(otherOrder, encode.UnixMilli(compTime)); err != nil { - if db.IsErrGeneralFailure(err) { - log.Errorf("fatal error with SetOrderCompleteTime for order %v: %v", otherOrder.UID(), err) - } else { - log.Warnf("SetOrderCompleteTime for %v: %v", otherOrder.UID(), err) - } - } - } +// checkInaction scans the swapStatus structures relevant to the specified +// asset. If a client is found to have not acted when required, a match may be +// revoked and a penalty assigned to the user. +func (s *Swapper) checkInactionBlockBased(assetID uint32) { + // If the DB is failing, do not penalize or attempt to start revocations. + if err := s.storage.LastErr(); err != nil { + log.Errorf("DB in failing state.") + return + } - // Penalize for failure to act. - // - // TODO: Arguably, this obviates the RecordCancel above since this - // closes the account before the possibility of a cancellation ratio - // penalty. I'm keeping it this way for now however since penalties - // may become less severe than account closure (e.g. temporary - // suspension, cool down, or order throttling), and restored - // accounts will still require a record of the revoked order. - s.authMgr.Penalize(orderAtFault.User(), account.FailureToAct) + var deletions []*matchTracker + // Do time.Since(event) with the same now time for each match. + now := time.Now() + tooOld := func(evt time.Time) bool { + return !evt.IsZero() && now.Sub(evt) >= s.bTimeout + } + + checkMatch := func(match *matchTracker) { + // Lock entire matchTracker so the following is atomic with respect to + // Status. + match.mtx.RLock() + defer match.mtx.RUnlock() - // Send the revoke_match messages, and solicit acks. - s.revoke(match) + if match.makerStatus.swapAsset != assetID && match.takerStatus.swapAsset != assetID { + return } - match.makerStatus.mtx.RLock() - defer match.makerStatus.mtx.RUnlock() - match.takerStatus.mtx.RLock() - defer match.takerStatus.mtx.RUnlock() + log.Tracef("checkInactionBlockBased(%d) => checkMatch(%v, %v): assets %d / %d", + assetID, match.ID(), match.Status, + match.makerStatus.swapAsset, match.takerStatus.swapAsset) + + failMatch := func(makerFault bool) { + s.failMatch(match, makerFault) + deletions = append(deletions, match) + } switch match.Status { - case order.NewlyMatched: - if match.makerStatus.swapAsset != assetID { - return - } - // If the maker is not acting, the swapTime won't be set. Check against - // the time the match notification was sent (match.time) for the broadcast - // timeout. - if match.makerStatus.swapTime.IsZero() && match.time.Before(oldestAllowed) { - failMatch(true) - } case order.MakerSwapCast: - if match.takerStatus.swapAsset != assetID { - return - } - // If the maker has sent their swap tx, check the taker's broadcast - // timeout against the time of the swap's SwapConf'th confirmation. - if match.takerStatus.swapTime.IsZero() && - !match.makerStatus.swapConfirmed.IsZero() && - match.makerStatus.swapConfirmed.Before(oldestAllowed) { - failMatch(false) + match.makerStatus.mtx.RLock() + defer match.makerStatus.mtx.RUnlock() + if tooOld(match.makerStatus.swapConfirmed) { + failMatch(false) // taker should have swapped } case order.TakerSwapCast: - if match.takerStatus.swapAsset != assetID { - return - } - // If the taker has sent their swap tx, check the maker's broadcast - // timeout (for redemption) against the time of the swap's SwapConf'th - // confirmation. - if match.makerStatus.redeemTime.IsZero() && - !match.takerStatus.swapConfirmed.IsZero() && - match.takerStatus.swapConfirmed.Before(oldestAllowed) { - failMatch(true) + match.takerStatus.mtx.RLock() + defer match.takerStatus.mtx.RUnlock() + if tooOld(match.takerStatus.swapConfirmed) { + failMatch(true) // maker should have redeemed } - case order.MakerRedeemed: - if match.takerStatus.swapAsset != assetID { - return - } - // If the maker has redeemed, the taker can redeem immediately, so - // check the timeout against the time the Swapper received the - // maker's `redeem` request (and sent the taker's 'redemption'). - if match.takerStatus.redeemTime.IsZero() && - !match.makerStatus.redeemTime.IsZero() && - match.makerStatus.redeemTime.Before(oldestAllowed) { - failMatch(false) - } - case order.MatchComplete: - // Nothing to do here right now. - - // Note: clients still must ack the counterparty's redeem for the - // swap to be flagged as done/inactive in the DB, but the match may - // be deleted from s.matches if the redeem txns fully confirm first. } } @@ -1485,7 +1537,7 @@ type messageAcker struct { isAudit bool } -// processAck processes a msgjson.Acknowledgement to the audit, redeem, and +// processAck processes a msgjson.Acknowledgement to the audit, redemption, and // revoke_match requests, validating the signature and updating the // (order.Match).Sigs record. This is required by processInit, processRedeem, // and revoke. Match Acknowledgements are handled by processMatchAck. @@ -1529,7 +1581,7 @@ func (s *Swapper) processAck(msg *msgjson.Message, acker *messageAcker) { return // drop the revoke ack sig for now } - // This is an ack of either contract audit or redeem receipt. + // This is an ack of either contract audit or redemption receipt. if acker.isAudit { log.Debugf("Received contract audit acknowledgement from user %v (%s) for match %v (%v)", acker.user, makerTaker(acker.isMaker), acker.match.Match.ID(), acker.match.Status) @@ -1544,7 +1596,7 @@ func (s *Swapper) processAck(msg *msgjson.Message, acker *messageAcker) { return } - // It's a redeem ack. + // It's a redemption ack. log.Debugf("Received redemption acknowledgement from user %v (%s) for match %v (%s)", acker.user, makerTaker(acker.isMaker), acker.match.Match.ID(), acker.match.Status) @@ -1622,7 +1674,6 @@ func (s *Swapper) processInit(msg *msgjson.Message, params *msgjson.Init, stepIn "redemption error") return wait.DontTryAgain } - //actor.swapAsset reqFeeRate := stepInfo.match.FeeRateQuote if stepInfo.isBaseAsset { reqFeeRate = stepInfo.match.FeeRateBase @@ -2021,10 +2072,27 @@ func (s *Swapper) handleInit(user account.AccountID, msg *msgjson.Message) *msgj s.setLiveWaiter(user, msg) + // Do not search for the transaction past the inaction deadline. For maker, + // this is bTimeout after match request. For taker, this is bTimeout after + // maker's swap reached swapConfs. + lastEvent := stepInfo.match.time // NewlyMatched - the match request time, not matchTime + if stepInfo.step == order.MakerSwapCast { + lastEvent = stepInfo.match.makerStatus.swapConfirmed + } + expireTime := time.Now().Add(txWaitExpiration).UTC() + if lastEvent.IsZero() { + log.Warnf("Prematurely received 'init' from %v at step %v", makerTaker(stepInfo.actor.isMaker), stepInfo.step) + } else if deadline := lastEvent.Add(s.bTimeout); expireTime.After(deadline) { + expireTime = deadline + } + log.Debugf("Waiting until %v (%v) to locate contract from %v (%v), match %v", + expireTime, time.Until(expireTime), makerTaker(stepInfo.actor.isMaker), + stepInfo.step, matchID) + // Since we have to consider broadcast latency of the asset's network, run // this as a coin waiter. s.latencyQ.Wait(&wait.Waiter{ - Expiration: time.Now().Add(txWaitExpiration), + Expiration: expireTime, TryFunc: func() bool { done := s.processInit(msg, params, stepInfo) if done == wait.DontTryAgain { @@ -2103,9 +2171,26 @@ func (s *Swapper) handleRedeem(user account.AccountID, msg *msgjson.Message) *ms s.setLiveWaiter(user, msg) + // Do not search for the transaction past the inaction deadline. For maker, + // this is bTimeout after taker's swap reached swapConfs. For taker, this is + // bTimeout after maker's redeem cast (and redemption request time). + lastEvent := stepInfo.match.takerStatus.swapConfirmed // TakerSwapCast + if stepInfo.step == order.MakerRedeemed { + lastEvent = stepInfo.match.makerStatus.redeemTime + } + expireTime := time.Now().Add(txWaitExpiration).UTC() + if lastEvent.IsZero() { + log.Warnf("Prematurely received 'redeem' from %v at step %v", makerTaker(stepInfo.actor.isMaker), stepInfo.step) + } else if deadline := lastEvent.Add(s.bTimeout); expireTime.After(deadline) { + expireTime = deadline + } + log.Debugf("Waiting until %v (%v) to locate redeem from %v (%v), match %v", + expireTime, time.Until(expireTime), makerTaker(stepInfo.actor.isMaker), + stepInfo.step, matchID) + // Since we have to consider latency, run this as a coin waiter. s.latencyQ.Wait(&wait.Waiter{ - Expiration: time.Now().Add(txWaitExpiration), + Expiration: expireTime, TryFunc: func() bool { done := s.processRedeem(msg, params, stepInfo) if done == wait.DontTryAgain { @@ -2118,6 +2203,7 @@ func (s *Swapper) handleRedeem(user account.AccountID, msg *msgjson.Message) *ms s.rmLiveWaiter(user, msg.ID) s.respondError(msg.ID, user, msgjson.TransactionUndiscovered, fmt.Sprintf("failed to find redeemed coin %v", coinStr)) + // Client should retry the redeem request, maybe even rebroadcast. }, }) return nil diff --git a/server/swap/swap_test.go b/server/swap/swap_test.go index aca7644875..f727e63779 100644 --- a/server/swap/swap_test.go +++ b/server/swap/swap_test.go @@ -68,7 +68,7 @@ func timeOutMempool() { func timeoutBroadcast() { // swapper.bTimeout is 5*txWaitExpiration for testing - time.Sleep(txWaitExpiration * 6) + time.Sleep(txWaitExpiration * 6) // 200 ms txWaitExpiration => 1 sec bTimeout => 1.2 sec timeout sleep } func dirtyEncode(s string) []byte { @@ -751,8 +751,10 @@ func (rig *testRig) sendSwap(user *tUser, oid order.OrderID, recipient string) ( matchInfo := rig.matchInfo swap := tNewSwap(matchInfo, oid, recipient, user) if isQuoteSwap(user, matchInfo.match) { + fmt.Println("xyz contract") rig.xyzNode.setContract(swap.coin, false) } else { + fmt.Println("abc contract") rig.abcNode.setContract(swap.coin, false) } rpcErr := rig.swapper.handleInit(user.acct, swap.req) @@ -1326,8 +1328,9 @@ func TestMain(m *testing.M) { comms.UseLogger(logger) var shutdown func() testCtx, shutdown = context.WithCancel(context.Background()) - defer shutdown() - os.Exit(m.Run()) + code := m.Run() + shutdown() + os.Exit(code) } func TestFatalStorageErr(t *testing.T) { @@ -1345,6 +1348,11 @@ func testSwap(t *testing.T, rig *testRig) { t.Helper() ensureNilErr := makeEnsureNilErr(t) + sendBlock := func(node *TAsset) { + node.bChan <- &asset.BlockUpdate{Err: nil} + tickMempool() + } + // Step through the negotiation process. No errors should be generated. var takerAcked bool for _, matchInfo := range rig.matches.matchInfos { @@ -1354,12 +1362,23 @@ func testSwap(t *testing.T, rig *testRig) { ensureNilErr(rig.ackMatch_taker(true)) takerAcked = true } + + // Assuming market's base asset is abc, quote is xyz + makerSwapAsset, takerSwapAsset := rig.xyz, rig.abc + if matchInfo.match.Maker.Sell { + makerSwapAsset, takerSwapAsset = rig.abc, rig.xyz + } + ensureNilErr(rig.sendSwap_maker(true)) ensureNilErr(rig.auditSwap_taker()) ensureNilErr(rig.ackAudit_taker(true)) + matchInfo.db.makerSwap.coin.setConfs(int64(makerSwapAsset.SwapConf)) + sendBlock(makerSwapAsset.Backend.(*TAsset)) ensureNilErr(rig.sendSwap_taker(true)) ensureNilErr(rig.auditSwap_maker()) ensureNilErr(rig.ackAudit_maker(true)) + matchInfo.db.takerSwap.coin.setConfs(int64(takerSwapAsset.SwapConf)) + sendBlock(takerSwapAsset.Backend.(*TAsset)) ensureNilErr(rig.redeem_maker(true)) ensureNilErr(rig.ackRedemption_taker(true)) ensureNilErr(rig.redeem_taker(true)) @@ -1425,6 +1444,7 @@ func TestNoAck(t *testing.T) { // Check that the response from the Swapper is an // msgjson.SettlementSequenceError. checkSeqError := func(user *tUser) { + t.Helper() msg, resp := rig.auth.getResp(user.acct) if msg == nil { t.Fatalf("checkSeqError: no message") @@ -1437,6 +1457,11 @@ func TestNoAck(t *testing.T) { } } + sendBlock := func(node *TAsset) { + node.bChan <- &asset.BlockUpdate{Err: nil} + tickMempool() + } + // Don't acknowledge from either side yet. Have the maker broadcast their swap // transaction mustBeError(rig.sendSwap_maker(true), "maker swap send") @@ -1444,6 +1469,8 @@ func TestNoAck(t *testing.T) { ensureNilErr(rig.ackMatch_maker(true)) // Should be good to send the swap now. ensureNilErr(rig.sendSwap_maker(true)) + matchInfo.db.makerSwap.coin.setConfs(int64(rig.abc.SwapConf)) + sendBlock(rig.abc.Backend.(*TAsset)) // For the taker, there must be two acknowledgements before broadcasting the // swap transaction, the match ack and the audit ack. mustBeError(rig.sendSwap_taker(true), "no match-ack taker swap send") @@ -1455,6 +1482,8 @@ func TestNoAck(t *testing.T) { ensureNilErr(rig.auditSwap_taker()) ensureNilErr(rig.ackAudit_taker(true)) ensureNilErr(rig.sendSwap_taker(true)) + matchInfo.db.takerSwap.coin.setConfs(int64(rig.xyz.SwapConf)) + sendBlock(rig.xyz.Backend.(*TAsset)) // The maker should have received an 'audit' request. Don't acknowledge yet. mustBeError(rig.redeem_maker(true), "maker redeem") checkSeqError(maker) @@ -1479,6 +1508,11 @@ func TestTxWaiters(t *testing.T) { ensureNilErr := makeEnsureNilErr(t) dummyError := fmt.Errorf("test error") + sendBlock := func(node *TAsset) { + node.bChan <- &asset.BlockUpdate{Err: nil} + tickMempool() + } + // Get the MatchNotifications that the swapper sent to the clients and check // the match notification length, content, IDs, etc. if err := rig.ackMatch_maker(true); err != nil { @@ -1490,9 +1524,9 @@ func TestTxWaiters(t *testing.T) { // Set a non-latency error. rig.abcNode.setContractErr(dummyError) rig.sendSwap_maker(false) - msg, resp := rig.auth.getResp(matchInfo.maker.acct) + msg, _ := rig.auth.getResp(matchInfo.maker.acct) if msg == nil { - t.Fatalf("no response for erroneous maker swap") + t.Fatalf("no response for erroneous maker swap. err") } // Set an error for the maker's swap asset rig.abcNode.setContractErr(asset.CoinNotFoundError) @@ -1503,7 +1537,7 @@ func TestTxWaiters(t *testing.T) { } timeOutMempool() // Should have an rpc error. - msg, resp = rig.auth.getResp(matchInfo.maker.acct) + msg, resp := rig.auth.getResp(matchInfo.maker.acct) if msg == nil { t.Fatalf("no response for missing tx after timeout") } @@ -1516,6 +1550,8 @@ func TestTxWaiters(t *testing.T) { if err := rig.sendSwap_maker(true); err != nil { t.Fatal(err) } + matchInfo.db.makerSwap.coin.setConfs(int64(rig.abc.SwapConf)) + sendBlock(rig.abc.Backend.(*TAsset)) if err := rig.auditSwap_taker(); err != nil { t.Fatal(err) } @@ -1544,12 +1580,15 @@ func TestTxWaiters(t *testing.T) { tickMempool() msg, resp = rig.auth.getResp(matchInfo.taker.acct) if msg == nil { - t.Fatalf("no response for erroneous taker swap") + t.Fatalf("no response for ok taker swap") } if resp.Error != nil { - t.Fatalf("unexpected rpc error for erroneous taker swap. code: %d, msg: %s", + t.Fatalf("unexpected rpc error for ok taker swap. code: %d, msg: %s", resp.Error.Code, resp.Error.Message) } + matchInfo.db.takerSwap.coin.setConfs(int64(rig.xyz.SwapConf)) + sendBlock(rig.xyz.Backend.(*TAsset)) + ensureNilErr(rig.auditSwap_maker()) ensureNilErr(rig.ackAudit_maker(true)) @@ -1611,6 +1650,7 @@ func TestBroadcastTimeouts(t *testing.T) { tickMempool() } checkRevokeMatch := func(user *tUser, i int) { + t.Helper() req := rig.auth.getReq(user.acct) if req == nil { t.Fatalf("no match_cancellation") @@ -1618,7 +1658,7 @@ func TestBroadcastTimeouts(t *testing.T) { params := new(msgjson.RevokeMatch) err := json.Unmarshal(req.req.Payload, ¶ms) if err != nil { - t.Fatalf("unmarshal error for %s at step %d: %s", user.lbl, i, string(req.req.Payload)) + t.Fatalf("unmarshal error for %s at step %d: %s (%v)", user.lbl, i, string(req.req.Payload), err) } if err = checkSigS256(params, rig.auth.privkey.PubKey()); err != nil { t.Fatalf("incorrect server signature: %v", err) @@ -1636,6 +1676,7 @@ func TestBroadcastTimeouts(t *testing.T) { // check that a penalty was assigned to the appropriate user, and that a // revoke_match message is sent to both users. tryExpire := func(i, j int, step order.MatchStatus, jerk, victim *tUser, node *TAsset) bool { + t.Helper() if i != j { return false } @@ -1656,55 +1697,74 @@ func TestBroadcastTimeouts(t *testing.T) { return true } // Run a timeout test after every important step. - for i := 0; i <= 7; i++ { + for i := 0; i <= 3; i++ { set := tPerfectLimitLimit(uint64(1e8), uint64(1e8), true) // same orders, different users matchInfo := set.matchInfos[0] rig.matchInfo = matchInfo rig.swapper.Negotiate([]*order.MatchSet{set.matchSet}, nil) // Step through the negotiation process. No errors should be generated. + // TODO: timeout each match ack, not block based inaction. + ensureNilErr(rig.ackMatch_maker(true)) ensureNilErr(rig.ackMatch_taker(true)) + + // Timeout waiting for maker swap. if tryExpire(i, 0, order.NewlyMatched, matchInfo.maker, matchInfo.taker, rig.abcNode) { continue } - if tryExpire(i, 1, order.NewlyMatched, matchInfo.maker, matchInfo.taker, rig.abcNode) { - continue - } + ensureNilErr(rig.sendSwap_maker(true)) - matchInfo.db.makerSwap.coin.setConfs(int64(rig.abc.SwapConf)) - // Do the audit here to clear the 'audit' request from the comms queue. + + // Pull the server's 'audit' request from the comms queue. ensureNilErr(rig.auditSwap_taker()) - sendBlock(rig.abcNode) - if tryExpire(i, 2, order.MakerSwapCast, matchInfo.taker, matchInfo.maker, rig.xyzNode) { - continue - } + + // NOTE: timeout on the taker's audit ack response itself does not cause + // a revocation. The taker not broadcasting their swap when maker's swap + // reaches swapconf plus bTimeout is the trigger. ensureNilErr(rig.ackAudit_taker(true)) - if tryExpire(i, 3, order.MakerSwapCast, matchInfo.taker, matchInfo.maker, rig.xyzNode) { + + // Maker's swap reaches swapConf. + matchInfo.db.makerSwap.coin.setConfs(int64(rig.abc.SwapConf)) + sendBlock(rig.abcNode) // tryConfirmSwap + // With maker swap confirmed, inaction happens bTimeout after + // swapConfirmed time. + if tryExpire(i, 1, order.MakerSwapCast, matchInfo.taker, matchInfo.maker, rig.xyzNode) { continue } + ensureNilErr(rig.sendSwap_taker(true)) - matchInfo.db.takerSwap.coin.setConfs(int64(rig.xyz.SwapConf)) - // Do the audit here to clear the 'audit' request from the comms queue. + + // Pull the server's 'audit' request from the comms queue ensureNilErr(rig.auditSwap_maker()) - sendBlock(rig.xyzNode) - if tryExpire(i, 4, order.TakerSwapCast, matchInfo.maker, matchInfo.taker, rig.xyzNode) { - continue - } + + // NOTE: timeout on the maker's audit ack response itself does not cause + // a revocation. The maker not broadcasting their redeem when taker's + // swap reaches swapconf plus bTimeout is the trigger. ensureNilErr(rig.ackAudit_maker(true)) - if tryExpire(i, 5, order.TakerSwapCast, matchInfo.maker, matchInfo.taker, rig.xyzNode) { + + // Taker's swap reaches swapConf. + matchInfo.db.takerSwap.coin.setConfs(int64(rig.xyz.SwapConf)) + sendBlock(rig.xyzNode) + // With taker swap confirmed, inaction happens bTimeout after + // swapConfirmed time. + if tryExpire(i, 2, order.TakerSwapCast, matchInfo.maker, matchInfo.taker, rig.xyzNode) { continue } + ensureNilErr(rig.redeem_maker(true)) - matchInfo.db.makerRedeem.coin.setConfs(int64(rig.xyz.SwapConf)) - // Ack the redemption here to clear the 'audit' request from the comms queue. + + // Pull the server's 'redemption' request from the comms queue ensureNilErr(rig.ackRedemption_taker(true)) - sendBlock(rig.xyzNode) - if tryExpire(i, 6, order.MakerRedeemed, matchInfo.taker, matchInfo.maker, rig.abcNode) { - continue - } - if tryExpire(i, 7, order.MakerRedeemed, matchInfo.taker, matchInfo.maker, rig.abcNode) { + + // Maker's redeem reaches swapConf. Not necessary for taker redeem. + // matchInfo.db.makerRedeem.coin.setConfs(int64(rig.xyz.SwapConf)) + // sendBlock(rig.xyzNode) + if tryExpire(i, 3, order.MakerRedeemed, matchInfo.taker, matchInfo.maker, rig.abcNode) { continue } + + // Next is redeem_taker... not a block-based inaction. + return } } @@ -1893,7 +1953,7 @@ func TestState(t *testing.T) { return nil, fmt.Errorf("error finding swap state files: %v", err) } if stateFile == nil { - return nil, fmt.Errorf("no swap state file found.") + return nil, fmt.Errorf("no swap state file found") } state, err := LoadStateFile(stateFile.Name) if err != nil { From 0aebdccf397fa324cb3445d85eb00b93d0578970 Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Mon, 14 Sep 2020 20:50:09 -0500 Subject: [PATCH 02/12] harness tweaks and optimizations --- client/core/trade_simnet_test.go | 35 +++++++++++++++++++------------- dex/testing/dcr/harness.sh | 20 +++++++++++------- dex/testing/dcrdex/harness.sh | 1 + 3 files changed, 35 insertions(+), 21 deletions(-) diff --git a/client/core/trade_simnet_test.go b/client/core/trade_simnet_test.go index 72d5c239f0..1a2dc86211 100644 --- a/client/core/trade_simnet_test.go +++ b/client/core/trade_simnet_test.go @@ -247,7 +247,7 @@ func testNoMakerRedeem(t *testing.T) { } // testMakerGhostingAfterTakerRedeem places simple orders for clients 1 and 2, -// neogiates the resulting trades smoothly till TakerSwapCast, then Maker goes +// negotiates the resulting trades smoothly till TakerSwapCast, then Maker goes // AWOL after redeeming taker's swap without notifying Taker. This test ensures // that Taker auto-finds Maker's redeem, extracts the secret key and redeems // Maker's swap to complete the trade. @@ -543,7 +543,7 @@ func monitorTrackedTrade(ctx context.Context, client *tClient, tracker *trackedT } if assetToMine != nil { - assetID, nBlocks := assetToMine.ID, uint16(assetToMine.SwapConf)+1 // mine 1 extra to ensure tx gets req. confs + assetID, nBlocks := assetToMine.ID, uint16(assetToMine.SwapConf) err := mineBlocks(assetID, nBlocks) if err == nil { var actor order.MatchSide @@ -693,10 +693,10 @@ func checkAndWaitForRefunds(ctx context.Context, client *tClient, orderID string // confirm that balance changes are as expected. for assetID, expectedBalanceDiff := range refundAmts { if expectedBalanceDiff > 0 { - mineBlocks(assetID, 2) + mineBlocks(assetID, 1) } } - time.Sleep(5 * time.Second) + time.Sleep(2 * time.Second) client.expectBalanceDiffs = refundAmts err = client.assertBalanceChanges() @@ -826,7 +826,7 @@ func (client *tClient) connectDEX(ctx context.Context) error { // mine drc block(s) to mark fee as paid // sometimes need to mine an extra block for fee tx to get req. confs - err = mineBlocks(dcr.BipID, regRes.ReqConfirms+1) + err = mineBlocks(dcr.BipID, regRes.ReqConfirms) if err != nil { return err } @@ -995,27 +995,25 @@ func (client *tClient) lockWallets() error { client.log("locking wallets") dcrw := client.dcrw() lockCmd := fmt.Sprintf("./%s walletlock", dcrw.daemon) - if err := tmuxSendKeys("dcr-harness:0", lockCmd); err != nil { + if err := tmuxRun("dcr-harness:0", lockCmd); err != nil { return err } - time.Sleep(500 * time.Millisecond) btcw := client.btcw() lockCmd = fmt.Sprintf("./%s -rpcwallet=%s walletlock", btcw.daemon, btcw.walletName) - return tmuxSendKeys("btc-harness:2", lockCmd) + return tmuxRun("btc-harness:2", lockCmd) } func (client *tClient) unlockWallets() error { client.log("unlocking wallets") dcrw := client.dcrw() unlockCmd := fmt.Sprintf("./%s walletpassphrase %q 600", dcrw.daemon, string(dcrw.pass)) - if err := tmuxSendKeys("dcr-harness:0", unlockCmd); err != nil { + if err := tmuxRun("dcr-harness:0", unlockCmd); err != nil { return err } - time.Sleep(500 * time.Millisecond) btcw := client.btcw() unlockCmd = fmt.Sprintf("./%s -rpcwallet=%s walletpassphrase %q 600", btcw.daemon, btcw.walletName, string(btcw.pass)) - return tmuxSendKeys("btc-harness:2", unlockCmd) + return tmuxRun("btc-harness:2", unlockCmd) } func mineBlocks(assetID uint32, blocks uint16) error { @@ -1028,11 +1026,20 @@ func mineBlocks(assetID uint32, blocks uint16) error { default: return fmt.Errorf("can't mine blocks for unknown asset %d", assetID) } - return tmuxSendKeys(harnessID, fmt.Sprintf("./mine-alpha %d", blocks)) + return tmuxRun(harnessID, fmt.Sprintf("./mine-alpha %d", blocks)) } -func tmuxSendKeys(tmuxWindow, cmd string) error { - return exec.Command("tmux", "send-keys", "-t", tmuxWindow, cmd, "C-m").Run() +func tmuxRun(tmuxWindow, cmd string) error { + tStart := time.Now() + defer func() { + fmt.Printf("********** TIMING: Took %v to run %q", time.Since(tStart), cmd) + }() + cmd += "; tmux wait-for -S harnessdone" + err := exec.Command("tmux", "send-keys", "-t", tmuxWindow, cmd, "C-m").Run() // ; wait-for harnessdone + if err != nil { + return nil + } + return exec.Command("tmux", "wait-for", "harnessdone").Run() } func fmtAmt(anyAmt interface{}) float64 { diff --git a/dex/testing/dcr/harness.sh b/dex/testing/dcr/harness.sh index 0684d8a268..5edd727bb1 100755 --- a/dex/testing/dcr/harness.sh +++ b/dex/testing/dcr/harness.sh @@ -62,9 +62,11 @@ cat > "${NODES_ROOT}/harness-ctl/mine-alpha" < Date: Mon, 14 Sep 2020 20:50:31 -0500 Subject: [PATCH 03/12] simplify inaction tickers/timers/triggers --- server/swap/swap.go | 92 +++++++++++++++++++++++---------------------- 1 file changed, 47 insertions(+), 45 deletions(-) diff --git a/server/swap/swap.go b/server/swap/swap.go index b3ddf25d11..3905ebfab9 100644 --- a/server/swap/swap.go +++ b/server/swap/swap.go @@ -399,6 +399,7 @@ func NewSwapper(cfg *Config) (*Swapper, error) { liveAckers: make(map[uint64]*msgAckers), } + // Ensure txWaitExpiration is not greater than broadcast timeout setting. if sensible := swapper.bTimeout; txWaitExpiration > sensible { txWaitExpiration = sensible } @@ -824,7 +825,8 @@ func (s *Swapper) rmLiveWaiter(user account.AccountID, msgID uint64) { s.liveWaitersMtx.Unlock() } -// Run is the main Swapper loop. +// Run is the main Swapper loop. It's primary purpose is to update transaction +// confirmations when new blocks are mined, and to trigger inaction checks. func (s *Swapper) Run(ctx context.Context) { // Permit internal cancel on anomaly such as storage failure. ctxMaster, cancel := context.WithCancel(ctx) @@ -861,7 +863,7 @@ func (s *Swapper) Run(ctx context.Context) { // Start a listen loop for each asset's block channel. Normal shutdown stops // this before the main loop since this sends to the main loop. - blockNotes := make(chan *blockNotification, 128) + blockNotes := make(chan *blockNotification, 32*len(s.coins)) for assetID, lockable := range s.coins { wgHelpers.Add(1) go func(assetID uint32, blockSource <-chan *asset.BlockUpdate) { @@ -902,53 +904,28 @@ func (s *Swapper) Run(ctx context.Context) { log.Debugf("Swapper started with %v broadcast timeout.", s.bTimeout) - // Block-based inaction checks are started with Timers. Track them so that - // they may be stopped on shutdown. - var timerID uint32 - var checkTimerMtx sync.Mutex - checkTimers := make(map[uint32]*time.Timer) - - // On shutdown, stop any unfired check timers. - defer func() { - checkTimerMtx.Lock() - for id, t := range checkTimers { - t.Stop() - delete(checkTimers, id) - } - checkTimerMtx.Unlock() - }() - - startCheckTimer := func(assetID uint32) { - id := timerID - timerID++ - checkTimerMtx.Lock() - checkTimers[id] = time.AfterFunc(s.bTimeout, func() { - checkTimerMtx.Lock() - if _, ok := checkTimers[id]; !ok { - checkTimerMtx.Unlock() - return // shutdown removed us first + // Block-based inaction checks are started with Timers, and run in the main + // loop to avoid locks and WaitGroups. + bcastBlockTrigger := make(chan uint32, 32*len(s.coins)) + scheduleInactionCheck := func(assetID uint32) { + time.AfterFunc(s.bTimeout, func() { + select { + case bcastBlockTrigger <- assetID: // all checks run in main loop + case <-ctxMaster.Done(): } - // Unregister the timer and start the check. - delete(checkTimers, id) - wgHelpers.Add(1) // while locked so none can start after the shutdown defer - checkTimerMtx.Unlock() - s.checkInactionBlockBased(assetID) - wgHelpers.Done() }) - checkTimerMtx.Unlock() } // On startup, schedule an inaction check for each asset. Ideally these // would start bTimeout after the best block times. for assetID := range s.coins { - startCheckTimer(assetID) + scheduleInactionCheck(assetID) } - // For actions that are expected some time after other events, just have a - // ticker. Each event could make an AfterFunc, but this allows match checks - // to be batched, and it's simpler. - bcastEventTicker := time.NewTicker(s.bTimeout / 4) - defer bcastEventTicker.Stop() + // Event-based action checks are started with a single ticker. Each of the + // events, e.g. match request, could start a timer, but this is simpler and + // allows batching the match checks. + bcastEventTrigger := bufferedTicker(ctxMaster, s.bTimeout/4) // Main loop can stop on internal error via cancel(), or when the caller // cancels the parent context triggering graceful shutdown. @@ -995,10 +972,15 @@ func (s *Swapper) Run(ctx context.Context) { // Schedule an inaction check for matches that involve this // asset, as they could be expecting user action within bTimeout // of this event. - startCheckTimer(block.assetID) + scheduleInactionCheck(block.assetID) - case <-bcastEventTicker.C: - s.checkInactionEventsBased() + case assetID := <-bcastBlockTrigger: + // There was a new block for this asset bTimeout ago. + s.checkInactionBlockBased(assetID) + + case <-bcastEventTrigger: + // Inaction checks that are not relative to blocks. + s.checkInactionEventBased() case <-mainLoop: return @@ -1010,6 +992,26 @@ func (s *Swapper) Run(ctx context.Context) { <-ctxMaster.Done() } +// bufferedTicker creates a "ticker" that periodically sends on the returned +// channel, which has a buffer of length 1 and thus suitable for use in a select +// with other events that might cause a regular Ticker send to be dropped. +func bufferedTicker(ctx context.Context, dur time.Duration) chan struct{} { + buffered := make(chan struct{}, 1) // only need 1 since back-to-back is pointless + go func() { + ticker := time.NewTicker(dur) + defer ticker.Stop() + for { + select { + case <-ticker.C: + buffered <- struct{}{} + case <-ctx.Done(): + return + } + } + }() + return buffered +} + func (s *Swapper) tryConfirmSwap(status *swapStatus, confTime time.Time) { status.mtx.Lock() defer status.mtx.Unlock() @@ -1214,7 +1216,7 @@ func (s *Swapper) failMatch(match *matchTracker, makerFault bool) { s.revoke(match) } -func (s *Swapper) checkInactionEventsBased() { +func (s *Swapper) checkInactionEventBased() { // If the DB is failing, do not penalize or attempt to start revocations. if err := s.storage.LastErr(); err != nil { log.Errorf("DB in failing state.") @@ -1235,7 +1237,7 @@ func (s *Swapper) checkInactionEventsBased() { match.mtx.RLock() defer match.mtx.RUnlock() - log.Tracef("checkInactionEventsBased() => checkMatch(%v, %v)", + log.Tracef("checkInactionEventBased() => checkMatch(%v, %v)", match.ID(), match.Status) failMatch := func(makerFault bool) { From 831ec23ff3e910bfa1a543b662979b1a6af2fbfa Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Wed, 16 Sep 2020 16:59:03 -0500 Subject: [PATCH 04/12] TESTING logs and timeout tweaks --- server/db/driver/pg/orders.go | 2 +- server/matcher/match.go | 2 +- server/swap/swap.go | 60 +++++++++++++++-------------------- 3 files changed, 28 insertions(+), 36 deletions(-) diff --git a/server/db/driver/pg/orders.go b/server/db/driver/pg/orders.go index 3294b49d70..b28510edd9 100644 --- a/server/db/driver/pg/orders.go +++ b/server/db/driver/pg/orders.go @@ -796,7 +796,7 @@ func (a *Archiver) updateOrderStatusByID(oid order.OrderID, base, quote uint32, } if initStatus == status && filled == initFilled { - log.Debugf("Not updating order with no status or filled amount change.") + log.Tracef("Not updating order with no status or filled amount change: %v.", oid) return nil } if filled == -1 { diff --git a/server/matcher/match.go b/server/matcher/match.go index 158250a2cc..9be0ea1607 100644 --- a/server/matcher/match.go +++ b/server/matcher/match.go @@ -184,7 +184,7 @@ func (m *Matcher) Match(book Booker, queue []*OrderRevealed) (seed []byte, match removed, ok := book.Remove(o.TargetOrderID) if !ok { // The targeted order might be down queue or non-existent. - log.Debugf("Failed to remove order %v set by a cancel order %v", + log.Debugf("Order %v not removed by a cancel order %v (target either non-existent or down queue in this epoch)", o.ID(), o.TargetOrderID) failed = append(failed, q) updates.CancelsFailed = append(updates.CancelsFailed, o) diff --git a/server/swap/swap.go b/server/swap/swap.go index 3905ebfab9..e0d9e6337e 100644 --- a/server/swap/swap.go +++ b/server/swap/swap.go @@ -32,7 +32,7 @@ import ( var ( // The coin waiter will query for transaction data every recheckInterval. - recheckInterval = time.Second * 5 + recheckInterval = time.Second * 3 // txWaitExpiration is the longest the Swapper will wait for a coin waiter. // This could be thought of as the maximum allowable backend latency. txWaitExpiration = 2 * time.Minute @@ -264,7 +264,6 @@ func (s *orderSwapTracker) canceled(ord order.Order) { stat := s.orderMatches[oid] if stat == nil { // No active swaps for canceled order, OK. - log.Debugf("orderOffBook: untracked order %v", oid) return } @@ -1547,6 +1546,9 @@ func (s *Swapper) processAck(msg *msgjson.Message, acker *messageAcker) { // Remove the live acker from Swapper's tracking. defer s.rmLiveAckers(msg.ID) + acker.match.mtx.Lock() + defer acker.match.mtx.Unlock() + // The time that the ack is received is stored for redeem acks to facilitate // cancellation rate enforcement. tAck := time.Now() @@ -1573,9 +1575,6 @@ func (s *Swapper) processAck(msg *msgjson.Message, acker *messageAcker) { // actor. mktMatch := db.MatchID(acker.match.Match) - acker.match.mtx.Lock() - defer acker.match.mtx.Unlock() - if rev, ok := acker.params.(*msgjson.RevokeMatch); ok { log.Infof("Received revoke_match ack for match %v, order %v, user %v", rev.MatchID, rev.OrderID, acker.user) @@ -1585,7 +1584,7 @@ func (s *Swapper) processAck(msg *msgjson.Message, acker *messageAcker) { // This is an ack of either contract audit or redemption receipt. if acker.isAudit { - log.Debugf("Received contract audit acknowledgement from user %v (%s) for match %v (%v)", + log.Debugf("Received contract 'audit' acknowledgement from user %v (%s) for match %v (%v)", acker.user, makerTaker(acker.isMaker), acker.match.Match.ID(), acker.match.Status) // It's a contract audit ack. if acker.isMaker { @@ -1599,9 +1598,11 @@ func (s *Swapper) processAck(msg *msgjson.Message, acker *messageAcker) { } // It's a redemption ack. - log.Debugf("Received redemption acknowledgement from user %v (%s) for match %v (%s)", + log.Debugf("Received 'redemption' acknowledgement from user %v (%s) for match %v (%s)", acker.user, makerTaker(acker.isMaker), acker.match.Match.ID(), acker.match.Status) + // TODO: Don't record it if the match is revoked. + // This is a redemption acknowledgement. Store the ack signature, and // potentially record the order as complete with the auth manager and in // persistent storage. @@ -1739,7 +1740,7 @@ func (s *Swapper) processInit(msg *msgjson.Message, params *msgjson.Init, stepIn s.matchMtx.RLock() if _, found := s.matches[matchID]; !found { s.matchMtx.RUnlock() - log.Errorf("contract txn found after match was revoked (match id=%v, maker=%v)", + log.Errorf("Contract txn located after match was revoked (match id=%v, maker=%v)", matchID, actor.isMaker) s.respondError(msg.ID, actor.user, msgjson.ContractError, "match already revoked due to inaction") return wait.DontTryAgain @@ -1798,7 +1799,7 @@ func (s *Swapper) processInit(msg *msgjson.Message, params *msgjson.Init, stepIn isAudit: true, } // Send the 'audit' request to the counter-party. - log.Debugf("processInit: sending contract 'audit' ack request to counterparty %v (%s) "+ + log.Debugf("processInit: sending contract 'audit' request to counterparty %v (%s) "+ "for match %v", ack.user, makerTaker(ack.isMaker), matchID) // Register that there is an outstanding ack request. This is unregistered @@ -1808,8 +1809,8 @@ func (s *Swapper) processInit(msg *msgjson.Message, params *msgjson.Init, stepIn // Expire function to unregister the outstanding request. expireFunc := func() { s.rmLiveAckers(notification.ID) - log.Infof("Timeout waiting for contract audit request from user %v (%s).", - ack.user, makerTaker(ack.isMaker)) + log.Infof("Timeout waiting for contract 'audit' request from user %v (%s) for match %v", + ack.user, makerTaker(ack.isMaker), matchID) } // Send the ack request. @@ -1841,6 +1842,7 @@ func (s *Swapper) processRedeem(msg *msgjson.Message, params *msgjson.Redeem, st counterParty.status.mtx.RLock() cpContract := counterParty.status.swap.RedeemScript() cpSwapCoin := counterParty.status.swap.ID() + cpSwapStr := counterParty.status.swap.String() counterParty.status.mtx.RUnlock() // Get the transaction. @@ -1894,8 +1896,8 @@ func (s *Swapper) processRedeem(msg *msgjson.Message, params *msgjson.Redeem, st // ensuring that checkInaction will not revoke the match as we respond. s.matchMtx.RUnlock() - log.Debugf("processRedeem: valid redemption %v (%s) received at %v from %v (%s) for match %v, "+ - "swapStatus %v => %v", redemption, stepInfo.asset.Symbol, redeemTime, actor.user, + log.Debugf("processRedeem: valid redemption %v (%s) spending contract %s received at %v from %v (%s) for match %v, "+ + "swapStatus %v => %v", redemption, stepInfo.asset.Symbol, cpSwapStr, redeemTime, actor.user, makerTaker(actor.isMaker), matchID, stepInfo.step, stepInfo.nextStep) // Store the swap contract and the coinID (e.g. txid:vout) containing the @@ -1952,7 +1954,7 @@ func (s *Swapper) processRedeem(msg *msgjson.Message, params *msgjson.Redeem, st isMaker: counterParty.isMaker, // isAudit: false, } - log.Debugf("processRedeem: sending 'redeem' ack request to counterparty %v (%s)"+ + log.Debugf("processRedeem: sending 'redemption' request to counterparty %v (%s) "+ "for match %v", ack.user, makerTaker(ack.isMaker), matchID) // Register that there is an outstanding ack request. This is unregistered @@ -1962,17 +1964,19 @@ func (s *Swapper) processRedeem(msg *msgjson.Message, params *msgjson.Redeem, st // Expire function to unregister the outstanding request. expireFunc := func() { s.rmLiveAckers(notification.ID) - log.Infof("Timeout waiting for redeem ack request from user %v (%s).", - ack.user, makerTaker(ack.isMaker)) + log.Infof("Timeout waiting for 'redemption' request from user %v (%s) for match %v", + ack.user, makerTaker(ack.isMaker), matchID) } // Send the ack request. - // The counterparty does not need to actually locate the redemption on txn, - // so use the default request timeout. + // The counterparty does not need to actually locate the redemption txn, but + // they do need to author and broadcast their own redeem, so allow the + // request to wait for a response up until the inaction penalty. s.authMgr.RequestWhenConnected(ack.user, notification, func(_ comms.Link, resp *msgjson.Message) { s.processAck(resp, ack) // resp.ID == notification.ID - }, auth.DefaultRequestTimeout, auth.DefaultConnectTimeout, expireFunc) + }, s.bTimeout, auth.DefaultConnectTimeout, expireFunc) + // maybe: s.bTimeout-time.Since(redeemTime) or time.Until(redeemTime.Add(s.bTimeout)) return wait.DontTryAgain } @@ -2027,19 +2031,7 @@ func (s *Swapper) handleInit(user account.AccountID, msg *msgjson.Message) *msgj // init requests should only be sent when contracts are still required, in // the correct sequence, and by the correct party. switch stepInfo.match.Status { - // These cases can eventually reduced to: - // case order.NewlyMatched, order.MakerSwapCast: // just continue - // default: // respond with settlement sequence error - case order.NewlyMatched: - if !stepInfo.actor.isMaker { - // s.step should have returned an error of code SettlementSequenceError. - panic("handleInit: this stepInformation should be for the maker!") - } - case order.MakerSwapCast: - if stepInfo.actor.isMaker { - // s.step should have returned an error of code SettlementSequenceError. - panic("handleInit: this stepInformation should be for the taker!") - } + case order.NewlyMatched, order.MakerSwapCast: default: return &msgjson.Error{ Code: msgjson.SettlementSequenceError, @@ -2087,7 +2079,7 @@ func (s *Swapper) handleInit(user account.AccountID, msg *msgjson.Message) *msgj } else if deadline := lastEvent.Add(s.bTimeout); expireTime.After(deadline) { expireTime = deadline } - log.Debugf("Waiting until %v (%v) to locate contract from %v (%v), match %v", + log.Debugf("Allowing until %v (%v) to locate contract from %v (%v), match %v", expireTime, time.Until(expireTime), makerTaker(stepInfo.actor.isMaker), stepInfo.step, matchID) @@ -2186,7 +2178,7 @@ func (s *Swapper) handleRedeem(user account.AccountID, msg *msgjson.Message) *ms } else if deadline := lastEvent.Add(s.bTimeout); expireTime.After(deadline) { expireTime = deadline } - log.Debugf("Waiting until %v (%v) to locate redeem from %v (%v), match %v", + log.Debugf("Allowing until %v (%v) to locate redeem from %v (%v), match %v", expireTime, time.Until(expireTime), makerTaker(stepInfo.actor.isMaker), stepInfo.step, matchID) From a2897cea91b91f6d5aed5fb68607a5073e9f75dc Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Wed, 16 Sep 2020 17:36:35 -0500 Subject: [PATCH 05/12] that was a recipie for deadlock --- server/swap/swap.go | 63 +++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/server/swap/swap.go b/server/swap/swap.go index e0d9e6337e..4eb2cfa6f7 100644 --- a/server/swap/swap.go +++ b/server/swap/swap.go @@ -1418,6 +1418,9 @@ func (s *Swapper) step(user account.AccountID, matchID order.MatchID) (*stepInfo log.Debugf("swap %v at status %v missing MakerMatch signature(s) needed for NewlyMatched->MakerSwapCast", match.ID(), match.Status) } + // NOTE: Consider requiring TakerMatch too to prevent needless maker + // swap if taker bailed out, which should be OK if client retries + // the 'init' request. reqSigs = [][]byte{match.Sigs.MakerMatch} } else /* TakerSwapCast */ { nextStep = order.MakerRedeemed @@ -2287,22 +2290,6 @@ func (s *Swapper) processMatchAcks(user account.AccountID, msg *msgjson.Message, // Remove the live acker from Swapper's tracking. defer s.rmLiveAckers(msg.ID) - // Lock each matchTracker as early as possible to try to avoid 'init' - // requests for these matches failing because the ack signatures have not - // yet been stored. A better solution would be a response to the client so - // they know when it's OK to 'init', but we presently lack a response to a - // response mechanism. - uniqueTrackers := make(map[*matchTracker]bool, len(matches)/2) - for _, matchInfo := range matches { - mt := matchInfo.match - if uniqueTrackers[mt] { - continue - } - uniqueTrackers[mt] = true - mt.mtx.Lock() - defer mt.mtx.Unlock() - } - // NOTE: acks must be in same order as matches []*messageAcker. var acks []msgjson.Acknowledgement err := msg.UnmarshalResult(&acks) @@ -2325,8 +2312,10 @@ func (s *Swapper) processMatchAcks(user account.AccountID, msg *msgjson.Message, // either a MakerMatch or TakerMatch signature depending on whether the // responding user is the maker or taker. for i, matchInfo := range matches { - ack := acks[i] - matchID := matchInfo.match.ID() + ack := &acks[i] + match := matchInfo.match + + matchID := match.ID() if !bytes.Equal(ack.MatchID, matchID[:]) { s.respondError(msg.ID, user, msgjson.IDMismatchError, fmt.Sprintf("unexpected match ID at acknowledgment index %d", i)) @@ -2336,23 +2325,41 @@ func (s *Swapper) processMatchAcks(user account.AccountID, msg *msgjson.Message, err = s.authMgr.Auth(user, sigMsg, ack.Sig) if err != nil { log.Warnf("processMatchAcks: 'match' ack for match %v from user %v, "+ - " failed sig verification: %v", matchInfo.match.ID(), user, err) + " failed sig verification: %v", match.ID(), user, err) s.respondError(msg.ID, user, msgjson.SignatureError, fmt.Sprintf("signature validation error: %v", err)) return } - // Store the signature in the DB. + // Store the signature in the matchTracker. These must be collected + // before the init steps begin and swap contracts are broadcasted. + match.mtx.Lock() + log.Debugf("processMatchAcks: storing valid 'match' ack signature from %v (maker=%v) "+ + "for match %v (status %v)", user, matchInfo.isMaker, matchID, match.Status) + if matchInfo.isMaker { + match.Sigs.MakerMatch = ack.Sig + } else { + match.Sigs.TakerMatch = ack.Sig + } + match.mtx.Unlock() + } + + // Store the signatures in the DB. + for i, matchInfo := range matches { + ackSig := acks[i].Sig + match := matchInfo.match + storFn := s.storage.SaveMatchAckSigB if matchInfo.isMaker { storFn = s.storage.SaveMatchAckSigA } + matchID := match.ID() mid := db.MarketMatchID{ MatchID: matchID, - Base: matchInfo.match.Maker.BaseAsset, // same for taker's redeem as BaseAsset refers to the market - Quote: matchInfo.match.Maker.QuoteAsset, + Base: match.Maker.BaseAsset, // same for taker's redeem as BaseAsset refers to the market + Quote: match.Maker.QuoteAsset, } - err = storFn(mid, ack.Sig) + err = storFn(mid, ackSig) if err != nil { log.Errorf("saving match ack signature (match id=%v, maker=%v) failed: %v", matchID, matchInfo.isMaker, err) @@ -2361,16 +2368,6 @@ func (s *Swapper) processMatchAcks(user account.AccountID, msg *msgjson.Message, // TODO: revoke the match without penalties? return } - - // Store the signature in the matchTracker. These must be collected - // before the init steps begin and swap contracts are broadcasted. - log.Debugf("processMatchAcks: storing valid 'match' ack signature from %v (maker=%v) "+ - "for match %v (status %v)", user, matchInfo.isMaker, matchID, matchInfo.match.Status) - if matchInfo.isMaker { - matchInfo.match.Sigs.MakerMatch = ack.Sig - } else { - matchInfo.match.Sigs.TakerMatch = ack.Sig - } } } From a7611e26282d5ff8ee25c3277554d47054554483 Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Thu, 17 Sep 2020 15:52:03 -0500 Subject: [PATCH 06/12] fix data race on swapStatus time accesses --- server/swap/swap.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/server/swap/swap.go b/server/swap/swap.go index 4eb2cfa6f7..54b42b841d 100644 --- a/server/swap/swap.go +++ b/server/swap/swap.go @@ -101,6 +101,18 @@ type swapStatus struct { redemption asset.Coin } +func (ss *swapStatus) swapConfTime() time.Time { + ss.mtx.RLock() + defer ss.mtx.RUnlock() + return ss.swapConfirmed +} + +func (ss *swapStatus) redeemSeenTime() time.Time { + ss.mtx.RLock() + defer ss.mtx.RUnlock() + return ss.redeemTime +} + // matchTracker embeds an order.Match and adds some data necessary for tracking // the match negotiation. type matchTracker struct { @@ -1255,9 +1267,7 @@ func (s *Swapper) checkInactionEventBased() { // If the maker has redeemed, the taker can redeem immediately, so // check the timeout against the time the Swapper received the // maker's `redeem` request (and sent the taker's 'redemption'). - match.makerStatus.mtx.RLock() - defer match.makerStatus.mtx.RUnlock() - if tooOld(match.makerStatus.redeemTime) { + if tooOld(match.makerStatus.redeemSeenTime()) { failMatch(false) // taker should have redeemed } } @@ -1313,15 +1323,11 @@ func (s *Swapper) checkInactionBlockBased(assetID uint32) { switch match.Status { case order.MakerSwapCast: - match.makerStatus.mtx.RLock() - defer match.makerStatus.mtx.RUnlock() - if tooOld(match.makerStatus.swapConfirmed) { + if tooOld(match.makerStatus.swapConfTime()) { failMatch(false) // taker should have swapped } case order.TakerSwapCast: - match.takerStatus.mtx.RLock() - defer match.takerStatus.mtx.RUnlock() - if tooOld(match.takerStatus.swapConfirmed) { + if tooOld(match.takerStatus.swapConfTime()) { failMatch(true) // maker should have redeemed } } @@ -2074,7 +2080,7 @@ func (s *Swapper) handleInit(user account.AccountID, msg *msgjson.Message) *msgj // maker's swap reached swapConfs. lastEvent := stepInfo.match.time // NewlyMatched - the match request time, not matchTime if stepInfo.step == order.MakerSwapCast { - lastEvent = stepInfo.match.makerStatus.swapConfirmed + lastEvent = stepInfo.match.makerStatus.swapConfTime() } expireTime := time.Now().Add(txWaitExpiration).UTC() if lastEvent.IsZero() { @@ -2171,9 +2177,9 @@ func (s *Swapper) handleRedeem(user account.AccountID, msg *msgjson.Message) *ms // Do not search for the transaction past the inaction deadline. For maker, // this is bTimeout after taker's swap reached swapConfs. For taker, this is // bTimeout after maker's redeem cast (and redemption request time). - lastEvent := stepInfo.match.takerStatus.swapConfirmed // TakerSwapCast + lastEvent := stepInfo.match.takerStatus.swapConfTime() // TakerSwapCast if stepInfo.step == order.MakerRedeemed { - lastEvent = stepInfo.match.makerStatus.redeemTime + lastEvent = stepInfo.match.makerStatus.redeemSeenTime() } expireTime := time.Now().Add(txWaitExpiration).UTC() if lastEvent.IsZero() { From 0dd6524895cf1254bcd5453c57835e8ef5398d0b Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Thu, 17 Sep 2020 15:54:33 -0500 Subject: [PATCH 07/12] TESTS no longer sleep on new block, recv signal instead --- server/swap/swap_test.go | 120 ++++++++++++++++++++++++++++++++------- 1 file changed, 100 insertions(+), 20 deletions(-) diff --git a/server/swap/swap_test.go b/server/swap/swap_test.go index f727e63779..7d22c92037 100644 --- a/server/swap/swap_test.go +++ b/server/swap/swap_test.go @@ -66,11 +66,6 @@ func timeOutMempool() { time.Sleep(txWaitExpiration * 3 / 2) } -func timeoutBroadcast() { - // swapper.bTimeout is 5*txWaitExpiration for testing - time.Sleep(txWaitExpiration * 6) // 200 ms txWaitExpiration => 1 sec bTimeout => 1.2 sec timeout sleep -} - func dirtyEncode(s string) []byte { b, err := hex.DecodeString(s) if err != nil { @@ -111,8 +106,10 @@ type TAuthManager struct { authErr error privkey *secp256k1.PrivateKey reqs map[account.AccountID][]*TRequest + newReq chan struct{} resps map[account.AccountID][]*msgjson.Message suspensions map[account.AccountID]account.Rule + newSuspend chan struct{} } func newTAuthManager() *TAuthManager { @@ -166,6 +163,9 @@ func (m *TAuthManager) RequestWithTimeout(user account.AccountID, msg *msgjson.M l = make([]*TRequest, 0, 1) } m.reqs[user] = append(l, tReq) + if m.newReq != nil { + m.newReq <- struct{}{} + } return nil } func (m *TAuthManager) Sign(signables ...msgjson.Signable) error { @@ -195,6 +195,9 @@ func (m *TAuthManager) Penalize(id account.AccountID, rule account.Rule) error { m.mtx.Lock() defer m.mtx.Unlock() m.suspensions[id] = rule + if m.newSuspend != nil { + m.newSuspend <- struct{}{} + } return nil } @@ -513,6 +516,7 @@ type testRig struct { func tNewTestRig(matchInfo *tMatch, seed ...*testRig) (*testRig, func()) { var abcBackend, xyzBackend *TAsset storage := &TStorage{} + authMgr := newTAuthManager() var swapDataDir, statePath string var ignoreState bool manualMode := len(seed) > 0 @@ -524,6 +528,8 @@ func tNewTestRig(matchInfo *tMatch, seed ...*testRig) (*testRig, func()) { swapDataDir = seedRig.swapDataDir statePath = seedRig.statePath ignoreState = seedRig.ignoreState + authMgr.newReq = seedRig.auth.newReq + authMgr.newSuspend = seedRig.auth.newSuspend } else { abcBackend = newTAsset("abc") xyzBackend = newTAsset("xyz") @@ -540,8 +546,6 @@ func tNewTestRig(matchInfo *tMatch, seed ...*testRig) (*testRig, func()) { xyzAsset := TNewAsset(xyzBackend) xyzCoinLocker := coinlock.NewAssetCoinLocker() - authMgr := newTAuthManager() - swapper, err := NewSwapper(&Config{ DataDir: swapDataDir, Assets: map[uint32]*LockableAsset{ @@ -631,8 +635,13 @@ func (rig *testRig) ackMatch_taker(checkSig bool) error { } func (rig *testRig) ackMatch(user *tUser, oid order.OrderID, counterAddr string) error { - // If the match is already acked, which might be the case for the taker when - // an order.MatchSet hash multiple makers, skip the this step without error. + if rig.auth.newReq != nil { + select { + case <-rig.auth.newReq: + default: + return fmt.Errorf("no match ntfn") + } + } req := rig.auth.getReq(user.acct) if req == nil { return fmt.Errorf("failed to find match notification for %s", user.lbl) @@ -751,10 +760,8 @@ func (rig *testRig) sendSwap(user *tUser, oid order.OrderID, recipient string) ( matchInfo := rig.matchInfo swap := tNewSwap(matchInfo, oid, recipient, user) if isQuoteSwap(user, matchInfo.match) { - fmt.Println("xyz contract") rig.xyzNode.setContract(swap.coin, false) } else { - fmt.Println("abc contract") rig.abcNode.setContract(swap.coin, false) } rpcErr := rig.swapper.handleInit(user.acct, swap.req) @@ -770,6 +777,13 @@ func (rig *testRig) sendSwap(user *tUser, oid order.OrderID, recipient string) ( // acknowledged separately with ackAudit_taker. func (rig *testRig) auditSwap_taker() error { matchInfo := rig.matchInfo + if rig.auth.newReq != nil { + select { + case <-rig.auth.newReq: + default: + return fmt.Errorf("no match ntfn") + } + } req := rig.auth.getReq(matchInfo.taker.acct) matchInfo.db.takerAudit = req if req == nil { @@ -782,6 +796,13 @@ func (rig *testRig) auditSwap_taker() error { // acknowledged separately with ackAudit_maker. func (rig *testRig) auditSwap_maker() error { matchInfo := rig.matchInfo + if rig.auth.newReq != nil { + select { + case <-rig.auth.newReq: + default: + return fmt.Errorf("no match ntfn") + } + } req := rig.auth.getReq(matchInfo.maker.acct) matchInfo.db.makerAudit = req if req == nil { @@ -961,6 +982,13 @@ func (rig *testRig) ackRedemption(user *tUser, oid order.OrderID, redeem *tRedee if redeem == nil { return fmt.Errorf("nil redeem info") } + if rig.auth.newReq != nil { + select { + case <-rig.auth.newReq: + default: + return fmt.Errorf("no match ntfn") + } + } req := rig.auth.getReq(user.acct) if req == nil { return fmt.Errorf("failed to find audit request for %s after counterparty's init", user.lbl) @@ -1350,7 +1378,6 @@ func testSwap(t *testing.T, rig *testRig) { sendBlock := func(node *TAsset) { node.bChan <- &asset.BlockUpdate{Err: nil} - tickMempool() } // Step through the negotiation process. No errors should be generated. @@ -1389,6 +1416,11 @@ func testSwap(t *testing.T, rig *testRig) { func TestSwaps(t *testing.T) { rig, cleanup := tNewTestRig(nil) defer cleanup() + + // For N matches, Negotiate sends up to 2*N requests. The "three match set" + // sends 4: one for taker and 3 for makers. + rig.auth.newReq = make(chan struct{}, 4) + for _, makerSell := range []bool{true, false} { sellStr := " buy" if makerSell { @@ -1423,6 +1455,7 @@ func TestSwaps(t *testing.T) { t.Run("three match set"+sellStr+marketStr, func(t *testing.T) { matchQtys := []uint64{uint64(1e8), uint64(9e8), uint64(3e8)} rates := []uint64{uint64(10e8), uint64(11e8), uint64(12e8)} + // one taker, 3 makers => 4 'match' requests rig.matches = tMultiMatchSet(matchQtys, rates, makerSell, isMarket) rig.swapper.Negotiate([]*order.MatchSet{rig.matches.matchSet}, nil) testSwap(t, rig) @@ -1436,7 +1469,9 @@ func TestNoAck(t *testing.T) { matchInfo := set.matchInfos[0] rig, cleanup := tNewTestRig(matchInfo) defer cleanup() - rig.swapper.Negotiate([]*order.MatchSet{set.matchSet}, nil) + + rig.auth.newReq = make(chan struct{}, 2) // Negotiate sends 2 requests, one for each party assuming different users matched + ensureNilErr := makeEnsureNilErr(t) mustBeError := makeMustBeError(t) maker, taker := matchInfo.maker, matchInfo.taker @@ -1459,9 +1494,10 @@ func TestNoAck(t *testing.T) { sendBlock := func(node *TAsset) { node.bChan <- &asset.BlockUpdate{Err: nil} - tickMempool() } + rig.swapper.Negotiate([]*order.MatchSet{set.matchSet}, nil) + // Don't acknowledge from either side yet. Have the maker broadcast their swap // transaction mustBeError(rig.sendSwap_maker(true), "maker swap send") @@ -1504,15 +1540,18 @@ func TestTxWaiters(t *testing.T) { matchInfo := set.matchInfos[0] rig, cleanup := tNewTestRig(matchInfo) defer cleanup() - rig.swapper.Negotiate([]*order.MatchSet{set.matchSet}, nil) + + rig.auth.newReq = make(chan struct{}, 2) // Negotiate sends 2 requests, one for each party assuming different users matched + ensureNilErr := makeEnsureNilErr(t) dummyError := fmt.Errorf("test error") sendBlock := func(node *TAsset) { node.bChan <- &asset.BlockUpdate{Err: nil} - tickMempool() } + rig.swapper.Negotiate([]*order.MatchSet{set.matchSet}, nil) + // Get the MatchNotifications that the swapper sent to the clients and check // the match notification length, content, IDs, etc. if err := rig.ackMatch_maker(true); err != nil { @@ -1521,6 +1560,7 @@ func TestTxWaiters(t *testing.T) { if err := rig.ackMatch_taker(true); err != nil { t.Fatal(err) } + // Set a non-latency error. rig.abcNode.setContractErr(dummyError) rig.sendSwap_maker(false) @@ -1528,6 +1568,7 @@ func TestTxWaiters(t *testing.T) { if msg == nil { t.Fatalf("no response for erroneous maker swap. err") } + // Set an error for the maker's swap asset rig.abcNode.setContractErr(asset.CoinNotFoundError) // The error will be generated by the chainWaiter thread, so will need to @@ -1644,16 +1685,29 @@ func TestTxWaiters(t *testing.T) { func TestBroadcastTimeouts(t *testing.T) { rig, cleanup := tNewTestRig(nil) defer cleanup() + + rig.auth.newSuspend = make(chan struct{}, 1) + rig.auth.newReq = make(chan struct{}, 2) // Negotiate sends 2 requests, one for each party assuming different users matched + ensureNilErr := makeEnsureNilErr(t) sendBlock := func(node *TAsset) { node.bChan <- &asset.BlockUpdate{Err: nil} - tickMempool() } + + reqTimeout := func(timeout time.Duration) { + t.Helper() + select { + case <-rig.auth.newReq: + case <-time.After(timeout): + t.Fatalf("no request received") + } + } + checkRevokeMatch := func(user *tUser, i int) { t.Helper() req := rig.auth.getReq(user.acct) if req == nil { - t.Fatalf("no match_cancellation") + t.Fatalf("no revoke_match") } params := new(msgjson.RevokeMatch) err := json.Unmarshal(req.req.Payload, ¶ms) @@ -1672,6 +1726,7 @@ func TestBroadcastTimeouts(t *testing.T) { user.lbl, i, msgjson.RevokeMatchRoute, req.req.Route) } } + // tryExpire will sleep for the duration of a BroadcastTimeout, and then // check that a penalty was assigned to the appropriate user, and that a // revoke_match message is sent to both users. @@ -1683,7 +1738,11 @@ func TestBroadcastTimeouts(t *testing.T) { // Sending a block through should schedule an inaction check after duration // BroadcastTimeout. sendBlock(node) - timeoutBroadcast() + select { + case <-rig.auth.newSuspend: + case <-time.After(rig.swapper.bTimeout * 2): + t.Fatalf("no penalization happened") + } found, rule := rig.auth.flushPenalty(jerk.acct) if !found { t.Fatalf("failed to penalize user at step %d", i) @@ -1692,6 +1751,8 @@ func TestBroadcastTimeouts(t *testing.T) { t.Fatalf("no penalty at step %d (status %v)", i, step) } // Make sure the specified user has a cancellation for this order + reqTimeout(rig.swapper.bTimeout * 2) // wait for both revoke requests, no particular order + reqTimeout(rig.swapper.bTimeout * 2) checkRevokeMatch(jerk, i) checkRevokeMatch(victim, i) return true @@ -1942,7 +2003,6 @@ func TestCancel(t *testing.T) { func TestState(t *testing.T) { sendBlock := func(node *TAsset) { node.bChan <- &asset.BlockUpdate{Err: nil} - tickMempool() // look for another signal } var rig *testRig @@ -1962,6 +2022,16 @@ func TestState(t *testing.T) { return state, nil } + // Wait for a request up to timeout. + reqTimeout := func(timeout time.Duration) { + t.Helper() + select { + case <-rig.auth.newReq: + case <-time.After(timeout): + t.Fatalf("no request received") + } + } + // Start a swap negotiation with two pending match acks, then shutdown. // ABC is base, XYZ is quote makerSell := true // maker: swap = ABC, redeem = XYZ / taker: swap = XYZ, redeem = ABC @@ -1969,6 +2039,7 @@ func TestState(t *testing.T) { matchInfo := set.matchInfos[0] rig, stop = tNewTestRig(matchInfo, nil) defer os.RemoveAll(rig.swapDataDir) + rig.swapper.Negotiate([]*order.MatchSet{set.matchSet}, nil) abc, xyz := rig.abcNode, rig.xyzNode @@ -2041,6 +2112,7 @@ func TestState(t *testing.T) { rig.storage.stateHash = ogHash // Now load the state in the default way. + rig.auth.newReq = make(chan struct{}, 2) // 2 'match' request sent on start rig, stop = tNewTestRig(matchInfo, rig) defer os.RemoveAll(rig.swapDataDir) defer stop() @@ -2059,6 +2131,9 @@ func TestState(t *testing.T) { if err != nil { t.Fatalf("maker failed to ack the match: %v", err) } + // Swallow the taker's 'match' signal since we'll send it again and receive + // it in the next test after a state restore. The reqs slice is fresh. + reqTimeout(time.Second) stop() @@ -2162,6 +2237,7 @@ func TestState(t *testing.T) { makerSwap.coin.setConfs(int64(rig.abc.SwapConf)) sendBlock(abc) // wait recheckInterval*3/2 for the coin waiter, plus trigger processBlock // processInit should have succeeded, requesting an ack from taker of the maker's contract. + reqTimeout(recheckInterval * 3 / 2) // 'audit' sent, but no ack resp yet matchInfo.db.makerSwap = makerSwap // for taker's audit @@ -2266,6 +2342,7 @@ func TestState(t *testing.T) { takerSwap.coin.setConfs(int64(rig.xyz.SwapConf)) sendBlock(xyz) // wait recheckInterval*3/2 for the coin waiter, plus trigger processBlock // processInit should have succeeded, requesting an ack from taker of the maker's contract. + reqTimeout(recheckInterval * 3 / 2) // 'audit' sent, but no ack resp yet matchInfo.db.takerSwap = takerSwap // for maker's audit @@ -2367,6 +2444,7 @@ func TestState(t *testing.T) { makerRedeem.coin.setConfs(int64(rig.xyz.SwapConf)) sendBlock(xyz) // wait recheckInterval*3/2 for the coin waiter, plus trigger processBlock // processRedeem should have succeeded, requesting an ack from taker of the maker's redeem. + reqTimeout(recheckInterval * 3 / 2) // 'redemption' sent, but no ack resp yet matchInfo.db.makerRedeem = makerRedeem // for taker's redeem ack @@ -2462,6 +2540,7 @@ func TestState(t *testing.T) { takerRedeem.coin.setConfs(int64(rig.abc.SwapConf)) sendBlock(abc) // wait recheckInterval*3/2 for the coin waiter, plus trigger processBlock // processRedeem should have succeeded, requesting an ack from maker of the taker's redeem. + reqTimeout(recheckInterval * 3 / 2) // 'redemption' sent, but no ack resp yet matchInfo.db.takerRedeem = takerRedeem // for maker's redeem ack @@ -2519,6 +2598,7 @@ func TestState(t *testing.T) { } sendBlock(abc) + tickMempool() // processBlock -> match removed // match should be gone now if rig.getTracker() != nil { From 06fe8a13ac78d253f29bbc98c33cacfb8a13bc70 Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Thu, 17 Sep 2020 21:59:52 -0500 Subject: [PATCH 08/12] dcr harness: use sendmany to fund wallets Co-authored-by: Wisdom Arerosuoghene --- dex/testing/dcr/harness.sh | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/dex/testing/dcr/harness.sh b/dex/testing/dcr/harness.sh index 5edd727bb1..e0e59ec5af 100755 --- a/dex/testing/dcr/harness.sh +++ b/dex/testing/dcr/harness.sh @@ -225,9 +225,8 @@ sleep 5 # Have alpha send some credits to the other wallets for i in 10 18 5 7 1 15 3 25 do - tmux send-keys -t $SESSION:0 "./alpha sendtoaddress ${BETA_MINING_ADDR} ${i}${WAIT}" C-m\; wait-for donedcr - tmux send-keys -t $SESSION:0 "./alpha sendtoaddress ${TRADING_WALLET1_ADDRESS} ${i}${WAIT}" C-m\; wait-for donedcr - tmux send-keys -t $SESSION:0 "./alpha sendtoaddress ${TRADING_WALLET2_ADDRESS} ${i}${WAIT}" C-m\; wait-for donedcr + RECIPIENTS="{\"${BETA_MINING_ADDR}\":${i},\"${TRADING_WALLET1_ADDRESS}\":${i},\"${TRADING_WALLET2_ADDRESS}\":${i}}" + tmux send-keys -t $SESSION:0 "./alpha sendmany default '${RECIPIENTS}'${WAIT}" C-m\; wait-for donedcr done sleep 0.5 tmux send-keys -t $SESSION:0 "./mine-alpha 1${WAIT}" C-m\; wait-for donedcr From 5420c42183eb57502367412e902dbaf3af8bf19e Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Thu, 17 Sep 2020 22:53:11 -0500 Subject: [PATCH 09/12] review items --- server/swap/swap.go | 42 ++++++++++++++++++++++++---------------- server/swap/swap_test.go | 20 ++++++++++--------- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/server/swap/swap.go b/server/swap/swap.go index 54b42b841d..4d311c3fa4 100644 --- a/server/swap/swap.go +++ b/server/swap/swap.go @@ -949,8 +949,6 @@ func (s *Swapper) Run(ctx context.Context) { case <-s.storage.Fatal(): return case block := <-blockNotes: - // Schedule a check of matches with one side equal to this - // block's asset by appending the block to the bcastTriggers. if block.err != nil { var connectionErr asset.ConnectionError if errors.As(block.err, &connectionErr) { @@ -1182,8 +1180,8 @@ func (s *Swapper) failMatch(match *matchTracker, makerFault bool) { s.storage.SetMatchInactive(db.MatchID(match.Match)) // If the at-fault order is a limit order, signal that if it is - // still on the book is should be unbooked, changed to revoked - // status, counted against the user's cancellation ratio, and a + // still on the book it should be unbooked, changed to revoked + // status, counted against the user's cancellation rate, and a // server-generated cancel order recorded. if lo, isLimit := orderAtFault.(*order.LimitOrder); isLimit { if s.unbookHook(lo) { @@ -1216,7 +1214,7 @@ func (s *Swapper) failMatch(match *matchTracker, makerFault bool) { // Penalize for failure to act. // // TODO: Arguably, this obviates the RecordCancel above since this - // closes the account before the possibility of a cancellation ratio + // closes the account before the possibility of a cancellation rate // penalty. I'm keeping it this way for now however since penalties // may become less severe than account closure (e.g. temporary // suspension, cool down, or order throttling), and restored @@ -1227,6 +1225,13 @@ func (s *Swapper) failMatch(match *matchTracker, makerFault bool) { s.revoke(match) } +// checkInactionEventBased scans the swapStatus structures, checking for actions +// that are expected in a time frame relative to another event that is not a +// confirmation time. If a client is found to have not acted when required, a +// match may be revoked and a penalty assigned to the user. This includes +// matches in NewlyMatched that have not received a maker swap following the +// match request, and in MakerRedeemed that have not received a taker redeem +// following the redemption request triggered by the makers redeem. func (s *Swapper) checkInactionEventBased() { // If the DB is failing, do not penalize or attempt to start revocations. if err := s.storage.LastErr(); err != nil { @@ -1248,8 +1253,7 @@ func (s *Swapper) checkInactionEventBased() { match.mtx.RLock() defer match.mtx.RUnlock() - log.Tracef("checkInactionEventBased() => checkMatch(%v, %v)", - match.ID(), match.Status) + log.Tracef("checkInactionEventBased: match %v (%v)", match.ID(), match.Status) failMatch := func(makerFault bool) { s.failMatch(match, makerFault) @@ -1285,9 +1289,13 @@ func (s *Swapper) checkInactionEventBased() { } } -// checkInaction scans the swapStatus structures relevant to the specified -// asset. If a client is found to have not acted when required, a match may be -// revoked and a penalty assigned to the user. +// checkInactionBlockBased scans the swapStatus structures relevant to the +// specified asset. If a client is found to have not acted when required, a +// match may be revoked and a penalty assigned to the user. This includes +// matches in MakerSwapCast that have not received a taker swap after the +// maker's swap reaches the required confirmation count, and in TakerSwapCast +// that have not received a maker redeem after the taker's swap reaches the +// required confirmation count. func (s *Swapper) checkInactionBlockBased(assetID uint32) { // If the DB is failing, do not penalize or attempt to start revocations. if err := s.storage.LastErr(); err != nil { @@ -1299,22 +1307,22 @@ func (s *Swapper) checkInactionBlockBased(assetID uint32) { // Do time.Since(event) with the same now time for each match. now := time.Now() tooOld := func(evt time.Time) bool { + // If the time is not set (zero), it has not happened yet (not too old). return !evt.IsZero() && now.Sub(evt) >= s.bTimeout } checkMatch := func(match *matchTracker) { + if match.makerStatus.swapAsset != assetID && match.takerStatus.swapAsset != assetID { + return + } + // Lock entire matchTracker so the following is atomic with respect to // Status. match.mtx.RLock() defer match.mtx.RUnlock() - if match.makerStatus.swapAsset != assetID && match.takerStatus.swapAsset != assetID { - return - } - - log.Tracef("checkInactionBlockBased(%d) => checkMatch(%v, %v): assets %d / %d", - assetID, match.ID(), match.Status, - match.makerStatus.swapAsset, match.takerStatus.swapAsset) + log.Tracef("checkInactionBlockBased: asset %d, match %v (%v)", + assetID, match.ID(), match.Status) failMatch := func(makerFault bool) { s.failMatch(match, makerFault) diff --git a/server/swap/swap_test.go b/server/swap/swap_test.go index 7d22c92037..1c721a557d 100644 --- a/server/swap/swap_test.go +++ b/server/swap/swap_test.go @@ -2234,10 +2234,11 @@ func TestState(t *testing.T) { // "broadcast" the contract and signal a new block. abc.setContract(makerSwap.coin, true) + // tickMempool() // latencyQ triggers processInit and audit request makerSwap.coin.setConfs(int64(rig.abc.SwapConf)) - sendBlock(abc) // wait recheckInterval*3/2 for the coin waiter, plus trigger processBlock - // processInit should have succeeded, requesting an ack from taker of the maker's contract. - reqTimeout(recheckInterval * 3 / 2) // 'audit' sent, but no ack resp yet + sendBlock(abc) // trigger processBlock, tryConfirmSwap + // processInit should have succeeded, requesting an audit ack from taker of the maker's contract. + reqTimeout(recheckInterval * 2) // 'audit' sent, but no ack resp yet matchInfo.db.makerSwap = makerSwap // for taker's audit @@ -2339,10 +2340,11 @@ func TestState(t *testing.T) { // "broadcast" the contract and signal a new block. xyz.setContract(takerSwap.coin, true) + // tickMempool() // latencyQ triggers processInit and audit request takerSwap.coin.setConfs(int64(rig.xyz.SwapConf)) - sendBlock(xyz) // wait recheckInterval*3/2 for the coin waiter, plus trigger processBlock + sendBlock(xyz) // trigger processBlock, tryConfirmSwap // processInit should have succeeded, requesting an ack from taker of the maker's contract. - reqTimeout(recheckInterval * 3 / 2) // 'audit' sent, but no ack resp yet + reqTimeout(recheckInterval * 2) // 'audit' sent, but no ack resp yet matchInfo.db.takerSwap = takerSwap // for maker's audit @@ -2442,9 +2444,9 @@ func TestState(t *testing.T) { // "broadcast" the redeem and signal a new block. xyz.setRedemption(makerRedeem.coin, true) makerRedeem.coin.setConfs(int64(rig.xyz.SwapConf)) - sendBlock(xyz) // wait recheckInterval*3/2 for the coin waiter, plus trigger processBlock + sendBlock(xyz) // trigger processBlock, redeem status check (not needed!) // processRedeem should have succeeded, requesting an ack from taker of the maker's redeem. - reqTimeout(recheckInterval * 3 / 2) // 'redemption' sent, but no ack resp yet + reqTimeout(recheckInterval * 2) // 'redemption' sent, but no ack resp yet matchInfo.db.makerRedeem = makerRedeem // for taker's redeem ack @@ -2538,9 +2540,9 @@ func TestState(t *testing.T) { // "broadcast" the redeem and signal a new block. abc.setRedemption(takerRedeem.coin, true) takerRedeem.coin.setConfs(int64(rig.abc.SwapConf)) - sendBlock(abc) // wait recheckInterval*3/2 for the coin waiter, plus trigger processBlock + sendBlock(abc) // trigger processBlock, redeem status check (not needed!) // processRedeem should have succeeded, requesting an ack from maker of the taker's redeem. - reqTimeout(recheckInterval * 3 / 2) // 'redemption' sent, but no ack resp yet + reqTimeout(recheckInterval * 2) // 'redemption' sent, but no ack resp yet matchInfo.db.takerRedeem = takerRedeem // for maker's redeem ack From 0ed20a16d77f4c22961e9ad5fce5037caf49466e Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Fri, 18 Sep 2020 16:06:44 -0500 Subject: [PATCH 10/12] review --- server/swap/swap.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/swap/swap.go b/server/swap/swap.go index 4d311c3fa4..244cc46580 100644 --- a/server/swap/swap.go +++ b/server/swap/swap.go @@ -1563,9 +1563,6 @@ func (s *Swapper) processAck(msg *msgjson.Message, acker *messageAcker) { // Remove the live acker from Swapper's tracking. defer s.rmLiveAckers(msg.ID) - acker.match.mtx.Lock() - defer acker.match.mtx.Unlock() - // The time that the ack is received is stored for redeem acks to facilitate // cancellation rate enforcement. tAck := time.Now() @@ -1592,6 +1589,9 @@ func (s *Swapper) processAck(msg *msgjson.Message, acker *messageAcker) { // actor. mktMatch := db.MatchID(acker.match.Match) + acker.match.mtx.Lock() + defer acker.match.mtx.Unlock() + if rev, ok := acker.params.(*msgjson.RevokeMatch); ok { log.Infof("Received revoke_match ack for match %v, order %v, user %v", rev.MatchID, rev.OrderID, acker.user) @@ -2339,7 +2339,7 @@ func (s *Swapper) processMatchAcks(user account.AccountID, msg *msgjson.Message, err = s.authMgr.Auth(user, sigMsg, ack.Sig) if err != nil { log.Warnf("processMatchAcks: 'match' ack for match %v from user %v, "+ - " failed sig verification: %v", match.ID(), user, err) + " failed sig verification: %v", matchID, user, err) s.respondError(msg.ID, user, msgjson.SignatureError, fmt.Sprintf("signature validation error: %v", err)) return From 9780f88d7ae4cbe8f038ceb4d96a18991d441b1e Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Mon, 21 Sep 2020 08:30:52 -0500 Subject: [PATCH 11/12] revise comment in latencyQ ExpireFunc --- server/swap/swap.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/server/swap/swap.go b/server/swap/swap.go index 244cc46580..40f12680c4 100644 --- a/server/swap/swap.go +++ b/server/swap/swap.go @@ -2114,8 +2114,9 @@ func (s *Swapper) handleInit(user account.AccountID, msg *msgjson.Message) *msgj }, ExpireFunc: func() { s.rmLiveWaiter(user, msg.ID) - // Tell them to broadcast again or check their node before broadcast - // timeout is reached and the match is revoked. + // NOTE: We may consider a shorter expire time so the client can + // receive warning that their may be node connectivity trouble + // while they still have a chance to fix it. s.respondError(msg.ID, user, msgjson.TransactionUndiscovered, fmt.Sprintf("failed to find contract coin %v", coinStr)) }, @@ -2212,9 +2213,11 @@ func (s *Swapper) handleRedeem(user account.AccountID, msg *msgjson.Message) *ms }, ExpireFunc: func() { s.rmLiveWaiter(user, msg.ID) + // NOTE: We may consider a shorter expire time so the client can + // receive warning that their may be node connectivity trouble + // while they still have a chance to fix it. s.respondError(msg.ID, user, msgjson.TransactionUndiscovered, fmt.Sprintf("failed to find redeemed coin %v", coinStr)) - // Client should retry the redeem request, maybe even rebroadcast. }, }) return nil From 659fc3d4f4e1259c83df1d186d6506bb2eae8fd1 Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Mon, 21 Sep 2020 09:01:02 -0500 Subject: [PATCH 12/12] grammer --- server/swap/swap.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/swap/swap.go b/server/swap/swap.go index 40f12680c4..be91528cd8 100644 --- a/server/swap/swap.go +++ b/server/swap/swap.go @@ -2115,8 +2115,8 @@ func (s *Swapper) handleInit(user account.AccountID, msg *msgjson.Message) *msgj ExpireFunc: func() { s.rmLiveWaiter(user, msg.ID) // NOTE: We may consider a shorter expire time so the client can - // receive warning that their may be node connectivity trouble - // while they still have a chance to fix it. + // receive warning that there may be node or wallet connectivity + // trouble while they still have a chance to fix it. s.respondError(msg.ID, user, msgjson.TransactionUndiscovered, fmt.Sprintf("failed to find contract coin %v", coinStr)) }, @@ -2214,8 +2214,8 @@ func (s *Swapper) handleRedeem(user account.AccountID, msg *msgjson.Message) *ms ExpireFunc: func() { s.rmLiveWaiter(user, msg.ID) // NOTE: We may consider a shorter expire time so the client can - // receive warning that their may be node connectivity trouble - // while they still have a chance to fix it. + // receive warning that there may be node or wallet connectivity + // trouble while they still have a chance to fix it. s.respondError(msg.ID, user, msgjson.TransactionUndiscovered, fmt.Sprintf("failed to find redeemed coin %v", coinStr)) },