Skip to content

Commit

Permalink
Merge branch 'rateScraper'
Browse files Browse the repository at this point in the history
  • Loading branch information
kaythxbye committed Mar 16, 2020
2 parents 5ec80a6 + 6987339 commit c5acf44
Show file tree
Hide file tree
Showing 29 changed files with 1,458 additions and 1,113 deletions.
15 changes: 15 additions & 0 deletions build/Dockerfile-ratescraper
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Build stage: compile the ratescrapers command with the full Go toolchain.
FROM golang:latest as build

WORKDIR $GOPATH/src/

# Copy the whole repository so local packages resolve under GOPATH.
COPY . .

WORKDIR $GOPATH/src/github.com/diadata-org/diadata/cmd/ratescrapers

# Installs the ratescrapers binary into /go/bin.
RUN go install

# Runtime stage: minimal distroless base image (no shell, small attack surface).
FROM gcr.io/distroless/base

COPY --from=build /go/bin/ratescrapers /bin/ratescrapers

CMD ["ratescrapers"]
122 changes: 122 additions & 0 deletions cmd/exchange-scrapers/collector/collectRates.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"flag"
"github.com/diadata-org/diadata/internal/pkg/exchange-scrapers"
"github.com/diadata-org/diadata/pkg/dia"
"github.com/diadata-org/diadata/pkg/dia/helpers/configCollectors"
"github.com/diadata-org/diadata/pkg/dia/helpers/kafkaHelper"
"github.com/diadata-org/diadata/pkg/model"
"github.com/segmentio/kafka-go"
log "github.com/sirupsen/logrus"
"github.com/tkanos/gonfig"
"os/user"
"strings"
"sync"
"time"
)

const (
	// watchdogDelay is the maximum number of seconds allowed between
	// incoming trades before the collector assumes it is frozen
	// (60 * 3.5 = 210 seconds).
	watchdogDelay = 60.0 * 3.5
)

// handleTrades forwards every trade received on c to the Kafka writer w.
// It also runs a watchdog: if no trade has arrived for more than
// watchdogDelay seconds it panics so the orchestrator restarts the
// collector. wg.Done is called when c is closed by the scraper.
func handleTrades(c chan *dia.Trade, wg *sync.WaitGroup, w *kafka.Writer) {
	lastTradeTime := time.Now()
	// Named distinctly from the received trade so the ticker is not
	// shadowed inside the select (the original reused `t` for both).
	watchdog := time.NewTicker(watchdogDelay * time.Second)
	defer watchdog.Stop() // release the ticker when the channel closes
	for {
		select {
		case <-watchdog.C:
			duration := time.Since(lastTradeTime)
			if duration.Seconds() > watchdogDelay {
				log.Error(duration)
				panic("frozen? ")
			}
		case trade, ok := <-c:
			if !ok {
				wg.Done()
				log.Error("handleTrades")
				return
			}
			lastTradeTime = time.Now()
			kafkaHelper.WriteMessage(w, trade)
		}
	}
}

// getConfig loads the API credentials for the given exchange from
// ~/config/secrets/api_<exchange> (lowercased exchange name) and
// returns them together with any read/parse error.
func getConfig(exchange string) (*dia.ConfigApi, error) {
	var configApi dia.ConfigApi
	usr, err := user.Current()
	if err != nil {
		// Without the home directory we cannot locate the secrets file;
		// the original discarded this error and risked a nil dereference.
		return nil, err
	}
	configFileApi := usr.HomeDir + "/config/secrets/api_" + strings.ToLower(exchange)
	err = gonfig.GetConf(configFileApi, &configApi)
	return &configApi, err
}

var (
	// exchange selects which exchange to scrape; required (enforced in init).
	exchange = flag.String("exchange", "", "which exchange")
	// onePairPerSymbol limits scraping to at most one pair per symbol.
	onePairPerSymbol = flag.Bool("onePairPerSymbol", false, "one Pair max Per Symbol ?")
)

// init parses the command-line flags and aborts the process unless an
// exchange was supplied.
func init() {
	flag.Parse()
	if *exchange != "" {
		return
	}
	flag.Usage()
	log.Println(dia.Exchanges())
	log.Fatal("exchange is required")
}

