Skip to content

Commit

Permalink
Merge branch 'master' into skip-kid-1551
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Jun 14, 2018
2 parents 7c861b5 + fd3af48 commit 3d38f2d
Show file tree
Hide file tree
Showing 271 changed files with 45,850 additions and 10,112 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ petstore.json
coprocess_gen_test.go
session_state_gen_test.go
__pycache__/
tyk.test

tyk_linux_*
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ matrix:
include:
- go: 1.9.x
env: LATEST_GO=true # run linters and report coverage
- go: 1.10.x
env: LATEST_GO=true # run linters and report coverage


services:
- redis-server
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
Tyk is a lightweight, open source API Gateway and Management Platform that enables you to control who accesses your API, when they access it, and how they access it. Tyk will
also record detailed analytics on how your users are interacting with your API and when things go wrong.

Go version 1.8 or later is required to build `master`, the current
Go version 1.9 is required to build `master`, the current
development version. Tyk is officially supported on `linux/amd64`,
`linux/i386` and `linux/arm64`.

Tests are run against both Go versions 1.9 & 1.10; however, at present only Go 1.9 is officially supported.

## What is an API Gateway?

An API Gateway sits in front of your application(s) and manages the heavy lifting of authorisation, access control and throughput limiting to your services. Ideally,
Expand Down
185 changes: 131 additions & 54 deletions analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package main
import (
"fmt"
"net"
"regexp"
"sync"
"sync/atomic"
"time"

"github.com/jeffail/tunny"
"github.com/oschwald/maxminddb-golang"
"gopkg.in/vmihailenco/msgpack.v2"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/regexp"
"github.com/TykTechnologies/tyk/storage"
)

