From c40a6b6661e815cf882505b9e836eb00556b5ca8 Mon Sep 17 00:00:00 2001 From: dencoded <33698537+dencoded@users.noreply.github.com> Date: Thu, 27 Sep 2018 01:50:41 -0400 Subject: [PATCH] moved RPC to rpc package --- api_definition.go | 8 +- api_loader.go | 8 +- main.go | 63 ++-- policy.go | 6 +- redis_analytics_purger.go | 38 +++ redis_signals.go | 4 +- .../rpc_analytics_purger.go | 52 +--- .../rpc_storage_handler.go | 278 +++++++++++------- rpc_test.go | 24 +- 9 files changed, 294 insertions(+), 187 deletions(-) create mode 100644 redis_analytics_purger.go rename rpc_analytics_purger.go => rpc/rpc_analytics_purger.go (55%) rename rpc_storage_handler.go => rpc/rpc_storage_handler.go (78%) diff --git a/api_definition.go b/api_definition.go index 4a9a65e7eacc..d5b4d9429c86 100644 --- a/api_definition.go +++ b/api_definition.go @@ -17,6 +17,8 @@ import ( "text/template" "time" + "github.com/TykTechnologies/tyk/rpc" + "github.com/Sirupsen/logrus" "github.com/rubyist/circuitbreaker" @@ -341,11 +343,11 @@ func (a APIDefinitionLoader) FromDashboardService(endpoint, secret string) ([]*A // FromCloud will connect and download ApiDefintions from a Mongo DB instance. func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec { - if rpcEmergencyMode { + if rpc.IsRPCEmergencyMode() { return LoadDefinitionsFromRPCBackup() } - store := RPCStorageHandler{UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString} + store := rpc.RPCStorageHandler{SlaveOptions: config.Global().SlaveOptions} if !store.Connect() { return nil } @@ -361,7 +363,7 @@ func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec { //store.Disconnect() - if rpcLoadCount > 0 { + if rpc.GetRPCLoadCount() > 0 { saveRPCDefinitionsBackup(apiCollection) } diff --git a/api_loader.go b/api_loader.go index 86a7c8f185ad..ad02830ca0f0 100644 --- a/api_loader.go +++ b/api_loader.go @@ -12,6 +12,8 @@ import ( "strings" "time" + "github.com/TykTechnologies/tyk/rpc" + "github.com/Sirupsen/logrus" "github.com/gorilla/mux" "github.com/justinas/alice" @@ -35,12 +37,12 @@ type ChainObject struct { Subrouter *mux.Router } -func prepareStorage() (storage.RedisCluster, storage.RedisCluster, storage.RedisCluster, *RPCStorageHandler, *RPCStorageHandler) { +func prepareStorage() (storage.RedisCluster, storage.RedisCluster, storage.RedisCluster, *rpc.RPCStorageHandler, *rpc.RPCStorageHandler) { redisStore := storage.RedisCluster{KeyPrefix: "apikey-", HashKeys: config.Global().HashKeys} redisOrgStore := storage.RedisCluster{KeyPrefix: "orgkey."} healthStore := storage.RedisCluster{KeyPrefix: "apihealth."} - rpcAuthStore := RPCStorageHandler{KeyPrefix: "apikey-", HashKeys: config.Global().HashKeys, UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString} - rpcOrgStore := RPCStorageHandler{KeyPrefix: "orgkey.", UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString} + rpcAuthStore := rpc.RPCStorageHandler{KeyPrefix: "apikey-", HashKeys: config.Global().HashKeys, SlaveOptions: config.Global().SlaveOptions} + rpcOrgStore := rpc.RPCStorageHandler{KeyPrefix: "orgkey.", SlaveOptions: config.Global().SlaveOptions} FallbackKeySesionManager.Init(&redisStore) diff --git a/main.go b/main.go index 2ec4d3d9e319..49417609bbf0 100644 --- a/main.go +++ b/main.go @@ -44,6 +44,7 @@ import ( "github.com/TykTechnologies/tyk/config" logger "github.com/TykTechnologies/tyk/log" "github.com/TykTechnologies/tyk/regexp" + 
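Every RPC-backed store above is now built from the whole SlaveOptions block instead of copying APIKey and ConnectionString by hand. A minimal sketch of the pattern FromRPC and prepareStorage use after this change (error handling elided; the prefix value mirrors the gateway's own):

    store := rpc.RPCStorageHandler{
        KeyPrefix:    "apikey-",
        HashKeys:     config.Global().HashKeys,
        SlaveOptions: config.Global().SlaveOptions,
    }
    if !store.Connect() {
        return nil // same bail-out FromRPC performs when the RPC link is down
    }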
"github.com/TykTechnologies/tyk/rpc" "github.com/TykTechnologies/tyk/storage" "github.com/TykTechnologies/tyk/user" ) @@ -62,7 +63,7 @@ var ( DefaultQuotaStore DefaultSessionManager FallbackKeySesionManager = SessionHandler(&DefaultSessionManager{}) MonitoringHandler config.TykEventHandler - RPCListener RPCStorageHandler + RPCListener rpc.RPCStorageHandler DashService DashboardServiceSender CertificateManager *certs.CertificateManager NewRelicApplication newrelic.Application @@ -137,6 +138,14 @@ func setupGlobals() { healthCheckStore := storage.RedisCluster{KeyPrefix: "host-checker:"} InitHostCheckManager(healthCheckStore) + // Get the notifier ready + mainLog.Debug("Notifier will not work in hybrid mode") + mainNotifierStore := storage.RedisCluster{} + mainNotifierStore.Connect() + MainNotifier = RedisNotifier{mainNotifierStore, RedisPubSubChannel} + + setupRPC() + if config.Global().EnableAnalytics && analytics.Store == nil { globalConf := config.Global() globalConf.LoadIgnoredIPs() @@ -158,7 +167,12 @@ func setupGlobals() { rpcPurgeOnce.Do(func() { store := storage.RedisCluster{KeyPrefix: "analytics-"} - purger := RPCPurger{Store: &store} + purger := rpc.RPCPurger{ + Store: &store, + RecordFunc: func() interface{} { + return AnalyticsRecord{} + }, + } purger.Connect() go purger.PurgeLoop(rpcPurgeTicker) }) @@ -175,12 +189,6 @@ func setupGlobals() { } } - // Get the notifier ready - mainLog.Debug("Notifier will not work in hybrid mode") - mainNotifierStore := storage.RedisCluster{} - mainNotifierStore.Connect() - MainNotifier = RedisNotifier{mainNotifierStore, RedisPubSubChannel} - if config.Global().Monitor.EnableTriggerMonitors { h := &WebHookHandler{} if err := h.Init(config.Global().Monitor.Config); err != nil { @@ -854,8 +862,8 @@ func afterConfSetup(conf *config.Config) { conf.SlaveOptions.PingTimeout = 60 } - GlobalRPCPingTimeout = time.Second * time.Duration(conf.SlaveOptions.PingTimeout) - GlobalRPCCallTimeout = time.Second * time.Duration(conf.SlaveOptions.CallTimeout) + rpc.GlobalRPCPingTimeout = time.Second * time.Duration(conf.SlaveOptions.PingTimeout) + rpc.GlobalRPCCallTimeout = time.Second * time.Duration(conf.SlaveOptions.CallTimeout) initGenericEventHandlers(conf) regexp.ResetCache(time.Second*time.Duration(conf.RegexpCacheExpire), !conf.DisableRegexpCache) } @@ -877,11 +885,10 @@ func getHostDetails() { func getGlobalStorageHandler(keyPrefix string, hashKeys bool) storage.Handler { if config.Global().SlaveOptions.UseRPC { - return &RPCStorageHandler{ - KeyPrefix: keyPrefix, - HashKeys: hashKeys, - UserKey: config.Global().SlaveOptions.APIKey, - Address: config.Global().SlaveOptions.ConnectionString, + return &rpc.RPCStorageHandler{ + KeyPrefix: keyPrefix, + HashKeys: hashKeys, + SlaveOptions: config.Global().SlaveOptions, } } return storage.RedisCluster{KeyPrefix: keyPrefix, HashKeys: hashKeys} @@ -1075,10 +1082,9 @@ func start() { if slaveOptions := config.Global().SlaveOptions; slaveOptions.UseRPC { mainLog.Debug("Starting RPC reload listener") - RPCListener = RPCStorageHandler{ + RPCListener = rpc.RPCStorageHandler{ KeyPrefix: "rpc.listener.", - UserKey: slaveOptions.APIKey, - Address: slaveOptions.ConnectionString, + SlaveOptions: slaveOptions, SuppressRegister: true, } @@ -1094,6 +1100,25 @@ func start() { go reloadQueueLoop() } +func setupRPC() { + if !config.Global().SlaveOptions.UseRPC { + return + } + + log.Info("Preparing RPC client sub-system") + + // prepare RPC package + rpc.Log = log + rpc.Instrument = instrument + rpc.MainNotifier = &MainNotifier + 
rpc.SessionCache = SessionCache + rpc.ReloadCallback = doReload + rpc.ReloadURLStructureCallback = reloadURLStructure + rpc.HandleAddKeyCallback = handleAddKey + rpc.HandleDeleteKeyCallback = handleDeleteKey + rpc.HandleDeleteHashedKeyCallback = handleDeleteHashedKey +} + func generateListener(listenPort int) (net.Listener, error) { listenAddress := config.Global().ListenAddress @@ -1337,7 +1362,7 @@ func listen(listener, controlListener net.Listener, err error) { fmt.Fprintf(w, "Hello Tiki") }) - if !rpcEmergencyMode { + if !rpc.IsRPCEmergencyMode() { doReload() } } diff --git a/policy.go b/policy.go index db6c195088ac..25c1dc208462 100644 --- a/policy.go +++ b/policy.go @@ -7,6 +7,8 @@ import ( "os" "time" + "github.com/TykTechnologies/tyk/rpc" + "github.com/Sirupsen/logrus" "github.com/TykTechnologies/tyk/config" @@ -157,11 +159,11 @@ func parsePoliciesFromRPC(list string) (map[string]user.Policy, error) { } func LoadPoliciesFromRPC(orgId string) map[string]user.Policy { - if rpcEmergencyMode { + if rpc.IsRPCEmergencyMode() { return LoadPoliciesFromRPCBackup() } - store := &RPCStorageHandler{UserKey: config.Global().SlaveOptions.APIKey, Address: config.Global().SlaveOptions.ConnectionString} + store := &rpc.RPCStorageHandler{SlaveOptions: config.Global().SlaveOptions} if !store.Connect() { return nil } diff --git a/redis_analytics_purger.go b/redis_analytics_purger.go new file mode 100644 index 000000000000..34f06ac26210 --- /dev/null +++ b/redis_analytics_purger.go @@ -0,0 +1,38 @@ +package main + +import ( + "time" + + "github.com/TykTechnologies/tyk/config" + "github.com/TykTechnologies/tyk/storage" +) + +// Purger is an interface that will define how the in-memory store will be purged +// of analytics data to prevent it growing too large +type Purger interface { + PurgeCache() + PurgeLoop(<-chan time.Time) +} + +type RedisPurger struct { + Store storage.Handler +} + +func (r RedisPurger) PurgeLoop(ticker <-chan time.Time) { + for { + <-ticker + r.PurgeCache() + } +} + +func (r *RedisPurger) PurgeCache() { + expireAfter := config.Global().AnalyticsConfig.StorageExpirationTime + if expireAfter == 0 { + expireAfter = 60 // 1 minute + } + + exp, _ := r.Store.GetExp(analyticsKeyName) + if exp <= 0 { + r.Store.SetExp(analyticsKeyName, int64(expireAfter)) + } +} diff --git a/redis_signals.go b/redis_signals.go index 2875e14c9e7c..497798e865ac 100644 --- a/redis_signals.go +++ b/redis_signals.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/TykTechnologies/tyk/rpc" + "github.com/garyburd/redigo/redis" "github.com/TykTechnologies/goverify" @@ -123,7 +125,7 @@ func handleKeySpaceEventCacheFlush(payload string) { key = splitKeys[0] } - RPCGlobalCache.Delete("apikey-" + key) + rpc.RPCGlobalCache.Delete("apikey-" + key) SessionCache.Delete(key) } } diff --git a/rpc_analytics_purger.go b/rpc/rpc_analytics_purger.go similarity index 55% rename from rpc_analytics_purger.go rename to rpc/rpc_analytics_purger.go index 658edef0431f..d00fc99fa1be 100644 --- a/rpc_analytics_purger.go +++ b/rpc/rpc_analytics_purger.go @@ -1,4 +1,4 @@ -package main +package rpc import ( "encoding/json" @@ -6,27 +6,22 @@ import ( "gopkg.in/vmihailenco/msgpack.v2" - "github.com/TykTechnologies/tyk/config" "github.com/TykTechnologies/tyk/storage" ) -// Purger is an interface that will define how the in-memory store will be purged -// of analytics data to prevent it growing too large -type Purger interface { - PurgeCache() - PurgeLoop(<-chan time.Time) -} +const analyticsKeyName = "tyk-system-analytics" // RPCPurger 
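The purger split follows the same idea: the rpc package no longer knows the gateway's AnalyticsRecord type (the host injects a factory via RecordFunc), and both purgers still satisfy the gateway-side Purger interface kept in redis_analytics_purger.go. A hypothetical selector tying the two together; analyticsStore and the ticker interval are placeholders, AnalyticsRecord is the host's type:

    analyticsStore := storage.RedisCluster{KeyPrefix: "analytics-"}
    var p Purger
    if config.Global().SlaveOptions.UseRPC {
        p = &rpc.RPCPurger{
            Store:      &analyticsStore,
            RecordFunc: func() interface{} { return AnalyticsRecord{} },
        }
    } else {
        p = &RedisPurger{Store: &analyticsStore}
    }
    go p.PurgeLoop(time.NewTicker(10 * time.Second).C)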
will purge analytics data into a Mongo database, requires that the Mongo DB string is specified // in the Config object type RPCPurger struct { - Store storage.Handler + Store storage.Handler + RecordFunc func() interface{} } // Connect Connects to RPC func (r *RPCPurger) Connect() { if RPCClientIsConnected && RPCCLientSingleton != nil && RPCFuncClientSingleton != nil { - log.Info("RPC Analytics client using singleton") + Log.Info("RPC Analytics client using singleton") return } } @@ -43,7 +38,7 @@ func (r RPCPurger) PurgeLoop(ticker <-chan time.Time) { // PurgeCache will pull the data from the in-memory store and drop it into the specified MongoDB collection func (r *RPCPurger) PurgeCache() { if _, err := RPCFuncClientSingleton.Call("Ping", nil); err != nil { - log.Error("Can't purge cache, failed to ping RPC: ", err) + Log.Error("Can't purge cache, failed to ping RPC: ", err) return } @@ -51,51 +46,28 @@ func (r *RPCPurger) PurgeCache() { if len(analyticsValues) == 0 { return } - keys := make([]AnalyticsRecord, len(analyticsValues)) + keys := make([]interface{}, len(analyticsValues)) for i, v := range analyticsValues { - decoded := AnalyticsRecord{} + decoded := r.RecordFunc() if err := msgpack.Unmarshal(v.([]byte), &decoded); err != nil { - log.Error("Couldn't unmarshal analytics data: ", err) + Log.Error("Couldn't unmarshal analytics data: ", err) } else { - log.Debug("Decoded Record: ", decoded) + Log.Debug("Decoded Record: ", decoded) keys[i] = decoded } } data, err := json.Marshal(keys) if err != nil { - log.Error("Failed to marshal analytics data") + Log.Error("Failed to marshal analytics data") return } // Send keys to RPC if _, err := RPCFuncClientSingleton.Call("PurgeAnalyticsData", string(data)); err != nil { emitRPCErrorEvent(rpcFuncClientSingletonCall, "PurgeAnalyticsData", err) - log.Warn("Failed to call purge, retrying: ", err) - } - -} - -type RedisPurger struct { - Store storage.Handler -} - -func (r RedisPurger) PurgeLoop(ticker <-chan time.Time) { - for { - <-ticker - r.PurgeCache() + Log.Warn("Failed to call purge, retrying: ", err) } -} -func (r *RedisPurger) PurgeCache() { - expireAfter := config.Global().AnalyticsConfig.StorageExpirationTime - if expireAfter == 0 { - expireAfter = 60 // 1 minute - } - - exp, _ := r.Store.GetExp(analyticsKeyName) - if exp <= 0 { - r.Store.SetExp(analyticsKeyName, int64(expireAfter)) - } } diff --git a/rpc_storage_handler.go b/rpc/rpc_storage_handler.go similarity index 78% rename from rpc_storage_handler.go rename to rpc/rpc_storage_handler.go index 0067567717f3..b852c38e7729 100644 --- a/rpc_storage_handler.go +++ b/rpc/rpc_storage_handler.go @@ -1,4 +1,4 @@ -package main +package rpc import ( "crypto/tls" @@ -11,15 +11,15 @@ import ( "sync/atomic" "time" + "github.com/Sirupsen/logrus" "github.com/garyburd/redigo/redis" + "github.com/gocraft/health" "github.com/lonelycode/gorpc" - cache "github.com/pmylund/go-cache" + "github.com/pmylund/go-cache" "github.com/satori/go.uuid" "github.com/TykTechnologies/tyk/config" "github.com/TykTechnologies/tyk/storage" - - "github.com/Sirupsen/logrus" ) type InboundData struct { @@ -51,38 +51,65 @@ type GroupKeySpaceRequest struct { GroupID string } +// interface to send notifications +type Notifier interface { + Notify(interface{}) bool +} + var ( // rpcLoadCount is a counter to check if this is a cold boot rpcLoadCount int rpcEmergencyMode bool rpcEmergencyModeLoaded bool - GlobalRPCCallTimeout time.Duration - GlobalRPCPingTimeout time.Duration + GlobalRPCCallTimeout = time.Second * 30 + 
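Anything that satisfies the one-method Notifier interface introduced above can stand in for the gateway's Redis-backed notifier, which is what keeps this package free of gateway imports. A hypothetical no-op implementation, e.g. for tests:

    type noopNotifier struct{}

    func (noopNotifier) Notify(interface{}) bool { return true }

    // rpc.MainNotifier = noopNotifier{}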
GlobalRPCPingTimeout = time.Second * 60 + Log = &logrus.Logger{} + Instrument = &health.Stream{} + MainNotifier Notifier + SessionCache *cache.Cache + + ReloadCallback func() // former "doReload()" + ReloadURLStructureCallback func(func()) // former "reloadURLStructure(nil)" + + HandleAddKeyCallback func(string, string, string, string) // former handleAddKey + HandleDeleteKeyCallback func(string, string) (interface{}, int) // former handleDeleteKey + HandleDeleteHashedKeyCallback func(string, string) (interface{}, int) // former handleDeleteHashedKey ) +func ResetEmergencyMode() { + rpcLoadCount = 0 + rpcEmergencyMode = false + rpcEmergencyModeLoaded = false +} + +type notification struct { + Command string `json:"command"` + Payload string `json:"payload"` + Signature string `json:"signature"` +} + // RPCStorageHandler is a storage manager that uses the redis database. type RPCStorageHandler struct { KeyPrefix string HashKeys bool - UserKey string - Address string killChan chan int Killed bool Connected bool ID string SuppressRegister bool + SlaveOptions config.SlaveOptionsConfig } func (r *RPCStorageHandler) Register() { r.ID = uuid.NewV4().String() r.killChan = make(chan int) - log.Debug("RPC Client registered") + Log.Debug("RPC Client registered") } func (r *RPCStorageHandler) checkDisconnect() { res := <-r.killChan - log.Info("RPC Client disconnecting: ", res) + Log.Info("RPC Client disconnecting: ", res) r.Killed = true r.Disconnect() } @@ -103,9 +130,12 @@ const ( ) func emitRPCErrorEvent(jobName string, funcName string, err error) { - job := instrument.NewJob(jobName) + if Instrument == nil { + return + } + job := Instrument.NewJob(jobName) if emitErr := job.EventErr(funcName, err); emitErr != nil { - log.WithError(emitErr).WithFields(logrus.Fields{ + Log.WithError(emitErr).WithFields(logrus.Fields{ "jobName": jobName, "funcName": funcName, }) @@ -113,9 +143,13 @@ func emitRPCErrorEvent(jobName string, funcName string, err error) { } func emitRPCErrorEventKv(jobName string, funcName string, err error, kv map[string]string) { - job := instrument.NewJob(jobName) + if Instrument == nil { + return + } + + job := Instrument.NewJob(jobName) if emitErr := job.EventErrKv(funcName, err, kv); emitErr != nil { - log.WithError(emitErr).WithFields(logrus.Fields{ + Log.WithError(emitErr).WithFields(logrus.Fields{ "jobName": jobName, "funcName": funcName, "kv": kv, @@ -123,6 +157,14 @@ func emitRPCErrorEventKv(jobName string, funcName string, err error, kv map[stri } } +func IsRPCEmergencyMode() bool { + return rpcEmergencyMode +} + +func GetRPCLoadCount() int { + return rpcLoadCount +} + var rpcConnectMu sync.Mutex // Connect will establish a connection to the DB @@ -131,7 +173,7 @@ func (r *RPCStorageHandler) Connect() bool { defer rpcConnectMu.Unlock() if RPCClientIsConnected { - log.Debug("Using RPC singleton for connection") + Log.Debug("Using RPC singleton for connection") return true } @@ -139,9 +181,8 @@ func (r *RPCStorageHandler) Connect() bool { return rpcEmergencyMode != true } - // RPC Client is unset // Set up the cache - log.Info("Setting new RPC connection!") + Log.Info("Setting new RPC connection!") connID := uuid.NewV4().String() @@ -150,23 +191,23 @@ func (r *RPCStorageHandler) Connect() bool { panic("connID is too long") } - if slaveOptions := config.Global().SlaveOptions; slaveOptions.UseSSL { + if r.SlaveOptions.UseSSL { clientCfg := &tls.Config{ - InsecureSkipVerify: slaveOptions.SSLInsecureSkipVerify, + InsecureSkipVerify: r.SlaveOptions.SSLInsecureSkipVerify, } - 
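Instrumentation becomes optional here: emitRPCErrorEvent and emitRPCErrorEventKv return early when Instrument is nil, and the gateway just hands over its existing stream in setupRPC(). A sketch of what another embedder could do, assuming the gocraft/health package Tyk already vendors:

    rpc.Instrument = health.NewStream() // leave this out (or set nil) and RPC error events are simply not reported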
RPCCLientSingleton = gorpc.NewTLSClient(r.Address, clientCfg) + RPCCLientSingleton = gorpc.NewTLSClient(r.SlaveOptions.ConnectionString, clientCfg) } else { - RPCCLientSingleton = gorpc.NewTCPClient(r.Address) + RPCCLientSingleton = gorpc.NewTCPClient(r.SlaveOptions.ConnectionString) } - if log.Level != logrus.DebugLevel { + if Log.Level != logrus.DebugLevel { RPCCLientSingleton.LogError = gorpc.NilErrorLogger } RPCCLientSingleton.OnConnect = r.OnConnectFunc - RPCCLientSingleton.Conns = config.Global().SlaveOptions.RPCPoolSize + RPCCLientSingleton.Conns = r.SlaveOptions.RPCPoolSize if RPCCLientSingleton.Conns == 0 { RPCCLientSingleton.Conns = 20 } @@ -177,11 +218,11 @@ func (r *RPCStorageHandler) Connect() bool { KeepAlive: 30 * time.Second, } - useSSL := config.Global().SlaveOptions.UseSSL + useSSL := r.SlaveOptions.UseSSL if useSSL { cfg := &tls.Config{ - InsecureSkipVerify: config.Global().SlaveOptions.SSLInsecureSkipVerify, + InsecureSkipVerify: r.SlaveOptions.SSLInsecureSkipVerify, } conn, err = tls.DialWithDialer(dialer, "tcp", addr, cfg) @@ -231,6 +272,7 @@ func (r *RPCStorageHandler) OnConnectFunc(remoteAddr string, rwc io.ReadWriteClo defer RPCCLientSingletonMu.Unlock() RPCClientIsConnected = true + return rwc, nil } @@ -250,7 +292,7 @@ func (r *RPCStorageHandler) hashKey(in string) string { func (r *RPCStorageHandler) fixKey(keyName string) string { setKeyName := r.KeyPrefix + r.hashKey(keyName) - log.Debug("Input key was: ", setKeyName) + Log.Debug("Input key was: ", setKeyName) return setKeyName } @@ -271,10 +313,12 @@ func (r *RPCStorageHandler) ReAttemptLogin(err error) bool { rpcLoginMu.Lock() if rpcLoadCount == 0 && !rpcEmergencyModeLoaded { - log.Warning("[RPC Store] --> Detected cold start, attempting to load from cache") - log.Warning("[RPC Store] ----> Found APIs... beginning emergency load") + Log.Warning("[RPC Store] --> Detected cold start, attempting to load from cache") + Log.Warning("[RPC Store] ----> Found APIs... 
beginning emergency load") rpcEmergencyModeLoaded = true - go doReload() + if ReloadCallback != nil { + go ReloadCallback() + } } rpcLoginMu.Unlock() @@ -286,19 +330,19 @@ func (r *RPCStorageHandler) ReAttemptLogin(err error) bool { return false } - log.Warning("[RPC Store] Login failed, waiting 3s to re-attempt") + Log.Warning("[RPC Store] Login failed, waiting 3s to re-attempt") return r.Login() } func (r *RPCStorageHandler) GroupLogin() bool { groupLoginData := GroupLoginRequest{ - UserKey: r.UserKey, - GroupID: config.Global().SlaveOptions.GroupID, + UserKey: r.SlaveOptions.APIKey, + GroupID: r.SlaveOptions.GroupID, } ok, err := RPCFuncClientSingleton.CallTimeout("LoginWithGroup", groupLoginData, GlobalRPCCallTimeout) if err != nil { - log.Error("RPC Login failed: ", err) + Log.Error("RPC Login failed: ", err) emitRPCErrorEventKv( rpcFuncClientSingletonCall, "LoginWithGroup", @@ -313,39 +357,41 @@ func (r *RPCStorageHandler) GroupLogin() bool { } if ok == false { - log.Error("RPC Login incorrect") + Log.Error("RPC Login incorrect") rpcEmergencyMode = true go r.ReAttemptLogin(errors.New("Login incorrect")) return false } - log.Debug("[RPC Store] Group Login complete") + Log.Debug("[RPC Store] Group Login complete") rpcLoadCount++ // Recovery if rpcEmergencyMode { rpcEmergencyMode = false rpcEmergencyModeLoaded = false - reloadURLStructure(nil) + if ReloadURLStructureCallback != nil { + ReloadURLStructureCallback(nil) + } } return true } func (r *RPCStorageHandler) Login() bool { - log.Debug("[RPC Store] Login initiated") + Log.Debug("[RPC Store] Login initiated") - if len(r.UserKey) == 0 { - log.Fatal("No API Key set!") + if len(r.SlaveOptions.APIKey) == 0 { + Log.Fatal("No API Key set!") } // If we have a group ID, lets login as a group - if config.Global().SlaveOptions.GroupID != "" { + if r.SlaveOptions.GroupID != "" { return r.GroupLogin() } - ok, err := RPCFuncClientSingleton.CallTimeout("Login", r.UserKey, GlobalRPCCallTimeout) + ok, err := RPCFuncClientSingleton.CallTimeout("Login", r.SlaveOptions.APIKey, GlobalRPCCallTimeout) if err != nil { - log.Error("RPC Login failed: ", err) + Log.Error("RPC Login failed: ", err) emitRPCErrorEvent(rpcFuncClientSingletonCall, "Login", err) rpcEmergencyMode = true go r.ReAttemptLogin(err) @@ -353,18 +399,20 @@ func (r *RPCStorageHandler) Login() bool { } if ok == false { - log.Error("RPC Login incorrect") + Log.Error("RPC Login incorrect") rpcEmergencyMode = true go r.ReAttemptLogin(errors.New("Login incorrect")) return false } - log.Debug("[RPC Store] Login complete") + Log.Debug("[RPC Store] Login complete") rpcLoadCount++ if rpcEmergencyMode { rpcEmergencyMode = false rpcEmergencyModeLoaded = false - reloadURLStructure(nil) + if ReloadURLStructureCallback != nil { + ReloadURLStructureCallback(nil) + } } return true @@ -373,23 +421,23 @@ func (r *RPCStorageHandler) Login() bool { // GetKey will retrieve a key from the database func (r *RPCStorageHandler) GetKey(keyName string) (string, error) { start := time.Now() // get current time - log.Debug("[STORE] Getting WAS: ", keyName) - log.Debug("[STORE] Getting: ", r.fixKey(keyName)) + Log.Debug("[STORE] Getting WAS: ", keyName) + Log.Debug("[STORE] Getting: ", r.fixKey(keyName)) value, err := r.GetRawKey(r.fixKey(keyName)) elapsed := time.Since(start) - log.Debug("GetKey took ", elapsed) + Log.Debug("GetKey took ", elapsed) return value, err } func (r *RPCStorageHandler) GetRawKey(keyName string) (string, error) { // Check the cache first - if config.Global().SlaveOptions.EnableRPCCache { - 
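Connect() and the login path now read everything from the embedded SlaveOptions: UseSSL and SSLInsecureSkipVerify select the TLS client, a zero RPCPoolSize falls back to 20 connections, and a non-empty GroupID makes Login() delegate to GroupLogin(). A sketch of a hybrid-style slave setup; all values are placeholders:

    opts := config.SlaveOptionsConfig{
        UseRPC:           true,
        ConnectionString: "mdcb.example.com:9091",
        APIKey:           "<org api key>",
        GroupID:          "dc-eu-1", // non-empty, so Login() routes to GroupLogin()
        UseSSL:           true,
        RPCPoolSize:      0, // 0 defaults to 20
    }
    handler := rpc.RPCStorageHandler{KeyPrefix: "apikey-", SlaveOptions: opts}
    if handler.Connect() && handler.Login() {
        go handler.StartRPCKeepaliveWatcher()
    }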
log.Debug("Using cache for: ", keyName) + if r.SlaveOptions.EnableRPCCache { + Log.Debug("Using cache for: ", keyName) cachedVal, found := RPCGlobalCache.Get(keyName) - log.Debug("--> Found? ", found) + Log.Debug("--> Found? ", found) if found { return cachedVal.(string), nil } @@ -410,10 +458,10 @@ func (r *RPCStorageHandler) GetRawKey(keyName string) (string, error) { return r.GetRawKey(keyName) } } - log.Debug("Error trying to get value:", err) + Log.Debug("Error trying to get value:", err) return "", storage.ErrKeyNotFound } - if config.Global().SlaveOptions.EnableRPCCache { + if r.SlaveOptions.EnableRPCCache { // Cache key RPCGlobalCache.Set(keyName, value, cache.DefaultExpiration) } @@ -422,7 +470,7 @@ func (r *RPCStorageHandler) GetRawKey(keyName string) (string, error) { } func (r *RPCStorageHandler) GetExp(keyName string) (int64, error) { - log.Debug("GetExp called") + Log.Debug("GetExp called") value, err := RPCFuncClientSingleton.CallTimeout("GetExp", r.fixKey(keyName), GlobalRPCCallTimeout) if err != nil { emitRPCErrorEventKv( @@ -439,14 +487,14 @@ func (r *RPCStorageHandler) GetExp(keyName string) (int64, error) { return r.GetExp(keyName) } } - log.Error("Error trying to get TTL: ", err) + Log.Error("Error trying to get TTL: ", err) return 0, storage.ErrKeyNotFound } return value.(int64), nil } func (r *RPCStorageHandler) SetExp(keyName string, timeout int64) error { - log.Error("SetExp Not Implemented") + Log.Error("SetExp Not Implemented") return nil } @@ -477,12 +525,12 @@ func (r *RPCStorageHandler) SetKey(keyName, session string, timeout int64) error } } - log.Debug("Error trying to set value:", err) + Log.Debug("Error trying to set value:", err) return err } elapsed := time.Since(start) - log.Debug("SetKey took ", elapsed) + Log.Debug("SetKey took ", elapsed) return nil } @@ -493,7 +541,7 @@ func (r *RPCStorageHandler) SetRawKey(keyName, session string, timeout int64) er // Decrement will decrement a key in redis func (r *RPCStorageHandler) Decrement(keyName string) { - log.Warning("Decrement called") + Log.Warning("Decrement called") _, err := RPCFuncClientSingleton.CallTimeout("Decrement", keyName, GlobalRPCCallTimeout) if err != nil { emitRPCErrorEventKv( @@ -539,7 +587,7 @@ func (r *RPCStorageHandler) IncrememntWithExpire(keyName string, expire int64) i } if val == nil { - log.Warning("RPC increment returned nil value, returning 0") + Log.Warning("RPC increment returned nil value, returning 0") return 0 } @@ -549,7 +597,7 @@ func (r *RPCStorageHandler) IncrememntWithExpire(keyName string, expire int64) i // GetKeys will return all keys according to the filter (filter is a prefix - e.g. 
tyk.keys.*) func (r *RPCStorageHandler) GetKeys(filter string) []string { - log.Error("GetKeys Not Implemented") + Log.Error("GetKeys Not Implemented") return nil } @@ -557,7 +605,7 @@ func (r *RPCStorageHandler) GetKeys(filter string) []string { func (r *RPCStorageHandler) GetKeysAndValuesWithFilter(filter string) map[string]string { searchStr := r.KeyPrefix + r.hashKey(filter) + "*" - log.Debug("[STORE] Getting list by: ", searchStr) + Log.Debug("[STORE] Getting list by: ", searchStr) kvPair, err := RPCFuncClientSingleton.CallTimeout("GetKeysAndValuesWithFilter", searchStr, GlobalRPCCallTimeout) if err != nil { @@ -618,8 +666,8 @@ func (r *RPCStorageHandler) GetKeysAndValues() map[string]string { // DeleteKey will remove a key from the database func (r *RPCStorageHandler) DeleteKey(keyName string) bool { - log.Debug("DEL Key was: ", keyName) - log.Debug("DEL Key became: ", r.fixKey(keyName)) + Log.Debug("DEL Key was: ", keyName) + Log.Debug("DEL Key became: ", r.fixKey(keyName)) ok, err := RPCFuncClientSingleton.CallTimeout("DeleteKey", r.fixKey(keyName), GlobalRPCCallTimeout) if err != nil { emitRPCErrorEventKv( @@ -673,7 +721,7 @@ func (r *RPCStorageHandler) DeleteKeys(keys []string) bool { asInterface[i] = r.fixKey(v) } - log.Debug("Deleting: ", asInterface) + Log.Debug("Deleting: ", asInterface) ok, err := RPCFuncClientSingleton.CallTimeout("DeleteKeys", asInterface, GlobalRPCCallTimeout) if err != nil { emitRPCErrorEventKv( @@ -695,23 +743,23 @@ func (r *RPCStorageHandler) DeleteKeys(keys []string) bool { return ok == true } - log.Debug("RPCStorageHandler called DEL - Nothing to delete") + Log.Debug("RPCStorageHandler called DEL - Nothing to delete") return true } // StartPubSubHandler will listen for a signal and run the callback with the message func (r *RPCStorageHandler) StartPubSubHandler(channel string, callback func(redis.Message)) error { - log.Warning("NO PUBSUB DEFINED") + Log.Warning("NO PUBSUB DEFINED") return nil } func (r *RPCStorageHandler) Publish(channel, message string) error { - log.Warning("NO PUBSUB DEFINED") + Log.Warning("NO PUBSUB DEFINED") return nil } func (r *RPCStorageHandler) GetAndDeleteSet(keyName string) []interface{} { - log.Error("GetAndDeleteSet Not implemented, please disable your purger") + Log.Error("GetAndDeleteSet Not implemented, please disable your purger") return nil } @@ -776,10 +824,10 @@ func (r *RPCStorageHandler) SetRollingWindow(keyName string, per int64, val stri } elapsed := time.Since(start) - log.Debug("SetRollingWindow took ", elapsed) + Log.Debug("SetRollingWindow took ", elapsed) if intVal == nil { - log.Warning("RPC Handler: SetRollingWindow() returned nil, returning 0") + Log.Warning("RPC Handler: SetRollingWindow() returned nil, returning 0") return 0, nil } @@ -788,16 +836,16 @@ func (r *RPCStorageHandler) SetRollingWindow(keyName string, per int64, val stri } func (r RPCStorageHandler) GetSet(keyName string) (map[string]string, error) { - log.Error("Not implemented") + Log.Error("Not implemented") return nil, nil } func (r RPCStorageHandler) AddToSet(keyName, value string) { - log.Error("Not implemented") + Log.Error("Not implemented") } func (r RPCStorageHandler) RemoveFromSet(keyName, value string) { - log.Error("Not implemented") + Log.Error("Not implemented") } func (r RPCStorageHandler) IsAccessError(err error) bool { @@ -834,10 +882,10 @@ func (r *RPCStorageHandler) GetApiDefinitions(orgId string, tags []string) strin return "" } - log.Debug("API Definitions retrieved") + Log.Debug("API Definitions retrieved") if 
defString == nil { - log.Warning("RPC Handler: GetApiDefinitions() returned nil, returning empty string") + Log.Warning("RPC Handler: GetApiDefinitions() returned nil, returning empty string") return "" } return defString.(string) @@ -873,7 +921,7 @@ func (r *RPCStorageHandler) GetPolicies(orgId string) string { // CheckForReload will start a long poll func (r *RPCStorageHandler) CheckForReload(orgId string) { - log.Debug("[RPC STORE] Check Reload called...") + Log.Debug("[RPC STORE] Check Reload called...") reload, err := RPCFuncClientSingleton.CallTimeout("CheckReload", orgId, GlobalRPCPingTimeout) if err != nil { emitRPCErrorEventKv( @@ -885,30 +933,32 @@ func (r *RPCStorageHandler) CheckForReload(orgId string) { }, ) if r.IsAccessError(err) { - log.Warning("[RPC STORE] CheckReload: Not logged in") + Log.Warning("[RPC STORE] CheckReload: Not logged in") if r.Login() { r.CheckForReload(orgId) } } else if !strings.Contains(err.Error(), "Cannot obtain response during") { - log.Warning("[RPC STORE] RPC Reload Checker encountered unexpected error: ", err) + Log.Warning("[RPC STORE] RPC Reload Checker encountered unexpected error: ", err) } time.Sleep(1 * time.Second) } else if reload == true { // Do the reload! - log.Warning("[RPC STORE] Received Reload instruction!") - go func() { - MainNotifier.Notify(Notification{Command: NoticeGroupReload}) - }() + Log.Warning("[RPC STORE] Received Reload instruction!") + if MainNotifier != nil { + go func() { + MainNotifier.Notify(notification{Command: "GroupReload"}) + }() + } } } func (r *RPCStorageHandler) StartRPCLoopCheck(orgId string) { - if config.Global().SlaveOptions.DisableKeySpaceSync { + if r.SlaveOptions.DisableKeySpaceSync { return } - log.Info("[RPC] Starting keyspace poller") + Log.Info("[RPC] Starting keyspace poller") for { r.CheckForKeyspaceChanges(orgId) @@ -917,13 +967,13 @@ func (r *RPCStorageHandler) StartRPCLoopCheck(orgId string) { } func (r *RPCStorageHandler) StartRPCKeepaliveWatcher() { - log.WithFields(logrus.Fields{ + Log.WithFields(logrus.Fields{ "prefix": "RPC Conn Mgr", }).Info("[RPC Conn Mgr] Starting keepalive watcher...") for { if err := r.SetKey("0000", "0000", 10); err != nil { - log.WithError(err).WithFields(logrus.Fields{ + Log.WithError(err).WithFields(logrus.Fields{ "prefix": "RPC Conn Mgr", }).Info("Can't connect to RPC layer") @@ -944,7 +994,7 @@ func (r *RPCStorageHandler) StartRPCKeepaliveWatcher() { // CheckForKeyspaceChanges will poll for keysace changes func (r *RPCStorageHandler) CheckForKeyspaceChanges(orgId string) { - log.Debug("Checking for keyspace changes...") + Log.Debug("Checking for keyspace changes...") var keys interface{} var err error @@ -952,7 +1002,7 @@ func (r *RPCStorageHandler) CheckForKeyspaceChanges(orgId string) { var req interface{} reqData := map[string]string{} - if groupID := config.Global().SlaveOptions.GroupID; groupID == "" { + if groupID := r.SlaveOptions.GroupID; groupID == "" { funcName = "GetKeySpaceUpdate" req = orgId reqData["orgId"] = orgId @@ -979,17 +1029,17 @@ func (r *RPCStorageHandler) CheckForKeyspaceChanges(orgId string) { r.CheckForKeyspaceChanges(orgId) } } - log.Warning("Keysapce warning: ", err) + Log.Warning("Keysapce warning: ", err) return } if keys == nil { - log.Info("Keys returned nil object, skipping check") + Log.Info("Keys returned nil object, skipping check") return } if len(keys.([]string)) > 0 { - log.Info("Keyspace changes detected, updating local cache") + Log.Info("Keyspace changes detected, updating local cache") go 
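CheckForReload keeps the same wire contract, an RPC method "CheckReload" that takes the org ID and returns a bool, and on true it fans the signal out through MainNotifier as a GroupReload notification. A hypothetical gorpc mock of that contract, in the same style as the test dispatchers below:

    dispatcher := gorpc.NewDispatcher()
    dispatcher.AddFunc("CheckReload", func(clientAddr, orgID string) (bool, error) {
        return true, nil // true makes the slave broadcast the reload notification
    })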
r.ProcessKeySpaceChanges(keys.([]string)) } } @@ -998,9 +1048,13 @@ func getSessionAndCreate(keyName string, r *RPCStorageHandler) { newKeyName := "apikey-" + storage.HashStr(keyName) sessionString, err := r.GetRawKey(keyName) if err != nil { - log.Error("Key not found in master - skipping") + Log.Error("Key not found in master - skipping") } else { - handleAddKey(keyName, newKeyName[7:], sessionString, "-1") + if HandleAddKeyCallback != nil { + HandleAddKeyCallback(keyName, newKeyName[7:], sessionString, "-1") + } else { + Log.Error("HandleAddKeyCallback is not set, handleAddKey is skipped!") + } } } @@ -1010,47 +1064,59 @@ func (r *RPCStorageHandler) ProcessKeySpaceChanges(keys []string) { if len(splitKeys) > 1 { key = splitKeys[0] if splitKeys[1] == "hashed" { - log.Info("--> removing cached (hashed) key: ", splitKeys[0]) - handleDeleteHashedKey(splitKeys[0], "") + Log.Info("--> removing cached (hashed) key: ", splitKeys[0]) + if HandleDeleteHashedKeyCallback != nil { + HandleDeleteHashedKeyCallback(splitKeys[0], "") + } else { + Log.Error("HandleDeleteHashedKeyCallback is not set, handleDeleteHashedKey is skipped!") + } getSessionAndCreate(splitKeys[0], r) } } else { - log.Info("--> removing cached key: ", key) - handleDeleteKey(key, "-1") + Log.Info("--> removing cached key: ", key) + if HandleDeleteKeyCallback != nil { + HandleDeleteKeyCallback(key, "-1") + } else { + Log.Error("HandleDeleteKeyCallback is not set, handleDeleteKey is skipped!") + } getSessionAndCreate(splitKeys[0], r) } - SessionCache.Delete(key) + if SessionCache != nil { + SessionCache.Delete(key) + } else { + Log.Error("SessionCache is not set, SessionCache.Delete is skipped!") + } RPCGlobalCache.Delete(r.KeyPrefix + key) } // Notify rest of gateways in cluster to flush cache - n := Notification{ - Command: KeySpaceUpdateNotification, + n := notification{ + Command: "KeySpaceUpdateNotification", Payload: strings.Join(keys, ","), } MainNotifier.Notify(n) } func (r *RPCStorageHandler) DeleteScanMatch(pattern string) bool { - log.Error("Not implemented") + Log.Error("Not implemented") return false } func (r *RPCStorageHandler) GetKeyPrefix() string { - log.Error("Not implemented") + Log.Error("Not implemented") return "" } func (r *RPCStorageHandler) AddToSortedSet(keyName, value string, score float64) { - log.Error("Not implemented") + Log.Error("Not implemented") } func (r *RPCStorageHandler) GetSortedSetRange(keyName, scoreFrom, scoreTo string) ([]string, []float64, error) { - log.Error("Not implemented") + Log.Error("Not implemented") return nil, nil, nil } func (r *RPCStorageHandler) RemoveSortedSetRange(keyName, scoreFrom, scoreTo string) error { - log.Error("Not implemented") + Log.Error("Not implemented") return nil } diff --git a/rpc_test.go b/rpc_test.go index 2092926765f3..b993bea4c3d1 100644 --- a/rpc_test.go +++ b/rpc_test.go @@ -9,12 +9,13 @@ import ( "github.com/lonelycode/gorpc" "github.com/TykTechnologies/tyk/config" + "github.com/TykTechnologies/tyk/rpc" "github.com/TykTechnologies/tyk/test" ) func startRPCMock(dispatcher *gorpc.Dispatcher) *gorpc.Server { - GlobalRPCCallTimeout = 100 * time.Millisecond + rpc.GlobalRPCCallTimeout = 100 * time.Millisecond globalConf := config.Global() globalConf.SlaveOptions.UseRPC = true @@ -56,20 +57,18 @@ func stopRPCMock(server *gorpc.Server) { server.Stop() } - RPCCLientSingleton.Stop() - RPCClientIsConnected = false - RPCCLientSingleton = nil - RPCFuncClientSingleton = nil - rpcLoadCount = 0 - rpcEmergencyMode = false - rpcEmergencyModeLoaded = false + 
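Each key-space hook used above is nil-checked first, so a host that does not manage gateway sessions can leave them unset and ProcessKeySpaceChanges degrades to logging plus RPC-cache invalidation (MainNotifier still has to be set for the final cluster fan-out). The gateway installs the real handlers in setupRPC(); a hypothetical minimal wiring with guessed parameter names:

    rpc.HandleAddKeyCallback = func(keyName, hashedName, sessionJSON, apiID string) {
        // rebuild the local session from the master copy
    }
    rpc.HandleDeleteKeyCallback = func(keyName, apiID string) (interface{}, int) {
        return nil, http.StatusOK // (body, status) mirrors the gateway API handlers
    }
    rpc.HandleDeleteHashedKeyCallback = func(hashedName, apiID string) (interface{}, int) {
        return nil, http.StatusOK
    }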
rpc.RPCCLientSingleton.Stop() + rpc.RPCClientIsConnected = false + rpc.RPCCLientSingleton = nil + rpc.RPCFuncClientSingleton = nil + rpc.ResetEmergencyMode() } // Our RPC layer is too racy, but not harmful, mostly global variables like RPCClientIsConnected func TestSyncAPISpecsRPCFailure(t *testing.T) { // Mock RPC dispatcher := gorpc.NewDispatcher() - dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) { + dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *rpc.DefRequest) (string, error) { return "malformed json", nil }) dispatcher.AddFunc("Login", func(clientAddr, userKey string) bool { @@ -88,7 +87,7 @@ func TestSyncAPISpecsRPCFailure(t *testing.T) { func TestSyncAPISpecsRPCSuccess(t *testing.T) { // Mock RPC dispatcher := gorpc.NewDispatcher() - dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) { + dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *rpc.DefRequest) (string, error) { return jsonMarshalString(buildAPI(func(spec *APISpec) { spec.UseKeylessAccess = false })), nil @@ -164,11 +163,10 @@ func TestSyncAPISpecsRPCSuccess(t *testing.T) { }) t.Run("RPC is back, hard reload", func(t *testing.T) { - rpcEmergencyModeLoaded = false - rpcEmergencyMode = false + rpc.ResetEmergencyMode() dispatcher := gorpc.NewDispatcher() - dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) { + dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *rpc.DefRequest) (string, error) { return jsonMarshalString(buildAPI( func(spec *APISpec) { spec.UseKeylessAccess = false }, func(spec *APISpec) { spec.UseKeylessAccess = false },