// main wires one exchange scraper to Kafka: it looks up the pair list for
// the configured exchange (falling back to the static pair config), starts
// a PairScraper per pair, and streams the resulting trades to the Kafka
// trades topic via handleTrades.
func main() {

	ds, err := models.NewRedisDataStore()
	if err != nil {
		// Abort: the original only logged here and then used the broken
		// datastore handle anyway.
		log.Fatal("NewDataStore:", err)
	}

	pairsExchange, err := ds.GetAvailablePairsForExchange(*exchange)
	if err != nil || len(pairsExchange) == 0 {
		// Fall back to the static pair configuration shipped with the binary.
		log.Error("error on GetAvailablePairsForExchange", err)
		cc := configCollectors.NewConfigCollectors(*exchange)
		pairsExchange = cc.AllPairs()
	}

	configApi, err := dia.GetConfig(*exchange)
	if err != nil {
		log.Warning("no config for exchange's api ", err)
	}
	es := scrapers.NewAPIScraper(*exchange, configApi.ApiKey, configApi.SecretKey)

	w := kafkaHelper.NewWriter(kafkaHelper.TopicTrades)
	defer w.Close()

	wg := sync.WaitGroup{}

	// With -onePairPerSymbol, remember which symbols already got a scraper.
	seen := make(map[string]string)
	for _, configPair := range pairsExchange {
		skip := false
		if *onePairPerSymbol {
			_, skip = seen[configPair.Symbol]
			seen[configPair.Symbol] = configPair.Symbol
		}
		if skip {
			log.Println("Skipping pair:", configPair.Symbol, configPair.ForeignName, "on exchange", *exchange)
			continue
		}
		log.Println("Adding pair:", configPair.Symbol, configPair.ForeignName, "on exchange", *exchange)
		_, err := es.ScrapePair(dia.Pair{
			Symbol:      configPair.Symbol,
			ForeignName: configPair.ForeignName})
		if err != nil {
			log.Println(err)
		} else {
			wg.Add(1)
		}
	}
	go handleTrades(es.Channel(), &wg, w)
	// Block until every successfully started pair scraper is done
	// (the original registered `defer wg.Wait()` once per loop iteration).
	wg.Wait()
}
Binary file not shown.
14 changes: 9 additions & 5 deletions cmd/exchange-scrapers/ecb/ecb.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package main

import (
"github.com/diadata-org/diadata/internal/pkg/exchange-scrapers"
"sync"

scrapers "github.com/diadata-org/diadata/internal/pkg/exchange-scrapers"
"github.com/diadata-org/diadata/pkg/dia"
"github.com/diadata-org/diadata/pkg/model"
models "github.com/diadata-org/diadata/pkg/model"
log "github.com/sirupsen/logrus"
"sync"
)

// pairs contains all pairs currently supported by the DIA scrapers
Expand Down Expand Up @@ -83,7 +84,7 @@ func main() {
if err != nil {
log.Errorln("NewDataStore:", err)
} else {
sECB := scrapers.NewECBScraper(ds)
sECB := scrapers.SpawnECBScraper(ds)
defer sECB.Close()

for _, pair := range pairs {
Expand All @@ -96,7 +97,10 @@ func main() {
}
wg.Add(1)
}
sECB.Update()

// Uncomment the call below if the "go mainLoop()" launch is ever removed from SpawnECBScraper
// go sECB.MainLoop()

go handleTrades(sECB.Channel(), &wg, ds)
defer wg.Wait()
}
Expand Down
42 changes: 42 additions & 0 deletions cmd/exchange-scrapers/ester/ester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"sync"

scrapers "github.com/diadata-org/diadata/internal/pkg/exchange-scrapers"
models "github.com/diadata-org/diadata/pkg/model"
log "github.com/sirupsen/logrus"
)

// handleInterestRate persists every rate received on c in the datastore
// (the previous comment wrongly said Kafka). It calls wg.Done once c has
// been closed and fully drained.
func handleInterestRate(c chan *models.InterestRate, wg *sync.WaitGroup, ds models.Datastore) {
	defer wg.Done()
	// Ranging over the channel consumes values until the sender closes it.
	for t := range c {
		ds.SetInterestRate(t)
	}
	log.Error("handleInterestRate: rate channel closed")
}

// main starts the ESTER scraper and persists the scraped rates in the
// datastore until the scraper's channel is closed.
func main() {

	ds, err := models.NewDataStore()
	if err != nil {
		// Exit non-zero: the original logged and fell through to a clean
		// exit, hiding the failure from the orchestrator.
		log.Fatal("NewDataStore: ", err)
	}

	// sESTER points to an ESTERScraper struct.
	sESTER := scrapers.SpawnESTERScraper(ds)
	defer sESTER.Close()

	wg := sync.WaitGroup{}
	wg.Add(1)
	go handleInterestRate(sESTER.Channel(), &wg, ds)
	wg.Wait()
}
Binary file added cmd/exchange-scrapers/sofr/sofr
Binary file not shown.
19 changes: 10 additions & 9 deletions cmd/exchange-scrapers/sofr/sofr.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,42 @@
package main

import (
"sync"

scrapers "github.com/diadata-org/diadata/internal/pkg/exchange-scrapers"
"github.com/diadata-org/diadata/pkg/dia"
models "github.com/diadata-org/diadata/pkg/model"
log "github.com/sirupsen/logrus"
)

