Skip to content

Commit

Permalink
exchanges: add an aggregated orderbook
Browse files Browse the repository at this point in the history
This adds an aggregated orderbook.

Combines the order book data into a single aggregated orderbook
available at (ExchangeBot).QuickDepths, with special token
'aggregated'. A stacked depth chart is added to /charts. The depth
chart gap width is also now displayed.
  • Loading branch information
buck54321 authored and chappjc committed May 13, 2019
1 parent 1929b0c commit 4ee1782
Show file tree
Hide file tree
Showing 8 changed files with 678 additions and 92 deletions.
186 changes: 163 additions & 23 deletions exchanges/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ const (
DefaultRequestExpiry = "60m"

defaultDCRRatesPort = "7778"

aggregatedOrderbookKey = "aggregated"
orderbookKey = "depth"
)

var grpcClient dcrrates.DCRRatesClient
Expand Down Expand Up @@ -125,6 +128,14 @@ func (state *ExchangeBotState) BtcToFiat(btc float64) float64 {
return state.BtcPrice * btc
}

// FiatToBtc converts an amount of fiat in the default index to a value in BTC.
func (state *ExchangeBotState) FiatToBtc(fiat float64) float64 {
if state.BtcPrice == 0 {
return -1
}
return fiat / state.BtcPrice
}

// ExchangeState doesn't have a Token field, so if the states are returned as a
// slice (rather than ranging over a map), a token is needed.
type tokenedExchange struct {
Expand All @@ -148,6 +159,39 @@ func (state *ExchangeBotState) VolumeOrderedExchanges() []*tokenedExchange {
return xcList
}

// A price bin for the aggregated orderbook. The Volumes array will be length
// N = number of depth-reporting exchanges. If any exchange has an order book
// entry at price Price, then an agBookPt should be created. If a different
// exchange does not have an order at Price, there will be a 0 in Volumes at
// the exchange's index. An exchange's index in Volumes is set by its index
// in (aggregateOrderbook).Tokens.
type agBookPt struct {
Price float64 `json:"price"`
Volumes []float64 `json:"volumes"`
}

// The aggregated depth data. Similar to DepthData, but with agBookPts instead.
// For aggregateData, the Time will indicate the most recent time at which an
// exchange with non-nil DepthData was updated.
type aggregateData struct {
Time int64 `json:"time"`
Bids []agBookPt `json:"bids"`
Asks []agBookPt `json:"asks"`
}

// An aggregated orderbook. Combines all data from the DepthData of each
// Exchange. For aggregatedOrderbook, the Expiration is set to the time of the
// most recent DepthData update plus an additional (ExchangeBot).RequestExpiry,
// though new data may be available before then.
type aggregateOrderbook struct {
BtcIndex string `json:"btc_index"`
Price float64 `json:"price"`
Tokens []string `json:"tokens"`
UpdateTimes []int64 `json:"update_times"`
Data aggregateData `json:"data"`
Expiration int64 `json:"expiration"`
}

// FiatIndices maps currency codes to Bitcoin exchange rates.
type FiatIndices map[string]float64

Expand Down Expand Up @@ -202,7 +246,6 @@ type depthResponse struct {
type versionedChart struct {
chartID string
dataID int
time time.Time
chart []byte
}

Expand Down Expand Up @@ -687,7 +730,8 @@ func (bot *ExchangeBot) updateExchange(update *ExchangeUpdate) error {
}
}
if update.State.Depth != nil {
bot.incrementChart(genCacheID(update.Token, "depth"))
bot.incrementChart(genCacheID(update.Token, orderbookKey))
bot.incrementChart(genCacheID(aggregatedOrderbookKey, orderbookKey))
}
bot.currentState.DcrBtc[update.Token] = update.State
return bot.updateState()
Expand Down Expand Up @@ -881,48 +925,144 @@ func (bot *ExchangeBot) QuickSticks(token string, rawBin string) ([]byte, error)
vChart := &versionedChart{
chartID: chartID,
dataID: bestVersion,
time: expiration,
chart: chart,
}

bot.versionedCharts[chartID] = vChart
return vChart.chart, nil
}

