Skip to content

Commit

Permalink
mw VersionCheck and analytics optimizations, redis pipelined method a…
Browse files Browse the repository at this point in the history
…dded
  • Loading branch information
dencoded committed Jun 1, 2018
1 parent d546d90 commit 0cda5be
Show file tree
Hide file tree
Showing 15 changed files with 260 additions and 95 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ petstore.json
coprocess_gen_test.go
session_state_gen_test.go
__pycache__/
tyk.test

tyk_linux_*
170 changes: 122 additions & 48 deletions analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"fmt"
"net"
"regexp"
"sync"
"sync/atomic"
"time"

"github.com/jeffail/tunny"
"github.com/oschwald/maxminddb-golang"
"gopkg.in/vmihailenco/msgpack.v2"

Expand Down Expand Up @@ -62,10 +63,13 @@ type GeoData struct {

const analyticsKeyName = "tyk-system-analytics"

// Tuning knobs for the buffered analytics pipeline.
const (
// minRecordsBufferSize is the lower bound applied to the configured
// records buffer size; smaller configured values are raised to it.
minRecordsBufferSize = 1000
// minRecordsWriteBatchSize is both the initial capacity of a worker's
// buffer and the number of buffered records that triggers a pipelined write.
minRecordsWriteBatchSize = 200
// recordsBufferFlushInterval is how long a worker waits before sending a
// partially filled buffer anyway.
recordsBufferFlushInterval = 200 * time.Millisecond
)

func (a *AnalyticsRecord) GetGeo(ipStr string) {
if !config.Global().AnalyticsConfig.EnableGeoIP {
return
}
// Not great, tightly coupled
if analytics.GeoIPDB == nil {
return
Expand Down Expand Up @@ -145,18 +149,21 @@ func (a *AnalyticsRecord) SetExpiry(expiresInSeconds int64) {
// RedisAnalyticsHandler will record analytics data to a redis back end
// as defined in the Config object
type RedisAnalyticsHandler struct {
Store storage.Handler
Clean Purger
GeoIPDB *maxminddb.Reader

AnalyticsPool *tunny.WorkPool
Store storage.Handler
Clean Purger
GeoIPDB *maxminddb.Reader
globalConf config.Config
recordsChan chan *AnalyticsRecord
recordsBufferSize uint64
shouldStop uint32
poolWg sync.WaitGroup
}

func (r *RedisAnalyticsHandler) Init() {
var err error
if config.Global().AnalyticsConfig.EnableGeoIP {
db, err := maxminddb.Open(config.Global().AnalyticsConfig.GeoIPDBLocation)
if err != nil {
func (r *RedisAnalyticsHandler) Init(globalConf config.Config) {
r.globalConf = globalConf

if r.globalConf.AnalyticsConfig.EnableGeoIP {
if db, err := maxminddb.Open(r.globalConf.AnalyticsConfig.GeoIPDBLocation); err != nil {
log.Error("Failed to init GeoIP Database: ", err)
} else {
r.GeoIPDB = db
Expand All @@ -165,53 +172,120 @@ func (r *RedisAnalyticsHandler) Init() {

analytics.Store.Connect()

ps := config.Global().AnalyticsConfig.PoolSize
ps := r.globalConf.AnalyticsConfig.PoolSize
if ps == 0 {
ps = 50
}
log.WithField("ps", ps).Debug("Analytics pool workers number")

r.AnalyticsPool, err = tunny.CreatePoolGeneric(ps).Open()
if err != nil {
log.Error("Failed to init analytics pool")
r.recordsBufferSize = r.globalConf.AnalyticsConfig.RecordsBufferSize
if r.recordsBufferSize < minRecordsBufferSize {
r.recordsBufferSize = minRecordsBufferSize // force it to this value
}
}

// RecordHit will store an AnalyticsRecord in Redis
func (r *RedisAnalyticsHandler) RecordHit(record AnalyticsRecord) error {
r.AnalyticsPool.SendWork(func() {
// If we are obfuscating API Keys, store the hashed representation (config check handled in hashing function)
record.APIKey = storage.HashKey(record.APIKey)

if config.Global().SlaveOptions.UseRPC {
// Extend tag list to include this data so wecan segment by node if necessary
record.Tags = append(record.Tags, "tyk-hybrid-rpc")
}
log.WithField("recordsBufferSize", r.recordsBufferSize).Debug("Analytics records buffer size")

if config.Global().DBAppConfOptions.NodeIsSegmented {
// Extend tag list to include this data so wecan segment by node if necessary
record.Tags = append(record.Tags, config.Global().DBAppConfOptions.Tags...)
}
chanSize := uint64(ps) * r.recordsBufferSize
log.WithField("chanSize", chanSize).Debug("Analytics channel size")
r.recordsChan = make(chan *AnalyticsRecord, chanSize)

// Lets add some metadata
if record.APIKey != "" {
record.Tags = append(record.Tags, "key-"+record.APIKey)
}
// start worker pool
atomic.SwapUint32(&r.shouldStop, 0)
for i := 0; i < ps; i++ {
r.poolWg.Add(1)
go r.recordWorker()
}
}

if record.OrgID != "" {
record.Tags = append(record.Tags, "org-"+record.OrgID)
}
// Stop shuts the analytics worker pool down. It raises the shouldStop flag so
// RecordHit stops enqueueing, closes the records channel so that every worker
// sends what is left in its buffer and returns, and then blocks until all
// workers have exited.
func (r *RedisAnalyticsHandler) Stop() {
	// Raise the flag first so that RecordHit turns new records away.
	atomic.StoreUint32(&r.shouldStop, 1)

	// A closed channel is the exit signal for the workers.
	close(r.recordsChan)

	// Block until every worker has returned.
	r.poolWg.Wait()
}

if err != nil {
log.Error("Error encoding analytics data: ", err)
}
// RecordHit will store an AnalyticsRecord in Redis
func (r *RedisAnalyticsHandler) RecordHit(record *AnalyticsRecord) error {
// check if we should stop sending records 1st
if atomic.LoadUint32(&r.shouldStop) > 0 {
return nil
}

r.Store.AppendToSet(analyticsKeyName, string(encoded))
})
// just send record to channel consumed by pool of workers
// leave all data crunching and Redis I/O work for pool workers
r.recordsChan <- record

return nil
}

// recordWorker is the body of one analytics pool worker. It reads records
// from recordsChan, hashes the API key, appends the node/key/org/api tags,
// msgpack-encodes the record and collects the result in a local buffer. The
// buffer is written to Redis with a single pipelined command when it reaches
// minRecordsWriteBatchSize entries or when the flush ticker fires, whichever
// comes first. When recordsChan is closed the worker sends whatever is still
// buffered and returns, signalling poolWg on the way out.
func (r *RedisAnalyticsHandler) recordWorker() {
	defer r.poolWg.Done()

	// Buffer of encoded records that goes to Redis as one pipelined command;
	// minRecordsWriteBatchSize is used as cap to reduce slice re-allocations.
	recordsBuffer := make([]string, 0, minRecordsWriteBatchSize)

	// One ticker for the whole lifetime of the worker. A timer created per
	// loop iteration would restart every time a record arrives, so a slow but
	// steady stream of records could postpone the flush until the batch
	// fills up; it would also allocate a new timer on every iteration.
	flushTicker := time.NewTicker(recordsBufferFlushInterval)
	defer flushTicker.Stop()

	for {
		readyToSend := false

		select {
		case record, ok := <-r.recordsChan:
			// Channel closed: it is time to exit from the worker.
			if !ok {
				// Send what is left in the buffer, if anything.
				if len(recordsBuffer) > 0 {
					r.Store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
				}
				return
			}

			// If we are obfuscating API keys, store the hashed representation
			// (config check handled in the hashing function).
			record.APIKey = storage.HashKey(record.APIKey)

			if r.globalConf.SlaveOptions.UseRPC {
				// Extend tag list to include this data so we can segment by node if necessary.
				record.Tags = append(record.Tags, "tyk-hybrid-rpc")
			}

			if r.globalConf.DBAppConfOptions.NodeIsSegmented {
				// Extend tag list to include this data so we can segment by node if necessary.
				record.Tags = append(record.Tags, r.globalConf.DBAppConfOptions.Tags...)
			}

			// Add some metadata tags.
			if record.APIKey != "" {
				record.Tags = append(record.Tags, "key-"+record.APIKey)
			}

			if record.OrgID != "" {
				record.Tags = append(record.Tags, "org-"+record.OrgID)
			}

			record.Tags = append(record.Tags, "api-"+record.APIID)

			// A record that fails to encode is logged and skipped.
			if encoded, err := msgpack.Marshal(record); err != nil {
				log.WithError(err).Error("Error encoding analytics data")
			} else {
				recordsBuffer = append(recordsBuffer, string(encoded))
			}

			// A full batch is sent right away.
			readyToSend = len(recordsBuffer) >= minRecordsWriteBatchSize

		case <-flushTicker.C:
			// Flush interval elapsed: send whatever we have, don't hold data
			// too long in the buffer.
			readyToSend = true
		}

		// Send data to Redis and reset the buffer.
		if readyToSend && len(recordsBuffer) > 0 {
			r.Store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
			// Start a new slice instead of truncating the old one: whether the
			// store keeps a reference to the slice it was given is not visible here.
			recordsBuffer = make([]string, 0, minRecordsWriteBatchSize)
		}
	}
}
53 changes: 32 additions & 21 deletions api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,21 @@ var ServiceNonce string
// keyed to the Api version name, which is determined during routing to speed up lookups
func (a APIDefinitionLoader) MakeSpec(def *apidef.APIDefinition) *APISpec {
spec := &APISpec{}

// parse version expiration time stamps
for key, ver := range def.VersionData.Versions {
if ver.Expires == "" || ver.Expires == "-1" {
continue
}
// calculate the time
if t, err := time.Parse("2006-01-02 15:04", ver.Expires); err != nil {
log.WithError(err).WithField("Expires", ver.Expires).Error("Could not parse expiry date for API")
} else {
ver.ExpiresTs = t
def.VersionData.Versions[key] = ver
}
}

spec.APIDefinition = def

// We'll push the default HealthChecker:
Expand Down Expand Up @@ -873,18 +888,16 @@ func (a *APISpec) URLAllowedAndIgnored(r *http.Request, rxPaths []URLSpec, white
if v.MethodActions != nil {
// We are using an extended path set, check for the method
methodMeta, matchMethodOk := v.MethodActions[r.Method]

if !matchMethodOk {
continue
}

// Matched the method, check what status it is:
if methodMeta.Action == apidef.NoAction {
// NoAction status means we're not treating this request in any special or exceptional way
return a.getURLStatus(v.Status), nil
}
// Matched the method, check what status it is
// TODO: Extend here for additional reply options
switch methodMeta.Action {
case apidef.NoAction:
// NoAction status means we're not treating this request in any special or exceptional way
return a.getURLStatus(v.Status), nil
case apidef.Reply:
return StatusRedirectFlowByReply, &methodMeta
default:
Expand Down Expand Up @@ -1033,7 +1046,7 @@ func (a *APISpec) getVersionFromRequest(r *http.Request) string {
return r.URL.Query().Get(a.VersionDefinition.Key)

case "url":
uPath := strings.Replace(r.URL.Path, a.Proxy.ListenPath, "", 1)
uPath := r.URL.Path[len(a.Proxy.ListenPath):]
// First non-empty part of the path is the version ID
for _, part := range strings.Split(uPath, "/") {
if part != "" {
Expand All @@ -1057,16 +1070,15 @@ func (a *APISpec) VersionExpired(versionDef *apidef.VersionInfo) (bool, *time.Ti
return false, nil
}

// otherwise - calculate the time
t, err := time.Parse("2006-01-02 15:04", versionDef.Expires)
if err != nil {
log.Error("Could not parse expiry date for API, dissallow: ", err)
// otherwise use parsed timestamp
if versionDef.ExpiresTs.IsZero() {
log.Error("Could not parse expiry date for API, disallow")
return true, nil
}

// It's in the past, expire
// It's in the future, keep going
return time.Since(t) >= 0, &t
return time.Since(versionDef.ExpiresTs) >= 0, &versionDef.ExpiresTs
}

// RequestValid will check if an incoming request has valid version
Expand Down Expand Up @@ -1124,17 +1136,17 @@ func (a *APISpec) Version(r *http.Request) (*apidef.VersionInfo, []URLSpec, bool
} else {
// Extract Version Info
// First checking for if default version is set
vname := a.getVersionFromRequest(r)
if vname == "" && a.VersionData.DefaultVersion != "" {
vname = a.VersionData.DefaultVersion
vName := a.getVersionFromRequest(r)
if vName == "" {
if a.VersionData.DefaultVersion == "" {
return &version, nil, false, VersionNotFound
}
vName = a.VersionData.DefaultVersion
ctxSetDefaultVersion(r)
}
if vname == "" && a.VersionData.DefaultVersion == "" {
return &version, nil, false, VersionNotFound
}
// Load Version Data - General
var ok bool
if version, ok = a.VersionData.Versions[vname]; !ok {
if version, ok = a.VersionData.Versions[vName]; !ok {
return &version, nil, false, VersionDoesNotExist
}
}
Expand All @@ -1145,13 +1157,12 @@ func (a *APISpec) Version(r *http.Request) (*apidef.VersionInfo, []URLSpec, bool

// Load path data and whitelist data for version
rxPaths, rxOk := a.RxPaths[version.Name]
whiteListStatus, wlOk := a.WhiteListEnabled[version.Name]

if !rxOk {
log.Error("no RX Paths found for version ", version.Name)
return &version, nil, false, VersionDoesNotExist
}

whiteListStatus, wlOk := a.WhiteListEnabled[version.Name]
if !wlOk {
log.Error("No whitelist data found")
return &version, nil, false, VersionWhiteListStatusNotFound
Expand Down
9 changes: 6 additions & 3 deletions apidef/api_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/lonelycode/osin"
"gopkg.in/mgo.v2/bson"

"time"

"github.com/TykTechnologies/gojsonschema"
)

Expand Down Expand Up @@ -207,9 +209,10 @@ type ExtendedPathsSet struct {
}

type VersionInfo struct {
Name string `bson:"name" json:"name"`
Expires string `bson:"expires" json:"expires"`
Paths struct {
Name string `bson:"name" json:"name"`
Expires string `bson:"expires" json:"expires"`
ExpiresTs time.Time `bson:"-" json:"-"`
Paths struct {
Ignored []string `bson:"ignored" json:"ignored"`
WhiteList []string `bson:"white_list" json:"white_list"`
BlackList []string `bson:"black_list" json:"black_list"`
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type AnalyticsConfigConfig struct {
GeoIPDBLocation string `json:"geo_ip_db_path"`
NormaliseUrls NormalisedURLConfig `json:"normalise_urls"`
PoolSize int `json:"pool_size"`
RecordsBufferSize uint64 `json:"records_buffer_size"`
StorageExpirationTime int `json:"storage_expiration_time"`
ignoredIPsCompiled map[string]bool
}
Expand Down
8 changes: 8 additions & 0 deletions gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ func TestAnalytics(t *testing.T) {
})

// Cleanup before test
// give the analytics workers time to send the buffered records
time.Sleep(250 * time.Millisecond)
analytics.Store.GetAndDeleteSet(analyticsKeyName)

t.Run("Log errors", func(t *testing.T) {
Expand All @@ -600,6 +602,9 @@ func TestAnalytics(t *testing.T) {
{Path: "/", Code: 401},
}...)

// give the analytics workers time to send the buffered records
time.Sleep(250 * time.Millisecond)

results := analytics.Store.GetAndDeleteSet(analyticsKeyName)
if len(results) != 2 {
t.Error("Should return 2 record", len(results))
Expand All @@ -623,6 +628,9 @@ func TestAnalytics(t *testing.T) {
Path: "/", Headers: authHeaders, Code: 200,
})

// give the analytics workers time to send the buffered records
time.Sleep(250 * time.Millisecond)

results := analytics.Store.GetAndDeleteSet(analyticsKeyName)
if len(results) != 1 {
t.Error("Should return 1 record: ", len(results))
Expand Down
Loading

0 comments on commit 0cda5be

Please sign in to comment.