Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/core: missing matches were incomplete #661

Merged
merged 7 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 124 additions & 10 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,13 @@ func (dc *dexConnection) hasActiveOrders() bool {
func (dc *dexConnection) findOrder(oid order.OrderID) (tracker *trackedTrade, preImg order.Preimage, isCancel bool) {
dc.tradeMtx.RLock()
defer dc.tradeMtx.RUnlock()
// Try to find the order as a trade.
if tracker, found := dc.trades[oid]; found {
return tracker, tracker.preImg, false
}
// Search the cancel order IDs.
for _, tracker := range dc.trades {
if tracker.ID() == oid {
return tracker, tracker.preImg, false
} else if tracker.cancel != nil && tracker.cancel.ID() == oid {
if tracker.cancel != nil && tracker.cancel.ID() == oid {
return tracker, tracker.cancel.preImg, true
}
}
Expand Down Expand Up @@ -490,17 +493,90 @@ func (dc *dexConnection) runMatches(tradeMatches map[order.OrderID]*serverMatche
return errs.ifAny()
}

type matchDiscreps struct {
trade *trackedTrade
missing []*matchTracker
extra []*msgjson.Match
}

// compareServerMatches resolves the matches reported by the server in the
// 'connect' response against those marked incomplete in the matchTracker map
// for each serverMatch.
// Reported matches with missing trackers are already checked by parseMatches,
// but we also must check for incomplete matches that the server is not
// reporting.
func (dc *dexConnection) compareServerMatches(matches map[order.OrderID]*serverMatches) {
for _, match := range matches {
// readConnectMatches sends notifications for any problems encountered.
match.tracker.readConnectMatches(match.msgMatches)
func (dc *dexConnection) compareServerMatches(srvMatches map[order.OrderID]*serverMatches) (exceptions map[order.OrderID]*matchDiscreps) {
exceptions = make(map[order.OrderID]*matchDiscreps)

// Identify extra matches named by the server response that we do not
// recognize.
for _, match := range srvMatches {
var extra []*msgjson.Match
match.tracker.mtx.RLock()
for _, msgMatch := range match.msgMatches {
var matchID order.MatchID
copy(matchID[:], msgMatch.MatchID)
if match.tracker.matches[matchID] == nil {
extra = append(extra, msgMatch)
}
}
match.tracker.mtx.RUnlock()
if len(extra) > 0 {
exceptions[match.tracker.ID()] = &matchDiscreps{
trade: match.tracker,
extra: extra,
}
}
}

in := func(matches []*msgjson.Match, mid []byte) bool {
for _, m := range matches {
if bytes.Equal(m.MatchID, mid) {
return true
}
}
return false
}

setMissing := func(trade *trackedTrade, missing []*matchTracker) {
if tt, found := exceptions[trade.ID()]; found {
tt.missing = missing
} else {
exceptions[trade.ID()] = &matchDiscreps{
trade: trade,
missing: missing,
}
}
}

// Identify active matches that are missing from server's response.
dc.tradeMtx.RLock()
defer dc.tradeMtx.RUnlock()
for oid, trade := range dc.trades {
activeMatches := trade.activeMatches()
if len(activeMatches) == 0 {
continue
}
tradeMatches, found := srvMatches[oid]
if !found {
// ALL of this trade's active matches are missing.
setMissing(trade, activeMatches)
continue // check next local trade
}
// Check this local trade's active matches against server's reported
// matches for this trade.
var missing []*matchTracker
for _, match := range activeMatches { // each local match
if !in(tradeMatches.msgMatches, match.id[:]) { // against reported matches
missing = append(missing, match)
}
}
if len(missing) > 0 {
setMissing(trade, missing)
}
}

return
}

// tickAsset checks open matches related to a specific asset for needed action.
Expand Down Expand Up @@ -1734,7 +1810,7 @@ func (c *Core) resolveActiveTrades(crypter encrypt.Crypter) (loaded int) {
c.notify(newOrderNote("Order load failure", details, db.ErrorLevel, nil))
}
if len(ready) > 0 {
locks := c.resumeTrades(dc, ready)
locks := c.resumeTrades(dc, ready) // fills out dc.trades
// if err != nil {
// details := fmt.Sprintf("Some active orders failed to resume: %v", err)
// c.notify(newOrderNote("Order resumption error", details, db.ErrorLevel, nil))
Expand Down Expand Up @@ -2231,7 +2307,45 @@ func (c *Core) authDEX(dc *dexConnection) error {
log.Error(err)
}

dc.compareServerMatches(matches)
exceptions := dc.compareServerMatches(matches)
for oid, matchAnomalies := range exceptions {
trade := matchAnomalies.trade
missing, extras := matchAnomalies.missing, matchAnomalies.extra

// Flag each of the missing matches as revoked.
for _, match := range missing {
log.Warnf("DEX %s did not report active match %s on order %s - assuming revoked.",
dc.acct.host, match.id, oid)
match.failErr = fmt.Errorf("order not reported by the server on connect")
// Must have been revoked while we were gone. Flag to allow recovery
// and subsequent retirement of the match and parent trade.
match.MetaData.Proof.IsRevoked = true
if err := c.db.UpdateMatch(&match.MetaMatch); err != nil {
log.Errorf("Failed to update missing/revoked match: %v", err)
}
}

corder, _ := trade.coreOrder()

// Send a "Missing matches" order note if there are missing match message.
if len(missing) > 0 {
details := fmt.Sprintf("%d matches for order %s were not reported by %q and are considered revoked",
len(missing), trade.token(), dc.acct.host)
c.notify(newOrderNote("Missing matches", details, db.ErrorLevel, corder))
}

// Send a "Match resolution error" if there are extra match messages.
if len(extras) > 0 {
details := fmt.Sprintf("%d matches reported by %s were not found for %s.",
len(extras), dc.acct.host, trade.token())
c.notify(newOrderNote("Match resolution error", details, db.ErrorLevel, corder))
// t.negotiate(extras) // missed match message request, but match not revoked (?!)
for _, extra := range extras {
log.Errorf("DEX %s reported match %s which is not a known active match for order %s",
dc.acct.host, extra.MatchID, extra.OrderID)
}
}
}

return nil
}
Expand Down Expand Up @@ -2522,7 +2636,7 @@ func (c *Core) loadDBTrades(dc *dexConnection, crypter encrypt.Crypter, failed m
}

errs := newErrorSet(dc.acct.host + ": ")
ready := make([]*trackedTrade, 0, len(dc.trades))
ready := make([]*trackedTrade, 0, len(trades))
for _, trade := range trades {
if !trade.isActive() {
// In this event, there is a discrepancy between the active criteria
Expand Down
Loading