Skip to content

Commit

Permalink
update config with context
Browse files Browse the repository at this point in the history
  • Loading branch information
eric committed Jan 29, 2020
1 parent 6b80da2 commit fb6ca23
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 32 deletions.
16 changes: 7 additions & 9 deletions api/api.go
Expand Up @@ -90,7 +90,7 @@ func (a *Api) GetSpotTicker(exchange, pair string) *Response {
}

func (a *Api) SubscribeSpotDepth(exchange, pair string, period int64) *Response {
a.logger.Infof("SubscribeSpotDepth %s %s %d", exchange, pair, period)
a.logger.Infof("SubscribeSpotDepth %s %s %dms", exchange, pair, period)
if !validateSpot(exchange) {
return &Response{
Status: -1,
Expand All @@ -105,9 +105,9 @@ func (a *Api) SubscribeSpotDepth(exchange, pair string, period int64) *Response
go worker.NewSpotDepthWorker(exc.Ctx, a.data, exc.SpotApi, exchange, exc.Pair, exc.Period)
}
} else {
c := a.cfg.AddConfig(exchange, pair, period, DataFlag_Depth)
c := a.cfg.AddConfig(a.ctx, exchange, pair, period, DataFlag_Depth)
if c != nil {
go worker.NewSpotDepthWorker(a.ctx, a.data, c.SpotApi, exchange, c.Pair, c.Period)
go worker.NewSpotDepthWorker(c.Ctx, a.data, c.SpotApi, exchange, c.Pair, c.Period)
}
}
return &Response{
Expand All @@ -131,9 +131,8 @@ func (a *Api) SubscribeSpotTicker(exchange, pair string, period int64) *Response
go worker.NewSpotTickerWorker(exc.Ctx, a.data, exc.SpotApi, exchange, exc.Pair, exc.Period)
}
} else {
c := a.cfg.AddConfig(exchange, pair, period, DataFlag_Ticker)
c := a.cfg.AddConfig(a.ctx, exchange, pair, period, DataFlag_Ticker)
if c != nil {
c.Ctx, c.CancelFunc = context.WithCancel(a.ctx)
go worker.NewSpotTickerWorker(c.Ctx, a.data, c.SpotApi, exchange, c.Pair, c.Period)
}
}
Expand Down Expand Up @@ -203,9 +202,9 @@ func (a *Api) SubscribeFutureDepth(exchange, contractType, pair string, period i
go worker.NewFutureDepthWorker(exc.Ctx, a.data, exc.FutureApi, exchange, contractType, exc.Pair, exc.Period)
}
} else {
c := a.cfg.AddConfig(exchange, pair, period, DataFlag_Depth)
c := a.cfg.AddConfig(a.ctx, exchange, pair, period, DataFlag_Depth)
if c != nil {
go worker.NewFutureDepthWorker(a.ctx, a.data, c.FutureApi, exchange, contractType, c.Pair, c.Period)
go worker.NewFutureDepthWorker(c.Ctx, a.data, c.FutureApi, exchange, contractType, c.Pair, c.Period)
}
}
return &Response{
Expand All @@ -229,9 +228,8 @@ func (a *Api) SubscribeFutureTicker(exchange, contractType, pair string, period
go worker.NewFutureTickerWorker(exc.Ctx, a.data, exc.FutureApi, exchange, contractType, exc.Pair, exc.Period)
}
} else {
c := a.cfg.AddConfig(exchange, pair, period, DataFlag_Ticker)
c := a.cfg.AddConfig(a.ctx, exchange, pair, period, DataFlag_Ticker)
if c != nil {
c.Ctx, c.CancelFunc = context.WithCancel(a.ctx)
go worker.NewFutureTickerWorker(c.Ctx, a.data, c.FutureApi, exchange, contractType, c.Pair, c.Period)
}
}
Expand Down
52 changes: 31 additions & 21 deletions config/config.go
Expand Up @@ -41,11 +41,13 @@ func NewConfig() *Config {
return &Config{ExchangesConfig: make([]ExchangeConfig, 0)}
}

