Skip to content

Commit

Permalink
better lock pattern in match_status resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
buck54321 authored and chappjc committed Nov 2, 2020
1 parent c0adb26 commit c09017d
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 77 deletions.
36 changes: 26 additions & 10 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,8 @@ type matchDiscreps struct {
// matchStatusConflict is a conflict between our status, and the status returned
// by the server in the connect response.
type matchStatusConflict struct {
trade *trackedTrade
match *matchTracker
trade *trackedTrade
matches []*matchTracker
}

// compareServerMatches resolves the matches reported by the server in the
Expand All @@ -476,13 +476,14 @@ type matchStatusConflict struct {
// but we also must check for incomplete matches that the server is not
// reporting.
func (dc *dexConnection) compareServerMatches(srvMatches map[order.OrderID]*serverMatches) (
exceptions map[order.OrderID]*matchDiscreps, statusConflicts []*matchStatusConflict) {
exceptions map[order.OrderID]*matchDiscreps, statusConflicts map[order.OrderID]*matchStatusConflict) {

exceptions = make(map[order.OrderID]*matchDiscreps)
statusConflicts = make(map[order.OrderID]*matchStatusConflict)

// Identify extra matches named by the server response that we do not
// recognize.
for _, match := range srvMatches {
for oid, match := range srvMatches {
var extra []*msgjson.Match
match.tracker.mtx.RLock()
for _, msgMatch := range match.msgMatches {
Expand All @@ -494,10 +495,12 @@ func (dc *dexConnection) compareServerMatches(srvMatches map[order.OrderID]*serv
continue
}
if mt.Match.Status != order.MatchStatus(msgMatch.Status) {
statusConflicts = append(statusConflicts, &matchStatusConflict{
trade: match.tracker,
match: mt,
})
conflict := statusConflicts[oid]
if conflict == nil {
conflict = &matchStatusConflict{trade: match.tracker}
statusConflicts[oid] = conflict
}
conflict.matches = append(conflict.matches, mt)
}
}
match.tracker.mtx.RUnlock()
Expand Down Expand Up @@ -2802,7 +2805,14 @@ func (c *Core) authDEX(dc *dexConnection) error {
}
c.log.Infof("Queueing match status resolution for newly discovered match %v (%s) "+
"as taker to MakerSwapCast status.", matchID, match.Match.Status) // had better be NewlyMatched!
matchConflicts = append(matchConflicts, &matchStatusConflict{trade, match})

oid := trade.ID()
conflicts := matchConflicts[oid]
if conflicts == nil {
conflicts = &matchStatusConflict{trade: trade}
matchConflicts[oid] = conflicts
}
conflicts.matches = append(conflicts.matches, trade.matches[matchID])
}
}
}
Expand All @@ -2827,7 +2837,11 @@ func (c *Core) authDEX(dc *dexConnection) error {
}

if len(matchConflicts) > 0 {
c.log.Warnf("Beginning match status resolution for %d matches...", len(matchConflicts))
var n int
for _, c := range matchConflicts {
n += len(c.matches)
}
c.log.Warnf("Beginning match status resolution for %d matches...", n)
c.resolveMatchConflicts(dc, matchConflicts)
}

Expand Down Expand Up @@ -3686,7 +3700,9 @@ func handleRevokeMatchMsg(c *Core, dc *dexConnection, msg *msgjson.Message) erro
var matchID order.MatchID
copy(matchID[:], revocation.MatchID)

tracker.mtx.Lock()
err = tracker.revokeMatch(matchID, true)
tracker.mtx.Unlock()
if err != nil {
return fmt.Errorf("unable to revoke match %s for order %s: %v", matchID, tracker.ID(), err)
}
Expand Down
57 changes: 48 additions & 9 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4874,9 +4874,7 @@ func TestMatchStatusResolution(t *testing.T) {
match := &matchTracker{
id: matchID,
MetaMatch: db.MetaMatch{
MetaData: &db.MatchMetaData{
Proof: db.MatchProof{},
},
MetaData: &db.MatchMetaData{},
Match: &order.UserMatch{
Address: addr,
},
Expand Down Expand Up @@ -4938,7 +4936,7 @@ func TestMatchStatusResolution(t *testing.T) {
}
}

// Call setProof before setAuth
// Call setProof before setAuthSigs
setProof := func(status order.MatchStatus) {
isMaker := match.Match.Side == order.Maker
match.SetStatus(status)
Expand Down Expand Up @@ -5099,11 +5097,6 @@ func TestMatchStatusResolution(t *testing.T) {
servers: order.MakerRedeemed,
side: order.Taker,
},
{ // Same status should behave the same way.
ours: order.MakerSwapCast,
servers: order.MakerSwapCast,
side: order.Maker,
},
}

// Backwards resolvers won't update the match status, but also won't revoke
Expand Down Expand Up @@ -5275,6 +5268,52 @@ func TestMatchStatusResolution(t *testing.T) {
t.Fatalf("(%s) match not self-revoked during nonsense resolution", testName(tt))
}
}

// Run two matches for the same order.
match2ID := ordertest.RandomMatchID()
match2 := &matchTracker{
id: match2ID,
MetaMatch: db.MetaMatch{
MetaData: &db.MatchMetaData{},
Match: &order.UserMatch{
Address: addr,
},
},
}
trade.matches[match2ID] = match2
setAuthSigs(order.NewlyMatched)
setProof(order.NewlyMatched)
match2.Match.Side = order.Taker
match2.MetaData.Proof = match.MetaData.Proof

srvMatches := connectMatches(order.MakerSwapCast)
srvMatches = append(srvMatches, &msgjson.Match{OrderID: oid[:],
MatchID: match2ID[:],
Status: uint8(order.MakerSwapCast),
Side: uint8(order.Taker),
})

res1 := setMatchResults(order.MakerSwapCast)
res2 := setMatchResults(order.MakerSwapCast)
res2.MatchID = match2ID[:]

rig.queueConnect(nil, srvMatches, nil)
rig.ws.queueResponse(msgjson.MatchStatusRoute, func(msg *msgjson.Message, f msgFunc) error {
resp, _ := msgjson.NewResponse(msg.ID, []*msgjson.MatchStatusResult{res1, res2}, nil)
f(resp)
return nil
})
tCore.authDEX(dc)
trade.mtx.Lock()
newStatus1 := match.MetaData.Status
newStatus2 := match2.MetaData.Status
trade.mtx.Unlock()
if newStatus1 != order.MakerSwapCast {
t.Fatalf("wrong status for match 1: %s", newStatus1)
}
if newStatus2 != order.MakerSwapCast {
t.Fatalf("wrong status for match 2: %s", newStatus2)
}
}

func TestSuspectTrades(t *testing.T) {
Expand Down
92 changes: 45 additions & 47 deletions client/core/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package core

import (
"fmt"
"sync"

"decred.org/dcrdex/client/asset"
"decred.org/dcrdex/dex/msgjson"
Expand All @@ -17,37 +18,27 @@ func statusResolutionID(dc *dexConnection, trade *trackedTrade, match *matchTrac
return fmt.Sprintf("host = %s, order = %s, match = %s", dc.acct.host, trade.ID(), match.id)
}

// resolveMatchConflicts accepts a dexConnection and a slice of match status
// conflicts. resolveMatchConflicts will block until the call to match_status
// returns, but then goroutines are started to handle the actual resolution.
// The trackedTrades' mutexes will remain locked until resolution completes.
func (c *Core) resolveMatchConflicts(dc *dexConnection, statusConflicts []*matchStatusConflict) {
// resolveMatchConflicts attempts to resolve conflicts between the server's
// reported match status and our own. This involves a 'match_status' request to
// the server and possibly some wallet operations. ResolveMatchConflicts will
// block until resolution is complete.
func (c *Core) resolveMatchConflicts(dc *dexConnection, statusConflicts map[order.OrderID]*matchStatusConflict) {

statusRequests := make([]*msgjson.MatchRequest, 0, len(statusConflicts))
// Lock the trade mutexes now until we're done. The other option would be
// to lock the trade mutexes in resolveConflictWithServerData, but that
// might allow a tick to occur between the match_status request and
// resolveConflictWithServerData. Our resolvers never send any requests or
// or responses. The longest running functions only request some wallet
// data, so I wouldn't expect these locks to be held for long. Regardless,
// we'll run each resolveConflictWithServerData below as a goroutine just to
// make sure we're not stacking those wallet calls.
for _, conflict := range statusConflicts {
conflict.trade.mtx.Lock()
statusRequests = append(statusRequests, &msgjson.MatchRequest{
Base: conflict.trade.Base(),
Quote: conflict.trade.Quote(),
MatchID: conflict.match.id[:],
})
for _, match := range conflict.matches {
statusRequests = append(statusRequests, &msgjson.MatchRequest{
Base: conflict.trade.Base(),
Quote: conflict.trade.Quote(),
MatchID: match.id[:],
})
}
}

var msgStatuses []*msgjson.MatchStatusResult
err := sendRequest(dc.WsConn, msgjson.MatchStatusRoute, statusRequests, &msgStatuses, DefaultResponseTimeout)
if err != nil {
for _, conflict := range statusConflicts {
conflict.trade.mtx.Unlock()
}
c.log.Errorf("match_status request error for %s requesting %d match statuses. %s: %v", dc.acct.host, len(statusRequests), err)
c.log.Errorf("match_status request error for %s requesting %d match statuses: %v", dc.acct.host, len(statusRequests), err)
return
}

Expand All @@ -59,32 +50,31 @@ func (c *Core) resolveMatchConflicts(dc *dexConnection, statusConflicts []*match
resMap[matchID] = msgStatus
}

var wg sync.WaitGroup
for _, conflict := range statusConflicts {
srvData := resMap[conflict.match.id]
if srvData == nil {
// I don't really know how this would happen, considering the server
// reported the match as active in the connect response. I'm also
// not sure what action to take. Maybe just revoke the match.
c.log.Errorf("Server did not report a status for match during resolution. %s", statusResolutionID(dc, conflict.trade, conflict.match))
// revokeMatch only returns an error for a missing match ID,
// and we already checked in compareServerMatches.
conflict.trade.revokeMatch(conflict.match.id, false)
conflict.trade.mtx.Unlock()
continue
}

if order.MatchStatus(srvData.Status) != order.MatchComplete && !srvData.Active {
// Server has revoked the match. We'll still go through
// resolveConflictWithServerData to collect any extra data the
// server has, but setting ServerRevoked will prevent us from
// trying to update the state with the server.
conflict.match.MetaData.Proof.ServerRevoked = true
}
go func(trade *trackedTrade, match *matchTracker) {
c.resolveConflictWithServerData(dc, trade, match, srvData)
trade.mtx.Unlock()
}(conflict.trade, conflict.match)
wg.Add(1)
go func(trade *trackedTrade, matches []*matchTracker) {
defer wg.Done()
trade.mtx.Lock()
defer trade.mtx.Unlock()
for _, match := range matches {
srvData := resMap[match.id]
if srvData == nil {
// I don't really know how this would happen, considering the server
// reported the match as active in the connect response. I'm also
// not sure what action to take. Maybe just revoke the match.
c.log.Errorf("Server did not report a status for match during resolution. %s", statusResolutionID(dc, trade, match))
// revokeMatch only returns an error for a missing match ID,
// and we already checked in compareServerMatches.
trade.revokeMatch(match.id, false)
continue
}
c.resolveConflictWithServerData(dc, trade, match, srvData)
}
}(conflict.trade, conflict.matches)
}

wg.Wait()
}

// The matchConflictResolver is unique to a MatchStatus pair and handles
Expand Down Expand Up @@ -129,6 +119,14 @@ func conflictResolver(ours, servers order.MatchStatus) matchConflictResolver {
func (c *Core) resolveConflictWithServerData(dc *dexConnection, trade *trackedTrade, match *matchTracker, srvData *msgjson.MatchStatusResult) {
srvStatus := order.MatchStatus(srvData.Status)

if srvStatus != order.MatchComplete && !srvData.Active {
// Server has revoked the match. We'll still go through
// resolveConflictWithServerData to collect any extra data the
// server has, but setting ServerRevoked will prevent us from
// trying to update the state with the server.
match.MetaData.Proof.ServerRevoked = true
}

if srvStatus == match.MetaData.Status || match.MetaData.Proof.IsRevoked() {
// On startup, there's no chance for a tick between the connect request
// and the match_status request, so this would be unlikely. But if not
Expand Down
23 changes: 12 additions & 11 deletions client/core/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,10 +1091,9 @@ func (t *trackedTrade) revoke() {
t.maybeReturnCoins()
}

// revokeMatch sets the status as revoked for the specified match.
// revokeMatch sets the status as revoked for the specified match. revokeMatch
// must be called with the mtx write-locked.
func (t *trackedTrade) revokeMatch(matchID order.MatchID, fromServer bool) error {
t.mtx.Lock()
defer t.mtx.Unlock()
var revokedMatch *matchTracker
for _, match := range t.matches {
if match.id == matchID {
Expand Down Expand Up @@ -1360,10 +1359,11 @@ func (c *Core) finalizeSwapAction(t *trackedTrade, match *matchTracker, coinID,
var msgErr *msgjson.Error
if errors.As(err, &msgErr) && msgErr.Code == msgjson.SettlementSequenceError {
// Try resolving the match status conflict.
go c.resolveMatchConflicts(t.dc, []*matchStatusConflict{{
trade: t,
match: match,
}})
go c.resolveMatchConflicts(t.dc, map[order.OrderID]*matchStatusConflict{
t.ID(): {
trade: t,
matches: []*matchTracker{match},
}})
}
errs.add("error sending 'init' message for match %s: %v", match.id, err)
} else if err := t.dc.acct.checkSig(init.Serialize(), ack.Sig); err != nil {
Expand Down Expand Up @@ -1527,10 +1527,11 @@ func (c *Core) finalizeRedeemAction(t *trackedTrade, match *matchTracker, coinID
var msgErr *msgjson.Error
if errors.As(err, &msgErr) && msgErr.Code == msgjson.SettlementSequenceError {
// Try resolving the match status conflict.
go c.resolveMatchConflicts(t.dc, []*matchStatusConflict{{
trade: t,
match: match,
}})
go c.resolveMatchConflicts(t.dc, map[order.OrderID]*matchStatusConflict{
t.ID(): {
trade: t,
matches: []*matchTracker{match},
}})
}
ack.Sig = nil // in case of partial unmarshal
errs.add("error sending 'redeem' message for match %s: %v", match.id, err)
Expand Down

0 comments on commit c09017d

Please sign in to comment.