Skip to content

Commit

Permalink
multi: add market data API endpoints
Browse files Browse the repository at this point in the history
Exposes a rate-limited HTTP API. Data endpoints are accessible via HTTP 
or WebSockets. The result for the WebSocket request is identical to the 
REST API response.

/spots is the spot price and booked volume of all markets.

/candles is candlestick data, available in bin sizes of 24h, 1h, 15m, 
and per-epoch sticks. An example URL is /candles/dcr/btc/15m.

/orderbook is already a WebSocket route, but is now also accessible by 
HTTP. An example URL is /orderbook/dcr/btc

/config is another WebSocket route that is also now available over HTTP 
too.

The data API implements a data cache, but does not cache pre-encoded 
responses for /candles or /orderbook (yet).

**server/db**

Market history is stored as a row per epoch. I've created a new table 
for this, though we could also combine with the existing epoch table 
that holds match proof info if preferred. 

I've made no attempt to integrate pre-existing data. Data will be 
collected from the time of upgrade only. The upgrade is silent, since we 
don't have upgrade infrastructure in place yet for the server. 

**server/comms**

The config route is now registered as an http handler, but also called 
from the websocket message handler.

HTTP requests are rate-limited using `"golang.org/x/time/rate"`. There 
is both a global rate limiter and a per-IP rate limiter. The rate limits 
apply to un-authenticated routes only. The /config request is has an 
exception as a "critical' route.

* enable/disable via admin api. config route not metered on ws

* use better http json response encoding pattern
  • Loading branch information
buck54321 committed Dec 8, 2020
1 parent c1cea6f commit 08afde3
Show file tree
Hide file tree
Showing 46 changed files with 2,400 additions and 189 deletions.
20 changes: 20 additions & 0 deletions client/core/bookie.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,26 @@ func handleUpdateRemainingMsg(_ *Core, dc *dexConnection, msg *msgjson.Message)
return nil
}

// handleEpochReportMsg is called when an epoch_report notification is
// received.
func handleEpochReportMsg(_ *Core, dc *dexConnection, msg *msgjson.Message) error {
note := new(msgjson.EpochReportNote)
err := msg.Unmarshal(note)
if err != nil {
return fmt.Errorf("epoch report note unmarshal error: %w", err)
}
book, ok := dc.books[note.MarketID]
if !ok {
return fmt.Errorf("no order book found with market id '%v'",
note.MarketID)
}
err = book.LogEpochReport(note)
if err != nil {
return fmt.Errorf("error logging epoch report: %w", err)
}
return nil
}

