Skip to content

Commit

Permalink
Bitflyer futures scraper (#197)
Browse files Browse the repository at this point in the history
* Coinflex scraper + refactors for Deribit, Huobi, FTX (#190) (#1)

* [+] added huobi futures scraper

* [*] 1. refactored futures huobi scraper && 2. ftx futures scraper

* [+] futures ftx: not writing the pongs & subscription & unsubscription messages

* [+] futures scraper: added deribit & refactors to all other futures scrapers

* [*] refactors: ftx, deribit, huobi

* [+] futures scarper: coinflex

* [*] post-merge conflicts resolve

* [+] bitflyer: futures scraper added & refactors for logging + instantiation methods for all the futures scrapers
  • Loading branch information
nazariyv authored and kaythxbye committed Dec 23, 2019
1 parent 3b16702 commit 6d59313
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 106 deletions.
3 changes: 2 additions & 1 deletion internal/pkg/exchange-scrapers/APIDerivativesScraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

writers "github.com/diadata-org/diadata/internal/pkg/scraper-writers"
zap "go.uber.org/zap"
)

// DeribitScraperKind - used to distinguish between the futures and options scrapers
Expand All @@ -22,7 +23,7 @@ type DeribitScraper struct {
Markets []string
WaitGroup *sync.WaitGroup
Writer writers.Writer
Logger *log.Logger
Logger *zap.SugaredLogger

// required for deribit to:
// 1. authenticate (trades is a private channel)
Expand Down
161 changes: 161 additions & 0 deletions internal/pkg/exchange-scrapers/FuturesBitflyerScraper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package scrapers

import (
"fmt"
"net/url"
"sync"
"time"

zap "go.uber.org/zap"

writers "github.com/diadata-org/diadata/internal/pkg/scraper-writers"
"github.com/gorilla/websocket"
)

const scrapeDataSaveLocationBitflyer = ""

// BitflyerScraper - use the NewBitflyerFuturesScraper function to create an instance
type BitflyerScraper struct {
Markets []string
WaitGroup *sync.WaitGroup
Writer writers.Writer
Logger *zap.SugaredLogger
}

// NewBitflyerFuturesScraper - returns an instance of an options scraper.
func NewBitflyerFuturesScraper(markets []string) FuturesScraper {
wg := sync.WaitGroup{}
writer := writers.FileWriter{}
logger := zap.NewExample().Sugar() // or NewProduction, or NewDevelopment
defer logger.Sync()

var scraper FuturesScraper = &BitflyerScraper{
WaitGroup: &wg,
Markets: markets,
Writer: &writer,
Logger: logger,
}

return scraper
}

func (s *BitflyerScraper) send(message *map[string]interface{}, market string, websocketConn *websocket.Conn) error {
err := websocketConn.WriteJSON(*message)
if err != nil {
return err
}
s.Logger.Debugf("sent message [%s]: %s", market, message)
return nil
}

// Authenticate - placeholder here, since we do not need to authneticate the connection.
func (s *BitflyerScraper) Authenticate(market string, connection interface{}) error {
return nil
}

// ScraperClose - safely closes the scraper; We pass the interface connection as the second argument
// primarily for the reason that Huobi scraper does not use the gorilla websocket; It uses golang's x websocket;
// If we did not define this method in our FuturesScraper interface, we could have easily used the pointer
// to gorilla websocket here; However, to make FuturesScraper more ubiquituous, we need an interface here.
func (s *BitflyerScraper) ScraperClose(market string, connection interface{}) error {
switch c := connection.(type) {
case *websocket.Conn:
// unsubscribe from the channel
err := s.send(&map[string]interface{}{"jsonrpc": "2.0", "method": "unsubscribe", "params": &map[string]interface{}{"channel": "lightning_ticker_" + market}}, market, c)
if err != nil {
s.Logger.Errorf("could not send a channel unsubscription message, err: %s", err)
return err
}
// close the websocket connection
err = s.write(websocket.CloseMessage, []byte{}, c)
if err != nil {
return err
}
err = c.Close()
if err != nil {
return err
}
time.Sleep(time.Duration(retryIn) * time.Second)
return nil
default:
return fmt.Errorf("unknown connection type: %T", connection)
}
}

// Scrape starts a websocket scraper for market
func (s *BitflyerScraper) Scrape(market string) {
for {
// immediately invoked function expression for easy clenup with defer
func() {
u := url.URL{Scheme: "wss", Host: "ws.lightstream.bitflyer.com", Path: "/json-rpc"}
s.Logger.Debugf("connecting to [%s], market: [%s]", u.String(), market)
ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
s.Logger.Errorf("could not dial Bitflyer websocket: %s", err)
time.Sleep(time.Duration(retryIn) * time.Second)
return
}
defer s.ScraperClose(market, ws)
ws.SetPongHandler(func(appData string) error {
s.Logger.Debugf("received a pong frame")
return nil
})
err = s.send(&map[string]interface{}{"jsonrpc": "2.0", "method": "subscribe", "params": &map[string]interface{}{"channel": "lightning_ticker_" + market}}, market, ws)
if err != nil {
s.Logger.Errorf("could not send a channel subscription message. retrying, err: %s", err)
return
}
tick := time.NewTicker(15 * time.Second)
defer tick.Stop()
go func() {
for {
select {
case <-tick.C:
err := s.write(websocket.PingMessage, []byte{}, ws)
if err != nil {
s.Logger.Errorf("error experienced pinging coinflex, err: %s", err)
return
}
s.Logger.Debugf("pinged the coinflex server. market: [%s]", market)
}
}
}()
for {
_, message, err := ws.ReadMessage()
if err != nil {
s.Logger.Errorf("repeated read error, restarting")
return
}
s.Logger.Debugf("received new message: %s, saving new message", message)
_, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationBitflyer+s.Writer.GetWriteFileName("Bitflyer", market))
if err != nil {
s.Logger.Errorf("could not write to file, err: %s", err)
return
}
}
}()
}
}

