Skip to content

Commit

Permalink
addressing PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dencoded committed Jun 6, 2018
1 parent 1ed4d5c commit 6f17b7c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 24 deletions.
46 changes: 25 additions & 21 deletions analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ type GeoData struct {
const analyticsKeyName = "tyk-system-analytics"

const (
minRecordsBufferSize = 1000
recordsBufferFlushInterval = 200 * time.Millisecond
minRecordsBufferSize = 1000
recordsBufferFlushInterval = 200 * time.Millisecond
recordsBufferForcedFlushInterval = 1 * time.Second
)

func (a *AnalyticsRecord) GetGeo(ipStr string) {
Expand Down Expand Up @@ -148,14 +149,14 @@ func (a *AnalyticsRecord) SetExpiry(expiresInSeconds int64) {
// RedisAnalyticsHandler will record analytics data to a redis back end
// as defined in the Config object.
// (The diff scrape duplicated the pre-change field lines; only the
// post-change field set is kept — duplicate fields do not compile.)
type RedisAnalyticsHandler struct {
	Store   storage.Handler
	Clean   Purger
	GeoIPDB *maxminddb.Reader

	globalConf config.Config
	// recordsChan is the shared buffered channel producers write records
	// into and the worker pool drains.
	recordsChan chan *AnalyticsRecord
	// workerBufferSize is the per-worker pipeline buffer capacity: the
	// configured total buffer size divided by the number of pool workers.
	workerBufferSize uint64
	// shouldStop is toggled atomically (0/1) to signal workers to drain and exit.
	shouldStop uint32
	// poolWg tracks running workers so shutdown can wait for them.
	poolWg sync.WaitGroup
}

func (r *RedisAnalyticsHandler) Init(globalConf config.Config) {
Expand All @@ -177,15 +178,16 @@ func (r *RedisAnalyticsHandler) Init(globalConf config.Config) {
}
log.WithField("ps", ps).Debug("Analytics pool workers number")

r.recordsBufferSize = r.globalConf.AnalyticsConfig.RecordsBufferSize
if r.recordsBufferSize < minRecordsBufferSize {
r.recordsBufferSize = minRecordsBufferSize // force it to this value
recordsBufferSize := r.globalConf.AnalyticsConfig.RecordsBufferSize
if recordsBufferSize < minRecordsBufferSize {
recordsBufferSize = minRecordsBufferSize // force it to this value
}
log.WithField("recordsBufferSize", r.recordsBufferSize).Debug("Analytics records buffer size")
log.WithField("recordsBufferSize", recordsBufferSize).Debug("Analytics total buffer (channel) size")

chanSize := uint64(ps) * r.recordsBufferSize
log.WithField("chanSize", chanSize).Debug("Analytics channel size")
r.recordsChan = make(chan *AnalyticsRecord, chanSize)
r.workerBufferSize = recordsBufferSize / uint64(ps)
log.WithField("workerBufferSize", r.workerBufferSize).Debug("Analytics pool worker buffer size")

r.recordsChan = make(chan *AnalyticsRecord, recordsBufferSize)

// start worker pool
atomic.SwapUint32(&r.shouldStop, 0)
Expand Down Expand Up @@ -225,9 +227,10 @@ func (r *RedisAnalyticsHandler) recordWorker() {

// this is buffer to send one pipelined command to redis
// use r.workerBufferSize as cap to reduce slice re-allocations
recordsBuffer := make([]string, 0, r.recordsBufferSize)
recordsBuffer := make([]string, 0, r.workerBufferSize)

// read records from channel and process
lastSentTs := time.Now()
for {
readyToSend := false
select {
Expand Down Expand Up @@ -273,7 +276,7 @@ func (r *RedisAnalyticsHandler) recordWorker() {
}

// identify that buffer is ready to be sent
readyToSend = uint64(len(recordsBuffer)) == r.recordsBufferSize
readyToSend = uint64(len(recordsBuffer)) == r.workerBufferSize

case <-time.After(recordsBufferFlushInterval):
// nothing was received for that period of time
Expand All @@ -282,9 +285,10 @@ func (r *RedisAnalyticsHandler) recordWorker() {
}

// send data to Redis and reset buffer
if readyToSend && len(recordsBuffer) > 0 {
if len(recordsBuffer) > 0 && (readyToSend || time.Since(lastSentTs) >= recordsBufferForcedFlushInterval) {
r.Store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
recordsBuffer = make([]string, 0, r.recordsBufferSize)
recordsBuffer = make([]string, 0, r.workerBufferSize)
lastSentTs = time.Now()
}
}
}
6 changes: 3 additions & 3 deletions gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func TestAnalytics(t *testing.T) {

// Cleanup before test
// give buffered records time to be sent
time.Sleep(250 * time.Millisecond)
time.Sleep(recordsBufferFlushInterval + 50*time.Millisecond)
analytics.Store.GetAndDeleteSet(analyticsKeyName)

t.Run("Log errors", func(t *testing.T) {
Expand All @@ -603,7 +603,7 @@ func TestAnalytics(t *testing.T) {
}...)

// give buffered records time to be sent
time.Sleep(250 * time.Millisecond)
time.Sleep(recordsBufferFlushInterval + 50*time.Millisecond)

results := analytics.Store.GetAndDeleteSet(analyticsKeyName)
if len(results) != 2 {
Expand All @@ -629,7 +629,7 @@ func TestAnalytics(t *testing.T) {
})

// give buffered records time to be sent
time.Sleep(250 * time.Millisecond)
time.Sleep(recordsBufferFlushInterval + 50*time.Millisecond)

results := analytics.Store.GetAndDeleteSet(analyticsKeyName)
if len(results) != 1 {
Expand Down

0 comments on commit 6f17b7c

Please sign in to comment.