// handleEpochOrderMsg is called when an epoch_order notification is
// received.
func handleEpochOrderMsg(c *Core, dc *dexConnection, msg *msgjson.Message) error {
Expand Down
1 change: 1 addition & 0 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3998,6 +3998,7 @@ var noteHandlers = map[string]routeHandler{
msgjson.EpochOrderRoute: handleEpochOrderMsg,
msgjson.UnbookOrderRoute: handleUnbookOrderMsg,
msgjson.UpdateRemainingRoute: handleUpdateRemainingMsg,
msgjson.EpochReportRoute: handleEpochReportMsg,
msgjson.SuspensionRoute: handleTradeSuspensionMsg,
msgjson.ResumptionRoute: handleTradeResumptionMsg,
msgjson.NotifyRoute: handleNotifyMsg,
Expand Down
6 changes: 6 additions & 0 deletions client/orderbook/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ func (ob *OrderBook) UpdateRemaining(note *msgjson.UpdateRemainingNote) error {
return ob.updateRemaining(note, false)
}

// LogEpochReport just checks the notification sequence.
func (ob *OrderBook) LogEpochReport(note *msgjson.EpochReportNote) error {
ob.setSeq(note.Seq)
return nil
}

// unbook is the workhorse of the exported Unbook function. It allows unbooking
// cached and uncached order notes.
func (ob *OrderBook) unbook(note *msgjson.UnbookOrderNote, cached bool) error {
Expand Down
12 changes: 9 additions & 3 deletions client/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,16 @@ func writeJSON(w http.ResponseWriter, thing interface{}) {
// ResponseWriter with the specified response code.
func writeJSONWithStatus(w http.ResponseWriter, thing interface{}, code int) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(code)
encoder := json.NewEncoder(w)
if err := encoder.Encode(thing); err != nil {
b, err := json.Marshal(thing)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Errorf("JSON encode error: %v", err)
return
}
w.WriteHeader(code)
_, err = w.Write(b)
if err != nil {
log.Errorf("Write error: %v", err)
}
}

Expand Down
17 changes: 9 additions & 8 deletions client/webserver/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,14 +498,15 @@ func writeJSON(w http.ResponseWriter, thing interface{}, indent bool) {
// ResponseWriter with the specified response code.
func writeJSONWithStatus(w http.ResponseWriter, thing interface{}, code int, indent bool) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(code)
encoder := json.NewEncoder(w)
indentStr := ""
if indent {
indentStr = " "
b, err := json.Marshal(thing)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Errorf("JSON encode error: %v", err)
return
}
encoder.SetIndent("", indentStr)
if err := encoder.Encode(thing); err != nil {
log.Infof("JSON encode error: %v", err)
w.WriteHeader(code)
_, err = w.Write(append(b, byte('\n')))
if err != nil {
log.Errorf("Write error: %v", err)
}
}
1 change: 1 addition & 0 deletions client/webserver/webserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func TestAPIRegister(t *testing.T) {
defer shutdown()

ensure := func(want string) {
t.Helper()
ensureResponse(t, s.apiRegister, want, reader, writer, body)
}

Expand Down
27 changes: 10 additions & 17 deletions client/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -41,9 +40,9 @@ type wsClient struct {
feedLoop *dex.StartStopWaiter
}

func newWSClient(ip string, conn ws.Connection, hndlr func(msg *msgjson.Message) *msgjson.Error, logger dex.Logger) *wsClient {
func newWSClient(ip dex.IPKey, addr string, conn ws.Connection, hndlr func(msg *msgjson.Message) *msgjson.Error, logger dex.Logger) *wsClient {
return &wsClient{
WSLink: ws.NewWSLink(ip, conn, pingPeriod, hndlr, logger),
WSLink: ws.NewWSLink(ip, addr, conn, pingPeriod, hndlr, logger),
cid: atomic.AddInt32(&cidCounter, 1),
}
}
Expand Down Expand Up @@ -96,13 +95,7 @@ func (s *Server) Shutdown() {
// able to cancel the hijacked connection handler at a later time since this
// function is not blocking.
func (s *Server) HandleConnect(ctx context.Context, w http.ResponseWriter, r *http.Request) {
// If the IP address includes a port, remove it.
ip := r.RemoteAddr
// If a host:port can be parsed, the IP is only the host portion.
host, _, err := net.SplitHostPort(ip)
if err == nil && host != "" {
ip = host
}
ip := dex.NewIPKey(r.RemoteAddr)
wsConn, err := ws.NewConnection(w, r, pongWait)
if err != nil {
s.log.Errorf("ws connection error: %v", err)
Expand All @@ -116,7 +109,7 @@ func (s *Server) HandleConnect(ctx context.Context, w http.ResponseWriter, r *ht
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.connect(ctx, wsConn, ip)
s.connect(ctx, wsConn, r.RemoteAddr, ip)
}()

s.log.Trace("HandleConnect done.")
Expand All @@ -125,15 +118,15 @@ func (s *Server) HandleConnect(ctx context.Context, w http.ResponseWriter, r *ht
// connect handles a new websocket client by creating a new wsClient, starting
// it, and blocking until the connection closes. This method should be
// run as a goroutine.
func (s *Server) connect(ctx context.Context, conn ws.Connection, ip string) {
s.log.Debugf("New websocket client %s", ip)
func (s *Server) connect(ctx context.Context, conn ws.Connection, addr string, ip dex.IPKey) {
s.log.Debugf("New websocket client %s", addr)
// Create a new websocket client to handle the new websocket connection
// and wait for it to shutdown. Once it has shutdown (and hence
// disconnected), remove it.
var cl *wsClient
cl = newWSClient(ip, conn, func(msg *msgjson.Message) *msgjson.Error {
cl = newWSClient(ip, addr, conn, func(msg *msgjson.Message) *msgjson.Error {
return s.handleMessage(cl, msg)
}, s.log.SubLogger(ip))
}, s.log.SubLogger(addr))

// Lock the clients map before starting the connection listening so that
// synchronized map accesses are guaranteed to reflect this connection.
Expand Down Expand Up @@ -167,7 +160,7 @@ func (s *Server) connect(ctx context.Context, conn ws.Connection, ip string) {
}()

cm.Wait() // also waits for any handleMessage calls in (*WSLink).inHandler
s.log.Tracef("Disconnected websocket client %s", ip)
s.log.Tracef("Disconnected websocket client %s", addr)
}

// Notify sends a notification to the websocket client.
Expand All @@ -182,7 +175,7 @@ func (s *Server) Notify(route string, payload interface{}) {
for _, cl := range s.clients {
if err = cl.Send(msg); err != nil {
s.log.Warnf("Failed to send %v notification to client %v at %v: %v",
msg.Route, cl.cid, cl.IP(), err)
msg.Route, cl.cid, cl.Addr(), err)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions client/websocket/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func newLink() *tLink {
respReady: make(chan []byte, 1),
close: make(chan struct{}, 1),
}
cl := newWSClient("localhost", conn, func(*msgjson.Message) *msgjson.Error { return nil }, dex.StdOutLogger("ws_TEST", dex.LevelTrace))
cl := newWSClient(dex.IPKey{}, "addr", conn, func(*msgjson.Message) *msgjson.Error { return nil }, dex.StdOutLogger("ws_TEST", dex.LevelTrace))
return &tLink{
cl: cl,
conn: conn,
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestClientMap(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
srv.connect(ctx, conn, "someip")
srv.connect(ctx, conn, "someaddr", dex.IPKey{})
wg.Done()
}()

Expand Down
28 changes: 28 additions & 0 deletions dex/ip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package dex

import (
"net"
)

// IPKey is a IP address byte array. For an IPV6 address, the IPKey drops the
// interface identifier, which is the second half of the address.
type IPKey [net.IPv6len / 2]byte

// NewIPKey parses an IP address string into an IPKey.
func NewIPKey(addr string) IPKey {
host, _, err := net.SplitHostPort(addr)
if err == nil && host != "" {
addr = host
}
netIP := net.ParseIP(addr)
ip := netIP.To4()
if ip == nil {
ip = netIP.To16()
}
var ipKey IPKey
copy(ipKey[:], ip)
return ipKey
}
93 changes: 93 additions & 0 deletions dex/ip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package dex

import (
"testing"
)

func TestIPKey(t *testing.T) {
if NewIPKey("127.0.0.1") == NewIPKey("127.0.0.2") {
t.Fatalf("IPKey v4 failed basic comparison test, %x = %x", NewIPKey("127.0.0.1"), NewIPKey("127.0.0.2"))
}
if NewIPKey("[a:b:c:d::]:1234") == NewIPKey("[a:b:c:e::]:1234") {
t.Fatalf("IPKey v6 failed basic comparison test, %x = %x", NewIPKey("[a:b:c:d]"), NewIPKey("[a:b:c:d]"))
}

tests := []struct {
name, addr1, addr2 string
wantEqual bool
}{
{
name: "ipv4 unequal",
addr1: "127.0.0.1",
addr2: "127.0.0.2",
wantEqual: false,
},
{
name: "ipv4 equal",
addr1: "127.0.0.1",
addr2: "127.0.0.1",
wantEqual: true,
},
{
name: "ipv4 port unequal",
addr1: "127.0.0.1:1234",
addr2: "127.0.0.2:1234",
wantEqual: false,
},
{
name: "ipv4 port equal",
addr1: "127.0.0.1:1234",
addr2: "127.0.0.1:1234",
wantEqual: true,
},
{
name: "ipv4 port equal noport",
addr1: "127.0.0.1",
addr2: "127.0.0.1:1234",
wantEqual: true,
},
{
name: "ipv6 port unequal",
addr1: "[a:b:c:d::]:1234",
addr2: "[a:b:c:e::]:1234",
wantEqual: false,
},
{
name: "ipv6 port equal",
addr1: "[a:b:c:d::]:1234",
addr2: "[a:b:c:d::]:1234",
wantEqual: true,
},
{
name: "ipv6 port equal noport",
addr1: "a:b:c:d::",
addr2: "[a:b:c:d::]:1234",
wantEqual: true,
},
{
name: "ipv6 mask equal",
addr1: "[a:b:c:d:e:f:a:b]:1234",
addr2: "[a:b:c:d:f:d:c:f]:1234",
wantEqual: true,
},
}

zeroKey := IPKey{}

for _, tt := range tests {
ipKey1 := NewIPKey(tt.addr1)
ipKey2 := NewIPKey(tt.addr2)

if ipKey1 == zeroKey || ipKey2 == zeroKey {
t.Fatalf("%s: zeroKey found. ipKey1 = %x, ipKey2 = %x", tt.name, ipKey1[:], ipKey2[:])
}

if (ipKey1 == ipKey2) != tt.wantEqual {
t.Fatalf("%s: wantEqual = %t, addr1 = %s, addr2 = %s, ipKey1 = %x, ipKey2 = %x",
tt.name, tt.wantEqual, tt.addr1, tt.addr2, ipKey1[:], ipKey2[:])
}
}
}
Loading

0 comments on commit 08afde3

Please sign in to comment.