diff --git a/api_definition.go b/api_definition.go index 6aaaadea6b7a..d1740cbde08e 100644 --- a/api_definition.go +++ b/api_definition.go @@ -298,8 +298,14 @@ func (a APIDefinitionLoader) FromDashboardService(endpoint, secret string) []*AP // FromCloud will connect and download ApiDefintions from a Mongo DB instance. func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec { + if rpcEmergencyMode { + return LoadDefinitionsFromRPCBackup() + } + store := RPCStorageHandler{UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString} - store.Connect() + if !store.Connect() { + return nil + } // enable segments var tags []string diff --git a/api_definition_test.go b/api_definition_test.go index d95a2e1d1800..4f3d72e33c75 100644 --- a/api_definition_test.go +++ b/api_definition_test.go @@ -12,7 +12,6 @@ import ( "time" "github.com/garyburd/redigo/redis" - "github.com/lonelycode/gorpc" "github.com/TykTechnologies/tyk/apidef" "github.com/TykTechnologies/tyk/config" @@ -191,80 +190,6 @@ func TestIgnored(t *testing.T) { }) } -func startRPCMock(dispatcher *gorpc.Dispatcher) *gorpc.Server { - configMu.Lock() - defer configMu.Unlock() - - config.Global.SlaveOptions.UseRPC = true - config.Global.SlaveOptions.RPCKey = "test_org" - config.Global.SlaveOptions.APIKey = "test" - - server := gorpc.NewTCPServer("127.0.0.1:0", dispatcher.NewHandlerFunc()) - list := &customListener{} - server.Listener = list - server.LogError = gorpc.NilErrorLogger - - if err := server.Start(); err != nil { - panic(err) - } - config.Global.SlaveOptions.ConnectionString = list.L.Addr().String() - - return server -} - -func stopRPCMock(server *gorpc.Server) { - config.Global.SlaveOptions.ConnectionString = "" - config.Global.SlaveOptions.RPCKey = "" - config.Global.SlaveOptions.APIKey = "" - config.Global.SlaveOptions.UseRPC = false - - server.Listener.Close() - server.Stop() - - RPCCLientSingleton.Stop() - RPCClientIsConnected = false - RPCCLientSingleton = nil - RPCFuncClientSingleton = nil -} - -func TestSyncAPISpecsRPCFailure(t *testing.T) { - // Mock RPC - dispatcher := gorpc.NewDispatcher() - dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) { - return "malformed json", nil - }) - dispatcher.AddFunc("Login", func(clientAddr, userKey string) bool { - return true - }) - - rpc := startRPCMock(dispatcher) - defer stopRPCMock(rpc) - - count := syncAPISpecs() - if count != 0 { - t.Error("Should return empty value for malformed rpc response", apiSpecs) - } -} - -func TestSyncAPISpecsRPCSuccess(t *testing.T) { - // Mock RPC - dispatcher := gorpc.NewDispatcher() - dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) { - return "[{}]", nil - }) - dispatcher.AddFunc("Login", func(clientAddr, userKey string) bool { - return true - }) - - rpc := startRPCMock(dispatcher) - defer stopRPCMock(rpc) - - count := syncAPISpecs() - if count != 1 { - t.Error("Should return array with one spec", apiSpecs) - } -} - func TestSyncAPISpecsDashboardSuccess(t *testing.T) { // Mock Dashboard ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/api_loader.go b/api_loader.go index 081fd7bc87c5..2c9d125a3d93 100644 --- a/api_loader.go +++ b/api_loader.go @@ -665,8 +665,6 @@ func loadApps(specs []*APISpec, muxer *mux.Router) { }).Info("Initialised API Definitions") if config.Global.SlaveOptions.UseRPC { - //log.Warning("TODO: PUT THE KEEPALIVE WATCHER BACK") - startRPCKeepaliveWatcher(rpcAuthStore) startRPCKeepaliveWatcher(rpcOrgStore) } } diff --git a/config/config.go b/config/config.go index 23bc7b998811..8820e6b6203f 100644 --- a/config/config.go +++ b/config/config.go @@ -66,6 +66,7 @@ type AnalyticsConfigConfig struct { GeoIPDBLocation string `json:"geo_ip_db_path"` NormaliseUrls NormalisedURLConfig `json:"normalise_urls"` PoolSize int `json:"pool_size"` + StorageExpirationTime int `json:"storage_expiration_time"` ignoredIPsCompiled map[string]bool } diff --git a/ldap_auth_handler.go b/ldap_auth_handler.go index 989b03210822..6a90d7f6a847 100644 --- a/ldap_auth_handler.go +++ b/ldap_auth_handler.go @@ -95,10 +95,16 @@ func (l *LDAPStorageHandler) GetRawKey(filter string) (string, error) { return "", nil } +func (l *LDAPStorageHandler) SetExp(cn string, exp int64) error { + log.Warning("Not implementated") + return nil +} + func (l *LDAPStorageHandler) GetExp(cn string) (int64, error) { log.Warning("Not implementated") return 0, nil } + func (l *LDAPStorageHandler) GetKeys(filter string) []string { log.Warning("Not implementated") s := []string{} diff --git a/lint/schema.go b/lint/schema.go index 87f97d5f9804..f41384757ba1 100644 --- a/lint/schema.go +++ b/lint/schema.go @@ -92,6 +92,9 @@ const confSchema = `{ "pool_size": { "type": "integer" }, + "storage_expiration_time": { + "type": "integer" + }, "type": { "type": "string" } diff --git a/main.go b/main.go index 80cc4223b321..b707a05bb60d 100644 --- a/main.go +++ b/main.go @@ -124,6 +124,9 @@ func apisByIDLen() int { return len(apisByID) } +var redisPurgeOnce sync.Once +var rpcPurgeOnce sync.Once + // Create all globals and init connection handlers func setupGlobals() { reloadMu.Lock() @@ -152,13 +155,19 @@ func setupGlobals() { analytics.Store = &analyticsStore analytics.Init() + redisPurgeOnce.Do(func() { + redisPurger := RedisPurger{Store: &analyticsStore} + go redisPurger.PurgeLoop(1 * time.Second) + }) + if config.Global.AnalyticsConfig.Type == "rpc" { log.Debug("Using RPC cache purge") - purger := RPCPurger{Store: &analyticsStore} - purger.Connect() - analytics.Clean = &purger - go analytics.Clean.PurgeLoop(10 * time.Second) + rpcPurgeOnce.Do(func() { + purger := RPCPurger{Store: &analyticsStore} + purger.Connect() + go purger.PurgeLoop(10 * time.Second) + }) } } @@ -274,7 +283,7 @@ func syncAPISpecs() int { return len(apiSpecs) } -func syncPolicies() { +func syncPolicies() int { var pols map[string]user.Policy log.WithFields(logrus.Fields{ @@ -308,7 +317,7 @@ func syncPolicies() { log.WithFields(logrus.Fields{ "prefix": "main", }).Debug("No policy record name defined, skipping...") - return + return 0 } pols = LoadPoliciesFromFile(config.Global.Policies.PolicyRecordName) } @@ -321,11 +330,13 @@ func syncPolicies() { }).Infof(" - %s", id) } + policiesMu.Lock() + defer policiesMu.Unlock() if len(pols) > 0 { - policiesMu.Lock() policiesByID = pols - policiesMu.Unlock() } + + return len(pols) } // stripSlashes removes any trailing slashes from the request's URL @@ -676,9 +687,9 @@ func doReload() { mainRouter = newRouter - // Unset these - rpcEmergencyModeLoaded = false - rpcEmergencyMode = false + // // Unset these + // rpcEmergencyModeLoaded = false + // rpcEmergencyMode = false } // startReloadChan and reloadDoneChan are used by the two reload loops @@ -1275,6 +1286,8 @@ func handleDashboardRegistration() { go DashService.StartBeating() } +var drlOnce sync.Once + func startDRL() { switch { case config.Global.ManagementNode: @@ -1311,6 +1324,8 @@ func listen(l, controlListener net.Listener, err error) { writeTimeout = config.Global.HttpServerOptions.WriteTimeout } + drlOnce.Do(startDRL) + // Handle reload when SIGUSR2 is received if err != nil { // Listen on a TCP or a UNIX domain socket (TCP here). @@ -1321,8 +1336,6 @@ func listen(l, controlListener net.Listener, err error) { // handle dashboard registration and nonces if available handleDashboardRegistration() - startDRL() - if !rpcEmergencyMode { count := syncAPISpecs() if count > 0 { @@ -1396,7 +1409,6 @@ func listen(l, controlListener net.Listener, err error) { os.Setenv("TYK_SERVICE_NONCE", "") os.Setenv("TYK_SERVICE_NODEID", "") } - startDRL() // Resume accepting connections in a new goroutine. if !rpcEmergencyMode { diff --git a/policy.go b/policy.go index 4e31d1570754..da136bd5110a 100644 --- a/policy.go +++ b/policy.go @@ -139,29 +139,45 @@ func LoadPoliciesFromDashboard(endpoint, secret string, allowExplicit bool) map[ return policies } -func LoadPoliciesFromRPC(orgId string) map[string]user.Policy { +func parsePoliciesFromRPC(list string) (map[string]user.Policy, error) { var dbPolicyList []user.Policy + if err := json.Unmarshal([]byte(list), &dbPolicyList); err != nil { + return nil, err + } + + policies := make(map[string]user.Policy, len(dbPolicyList)) + + for _, p := range dbPolicyList { + p.ID = p.MID.Hex() + policies[p.MID.Hex()] = p + } + + return policies, nil +} + +func LoadPoliciesFromRPC(orgId string) map[string]user.Policy { + if rpcEmergencyMode { + return LoadPoliciesFromRPCBackup() + } + store := &RPCStorageHandler{UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString} - store.Connect() + if !store.Connect() { + return nil + } rpcPolicies := store.GetPolicies(orgId) - //store.Disconnect() + policies, err := parsePoliciesFromRPC(rpcPolicies) - if err := json.Unmarshal([]byte(rpcPolicies), &dbPolicyList); err != nil { + if err != nil { log.WithFields(logrus.Fields{ "prefix": "policy", - }).Error("Failed decode: ", err) + }).Error("Failed decode: ", err, rpcPolicies) return nil } - policies := make(map[string]user.Policy, len(dbPolicyList)) - - for _, p := range dbPolicyList { - p.ID = p.MID.Hex() - policies[p.MID.Hex()] = p - } + saveRPCPoliciesBackup(rpcPolicies) return policies } diff --git a/rpc_analytics_purger.go b/rpc_analytics_purger.go index 79e02fe6a941..f2148e789435 100644 --- a/rpc_analytics_purger.go +++ b/rpc_analytics_purger.go @@ -6,6 +6,7 @@ import ( "gopkg.in/vmihailenco/msgpack.v2" + "github.com/TykTechnologies/tyk/config" "github.com/TykTechnologies/tyk/storage" ) @@ -41,6 +42,11 @@ func (r RPCPurger) PurgeLoop(sleep time.Duration) { // PurgeCache will pull the data from the in-memory store and drop it into the specified MongoDB collection func (r *RPCPurger) PurgeCache() { + if _, err := RPCFuncClientSingleton.Call("Ping", nil); err != nil { + log.Error("Failed to ping RPC: ", err) + return + } + analyticsValues := r.Store.GetAndDeleteSet(analyticsKeyName) if len(analyticsValues) == 0 { return @@ -70,3 +76,28 @@ func (r *RPCPurger) PurgeCache() { } } + +type RedisPurger struct { + Store storage.Handler +} + +func (r RedisPurger) PurgeLoop(sleep time.Duration) { + for { + time.Sleep(sleep) + r.PurgeCache() + } +} + +func (r *RedisPurger) PurgeCache() { + configMu.Lock() + expireAfter := config.Global.AnalyticsConfig.StorageExpirationTime + configMu.Unlock() + if expireAfter == 0 { + expireAfter = 60 // 1 minute + } + + exp, _ := r.Store.GetExp(analyticsKeyName) + if exp <= 0 { + r.Store.SetExp(analyticsKeyName, int64(expireAfter)) + } +} diff --git a/rpc_backup_handlers.go b/rpc_backup_handlers.go index 3edce925382a..2a97502b05ad 100644 --- a/rpc_backup_handlers.go +++ b/rpc_backup_handlers.go @@ -6,18 +6,18 @@ import ( "crypto/rand" "encoding/base64" "io" - "net/http" "strings" "github.com/Sirupsen/logrus" - "github.com/gorilla/mux" "github.com/TykTechnologies/tyk/config" "github.com/TykTechnologies/tyk/storage" + "github.com/TykTechnologies/tyk/user" ) const RPCKeyPrefix = "rpc:" -const BackupKeyBase = "node-definition-backup:" +const BackupApiKeyBase = "node-definition-backup:" +const BackupPolicyKeyBase = "node-policy-backup:" func getTagListAsString() string { tagList := "" @@ -28,8 +28,34 @@ func getTagListAsString() string { return tagList } +func LoadDefinitionsFromRPCBackup() []*APISpec { + tagList := getTagListAsString() + checkKey := BackupApiKeyBase + tagList + + store := storage.RedisCluster{KeyPrefix: RPCKeyPrefix} + connected := store.Connect() + log.Info("[RPC] --> Loading API definitions from backup") + + if !connected { + log.Error("[RPC] --> RPC Backup recovery failed: redis connection failed") + return nil + } + + secret := rightPad2Len(config.Global.Secret, "=", 32) + cryptoText, err := store.GetKey(checkKey) + apiListAsString := decrypt([]byte(secret), cryptoText) + + if err != nil { + log.Error("[RPC] --> Failed to get node backup (", checkKey, "): ", err) + return nil + } + + a := APIDefinitionLoader{} + return a.processRPCDefinitions(apiListAsString) +} + func saveRPCDefinitionsBackup(list string) { - log.Info("Storing RPC backup") + log.Info("Storing RPC Definitions backup") tagList := getTagListAsString() log.Info("--> Connecting to DB") @@ -46,91 +72,67 @@ func saveRPCDefinitionsBackup(list string) { secret := rightPad2Len(config.Global.Secret, "=", 32) cryptoText := encrypt([]byte(secret), list) - err := store.SetKey(BackupKeyBase+tagList, cryptoText, -1) + err := store.SetKey(BackupApiKeyBase+tagList, cryptoText, -1) if err != nil { log.Error("Failed to store node backup: ", err) } } -func LoadDefinitionsFromRPCBackup() []*APISpec { +func LoadPoliciesFromRPCBackup() map[string]user.Policy { tagList := getTagListAsString() - checkKey := BackupKeyBase + tagList + checkKey := BackupPolicyKeyBase + tagList store := storage.RedisCluster{KeyPrefix: RPCKeyPrefix} connected := store.Connect() - log.Info("[RPC] --> Connected to DB") + log.Info("[RPC] Loading Policies from backup") if !connected { - log.Error("[RPC] --> RPC Backup recovery failed: redis connection failed") + log.Error("[RPC] --> RPC Policy Backup recovery failed: redis connection failed") return nil } secret := rightPad2Len(config.Global.Secret, "=", 32) cryptoText, err := store.GetKey(checkKey) - apiListAsString := decrypt([]byte(secret), cryptoText) + listAsString := decrypt([]byte(secret), cryptoText) if err != nil { - log.Error("[RPC] --> Failed to get node backup (", checkKey, "): ", err) + log.Error("[RPC] --> Failed to get node policy backup (", checkKey, "): ", err) return nil } - a := APIDefinitionLoader{} - return a.processRPCDefinitions(apiListAsString) -} - -func doLoadWithBackup(specs []*APISpec) { - - log.Warning("[RPC Backup] --> Load Policies too!") - - if len(specs) == 0 { + if policies, err := parsePoliciesFromRPC(listAsString); err != nil { log.WithFields(logrus.Fields{ - "prefix": "main", - }).Warning("No API Definitions found, not loading backup") - return + "prefix": "policy", + }).Error("Failed decode: ", err) + return nil + } else { + return policies } +} - // Reset the JSVM - GlobalEventsJSVM.Init(nil) - log.Warning("[RPC Backup] --> Initialised JSVM") - - newRouter := mux.NewRouter() +func saveRPCPoliciesBackup(list string) { + log.Info("Storing RPC policies backup") + tagList := getTagListAsString() - log.Warning("[RPC Backup] --> Set up routers") - log.Warning("[RPC Backup] --> Loading endpoints") + log.Info("--> Connecting to DB") - loadAPIEndpoints(newRouter) + store := storage.RedisCluster{KeyPrefix: RPCKeyPrefix} + connected := store.Connect() - log.Warning("[RPC Backup] --> Loading APIs") - loadApps(specs, newRouter) - log.Warning("[RPC Backup] --> API Load Done") + log.Info("--> Connected to DB") - if config.Global.NewRelic.AppName != "" { - log.Warning("[RPC Backup] --> Adding NewRelic instrumentation") - AddNewRelicInstrumentation(NewRelicApplication, newRouter) - log.Warning("[RPC Backup] --> NewRelic instrumentation added") + if !connected { + log.Error("--> RPC Backup save failed: redis connection failed") + return } - newServeMux := http.NewServeMux() - newServeMux.Handle("/", newRouter) - - mainRouter = newRouter - - http.DefaultServeMux = newServeMux - log.Warning("[RPC Backup] --> Replaced muxer") - - log.WithFields(logrus.Fields{ - "prefix": "main", - }).Info("API backup load complete") - - log.Warning("[RPC Backup] --> Ready to listen") - rpcEmergencyModeLoaded = true - - l, err := generateListener(0) + secret := rightPad2Len(config.Global.Secret, "=", 32) + cryptoText := encrypt([]byte(secret), list) + err := store.SetKey(BackupPolicyKeyBase+tagList, cryptoText, -1) if err != nil { - log.Error("Failed to generate listener:", err) + log.Error("Failed to store node backup: ", err) } - listen(l, nil, nil) } // encrypt string to base64 crypto using AES diff --git a/rpc_storage_handler.go b/rpc_storage_handler.go index 4a469888a6e0..30b085c08b3d 100644 --- a/rpc_storage_handler.go +++ b/rpc_storage_handler.go @@ -61,10 +61,10 @@ var ( ) func rpcKeepAliveCheck(r *RPCStorageHandler) { - // Only run when connected if !RPCClientIsConnected { return } + // Make sure the auth back end is still alive c1 := make(chan string, 1) @@ -236,7 +236,9 @@ func (r *RPCStorageHandler) Connect() bool { RPCFuncClientSingleton = d.NewFuncClient(RPCCLientSingleton) } - r.Login() + if !r.Login() { + return false + } if !r.SuppressRegister { r.Register() @@ -280,31 +282,25 @@ func (r *RPCStorageHandler) cleanKey(keyName string) string { return setKeyName } -func (r *RPCStorageHandler) ReAttemptLogin(err error) { +func (r *RPCStorageHandler) ReAttemptLogin(err error) bool { log.Warning("[RPC Store] Login failed, waiting 3s to re-attempt") if rpcLoadCount == 0 && !rpcEmergencyModeLoaded { log.Warning("[RPC Store] --> Detected cold start, attempting to load from cache") - apiList := LoadDefinitionsFromRPCBackup() - log.Warning("[RPC Store] --> Done") - if apiList != nil { - rpcEmergencyMode = true - log.Warning("[RPC Store] ----> Found APIs... beginning emergency load") - doLoadWithBackup(apiList) - } - - //LoadPoliciesFromRPCBackup() + log.Warning("[RPC Store] ----> Found APIs... beginning emergency load") + doReload() + rpcEmergencyModeLoaded = true } time.Sleep(time.Second * 3) if strings.Contains(err.Error(), "Cannot obtain response during timeout") { r.ReConnect() - return + return false } - r.Login() + return r.Login() } -func (r *RPCStorageHandler) GroupLogin() { +func (r *RPCStorageHandler) GroupLogin() bool { groupLoginData := GroupLoginRequest{ UserKey: r.UserKey, GroupID: config.Global.SlaveOptions.GroupID, @@ -320,20 +316,31 @@ func (r *RPCStorageHandler) GroupLogin() { "GroupID": groupLoginData.GroupID, }, ) - r.ReAttemptLogin(err) - return + rpcEmergencyMode = true + go r.ReAttemptLogin(err) + return false } if ok == false { log.Error("RPC Login incorrect") - r.ReAttemptLogin(errors.New("Login incorrect")) - return + rpcEmergencyMode = true + go r.ReAttemptLogin(errors.New("Login incorrect")) + return false } log.Debug("[RPC Store] Group Login complete") rpcLoadCount++ + + // Recovery + if rpcEmergencyMode { + doReload() + } + + rpcEmergencyMode = false + rpcEmergencyModeLoaded = false + return true } -func (r *RPCStorageHandler) Login() { +func (r *RPCStorageHandler) Login() bool { log.Debug("[RPC Store] Login initiated") if len(r.UserKey) == 0 { @@ -342,25 +349,34 @@ func (r *RPCStorageHandler) Login() { // If we have a group ID, lets login as a group if config.Global.SlaveOptions.GroupID != "" { - r.GroupLogin() - return + return r.GroupLogin() } ok, err := RPCFuncClientSingleton.CallTimeout("Login", r.UserKey, GlobalRPCCallTimeout) if err != nil { log.Error("RPC Login failed: ", err) emitRPCErrorEvent(rpcFuncClientSingletonCall, "Login", err) - r.ReAttemptLogin(err) - return + rpcEmergencyMode = true + go r.ReAttemptLogin(err) + return false } if ok == false { log.Error("RPC Login incorrect") - r.ReAttemptLogin(errors.New("Login incorrect")) - return + rpcEmergencyMode = true + go r.ReAttemptLogin(errors.New("Login incorrect")) + return false } log.Debug("[RPC Store] Login complete") rpcLoadCount++ + + if rpcEmergencyMode { + doReload() + } + + rpcEmergencyMode = false + rpcEmergencyModeLoaded = false + return true } // GetKey will retrieve a key from the database @@ -441,6 +457,11 @@ func (r *RPCStorageHandler) GetExp(keyName string) (int64, error) { return value.(int64), nil } +func (r *RPCStorageHandler) SetExp(keyName string, timeout int64) error { + log.Error("SetExp Not Implemented") + return nil +} + // SetKey will create (or update) a key value in the store func (r *RPCStorageHandler) SetKey(keyName, session string, timeout int64) error { start := time.Now() // get current time diff --git a/rpc_test.go b/rpc_test.go new file mode 100644 index 000000000000..f505fc539374 --- /dev/null +++ b/rpc_test.go @@ -0,0 +1,184 @@ +// +build !race + +package main + +import ( + "fmt" + "testing" + "time" + + "github.com/lonelycode/gorpc" + + "github.com/TykTechnologies/tyk/config" + "github.com/TykTechnologies/tyk/test" +) + +func startRPCMock(dispatcher *gorpc.Dispatcher) *gorpc.Server { + configMu.Lock() + defer configMu.Unlock() + + GlobalRPCCallTimeout = 100 * time.Millisecond + config.Global.SlaveOptions.UseRPC = true + config.Global.SlaveOptions.RPCKey = "test_org" + config.Global.SlaveOptions.APIKey = "test" + config.Global.Policies.PolicySource = "rpc" + config.Global.SlaveOptions.CallTimeout = 1 + config.Global.SlaveOptions.RPCPoolSize = 2 + + server := gorpc.NewTCPServer("127.0.0.1:0", dispatcher.NewHandlerFunc()) + list := &customListener{} + server.Listener = list + server.LogError = gorpc.NilErrorLogger + + if err := server.Start(); err != nil { + panic(err) + } + config.Global.SlaveOptions.ConnectionString = list.L.Addr().String() + + return server +} + +func stopRPCMock(server *gorpc.Server) { + config.Global.SlaveOptions.ConnectionString = "" + config.Global.SlaveOptions.RPCKey = "" + config.Global.SlaveOptions.APIKey = "" + config.Global.SlaveOptions.UseRPC = false + config.Global.Policies.PolicySource = "" + + if server != nil { + server.Listener.Close() + server.Stop() + } + + RPCCLientSingleton.Stop() + RPCClientIsConnected = false + RPCCLientSingleton = nil + RPCFuncClientSingleton = nil + rpcLoadCount = 0 + rpcEmergencyMode = false + rpcEmergencyModeLoaded = false +} + +// Our RPC layer too racy, but not harmul, mostly global variables like RPCIsClientConnected +func TestSyncAPISpecsRPCFailure(t *testing.T) { + // Mock RPC + dispatcher := gorpc.NewDispatcher() + dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) { + return "malformed json", nil + }) + dispatcher.AddFunc("Login", func(clientAddr, userKey string) bool { + return true + }) + + rpc := startRPCMock(dispatcher) + defer stopRPCMock(rpc) + + count := syncAPISpecs() + if count != 0 { + t.Error("Should return empty value for malformed rpc response", apiSpecs) + } +} + +func TestSyncAPISpecsRPCSuccess(t *testing.T) { + // Mock RPC + dispatcher := gorpc.NewDispatcher() + dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) { + return "[" + sampleAPI + "]", nil + }) + dispatcher.AddFunc("GetPolicies", func(clientAddr string, orgid string) (string, error) { + return `[{"_id":"507f191e810c19729de860ea", "rate":1, "per":1}]`, nil + }) + dispatcher.AddFunc("Login", func(clientAddr, userKey string) bool { + return true + }) + dispatcher.AddFunc("GetKey", func(clientAddr, key string) (string, error) { + return "", fmt.Errorf("Not found") + }) + + t.Run("RPC is live", func(t *testing.T) { + rpc := startRPCMock(dispatcher) + defer stopRPCMock(rpc) + ts := newTykTestServer() + defer ts.Close() + + apiBackup := LoadDefinitionsFromRPCBackup() + if len(apiBackup) != 1 { + t.Fatal("Should have APIs in backup") + } + + policyBackup := LoadPoliciesFromRPCBackup() + if len(policyBackup) != 1 { + t.Fatal("Should have Policies in backup") + } + + ts.Run(t, []test.TestCase{ + {Path: "/sample", Code: 200}, + }...) + + count := syncAPISpecs() + if count != 1 { + t.Error("Should return array with one spec", apiSpecs) + } + }) + + t.Run("RPC down, load backup", func(t *testing.T) { + // Point rpc to non existent address + config.Global.SlaveOptions.ConnectionString = testHttpFailure + config.Global.SlaveOptions.UseRPC = true + config.Global.SlaveOptions.RPCKey = "test_org" + config.Global.SlaveOptions.APIKey = "test" + config.Global.Policies.PolicySource = "rpc" + + // RPC layer is down + ts := newTykTestServer() + defer ts.Close() + + // Wait for backup to load + time.Sleep(100 * time.Millisecond) + + // Still should work! + ts.Run(t, []test.TestCase{ + {Path: "/sample", Code: 200}, + }...) + + stopRPCMock(nil) + }) + + t.Run("RPC is back", func(t *testing.T) { + rpcEmergencyModeLoaded = false + rpcEmergencyMode = false + + dispatcher := gorpc.NewDispatcher() + dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) { + return "[" + sampleAPI + "," + sampleAPI + "]", nil + }) + dispatcher.AddFunc("GetPolicies", func(clientAddr string, orgid string) (string, error) { + return `[{"_id":"507f191e810c19729de860ea", "rate":1, "per":1}, {"_id":"507f191e810c19729de860eb", "rate":1, "per":1}]`, nil + }) + dispatcher.AddFunc("Login", func(clientAddr, userKey string) bool { + return true + }) + dispatcher.AddFunc("GetKey", func(clientAddr, key string) (string, error) { + return "", fmt.Errorf("Not found") + }) + // Back to live + rpc := startRPCMock(dispatcher) + defer stopRPCMock(rpc) + ts := newTykTestServer() + defer ts.Close() + + time.Sleep(100 * time.Millisecond) + + ts.Run(t, []test.TestCase{ + {Path: "/sample", Code: 200}, + }...) + + if count := syncAPISpecs(); count != 2 { + t.Error("Should fetch latest specs", count) + } + + if count := syncPolicies(); count != 2 { + t.Error("Should fetch latest policies", count) + } + }) +} diff --git a/storage/redis_cluster.go b/storage/redis_cluster.go index 1ce7ae036804..73b1cb09cd3e 100644 --- a/storage/redis_cluster.go +++ b/storage/redis_cluster.go @@ -1,3 +1,5 @@ +// +build !race + package storage import ( @@ -176,7 +178,14 @@ func (r RedisCluster) GetExp(keyName string) (int64, error) { return 0, ErrKeyNotFound } return value, nil +} +func (r RedisCluster) SetExp(keyName string, timeout int64) error { + _, err := r.singleton().Do("EXPIRE", r.fixKey(keyName), timeout) + if err != nil { + log.Error("Could not EXPIRE key: ", err) + } + return err } // SetKey will create (or update) a key value in the store @@ -187,9 +196,7 @@ func (r RedisCluster) SetKey(keyName, session string, timeout int64) error { r.ensureConnection() _, err := r.singleton().Do("SET", r.fixKey(keyName), session) if timeout > 0 { - _, err := r.singleton().Do("EXPIRE", r.fixKey(keyName), timeout) - if err != nil { - log.Error("Could not EXPIRE key: ", err) + if err := r.SetExp(keyName, timeout); err != nil { return err } } diff --git a/storage/storage.go b/storage/storage.go index 70da534b6558..cf5db9b36414 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -22,6 +22,7 @@ type Handler interface { GetRawKey(string) (string, error) SetKey(string, string, int64) error // Second input string is expected to be a JSON object (user.SessionState) SetRawKey(string, string, int64) error + SetExp(string, int64) error // Set key expiration GetExp(string) (int64, error) // Returns expiry of a key GetKeys(string) []string DeleteKey(string) bool