Skip to content

Commit

Permalink
Merge c40a6b6 into 5dafcff
Browse files Browse the repository at this point in the history
  • Loading branch information
dencoded committed Sep 27, 2018
2 parents 5dafcff + c40a6b6 commit e66cfe6
Show file tree
Hide file tree
Showing 9 changed files with 294 additions and 187 deletions.
8 changes: 5 additions & 3 deletions api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"text/template"
"time"

"github.com/TykTechnologies/tyk/rpc"

"github.com/Sirupsen/logrus"
"github.com/rubyist/circuitbreaker"

Expand Down Expand Up @@ -341,11 +343,11 @@ func (a APIDefinitionLoader) FromDashboardService(endpoint, secret string) ([]*A

// FromCloud will connect and download ApiDefintions from a Mongo DB instance.
func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec {
if rpcEmergencyMode {
if rpc.IsRPCEmergencyMode() {
return LoadDefinitionsFromRPCBackup()
}

store := RPCStorageHandler{UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString}
store := rpc.RPCStorageHandler{SlaveOptions: config.Global().SlaveOptions}
if !store.Connect() {
return nil
}
Expand All @@ -361,7 +363,7 @@ func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec {

//store.Disconnect()

if rpcLoadCount > 0 {
if rpc.GetRPCLoadCount() > 0 {
saveRPCDefinitionsBackup(apiCollection)
}

Expand Down
8 changes: 5 additions & 3 deletions api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"strings"
"time"

"github.com/TykTechnologies/tyk/rpc"

"github.com/Sirupsen/logrus"
"github.com/gorilla/mux"
"github.com/justinas/alice"
Expand All @@ -35,12 +37,12 @@ type ChainObject struct {
Subrouter *mux.Router
}

func prepareStorage() (storage.RedisCluster, storage.RedisCluster, storage.RedisCluster, *RPCStorageHandler, *RPCStorageHandler) {
func prepareStorage() (storage.RedisCluster, storage.RedisCluster, storage.RedisCluster, *rpc.RPCStorageHandler, *rpc.RPCStorageHandler) {
redisStore := storage.RedisCluster{KeyPrefix: "apikey-", HashKeys: config.Global().HashKeys}
redisOrgStore := storage.RedisCluster{KeyPrefix: "orgkey."}
healthStore := storage.RedisCluster{KeyPrefix: "apihealth."}
rpcAuthStore := RPCStorageHandler{KeyPrefix: "apikey-", HashKeys: config.Global().HashKeys, UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString}
rpcOrgStore := RPCStorageHandler{KeyPrefix: "orgkey.", UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString}
rpcAuthStore := rpc.RPCStorageHandler{KeyPrefix: "apikey-", HashKeys: config.Global().HashKeys, SlaveOptions: config.Global().SlaveOptions}
rpcOrgStore := rpc.RPCStorageHandler{KeyPrefix: "orgkey.", SlaveOptions: config.Global().SlaveOptions}

FallbackKeySesionManager.Init(&redisStore)

Expand Down
63 changes: 44 additions & 19 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/TykTechnologies/tyk/config"
logger "github.com/TykTechnologies/tyk/log"
"github.com/TykTechnologies/tyk/regexp"
"github.com/TykTechnologies/tyk/rpc"
"github.com/TykTechnologies/tyk/storage"
"github.com/TykTechnologies/tyk/user"
)
Expand All @@ -62,7 +63,7 @@ var (
DefaultQuotaStore DefaultSessionManager
FallbackKeySesionManager = SessionHandler(&DefaultSessionManager{})
MonitoringHandler config.TykEventHandler
RPCListener RPCStorageHandler
RPCListener rpc.RPCStorageHandler
DashService DashboardServiceSender
CertificateManager *certs.CertificateManager
NewRelicApplication newrelic.Application
Expand Down Expand Up @@ -137,6 +138,14 @@ func setupGlobals() {
healthCheckStore := storage.RedisCluster{KeyPrefix: "host-checker:"}
InitHostCheckManager(healthCheckStore)

// Get the notifier ready
mainLog.Debug("Notifier will not work in hybrid mode")
mainNotifierStore := storage.RedisCluster{}
mainNotifierStore.Connect()
MainNotifier = RedisNotifier{mainNotifierStore, RedisPubSubChannel}

setupRPC()

if config.Global().EnableAnalytics && analytics.Store == nil {
globalConf := config.Global()
globalConf.LoadIgnoredIPs()
Expand All @@ -158,7 +167,12 @@ func setupGlobals() {

rpcPurgeOnce.Do(func() {
store := storage.RedisCluster{KeyPrefix: "analytics-"}
purger := RPCPurger{Store: &store}
purger := rpc.RPCPurger{
Store: &store,
RecordFunc: func() interface{} {
return AnalyticsRecord{}
},
}
purger.Connect()
go purger.PurgeLoop(rpcPurgeTicker)
})
Expand All @@ -175,12 +189,6 @@ func setupGlobals() {
}
}

// Get the notifier ready
mainLog.Debug("Notifier will not work in hybrid mode")
mainNotifierStore := storage.RedisCluster{}
mainNotifierStore.Connect()
MainNotifier = RedisNotifier{mainNotifierStore, RedisPubSubChannel}

if config.Global().Monitor.EnableTriggerMonitors {
h := &WebHookHandler{}
if err := h.Init(config.Global().Monitor.Config); err != nil {
Expand Down Expand Up @@ -854,8 +862,8 @@ func afterConfSetup(conf *config.Config) {
conf.SlaveOptions.PingTimeout = 60
}

GlobalRPCPingTimeout = time.Second * time.Duration(conf.SlaveOptions.PingTimeout)
GlobalRPCCallTimeout = time.Second * time.Duration(conf.SlaveOptions.CallTimeout)
rpc.GlobalRPCPingTimeout = time.Second * time.Duration(conf.SlaveOptions.PingTimeout)
rpc.GlobalRPCCallTimeout = time.Second * time.Duration(conf.SlaveOptions.CallTimeout)
initGenericEventHandlers(conf)
regexp.ResetCache(time.Second*time.Duration(conf.RegexpCacheExpire), !conf.DisableRegexpCache)
}
Expand All @@ -877,11 +885,10 @@ func getHostDetails() {

func getGlobalStorageHandler(keyPrefix string, hashKeys bool) storage.Handler {
if config.Global().SlaveOptions.UseRPC {
return &RPCStorageHandler{
KeyPrefix: keyPrefix,
HashKeys: hashKeys,
UserKey: config.Global().SlaveOptions.APIKey,
Address: config.Global().SlaveOptions.ConnectionString,
return &rpc.RPCStorageHandler{
KeyPrefix: keyPrefix,
HashKeys: hashKeys,
SlaveOptions: config.Global().SlaveOptions,
}
}
return storage.RedisCluster{KeyPrefix: keyPrefix, HashKeys: hashKeys}
Expand Down Expand Up @@ -1075,10 +1082,9 @@ func start() {

if slaveOptions := config.Global().SlaveOptions; slaveOptions.UseRPC {
mainLog.Debug("Starting RPC reload listener")
RPCListener = RPCStorageHandler{
RPCListener = rpc.RPCStorageHandler{
KeyPrefix: "rpc.listener.",
UserKey: slaveOptions.APIKey,
Address: slaveOptions.ConnectionString,
SlaveOptions: slaveOptions,
SuppressRegister: true,
}

Expand All @@ -1094,6 +1100,25 @@ func start() {
go reloadQueueLoop()
}

func setupRPC() {
if !config.Global().SlaveOptions.UseRPC {
return
}

log.Info("Preparing RPC client sub-system")

// prepare RPC package
rpc.Log = log
rpc.Instrument = instrument
rpc.MainNotifier = &MainNotifier
rpc.SessionCache = SessionCache
rpc.ReloadCallback = doReload
rpc.ReloadURLStructureCallback = reloadURLStructure
rpc.HandleAddKeyCallback = handleAddKey
rpc.HandleDeleteKeyCallback = handleDeleteKey
rpc.HandleDeleteHashedKeyCallback = handleDeleteHashedKey
}

func generateListener(listenPort int) (net.Listener, error) {
listenAddress := config.Global().ListenAddress

Expand Down Expand Up @@ -1337,7 +1362,7 @@ func listen(listener, controlListener net.Listener, err error) {
fmt.Fprintf(w, "Hello Tiki")
})

if !rpcEmergencyMode {
if !rpc.IsRPCEmergencyMode() {
doReload()
}
}
6 changes: 4 additions & 2 deletions policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"os"
"time"

"github.com/TykTechnologies/tyk/rpc"

"github.com/Sirupsen/logrus"

"github.com/TykTechnologies/tyk/config"
Expand Down Expand Up @@ -157,11 +159,11 @@ func parsePoliciesFromRPC(list string) (map[string]user.Policy, error) {
}

func LoadPoliciesFromRPC(orgId string) map[string]user.Policy {
if rpcEmergencyMode {
if rpc.IsRPCEmergencyMode() {
return LoadPoliciesFromRPCBackup()
}

store := &RPCStorageHandler{UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString}
store := &rpc.RPCStorageHandler{SlaveOptions: config.Global().SlaveOptions}
if !store.Connect() {
return nil
}
Expand Down
38 changes: 38 additions & 0 deletions redis_analytics_purger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"time"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

// Purger is an interface that will define how the in-memory store will be purged
// of analytics data to prevent it growing too large
type Purger interface {
PurgeCache()
PurgeLoop(<-chan time.Time)
}

type RedisPurger struct {
Store storage.Handler
}

func (r RedisPurger) PurgeLoop(ticker <-chan time.Time) {
for {
<-ticker
r.PurgeCache()
}
}

func (r *RedisPurger) PurgeCache() {
expireAfter := config.Global().AnalyticsConfig.StorageExpirationTime
if expireAfter == 0 {
expireAfter = 60 // 1 minute
}

exp, _ := r.Store.GetExp(analyticsKeyName)
if exp <= 0 {
r.Store.SetExp(analyticsKeyName, int64(expireAfter))
}
}
4 changes: 3 additions & 1 deletion redis_signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/TykTechnologies/tyk/rpc"

"github.com/garyburd/redigo/redis"

"github.com/TykTechnologies/goverify"
Expand Down Expand Up @@ -123,7 +125,7 @@ func handleKeySpaceEventCacheFlush(payload string) {
key = splitKeys[0]
}

RPCGlobalCache.Delete("apikey-" + key)
rpc.RPCGlobalCache.Delete("apikey-" + key)
SessionCache.Delete(key)
}
}
Expand Down
52 changes: 12 additions & 40 deletions rpc_analytics_purger.go → rpc/rpc_analytics_purger.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
package main
package rpc

import (
"encoding/json"
"time"

"gopkg.in/vmihailenco/msgpack.v2"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

// Purger is an interface that will define how the in-memory store will be purged
// of analytics data to prevent it growing too large
type Purger interface {
PurgeCache()
PurgeLoop(<-chan time.Time)
}
const analyticsKeyName = "tyk-system-analytics"

// RPCPurger will purge analytics data into a Mongo database, requires that the Mongo DB string is specified
// in the Config object
type RPCPurger struct {
Store storage.Handler
Store storage.Handler
RecordFunc func() interface{}
}

// Connect Connects to RPC
func (r *RPCPurger) Connect() {
if RPCClientIsConnected && RPCCLientSingleton != nil && RPCFuncClientSingleton != nil {
log.Info("RPC Analytics client using singleton")
Log.Info("RPC Analytics client using singleton")
return
}
}
Expand All @@ -43,59 +38,36 @@ func (r RPCPurger) PurgeLoop(ticker <-chan time.Time) {
// PurgeCache will pull the data from the in-memory store and drop it into the specified MongoDB collection
func (r *RPCPurger) PurgeCache() {
if _, err := RPCFuncClientSingleton.Call("Ping", nil); err != nil {
log.Error("Can't purge cache, failed to ping RPC: ", err)
Log.Error("Can't purge cache, failed to ping RPC: ", err)
return
}

analyticsValues := r.Store.GetAndDeleteSet(analyticsKeyName)
if len(analyticsValues) == 0 {
return
}
keys := make([]AnalyticsRecord, len(analyticsValues))
keys := make([]interface{}, len(analyticsValues))

for i, v := range analyticsValues {
decoded := AnalyticsRecord{}
decoded := r.RecordFunc()
if err := msgpack.Unmarshal(v.([]byte), &decoded); err != nil {
log.Error("Couldn't unmarshal analytics data: ", err)
Log.Error("Couldn't unmarshal analytics data: ", err)
} else {
log.Debug("Decoded Record: ", decoded)
Log.Debug("Decoded Record: ", decoded)
keys[i] = decoded
}
}

data, err := json.Marshal(keys)
if err != nil {
log.Error("Failed to marshal analytics data")
Log.Error("Failed to marshal analytics data")
return
}

// Send keys to RPC
if _, err := RPCFuncClientSingleton.Call("PurgeAnalyticsData", string(data)); err != nil {
emitRPCErrorEvent(rpcFuncClientSingletonCall, "PurgeAnalyticsData", err)
log.Warn("Failed to call purge, retrying: ", err)
}

}

type RedisPurger struct {
Store storage.Handler
}

func (r RedisPurger) PurgeLoop(ticker <-chan time.Time) {
for {
<-ticker
r.PurgeCache()
Log.Warn("Failed to call purge, retrying: ", err)
}
}

func (r *RedisPurger) PurgeCache() {
expireAfter := config.Global().AnalyticsConfig.StorageExpirationTime
if expireAfter == 0 {
expireAfter = 60 // 1 minute
}

exp, _ := r.Store.GetExp(analyticsKeyName)
if exp <= 0 {
r.Store.SetExp(analyticsKeyName, int64(expireAfter))
}
}

0 comments on commit e66cfe6

Please sign in to comment.