// write's primary purpose is to write a ping frame op code to keep the websocket connection alive
func (s *BitflyerScraper) write(mt int, payload []byte, ws *websocket.Conn) error {
ws.SetWriteDeadline(time.Now().Add(15 * time.Second))
return ws.WriteMessage(mt, payload)
}

// ScrapeMarkets - will scrape the markets specified during instantiation
func (s *BitflyerScraper) ScrapeMarkets() {
for _, market := range s.Markets {
s.WaitGroup.Add(1)
go s.Scrape(market)
}
s.WaitGroup.Wait()
}

// usage example
// func main() {
// wg := sync.WaitGroup{}
// futuresDeribit := scrapers.NewBitflyerFuturesScraper([]string{"BTCJPY27DEC2019", "BTCJPY03JAN2020", "BTCJPY27MAR2020"})
// futuresDeribit.ScrapeMarkets()
// wg.Wait()
// }
73 changes: 36 additions & 37 deletions internal/pkg/exchange-scrapers/FuturesCoinflexScraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
Expand All @@ -13,6 +12,7 @@ import (

"github.com/gorilla/websocket"
writers "github.com/diadata-org/diadata/internal/pkg/scraper-writers"
zap "go.uber.org/zap"
)

const scrapeDataSaveLocationCoinflex = ""
Expand All @@ -22,7 +22,7 @@ type CoinflexFuturesScraper struct {
Markets []string
WaitGroup *sync.WaitGroup
Writer writers.Writer
Logger *log.Logger
Logger *zap.SugaredLogger
}

