BitMex futures scraper (#199)
* Coinflex scraper + refactors for Deribit, Huobi, FTX (#190) (#1)

* [+] added huobi futures scraper

* [*] 1. refactored futures huobi scraper && 2. ftx futures scraper

* [+] futures ftx: not writing the pongs & subscription & unsubscription messages

* [+] futures scraper: added deribit & refactors to all other futures scrapers

* [*] refactors: ftx, deribit, huobi

* [+] futures scraper: coinflex

* [*] resolved post-merge conflicts

* Merge (#3)

* Bitflyer futures scraper (#197)

* Coinflex scraper + refactors for Deribit, Huobi, FTX (#190) (#1)

* [+] added huobi futures scraper

* [*] 1. refactored futures huobi scraper && 2. ftx futures scraper

* [+] futures ftx: not writing the pongs & subscription & unsubscription messages

* [+] futures scraper: added deribit & refactors to all other futures scrapers

* [*] refactors: ftx, deribit, huobi

* [+] futures scraper: coinflex

* [*] resolved post-merge conflicts

* [+] bitflyer: futures scraper added & refactors for logging + instantiation methods for all the futures scrapers

* VIX Algo (#195)

* Coinflex scraper + refactors for Deribit, Huobi, FTX (#190) (#1)

* [+] added huobi futures scraper

* [*] 1. refactored futures huobi scraper && 2. ftx futures scraper

* [+] futures ftx: not writing the pongs & subscription & unsubscription messages

* [+] futures scraper: added deribit & refactors to all other futures scrapers

* [*] refactors: ftx, deribit, huobi

* [+] futures scraper: coinflex

* [+] deribit scraper: 1. unique refresh token for each scraper; 2. better handling of failure to refresh the token; 3. better handling of errors; 4. better saving of trades; 5. only futures markets allowed in the futures scraper (excluded options);

* [+] futures-scrapers: 1. better retry after error; 2. better file writing

* [+] deribit: generalized code to accommodate for options

* [+] deribit: options scraper introduced & options data

* [+] gitignore: added .vscode, bin/ and pkg/mod/

* [+] CVI algo implemented

* [+] bitmex scraper added & goroutine that listens for interrupts to perform graceful shutdown of all of the websockets

* [*] VIX CVI band changed
nazariyv committed Feb 24, 2020
1 parent d725ec8 commit 250a889
Showing 7 changed files with 357 additions and 70 deletions.
45 changes: 33 additions & 12 deletions internal/pkg/exchange-scrapers/FuturesBitflyerScraper.go
@@ -3,7 +3,10 @@ package scrapers
import (
"fmt"
"net/url"
"os"
"os/signal"
"sync"
"syscall"
"time"

zap "go.uber.org/zap"
@@ -75,6 +78,7 @@ func (s *BitflyerScraper) ScraperClose(market string, connection interface{}) er
if err != nil {
return err
}
s.Logger.Infof("gracefully shutdown bitflyer scraper on market: %s", market)
time.Sleep(time.Duration(retryIn) * time.Second)
return nil
default:
@@ -84,6 +88,16 @@ func (s *BitflyerScraper) ScraperClose(market string, connection interface{}) er

// Scrape starts a websocket scraper for market
func (s *BitflyerScraper) Scrape(market string) {
// this block listens for SIGTERMs and interrupts
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
userCancelled := make(chan bool, 1)
go func() {
sig := <-sigs
fmt.Println(sig)
userCancelled <- true
}()

for {
// immediately invoked function expression for easy cleanup with defer
func() {
Expand Down Expand Up @@ -121,16 +135,23 @@ func (s *BitflyerScraper) Scrape(market string) {
}
}()
for {
_, message, err := ws.ReadMessage()
if err != nil {
s.Logger.Errorf("repeated read error, restarting")
return
}
s.Logger.Debugf("received new message: %s, saving new message", message)
_, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationBitflyer+s.Writer.GetWriteFileName("Bitflyer", market))
if err != nil {
s.Logger.Errorf("could not write to file, err: %s", err)
return
select {
case <-userCancelled:
s.Logger.Infof("received interrupt, gracefully shutting down")
s.ScraperClose(market, ws)
os.Exit(0)
default:
_, message, err := ws.ReadMessage()
if err != nil {
s.Logger.Errorf("repeated read error, restarting")
return
}
s.Logger.Debugf("received new message: %s, saving new message", message)
_, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationBitflyer+s.Writer.GetWriteFileName("Bitflyer", market))
if err != nil {
s.Logger.Errorf("could not write to file, err: %s", err)
return
}
}
}
}()
@@ -155,7 +176,7 @@ func (s *BitflyerScraper) ScrapeMarkets() {
// usage example
// func main() {
// wg := sync.WaitGroup{}
// futuresDeribit := scrapers.NewBitflyerFuturesScraper([]string{"BTCJPY27DEC2019", "BTCJPY03JAN2020", "BTCJPY27MAR2020"})
// futuresDeribit.ScrapeMarkets()
// futuresBitflyer := scrapers.NewBitflyerFuturesScraper([]string{"BTCJPY27DEC2019", "BTCJPY03JAN2020", "BTCJPY27MAR2020"})
// futuresBitflyer.ScrapeMarkets()
// wg.Wait()
// }
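The interrupt-handling block introduced above is repeated in every Scrape method this commit touches. As a minimal, self-contained sketch (the main function and the blocking receive are illustrative; only the channel wiring mirrors the diff), the pattern looks like this:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    // notify a buffered channel on SIGINT / SIGTERM, as the scrapers do
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    userCancelled := make(chan bool, 1)
    go func() {
        sig := <-sigs
        fmt.Println(sig)
        userCancelled <- true
    }()

    // the real read loops select on userCancelled before each ws.ReadMessage();
    // here we simply block until a signal arrives, then shut down
    <-userCancelled
    fmt.Println("received interrupt, gracefully shutting down")
    os.Exit(0)
}

Both channels are buffered with capacity 1, so the signal goroutine can hand off the notification without blocking even while the read loop is busy with a message.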
182 changes: 182 additions & 0 deletions internal/pkg/exchange-scrapers/FuturesBitmexScraper.go
@@ -0,0 +1,182 @@
package scrapers

import (
"fmt"
"net/url"
"os"
"os/signal"
"sync"
"syscall"
"time"

zap "go.uber.org/zap"

"github.com/gorilla/websocket"
writers "github.com/diadata-org/diadata/internal/pkg/scraper-writers"
)

const scrapeDataSaveLocationBitmex = ""

// BitmexScraper - use the NewBitmexFuturesScraper function to create an instance
type BitmexScraper struct {
Markets []string
WaitGroup *sync.WaitGroup
Writer writers.Writer
Logger *zap.SugaredLogger
}

// NewBitmexFuturesScraper - returns an instance of a futures scraper.
func NewBitmexFuturesScraper(markets []string) FuturesScraper {
wg := sync.WaitGroup{}
writer := writers.FileWriter{}
logger := zap.NewExample().Sugar() // or NewProduction, or NewDevelopment
defer logger.Sync()

var scraper FuturesScraper = &BitmexScraper{
WaitGroup: &wg,
Markets: markets,
Writer: &writer,
Logger: logger,
}

return scraper
}

func (s *BitmexScraper) send(message *map[string]interface{}, market string, websocketConn *websocket.Conn) error {
err := websocketConn.WriteJSON(*message)
if err != nil {
return err
}
s.Logger.Debugf("sent message [%s]: %s", market, message)
return nil
}

// Authenticate - placeholder here, since we do not need to authenticate the connection.
func (s *BitmexScraper) Authenticate(market string, connection interface{}) error {
return nil
}

// ScraperClose - safely closes the scraper. The connection is passed in as an interface{} because
// the Huobi scraper uses golang's x websocket rather than the gorilla websocket; if we did not have
// to accommodate both, this could simply take a pointer to a gorilla websocket, but the empty
// interface keeps FuturesScraper generic across connection types.
func (s *BitmexScraper) ScraperClose(market string, connection interface{}) error {
switch c := connection.(type) {
case *websocket.Conn:
// unsubscribe from the channel
err := s.send(&map[string]interface{}{"op": "unsubscribe", "args": []string{"trade:" + market}}, market, c)
if err != nil {
s.Logger.Errorf("could not send a channel unsubscription message, err: %s", err)
return err
}
// close the websocket connection
err = s.write(websocket.CloseMessage, []byte{}, c)
if err != nil {
return err
}
err = c.Close()
if err != nil {
return err
}
s.Logger.Infof("gracefully shutdown bitmex scraper on market: %s", market)
time.Sleep(time.Duration(retryIn) * time.Second)
return nil
default:
return fmt.Errorf("unknown connection type: %T", connection)
}
}

// Scrape starts a websocket scraper for market
func (s *BitmexScraper) Scrape(market string) {
// this block listens for SIGTERMs and interrupts
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
userCancelled := make(chan bool, 1)
go func() {
sig := <-sigs
fmt.Println(sig)
userCancelled <- true
}()

for {
// immediately invoked function expression for easy cleanup with defer
func() {
u := url.URL{Scheme: "wss", Host: "www.bitmex.com", Path: "/realtime"}
s.Logger.Debugf("connecting to [%s], market: [%s]", u.String(), market)
ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
s.Logger.Errorf("could not dial Bitmex websocket: %s", err)
time.Sleep(time.Duration(retryIn) * time.Second)
return
}
defer s.ScraperClose(market, ws)
ws.SetPongHandler(func(appData string) error {
s.Logger.Debugf("received a pong frame")
return nil
})
err = s.send(&map[string]interface{}{"op": "subscribe", "args": []string{"trade:" + market}}, market, ws)
if err != nil {
s.Logger.Errorf("could not send a channel subscription message. retrying, err: %s", err)
return
}
tick := time.NewTicker(15 * time.Second)
defer tick.Stop()
go func() {
for {
select {
case <-tick.C:
err := s.write(websocket.PingMessage, []byte{}, ws)
if err != nil {
s.Logger.Errorf("error experienced pinging coinflex, err: %s", err)
return
}
s.Logger.Debugf("pinged the coinflex server. market: [%s]", market)
}
}
}()
for {
select {
case <-userCancelled:
s.Logger.Infof("received interrupt, gracefully shutting down")
s.ScraperClose(market, ws)
os.Exit(0)
default:
_, message, err := ws.ReadMessage()
if err != nil {
s.Logger.Errorf("repeated read error, restarting")
return
}
s.Logger.Debugf("received new message: %s, saving new message", message)
_, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationBitmex+s.Writer.GetWriteFileName("Bitmex", market))
if err != nil {
s.Logger.Errorf("could not write to file, err: %s", err)
return
}
}
}
}()
}
}

// write's primary purpose is to write a ping frame op code to keep the websocket connection alive
func (s *BitmexScraper) write(mt int, payload []byte, ws *websocket.Conn) error {
ws.SetWriteDeadline(time.Now().Add(15 * time.Second))
return ws.WriteMessage(mt, payload)
}

// ScrapeMarkets - will scrape the markets specified during instantiation
func (s *BitmexScraper) ScrapeMarkets() {
for _, market := range s.Markets {
s.WaitGroup.Add(1)
go s.Scrape(market)
}
s.WaitGroup.Wait()
}

// usage example
// func main() {
// wg := sync.WaitGroup{}
// futuresBitmex := scrapers.NewBitmexFuturesScraper([]string{"XBTUSD", "XBTZ19", "XBTH20", "XBTM20", "ETHUSD", "ETHZ19", "ETHH20"})
// futuresBitmex.ScrapeMarkets()
// wg.Wait()
// }
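The new Bitmex scraper keeps its connection alive with a 15-second ping ticker and a matching write deadline. A reduced standalone sketch of that keep-alive loop follows; the endpoint and the gorilla/websocket calls come from the diff above, while the surrounding wiring (main, logging, running the loop inline rather than in a goroutine) is illustrative:

package main

import (
    "log"
    "time"

    "github.com/gorilla/websocket"
)

func main() {
    ws, _, err := websocket.DefaultDialer.Dial("wss://www.bitmex.com/realtime", nil)
    if err != nil {
        log.Fatalf("could not dial Bitmex websocket: %s", err)
    }
    defer ws.Close()

    // ping every 15 seconds so the server keeps the connection open
    tick := time.NewTicker(15 * time.Second)
    defer tick.Stop()
    for range tick.C {
        ws.SetWriteDeadline(time.Now().Add(15 * time.Second))
        if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
            log.Printf("ping failed: %s", err)
            return
        }
    }
}

In the scraper itself the ticker runs in its own goroutine beside the read loop; the sketch inlines it for brevity.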
54 changes: 38 additions & 16 deletions internal/pkg/exchange-scrapers/FuturesCoinflexScraper.go
@@ -6,8 +6,11 @@ import (
"io/ioutil"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/gorilla/websocket"
@@ -124,6 +127,7 @@ func (s *CoinflexFuturesScraper) ScraperClose(market string, connection interfac
if err != nil {
return err
}
s.Logger.Infof("gracefully shutdown coinflex scraper on market: %s", market)
time.Sleep(time.Duration(retryIn) * time.Second)
return nil
default:
@@ -148,6 +152,17 @@
s.Logger.Errorf("issue with getting an id for base and quote: %s", err)
return
}

// this block listens for SIGTERMs and interrupts
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
userCancelled := make(chan bool, 1)
go func() {
sig := <-sigs
fmt.Println(sig)
userCancelled <- true
}()

for {
// immediately invoked function expression for easy clenup with defer
func() {
@@ -194,25 +209,32 @@ func (s *CoinflexFuturesScraper) Scrape(market string) {
}
}()
for {
_, message, err := ws.ReadMessage()
msg := ordersMatchedCoinflex{}
if err != nil {
s.Logger.Errorf("problem reading coinflex on [%s], err: %s", market, err)
return
}
err = json.Unmarshal(message, &msg)
if err != nil {
s.Logger.Errorf("could not unmarshal coinflex message on [%s], err: %s", market, err)
return
}
s.Logger.Debugf("received a message: %s", message)
if msg.Notice == "OrdersMatched" {
s.Logger.Debugf("received new match message on [%s]: %s", market, message)
_, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationCoinflex+s.Writer.GetWriteFileName("coinflex", market))
select {
case <-userCancelled:
s.Logger.Infof("received interrupt, gracefully shutting down")
s.ScraperClose(market, ws)
os.Exit(0)
default:
_, message, err := ws.ReadMessage()
msg := ordersMatchedCoinflex{}
if err != nil {
s.Logger.Errorf("could not save to file: %s, on market: [%s], err: %s", scrapeDataSaveLocationCoinflex+s.Writer.GetWriteFileName("coinflex", market), market, err)
s.Logger.Errorf("problem reading coinflex on [%s], err: %s", market, err)
return
}
err = json.Unmarshal(message, &msg)
if err != nil {
s.Logger.Errorf("could not unmarshal coinflex message on [%s], err: %s", market, err)
return
}
s.Logger.Debugf("received a message: %s", message)
if msg.Notice == "OrdersMatched" {
s.Logger.Debugf("received new match message on [%s]: %s", market, message)
_, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationCoinflex+s.Writer.GetWriteFileName("coinflex", market))
if err != nil {
s.Logger.Errorf("could not save to file: %s, on market: [%s], err: %s", scrapeDataSaveLocationCoinflex+s.Writer.GetWriteFileName("coinflex", market), market, err)
return
}
}
}
}
}()
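For reference, the filtering done in the Coinflex read loop reduces to the sketch below: only frames whose notice field equals "OrdersMatched" are written to disk. The struct here is a stand-in for ordersMatchedCoinflex (its full definition, including the exact JSON tags, sits outside this diff), and the payload is invented for illustration:

package main

import (
    "encoding/json"
    "fmt"
)

// stand-in for ordersMatchedCoinflex; the "notice" tag is an assumption
type ordersMatched struct {
    Notice string `json:"notice"`
}

func main() {
    message := []byte(`{"notice":"OrdersMatched","price":1000000}`) // hypothetical frame
    var msg ordersMatched
    if err := json.Unmarshal(message, &msg); err != nil {
        fmt.Println("could not unmarshal coinflex message:", err)
        return
    }
    if msg.Notice == "OrdersMatched" {
        fmt.Println("match message, would be appended to the market file:", string(message))
    }
}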
19 changes: 19 additions & 0 deletions internal/pkg/exchange-scrapers/FuturesDeribitScraper.go
@@ -6,8 +6,11 @@ import (
"io/ioutil"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/gorilla/websocket"
@@ -76,6 +79,7 @@ func (s *DeribitScraper) ScraperClose(market string, websocketConnection interfa
if err != nil {
return err
}
s.Logger.Infof("gracefully shutdown deribit scraper on market: %s", market)
time.Sleep(time.Duration(retryIn) * time.Second)
return nil
default:
@@ -128,6 +132,17 @@ func (s *DeribitScraper) refreshToken(previousToken string, market string, webso
func (s *DeribitScraper) Scrape(market string) {
s.validateMarket(market, s.MarketKind)
s.validateRefreshEveryToken()

// this block listens for SIGTERMs and interrupts
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
userCancelled := make(chan bool, 1)
go func() {
sig := <-sigs
fmt.Println(sig)
userCancelled <- true
}()

for {
// immediately invoked function expression for easy clenup with defer
func() {
@@ -211,6 +226,10 @@ func (s *DeribitScraper) Scrape(market string) {
}()
for {
select {
case <-userCancelled:
s.Logger.Infof("received interrupt, gracefully shutting down")
s.ScraperClose(market, ws)
os.Exit(0)
case <-failedToRefreshToken:
s.Logger.Errorf("failed to refresh token numerous times. restarting the scraper")
time.Sleep(time.Duration(retryIn) * time.Second)
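In the Deribit scraper the new userCancelled case joins the existing failedToRefreshToken case in the read loop's select. A reduced sketch of that multiplexing follows; the channel names, log wording, and retry sleep mirror the diff, while the retryIn value and the simulated failure are assumptions made so the example runs on its own:

package main

import (
    "fmt"
    "time"
)

func main() {
    userCancelled := make(chan bool, 1)
    failedToRefreshToken := make(chan bool, 1)
    const retryIn = 5 // seconds; the real constant is defined elsewhere in the scrapers package

    // simulate a token-refresh failure so the example terminates on its own
    go func() {
        time.Sleep(2 * time.Second)
        failedToRefreshToken <- true
    }()

    for {
        select {
        case <-userCancelled:
            fmt.Println("received interrupt, gracefully shutting down")
            return
        case <-failedToRefreshToken:
            fmt.Println("failed to refresh token numerous times. restarting the scraper")
            time.Sleep(retryIn * time.Second)
            return
        default:
            // the real scraper reads and persists the next websocket message here
            time.Sleep(100 * time.Millisecond)
        }
    }
}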
