diff --git a/.gitignore b/.gitignore
index 31351679099..94f7bb3dead 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,5 +47,6 @@ petstore.json
 coprocess_gen_test.go
 session_state_gen_test.go
 __pycache__/
+tyk.test
 
 tyk_linux_*
diff --git a/analytics.go b/analytics.go
index c0087f33e5d..cdb2478fea2 100644
--- a/analytics.go
+++ b/analytics.go
@@ -4,9 +4,10 @@ import (
 	"fmt"
 	"net"
 	"regexp"
+	"sync"
+	"sync/atomic"
 	"time"
 
-	"github.com/jeffail/tunny"
 	"github.com/oschwald/maxminddb-golang"
 	"gopkg.in/vmihailenco/msgpack.v2"
 
@@ -62,10 +63,13 @@ type GeoData struct {
 
 const analyticsKeyName = "tyk-system-analytics"
 
+const (
+	minRecordsBufferSize       = 1000
+	minRecordsWriteBatchSize   = 200
+	recordsBufferFlushInterval = 200 * time.Millisecond
+)
+
 func (a *AnalyticsRecord) GetGeo(ipStr string) {
-	if !config.Global().AnalyticsConfig.EnableGeoIP {
-		return
-	}
 	// Not great, tightly coupled
 	if analytics.GeoIPDB == nil {
 		return
@@ -145,18 +149,21 @@ func (a *AnalyticsRecord) SetExpiry(expiresInSeconds int64) {
 // RedisAnalyticsHandler will record analytics data to a redis back end
 // as defined in the Config object
 type RedisAnalyticsHandler struct {
-	Store   storage.Handler
-	Clean   Purger
-	GeoIPDB *maxminddb.Reader
-
-	AnalyticsPool *tunny.WorkPool
+	Store             storage.Handler
+	Clean             Purger
+	GeoIPDB           *maxminddb.Reader
+	globalConf        config.Config
+	recordsChan       chan *AnalyticsRecord
+	recordsBufferSize uint64
+	shouldStop        uint32
+	poolWg            sync.WaitGroup
 }
 
-func (r *RedisAnalyticsHandler) Init() {
-	var err error
-	if config.Global().AnalyticsConfig.EnableGeoIP {
-		db, err := maxminddb.Open(config.Global().AnalyticsConfig.GeoIPDBLocation)
-		if err != nil {
+func (r *RedisAnalyticsHandler) Init(globalConf config.Config) {
+	r.globalConf = globalConf
+
+	if r.globalConf.AnalyticsConfig.EnableGeoIP {
+		if db, err := maxminddb.Open(r.globalConf.AnalyticsConfig.GeoIPDBLocation); err != nil {
 			log.Error("Failed to init GeoIP Database: ", err)
 		} else {
 			r.GeoIPDB = db
@@ -165,53 +172,127 @@ func (r *RedisAnalyticsHandler) Init() {
 
 	analytics.Store.Connect()
 
-	ps := config.Global().AnalyticsConfig.PoolSize
+	ps := r.globalConf.AnalyticsConfig.PoolSize
 	if ps == 0 {
 		ps = 50
 	}
+	log.WithField("ps", ps).Debug("Analytics pool workers number")
 
-	r.AnalyticsPool, err = tunny.CreatePoolGeneric(ps).Open()
-	if err != nil {
-		log.Error("Failed to init analytics pool")
+	r.recordsBufferSize = r.globalConf.AnalyticsConfig.RecordsBufferSize
+	if r.recordsBufferSize < minRecordsBufferSize {
+		r.recordsBufferSize = minRecordsBufferSize // force it to this value
 	}
-}
-
-// RecordHit will store an AnalyticsRecord in Redis
-func (r *RedisAnalyticsHandler) RecordHit(record AnalyticsRecord) error {
-	r.AnalyticsPool.SendWork(func() {
-		// If we are obfuscating API Keys, store the hashed representation (config check handled in hashing function)
-		record.APIKey = storage.HashKey(record.APIKey)
-
-		if config.Global().SlaveOptions.UseRPC {
-			// Extend tag list to include this data so wecan segment by node if necessary
-			record.Tags = append(record.Tags, "tyk-hybrid-rpc")
-		}
+	log.WithField("recordsBufferSize", r.recordsBufferSize).Debug("Analytics records buffer size")
 
-		if config.Global().DBAppConfOptions.NodeIsSegmented {
-			// Extend tag list to include this data so wecan segment by node if necessary
-			record.Tags = append(record.Tags, config.Global().DBAppConfOptions.Tags...)
-		}
+	chanSize := uint64(ps) * r.recordsBufferSize
+	log.WithField("chanSize", chanSize).Debug("Analytics channel size")
+	r.recordsChan = make(chan *AnalyticsRecord, chanSize)
 
-		// Lets add some metadata
-		if record.APIKey != "" {
-			record.Tags = append(record.Tags, "key-"+record.APIKey)
-		}
+	// start worker pool
+	atomic.StoreUint32(&r.shouldStop, 0)
+	for i := 0; i < ps; i++ {
+		r.poolWg.Add(1)
+		go r.recordWorker()
+	}
+}
 
-		if record.OrgID != "" {
-			record.Tags = append(record.Tags, "org-"+record.OrgID)
-		}
+// Stop flags the handler as stopped, closes the records channel and waits for
+// all workers to flush and exit. Callers must make sure no RecordHit call is
+// still in flight: a send on the closed channel panics.
+func (r *RedisAnalyticsHandler) Stop() {
+	// flag to stop sending records into channel
+	atomic.StoreUint32(&r.shouldStop, 1)
 
-		record.Tags = append(record.Tags, "api-"+record.APIID)
+	// close channel to stop workers
+	close(r.recordsChan)
 
-		encoded, err := msgpack.Marshal(record)
+	// wait for all workers to be done
+	r.poolWg.Wait()
+}
 
-		if err != nil {
-			log.Error("Error encoding analytics data: ", err)
-		}
+// RecordHit queues an AnalyticsRecord for the worker pool, which stores it in Redis
+func (r *RedisAnalyticsHandler) RecordHit(record *AnalyticsRecord) error {
+	// check if we should stop sending records 1st
+	if atomic.LoadUint32(&r.shouldStop) > 0 {
+		return nil
+	}
 
-		r.Store.AppendToSet(analyticsKeyName, string(encoded))
-	})
+	// just send record to channel consumed by pool of workers
+	// leave all data crunching and Redis I/O work for pool workers
+	r.recordsChan <- record
 
 	return nil
+}
+
+// recordWorker reads records from recordsChan, tags and encodes them and
+// sends them to Redis in pipelined batches. A batch is flushed when it holds
+// minRecordsWriteBatchSize records, when no record arrives for
+// recordsBufferFlushInterval, or when the channel is closed.
+func (r *RedisAnalyticsHandler) recordWorker() {
+	defer r.poolWg.Done()
+
+	// this is buffer to send one pipelined command to redis
+	// use minRecordsWriteBatchSize as cap to reduce slice re-allocations
+	recordsBuffer := make([]string, 0, minRecordsWriteBatchSize)
+
+	// read records from channel and process
+	for {
+		readyToSend := false
+		select {
+
+		case record, ok := <-r.recordsChan:
+			// check if channel was closed and it is time to exit from worker
+			if !ok {
+				// send what is left in buffer
+				r.Store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
+				return
+			}
+
+			// we have new record - prepare it and add to buffer
+
+			// If we are obfuscating API Keys, store the hashed representation (config check handled in hashing function)
+			record.APIKey = storage.HashKey(record.APIKey)
+
+			if r.globalConf.SlaveOptions.UseRPC {
+				// Extend tag list to include this data so we can segment by node if necessary
+				record.Tags = append(record.Tags, "tyk-hybrid-rpc")
+			}
+
+			if r.globalConf.DBAppConfOptions.NodeIsSegmented {
+				// Extend tag list to include this data so we can segment by node if necessary
+				record.Tags = append(record.Tags, r.globalConf.DBAppConfOptions.Tags...)
+			}
+
+			// Lets add some metadata
+			if record.APIKey != "" {
+				record.Tags = append(record.Tags, "key-"+record.APIKey)
+			}
+
+			if record.OrgID != "" {
+				record.Tags = append(record.Tags, "org-"+record.OrgID)
+			}
+
+			record.Tags = append(record.Tags, "api-"+record.APIID)
+
+			if encoded, err := msgpack.Marshal(record); err != nil {
+				log.WithError(err).Error("Error encoding analytics data")
+			} else {
+				recordsBuffer = append(recordsBuffer, string(encoded))
+			}
+
+			// identify that buffer is ready to be sent
+			readyToSend = uint64(len(recordsBuffer)) == minRecordsWriteBatchSize
+
+		case <-time.After(recordsBufferFlushInterval):
+			// nothing was received for that period of time
+			// anyways send whatever we have, don't hold data too long in buffer
+			readyToSend = true
+		}
+		// send data to Redis and reset buffer
+		if readyToSend && len(recordsBuffer) > 0 {
+			r.Store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
+			recordsBuffer = make([]string, 0, minRecordsWriteBatchSize)
+		}
+	}
 
 }
diff --git a/api_definition.go b/api_definition.go
index 9afc6bd1ba7..a889ffe5982 100644
--- a/api_definition.go
+++ b/api_definition.go
@@ -164,6 +164,21 @@ var ServiceNonce string
 // keyed to the Api version name, which is determined during routing to speed up lookups
 func (a APIDefinitionLoader) MakeSpec(def *apidef.APIDefinition) *APISpec {
 	spec := &APISpec{}
+
+	// parse version expiration time stamps
+	for key, ver := range def.VersionData.Versions {
+		if ver.Expires == "" || ver.Expires == "-1" {
+			continue
+		}
+		// calculate the time
+		if t, err := time.Parse("2006-01-02 15:04", ver.Expires); err != nil {
+			log.WithError(err).WithField("Expires", ver.Expires).Error("Could not parse expiry date for API")
+		} else {
+			ver.ExpiresTs = t
+			def.VersionData.Versions[key] = ver
+		}
+	}
+
 	spec.APIDefinition = def
 
 	// We'll push the default HealthChecker:
@@ -873,18 +888,16 @@ func (a *APISpec) URLAllowedAndIgnored(r *http.Request, rxPaths []URLSpec, white
 		if v.MethodActions != nil {
 			// We are using an extended path set, check for the method
 			methodMeta, matchMethodOk := v.MethodActions[r.Method]
-
 			if !matchMethodOk {
 				continue
 			}
 
-			// Matched the method, check what status it is:
-			if methodMeta.Action == apidef.NoAction {
-				// NoAction status means we're not treating this request in any special or exceptional way
-				return a.getURLStatus(v.Status), nil
-			}
+			// Matched the method, check what status it is
 			// TODO: Extend here for additional reply options
 			switch methodMeta.Action {
+			case apidef.NoAction:
+				// NoAction status means we're not treating this request in any special or exceptional way
+				return a.getURLStatus(v.Status), nil
 			case apidef.Reply:
 				return StatusRedirectFlowByReply, &methodMeta
 			default:
@@ -1033,7 +1046,7 @@ func (a *APISpec) getVersionFromRequest(r *http.Request) string {
 		return r.URL.Query().Get(a.VersionDefinition.Key)
 
 	case "url":
-		uPath := strings.Replace(r.URL.Path, a.Proxy.ListenPath, "", 1)
+		uPath := strings.TrimPrefix(r.URL.Path, a.Proxy.ListenPath)
 		// First non-empty part of the path is the version ID
 		for _, part := range strings.Split(uPath, "/") {
 			if part != "" {
@@ -1057,16 +1070,15 @@ func (a *APISpec) VersionExpired(versionDef *apidef.VersionInfo) (bool, *time.Ti
 		return false, nil
 	}
 
-	// otherwise - calculate the time
-	t, err := time.Parse("2006-01-02 15:04", versionDef.Expires)
-	if err != nil {
-		log.Error("Could not parse expiry date for API, dissallow: ", err)
+	// otherwise use parsed timestamp
+	if versionDef.ExpiresTs.IsZero() {
+		log.Error("Could not parse expiry date for API, disallow")
 		return true, nil
 	}
 
 	// It's in the past, expire
 	// It's in the future, keep going
-	return time.Since(t) >= 0, &t
+	return time.Since(versionDef.ExpiresTs) >= 0, &versionDef.ExpiresTs
 }
 
 // RequestValid will check if an incoming request has valid version
@@ -1124,17 +1136,17 @@ func (a *APISpec) Version(r *http.Request) (*apidef.VersionInfo, []URLSpec, bool
 	} else {
 		// Extract Version Info
 		// First checking for if default version is set
-		vname := a.getVersionFromRequest(r)
-		if vname == "" && a.VersionData.DefaultVersion != "" {
-			vname = a.VersionData.DefaultVersion
+		vName := a.getVersionFromRequest(r)
+		if vName == "" {
+			if a.VersionData.DefaultVersion == "" {
+				return &version, nil, false, VersionNotFound
+			}
+			vName = a.VersionData.DefaultVersion
 			ctxSetDefaultVersion(r)
 		}
-		if vname == "" && a.VersionData.DefaultVersion == "" {
-			return &version, nil, false, VersionNotFound
-		}
 		// Load Version Data - General
 		var ok bool
-		if version, ok = a.VersionData.Versions[vname]; !ok {
+		if version, ok = a.VersionData.Versions[vName]; !ok {
 			return &version, nil, false, VersionDoesNotExist
 		}
 	}
@@ -1145,13 +1157,12 @@ func (a *APISpec) Version(r *http.Request) (*apidef.VersionInfo, []URLSpec, bool
 
 	// Load path data and whitelist data for version
 	rxPaths, rxOk := a.RxPaths[version.Name]
-	whiteListStatus, wlOk := a.WhiteListEnabled[version.Name]
-
 	if !rxOk {
 		log.Error("no RX Paths found for version ", version.Name)
 		return &version, nil, false, VersionDoesNotExist
 	}
 
+	whiteListStatus, wlOk := a.WhiteListEnabled[version.Name]
 	if !wlOk {
 		log.Error("No whitelist data found")
 		return &version, nil, false, VersionWhiteListStatusNotFound
diff --git a/apidef/api_definitions.go b/apidef/api_definitions.go
index d87f7d59cae..af63ae75869 100644
--- a/apidef/api_definitions.go
+++ b/apidef/api_definitions.go
@@ -8,6 +8,8 @@ import (
 	"github.com/lonelycode/osin"
 	"gopkg.in/mgo.v2/bson"
 
+	"time"
+
 	"github.com/TykTechnologies/gojsonschema"
 )
 
@@ -207,9 +209,10 @@ type ExtendedPathsSet struct {
 }
 
 type VersionInfo struct {
-	Name    string `bson:"name" json:"name"`
-	Expires string `bson:"expires" json:"expires"`
-	Paths   struct {
+	Name      string    `bson:"name" json:"name"`
+	Expires   string    `bson:"expires" json:"expires"`
+	ExpiresTs time.Time `bson:"-" json:"-"`
+	Paths     struct {
 		Ignored   []string `bson:"ignored" json:"ignored"`
 		WhiteList []string `bson:"white_list" json:"white_list"`
 		BlackList []string `bson:"black_list" json:"black_list"`
diff --git a/config/config.go b/config/config.go
index f5a22950c23..8672b74fda1 100644
--- a/config/config.go
+++ b/config/config.go
@@ -71,6 +71,7 @@ type AnalyticsConfigConfig struct {
 	GeoIPDBLocation         string              `json:"geo_ip_db_path"`
 	NormaliseUrls           NormalisedURLConfig `json:"normalise_urls"`
 	PoolSize                int                 `json:"pool_size"`
+	RecordsBufferSize       uint64              `json:"records_buffer_size"`
 	StorageExpirationTime   int                 `json:"storage_expiration_time"`
 	ignoredIPsCompiled      map[string]bool
 }
diff --git a/gateway_test.go b/gateway_test.go
index e8d6d5369d7..9ac20cdb4ac 100644
--- a/gateway_test.go
+++ b/gateway_test.go
@@ -592,6 +592,8 @@ func TestAnalytics(t *testing.T) {
 	})
 
 	// Cleanup before test
+	// give the analytics workers time to flush records
+	time.Sleep(250 * time.Millisecond)
 	analytics.Store.GetAndDeleteSet(analyticsKeyName)
 
 	t.Run("Log errors", func(t *testing.T) {
@@ -600,6 +602,9 @@ func TestAnalytics(t *testing.T) {
 			{Path: "/", Code: 401},
 		}...)
 
+		// give the analytics workers time to flush records
+		time.Sleep(250 * time.Millisecond)
+
 		results := analytics.Store.GetAndDeleteSet(analyticsKeyName)
 		if len(results) != 2 {
 			t.Error("Should return 2 record", len(results))
@@ -623,6 +628,9 @@ func TestAnalytics(t *testing.T) {
 			Path: "/", Headers: authHeaders, Code: 200,
 		})
 
+		// give the analytics workers time to flush records
+		time.Sleep(250 * time.Millisecond)
+
 		results := analytics.Store.GetAndDeleteSet(analyticsKeyName)
 		if len(results) != 1 {
 			t.Error("Should return 1 record: ", len(results))
diff --git a/handler_error.go b/handler_error.go
index 382dc24348a..05a6a928386 100644
--- a/handler_error.go
+++ b/handler_error.go
@@ -103,13 +103,12 @@ func (e *ErrorHandler) HandleError(w http.ResponseWriter, r *http.Request, errMs
 		}
 
 		oauthClientID := ""
-		tags := make([]string, 0)
 		session := ctxGetSession(r)
-
+		tags := make([]string, 0, estimateTagsCapacity(session, e.Spec))
 		if session != nil {
 			oauthClientID = session.OauthClientID
 			alias = session.Alias
-			tags = session.Tags
+			tags = append(tags, session.Tags...)
 		}
 
 		if len(e.Spec.TagHeaders) > 0 {
@@ -159,10 +158,12 @@ func (e *ErrorHandler) HandleError(w http.ResponseWriter, r *http.Request, errMs
 			tags,
 			alias,
 			trackEP,
-			time.Now(),
+			t,
 		}
 
-		record.GetGeo(ip)
+		if e.Spec.GlobalConfig.AnalyticsConfig.EnableGeoIP {
+			record.GetGeo(ip)
+		}
 
 		expiresAfter := e.Spec.ExpireAnalyticsAfter
 		if e.Spec.GlobalConfig.EnforceOrgDataAge {
@@ -179,7 +180,7 @@ func (e *ErrorHandler) HandleError(w http.ResponseWriter, r *http.Request, errMs
 			record.NormalisePath()
 		}
 
-		go analytics.RecordHit(record)
+		analytics.RecordHit(&record)
 	}
 
 	// Report in health check
diff --git a/handler_success.go b/handler_success.go
index 6287d6fee25..3d8b694b2ba 100644
--- a/handler_success.go
+++ b/handler_success.go
@@ -79,6 +79,23 @@ func addVersionHeader(w http.ResponseWriter, r *http.Request, globalConf config.
 	}
 }
 
+// estimateTagsCapacity returns a capacity hint for the tags slice of an
+// analytics record, counting session tags, node tags and tag headers.
+func estimateTagsCapacity(session *user.SessionState, apiSpec *APISpec) int {
+	size := 5 // that number of tags expected to be added at least before we record hit
+	if session != nil {
+		size += len(session.Tags)
+	}
+
+	if apiSpec.GlobalConfig.DBAppConfOptions.NodeIsSegmented {
+		size += len(apiSpec.GlobalConfig.DBAppConfOptions.Tags)
+	}
+
+	size += len(apiSpec.TagHeaders)
+
+	return size
+}
+
 func (s *SuccessHandler) RecordHit(r *http.Request, timing int64, code int, requestCopy *http.Request, responseCopy *http.Response) {
 
 	if s.Spec.DoNotTrack {
@@ -101,13 +118,12 @@ func (s *SuccessHandler) RecordHit(r *http.Request, timing int64, code int, requ
 
 		// If OAuth, we need to grab it from the session, which may or may not exist
 		oauthClientID := ""
-		tags := make([]string, 0)
 		var alias string
 		session := ctxGetSession(r)
-
+		tags := make([]string, 0, estimateTagsCapacity(session, s.Spec))
 		if session != nil {
 			oauthClientID = session.OauthClientID
-			tags = session.Tags
+			tags = append(tags, session.Tags...)
 			alias = session.Alias
 		}
 
@@ -171,10 +187,12 @@ func (s *SuccessHandler) RecordHit(r *http.Request, timing int64, code int, requ
 			tags,
 			alias,
 			trackEP,
-			time.Now(),
+			t,
 		}
 
-		record.GetGeo(ip)
+		if s.Spec.GlobalConfig.AnalyticsConfig.EnableGeoIP {
+			record.GetGeo(ip)
+		}
 
 		expiresAfter := s.Spec.ExpireAnalyticsAfter
 		if s.Spec.GlobalConfig.EnforceOrgDataAge {
@@ -191,7 +209,7 @@ func (s *SuccessHandler) RecordHit(r *http.Request, timing int64, code int, requ
 			record.NormalisePath()
 		}
 
-		go analytics.RecordHit(record)
+		analytics.RecordHit(&record)
 	}
 
 	// Report in health check
diff --git a/ldap_auth_handler.go b/ldap_auth_handler.go
index 635d054e7a8..e83c9ca67ee 100644
--- a/ldap_auth_handler.go
+++ b/ldap_auth_handler.go
@@ -4,7 +4,7 @@ import (
 	"errors"
 	"strings"
 
-	ldap "github.com/mavricknz/ldap"
+	"github.com/mavricknz/ldap"
 )
 
 // LDAPStorageHandler implements storage.Handler, this is a read-only implementation to access keys from an LDAP service
@@ -177,6 +177,11 @@ func (l LDAPStorageHandler) AppendToSet(keyName, value string) {
 	log.Error("Not implemented")
 }
 
+// AppendToSetPipelined is not implemented for the LDAP handler; it only logs an error.
+func (l LDAPStorageHandler) AppendToSetPipelined(key string, values []string) {
+	log.Error("Not implemented")
+}
+
 func (l LDAPStorageHandler) RemoveFromSet(keyName, value string) {
 	log.Error("Not implemented")
 }
diff --git a/lint/schema.go b/lint/schema.go
index 35dff81610d..edc974e3167 100644
--- a/lint/schema.go
+++ b/lint/schema.go
@@ -98,6 +98,9 @@ const confSchema = `{
 				"pool_size": {
 					"type": "integer"
 				},
+				"records_buffer_size": {
+					"type": "integer"
+				},
 				"storage_expiration_time": {
 					"type": "integer"
 				},
diff --git a/main.go b/main.go
index beb15a27e83..3c7b93f6165 100644
--- a/main.go
+++ b/main.go
@@ -166,7 +166,7 @@ func setupGlobals() {
 
 		analyticsStore := storage.RedisCluster{KeyPrefix: "analytics-"}
 		analytics.Store = &analyticsStore
-		analytics.Init()
+		analytics.Init(globalConf)
 
 		redisPurgeOnce.Do(func() {
 			store := storage.RedisCluster{KeyPrefix: "analytics-"}
@@ -1050,6 +1050,12 @@ func main() {
 
 	mainLog.Info("Stop signal received.")
 
+	// stop analytics workers
+	if config.Global().EnableAnalytics && analytics.Store != nil {
+		analytics.Stop()
+	}
+
+	// write pprof profiles
 	writeProfiles()
 
 	if config.Global().UseDBAppConfigs {
diff --git a/mw_version_check.go b/mw_version_check.go
index f444d595547..8a753ff00f3 100644
--- a/mw_version_check.go
+++ b/mw_version_check.go
@@ -42,10 +42,13 @@ func (v *VersionCheck) ProcessRequest(w http.ResponseWriter, r *http.Request, _
 	if !requestValid {
 		// Fire a versioning failure event
 		v.FireEvent(EventVersionFailure, EventVersionFailureMeta{
-			EventMetaDefault: EventMetaDefault{Message: "Attempted access to disallowed version / path.", OriginatingRequest: EncodeRequestToEvent(r)},
-			Path:             r.URL.Path,
-			Origin:           request.RealIP(r),
-			Reason:           string(stat),
+			EventMetaDefault: EventMetaDefault{
+				Message:            "Attempted access to disallowed version / path.",
+				OriginatingRequest: EncodeRequestToEvent(r),
+			},
+			Path:   r.URL.Path,
+			Origin: request.RealIP(r),
+			Reason: string(stat),
 		})
 		return errors.New(string(stat)), http.StatusForbidden
 	}
diff --git a/rpc_storage_handler.go b/rpc_storage_handler.go
index 1ec5f6e79b0..bb7c5bf0a50 100644
--- a/rpc_storage_handler.go
+++ b/rpc_storage_handler.go
@@ -734,6 +734,15 @@ func (r *RPCStorageHandler) AppendToSet(keyName, value string) {
 	}
 }
 
+// AppendToSetPipelined appends every value to the set by calling AppendToSet once per value.
+func (r *RPCStorageHandler) AppendToSetPipelined(key string, values []string) {
+	// just falls back to AppendToSet
+	// TODO: introduce new RPC method for pipelined operation
+	for _, val := range values {
+		r.AppendToSet(key, val)
+	}
+}
+
 // SetScrollingWindow is used in the rate limiter to handle rate limits fairly.
 func (r *RPCStorageHandler) SetRollingWindow(keyName string, per int64, val string, pipeline bool) (int, []interface{}) {
 	start := time.Now() // get current time
diff --git a/storage/redis_cluster.go b/storage/redis_cluster.go
index dadd3dfed2b..4562432fea4 100644
--- a/storage/redis_cluster.go
+++ b/storage/redis_cluster.go
@@ -558,13 +558,40 @@ func (r RedisCluster) GetAndDeleteSet(keyName string) []interface{} {
 }
 
 func (r RedisCluster) AppendToSet(keyName, value string) {
-	log.Debug("Pushing to raw key list: ", keyName)
-	log.Debug("Appending to fixed key list: ", r.fixKey(keyName))
+	fixedKey := r.fixKey(keyName)
+	log.WithField("keyName", keyName).Debug("Pushing to raw key list")
+	log.WithField("fixedKey", fixedKey).Debug("Appending to fixed key list")
 	r.ensureConnection()
-	_, err := r.singleton().Do("RPUSH", r.fixKey(keyName), value)
+	if _, err := r.singleton().Do("RPUSH", fixedKey, value); err != nil {
+		log.WithError(err).Error("Error trying to append to set keys")
+	}
+}
 
-	if err != nil {
-		log.Error("Error trying to delete keys: ", err)
+// AppendToSetPipelined appends all values to the list stored at key using one
+// pipelined batch of RPUSH commands; an empty values slice is a no-op.
+func (r RedisCluster) AppendToSetPipelined(key string, values []string) {
+	if len(values) == 0 {
+		return
+	}
+
+	fixedKey := r.fixKey(key)
+
+	// prepare pipeline data
+	pipeLine := make([]rediscluster.ClusterTransaction, len(values))
+	for index, val := range values {
+		pipeLine[index] = rediscluster.ClusterTransaction{
+			Cmd: "RPUSH",
+			Args: []interface{}{
+				fixedKey,
+				val,
+			},
+		}
+	}
+
+	// send pipelined command to Redis
+	r.ensureConnection()
+	if _, err := r.singleton().DoPipeline(pipeLine); err != nil {
+		log.WithError(err).Error("Error trying to append to set keys")
 	}
 }
 
diff --git a/storage/storage.go b/storage/storage.go
index 8c8f5cc2a12..e5c6ac566ce 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -37,6 +37,7 @@ type Handler interface {
 	GetSet(string) (map[string]string, error)
 	AddToSet(string, string)
 	AppendToSet(string, string)
+	AppendToSetPipelined(string, []string)
 	GetAndDeleteSet(string) []interface{}
 	RemoveFromSet(string, string)
 	DeleteScanMatch(string) bool