Skip to content

Commit

Permalink
WIP concurrent tickAsset
Browse files Browse the repository at this point in the history
also move the maker init sleep to swapMatches
  • Loading branch information
chappjc committed Sep 17, 2020
1 parent 85d7b90 commit 4a22c9d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 25 deletions.
44 changes: 19 additions & 25 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,15 +602,22 @@ func (dc *dexConnection) tickAsset(assetID uint32) assetMap {
}
dc.tradeMtx.RUnlock()

updated := make(assetMap)
updateChan := make(chan assetMap)
for _, trade := range assetTrades {
newUpdates, err := trade.tick()
if err != nil {
log.Errorf("%s tick error: %v", dc.acct.host, err)
}
updated.merge(newUpdates)
trade := trade // bad go, bad
go func() {
newUpdates, err := trade.tick()
if err != nil {
log.Errorf("%s tick error: %v", dc.acct.host, err)
}
updateChan <- newUpdates
}()
}

updated := make(assetMap)
for range assetTrades {
updated.merge(<-updateChan)
}
return updated
}

Expand Down Expand Up @@ -3347,12 +3354,12 @@ func (c *Core) listen(dc *dexConnection) {
var doneTrades, activeTrades []*trackedTrade
dc.tradeMtx.Lock()
for oid, trade := range dc.trades {
if !trade.isActive() {
doneTrades = append(doneTrades, trade)
delete(dc.trades, oid)
if trade.isActive() {
activeTrades = append(activeTrades, trade)
continue
}
activeTrades = append(activeTrades, trade)
doneTrades = append(doneTrades, trade)
delete(dc.trades, oid)
}
dc.tradeMtx.Unlock()

Expand Down Expand Up @@ -3529,25 +3536,12 @@ func handleMatchRoute(c *Core, dc *dexConnection, msg *msgjson.Message) error {
// dc.addPendingSend(resp) // e.g.
}

defer c.refreshUser() // update WalletState

// Begin match negotiation.
//
// TODO: We need to know that the server has first recorded the ack
// signature, otherwise 'init' will fail. Until this is solved with a
// response to the ack, sleep for a moment if we are the maker on any of the
// orders. However, resendPendingRequests will retry the request when the
// trackedTrade ticks again when the timer fires in listen.
for _, m := range msgMatches {
if m.Side == uint8(order.Maker) {
time.Sleep(250 * time.Millisecond)
break
}
}

updatedAssets, err := dc.runMatches(matches)
if len(updatedAssets) > 0 {
c.updateBalances(updatedAssets)
} else {
c.refreshUser() // would be called by updateBalances
}

return err
Expand Down
7 changes: 7 additions & 0 deletions client/core/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,7 @@ func (t *trackedTrade) swapMatches(matches []*matchTracker) error {
contracts := make([]*asset.Contract, len(matches))
// These matches may have different fee rates, matched in different epochs.
var highestFeeRate uint64
var includesMakerSwap bool
for i, match := range matches {
dbMatch, _, proof, auth := match.parts()
value := dbMatch.Quantity
Expand All @@ -1063,6 +1064,7 @@ func (t *trackedTrade) swapMatches(matches []*matchTracker) error {
secretHash := sha256.Sum256(proof.Secret)
proof.SecretHash = secretHash[:]
lockTime = matchTime.Add(t.lockTimeMaker).UTC().Unix()
includesMakerSwap = true
}

contracts[i] = &asset.Contract{
Expand Down Expand Up @@ -1159,6 +1161,11 @@ func (t *trackedTrade) swapMatches(matches []*matchTracker) error {
t.change = change
t.db.UpdateOrderMetaData(t.ID(), t.metaData)

// Workaround for server recording match ack sig, to avoid an 'init' retry.
if includesMakerSwap {
time.Sleep(250 * time.Millisecond)
}

// Process the swap for each match by sending the `init` request
// to the DEX and updating the match with swap details.
// Add any errors encountered to `errs` and proceed to next match
Expand Down

0 comments on commit 4a22c9d

Please sign in to comment.