From 1c5f92bb1883d8e5be075009fc85786bc9f7c634 Mon Sep 17 00:00:00 2001 From: Marvin Hansen Date: Fri, 9 Jul 2021 19:34:13 +0800 Subject: [PATCH] Go-ws streaming SDK (#111) * Added go-ws web streaming SDK. * Added missing go.sum * Updated Readme for go-ws * Renamed Reconnect to ResetConnection to reflect intent! * Applied fix for Issue https://github.com/coinapi/coinapi-sdk/issues/110 * Updated sample code * Removed test covering volume bug * Updated code comment. --- data-api/go-ws/README.md | 23 ++ data-api/go-ws/api/sdk.go | 10 + data-api/go-ws/api/types/config.go | 16 ++ data-api/go-ws/api/types/enums.go | 48 ++++ data-api/go-ws/api/types/functions.go | 11 + data-api/go-ws/api/types/interface.go | 23 ++ data-api/go-ws/api/types/messages.go | 358 +++++++++++++++++++++++++ data-api/go-ws/api/utils.go | 59 ++++ data-api/go-ws/api/v1/dbg_utils.go | 16 ++ data-api/go-ws/api/v1/endpoints.go | 28 ++ data-api/go-ws/api/v1/sdk.go | 42 +++ data-api/go-ws/api/v1/sdk_utils.go | 21 ++ data-api/go-ws/api/v1/set_methods.go | 50 ++++ data-api/go-ws/api/v1/websocket.go | 88 ++++++ data-api/go-ws/api/v1/ws_connection.go | 54 ++++ data-api/go-ws/api/v1/ws_handler.go | 163 +++++++++++ data-api/go-ws/go.mod | 12 + data-api/go-ws/go.sum | 20 ++ data-api/go-ws/main.go | 286 ++++++++++++++++++++ 19 files changed, 1328 insertions(+) create mode 100644 data-api/go-ws/README.md create mode 100644 data-api/go-ws/api/sdk.go create mode 100644 data-api/go-ws/api/types/config.go create mode 100644 data-api/go-ws/api/types/enums.go create mode 100644 data-api/go-ws/api/types/functions.go create mode 100644 data-api/go-ws/api/types/interface.go create mode 100644 data-api/go-ws/api/types/messages.go create mode 100644 data-api/go-ws/api/utils.go create mode 100644 data-api/go-ws/api/v1/dbg_utils.go create mode 100644 data-api/go-ws/api/v1/endpoints.go create mode 100644 data-api/go-ws/api/v1/sdk.go create mode 100644 data-api/go-ws/api/v1/sdk_utils.go create mode 100644 data-api/go-ws/api/v1/set_methods.go create mode 100644 data-api/go-ws/api/v1/websocket.go create mode 100644 data-api/go-ws/api/v1/ws_connection.go create mode 100644 data-api/go-ws/api/v1/ws_handler.go create mode 100644 data-api/go-ws/go.mod create mode 100644 data-api/go-ws/go.sum create mode 100644 data-api/go-ws/main.go diff --git a/data-api/go-ws/README.md b/data-api/go-ws/README.md new file mode 100644 index 0000000000..dbaa46c765 --- /dev/null +++ b/data-api/go-ws/README.md @@ -0,0 +1,23 @@ + + +## Installation + +Install the following dependencies: + +```shell +go mod download github.com/bitly/go-simplejson +go mod download github.com/shopspring/decimal +go mod download github.com/gorilla/websocket +``` + + +## Run examples: + +```bash +cd /path/to/workspace +export GOPATH=/path/to/workspace +go get -u github.com/CoinAPI/coinapi-sdk/data-api/go-ws +wget -v https://raw.githubusercontent.com/coinapi/coinapi-sdk/master/go-ws/main.go +# update api key inside main.go +go run ./main.go +``` diff --git a/data-api/go-ws/api/sdk.go b/data-api/go-ws/api/sdk.go new file mode 100644 index 0000000000..a6736b2a03 --- /dev/null +++ b/data-api/go-ws/api/sdk.go @@ -0,0 +1,10 @@ +package api + +import . "go-ws/api/types" + +func NewSDK(apiKey string) (sdk SDK) { + validateApiKey(apiKey) + config := getSDKConfig(apiKey) + sdk = getSDK(config) + return sdk +} diff --git a/data-api/go-ws/api/types/config.go b/data-api/go-ws/api/types/config.go new file mode 100644 index 0000000000..93170932c0 --- /dev/null +++ b/data-api/go-ws/api/types/config.go @@ -0,0 +1,16 @@ +package types + +type SdkConfig struct { + ApiKey string + ApiVersion ApiVersion + EnvironmentType EnvironmentType + ReconnectType ReconnectType +} + +// WsConfig webservice configuration +type WsConfig struct { + ApiKey string + Endpoint string + WebsocketKeepalive bool + WebsocketTimeout int +} diff --git a/data-api/go-ws/api/types/enums.go b/data-api/go-ws/api/types/enums.go new file mode 100644 index 0000000000..271abc1a30 --- /dev/null +++ b/data-api/go-ws/api/types/enums.go @@ -0,0 +1,48 @@ +package types + +// ApiVersion custom ENUM for SDK forward compatibility +type ApiVersion int + +const ( + ApiV1 ApiVersion = iota +) + +// EnvironmentType +// https://docs.coinapi.io/#endpoints-2 +type EnvironmentType int + +const ( + ProdEncrypted EnvironmentType = iota + ProdInsecure + TestEncrypted + TestInsecure +) + +// MessageType replicates the official incoming message types as (kinda) string enum. +// https://docs.coinapi.io/#messages +type MessageType string + +const ( + TRADE MessageType = "trade" + QUOTE MessageType = "quote" + BOOK_L2_FULL MessageType = "book" // Orderbook L2 (Full) + BOOK_L2_TOP_5 MessageType = "book5" // Orderbook L2 (5 best Bid / Ask) + BOOK_L2_TOP_20 MessageType = "book20" // Orderbook L2 (20 best Bid / Ask) + BOOK_L2_TOP_50 MessageType = "book50" // Orderbook L2 (50 best Bid / Ask) + BOOK_L3_FULL MessageType = "book_l3" // Orderbook L3 (Full) https://docs.coinapi.io/#orderbook-l3-full-in + OHLCV MessageType = "ohlcv" + VOLUME MessageType = "volume" + HEARTBEAT MessageType = "hearbeat" // DO NOT FIX! it's a typo in the official msg spec! + ERROR MessageType = "error" // Otherwise processMessage(.) fails to handle heartbeat messages! + EXCHANGERATE MessageType = "exrate" + RECONNECT MessageType = "reconnect" +) + +// ReconnectType defines the reconnect behavior upon receiving a reconnect message +// https://docs.coinapi.io/#reconnect-in +type ReconnectType int + +const ( + OnConnectionClose ReconnectType = iota + OnReconnectMessage +) diff --git a/data-api/go-ws/api/types/functions.go b/data-api/go-ws/api/types/functions.go new file mode 100644 index 0000000000..95de876eec --- /dev/null +++ b/data-api/go-ws/api/types/functions.go @@ -0,0 +1,11 @@ +package types + +// InvokeFunction is a unified function type for all event handlers. +// https://yourbasic.org/golang/function-pointer-type-declaration/ +type InvokeFunction func(message *DataMessage) (err error) + +// WsHandler handle raw websocket message +type WsHandler func(message []byte) + +// WsErrHandler handles raw websocket errors +type WsErrHandler func(err error) diff --git a/data-api/go-ws/api/types/interface.go b/data-api/go-ws/api/types/interface.go new file mode 100644 index 0000000000..83a96159eb --- /dev/null +++ b/data-api/go-ws/api/types/interface.go @@ -0,0 +1,23 @@ +package types + +type SDK interface { + SendHello(hello *Hello) (err error) + OpenConnection() (err error) + CloseConnection() (err error) + // ResetConnection hard reset: closes current connection, opens a new one, + // and resends the last hello message. No message buffering! + ResetConnection() (err error) + + // sys handlers + SetErrorInvoke(function InvokeFunction) + SetHeartBeatInvoke(function InvokeFunction) + SetReconnectInvoke(function InvokeFunction) + + // Data handlers + SetExRateInvoke(function InvokeFunction) + SetTradesInvoke(function InvokeFunction) + SetQuoteInvoke(function InvokeFunction) + SetBookInvoke(function InvokeFunction) + SetOHLCVInvoke(function InvokeFunction) + SetVolumeInvoke(function InvokeFunction) +} diff --git a/data-api/go-ws/api/types/messages.go b/data-api/go-ws/api/types/messages.go new file mode 100644 index 0000000000..0c368c1b0f --- /dev/null +++ b/data-api/go-ws/api/types/messages.go @@ -0,0 +1,358 @@ +package types + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/shopspring/decimal" + "log" + "time" +) + +// DataMessage composite type substitutes missing type inheritance. +type DataMessage struct { + Type *MessageType + ErrorMessage *ErrorMessage + ExchangeRate *ExchangeRate + Orderbook *OrderBook + Ohlcv *Ohlcv + Quote *Quote + Reconnect *Reconnect + Hearbeat *Heartbeat + Trade *Trade + Volume *Volume +} + +func (s DataMessage) String() string { + return fmt.Sprintf("{Type: %v, ErrorMessage: %v, ExchangeRate %v, Orderbook: %v, Ohlcv: %v, Quote: %v, Reconnect: %v, Reconnect: %v, Hearbeat: %v, Trade: %v, Volume: %v}", + s.Type, + s.ErrorMessage, + s.ExchangeRate, + s.Orderbook, + s.Ohlcv, + s.Quote, + s.Reconnect, + s.Hearbeat, + s.Trade, + s.Volume, + ) +} + +// Ask subtype for OrderBook +type Ask struct { + Type MessageType `json:"type"` + Price decimal.Decimal `json:"price"` + Size decimal.Decimal `json:"size"` +} + +func (s Ask) String() string { + return fmt.Sprintf("{Type: %v, Price: %v, Size: %v}", + s.Type, + s.Price, + s.Size, + ) +} + +// Bid subtype for OrderBook +type Bid struct { + Type MessageType `json:"type"` + Price decimal.Decimal `json:"price"` + Size decimal.Decimal `json:"size"` +} + +func (s Bid) String() string { + return fmt.Sprintf("{Type: %v, Price: %v, Size: %v}", + s.Type, + s.Price, + s.Size, + ) +} + +// ErrorMessage expect that the underlying WebSocket connection will be closed by us after sending an error message. +// https://docs.coinapi.io/#error-handling +type ErrorMessage struct { + Type MessageType `json:"type"` + Message string `json:"message"` +} + +func (s ErrorMessage) String() string { + return fmt.Sprintf("{Type: %v, Message: %v}", + s.Type, + s.Message, + ) +} + +// ExchangeRate message delivering updates of (VWAP-24H) Volume Weighted Average Price between pair of assets +// https://docs.coinapi.io/#exchange-rate-in +type ExchangeRate struct { + Type MessageType `json:"type"` + Asset_id_base string `json:"asset_id_base"` + Asset_id_quote string `json:"asset_id_quote"` + Time time.Time `json:"time"` + Rate decimal.Decimal `json:"rate"` +} + +func (s ExchangeRate) String() string { + return fmt.Sprintf("{Type: %v, Asset_id_base: %v, Asset_id_quote: %v Time: %v, Rate: %v, }", + s.Type, + s.Asset_id_base, + s.Asset_id_quote, + s.Time, + s.Rate, + ) +} + +// Hello After your WebSocket connection is established, you must send us a Hello message which contains: +// * Stream preferences +// * API key for authorization +// If your message will be incorrect, we will send you error message and disconnect connection afterwards. +// Hello message can be repeated, each one will cause subscription scope override without interruption of your WebSocket connection. +// https://docs.coinapi.io/#hello-out +type Hello struct { + Type MessageType `json:"type"` + Api_key string `json:"apikey"` + Heartbeat bool `json:"heartbeat"` + Subscribe_data_type []string `json:"subscribe_data_type"` + Subscribe_filter_symbol_id []string `json:"subscribe_filter_symbol_id"` + Subscribe_filter_asset_id []string `json:"subscribe_filter_asset_id"` + Subscribe_filter_period_id []string `json:"subscribe_filter_period_id"` + Subscribe_filter_exchange_id []string `json:"subscribe_filter_exchange_id"` + Subscribe_update_limit_ms_quote uint32 `json:"subscribe_update_limit_ms_quote"` + Subscribe_update_limit_ms_book_snapshot uint32 `json:"subscribe_update_limit_ms_book_snapshot"` +} + +func (s Hello) String() string { + return fmt.Sprintf("{Type: %v, Api_key: %v, Heartbeat: %v Subscribe_data_type: %v, Subscribe_filter_symbol_id: %v, Subscribe_filter_asset_id: %v, Subscribe_filter_period_id: %v, Subscribe_filter_exchange_id: %v, Subscribe_update_limit_ms_quote: %v, Subscribe_update_limit_ms_book_snapshot: %v}", + s.Type, + s.Api_key, + s.Heartbeat, + s.Subscribe_data_type, + s.Subscribe_filter_symbol_id, + s.Subscribe_filter_asset_id, + s.Subscribe_filter_period_id, + s.Subscribe_filter_exchange_id, + s.Subscribe_update_limit_ms_quote, + s.Subscribe_update_limit_ms_book_snapshot, + ) +} +func (s Hello) GetJSON() (b []byte, err error) { + + b, err = json.Marshal(s) + if err != nil { + log.Println("Error marshaling Hello object") + log.Println(err) + return nil, err + } + + var prettyJSON bytes.Buffer + err = json.Indent(&prettyJSON, b, "", "\t") + if err != nil { + log.Println("Error making JSON pretty") + log.Println(err) + return nil, err + } + + return b, nil +} + +func (s Hello) PrintJSON() { + b, err := s.GetJSON() + if err != nil { + log.Println(err) + } + println(string(b)) + +} + +// OrderBook Book message is sent for each snapshot or update of the order book. +// After subscription to this data type is initialized, we immediately start delivering updates to the order book +// and at least one snapshot will be provided as soon as possible with the nearest update of the book. +// Book message represents total amount of bids and asks aggregated by price level. +// https://docs.coinapi.io/#orderbook-l2-full-in +type OrderBook struct { + Type MessageType `json:"type"` + Symbol_id string `json:"symbol_id"` + Sequence uint32 `json:"sequence"` + Time_exchange time.Time `json:"time_exchange"` + Time_coinapi time.Time `json:"time_coinapi"` + Asks []Bid `json:"asks"` + Bids []Bid `json:"bids"` +} + +func (s OrderBook) String() string { + return fmt.Sprintf(" { MessageType: %v, Symbol_id: %v, Sequence: %v, Time_exchange: %v, Time_coinapi: %v, Asks: %v, Bids: %v, }", + s.Type, + s.Symbol_id, + s.Sequence, + s.Time_exchange, + s.Time_coinapi, + s.Asks, + s.Bids, + ) +} + +// Ohlcv A OHLCV message is sent for each update on the periods between 1SEC and 1MIN. +// https://docs.coinapi.io/#ohlcv-in +type Ohlcv struct { + Type MessageType `json:"type"` + Symbol_id string `json:"symbol_id"` + Sequence uint32 `json:"sequence"` + PeriodID string `json:"period_id"` + Time_period_start time.Time `json:"time_period_start"` + Time_period_end time.Time `json:"time_period_end"` + Time_open time.Time `json:"time_open"` + Time_close time.Time `json:"time_close"` + Price_open decimal.Decimal `json:"price_open"` + Price_high decimal.Decimal `json:"price_high"` + Price_low decimal.Decimal `json:"price_low"` + Price_close decimal.Decimal `json:"price_close"` + Volume_traded decimal.Decimal `json:"volume_traded"` + Trades_count uint32 `json:"trades_count"` +} + +func (s Ohlcv) String() string { + return fmt.Sprintf(" {Type: %v, Symbol_id: %v, Sequence: %v, Time_period_start: %v, Time_period_start: %v, Time_period_end: %v, Time_open: %v, Time_close: %v, Price_open: %v, Price_high: %v, Price_low: %v,Price_close: %v, Volume_traded: %v, Trades_count: %v}", + s.Type, + s.Symbol_id, + s.Sequence, + s.PeriodID, + s.Time_period_start, + s.Time_period_end, + s.Time_open, + s.Time_close, + s.Price_open, + s.Price_high, + s.Price_low, + s.Price_close, + s.Volume_traded, + s.Trades_count, + ) +} + +// Quote Quote message is sent for each update on orderbook first best bid or ask level. +// https://docs.coinapi.io/#quotes-in +type Quote struct { + Type MessageType `json:"type"` + Symbol_id string `json:"symbol_id"` + Sequence uint32 `json:"sequence"` + Time_exchange time.Time `json:"time_exchange"` + Time_coinapi time.Time `json:"time_coinapi"` + Ask_price decimal.Decimal `json:"ask_price"` + Ask_size decimal.Decimal `json:"ask_size"` + Bid_price decimal.Decimal `json:"bid_price"` + Bid_size decimal.Decimal `json:"bid_size"` +} + +func (s Quote) String() string { + return fmt.Sprintf(" {Type: %v, Symbol_id: %v, Sequence: %v, Time_exchange: %v, Time_coinapi: %v, Ask_price: %v, Ask_siz: %v, Bid_price: %v, Bid_size: %v}", + s.Type, + s.Symbol_id, + s.Sequence, + s.Time_exchange, + s.Time_coinapi, + s.Ask_price, + s.Ask_size, + s.Bid_price, + s.Bid_size, + ) +} + +// Reconnect message is sent by the server to all connected clients when the server will be restarted or shut down at the defined exact time included in the message content. After the period specified in message passes, +// the client must expect that the underlying WebSocket connection will be closed from the server-side. +// A new connection will automatically be established to a different server. +// The correct way of handling this event is documented online: +// https://docs.coinapi.io/#reconnect-in +type Reconnect struct { + Type MessageType `json:"type"` + Within_seconds uint32 `json:"within_seconds"` + Before_time time.Time `json:"before_time"` +} + +func (s Reconnect) String() string { + return fmt.Sprintf("{Type: %v, Within_seconds: %v, Before_time: %v,}", + s.Type, + s.Within_seconds, + s.Before_time, + ) +} + +// Heartbeat message is sent to you every time there is one second of silence in communication between us, +// if you agreed on this feature in Hello message. +// https://docs.coinapi.io/#heartbeat-in +type Heartbeat struct { + Type MessageType `json:"type"` +} + +func (s Heartbeat) String() string { + return fmt.Sprintf("{Type: %v}", + s.Type, + ) +} + +// Trade message is sent for every executed transaction (orderbook match). +// https://docs.coinapi.io/#trades-in +type Trade struct { + Type MessageType `json:"type"` + Symbol_id string `json:"symbol_id"` + Sequence uint32 `json:"sequence"` + Time_exchange time.Time `json:"time_exchange"` + Time_coinapi time.Time `json:"time_coinapi"` + Uuid string `json:"uuid"` + Price decimal.Decimal `json:"price"` + Size decimal.Decimal `json:"size"` + Taker_side string `json:"taker_side"` +} + +func (s Trade) String() string { + return fmt.Sprintf(" { MessageType: %v, Symbol_id: %v, Sequence: %v, Time_exchange: %v, Time_coinapi: %v, Uuid: %v, Price: %v, Size: %v, Taker_side: %v}", + s.Type, + s.Symbol_id, + s.Sequence, + s.Time_exchange, + s.Time_coinapi, + s.Uuid, + s.Price, + s.Size, + s.Taker_side, + ) +} + +// Volume message is sent periodically to update last volume information per symbols on periods 1HRS, 1DAY and 1MTH. +// The 1HRS and 1DAY volumes are updated every minute and 1MTH every 10 minutes. +// Subscription to this message type triggers redelivery of the last published messages for each period. +// https://docs.coinapi.io/#volume-in +type Volume struct { + Type MessageType `json:"type"` + Period_id string `json:"period_id"` + Time_coinapi time.Time `json:"time_coinapi"` + Volume_by_symbol []VolumeBySymbol `json:"volume_by_symbol"` +} + +func (s Volume) String() string { + return fmt.Sprintf("{Type: %v, Period_id: %v, Time_coinapi: %v Volume_by_symbol: %v}", + s.Type, + s.Period_id, + s.Time_coinapi, + s.Volume_by_symbol, + ) +} + +// VolumeBySymbol subtype for Volume +// https://docs.coinapi.io/#volume-in +type VolumeBySymbol struct { + Symbol_id string `json:"symbol_id"` + Asset_id_base string `json:"asset_id_base"` + Asset_id_quote string `json:"asset_id_quote"` + Volume_base decimal.Decimal `json:"volume_base"` + Volume_quote decimal.Decimal `json:"volume_quote"` +} + +func (s VolumeBySymbol) String() string { + return fmt.Sprintf("{Symbol_id: %v, Asset_id_base: %v, Asset_id_quote: %v, Volume_base: %v, Volume_quote: %v, }", + s.Symbol_id, + s.Asset_id_base, + s.Asset_id_quote, + s.Volume_base, + s.Volume_quote, + ) +} diff --git a/data-api/go-ws/api/utils.go b/data-api/go-ws/api/utils.go new file mode 100644 index 0000000000..1d02a053b2 --- /dev/null +++ b/data-api/go-ws/api/utils.go @@ -0,0 +1,59 @@ +package api + +import ( + . "go-ws/api/types" + v1 "go-ws/api/v1" + "strings" +) + +func validateApiKey(apiKey string) { + + // Check if key empty + if apiKey == "" { + printErrorMessage("API key is empty!", "Add a valid API key to main!") + panic("Invalid API KEY - Abort!") + } + + // Check for test key + if strings.Contains(apiKey, "SAMPLE-KEY") || strings.Contains(apiKey, "SAMPLE") || strings.Contains(apiKey, "TEST") { + printErrorMessage("Inactive example key detected!", "Add your own valid API key to main!") + panic("Invalid API KEY - Abort!") + } + + // Check for copy & paste error. Test keys should be 36 characters long! + // It's unclear to me if key length is fixed or dynamic, thus 21 seems to be the safest option. + if len(apiKey) <= 21 { + printErrorMessage("API key is too short. Should be 30 or more characters long!", "Add a valid API key to main!") + panic("Invalid API KEY - Abort!") + } +} + +func printErrorMessage(problem, solution string) { + println() + println("Problem : ", problem) + println("Solution: ", solution) + println("Web Link: https://www.coinapi.io/pricing?apikey") + println() +} + +func getSDKConfig(apiKey string) (sdkConfig *SdkConfig) { + sdkConfig = &SdkConfig{ + ApiKey: apiKey, + ApiVersion: ApiV1, + EnvironmentType: TestInsecure, + ReconnectType: OnConnectionClose, + } + return sdkConfig +} + +func getSDK(sdkConfig *SdkConfig) (sdk SDK) { + switch sdkConfig.ApiVersion { + case ApiV1: + // Bind interface to implementation matching the selected API version. + sdk = v1.NewCoinApiSDKV1(sdkConfig) + + default: + sdk = v1.NewCoinApiSDKV1(sdkConfig) + } + return sdk +} diff --git a/data-api/go-ws/api/v1/dbg_utils.go b/data-api/go-ws/api/v1/dbg_utils.go new file mode 100644 index 0000000000..036d362b9d --- /dev/null +++ b/data-api/go-ws/api/v1/dbg_utils.go @@ -0,0 +1,16 @@ +package v1 + +import "log" + +func logError(err error) { + if err != nil { + log.Println(err) + } +} + +func checkError(err error) error { + if err != nil { + return nil + } + return err +} diff --git a/data-api/go-ws/api/v1/endpoints.go b/data-api/go-ws/api/v1/endpoints.go new file mode 100644 index 0000000000..40002d8546 --- /dev/null +++ b/data-api/go-ws/api/v1/endpoints.go @@ -0,0 +1,28 @@ +package v1 + +import t "go-ws/api/types" + +// Endpoints WebSocket endpoint provides real-time market data streaming which works in Subscribe-Publish communication model. +// https://docs.coinapi.io/#md-websocket-api +const ( + ProductionEncrypted = "wss://ws.coinapi.io/v1/" + ProductionInsecure = "ws://ws.coinapi.io/v1/" + SandboxEncrypted = "wss://ws-sandbox.coinapi.io/v1/" + SandboxInsecure = "ws://ws-sandbox.coinapi.io/v1/" +) + +func getUrl(env t.EnvironmentType) (url string) { + switch env { + case t.ProdEncrypted: + return ProductionEncrypted + case t.ProdInsecure: + return ProductionInsecure + case t.TestEncrypted: + return SandboxEncrypted + case t.TestInsecure: + return SandboxInsecure + default: + return SandboxEncrypted + } + +} diff --git a/data-api/go-ws/api/v1/sdk.go b/data-api/go-ws/api/v1/sdk.go new file mode 100644 index 0000000000..22de7a9dd7 --- /dev/null +++ b/data-api/go-ws/api/v1/sdk.go @@ -0,0 +1,42 @@ +package v1 + +import ( + "github.com/gorilla/websocket" + . "go-ws/api/types" +) + +type SDKImpl struct { + config *SdkConfig +} + +var ( + con *websocket.Conn + stopC chan struct{} + doneC chan struct{} + running bool + helloMsg *Hello +) + +var ( + // data handlers + tradesInvoke InvokeFunction + quotesInvoke InvokeFunction + bookInvoke InvokeFunction + ohlcvInvoke InvokeFunction + volumeInvoke InvokeFunction + exchangeInvoke InvokeFunction + // sys handlers + errorInvoke InvokeFunction + heartBeatInvoke InvokeFunction + reconnectInvoke InvokeFunction +) + +func NewCoinApiSDKV1(sdkConfig *SdkConfig) (sdk *SDKImpl) { + sdk = &SDKImpl{sdkConfig} + sdk.init() + return sdk +} + +func (s SDKImpl) init() { + _ = s.OpenConnection() // errors get handled inside connect() +} diff --git a/data-api/go-ws/api/v1/sdk_utils.go b/data-api/go-ws/api/v1/sdk_utils.go new file mode 100644 index 0000000000..3d59b0012d --- /dev/null +++ b/data-api/go-ws/api/v1/sdk_utils.go @@ -0,0 +1,21 @@ +package v1 + +import ( + "go-ws/api/types" + "log" +) + +func (s SDKImpl) getWSConfig() (wsCfg *types.WsConfig) { + url := getUrl(s.config.EnvironmentType) + wsCfg = &types.WsConfig{ + ApiKey: s.config.ApiKey, + Endpoint: url, + } + return wsCfg +} + +func printRawMsg(message []byte) { + msg := string(message) + log.Println("raw message: ") + log.Println(msg) +} diff --git a/data-api/go-ws/api/v1/set_methods.go b/data-api/go-ws/api/v1/set_methods.go new file mode 100644 index 0000000000..3907bddc84 --- /dev/null +++ b/data-api/go-ws/api/v1/set_methods.go @@ -0,0 +1,50 @@ +package v1 + +import ( + . "go-ws/api/types" +) + +func (s SDKImpl) SetTradesInvoke(function InvokeFunction) { + tradesInvoke = function +} + +func (s SDKImpl) SetQuoteInvoke(function InvokeFunction) { + quotesInvoke = function +} + +func (s SDKImpl) SetBookInvoke(function InvokeFunction) { + bookInvoke = function +} + +func (s SDKImpl) SetOHLCVInvoke(function InvokeFunction) { + ohlcvInvoke = function +} + +func (s SDKImpl) SetVolumeInvoke(function InvokeFunction) { + volumeInvoke = function +} + +func (s SDKImpl) SetExRateInvoke(function InvokeFunction) { + exchangeInvoke = function +} + +// sys handlers + +func (s SDKImpl) SetErrorInvoke(function InvokeFunction) { + errorInvoke = function +} + +func (s SDKImpl) SetHeartBeatInvoke(function InvokeFunction) { + heartBeatInvoke = function +} + +func (s SDKImpl) SetReconnectInvoke(function InvokeFunction) { + reconnectInvoke = function +} + +func (s SDKImpl) setHelloMessage(helloMessage *Hello) { + helloMsg = helloMessage +} +func (s SDKImpl) getHelloMessage() (helloMessage *Hello) { + return helloMsg +} diff --git a/data-api/go-ws/api/v1/websocket.go b/data-api/go-ws/api/v1/websocket.go new file mode 100644 index 0000000000..32becda53c --- /dev/null +++ b/data-api/go-ws/api/v1/websocket.go @@ -0,0 +1,88 @@ +package v1 + +import ( + "go-ws/api/types" + "log" +) + +func (s SDKImpl) SendHello(hello *types.Hello) (err error) { + + b, err := hello.GetJSON() + logError(err) + + err = con.WriteMessage(1, b) + if err != nil { + log.Println("can't send Hello message!") + logError(err) + return err + } + + // store last hello message in case of a hard re-connect. + s.setHelloMessage(hello) + + if running == false { + err = s.startProcessing() + if err != nil { + log.Println("can't start message processing!") + logError(err) + return err + } + running = true + } + return err +} + +func (s SDKImpl) startProcessing() (err error) { + errHandler := logError + handler := s.getWSHandler(errHandler) + doneC, stopC, err = s.process(handler, errHandler) + if err != nil { + log.Println("error starting message processing!") + logError(err) + running = false + return err + } + running = true + return nil +} + +func (s SDKImpl) process(handler types.WsHandler, errHandler types.WsErrHandler) (doneC, stopC chan struct{}, err error) { + + doneC = make(chan struct{}) + stopC = make(chan struct{}) + + go func() { + // This function will exit either on error from ReadMessage + // or when the stopC channel is closed by the client. + defer close(doneC) + + // Wait for the stopC channel to be closed. We do that in a + // separate goroutine because ReadMessage is a blocking operation. + silent := false + go func() { + select { + case <-stopC: + silent = true + case <-doneC: + } + _ = con.Close() + }() + + var message []byte + for { + _, message, err = con.ReadMessage() + if err != nil { + if !silent { + log.Println("Error reading message!") + errHandler(err) + } + return + } + + // + //printRawMsg(message) + handler(message) + } + }() + return +} diff --git a/data-api/go-ws/api/v1/ws_connection.go b/data-api/go-ws/api/v1/ws_connection.go new file mode 100644 index 0000000000..c68aaa3e09 --- /dev/null +++ b/data-api/go-ws/api/v1/ws_connection.go @@ -0,0 +1,54 @@ +package v1 + +import ( + "github.com/gorilla/websocket" + "log" +) + +func (s SDKImpl) OpenConnection() (err error) { + mtd := "connect: " + wsConfig := s.getWSConfig() + url := wsConfig.Endpoint + con, _, err = websocket.DefaultDialer.Dial(wsConfig.Endpoint, nil) + if err != nil { + log.Println(mtd, err) + panic(mtd + "Cannot connect to: " + url) + } + running = false + return err +} + +func (s SDKImpl) CloseConnection() (err error) { + + // Stop processing messages + running = false + + // close WS channel if its not yet fully closed! + if stopC != nil { + close(stopC) + } + + // close connection + err = con.Close() + if err != nil { + //log.Println("can't close connection") + log.Println(err) + } + return err +} + +func (s SDKImpl) ResetConnection() (err error) { + + err = s.CloseConnection() + logError(err) + + err = s.OpenConnection() + logError(err) + + hello := s.getHelloMessage() + + err = s.SendHello(hello) + logError(err) + + return err +} diff --git a/data-api/go-ws/api/v1/ws_handler.go b/data-api/go-ws/api/v1/ws_handler.go new file mode 100644 index 0000000000..af516ff2a2 --- /dev/null +++ b/data-api/go-ws/api/v1/ws_handler.go @@ -0,0 +1,163 @@ +package v1 + +import ( + "encoding/json" + "github.com/bitly/go-simplejson" + t "go-ws/api/types" +) + +func (s SDKImpl) getWSHandler(errHandler t.WsErrHandler) (wsHandler t.WsHandler) { + wsHandler = func(message []byte) { + err := s.processMessage(message, errHandler) + if err != nil { + errHandler(err) + } + } + return wsHandler +} + +func (s SDKImpl) processMessage(message []byte, errHandler t.WsErrHandler) (err error) { + + var dataMessage = new(t.DataMessage) + messageType := s.getMessageType(message, errHandler) + + switch messageType { + case t.EXCHANGERATE: + // https://docs.coinapi.io/#exchange-rate-in + msg := new(t.ExchangeRate) + msg.Type = messageType + _ = json.Unmarshal(message, msg) + dataMessage.ExchangeRate = msg + err = exchangeInvoke(dataMessage) + return checkError(err) + + case t.BOOK_L2_TOP_5: + dataMessage = s.unMarshalOrderBook(message, messageType, errHandler) + err = bookInvoke(dataMessage) + return checkError(err) + + case t.BOOK_L2_TOP_20: + dataMessage = s.unMarshalOrderBook(message, messageType, errHandler) + err = bookInvoke(dataMessage) + return checkError(err) + + case t.BOOK_L2_TOP_50: + dataMessage = s.unMarshalOrderBook(message, messageType, errHandler) + err = bookInvoke(dataMessage) + return checkError(err) + + case t.BOOK_L2_FULL: + // https://docs.coinapi.io/#orderbook-l2-full-in + dataMessage = s.unMarshalOrderBook(message, messageType, errHandler) + err = bookInvoke(dataMessage) + return checkError(err) + + case t.BOOK_L3_FULL: + // https://docs.coinapi.io/#orderbook-l3-full-in + dataMessage = s.unMarshalOrderBook(message, messageType, errHandler) + err = bookInvoke(dataMessage) + return checkError(err) + + case t.OHLCV: + // https://docs.coinapi.io/#ohlcv-in + msg := new(t.Ohlcv) + msg.Type = messageType + _ = json.Unmarshal(message, msg) + dataMessage = new(t.DataMessage) + dataMessage.Ohlcv = msg + err = ohlcvInvoke(dataMessage) + return checkError(err) + + case t.QUOTE: + // https://docs.coinapi.io/#quotes-in + msg := new(t.Quote) + msg.Type = messageType + _ = json.Unmarshal(message, msg) + dataMessage = new(t.DataMessage) + dataMessage.Quote = msg + err = quotesInvoke(dataMessage) + return checkError(err) + + case t.TRADE: + // https://docs.coinapi.io/#trades-in + msg := new(t.Trade) + msg.Type = messageType + _ = json.Unmarshal(message, msg) + dataMessage = new(t.DataMessage) + dataMessage.Trade = msg + err = tradesInvoke(dataMessage) + return checkError(err) + + case t.VOLUME: + // https://docs.coinapi.io/#volume-in + msg := new(t.Volume) + msg.Type = messageType + _ = json.Unmarshal(message, msg) + dataMessage = new(t.DataMessage) + dataMessage.Volume = msg + err = volumeInvoke(dataMessage) + return checkError(err) + + case t.ERROR: + // https://docs.coinapi.io/#error-handling + errorMsg := new(t.ErrorMessage) + _ = json.Unmarshal(message, errorMsg) + dataMessage.ErrorMessage = errorMsg + err = errorInvoke(dataMessage) + return checkError(err) + + case t.RECONNECT: + // https://docs.coinapi.io/#reconnect-in + msg := new(t.Reconnect) + msg.Type = messageType + _ = json.Unmarshal(message, msg) + dataMessage = new(t.DataMessage) + dataMessage.Reconnect = msg + err = reconnectInvoke(dataMessage) + return checkError(err) + + case t.HEARTBEAT: + // https://docs.coinapi.io/#heartbeat-in + msg := new(t.Heartbeat) + msg.Type = messageType + _ = json.Unmarshal(message, msg) + dataMessage = new(t.DataMessage) + dataMessage.Hearbeat = msg + err = heartBeatInvoke(dataMessage) + return checkError(err) + } + + return nil +} + +func (s SDKImpl) unMarshalOrderBook(message []byte, msgType t.MessageType, errHandler t.WsErrHandler) (dataMessage *t.DataMessage) { + bookMsg := new(t.OrderBook) + bookMsg.Type = msgType + err := json.Unmarshal(message, bookMsg) + if err != nil { + errHandler(err) + return nil + } + dataMessage = new(t.DataMessage) + dataMessage.Orderbook = bookMsg + return dataMessage + +} + +func (s SDKImpl) getMessageType(message []byte, errHandler t.WsErrHandler) (messageType t.MessageType) { + j, err := newJSON(message) + if err != nil { + errHandler(err) + return + } + messageType = t.MessageType(j.Get("type").MustString()) + return messageType +} + +func newJSON(data []byte) (j *simplejson.Json, err error) { + j, err = simplejson.NewJson(data) + if err != nil { + return nil, err + } + return j, nil +} diff --git a/data-api/go-ws/go.mod b/data-api/go-ws/go.mod new file mode 100644 index 0000000000..f0dc107939 --- /dev/null +++ b/data-api/go-ws/go.mod @@ -0,0 +1,12 @@ +module go-ws + +go 1.15 + +require ( + github.com/bitly/go-simplejson v0.5.0 + github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect + github.com/gorilla/websocket v1.4.2 + github.com/kr/pretty v0.2.1 // indirect + github.com/shopspring/decimal v1.2.0 + github.com/stretchr/testify v1.7.0 // indirect +) diff --git a/data-api/go-ws/go.sum b/data-api/go-ws/go.sum new file mode 100644 index 0000000000..51221aa652 --- /dev/null +++ b/data-api/go-ws/go.sum @@ -0,0 +1,20 @@ +github.com/bitly/go-simplejson v0.5.0 h1:6IH+V8/tVMab511d5bn4M7EwGXZf9Hj6i2xSwkNEM+Y= +github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= +github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/data-api/go-ws/main.go b/data-api/go-ws/main.go new file mode 100644 index 0000000000..5b7f54deb2 --- /dev/null +++ b/data-api/go-ws/main.go @@ -0,0 +1,286 @@ +package main + +import ( + "errors" + "go-ws/api" + t "go-ws/api/types" + "log" + "time" +) + +const apiKey = "THIS-IS-A-SAMPLE-API-KEY-C2A7CB1EF" + +// All exchanges & data integrations: +// https://www.coinapi.io/integration + +func main() { + println(" * NewSDK!") + sdk := api.NewSDK(apiKey) + // verbose switches on / off console printing of heartbeat & reconnect messages + sys := getSysInvokes(true) + + println(" * SetErrorInvoke!") + sdk.SetErrorInvoke(sys.ErrorInvoke) + + println(" * SetHeartBeatInvoke!") + sdk.SetHeartBeatInvoke(sys.HeartBeatInvoke) + + println(" * SetReconnectInvoke!") + sdk.SetReconnectInvoke(sys.ReconnectInvoke) + + println(" * SetOHLCVInvoke!") + OHLCVInvoke := GetInvokeFunction(t.OHLCV) + sdk.SetOHLCVInvoke(OHLCVInvoke) + + println(" * SetTradesInvoke!") + tradeInvoke := GetInvokeFunction(t.TRADE) + sdk.SetTradesInvoke(tradeInvoke) + + println(" * SetQuoteInvoke!") + quoteInvoke := GetInvokeFunction(t.QUOTE) + sdk.SetQuoteInvoke(quoteInvoke) + + println(" * SetExRateInvoke!") + exRateInvoke := GetInvokeFunction(t.EXCHANGERATE) + sdk.SetExRateInvoke(exRateInvoke) + + println(" * SetBookInvoke!") + bookInvoke := GetInvokeFunction(t.BOOK_L2_FULL) + sdk.SetBookInvoke(bookInvoke) + + volInvoke := GetInvokeFunction(t.VOLUME) + sdk.SetVolumeInvoke(volInvoke) + + println(" * SendHello: Single data type!") + hello := getHello(false, false) + _ = sdk.SendHello(hello) + + println(" * Wait for messages!") + time.Sleep(time.Second * 5) + + println("******************") + println("* Hard Conn Reset *") + println("******************") + _ = sdk.ResetConnection() + + println(" * SendHello: Expanded data t!") + hello = getHello(true, false) + _ = sdk.SendHello(hello) + + println(" * Wait for messages!") + time.Sleep(time.Second * 5) + + println(" * SendHello: Heartbeat!") + hello = getHello(false, true) + _ = sdk.SendHello(hello) + + println(" * Wait for messages!") + time.Sleep(time.Second * 5) + + println(" * GetHello: Volume only!") + hello = getExchangeVolumeHello(false) + + println(" * SendHello: Requesting Volume type only !") + _ = sdk.SendHello(hello) + + println(" * Wait for messages!") + time.Sleep(time.Second * 3) + + // stop volume feed... + hello = getHello(false, false) + _ = sdk.SendHello(hello) + time.Sleep(time.Second * 1) + + println(" * CloseConnection!") + _ = sdk.CloseConnection() + + println("Goodbye!") +} + +func getExchangeVolumeHello(heartbeat bool) (hello *t.Hello) { + // For volume data, only asset ID is required. + // It seems there is no filtering so expect huge volume from all connected exchanges.. + var assets []string + var dataTypes []string + + assets = append(assets, "BTC") + dataTypes = append(dataTypes, "volume") + + hello = &t.Hello{ + Type: "hello", + Api_key: apiKey, + Heartbeat: heartbeat, + Subscribe_data_type: dataTypes, + Subscribe_filter_asset_id: assets, + } + return hello +} + +func getHello(expanded, heartbeat bool) (hello *t.Hello) { + // After your WebSocket connection is established, you must send us a Hello message which contains: + // * Stream preferences (Heartbeat and subscription details) + // * API key for authorization + // If your message will be incorrect, we will send you error message and disconnect connection afterwards. + // Hello message can be repeated, each one will cause subscription scope override without interruption + // of your WebSocket connection. + // https://docs.coinapi.io/#hello-out + + var datat []string + var symbolIds []string + var periodIDs []string + + datat = append(datat, "ohlcv") + if expanded { + datat = append(datat, "trade") + datat = append(datat, "quote") + datat = append(datat, "exrate") + datat = append(datat, "book5") + + } + + symbolIds = append(symbolIds, "COINBASE_SPOT_BTC_USD") + periodIDs = append(periodIDs, "1MIN") + + hello = &t.Hello{ + Type: "hello", + Api_key: apiKey, + Heartbeat: heartbeat, + Subscribe_data_type: datat, + Subscribe_filter_period_id: periodIDs, + Subscribe_filter_symbol_id: symbolIds, + } + return hello +} + +type SysInvokes struct { + ErrorInvoke t.InvokeFunction + HeartBeatInvoke t.InvokeFunction + ReconnectInvoke t.InvokeFunction +} + +func getSysInvokes(verbose bool) SysInvokes { + errorInvoke := GetErrorInvoke() + heartBeatInvoke := GetHeartBeatInvoke(verbose) + reconnectInvoke := GetReconnectInvoke(verbose) + + return SysInvokes{ + ErrorInvoke: errorInvoke, + HeartBeatInvoke: heartBeatInvoke, + ReconnectInvoke: reconnectInvoke, + } +} + +func GetReconnectInvoke(verbose bool) t.InvokeFunction { + // Reconnect message is sent by the server to all connected clients when the server will be restarted + // or shut down at the defined exact time included in the message content. + // After the period specified in message passes, the client must expect that the + // underlying WebSocket connection will be closed from the server-side. + // A new connection will automatically be established to a different server. + // The correct way of handling this event depends on the specific requirements of the integration... + // https://docs.coinapi.io/#reconnect-in + return func(message *t.DataMessage) (err error) { + + //sec := message.Reconnect.Within_seconds + //before := message.Reconnect.Before_time + + if verbose { + log.Println() + log.Println("!!!!!!!!!!!!!!!!!") + log.Println("!!!!Reconnect!!!! ") + log.Println("!!!!!!!!!!!!!!!!!") + log.Println() + } + + return nil + } +} + +func GetHeartBeatInvoke(verbose bool) t.InvokeFunction { + // WebSocket working on TCP protocol which doesn’t have the feature indicating that the connection is broken without + // trying exchange data over it. As you will not be actively sending messages to us, + // with Heartbeat you can distinguish a situation where there are no market data updates + // for you (Heartbeat is delivered but no market data updates) + // or connection between us is broken (Heartbeat and market data updates are not delivered). + // https://docs.coinapi.io/#reconnect-in + return func(message *t.DataMessage) (err error) { + if verbose { + log.Println() + log.Println("===============================") + log.Println("= Tracking heartbeat message! =") + log.Println("===============================") + log.Println() + } + return nil + } +} + +func GetErrorInvoke() t.InvokeFunction { + // You need to be prepared to receive an error message from us when you send something wrong; + // all errors are permanent and you should expect that the underlying + // WebSocket connection will be closed by us after sending an error message. + // Good practice is to store all error messages somewhere for further manual review. + // https://docs.coinapi.io/#error-handling + return func(message *t.DataMessage) (err error) { + mtd := "ErrorHandler: " + println(mtd) + msg := message.ErrorMessage.Message + if msg != "" { + log.Println(mtd+"ErrorMessage: ", msg) + return errors.New(msg) + } + return nil + } +} + +func GetInvokeFunction(msgType t.MessageType) t.InvokeFunction { + return func(message *t.DataMessage) (err error) { + printMessage(msgType, message) + return nil + } +} + +func printMessage(msgType t.MessageType, message *t.DataMessage) { + switch msgType { + case t.ERROR: + msg := message.ErrorMessage + log.Println(msg) + println() + case t.EXCHANGERATE: + msg := message.ExchangeRate + log.Println(msg) + println() + case t.BOOK_L2_TOP_5: + printBook(message) + case t.BOOK_L2_TOP_20: + printBook(message) + case t.BOOK_L2_TOP_50: + printBook(message) + case t.BOOK_L2_FULL: + printBook(message) + case t.BOOK_L3_FULL: + printBook(message) + case t.OHLCV: + msg := message.Ohlcv + log.Println(msg) + println() + case t.QUOTE: + msg := message.Quote + log.Println(msg) + println() + case t.TRADE: + msg := message.Trade + log.Println(msg) + println() + case t.VOLUME: + msg := message.Volume + log.Println(msg) + println() + + } +} + +func printBook(message *t.DataMessage) { + msg := message.Orderbook + log.Println(msg) + println() +}