Skip to content

Commit

Permalink
Merge pull request #234 from KyberNetwork/2524_modified_rate_limiter
Browse files Browse the repository at this point in the history
2524_modified_rate_limiter
  • Loading branch information
favadi committed Mar 5, 2019
2 parents efd1361 + b429080 commit c683175
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 9 deletions.
16 changes: 13 additions & 3 deletions lib/binance/binance_client.go
Expand Up @@ -61,6 +61,16 @@ func NewBinance(apiKey, secretKey string, sugar *zap.SugaredLogger, options ...O
return clnt
}

//waitN mimic the leaky bucket algorithm to wait for n drop
func (bc *Client) waitN(n int) error {
for i := 0; i < n; i++ {
if err := bc.rateLimiter.WaitN(context.Background(), 1); err != nil {
return err
}
}
return nil
}

func (bc *Client) fillRequest(req *http.Request, signNeeded bool, timepoint time.Time) error {
if req.Method == http.MethodPost || req.Method == http.MethodPut || req.Method == http.MethodDelete {
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
Expand Down Expand Up @@ -163,7 +173,7 @@ func (bc *Client) GetTradeHistory(symbol string, fromID int64) ([]TradeHistory,
)
const weight = 5
//Wait before creating the request to avoid timestamp request outside the recWindow
if err := bc.rateLimiter.WaitN(context.Background(), weight); err != nil {
if err := bc.waitN(weight); err != nil {
return result, err
}

Expand Down Expand Up @@ -192,7 +202,7 @@ func (bc *Client) GetAssetDetail() (AssetDetailResponse, error) {
)
const weight = 1
//Wait before creating the request to avoid timestamp request outside the recWindow
if err := bc.rateLimiter.WaitN(context.Background(), weight); err != nil {
if err := bc.waitN(weight); err != nil {
return result, err
}

Expand All @@ -218,7 +228,7 @@ func (bc *Client) GetWithdrawalHistory(fromTime, toTime time.Time) (WithdrawHist
)
const weight = 1
//Wait before creating the request to avoid timestamp request outside the recWindow
if err := bc.rateLimiter.WaitN(context.Background(), weight); err != nil {
if err := bc.waitN(weight); err != nil {
return result, err
}

Expand Down
45 changes: 45 additions & 0 deletions lib/binance/binance_client_test.go
Expand Up @@ -2,12 +2,15 @@ package binance

import (
"os"
"sync"
"testing"
"time"

"github.com/KyberNetwork/reserve-stats/lib/testutil"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

func TestBinanceClient(t *testing.T) {
Expand Down Expand Up @@ -44,3 +47,45 @@ func TestBinanceClient(t *testing.T) {
_, err = binanceClient.GetWithdrawalHistory(fromTime, toTime)
assert.NoError(t, err, "binance client get withdraw history error: %s", err)
}

func TestBinanceClientWithLimiter(t *testing.T) {
//Uncomment the skip to run the test in dev mode.
//Alter these number to test binance's behaviour
t.Skip()
var (
rps = 20.0
limiter = rate.NewLimiter(rate.Limit(rps), 1)
wg = &sync.WaitGroup{}
)
logger, err := zap.NewDevelopment()
if err != nil {
t.Fatal(err)
}
defer logger.Sync()
sugar := logger.Sugar()

binanceAPIKey, ok := os.LookupEnv("BINANCE_API_KEY")
if !ok {
t.Skip("Binance API key is not available")
}

binanceSecretKey, ok := os.LookupEnv("BINANCE_SECRET_KEY")
if !ok {
t.Skip("Binance secret key is not available")
}

binanceClient := NewBinance(binanceAPIKey, binanceSecretKey, sugar, WithRateLimiter(limiter))

for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
_, err = binanceClient.GetTradeHistory("KNCETH", 0)
if err != nil {
panic(err)
}
assert.NoError(t, err, "binance client get trade history error: %s", err)
}(i)
}
wg.Wait()
}
3 changes: 2 additions & 1 deletion lib/etherscan/etherscan_client_test.go
Expand Up @@ -47,7 +47,7 @@ func TestEtherScanClientWithRateLimiter(t *testing.T) {
//at rps =6 etherscan will return 403 for some requests.
t.Skip()
var (
rps = 6
rps = 5
wg = &sync.WaitGroup{}
)
logger, err := zap.NewDevelopment()
Expand All @@ -72,6 +72,7 @@ func TestEtherScanClientWithRateLimiter(t *testing.T) {
defer wg.Done()
_, err := etherscanClient.EtherTotalSupply()
sugar.Debugw("finshed a request", "request_number", index, "finish_time", time.Now(), "error", err)
assert.NoError(t, err, fmt.Sprintf("got error: %v", err))
}(i)
}
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion lib/etherscan/flags.go
Expand Up @@ -53,7 +53,7 @@ func NewEtherscanClientFromContext(c *cli.Context) (*etherscan.Client, error) {
if rps <= 0 {
return nil, errors.New("rate limit must be more than 0")
}

//Etherscan doesn't allow burst, i.e: 5 request per second really mean 1 request per 0.2 second
limiter := rate.NewLimiter(rate.Limit(rps), 1)
client.BeforeRequest = limitRate(limiter)
return client, nil
Expand Down
4 changes: 2 additions & 2 deletions lib/huobi/flags.go
Expand Up @@ -30,9 +30,9 @@ func NewCliFlags() []cli.Flag {
},
cli.Float64Flag{
Name: huobiRequestPerSecond,
Usage: "huobi request limit per second, default to 10 which huobi's normal rate limit (100 request per 10 sec)",
Usage: "huobi request limit per second, default to 8 which huobi's tested rate limit",
EnvVar: "HUOBI_REQUESTS_PER_SECOND",
Value: 10,
Value: 8,
},
}
}
Expand Down
49 changes: 47 additions & 2 deletions lib/huobi/huobi_client_test.go
Expand Up @@ -3,13 +3,15 @@ package huobi
import (
"fmt"
"os"
"sync"
"testing"
"time"

"github.com/KyberNetwork/reserve-stats/lib/testutil"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/KyberNetwork/reserve-stats/lib/testutil"
"golang.org/x/time/rate"
)

func TestHuobiClient(t *testing.T) {
Expand Down Expand Up @@ -45,3 +47,46 @@ func TestHuobiClient(t *testing.T) {
_, err = huobiClient.GetWithdrawHistory("ETH", 0)
assert.NoError(t, err)
}

func TestHuobiClientWithLimiter(t *testing.T) {
//Uncomment the skip to run the test in dev mode.
//Alter these number to test huobi's behaviour
t.Skip()
var (
rps = 8.0
limiter = rate.NewLimiter(rate.Limit(rps), 1)
wg = &sync.WaitGroup{}
startDate = time.Date(2018, time.January, 1, 0, 0, 0, 0, time.UTC)
endDate = time.Date(2019, time.February, 1, 0, 0, 0, 0, time.UTC)
)
logger, err := zap.NewDevelopment()
if err != nil {
t.Fatal(err)
}
defer logger.Sync()
sugar := logger.Sugar()

huobiAPIKey, ok := os.LookupEnv("HUOBI_API_KEY")
if !ok {
t.Skip("Huobi API key is not available")
}

huobiSecretKey, ok := os.LookupEnv("HUOBI_SECRET_KEY")
if !ok {
t.Skip("Huobi secret key is not available")
}

huobiClient := NewClient(huobiAPIKey, huobiSecretKey, sugar, WithRateLimiter(limiter))

assert.NoError(t, err, fmt.Sprintf("get history error: %v", err))
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
_, err = huobiClient.GetTradeHistory("bixeth", startDate, endDate)
sugar.Debugw("done request", "number", i, "at time", time.Now(), "Err ", err)
assert.NoError(t, err)
}(i)
}
wg.Wait()
}

0 comments on commit c683175

Please sign in to comment.