// handleInterestRate delegates rate information to Kafka
func handleInterestRate(c chan *dia.InterestRate, ds models.Datastore) {

func handleInterestRate(c chan *models.InterestRate, wg *sync.WaitGroup, ds models.Datastore) {
defer wg.Done()
// Pull from channel as long as not empty
for {
t, ok := <-c
if !ok {
log.Error("error")
return
}
log.Println(t.Symbol)
ds.SetInterestRate(*t)
ds.SetInterestRate(t)
}
}

// main manages all Scraper and handles incoming trade information
func main() {

wg := sync.WaitGroup{}
ds, err := models.NewDataStore()
if err != nil {
log.Errorln("NewDataStore:", err)
} else {

// sSOFR points to a SOFRScraper struct
sSOFR := scrapers.NewSOFRScraper(ds)
sSOFR := scrapers.SpawnSOFRScraper(ds)
defer sSOFR.Close()

sSOFR.Update()
handleInterestRate(sSOFR.Channel(), ds)

wg.Add(1)
go handleInterestRate(sSOFR.Channel(), &wg, ds)
defer wg.Wait()
}
}
7 changes: 5 additions & 2 deletions cmd/http/restServer/restServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ package main

import (
"github.com/blockstatecom/gin-jwt"
"time"
_ "github.com/diadata-org/diadata/api/docs"
"github.com/diadata-org/diadata/pkg/dia"
"github.com/diadata-org/diadata/pkg/dia/helpers/kafkaHelper"
"github.com/diadata-org/diadata/pkg/http/restServer/diaApi"
"github.com/diadata-org/diadata/pkg/http/restServer/kafkaApi"
"github.com/diadata-org/diadata/pkg/model"
models "github.com/diadata-org/diadata/pkg/model"
"github.com/gin-contrib/cache"
"github.com/gin-contrib/cache/persistence"
"github.com/gin-gonic/contrib/static"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"github.com/swaggo/gin-swagger"
"github.com/swaggo/gin-swagger/swaggerFiles"
"time"
)

// @Title diadata.org API
Expand Down Expand Up @@ -195,6 +195,9 @@ func main() {
dia.GET("/symbols", cache.CachePage(memoryStore, cachingTimeShort, diaApiEnv.GetAllSymbols))
dia.GET("/coins", cache.CachePage(memoryStore, cachingTimeShort, diaApiEnv.GetCoins))
dia.GET("/pairs", cache.CachePage(memoryStore, cachingTimeShort, diaApiEnv.GetPairs))

dia.GET("/interestrate/:symbol/:time", cache.CachePage(memoryStore, cachingTimeShort, diaApiEnv.GetInterestRate))

dia.GET("/chartPoints/:filter/:exchange/:symbol", cache.CachePage(memoryStore, cachingTimeShort, diaApiEnv.GetChartPoints))
dia.GET("/chartPointsAllExchanges/:filter/:symbol", cache.CachePage(memoryStore, cachingTimeShort, diaApiEnv.GetChartPointsAllExchanges))
}
Expand Down
49 changes: 49 additions & 0 deletions cmd/ratescrapers/rate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"sync"
"flag"

ratescrapers "github.com/diadata-org/diadata/internal/pkg/ratescrapers"
// scrapers "github.com/diadata-org/diadata/internal/pkg/exchange-scrapers"
models "github.com/diadata-org/diadata/pkg/model"
log "github.com/sirupsen/logrus"
)

// handleInterestRate persists every rate received on c in the datastore
// (the previous comment wrongly said Kafka). It calls wg.Done once c has
// been closed and fully drained.
func handleInterestRate(c chan *models.InterestRate, wg *sync.WaitGroup, ds models.Datastore) {
	defer wg.Done()
	// Ranging over the channel consumes values until the sender closes it.
	for t := range c {
		ds.SetInterestRate(t)
	}
	log.Error("handleInterestRate: rate channel closed")
}

// main spawns the rate scraper selected by the -type flag (default
// "ESTER") and persists the scraped rates in the datastore until the
// scraper's channel is closed.
func main() {

	// Parse the option for the type of interest rate.
	rateType := flag.String("type", "ESTER", "Type of interest rate")
	flag.Parse()

	ds, err := models.NewDataStore()
	if err != nil {
		// Exit non-zero: the original logged and fell through to a clean
		// exit, hiding the failure from the orchestrator.
		log.Fatal("NewDataStore: ", err)
	}

	// Spawn the scraper matching the requested rate type.
	sRate := ratescrapers.SpawnRateScraper(ds, *rateType)
	defer sRate.Close()

	// Persist rates in the background and wait for the channel to close.
	wg := sync.WaitGroup{}
	wg.Add(1)
	go handleInterestRate(sRate.Channel(), &wg, ds)
	wg.Wait()
}
12 changes: 0 additions & 12 deletions deployments/docker-compose.exchange-scrapers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,6 @@ services:
options:
max-size: "50m"

sofr:
build:
context: ../../../..
dockerfile: github.com/diadata-org/diadata/build/Dockerfile-sofr
image: ${DOCKER_HUB_LOGIN}/${STACKNAME}_sofr:latest
networks:
- redis-network
- influxdb-network
logging:
options:
max-size: "50m"

hitbtccollector:
depends_on: [genericcollector]
image: ${DOCKER_HUB_LOGIN}/${STACKNAME}_genericcollector:latest
Expand Down

0 comments on commit c5acf44

Please sign in to comment.