// Move the DepthPoint array into a map whose entries are agBookPt, inserting
// the (DepthPoint).Quantity values at xcIndex of Volumes. Creates Volumes
// if it does not yet exist.
func mapifyDepthPoints(source []DepthPoint, target map[int64]agBookPt, xcIndex, ptCount int) {
for _, pt := range source {
k := eightPtKey(pt.Price)
_, found := target[k]
if !found {
target[k] = agBookPt{
Price: pt.Price,
Volumes: make([]float64, ptCount),
}
}
target[k].Volumes[xcIndex] = pt.Quantity
}
}

// A list of eightPtKey keys from an orderbook tracking map. Used for sorting.
func agBookMapKeys(book map[int64]agBookPt) []int64 {
keys := make([]int64, 0, len(book))
for k := range book {
keys = append(keys, k)
}
return keys
}

// After the aggregate orderbook map is fully assembled, sort the keys and
// process the map into a list of lists.
func unmapAgOrders(book map[int64]agBookPt, reverse bool) []agBookPt {
orderedBook := make([]agBookPt, 0, len(book))
keys := agBookMapKeys(book)
if reverse {
sort.Slice(keys, func(i, j int) bool { return keys[j] < keys[i] })
} else {
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
}
for _, k := range keys {
orderedBook = append(orderedBook, book[k])
}
return orderedBook
}

// Make an aggregate orderbook from all depth data.
func (bot *ExchangeBot) aggOrderbook() *aggregateOrderbook {
state := bot.State()
if state == nil {
return nil
}
bids := make(map[int64]agBookPt)
asks := make(map[int64]agBookPt)

oldestUpdate := time.Now().Unix()
var newestTime int64
// First, grab the tokens for exchanges with depth data so that they can be
// counted and sorted alphabetically.
tokens := []string{}
for token, xcState := range state.DcrBtc {
if !xcState.HasDepth() {
continue
}
tokens = append(tokens, token)
}
numXc := len(tokens)
updateTimes := make([]int64, 0, numXc)
sort.Strings(tokens)
for i, token := range tokens {
xcState := state.DcrBtc[token]
depth := xcState.Depth
if depth.Time < oldestUpdate {
oldestUpdate = depth.Time
}
if depth.Time > newestTime {
newestTime = depth.Time
}
updateTimes = append(updateTimes, depth.Time)
mapifyDepthPoints(depth.Bids, bids, i, numXc)
mapifyDepthPoints(depth.Asks, asks, i, numXc)
}
return &aggregateOrderbook{
Tokens: tokens,
BtcIndex: bot.BtcIndex,
Price: state.Price,
UpdateTimes: updateTimes,
Data: aggregateData{
Time: newestTime,
Bids: unmapAgOrders(bids, true),
Asks: unmapAgOrders(asks, false),
},
Expiration: oldestUpdate + int64(bot.RequestExpiry.Seconds()),
}
}