func (c *Config) AddConfig(exchange, pair string, period int64, flag DataFlag) *PairConfig {
func (c *Config) AddConfig(parentCtx context.Context, exchange, pair string, period int64, flag DataFlag) *PairConfig {
proxy := os.Getenv("HTTP_PROXY")
if proxy != "" {
log.Printf("add config with proxy:%s", proxy)
}
ctx, cancelFunc := context.WithCancel(parentCtx)

for k, ex := range c.ExchangesConfig {
if ex.ExchangeName == exchange {
for _, p := range ex.Pair {
Expand All @@ -55,20 +57,24 @@ func (c *Config) AddConfig(exchange, pair string, period int64, flag DataFlag) *
}
if !IsFutureExchange(exchange) {
c.ExchangesConfig[k].Pair = append(c.ExchangesConfig[k].Pair, PairConfig{
SpotApi: builder.NewAPIBuilder().HttpProxy(proxy).Build(exchange),
FutureApi: nil,
Pair: goex.NewCurrencyPair2(pair),
Period: time.Duration(period * int64(time.Millisecond)),
Flag: flag,
SpotApi: builder.NewAPIBuilder().HttpProxy(proxy).Build(exchange),
FutureApi: nil,
Pair: goex.NewCurrencyPair2(pair),
Period: time.Duration(period * int64(time.Millisecond)),
Flag: flag,
CancelFunc: cancelFunc,
Ctx: ctx,
//Ticker: time.NewTicker(time.Duration(period * int64(time.Millisecond))),
})
} else {
c.ExchangesConfig[k].Pair = append(c.ExchangesConfig[k].Pair, PairConfig{
FutureApi: builder.NewAPIBuilder().HttpProxy(proxy).BuildFuture(exchange),
SpotApi: nil,
Pair: goex.NewCurrencyPair2(pair),
Period: time.Duration(period * int64(time.Millisecond)),
Flag: flag,
FutureApi: builder.NewAPIBuilder().HttpProxy(proxy).BuildFuture(exchange),
SpotApi: nil,
Pair: goex.NewCurrencyPair2(pair),
Period: time.Duration(period * int64(time.Millisecond)),
Flag: flag,
CancelFunc: cancelFunc,
Ctx: ctx,
//Ticker: time.NewTicker(time.Duration(period * int64(time.Millisecond))),
})
}
Expand All @@ -80,11 +86,13 @@ func (c *Config) AddConfig(exchange, pair string, period int64, flag DataFlag) *
ExchangeName: exchange,
Pair: []PairConfig{
{
SpotApi: builder.NewAPIBuilder().HttpProxy(proxy).Build(exchange),
FutureApi: nil,
Pair: goex.NewCurrencyPair2(pair),
Period: time.Duration(period * int64(time.Millisecond)),
Flag: flag,
SpotApi: builder.NewAPIBuilder().HttpProxy(proxy).Build(exchange),
FutureApi: nil,
Pair: goex.NewCurrencyPair2(pair),
Period: time.Duration(period * int64(time.Millisecond)),
Flag: flag,
CancelFunc: cancelFunc,
Ctx: ctx,
//Ticker: time.NewTicker(time.Duration(period * int64(time.Millisecond))),
},
},
Expand All @@ -94,11 +102,13 @@ func (c *Config) AddConfig(exchange, pair string, period int64, flag DataFlag) *
ExchangeName: exchange,
Pair: []PairConfig{
{
FutureApi: builder.NewAPIBuilder().HttpProxy(proxy).BuildFuture(exchange),
SpotApi: nil,
Pair: goex.NewCurrencyPair2(pair),
Period: time.Duration(period * int64(time.Millisecond)),
Flag: flag,
FutureApi: builder.NewAPIBuilder().HttpProxy(proxy).BuildFuture(exchange),
SpotApi: nil,
Pair: goex.NewCurrencyPair2(pair),
Period: time.Duration(period * int64(time.Millisecond)),
Flag: flag,
CancelFunc: cancelFunc,
Ctx: ctx,
//Ticker: time.NewTicker(time.Duration(period * int64(time.Millisecond))),
},
},
Expand Down
24 changes: 22 additions & 2 deletions config/config_test.go
@@ -1,7 +1,27 @@
package config

import "testing"
import (
"context"
"testing"
)

func TestNewConfig(t *testing.T) {
var c = NewConfig()

func TestConfig_AddConfig(t *testing.T) {
c1 := c.AddConfig(context.Background(), "binance.com", "BTC_USDT", 100, 1)
c1.CancelFunc()
}

func TestConfig_FindConfig(t *testing.T) {
c.AddConfig(context.Background(), "binance.com", "BTC_USDT", 100, 1)
c2 := c.FindConfig("binance.com", "BTC_USDT", 100, 1)
c2.CancelFunc()
}

func TestPairConfig_UpdatePeriod(t *testing.T) {
c1 := c.AddConfig(context.Background(), "binance.com", "BTC_USDT", 100, 1)
c2 := c.FindConfig("binance.com", "BTC_USDT", 100, 1)
t.Log(c1)
c2.UpdatePeriod(50)
t.Log(c2)
}

0 comments on commit fb6ca23

Please sign in to comment.