Skip to content

Commit

Permalink
client/core: purge legacy preimage request handling
Browse files Browse the repository at this point in the history
The initial preimage request did not include the order commitment for
the preimage being requested.  This had us doing some tricks because
the request only provided the order ID, which is not known to the client
until the order submission response was received, creating a race.

Since this flawed design was resolved in 850e8a6, in v0.2.0 about 18
months ago, by including the commitment in the preimage request,
we can safely remove the legacy client-side handling for requests
that do not include the commitment.  This change removes the piSyncers
map and legacy code path from handlePreimageRequest.
  • Loading branch information
chappjc committed Nov 23, 2022
1 parent 881e720 commit dde6466
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 146 deletions.
140 changes: 27 additions & 113 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,6 @@ func (c *Core) tryCancelTrade(dc *dexConnection, tracker *trackedTrade) error {
return fmt.Errorf("error storing cancel order info %s: %w", co.ID(), err)
}

// Now that the trackedTrade is updated, sync with the preimage request.
c.syncOrderPlaced(co.ID())

// Store the cancel order.
err = c.db.UpdateOrder(&db.MetaOrder{
MetaData: &db.OrderMetaData{
Expand Down Expand Up @@ -671,26 +668,6 @@ func (c *Core) tryCancelTrade(dc *dexConnection, tracker *trackedTrade) error {
return nil
}

// Synchronize with the preimage request, in case that request came before
// we had an order ID and added this order to the trades map or cancel field.
// (V0PURGE)
func (c *Core) syncOrderPlaced(oid order.OrderID) {
c.piSyncMtx.Lock()
syncChan, found := c.piSyncers[oid]
if found {
// If we found the channel, the preimage request is already waiting.
// Close the channel to allow that request to proceed.
close(syncChan)
delete(c.piSyncers, oid)
} else {
// If there is no channel, the preimage request hasn't come yet. Add a
// channel to signal readiness. Could insert nil unless syncOrderPlaced
// is erroneously called twice for the same order ID.
c.piSyncers[oid] = make(chan struct{})
}
c.piSyncMtx.Unlock()
}

// signAndRequest signs and sends the request, unmarshaling the response into
// the provided interface.
func (dc *dexConnection) signAndRequest(signable msgjson.Signable, route string, result interface{}, timeout time.Duration) error {
Expand Down Expand Up @@ -1366,9 +1343,6 @@ type Core struct {
noteMtx sync.RWMutex
noteChans map[uint64]chan Notification

piSyncMtx sync.Mutex
piSyncers map[order.OrderID]chan struct{} // V0PURGE

sentCommitsMtx sync.Mutex
sentCommits map[order.Commitment]chan struct{}

Expand Down Expand Up @@ -1470,7 +1444,6 @@ func New(cfg *Config) (*Core, error) {
lockTimeTaker: dex.LockTimeTaker(cfg.Net),
lockTimeMaker: dex.LockTimeMaker(cfg.Net),
blockWaiters: make(map[string]*blockWaiter),
piSyncers: make(map[order.OrderID]chan struct{}),
sentCommits: make(map[order.Commitment]chan struct{}),
tickSched: make(map[order.OrderID]*time.Timer),
// Allowing to change the constructor makes testing a lot easier.
Expand Down Expand Up @@ -5556,9 +5529,6 @@ func (c *Core) sendTradeRequest(tr *tradeRequest) (*Order, error) {
dc.trades[tracker.ID()] = tracker
dc.tradeMtx.Unlock()

// Now that the trades map is updated, sync with the preimage request.
c.syncOrderPlaced(ord.ID())

// Send a low-priority notification.
corder := tracker.coreOrder()
if !form.IsLimit && !form.Sell {
Expand Down Expand Up @@ -7654,85 +7624,35 @@ func handlePreimageRequest(c *Core, dc *dexConnection, msg *msgjson.Message) err
return err
}

// NEW protocol with commitment specified.
if len(req.Commitment) == order.CommitmentSize {
// See if we recognize that commitment, and if we do, just wait for the
// order ID, and process the request.
var commit order.Commitment
copy(commit[:], req.Commitment)

c.sentCommitsMtx.Lock()
defer c.sentCommitsMtx.Unlock()
commitSig, found := c.sentCommits[commit]
if !found { // this is the main benefit of a commitment index
return fmt.Errorf("received preimage request for unknown commitment %v, order %v",
req.Commitment, oid)
}
delete(c.sentCommits, commit)
if len(req.Commitment) != order.CommitmentSize {
return fmt.Errorf("received preimage request for %v with no corresponding order submission response.", oid)
}

dc.log.Debugf("Received preimage request for order %v with known commitment %v", oid, commit)
// See if we recognize that commitment, and if we do, just wait for the
// order ID, and process the request.
var commit order.Commitment
copy(commit[:], req.Commitment)

// Go async while waiting.
go func() {
// Order request success OR fail closes the channel.
<-commitSig
if err := processPreimageRequest(c, dc, msg.ID, oid, req.CommitChecksum); err != nil {
c.log.Errorf("async processPreimageRequest for %v failed: %v", oid, err)
} else {
c.log.Debugf("async processPreimageRequest for %v succeeded", oid)
}
c.sentCommitsMtx.Lock()
defer c.sentCommitsMtx.Unlock()
commitSig, found := c.sentCommits[commit]
if !found { // this is the main benefit of a commitment index
return fmt.Errorf("received preimage request for unknown commitment %v, order %v",
req.Commitment, oid)
}
delete(c.sentCommits, commit)

// There or not, delete this oid entry from the deprecated map.
c.piSyncMtx.Lock()
delete(c.piSyncers, oid)
c.piSyncMtx.Unlock()
}()
dc.log.Debugf("Received preimage request for order %v with known commitment %v", oid, commit)

return nil
} // else no or invalid commitment, eventually error (v0 DEPRECATION)

// OLD protocol below without the commitment is DEPRECATED. Remove when
// protocol version reaches 1.

// Sync with order placement, the response from which provides the order ID.
c.piSyncMtx.Lock()
if _, found := c.piSyncers[oid]; found {
// If we found a map entry, the tracker is already in the trades map,
// and we can go ahead.
delete(c.piSyncers, oid)
c.piSyncMtx.Unlock()
dc.log.Debugf("Received preimage request for known order %v", oid)
return processPreimageRequest(c, dc, msg.ID, oid, req.CommitChecksum)
}

// If we didn't find an entry, Trade or Cancel is still running. Add a chan
// and wait for Trade to close it.
syncChan := make(chan struct{})
c.piSyncers[oid] = syncChan
c.piSyncMtx.Unlock()

// The order submission could be timing out waiting for a response, or this
// could be a bogus preimage request, so we do not want to block the caller,
// (*Core).listen if this hangs. Preimage requests are ok to handle
// asynchronously since there can be no matches until we respond to this.
c.log.Warnf("Received preimage request for %v with no corresponding order submission response! Waiting...", oid)
// Go async while waiting.
go func() {
select {
case <-syncChan:
if err := processPreimageRequest(c, dc, msg.ID, oid, req.CommitChecksum); err != nil {
c.log.Errorf("async processPreimageRequest for %v failed: %v", oid, err)
} else {
c.log.Debugf("async processPreimageRequest for %v succeeded", oid)
}
// The channel is deleted from the piSyncers map by syncOrderPlaced.
return
case <-time.After(DefaultResponseTimeout):
c.log.Errorf("Timed out syncing preimage request from %s, order %s", dc.acct.host, oid)
case <-c.ctx.Done():
// Order request success OR fail closes the channel.
<-commitSig
if err := processPreimageRequest(c, dc, msg.ID, oid, req.CommitChecksum); err != nil {
c.log.Errorf("async processPreimageRequest for %v failed: %v", oid, err)
} else {
c.log.Debugf("async processPreimageRequest for %v succeeded", oid)
}
c.piSyncMtx.Lock()
delete(c.piSyncers, oid)
c.piSyncMtx.Unlock()
}()

return nil
Expand All @@ -7749,8 +7669,8 @@ func processPreimageRequest(c *Core, dc *dexConnection, reqID uint64, oid order.
return fmt.Errorf("no active order found for preimage request for %s", oid)
} // delete the entry in match/nomatch
} else {
// Record the csum if this preimage request is novel, and deny it if this is
// a duplicate request with an altered csum.
// Record the csum if this preimage request is novel, and deny it if
// this is a duplicate request with an altered csum.
if !acceptCsum(tracker, isCancel, commitChecksum) {
csumErr := errors.New("invalid csum in duplicate preimage request")
resp, err := msgjson.NewResponse(reqID, nil,
Expand All @@ -7767,14 +7687,6 @@ func processPreimageRequest(c *Core, dc *dexConnection, reqID uint64, oid order.
}
}

// Clean up the sentCommits now that we loaded the commitment. This can be
// removed when the old piSyncers method is removed. (V0PURGE)
defer func() {
// Note the commitment is not tracker.Commitment() for cancel orders.
c.sentCommitsMtx.Lock()
delete(c.sentCommits, preImg.Commit()) // redundant if the commitment was in request
c.sentCommitsMtx.Unlock()
}()
resp, err := msgjson.NewResponse(reqID, &msgjson.PreimageResponse{
Preimage: preImg[:],
}, nil)
Expand All @@ -7785,13 +7697,15 @@ func processPreimageRequest(c *Core, dc *dexConnection, reqID uint64, oid order.
if err != nil {
return fmt.Errorf("preimage send error: %w", err)
}

if tracker != nil {
topic := TopicPreimageSent
if isCancel {
topic = TopicCancelPreimageSent
}
c.notify(newOrderNote(topic, "", "", db.Data, tracker.coreOrder()))
}

return nil
}

Expand Down
49 changes: 16 additions & 33 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,6 @@ func newTestRig() *testRig {
lockTimeMaker: dex.LockTimeMaker(dex.Testnet),
wallets: make(map[uint32]*xcWallet),
blockWaiters: make(map[string]*blockWaiter),
piSyncers: make(map[order.OrderID]chan struct{}),
sentCommits: make(map[order.Commitment]chan struct{}),
tickSched: make(map[order.OrderID]*time.Timer),
wsConstructor: func(*comms.WsCfg) (comms.WsConn, error) {
Expand Down Expand Up @@ -3729,6 +3728,8 @@ func TestHandlePreimageRequest(t *testing.T) {
ord := &order.LimitOrder{P: order.Prefix{ServerTime: time.Now()}}
oid := ord.ID()
preImg := newPreimage()

// It is no longer OK for server to omit the commitment.
payload := &msgjson.PreimageRequest{
OrderID: oid[:],
// No commitment in this request.
Expand All @@ -3745,13 +3746,6 @@ func TestHandlePreimageRequest(t *testing.T) {
metaData: &db.OrderMetaData{},
}

// Simulate an order submission request having completed.
loadSyncer := func() {
rig.core.piSyncMtx.Lock()
rig.core.piSyncers[oid] = nil // set nil to ensure it's just a map entry
rig.core.piSyncMtx.Unlock()
}

// resetCsum resets csum for further preimage request since multiple
// testing scenarios use the same tracker object.
resetCsum := func(tracker *trackedTrade) {
Expand All @@ -3761,10 +3755,9 @@ func TestHandlePreimageRequest(t *testing.T) {
}

rig.dc.trades[oid] = tracker
loadSyncer()
err := handlePreimageRequest(rig.core, rig.dc, reqNoCommit)
if err != nil {
t.Fatalf("handlePreimageRequest error: %v", err)
if err == nil {
t.Fatalf("handlePreimageRequest succeeded with no commitment in the request")
}
resetCsum(tracker)

Expand Down Expand Up @@ -3810,7 +3803,6 @@ func TestHandlePreimageRequest(t *testing.T) {
// negative paths
ensureErr := func(tag string, req *msgjson.Message, errPrefix string) {
t.Helper()
loadSyncer()
commitSig := readyCommitment(commit)
close(commitSig) // ready before preimage request
err := handlePreimageRequest(rig.core, rig.dc, req)
Expand All @@ -3830,18 +3822,6 @@ func TestHandlePreimageRequest(t *testing.T) {
}
reqCommitBad, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.PreimageRoute, payloadBad)
ensureErr("unknown commitment", reqCommitBad, "received preimage request for unknown commitment")
// all other errors for

// Trade-not-found error only returned on synchronous non-commitment request
// handling, so use reqNoCommit to test that part of processPreimageRequest.
delete(rig.dc.trades, oid)
ensureErr("no tracker", reqNoCommit, "no active order found for preimage request")
rig.dc.trades[oid] = tracker // reset

// Response send error also only returned on synchronous request handling.
rig.ws.sendErr = tErr
ensureErr("send error", reqNoCommit, "preimage send error")
rig.ws.sendErr = nil // reset
})
t.Run("csum for order", func(t *testing.T) {
rig := newTestRig()
Expand Down Expand Up @@ -7546,6 +7526,7 @@ func TestPreimageSync(t *testing.T) {
tBtcWallet.fundRedeemScripts = []dex.Bytes{nil}

limitRouteProcessing := make(chan order.OrderID)
var commit order.Commitment

rig.ws.queueResponse(msgjson.LimitRoute, func(msg *msgjson.Message, f msgFunc) error {
t.Helper()
Expand All @@ -7558,8 +7539,8 @@ func TestPreimageSync(t *testing.T) {
lo := convertMsgLimitOrder(msgOrder)
resp := orderResponse(msg.ID, msgOrder, lo, false, false, false)
limitRouteProcessing <- lo.ID()
<-time.NewTimer(time.Millisecond * 100).C
f(resp)
commit = lo.Commit // accessed below only after errChan receive indicating Trade done
f(resp) // e.g. the UnmarshalJSON in sendRequest
return nil
})

Expand All @@ -7575,25 +7556,27 @@ func TestPreimageSync(t *testing.T) {
var oid order.OrderID
select {
case oid = <-limitRouteProcessing:
case <-time.NewTimer(time.Second).C:
case <-time.After(time.Second):
t.Fatalf("limit route never hit")
}

err := <-errChan
if err != nil {
t.Fatalf("trade error: %v", err)
}

// So ideally, we're calling handlePreimageRequest about 100 ms before we
// even have an order id back from the server. This shouldn't result in an
// error.
payload := &msgjson.PreimageRequest{
OrderID: oid[:],
OrderID: oid[:],
Commitment: commit[:],
}
req, _ := msgjson.NewRequest(rig.dc.NextID(), msgjson.PreimageRoute, payload)
err := handlePreimageRequest(rig.core, rig.dc, req)
err = handlePreimageRequest(rig.core, rig.dc, req)
if err != nil {
t.Fatalf("early preimage request error: %v", err)
}
err = <-errChan
if err != nil {
t.Fatalf("trade error: %v", err)
}
}

func TestAccelerateOrder(t *testing.T) {
Expand Down

0 comments on commit dde6466

Please sign in to comment.