Expand Down Expand Up @@ -62,10 +63,13 @@ type GeoData struct {

const analyticsKeyName = "tyk-system-analytics"

// Sizing and flush-timing knobs for the buffered analytics pipeline
// (the records channel consumed by the pool of recordWorker goroutines).
const (
	// minRecordsBufferSize is the lower bound enforced on the configured
	// records buffer (channel) size; smaller configured values are raised
	// to this value.
	minRecordsBufferSize = 1000
	// recordsBufferFlushInterval is how long a worker waits without
	// receiving a record before flushing whatever its buffer holds.
	recordsBufferFlushInterval = 200 * time.Millisecond
	// recordsBufferForcedFlushInterval is the maximum time buffered
	// records may wait before a flush is forced, even while records
	// keep arriving.
	recordsBufferForcedFlushInterval = 1 * time.Second
)

func (a *AnalyticsRecord) GetGeo(ipStr string) {
if !config.Global().AnalyticsConfig.EnableGeoIP {
return
}
// Not great, tightly coupled
if analytics.GeoIPDB == nil {
return
Expand Down Expand Up @@ -118,14 +122,14 @@ func initNormalisationPatterns() (pats config.NormaliseURLPatterns) {
return
}

func (a *AnalyticsRecord) NormalisePath() {
if config.Global().AnalyticsConfig.NormaliseUrls.NormaliseUUIDs {
a.Path = config.Global().AnalyticsConfig.NormaliseUrls.CompiledPatternSet.UUIDs.ReplaceAllString(a.Path, "{uuid}")
func (a *AnalyticsRecord) NormalisePath(globalConfig *config.Config) {
if globalConfig.AnalyticsConfig.NormaliseUrls.NormaliseUUIDs {
a.Path = globalConfig.AnalyticsConfig.NormaliseUrls.CompiledPatternSet.UUIDs.ReplaceAllString(a.Path, "{uuid}")
}
if config.Global().AnalyticsConfig.NormaliseUrls.NormaliseNumbers {
a.Path = config.Global().AnalyticsConfig.NormaliseUrls.CompiledPatternSet.IDs.ReplaceAllString(a.Path, "/{id}")
if globalConfig.AnalyticsConfig.NormaliseUrls.NormaliseNumbers {
a.Path = globalConfig.AnalyticsConfig.NormaliseUrls.CompiledPatternSet.IDs.ReplaceAllString(a.Path, "/{id}")
}
for _, r := range config.Global().AnalyticsConfig.NormaliseUrls.CompiledPatternSet.Custom {
for _, r := range globalConfig.AnalyticsConfig.NormaliseUrls.CompiledPatternSet.Custom {
a.Path = r.ReplaceAllString(a.Path, "{var}")
}
}
Expand All @@ -145,18 +149,21 @@ func (a *AnalyticsRecord) SetExpiry(expiresInSeconds int64) {
// RedisAnalyticsHandler will record analytics data to a redis back end
// as defined in the Config object
type RedisAnalyticsHandler struct {
	Store   storage.Handler
	Clean   Purger
	GeoIPDB *maxminddb.Reader
	// globalConf is a copy of the global config taken at Init time.
	globalConf config.Config
	// recordsChan is the buffered channel feeding the pool of
	// recordWorker goroutines.
	recordsChan chan *AnalyticsRecord
	// workerBufferSize is the per-worker batch size: the total records
	// buffer size divided by the pool size.
	workerBufferSize uint64
	// shouldStop is set to non-zero by Stop so RecordHit drops new
	// records during shutdown; accessed atomically.
	shouldStop uint32
	// poolWg tracks running workers so Stop can wait for them to drain.
	poolWg sync.WaitGroup
}

func (r *RedisAnalyticsHandler) Init() {
var err error
if config.Global().AnalyticsConfig.EnableGeoIP {
db, err := maxminddb.Open(config.Global().AnalyticsConfig.GeoIPDBLocation)
if err != nil {
func (r *RedisAnalyticsHandler) Init(globalConf config.Config) {
r.globalConf = globalConf

if r.globalConf.AnalyticsConfig.EnableGeoIP {
if db, err := maxminddb.Open(r.globalConf.AnalyticsConfig.GeoIPDBLocation); err != nil {
log.Error("Failed to init GeoIP Database: ", err)
} else {
r.GeoIPDB = db
Expand All @@ -165,53 +172,123 @@ func (r *RedisAnalyticsHandler) Init() {

analytics.Store.Connect()

ps := config.Global().AnalyticsConfig.PoolSize
ps := r.globalConf.AnalyticsConfig.PoolSize
if ps == 0 {
ps = 50
}
log.WithField("ps", ps).Debug("Analytics pool workers number")

r.AnalyticsPool, err = tunny.CreatePoolGeneric(ps).Open()
if err != nil {
log.Error("Failed to init analytics pool")
recordsBufferSize := r.globalConf.AnalyticsConfig.RecordsBufferSize
if recordsBufferSize < minRecordsBufferSize {
recordsBufferSize = minRecordsBufferSize // force it to this value
}
}
log.WithField("recordsBufferSize", recordsBufferSize).Debug("Analytics total buffer (channel) size")

// RecordHit will store an AnalyticsRecord in Redis
func (r *RedisAnalyticsHandler) RecordHit(record AnalyticsRecord) error {
r.AnalyticsPool.SendWork(func() {
// If we are obfuscating API Keys, store the hashed representation (config check handled in hashing function)
record.APIKey = storage.HashKey(record.APIKey)

if config.Global().SlaveOptions.UseRPC {
// Extend tag list to include this data so wecan segment by node if necessary
record.Tags = append(record.Tags, "tyk-hybrid-rpc")
}
r.workerBufferSize = recordsBufferSize / uint64(ps)
log.WithField("workerBufferSize", r.workerBufferSize).Debug("Analytics pool worker buffer size")

if config.Global().DBAppConfOptions.NodeIsSegmented {
// Extend tag list to include this data so wecan segment by node if necessary
record.Tags = append(record.Tags, config.Global().DBAppConfOptions.Tags...)
}
r.recordsChan = make(chan *AnalyticsRecord, recordsBufferSize)

// Lets add some metadata
if record.APIKey != "" {
record.Tags = append(record.Tags, "key-"+record.APIKey)
}
// start worker pool
atomic.SwapUint32(&r.shouldStop, 0)
for i := 0; i < ps; i++ {
r.poolWg.Add(1)
go r.recordWorker()
}
}

if record.OrgID != "" {
record.Tags = append(record.Tags, "org-"+record.OrgID)
}
// Stop shuts the analytics pipeline down in three strictly ordered steps:
// flag producers to stop submitting, close the records channel so workers
// drain their buffers and exit, then block until every worker is done.
func (r *RedisAnalyticsHandler) Stop() {
	// flag to stop sending records into channel
	atomic.SwapUint32(&r.shouldStop, 1)

	// close channel to stop workers
	close(r.recordsChan)

	// wait for all workers to be done
	r.poolWg.Wait()
}

if err != nil {
log.Error("Error encoding analytics data: ", err)
}
// RecordHit will store an AnalyticsRecord in Redis
func (r *RedisAnalyticsHandler) RecordHit(record *AnalyticsRecord) error {
// check if we should stop sending records 1st
if atomic.LoadUint32(&r.shouldStop) > 0 {
return nil
}

r.Store.AppendToSet(analyticsKeyName, string(encoded))
})
// just send record to channel consumed by pool of workers
// leave all data crunching and Redis I/O work for pool workers
r.recordsChan <- record

return nil
}

// recordWorker drains r.recordsChan, tagging and msgpack-encoding each
// record into a batch, and periodically writes the batch to Redis with a
// single pipelined command. It exits when the channel is closed (see
// Stop), flushing whatever remains in its buffer first.
func (r *RedisAnalyticsHandler) recordWorker() {
	defer r.poolWg.Done()

	// this is buffer to send one pipelined command to redis
	// use r.workerBufferSize as cap to reduce slice re-allocations
	recordsBuffer := make([]string, 0, r.workerBufferSize)

	// Reuse a single timer for the idle-flush timeout instead of letting
	// time.After allocate a fresh timer on every loop iteration — this
	// select runs once per analytics record, so it is a hot path.
	flushTimer := time.NewTimer(recordsBufferFlushInterval)
	defer flushTimer.Stop()

	// read records from channel and process
	lastSentTs := time.Now()
	for {
		readyToSend := false
		select {

		case record, ok := <-r.recordsChan:
			// check if channel was closed and it is time to exit from worker
			if !ok {
				// send what is left in buffer
				r.Store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
				return
			}

			// we have new record - prepare it and add to buffer

			// If we are obfuscating API Keys, store the hashed representation (config check handled in hashing function)
			record.APIKey = storage.HashKey(record.APIKey)

			if r.globalConf.SlaveOptions.UseRPC {
				// Extend tag list to include this data so we can segment by node if necessary
				record.Tags = append(record.Tags, "tyk-hybrid-rpc")
			}

			if r.globalConf.DBAppConfOptions.NodeIsSegmented {
				// Extend tag list to include this data so we can segment by node if necessary
				record.Tags = append(record.Tags, r.globalConf.DBAppConfOptions.Tags...)
			}

			// Lets add some metadata
			if record.APIKey != "" {
				record.Tags = append(record.Tags, "key-"+record.APIKey)
			}

			if record.OrgID != "" {
				record.Tags = append(record.Tags, "org-"+record.OrgID)
			}

			record.Tags = append(record.Tags, "api-"+record.APIID)

			if encoded, err := msgpack.Marshal(record); err != nil {
				log.WithError(err).Error("Error encoding analytics data")
			} else {
				recordsBuffer = append(recordsBuffer, string(encoded))
			}

			// identify that buffer is ready to be sent
			readyToSend = uint64(len(recordsBuffer)) == r.workerBufferSize

		case <-flushTimer.C:
			// nothing was received for that period of time
			// anyways send whatever we have, don't hold data too long in buffer
			readyToSend = true
		}

		// send data to Redis and reset buffer
		if len(recordsBuffer) > 0 && (readyToSend || time.Since(lastSentTs) >= recordsBufferForcedFlushInterval) {
			r.Store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
			recordsBuffer = make([]string, 0, r.workerBufferSize)
			lastSentTs = time.Now()
		}

		// Re-arm the idle-flush timer for the next select iteration; if it
		// fired while we were handling a record, drain its channel first so
		// Reset starts from a clean state.
		if !flushTimer.Stop() {
			select {
			case <-flushTimer.C:
			default:
			}
		}
		flushTimer.Reset(recordsBufferFlushInterval)
	}
}
24 changes: 12 additions & 12 deletions analytics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ func TestURLReplacer(t *testing.T) {
globalConf.AnalyticsConfig.NormaliseUrls.CompiledPatternSet = initNormalisationPatterns()
config.SetGlobal(globalConf)

recordUUID1.NormalisePath()
recordUUID2.NormalisePath()
recordUUID3.NormalisePath()
recordUUID4.NormalisePath()
recordID1.NormalisePath()
recordCust.NormalisePath()
recordUUID1.NormalisePath(&globalConf)
recordUUID2.NormalisePath(&globalConf)
recordUUID3.NormalisePath(&globalConf)
recordUUID4.NormalisePath(&globalConf)
recordID1.NormalisePath(&globalConf)
recordCust.NormalisePath(&globalConf)

if recordUUID1.Path != "/{uuid}/search" {
t.Error("Path not altered, is:")
Expand Down Expand Up @@ -105,12 +105,12 @@ func BenchmarkURLReplacer(b *testing.B) {
recordID1 := AnalyticsRecord{Path: "/widgets/123456/getParams"}
recordCust := AnalyticsRecord{Path: "/widgets/123456/getParams/ihatethisstring"}

recordUUID1.NormalisePath()
recordUUID2.NormalisePath()
recordUUID3.NormalisePath()
recordUUID4.NormalisePath()
recordID1.NormalisePath()
recordCust.NormalisePath()
recordUUID1.NormalisePath(&globalConf)
recordUUID2.NormalisePath(&globalConf)
recordUUID3.NormalisePath(&globalConf)
recordUUID4.NormalisePath(&globalConf)
recordID1.NormalisePath(&globalConf)
recordCust.NormalisePath(&globalConf)
}
}

Expand Down
Loading

0 comments on commit 3d38f2d

Please sign in to comment.