// QuickDepth returns the up-to-date depth chart data for the specified
// exchange, pulling from the cache if appropriate.
func (bot *ExchangeBot) QuickDepth(token string) ([]byte, error) {
chartID := genCacheID(token, "depth")
func (bot *ExchangeBot) QuickDepth(token string) (chart []byte, err error) {
chartID := genCacheID(token, orderbookKey)
data, bestVersion, isGood := bot.fetchFromCache(chartID)
if isGood {
return data, nil
}

// No hit on cache. Re-encode.

bot.mtx.Lock()
defer bot.mtx.Unlock()
state, found := bot.currentState.DcrBtc[token]
if !found {
return []byte{}, fmt.Errorf("Failed to find DCR exchange state for %s", token)
}
if state.Depth == nil {
return []byte{}, fmt.Errorf("Failed to find depth for %s", token)
if token == aggregatedOrderbookKey {
agDepth := bot.aggOrderbook()
if agDepth == nil {
return nil, fmt.Errorf("Failed to find depth for %s", token)
}
chart, err = bot.jsonify(agDepth)
} else {
bot.mtx.Lock()
defer bot.mtx.Unlock()
xcState, found := bot.currentState.DcrBtc[token]
if !found {
return nil, fmt.Errorf("Failed to find DCR exchange state for %s", token)
}
if xcState.Depth == nil {
return nil, fmt.Errorf("Failed to find depth for %s", token)
}
chart, err = bot.jsonify(&depthResponse{
BtcIndex: bot.BtcIndex,
Price: bot.currentState.Price,
Data: xcState.Depth,
Expiration: xcState.Depth.Time + int64(bot.RequestExpiry.Seconds()),
})
}
chart, err := bot.jsonify(&depthResponse{
BtcIndex: bot.BtcIndex,
Price: bot.currentState.Price,
Data: state.Depth,
Expiration: state.Depth.Time + int64(bot.RequestExpiry.Seconds()),
})
if err != nil {
return []byte{}, fmt.Errorf("JSON encode error for %s depth chart", token)
return nil, fmt.Errorf("JSON encode error for %s depth chart", token)
}

vChart := &versionedChart{
chartID: chartID,
dataID: bestVersion,
time: time.Unix(state.Depth.Time, 0),
chart: chart,
}

Expand Down
10 changes: 10 additions & 0 deletions exchanges/exchanges.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package exchanges
import (
"encoding/json"
"fmt"
"math"
"net/http"
"sort"
"strconv"
Expand Down Expand Up @@ -232,6 +233,15 @@ func (sticks Candlesticks) needsUpdate(bin candlestickKey) bool {
return time.Now().After(lastStick.Start.Add(bin.duration() * 2))
}

// Most exchanges bin price values on a float precision of 8 decimal points.
// eightPtKey reliably converts the float to an int64 that is unique for a price
// bin.
func eightPtKey(rate float64) int64 {
// Bittrex values are sometimes parsed with floating point error that can
// affect the key if you simply use int64(rate).
return int64(math.Round(rate * 1e8))
}

// ExchangeState is the simple template for a price. The only member that is
// guaranteed is a price. For Decred exchanges, the volumes will also be
// populated.
Expand Down
45 changes: 37 additions & 8 deletions exchanges/exchanges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/decred/slog"
)

func testExchanges(asSlave bool, t *testing.T) {
func testExchanges(asSlave, quickTest bool, t *testing.T) {
UseLogger(slog.NewBackend(os.Stdout).Logger("EXE"))
log.SetLevel(slog.LevelTrace)

Expand Down Expand Up @@ -59,22 +59,41 @@ func testExchanges(asSlave bool, t *testing.T) {
t.Fatalf("Error creating bot. Shutting down: %v", err)
}

updateCounts := make(map[string]int)
for token := range bot.Exchanges {
updateCounts[token] = 0
}
logUpdate := func(token string) {
if !quickTest {
return
}
updateCounts[token]++
lowest := updateCounts[token]
for _, v := range updateCounts {
if v < lowest {
lowest = v
}
}
if lowest > 0 {
log.Infof("Quick test conditions met. Shutting down early")
shutdown()
}
}

wg.Add(1)
go bot.Start(ctx, wg)

quitTimer := time.NewTimer(time.Minute * 7)
var updated []string

ch := bot.UpdateChannels()

out:
for {
select {
case update := <-ch.Exchange:
updated = append(updated, update.Token)
logUpdate(update.Token)
log.Infof("Update received from exchange %s", update.Token)
case update := <-ch.Index:
updated = append(updated, update.Token)
logUpdate(update.Token)
log.Infof("Update received from index %s", update.Token)
case <-ch.Quit:
t.Errorf("Exchange bot has quit.")
Expand All @@ -91,7 +110,7 @@ out:
}

logMissing := func(token string) {
for _, xc := range updated {
for xc := range updateCounts {
if xc == token {
return
}
Expand All @@ -103,6 +122,12 @@ out:
logMissing(token)
}

depth, err := bot.QuickDepth(aggregatedOrderbookKey)
if err != nil {
t.Errorf("Failed to create aggregated orderbook")
}
log.Infof("aggregated orderbook size: %d kiB", len(depth)/1024)

log.Infof("%d Bitcoin indices available", len(bot.AvailableIndices()))
log.Infof("final state is %d kiB", len(bot.StateBytes())/1024)

Expand All @@ -111,13 +136,17 @@ out:
}

func TestExchanges(t *testing.T) {
testExchanges(false, t)
testExchanges(false, false, t)
}

func TestSlaveBot(t *testing.T) {
// Points to DCRData on local machine port 7778.
// Start server with --exchange-refresh=1m --exchange-expiry=2m
testExchanges(true, t)
testExchanges(true, false, t)
}

func TestQuickExchanges(t *testing.T) {
testExchanges(false, true, t)
}

var initialPoloniexOrderbook = []byte(`[
Expand Down
2 changes: 1 addition & 1 deletion public/images/exchange-logos-25.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 4ee1782

Please sign in to comment.