From ce9047912ae125204614caf585b213112307723f Mon Sep 17 00:00:00 2001 From: Gergely Novak Date: Tue, 5 Jan 2021 14:42:01 +0100 Subject: [PATCH] Data API v2 --- .circleci/config.yml | 5 +- alpaca/entities.go | 19 ++ alpaca/rest.go | 211 +++++++++++++++ examples/v2stream/v2stream.go | 58 ++++ go.mod | 8 +- go.sum | 68 ++++- v2/entities.go | 78 ++++++ v2/stream/datav2.go | 481 ++++++++++++++++++++++++++++++++++ v2/stream/entities.go | 40 +++ v2/stream/stream.go | 105 ++++++++ 10 files changed, 1063 insertions(+), 10 deletions(-) create mode 100644 examples/v2stream/v2stream.go create mode 100644 v2/entities.go create mode 100644 v2/stream/datav2.go create mode 100644 v2/stream/entities.go create mode 100644 v2/stream/stream.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 952921b..6f0e2ca 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -2,9 +2,8 @@ version: 2 jobs: build: docker: - - image: circleci/golang:1.10.3 + - image: circleci/golang:1.14.13 working_directory: /go/src/github.com/alpacahq/alpaca-trade-api-go steps: - checkout - - run: dep ensure - - run: go test -v -cover ./... \ No newline at end of file + - run: go test -v -cover ./... diff --git a/alpaca/entities.go b/alpaca/entities.go index 10b6e3a..7967e8c 100644 --- a/alpaca/entities.go +++ b/alpaca/entities.go @@ -3,6 +3,7 @@ package alpaca import ( "time" + v2 "github.com/alpacahq/alpaca-trade-api-go/v2" "github.com/shopspring/decimal" ) @@ -203,6 +204,24 @@ type Aggregates struct { Results []AggV2 `json:"results"` } +type tradeResponse struct { + Symbol string `json:"symbol"` + NextPageToken *string `json:"next_page_token"` + Trades []v2.Trade `json:"trades"` +} + +type quoteResponse struct { + Symbol string `json:"symbol"` + NextPageToken *string `json:"next_page_token"` + Quotes []v2.Quote `json:"quotes"` +} + +type barResponse struct { + Symbol string `json:"symbol"` + NextPageToken *string `json:"next_page_token"` + Bars []v2.Bar `json:"bars"` +} + type CalendarDay struct { Date string `json:"date"` Open string `json:"open"` diff --git a/alpaca/rest.go b/alpaca/rest.go index 8cb6588..53d0630 100644 --- a/alpaca/rest.go +++ b/alpaca/rest.go @@ -14,6 +14,7 @@ import ( "time" "github.com/alpacahq/alpaca-trade-api-go/common" + v2 "github.com/alpacahq/alpaca-trade-api-go/v2" ) const ( @@ -66,6 +67,11 @@ func defaultDo(c *Client, req *http.Request) (*http.Response, error) { return resp, nil } +const ( + // v2MaxLimit is the maximum allowed limit parameter for all v2 endpoints + v2MaxLimit = 10000 +) + func init() { if s := os.Getenv("APCA_API_BASE_URL"); s != "" { base = s @@ -404,6 +410,189 @@ func (c *Client) GetLastTrade(symbol string) (*LastTradeResponse, error) { return lastTrade, nil } +// GetTrades returns a channel that will be populated with the trades for the given symbol +// that happened between the given start and end times, limited to the given limit. +func (c *Client) GetTrades(symbol string, start, end time.Time, limit int) <-chan v2.TradeItem { + ch := make(chan v2.TradeItem) + + go func() { + defer close(ch) + + u, err := url.Parse(fmt.Sprintf("%s/v2/stocks/%s/trades", dataURL, symbol)) + if err != nil { + ch <- v2.TradeItem{Error: err} + return + } + + q := u.Query() + q.Set("start", start.Format(time.RFC3339)) + q.Set("end", end.Format(time.RFC3339)) + + total := 0 + pageToken := "" + for { + actualLimit := limit - total + if actualLimit <= 0 { + return + } + if actualLimit > v2MaxLimit { + actualLimit = v2MaxLimit + } + q.Set("limit", fmt.Sprintf("%d", actualLimit)) + q.Set("page_token", pageToken) + u.RawQuery = q.Encode() + + resp, err := c.get(u) + if err != nil { + ch <- v2.TradeItem{Error: err} + return + } + + var tradeResp tradeResponse + if err = unmarshal(resp, &tradeResp); err != nil { + ch <- v2.TradeItem{Error: err} + return + } + + for _, trade := range tradeResp.Trades { + ch <- v2.TradeItem{Trade: trade} + } + if tradeResp.NextPageToken == nil { + return + } + pageToken = *tradeResp.NextPageToken + total += len(tradeResp.Trades) + } + }() + + return ch +} + +// GetQuotes returns a channel that will be populated with the quotes for the given symbol +// that happened between the given start and end times, limited to the given limit. +func (c *Client) GetQuotes(symbol string, start, end time.Time, limit int) <-chan v2.QuoteItem { + // NOTE: this method is very similar to GetTrades. + // With generics it would be almost trivial to refactor them to use a common base method, + // but without them it doesn't seem to be worth it + ch := make(chan v2.QuoteItem) + + go func() { + defer close(ch) + + u, err := url.Parse(fmt.Sprintf("%s/v2/stocks/%s/quotes", dataURL, symbol)) + if err != nil { + ch <- v2.QuoteItem{Error: err} + return + } + + q := u.Query() + q.Set("start", start.Format(time.RFC3339)) + q.Set("end", end.Format(time.RFC3339)) + + total := 0 + pageToken := "" + for { + actualLimit := limit - total + if actualLimit <= 0 { + return + } + if actualLimit > v2MaxLimit { + actualLimit = v2MaxLimit + } + q.Set("limit", fmt.Sprintf("%d", actualLimit)) + q.Set("page_token", pageToken) + u.RawQuery = q.Encode() + + resp, err := c.get(u) + if err != nil { + ch <- v2.QuoteItem{Error: err} + return + } + + var quoteResp quoteResponse + if err = unmarshal(resp, "eResp); err != nil { + ch <- v2.QuoteItem{Error: err} + return + } + + for _, quote := range quoteResp.Quotes { + ch <- v2.QuoteItem{Quote: quote} + } + if quoteResp.NextPageToken == nil { + return + } + pageToken = *quoteResp.NextPageToken + total += len(quoteResp.Quotes) + } + }() + + return ch +} + +// GetBars returns a channel that will be populated with the bars for the given symbol +// between the given start and end times, limited to the given limit, +// using the given and timeframe and adjustment. +func (c *Client) GetBars( + symbol string, timeFrame v2.TimeFrame, adjustment v2.Adjustment, + start, end time.Time, limit int, +) <-chan v2.BarItem { + ch := make(chan v2.BarItem) + + go func() { + defer close(ch) + + u, err := url.Parse(fmt.Sprintf("%s/v2/stocks/%s/bars", dataURL, symbol)) + if err != nil { + ch <- v2.BarItem{Error: err} + return + } + + q := u.Query() + q.Set("start", start.Format(time.RFC3339)) + q.Set("end", end.Format(time.RFC3339)) + q.Set("adjustment", string(adjustment)) + q.Set("timeframe", string(timeFrame)) + + total := 0 + pageToken := "" + for { + actualLimit := limit - total + if actualLimit <= 0 { + return + } + if actualLimit > v2MaxLimit { + actualLimit = v2MaxLimit + } + q.Set("limit", fmt.Sprintf("%d", actualLimit)) + q.Set("page_token", pageToken) + u.RawQuery = q.Encode() + + resp, err := c.get(u) + if err != nil { + ch <- v2.BarItem{Error: err} + return + } + + var barResp barResponse + if err = unmarshal(resp, &barResp); err != nil { + ch <- v2.BarItem{Error: err} + return + } + + for _, bar := range barResp.Bars { + ch <- v2.BarItem{Bar: bar} + } + if barResp.NextPageToken == nil { + return + } + pageToken = *barResp.NextPageToken + total += len(barResp.Bars) + } + }() + + return ch +} + // CloseAllPositions liquidates all open positions at market price. func (c *Client) CloseAllPositions() error { u, err := url.Parse(fmt.Sprintf("%s/%s/positions", base, apiVersion)) @@ -801,6 +990,28 @@ func GetLastTrade(symbol string) (*LastTradeResponse, error) { return DefaultClient.GetLastTrade(symbol) } +// GetTrades returns a channel that will be populated with the trades for the given symbol +// that happened between the given start and end times, limited to the given limit. +func GetTrades(symbol string, start, end time.Time, limit int) <-chan v2.TradeItem { + return DefaultClient.GetTrades(symbol, start, end, limit) +} + +// GetQuotes returns a channel that will be populated with the quotes for the given symbol +// that happened between the given start and end times, limited to the given limit. +func GetQuotes(symbol string, start, end time.Time, limit int) <-chan v2.QuoteItem { + return DefaultClient.GetQuotes(symbol, start, end, limit) +} + +// GetBars returns a channel that will be populated with the bars for the given symbol +// between the given start and end times, limited to the given limit, +// using the given and timeframe and adjustment. +func GetBars( + symbol string, timeFrame v2.TimeFrame, adjustment v2.Adjustment, + start, end time.Time, limit int, +) <-chan v2.BarItem { + return DefaultClient.GetBars(symbol, timeFrame, adjustment, start, end, limit) +} + // GetPosition returns the account's position for the // provided symbol using the default Alpaca client. func GetPosition(symbol string) (*Position, error) { diff --git a/examples/v2stream/v2stream.go b/examples/v2stream/v2stream.go new file mode 100644 index 0000000..dc540bc --- /dev/null +++ b/examples/v2stream/v2stream.go @@ -0,0 +1,58 @@ +package main + +import ( + "fmt" + "os" + + "github.com/alpacahq/alpaca-trade-api-go/alpaca" + "github.com/alpacahq/alpaca-trade-api-go/common" + "github.com/alpacahq/alpaca-trade-api-go/v2/stream" +) + +func main() { + // You can set your credentials here in the code, or (preferably) via the + // APCA_API_KEY_ID and APCA_API_SECRET_KEY environment variables + apiKey := "YOUR_API_KEY_HERE" + apiSecret := "YOUR_API_SECRET_HERE" + if common.Credentials().ID == "" { + os.Setenv(common.EnvApiKeyID, apiKey) + } + if common.Credentials().Secret == "" { + os.Setenv(common.EnvApiSecretKey, apiSecret) + } + + stream.UseFeed("sip") + + if err := stream.SubscribeTradeUpdates(tradeUpdateHandler); err != nil { + panic(err) + } + + if err := stream.SubscribeTrades(tradeHandler, "AAPL"); err != nil { + panic(err) + } + + if err := stream.SubscribeQuotes(quoteHandler, "MSFT"); err != nil { + panic(err) + } + if err := stream.SubscribeBars(barHandler, "IBM"); err != nil { + panic(err) + } + + select {} +} + +func tradeUpdateHandler(update alpaca.TradeUpdate) { + fmt.Println("trade update", update) +} + +func tradeHandler(trade stream.Trade) { + fmt.Println("trade", trade) +} + +func quoteHandler(quote stream.Quote) { + fmt.Println("quote", quote) +} + +func barHandler(bar stream.Bar) { + fmt.Println("bar", bar) +} diff --git a/go.mod b/go.mod index 61897ff..9ca2a1f 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,14 @@ go 1.14 require ( github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/gorilla/websocket v1.4.0 + github.com/gorilla/websocket v1.4.1 github.com/kr/pretty v0.1.0 // indirect github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2 // indirect + github.com/mitchellh/mapstructure v1.4.0 github.com/shopspring/decimal v1.2.0 - github.com/stretchr/testify v1.4.0 + github.com/stretchr/testify v1.6.1 + github.com/vmihailenco/msgpack/v5 v5.1.4 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/matryer/try.v1 v1.0.0-20150601225556-312d2599e12e + nhooyr.io/websocket v1.8.6 ) diff --git a/go.sum b/go.sum index 43cc745..d301758 100644 --- a/go.sum +++ b/go.sum @@ -4,26 +4,86 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= -github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= +github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= +github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= +github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= +github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= +github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= +github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY= +github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= +github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= +github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8= +github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= +github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= +github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= +github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= +github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2 h1:JAEbJn3j/FrhdWA9jW8B5ajsLIjeuEHLi8xE4fk997o= github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2/go.mod h1:0KeJpeMD6o+O4hW7qJOT7vyQPKrWmj26uf5wMc/IiIs= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mitchellh/mapstructure v1.4.0 h1:7ks8ZkOP5/ujthUsT07rNv+nkLXCQWKNHuwzOAesEks= +github.com/mitchellh/mapstructure v1.4.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= +github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= +github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= +github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/vmihailenco/msgpack/v5 v5.1.4 h1:6K44/cU6dMNGkVTGGuu7ef2NdSRFMhAFGGLfE3cqtHM= +github.com/vmihailenco/msgpack/v5 v5.1.4/go.mod h1:C5gboKD0TJPqWDTVTtrQNfRbiBwHZGo8UTqP/9/XvLI= +github.com/vmihailenco/tagparser v0.1.2 h1:gnjoVuB/kljJ5wICEEOpx98oXMWPLj22G67Vbd1qPqc= +github.com/vmihailenco/tagparser v0.1.2/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/matryer/try.v1 v1.0.0-20150601225556-312d2599e12e h1:bJHzu9Qwc9wQRWJ/WVkJGAfs+riucl/tKAFNxf9pzqk= gopkg.in/matryer/try.v1 v1.0.0-20150601225556-312d2599e12e/go.mod h1:tve0rTLdGlwnXF7iBO9rbAEyeXvuuPx0n4DvXS/Nw7o= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nhooyr.io/websocket v1.8.6 h1:s+C3xAMLwGmlI31Nyn/eAehUlZPwfYZu2JXM621Q5/k= +nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0= diff --git a/v2/entities.go b/v2/entities.go new file mode 100644 index 0000000..ebc966c --- /dev/null +++ b/v2/entities.go @@ -0,0 +1,78 @@ +package v2 + +import "time" + +// Trade is a stock trade that happened on the market +type Trade struct { + ID int64 `json:"i"` + Exchange string `json:"x"` + Price float64 `json:"p"` + Size uint32 `json:"s"` + Timestamp time.Time `json:"t"` + Conditions []string `json:"c"` + Tape string `json:"z"` +} + +// TradeItem contains a single trade or an error +type TradeItem struct { + Trade Trade + Error error +} + +// Quote is a stock quote from the market +type Quote struct { + BidExchange string `json:"bx"` + BidPrice float64 `json:"bp"` + BidSize uint32 `json:"bs"` + AskExchange string `json:"ax"` + AskPrice float64 `json:"ap"` + AskSize uint32 `json:"as"` + Timestamp time.Time `json:"t"` + Conditions []string `json:"c"` + Tape string `json:"z"` +} + +// QuoteItem contains a single quote or an error +type QuoteItem struct { + Quote Quote + Error error +} + +// TimeFrame is the resolution of the bars +type TimeFrame string + +// List of time frames +const ( + Sec TimeFrame = "1Sec" + Min TimeFrame = "1Min" + Hour TimeFrame = "1Hour" + Day TimeFrame = "1Day" +) + +// Adjustment specifies the corporate action adjustment(s) for the bars +type Adjustment string + +// List of adjustments +const ( + None Adjustment = "--" + Raw Adjustment = "raw" + Split Adjustment = "split" + Dividend Adjustment = "dividend" + All Adjustment = "all" +) + +// Bar is an aggregate of trades +type Bar struct { + Open float64 `json:"o"` + High float64 `json:"h"` + Low float64 `json:"l"` + Close float64 `json:"c"` + Volume uint64 `json:"v"` + Timestamp time.Time `json:"t"` +} + +// BarItem contains a single bar or an error +type BarItem struct { + Bar Bar + Error error +} diff --git a/v2/stream/datav2.go b/v2/stream/datav2.go new file mode 100644 index 0000000..91fa0cf --- /dev/null +++ b/v2/stream/datav2.go @@ -0,0 +1,481 @@ +package stream + +import ( + "context" + "errors" + "log" + "net/http" + "net/url" + "os" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/alpacahq/alpaca-trade-api-go/common" + "github.com/mitchellh/mapstructure" + "github.com/vmihailenco/msgpack/v5" + "nhooyr.io/websocket" +) + +const ( + // MaxConnectionAttempts is the maximum number of retries for connecting to the websocket + MaxConnectionAttempts = 3 +) + +var ( + // DataStreamURL is the URL for the data websocket stream. + // The DATA_PROXY_WS environment variable overrides it. + DataStreamURL = "https://data.alpaca.markets" // TODO: Probably this URL will change. +) + +var ( + stream *datav2stream +) + +type datav2stream struct { + // opts + feed string + + // connection flow + conn *websocket.Conn + authenticated atomic.Value + closed atomic.Value + + // handlers + tradeHandlers map[string]func(trade Trade) + quoteHandlers map[string]func(quote Quote) + barHandlers map[string]func(bar Bar) + + // concurrency + readerOnce sync.Once + wsWriteMutex sync.Mutex + wsReadMutex sync.Mutex + handlersMutex sync.RWMutex +} + +func newDatav2Stream() *datav2stream { + if s := os.Getenv("DATA_PROXY_WS"); s != "" { + DataStreamURL = s + } + stream = &datav2stream{ + feed: "iex", + authenticated: atomic.Value{}, + tradeHandlers: make(map[string]func(trade Trade)), + quoteHandlers: make(map[string]func(quote Quote)), + barHandlers: make(map[string]func(bar Bar)), + } + + stream.authenticated.Store(false) + stream.closed.Store(false) + + return stream +} + +func (s *datav2stream) useFeed(feed string) error { + feed = strings.ToLower(feed) + switch feed { + case "iex", "sip": + default: + return errors.New("unsupported feed: " + feed) + } + if s.feed == feed { + return nil + } + s.feed = feed + s.connect() + return nil +} + +func (s *datav2stream) subscribeTrades(handler func(trade Trade), symbols ...string) error { + if err := s.ensureRunning(); err != nil { + return err + } + + if err := s.sub(symbols, nil, nil); err != nil { + return err + } + + s.handlersMutex.Lock() + defer s.handlersMutex.Unlock() + + for _, symbol := range symbols { + s.tradeHandlers[symbol] = handler + } + + return nil +} + +func (s *datav2stream) subscribeQuotes(handler func(quote Quote), symbols ...string) error { + if err := s.ensureRunning(); err != nil { + return err + } + + if err := s.sub(nil, symbols, nil); err != nil { + return err + } + + s.handlersMutex.Lock() + defer s.handlersMutex.Unlock() + + for _, symbol := range symbols { + s.quoteHandlers[symbol] = handler + } + + return nil +} + +func (s *datav2stream) subscribeBars(handler func(bar Bar), symbols ...string) error { + if err := s.ensureRunning(); err != nil { + return err + } + + if err := s.sub(nil, nil, symbols); err != nil { + return err + } + + s.handlersMutex.Lock() + defer s.handlersMutex.Unlock() + + for _, symbol := range symbols { + s.barHandlers[symbol] = handler + } + + return nil +} + +func (s *datav2stream) unsubscribe(trades []string, quotes []string, bars []string) error { + if err := s.ensureRunning(); err != nil { + return err + } + + s.handlersMutex.Lock() + defer s.handlersMutex.Unlock() + + for _, trade := range trades { + delete(s.tradeHandlers, trade) + } + for _, quote := range quotes { + delete(s.quoteHandlers, quote) + } + for _, bar := range bars { + delete(s.barHandlers, bar) + } + + if err := s.unsub(trades, quotes, bars); err != nil { + return err + } + + return nil +} + +func (s *datav2stream) close(final bool) error { + if s.conn == nil { + return nil + } + + s.wsWriteMutex.Lock() + defer s.wsWriteMutex.Unlock() + + if err := s.conn.Close(websocket.StatusNormalClosure, ""); err != nil { + return err + } + + if final { + s.closed.Store(true) + } + + s.conn = nil + + return nil +} + +func (s *datav2stream) ensureRunning() error { + if s.conn != nil { + return nil + } + + if err := s.connect(); err != nil { + return err + } + s.readerOnce.Do(func() { + go s.readForever() + }) + return nil +} + +func (s *datav2stream) connect() error { + // first close any previous connections + s.close(false) + + s.authenticated.Store(false) + conn, err := openSocket(s.feed) + if err != nil { + return err + } + s.conn = conn + if err := s.auth(); err != nil { + return err + } + trades := make([]string, 0, len(s.tradeHandlers)) + for trade := range s.tradeHandlers { + trades = append(trades, trade) + } + quotes := make([]string, 0, len(s.quoteHandlers)) + for quote := range s.quoteHandlers { + quotes = append(quotes, quote) + } + bars := make([]string, 0) + for bar := range s.barHandlers { + bars = append(bars, bar) + } + return s.sub(trades, quotes, bars) +} + +func (s *datav2stream) readForever() { + for { + s.wsReadMutex.Lock() + msgType, b, err := s.conn.Read(context.TODO()) + s.wsReadMutex.Unlock() + + if err != nil { + if websocket.CloseStatus(err) == websocket.StatusNormalClosure { + // if this was a graceful closure, don't reconnect + if s.closed.Load().(bool) { + return + } + } else { + log.Printf("alpaca stream read error (%v)", err) + } + + err := s.connect() + if err != nil { + panic(err) + } + } + if msgType != websocket.MessageBinary { + continue + } + + var messages []map[string]interface{} + if err = msgpack.Unmarshal(b, &messages); err != nil { + log.Printf("failed to incoming unmarshal message: %v", err) + continue + } + + for _, msg := range messages { + if err := s.handleMsg(msg); err != nil { + log.Printf("error handling incoming message: %v", err) + continue + } + } + } +} + +func (s *datav2stream) handleMsg(msg map[string]interface{}) error { + T, ok := msg["T"].(string) + if !ok { + return errors.New("unexpected message: T missing") + } + + switch T { + case "t", "q", "b": + default: + return nil + } + + symbol, ok := msg["S"].(string) + if !ok { + return errors.New("unexpected message: S missing") + } + + switch T { + case "t": + var trade Trade + if err := mapstructure.Decode(msg, &trade); err != nil { + return err + } + + s.handlersMutex.RLock() + defer s.handlersMutex.RUnlock() + + handler, ok := s.tradeHandlers[symbol] + if !ok { + handler, ok = s.tradeHandlers["*"] + if !ok { + return errors.New("trade handler missing for symbol: " + symbol) + } + } + handler(trade) + case "q": + var quote Quote + if err := mapstructure.Decode(msg, "e); err != nil { + return err + } + + s.handlersMutex.RLock() + defer s.handlersMutex.RUnlock() + handler, ok := s.quoteHandlers[symbol] + if !ok { + handler, ok = s.quoteHandlers["*"] + if !ok { + return errors.New("quote handler missing for symbol: " + symbol) + } + } + handler(quote) + case "b": + var bar Bar + if err := mapstructure.Decode(msg, &bar); err != nil { + return err + } + + s.handlersMutex.RLock() + defer s.handlersMutex.RUnlock() + + handler, ok := s.barHandlers[symbol] + if !ok { + handler, ok = s.barHandlers["*"] + if !ok { + return errors.New("bar handler missing for symbol: " + symbol) + } + } + handler(bar) + } + return nil +} + +func (s *datav2stream) sub(trades []string, quotes []string, bars []string) error { + return s.handleSubscription(true, trades, quotes, bars) +} + +func (s *datav2stream) unsub(trades []string, quotes []string, bars []string) error { + return s.handleSubscription(false, trades, quotes, bars) +} + +func (s *datav2stream) handleSubscription(subscribe bool, trades []string, quotes []string, bars []string) error { + if len(trades)+len(quotes)+len(bars) == 0 { + return nil + } + + action := "subscribe" + if !subscribe { + action = "unsubscribe" + } + + msg, err := msgpack.Marshal(map[string]interface{}{ + "action": action, + "trades": trades, + "quotes": quotes, + "bars": bars, + }) + if err != nil { + return err + } + + s.wsWriteMutex.Lock() + defer s.wsWriteMutex.Unlock() + + if err := s.conn.Write(context.TODO(), websocket.MessageBinary, msg); err != nil { + return err + } + + return nil +} + +func (s *datav2stream) isAuthenticated() bool { + return s.authenticated.Load().(bool) +} + +func (s *datav2stream) auth() (err error) { + if s.isAuthenticated() { + return + } + + msg, err := msgpack.Marshal(map[string]string{ + "action": "auth", + "key": common.Credentials().ID, + "secret": common.Credentials().Secret, + }) + if err != nil { + return err + } + + s.wsWriteMutex.Lock() + defer s.wsWriteMutex.Unlock() + + if err := s.conn.Write(context.TODO(), websocket.MessageBinary, msg); err != nil { + return err + } + + var resps []map[string]interface{} + + // ensure the auth response comes in a timely manner + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + s.wsReadMutex.Lock() + defer s.wsReadMutex.Unlock() + + _, b, err := s.conn.Read(ctx) + if err != nil { + return err + } + if err := msgpack.Unmarshal(b, &resps); err != nil { + return err + } + if len(resps) < 1 { + return errors.New("received empty array") + } + if resps[0]["T"] != "success" || resps[0]["msg"] != "authenticated" { + return errors.New("failed to authorize alpaca stream") + } + + s.authenticated.Store(true) + + return +} + +func openSocket(feed string) (*websocket.Conn, error) { + scheme := "wss" + ub, _ := url.Parse(DataStreamURL) + switch ub.Scheme { + case "http", "ws": + scheme = "ws" + } + u := url.URL{Scheme: scheme, Host: ub.Host, Path: "/v2/stream/" + strings.ToLower(feed)} + for attempts := 1; attempts <= MaxConnectionAttempts; attempts++ { + c, _, err := websocket.Dial(context.TODO(), u.String(), &websocket.DialOptions{ + CompressionMode: websocket.CompressionContextTakeover, + HTTPHeader: http.Header{ + "Content-Type": []string{"application/msgpack"}, + }, + }) + if err == nil { + return c, readConnected(c) + } + log.Printf("failed to open Alpaca data stream: %v", err) + if attempts == MaxConnectionAttempts { + return nil, err + } + time.Sleep(time.Second) + } + return nil, errors.New("could not open Alpaca data stream (max retries exceeded)") +} + +func readConnected(conn *websocket.Conn) error { + _, b, err := conn.Read(context.TODO()) + if err != nil { + return err + } + var resps []map[string]interface{} + if err := msgpack.Unmarshal(b, &resps); err != nil { + return err + } + if len(resps) < 1 { + return errors.New("received empty array") + } + if resps[0]["T"] != "success" || resps[0]["msg"] != "connected" { + return errors.New("missing connected message") + } + return nil +} diff --git a/v2/stream/entities.go b/v2/stream/entities.go new file mode 100644 index 0000000..b809e38 --- /dev/null +++ b/v2/stream/entities.go @@ -0,0 +1,40 @@ +package stream + +import "time" + +// Trade is a stock trade that happened on the market +type Trade struct { + ID int64 `mapstructure:"i"` + Symbol string `mapstructure:"S"` + Exchange string `mapstructure:"x"` + Price float64 `mapstructure:"p"` + Size uint32 `mapstructure:"s"` + Timestamp time.Time `mapstructure:"t"` + Conditions []string `mapstructure:"c"` + Tape string `mapstructure:"z"` +} + +// Quote is a stock quote from the market +type Quote struct { + Symbol string `mapstructure:"S"` + BidExchange string `mapstructure:"bx"` + BidPrice float64 `mapstructure:"bp"` + BidSize uint32 `mapstructure:"bs"` + AskExchange string `mapstructure:"ax"` + AskPrice float64 `mapstructure:"ap"` + AskSize uint32 `mapstructure:"as"` + Timestamp time.Time `mapstructure:"t"` + Conditions []string `mapstructure:"c"` + Tape string `mapstructure:"z"` +} + +// Bar is an aggregate of trades +type Bar struct { + Symbol string `mapstructure:"S"` + Open float64 `mapstructure:"o"` + High float64 `mapstructure:"h"` + Low float64 `mapstructure:"l"` + Close float64 `mapstructure:"c"` + Volume uint64 `mapstructure:"v"` + Timestamp time.Time `mapstructure:"t"` +} diff --git a/v2/stream/stream.go b/v2/stream/stream.go new file mode 100644 index 0000000..37053d1 --- /dev/null +++ b/v2/stream/stream.go @@ -0,0 +1,105 @@ +package stream + +import ( + "log" + "sync" + + "github.com/alpacahq/alpaca-trade-api-go/alpaca" +) + +var ( + once sync.Once + dataStream *datav2stream + alpacaStream *alpaca.Stream +) + +func initStreamsOnce() { + once.Do(func() { + if dataStream == nil { + dataStream = newDatav2Stream() + } + if alpacaStream == nil { + alpacaStream = alpaca.GetStream() + } + }) +} + +// UseFeed sets the feed used by the data v2 stream. Supported feeds: iex, sip. +func UseFeed(feed string) error { + initStreamsOnce() + return dataStream.useFeed(feed) +} + +// SubscribeTrades issues a subscribe command to the given symbols and +// registers the handler to be called for each trade. +func SubscribeTrades(handler func(trade Trade), symbols ...string) error { + initStreamsOnce() + return dataStream.subscribeTrades(handler, symbols...) +} + +// SubscribeQuotes issues a subscribe command to the given symbols and +// registers the handler to be called for each quote. +func SubscribeQuotes(handler func(quote Quote), symbols ...string) error { + initStreamsOnce() + return dataStream.subscribeQuotes(handler, symbols...) +} + +// SubscribeBars issues a subscribe command to the given symbols and +// registers the handler to be called for each bar. +func SubscribeBars(handler func(bar Bar), symbols ...string) error { + initStreamsOnce() + return dataStream.subscribeBars(handler, symbols...) +} + +// SubscribeTradeUpdates issues a subscribe command to the user's trade updates and +// registers the handler to be called for each update. +func SubscribeTradeUpdates(handler func(update alpaca.TradeUpdate)) error { + initStreamsOnce() + return alpacaStream.Subscribe(alpaca.TradeUpdates, func(msg interface{}) { + update, ok := msg.(alpaca.TradeUpdate) + if !ok { + log.Printf("unexpected trade update: %v", msg) + return + } + handler(update) + }) +} + +// UnsubscribeTrades issues an unsubscribe command for the given trade symbols +func UnsubscribeTrades(symbols ...string) error { + initStreamsOnce() + return dataStream.unsubscribe(symbols, nil, nil) +} + +// UnsubscribeQuotes issues an unsubscribe command for the given quote symbols +func UnsubscribeQuotes(symbols ...string) error { + initStreamsOnce() + return dataStream.unsubscribe(nil, symbols, nil) +} + +// UnsubscribeBars issues an unsubscribe command for the given bar symbols +func UnsubscribeBars(symbols ...string) error { + initStreamsOnce() + return dataStream.unsubscribe(nil, nil, symbols) +} + +// UnsubscribeTradeUpdates issues an unsubscribe command for the user's trade updates +func UnsubscribeTradeUpdates() error { + initStreamsOnce() + return alpacaStream.Unsubscribe(alpaca.TradeUpdates) +} + +// Close gracefully closes all streams +func Close() error { + var alpacaErr, dataErr error + if alpacaStream != nil { + alpacaErr = alpacaStream.Close() + } + if dataStream != nil { + dataErr = dataStream.close(true) + } + if alpacaErr != nil { + return alpacaErr + } + return dataErr +}