type tradeMessageCoinflex struct {
Expand Down Expand Up @@ -80,12 +80,29 @@ type ordersMatchedCoinflex struct {
AskCounterFee int64 `json:"ask_counter_fee"`
}

// NewCoinflexFuturesScraper - returns an instance of the coinflex scraper
func NewCoinflexFuturesScraper(markets []string) FuturesScraper {
wg := sync.WaitGroup{}
writer := writers.FileWriter{}
logger := zap.NewExample().Sugar() // or NewProduction, or NewDevelopment
defer logger.Sync()

var scraper FuturesScraper = &CoinflexFuturesScraper{
WaitGroup: &wg,
Markets: markets,
Writer: &writer,
Logger: logger,
}

return scraper
}

func (s *CoinflexFuturesScraper) send(message *map[string]interface{}, market string, websocketConn *websocket.Conn) error {
err := websocketConn.WriteJSON(*message)
if err != nil {
return err
}
s.Logger.Printf("[DEBUG] sent message [%s]: %s", market, message)
s.Logger.Debugf("sent message [%s]: %s", market, message)
return nil
}

Expand Down Expand Up @@ -118,44 +135,44 @@ func (s *CoinflexFuturesScraper) ScraperClose(market string, connection interfac
func (s *CoinflexFuturesScraper) Scrape(market string) {
validated, err := s.validateMarket(market)
if !validated || err != nil {
s.Logger.Printf("[ERROR] could not validate %s market", market)
s.Logger.Errorf("could not validate %s market", market)
if err != nil {
s.Logger.Printf("[ERROR] issue with validating, err: %s", err)
s.Logger.Errorf("issue with validating, err: %s", err)
}
return
}
baseID, quoteID, err := s.getBaseAndCounterID(market)
// splits the string market into the base and the counter and then finds the int id of them.
// coinflex expects that we provide an int for the assets when we make the websocket requests.
if err != nil {
s.Logger.Printf("[ERROR] issue with getting an id for base and quote: %s", err)
s.Logger.Errorf("issue with getting an id for base and quote: %s", err)
return
}
for {
// immediately invoked function expression for easy clenup with defer
func() {
u := url.URL{Scheme: "wss", Host: "api.coinflex.com", Path: "/v1"}
s.Logger.Printf("[DEBUG] connecting to [%s], market: [%s]", u.String(), market)
s.Logger.Debugf("connecting to [%s], market: [%s]", u.String(), market)
ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
s.Logger.Printf("[ERROR] dial: %s", err)
s.Logger.Errorf("dial: %s", err)
time.Sleep(time.Duration(retryIn) * time.Second)
return
}
defer s.ScraperClose(market, ws)
// to let you know that the websocket connection is alive. Coinflex do not have the heartbeat channel
// and they send you frame pong messages. Thus, this handler.
ws.SetPongHandler(func(appData string) error {
s.Logger.Printf("[DEBUG] received a pong frame")
s.Logger.Debugf("received a pong frame")
return nil
})
err = s.send(&map[string]interface{}{"base": baseID, "counter": quoteID, "watch": true, "method": "WatchOrders"}, market, ws)
if err != nil {
s.Logger.Printf("[ERROR] could not send a channel subscription message. retrying")
s.Logger.Errorf("could not send a channel subscription message. retrying")
return
}
if err != nil {
s.Logger.Printf("[ERROR] could not send an initial ping message. retrying")
s.Logger.Errorf("could not send an initial ping message. retrying")
return
}
tick := time.NewTicker(30 * time.Second) // every 45 seconds we have to ping Coinflex. we also have a 15 second write limit of the ping frame (thus, 30 seconds here)
Expand All @@ -169,31 +186,31 @@ func (s *CoinflexFuturesScraper) Scrape(market string) {
case <-tick.C:
err := s.write(websocket.PingMessage, []byte{}, ws)
if err != nil {
s.Logger.Printf("[ERROR] error experienced pinging coinflex, err: %s", err)
s.Logger.Errorf("error experienced pinging coinflex, err: %s", err)
return
}
s.Logger.Printf("[DEBUG] pinged the coinflex server. market: [%s]", market)
s.Logger.Debugf("pinged the coinflex server. market: [%s]", market)
}
}
}()
for {
_, message, err := ws.ReadMessage()
msg := ordersMatchedCoinflex{}
if err != nil {
s.Logger.Printf("[ERROR] problem reading coinflex on [%s], err: %s", market, err)
s.Logger.Errorf("problem reading coinflex on [%s], err: %s", market, err)
return
}
err = json.Unmarshal(message, &msg)
if err != nil {
s.Logger.Printf("[ERROR] could not unmarshal coinflex message on [%s], err: %s", market, err)
s.Logger.Errorf("could not unmarshal coinflex message on [%s], err: %s", market, err)
return
}
s.Logger.Printf("[DEBUG] received a message: %s", message)
s.Logger.Debugf("received a message: %s", message)
if msg.Notice == "OrdersMatched" {
s.Logger.Printf("[DEBUG] received new match message on [%s]: %s", market, message)
s.Logger.Debugf("received new match message on [%s]: %s", market, message)
_, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationCoinflex+s.Writer.GetWriteFileName("coinflex", market))
if err != nil {
s.Logger.Printf("[ERROR] could not save to file: %s, on market: [%s], err: %s", scrapeDataSaveLocationCoinflex+s.Writer.GetWriteFileName("coinflex", market), market, err)
s.Logger.Errorf("could not save to file: %s, on market: [%s], err: %s", scrapeDataSaveLocationCoinflex+s.Writer.GetWriteFileName("coinflex", market), market, err)
return
}
}
Expand Down Expand Up @@ -280,7 +297,7 @@ func (s *CoinflexFuturesScraper) getAllAssets() ([]assetCoinflex, error) {
if err != nil {
return nil, err
}
s.Logger.Printf("[DEBUG] retrieved all of the Coinflex assets: %s", string(body))
s.Logger.Debugf("retrieved all of the Coinflex assets: %s", string(body))
assets := []assetCoinflex{}
err = json.Unmarshal(body, &assets)
if err != nil {
Expand All @@ -303,21 +320,3 @@ func (s *CoinflexFuturesScraper) assetID(asset string) (int64, error) {
}
return assetsID, nil
}

// example usage
// func main() {
// wg := sync.WaitGroup{}
// writer := writers.FileWriter{}
// logger := log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)

// var coinflexScraper scrapers.FuturesScraper = &scrapers.CoinflexFuturesScraper{
// WaitGroup: &wg,
// Markets: []string{"FLEX/USDT"}, // this market is good to test that the pong frames come back
// // Markets: []string{"ETH/USDT", "BCH/USDT", "XBT/USDT", "FLEX/USDT", "USDC/USDT", "XBTDEC/USDTDEC", "BCHDEC/USDTDEC", "ETHDEC/USDTDEC", "USDCDEC/USDTDEC", "DOTF/USDTDOT", "DFNF/USDTDFN"},
// Writer: &writer,
// Logger: logger,
// }
// coinflexScraper.ScrapeMarkets()

// wg.Wait()
// }

0 comments on commit 6d59313

Please sign in to comment.