Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

app: show more details in user orders table #378

Merged
merged 2 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions client/core/bookie.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,10 @@ func handleBookOrderMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) error
return err
}
book.send(&BookUpdate{
Action: msg.Route,
Payload: minifyOrder(note.OrderID, &note.TradeNote, 0),
Action: msg.Route,
DEX: dc.acct.url,
MarketID: note.MarketID,
Payload: minifyOrder(note.OrderID, &note.TradeNote, 0),
})
return nil
}
Expand All @@ -319,8 +321,10 @@ func handleUnbookOrderMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) erro
return err
}
book.send(&BookUpdate{
Action: msg.Route,
Payload: &MiniOrder{Token: token(note.OrderID)},
Action: msg.Route,
DEX: dc.acct.url,
MarketID: note.MarketID,
Payload: &MiniOrder{Token: token(note.OrderID)},
})

return nil
Expand Down Expand Up @@ -348,7 +352,9 @@ func handleUpdateRemainingMsg(_ *Core, dc *dexConnection, msg *msgjson.Message)
return err
}
book.send(&BookUpdate{
Action: msg.Route,
Action: msg.Route,
DEX: dc.acct.url,
MarketID: note.MarketID,
Payload: &RemainingUpdate{
Token: token(note.OrderID),
Qty: float64(note.Remaining) / conversionFactor,
Expand All @@ -366,7 +372,9 @@ func handleEpochOrderMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error
return fmt.Errorf("epoch order note unmarshal error: %v", err)
}

c.setEpoch(note.Epoch)
if dc.setEpoch(note.MarketID, note.Epoch) {
c.refreshUser()
}

dc.booksMtx.RLock()
defer dc.booksMtx.RUnlock()
Expand All @@ -383,8 +391,10 @@ func handleEpochOrderMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error
}
// Send a mini-order for book updates.
book.send(&BookUpdate{
Action: msg.Route,
Payload: minifyOrder(note.OrderID, &note.TradeNote, note.Epoch),
Action: msg.Route,
DEX: dc.acct.url,
MarketID: note.MarketID,
Payload: minifyOrder(note.OrderID, &note.TradeNote, note.Epoch),
})

return nil
Expand Down
130 changes: 107 additions & 23 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type dexConnection struct {

tradeMtx sync.RWMutex
trades map[order.OrderID]*trackedTrade

epochMtx sync.RWMutex
epoch map[string]uint64
}

// refreshMarkets rebuilds, saves, and returns the market map. The map itself
Expand Down Expand Up @@ -264,17 +267,71 @@ func (dc *dexConnection) compareServerMatches(matches map[order.OrderID]*serverM
}

// tickAsset checks open matches related to a specific asset for needed action.
func (dc *dexConnection) tickAsset(assetID uint32) {
func (dc *dexConnection) tickAsset(assetID uint32) (numUpdated int) {
dc.tradeMtx.RLock()
defer dc.tradeMtx.RUnlock()
for _, trade := range dc.trades {
if trade.Base() == assetID || trade.Quote() == assetID {
err := trade.tick()
n, err := trade.tick()
if err != nil {
log.Errorf("%s tick error: %v", dc.acct.url, err)
}
numUpdated += n
}
}
return
}

// market gets the *Market from the marketMap, or nil if mktID is unknown.
func (dc *dexConnection) market(mktID string) *Market {
dc.marketMtx.RLock()
defer dc.marketMtx.RUnlock()
return dc.marketMap[mktID]
}

// setEpoch sets the epoch. If the passed epoch is greater than the highest
// previously passed epoch, send an epoch notification to all subscribers.
func (dc *dexConnection) setEpoch(mktID string, epochIdx uint64) bool {
dc.epochMtx.Lock()
defer dc.epochMtx.Unlock()
if epochIdx > dc.epoch[mktID] {
dc.epoch[mktID] = epochIdx
dc.notify(newEpochNotification(dc.acct.url, mktID, epochIdx))
var updateCount int
dc.tradeMtx.Lock()
for _, trade := range dc.trades {
if trade.processEpoch(epochIdx) {
updateCount++
}
}
dc.tradeMtx.Unlock()
if updateCount > 0 {
dc.refreshMarkets()
return true
}
}
return false
}

// marketEpochDuration gets the market's epoch duration. If the market is not
// known, an error is logged and 0 is returned.
func (dc *dexConnection) marketEpochDuration(mktID string) uint64 {
mkt := dc.market(mktID)
if mkt == nil {
log.Errorf("marketEpoch called for unknown market %s", mktID)
return 0
}
return mkt.EpochLen
}

// marketEpoch gets the epoch index for the specified market and time stamp. If
// the market is not known, 0 is returned.
func (dc *dexConnection) marketEpoch(mktID string, stamp time.Time) uint64 {
epochLen := dc.marketEpochDuration(mktID)
if epochLen == 0 {
return 0
}
return encode.UnixMilliU(stamp) / epochLen
}

// blockWaiter is a message waiting to be stamped, signed, and sent once a
Expand Down Expand Up @@ -323,9 +380,6 @@ type Core struct {

noteMtx sync.RWMutex
noteChans []chan Notification

epochMtx sync.RWMutex
epoch uint64
}

// New is the constructor for a new Core.
Expand Down Expand Up @@ -1175,6 +1229,12 @@ func (c *Core) Trade(pw []byte, form *TradeForm) (*Order, error) {
return nil, fmt.Errorf("unknown DEX %s", form.DEX)
}

mktID := marketName(form.Base, form.Quote)
mkt := dc.market(mktID)
if mkt == nil {
return nil, fmt.Errorf("order placed for unknown market")
}

rate, qty := form.Rate, form.Qty
if form.IsLimit && rate == 0 {
return nil, fmt.Errorf("zero-rate order not allowed")
Expand Down Expand Up @@ -1326,7 +1386,7 @@ func (c *Core) Trade(pw []byte, form *TradeForm) (*Order, error) {
}

// Prepare and store the tracker and get the core.Order to return.
tracker := newTrackedTrade(dbOrder, preImg, dc, c.db, c.latencyQ, wallets, coins, c.notify)
tracker := newTrackedTrade(dbOrder, preImg, dc, mkt.EpochLen, c.db, c.latencyQ, wallets, coins, c.notify)
corder, _ := tracker.coreOrder()
dc.tradeMtx.Lock()
dc.trades[tracker.ID()] = tracker
Expand Down Expand Up @@ -1742,6 +1802,12 @@ func (c *Core) dbTrackers(dc *dexConnection) (map[order.OrderID]*trackedTrade, e
trackers := make(map[order.OrderID]*trackedTrade, len(dbOrders))
for _, dbOrder := range dbOrders {
ord, md := dbOrder.Order, dbOrder.MetaData
mktID := marketName(ord.Base(), ord.Quote())
mkt := dc.market(mktID)
if mkt == nil {
log.Errorf("active %s order retrieved for unknown market %s", ord.ID(), mktID)
continue
}
var preImg order.Preimage
copy(preImg[:], md.Proof.Preimage)
if co, ok := ord.(*order.CancelOrder); ok {
Expand All @@ -1750,7 +1816,7 @@ func (c *Core) dbTrackers(dc *dexConnection) (map[order.OrderID]*trackedTrade, e
} else {
var preImg order.Preimage
copy(preImg[:], dbOrder.MetaData.Proof.Preimage)
trackers[dbOrder.Order.ID()] = newTrackedTrade(dbOrder, preImg, dc, c.db, c.latencyQ, nil, nil, c.notify)
trackers[dbOrder.Order.ID()] = newTrackedTrade(dbOrder, preImg, dc, mkt.EpochLen, c.db, c.latencyQ, nil, nil, c.notify)
}
}
for oid := range cancels {
Expand Down Expand Up @@ -1991,6 +2057,7 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) {
}

marketMap := make(map[string]*Market)
epochMap := make(map[string]uint64)
for _, mkt := range dexCfg.Markets {
base, quote := assets[mkt.Base], assets[mkt.Quote]
market := &Market{
Expand All @@ -2004,6 +2071,7 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) {
MarketBuyBuffer: mkt.MarketBuyBuffer,
}
marketMap[mkt.Name] = market
epochMap[mkt.Name] = 0
}

// Create the dexConnection and listen for incoming messages.
Expand All @@ -2017,6 +2085,7 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) {
marketMap: marketMap,
trades: make(map[order.OrderID]*trackedTrade),
notify: c.notify,
epoch: epochMap,
}

dc.refreshMarkets()
Expand All @@ -2027,17 +2096,6 @@ func (c *Core) connectDEX(acctInfo *db.AccountInfo) (*dexConnection, error) {
return dc, nil
}

// If the passed epoch is greater than the highest previously passed epoch,
// send an epoch notification to all subscribers.
func (c *Core) setEpoch(epochIdx uint64) {
c.epochMtx.Lock()
defer c.epochMtx.Unlock()
if epochIdx > c.epoch {
c.epoch = epochIdx
c.notify(newEpochNotification(epochIdx))
}
}

// handleReconnect is called when a WsConn indicates that a lost connection has
// been re-established.
func (c *Core) handleReconnect(uri string) {
Expand All @@ -2053,7 +2111,9 @@ func handleMatchProofMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error
}

// Expire the epoch
c.setEpoch(note.Epoch + 1)
if dc.setEpoch(note.MarketID, note.Epoch+1) {
c.refreshUser()
}

dc.booksMtx.RLock()
defer dc.booksMtx.RUnlock()
Expand Down Expand Up @@ -2164,14 +2224,20 @@ out:
log.Error(err)
}
case <-ticker.C:
var numUpdated int
dc.tradeMtx.Lock()
for _, trade := range dc.trades {
err := trade.tick()
n, err := trade.tick()
if err != nil {
log.Error(err)
}
numUpdated += n
}
dc.tradeMtx.Unlock()
if numUpdated > 0 {
dc.refreshMarkets()
c.refreshUser()
}
case <-c.ctx.Done():
break out
}
Expand Down Expand Up @@ -2260,7 +2326,12 @@ func handleAuditRoute(c *Core, dc *dexConnection, msg *msgjson.Message) error {
if err != nil {
return err
}
return tracker.tick()
numUpdated, err := tracker.tick()
if numUpdated > 0 {
dc.refreshMarkets()
c.refreshUser()
}
return err
}

// handleRedemptionRoute handles the DEX-originating redemption request, which
Expand All @@ -2281,7 +2352,12 @@ func handleRedemptionRoute(c *Core, dc *dexConnection, msg *msgjson.Message) err
if err != nil {
return err
}
return tracker.tick()
numUpdated, err := tracker.tick()
if numUpdated > 0 {
dc.refreshMarkets()
c.refreshUser()
}
return err
}

// removeWaiter removes a blockWaiter from the map.
Expand Down Expand Up @@ -2319,10 +2395,18 @@ func (c *Core) tipChange(assetID uint32, nodeErr error) {
}
c.waiterMtx.Unlock()
c.connMtx.RLock()
var numUpdated int
for _, dc := range c.conns {
dc.tickAsset(assetID)
n := dc.tickAsset(assetID)
if n > 0 {
dc.refreshMarkets()
}
numUpdated += n
}
c.connMtx.RUnlock()
if numUpdated > 0 {
c.refreshUser()
}
}

// convertAssetInfo converts from a *msgjson.Asset to the nearly identical
Expand Down
Loading