diff --git a/api.go b/api.go index e0974ee063e..3c8bca6e3f4 100644 --- a/api.go +++ b/api.go @@ -1341,7 +1341,7 @@ func invalidateCacheHandler(w http.ResponseWriter, r *http.Request) { keyPrefix := "cache-" + apiID matchPattern := keyPrefix + "*" - store := &storage.RedisCluster{KeyPrefix: keyPrefix, IsCache: true} + store := storage.RedisCluster{KeyPrefix: keyPrefix, IsCache: true} if ok := store.DeleteScanMatch(matchPattern); !ok { err := errors.New("scan/delete failed") diff --git a/api_loader.go b/api_loader.go index d953aa8f3ea..d788c9d2854 100644 --- a/api_loader.go +++ b/api_loader.go @@ -28,16 +28,16 @@ type ChainObject struct { Subrouter *mux.Router } -func prepareStorage() (*storage.RedisCluster, *storage.RedisCluster, *storage.RedisCluster, *RPCStorageHandler, *RPCStorageHandler) { +func prepareStorage() (storage.RedisCluster, storage.RedisCluster, storage.RedisCluster, *RPCStorageHandler, *RPCStorageHandler) { redisStore := storage.RedisCluster{KeyPrefix: "apikey-", HashKeys: config.Global.HashKeys} redisOrgStore := storage.RedisCluster{KeyPrefix: "orgkey."} - healthStore := &storage.RedisCluster{KeyPrefix: "apihealth."} + healthStore := storage.RedisCluster{KeyPrefix: "apihealth."} rpcAuthStore := RPCStorageHandler{KeyPrefix: "apikey-", HashKeys: config.Global.HashKeys, UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString} rpcOrgStore := RPCStorageHandler{KeyPrefix: "orgkey.", UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString} FallbackKeySesionManager.Init(&redisStore) - return &redisStore, &redisOrgStore, healthStore, &rpcAuthStore, &rpcOrgStore + return redisStore, redisOrgStore, healthStore, &rpcAuthStore, &rpcOrgStore } func skipSpecBecauseInvalid(spec *APISpec) bool { @@ -261,7 +261,7 @@ func processSpec(spec *APISpec, apisByListen map[string]int, } keyPrefix := "cache-" + spec.APIID - cacheStore := &storage.RedisCluster{KeyPrefix: keyPrefix, IsCache: true} + cacheStore := storage.RedisCluster{KeyPrefix: keyPrefix, IsCache: true} cacheStore.Connect() var chain http.Handler diff --git a/coprocess_api.go b/coprocess_api.go index 4a9039f2bbf..25b72f9775b 100644 --- a/coprocess_api.go +++ b/coprocess_api.go @@ -34,7 +34,7 @@ func TykStoreData(CKey, CValue *C.char, CTTL C.int) { value := C.GoString(CValue) ttl := int64(CTTL) - store := &storage.RedisCluster{KeyPrefix: CoProcessDefaultKeyPrefix} + store := storage.RedisCluster{KeyPrefix: CoProcessDefaultKeyPrefix} store.SetKey(key, value, ttl) } @@ -43,7 +43,7 @@ func TykStoreData(CKey, CValue *C.char, CTTL C.int) { func TykGetData(CKey *C.char) *C.char { key := C.GoString(CKey) - store := &storage.RedisCluster{KeyPrefix: CoProcessDefaultKeyPrefix} + store := storage.RedisCluster{KeyPrefix: CoProcessDefaultKeyPrefix} // TODO: return error val, _ := store.GetKey(key) return C.CString(val) diff --git a/event_handler_webhooks.go b/event_handler_webhooks.go index c6c1cd1e50f..6bcb83ce70d 100644 --- a/event_handler_webhooks.go +++ b/event_handler_webhooks.go @@ -66,7 +66,7 @@ func (w *WebHookHandler) Init(handlerConf interface{}) error { return err } - w.store = &storage.RedisCluster{KeyPrefix: "webhook.cache."} + w.store = storage.RedisCluster{KeyPrefix: "webhook.cache."} w.store.Connect() // Pre-load template on init diff --git a/gateway_test.go b/gateway_test.go index 558aef8cb8e..79e09d52f8a 100644 --- a/gateway_test.go +++ b/gateway_test.go @@ -519,9 +519,9 @@ const extendedPathGatewaySetup = `{ func createSpecTest(t *testing.T, def string) *APISpec { spec := createDefinitionFromString(def) tname := t.Name() - redisStore := &storage.RedisCluster{KeyPrefix: tname + "-apikey."} - healthStore := &storage.RedisCluster{KeyPrefix: tname + "-apihealth."} - orgStore := &storage.RedisCluster{KeyPrefix: tname + "-orgKey."} + redisStore := storage.RedisCluster{KeyPrefix: tname + "-apikey."} + healthStore := storage.RedisCluster{KeyPrefix: tname + "-apihealth."} + orgStore := storage.RedisCluster{KeyPrefix: tname + "-orgKey."} spec.Init(redisStore, redisStore, healthStore, orgStore) return spec } diff --git a/host_checker_test.go b/host_checker_test.go index 2daa0487b3a..8e2a70e54d8 100644 --- a/host_checker_test.go +++ b/host_checker_test.go @@ -148,7 +148,7 @@ func TestHostChecker(t *testing.T) { t.Error("Should set defaults", GlobalHostChecker.checker.checkTimeout) } - redisStore := GlobalHostChecker.store.(*storage.RedisCluster) + redisStore := GlobalHostChecker.store.(storage.RedisCluster) if ttl, _ := redisStore.GetKeyTTL(PoolerHostSentinelKeyPrefix + testHttpFailure); int(ttl) != GlobalHostChecker.checker.checkTimeout+1 { t.Error("HostDown expiration key should be checkTimeout + 1", ttl) } diff --git a/le_helpers.go b/le_helpers.go index 61dce07268f..c4e9291301b 100644 --- a/le_helpers.go +++ b/le_helpers.go @@ -18,7 +18,7 @@ func StoreLEState(m *letsencrypt.Manager) { log.Debug("[SSL] --> Connecting to DB") - store := &storage.RedisCluster{KeyPrefix: LEKeyPrefix} + store := storage.RedisCluster{KeyPrefix: LEKeyPrefix} connected := store.Connect() log.Debug("--> Connected to DB") @@ -41,7 +41,7 @@ func StoreLEState(m *letsencrypt.Manager) { func GetLEState(m *letsencrypt.Manager) { checkKey := "cache" - store := &storage.RedisCluster{KeyPrefix: LEKeyPrefix} + store := storage.RedisCluster{KeyPrefix: LEKeyPrefix} connected := store.Connect() log.Debug("[SSL] --> Connected to DB") diff --git a/main.go b/main.go index dd33142a6bf..474f1bf3bd4 100644 --- a/main.go +++ b/main.go @@ -102,7 +102,7 @@ func setupGlobals() { } // Initialise our Host Checker - healthCheckStore := &storage.RedisCluster{KeyPrefix: "host-checker:"} + healthCheckStore := storage.RedisCluster{KeyPrefix: "host-checker:"} InitHostCheckManager(healthCheckStore) if config.Global.EnableAnalytics && analytics.Store == nil { @@ -145,7 +145,7 @@ func setupGlobals() { }).Debug("Notifier will not work in hybrid mode") mainNotifierStore := storage.RedisCluster{} mainNotifierStore.Connect() - MainNotifier = RedisNotifier{&mainNotifierStore, RedisPubSubChannel} + MainNotifier = RedisNotifier{mainNotifierStore, RedisPubSubChannel} if config.Global.Monitor.EnableTriggerMonitors { h := &WebHookHandler{} @@ -924,7 +924,7 @@ func getGlobalStorageHandler(keyPrefix string, hashKeys bool) storage.Handler { if config.Global.SlaveOptions.UseRPC { return &RPCStorageHandler{KeyPrefix: keyPrefix, HashKeys: hashKeys, UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString} } - return &storage.RedisCluster{KeyPrefix: keyPrefix, HashKeys: hashKeys} + return storage.RedisCluster{KeyPrefix: keyPrefix, HashKeys: hashKeys} } func main() { @@ -1048,7 +1048,7 @@ func start(arguments map[string]interface{}) { log.WithFields(logrus.Fields{ "prefix": "main", }).Debug("Initialising default org store") - //DefaultOrgStore.Init(&storage.RedisCluster{KeyPrefix: "orgkey."}) + //DefaultOrgStore.Init(storage.RedisCluster{KeyPrefix: "orgkey."}) DefaultOrgStore.Init(getGlobalStorageHandler("orgkey.", false)) //DefaultQuotaStore.Init(getGlobalStorageHandler(CloudHandler, "orgkey.", false)) DefaultQuotaStore.Init(getGlobalStorageHandler("orgkey.", false)) diff --git a/redis_signal_outbound.go b/redis_signal_outbound.go index 1632a20f4a6..5242d0604ad 100644 --- a/redis_signal_outbound.go +++ b/redis_signal_outbound.go @@ -17,7 +17,7 @@ type InterfaceNotification struct { } type RedisNotificationHandler struct { - CacheStore *storage.RedisCluster + CacheStore storage.RedisCluster } const ( @@ -25,7 +25,9 @@ const ( ) func (u *RedisNotificationHandler) Start() { - go u.StartUIPubSubConn() + u.CacheStore = storage.RedisCluster{KeyPrefix: "gateway-notifications:"} + u.CacheStore.Connect() + go u.PubSubLoop() } func (u *RedisNotificationHandler) Notify(n InterfaceNotification) error { @@ -33,17 +35,11 @@ func (u *RedisNotificationHandler) Notify(n InterfaceNotification) error { if err != nil { return err } - - if u.CacheStore != nil { - u.CacheStore.Publish(UIChanName, string(jsonError)) - } - + u.CacheStore.Publish(UIChanName, string(jsonError)) return nil } -func (u *RedisNotificationHandler) StartUIPubSubConn() { - u.CacheStore = &storage.RedisCluster{KeyPrefix: "gateway-notifications:"} - u.CacheStore.Connect() +func (u *RedisNotificationHandler) PubSubLoop() { // On message, synchronize for { err := u.CacheStore.StartPubSubHandler(UIChanName, u.HandleIncommingRedisEvent) diff --git a/redis_signals.go b/redis_signals.go index 39dac1efcdc..7bc9f664bc1 100644 --- a/redis_signals.go +++ b/redis_signals.go @@ -175,7 +175,7 @@ func isPayloadSignatureValid(notification Notification) bool { // RedisNotifier will use redis pub/sub channels to send notifications type RedisNotifier struct { - store *storage.RedisCluster + store storage.RedisCluster channel string } diff --git a/rpc_backup_handlers.go b/rpc_backup_handlers.go index 262debf2c39..67484961b93 100644 --- a/rpc_backup_handlers.go +++ b/rpc_backup_handlers.go @@ -35,7 +35,7 @@ func saveRPCDefinitionsBackup(list string) { log.Info("--> Connecting to DB") - store := &storage.RedisCluster{KeyPrefix: RPCKeyPrefix} + store := storage.RedisCluster{KeyPrefix: RPCKeyPrefix} connected := store.Connect() log.Info("--> Connected to DB") @@ -57,7 +57,7 @@ func LoadDefinitionsFromRPCBackup() []*APISpec { tagList := getTagListAsString() checkKey := BackupKeyBase + tagList - store := &storage.RedisCluster{KeyPrefix: RPCKeyPrefix} + store := storage.RedisCluster{KeyPrefix: RPCKeyPrefix} connected := store.Connect() log.Info("[RPC] --> Connected to DB") diff --git a/storage/redis_cluster.go b/storage/redis_cluster.go index 9050ecda4f2..366adc9ff3f 100644 --- a/storage/redis_cluster.go +++ b/storage/redis_cluster.go @@ -21,15 +21,6 @@ var ( redisCacheClusterSingleton *rediscluster.RedisCluster ) -func GetRelevantClusterReference(cache bool) *rediscluster.RedisCluster { - redisSingletonMu.RLock() - defer redisSingletonMu.RUnlock() - if cache { - return redisCacheClusterSingleton - } - return redisClusterSingleton -} - // RedisCluster is a storage manager that uses the redis database. type RedisCluster struct { KeyPrefix string @@ -82,8 +73,8 @@ func NewRedisClusterPool(isCache bool) *rediscluster.RedisCluster { return &cluster } -// Connect will establish a connection to the GetRelevantClusterReference(r.IsCache) -func (r *RedisCluster) Connect() bool { +// Connect will establish a connection to the r.singleton() +func (r RedisCluster) Connect() bool { redisSingletonMu.Lock() defer redisSingletonMu.Unlock() disconnected := redisClusterSingleton == nil @@ -93,10 +84,10 @@ func (r *RedisCluster) Connect() bool { if disconnected { log.Debug("Connecting to redis cluster") if r.IsCache { - redisCacheClusterSingleton = NewRedisClusterPool(r.IsCache) + redisCacheClusterSingleton = NewRedisClusterPool(true) return true } - redisClusterSingleton = NewRedisClusterPool(r.IsCache) + redisClusterSingleton = NewRedisClusterPool(false) return true } @@ -104,7 +95,16 @@ func (r *RedisCluster) Connect() bool { return true } -func (r *RedisCluster) hashKey(in string) string { +func (r RedisCluster) singleton() *rediscluster.RedisCluster { + redisSingletonMu.RLock() + defer redisSingletonMu.RUnlock() + if r.IsCache { + return redisCacheClusterSingleton + } + return redisClusterSingleton +} + +func (r RedisCluster) hashKey(in string) string { if !r.HashKeys { // Not hashing? Return the raw key return in @@ -112,28 +112,23 @@ func (r *RedisCluster) hashKey(in string) string { return HashStr(in) } -func (r *RedisCluster) fixKey(keyName string) string { - setKeyName := r.KeyPrefix + r.hashKey(keyName) - - log.Debug("Input key was: ", setKeyName) - - return setKeyName +func (r RedisCluster) fixKey(keyName string) string { + return r.KeyPrefix + r.hashKey(keyName) } -func (r *RedisCluster) cleanKey(keyName string) string { - setKeyName := strings.Replace(keyName, r.KeyPrefix, "", 1) - return setKeyName +func (r RedisCluster) cleanKey(keyName string) string { + return strings.Replace(keyName, r.KeyPrefix, "", 1) } -func (r *RedisCluster) ensureConnection() { - if GetRelevantClusterReference(r.IsCache) != nil { +func (r RedisCluster) ensureConnection() { + if r.singleton() != nil { // already connected return } log.Info("Connection dropped, reconnecting...") for { r.Connect() - if GetRelevantClusterReference(r.IsCache) != nil { + if r.singleton() != nil { // reconnection worked return } @@ -142,11 +137,11 @@ func (r *RedisCluster) ensureConnection() { } // GetKey will retrieve a key from the database -func (r *RedisCluster) GetKey(keyName string) (string, error) { +func (r RedisCluster) GetKey(keyName string) (string, error) { r.ensureConnection() log.Debug("[STORE] Getting WAS: ", keyName) log.Debug("[STORE] Getting: ", r.fixKey(keyName)) - value, err := redis.String(GetRelevantClusterReference(r.IsCache).Do("GET", r.fixKey(keyName))) + value, err := redis.String(r.singleton().Do("GET", r.fixKey(keyName))) if err != nil { log.Debug("Error trying to get value:", err) return "", ErrKeyNotFound @@ -155,14 +150,14 @@ func (r *RedisCluster) GetKey(keyName string) (string, error) { return value, nil } -func (r *RedisCluster) GetKeyTTL(keyName string) (ttl int64, err error) { +func (r RedisCluster) GetKeyTTL(keyName string) (ttl int64, err error) { r.ensureConnection() - return redis.Int64(GetRelevantClusterReference(r.IsCache).Do("TTL", r.fixKey(keyName))) + return redis.Int64(r.singleton().Do("TTL", r.fixKey(keyName))) } -func (r *RedisCluster) GetRawKey(keyName string) (string, error) { +func (r RedisCluster) GetRawKey(keyName string) (string, error) { r.ensureConnection() - value, err := redis.String(GetRelevantClusterReference(r.IsCache).Do("GET", keyName)) + value, err := redis.String(r.singleton().Do("GET", keyName)) if err != nil { log.Debug("Error trying to get value:", err) return "", ErrKeyNotFound @@ -171,11 +166,11 @@ func (r *RedisCluster) GetRawKey(keyName string) (string, error) { return value, nil } -func (r *RedisCluster) GetExp(keyName string) (int64, error) { +func (r RedisCluster) GetExp(keyName string) (int64, error) { log.Debug("Getting exp for key: ", r.fixKey(keyName)) r.ensureConnection() - value, err := redis.Int64(GetRelevantClusterReference(r.IsCache).Do("TTL", r.fixKey(keyName))) + value, err := redis.Int64(r.singleton().Do("TTL", r.fixKey(keyName))) if err != nil { log.Error("Error trying to get TTL: ", err) return 0, ErrKeyNotFound @@ -185,14 +180,14 @@ func (r *RedisCluster) GetExp(keyName string) (int64, error) { } // SetKey will create (or update) a key value in the store -func (r *RedisCluster) SetKey(keyName, session string, timeout int64) error { +func (r RedisCluster) SetKey(keyName, session string, timeout int64) error { log.Debug("[STORE] SET Raw key is: ", keyName) log.Debug("[STORE] Setting key: ", r.fixKey(keyName)) r.ensureConnection() - _, err := GetRelevantClusterReference(r.IsCache).Do("SET", r.fixKey(keyName), session) + _, err := r.singleton().Do("SET", r.fixKey(keyName), session) if timeout > 0 { - _, err := GetRelevantClusterReference(r.IsCache).Do("EXPIRE", r.fixKey(keyName), timeout) + _, err := r.singleton().Do("EXPIRE", r.fixKey(keyName), timeout) if err != nil { log.Error("Could not EXPIRE key: ", err) return err @@ -205,12 +200,12 @@ func (r *RedisCluster) SetKey(keyName, session string, timeout int64) error { return nil } -func (r *RedisCluster) SetRawKey(keyName, session string, timeout int64) error { +func (r RedisCluster) SetRawKey(keyName, session string, timeout int64) error { r.ensureConnection() - _, err := GetRelevantClusterReference(r.IsCache).Do("SET", keyName, session) + _, err := r.singleton().Do("SET", keyName, session) if timeout > 0 { - _, err := GetRelevantClusterReference(r.IsCache).Do("EXPIRE", keyName, timeout) + _, err := r.singleton().Do("EXPIRE", keyName, timeout) if err != nil { log.Error("Could not EXPIRE key: ", err) return err @@ -224,29 +219,29 @@ func (r *RedisCluster) SetRawKey(keyName, session string, timeout int64) error { } // Decrement will decrement a key in redis -func (r *RedisCluster) Decrement(keyName string) { +func (r RedisCluster) Decrement(keyName string) { keyName = r.fixKey(keyName) log.Debug("Decrementing key: ", keyName) r.ensureConnection() - err := GetRelevantClusterReference(r.IsCache).Send("DECR", keyName) + err := r.singleton().Send("DECR", keyName) if err != nil { log.Error("Error trying to decrement value:", err) } } // IncrementWithExpire will increment a key in redis -func (r *RedisCluster) IncrememntWithExpire(keyName string, expire int64) int64 { +func (r RedisCluster) IncrememntWithExpire(keyName string, expire int64) int64 { log.Debug("Incrementing raw key: ", keyName) r.ensureConnection() // This function uses a raw key, so we shouldn't call fixKey fixedKey := keyName - val, err := redis.Int64(GetRelevantClusterReference(r.IsCache).Do("INCR", fixedKey)) + val, err := redis.Int64(r.singleton().Do("INCR", fixedKey)) log.Debug("Incremented key: ", fixedKey, ", val is: ", val) if val == 1 { log.Debug("--> Setting Expire") - GetRelevantClusterReference(r.IsCache).Do("EXPIRE", fixedKey, expire) + r.singleton().Do("EXPIRE", fixedKey, expire) } if err != nil { log.Error("Error trying to increment value:", err) @@ -255,10 +250,10 @@ func (r *RedisCluster) IncrememntWithExpire(keyName string, expire int64) int64 } // GetKeys will return all keys according to the filter (filter is a prefix - e.g. tyk.keys.*) -func (r *RedisCluster) GetKeys(filter string) []string { +func (r RedisCluster) GetKeys(filter string) []string { r.ensureConnection() searchStr := r.KeyPrefix + r.hashKey(filter) + "*" - sessionsInterface, err := GetRelevantClusterReference(r.IsCache).Do("KEYS", searchStr) + sessionsInterface, err := r.singleton().Do("KEYS", searchStr) if err != nil { log.Error("Error trying to get all keys: ", err) return nil @@ -272,18 +267,18 @@ func (r *RedisCluster) GetKeys(filter string) []string { } // GetKeysAndValuesWithFilter will return all keys and their values with a filter -func (r *RedisCluster) GetKeysAndValuesWithFilter(filter string) map[string]string { +func (r RedisCluster) GetKeysAndValuesWithFilter(filter string) map[string]string { r.ensureConnection() searchStr := r.KeyPrefix + r.hashKey(filter) + "*" log.Debug("[STORE] Getting list by: ", searchStr) - sessionsInterface, err := GetRelevantClusterReference(r.IsCache).Do("KEYS", searchStr) + sessionsInterface, err := r.singleton().Do("KEYS", searchStr) if err != nil { log.Error("Error trying to get filtered client keys: ", err) return nil } keys, _ := redis.Strings(sessionsInterface, err) - valueObj, err := GetRelevantClusterReference(r.IsCache).Do("MGET", sessionsInterface.([]interface{})...) + valueObj, err := r.singleton().Do("MGET", sessionsInterface.([]interface{})...) values, err := redis.Strings(valueObj, err) if err != nil { log.Error("Error trying to get filtered client keys: ", err) @@ -298,16 +293,16 @@ func (r *RedisCluster) GetKeysAndValuesWithFilter(filter string) map[string]stri } // GetKeysAndValues will return all keys and their values - not to be used lightly -func (r *RedisCluster) GetKeysAndValues() map[string]string { +func (r RedisCluster) GetKeysAndValues() map[string]string { r.ensureConnection() searchStr := r.KeyPrefix + "*" - sessionsInterface, err := GetRelevantClusterReference(r.IsCache).Do("KEYS", searchStr) + sessionsInterface, err := r.singleton().Do("KEYS", searchStr) if err != nil { log.Error("Error trying to get all keys: ", err) return nil } keys, _ := redis.Strings(sessionsInterface, err) - valueObj, err := GetRelevantClusterReference(r.IsCache).Do("MGET", sessionsInterface.([]interface{})...) + valueObj, err := r.singleton().Do("MGET", sessionsInterface.([]interface{})...) values, err := redis.Strings(valueObj, err) if err != nil { log.Error("Error trying to get all keys: ", err) @@ -322,11 +317,11 @@ func (r *RedisCluster) GetKeysAndValues() map[string]string { } // DeleteKey will remove a key from the database -func (r *RedisCluster) DeleteKey(keyName string) bool { +func (r RedisCluster) DeleteKey(keyName string) bool { r.ensureConnection() log.Debug("DEL Key was: ", keyName) log.Debug("DEL Key became: ", r.fixKey(keyName)) - _, err := GetRelevantClusterReference(r.IsCache).Do("DEL", r.fixKey(keyName)) + _, err := r.singleton().Do("DEL", r.fixKey(keyName)) if err != nil { log.Error("Error trying to delete key: ", err) } @@ -335,9 +330,9 @@ func (r *RedisCluster) DeleteKey(keyName string) bool { } // DeleteKey will remove a key from the database without prefixing, assumes user knows what they are doing -func (r *RedisCluster) DeleteRawKey(keyName string) bool { +func (r RedisCluster) DeleteRawKey(keyName string) bool { r.ensureConnection() - _, err := GetRelevantClusterReference(r.IsCache).Do("DEL", keyName) + _, err := r.singleton().Do("DEL", keyName) if err != nil { log.Error("Error trying to delete key: ", err) } @@ -346,7 +341,7 @@ func (r *RedisCluster) DeleteRawKey(keyName string) bool { } // DeleteKeys will remove a group of keys in bulk -func (r *RedisCluster) DeleteScanMatch(pattern string) bool { +func (r RedisCluster) DeleteScanMatch(pattern string) bool { r.ensureConnection() log.Debug("Deleting: ", pattern) @@ -357,7 +352,7 @@ func (r *RedisCluster) DeleteScanMatch(pattern string) bool { var keys []string for { // we scan with our iter offset, starting at 0 - arr, err := redis.MultiBulk(GetRelevantClusterReference(r.IsCache).Do("SCAN", iter, "MATCH", pattern)) + arr, err := redis.MultiBulk(r.singleton().Do("SCAN", iter, "MATCH", pattern)) if err != nil { log.Error("SCAN Token Get Failure: ", err) return false @@ -376,7 +371,7 @@ func (r *RedisCluster) DeleteScanMatch(pattern string) bool { if len(keys) > 0 { for _, name := range keys { log.Info("Deleting: ", name) - _, err := GetRelevantClusterReference(r.IsCache).Do("DEL", name) + _, err := r.singleton().Do("DEL", name) if err != nil { log.Error("Error trying to delete key: ", name, " - ", err) @@ -391,7 +386,7 @@ func (r *RedisCluster) DeleteScanMatch(pattern string) bool { } // DeleteKeys will remove a group of keys in bulk -func (r *RedisCluster) DeleteKeys(keys []string) bool { +func (r RedisCluster) DeleteKeys(keys []string) bool { r.ensureConnection() if len(keys) > 0 { asInterface := make([]interface{}, len(keys)) @@ -400,7 +395,7 @@ func (r *RedisCluster) DeleteKeys(keys []string) bool { } log.Debug("Deleting: ", asInterface) - _, err := GetRelevantClusterReference(r.IsCache).Do("DEL", asInterface...) + _, err := r.singleton().Do("DEL", asInterface...) if err != nil { log.Error("Error trying to delete keys: ", err) } @@ -413,18 +408,18 @@ func (r *RedisCluster) DeleteKeys(keys []string) bool { // StartPubSubHandler will listen for a signal and run the callback for // every subscription and message event. -func (r *RedisCluster) StartPubSubHandler(channel string, callback func(interface{})) error { - if GetRelevantClusterReference(r.IsCache) == nil { +func (r RedisCluster) StartPubSubHandler(channel string, callback func(interface{})) error { + if r.singleton() == nil { return errors.New("Redis connection failed") } - handle := GetRelevantClusterReference(r.IsCache).RandomRedisHandle() + handle := r.singleton().RandomRedisHandle() if handle == nil { return errors.New("Redis connection failed") } psc := redis.PubSubConn{ - Conn: GetRelevantClusterReference(r.IsCache).RandomRedisHandle().Pool.Get(), + Conn: r.singleton().RandomRedisHandle().Pool.Get(), } if err := psc.Subscribe(channel); err != nil { return err @@ -444,9 +439,9 @@ func (r *RedisCluster) StartPubSubHandler(channel string, callback func(interfac } } -func (r *RedisCluster) Publish(channel, message string) error { +func (r RedisCluster) Publish(channel, message string) error { r.ensureConnection() - _, err := GetRelevantClusterReference(r.IsCache).Do("PUBLISH", channel, message) + _, err := r.singleton().Do("PUBLISH", channel, message) if err != nil { log.Error("Error trying to set value: ", err) return err @@ -454,7 +449,7 @@ func (r *RedisCluster) Publish(channel, message string) error { return nil } -func (r *RedisCluster) GetAndDeleteSet(keyName string) []interface{} { +func (r RedisCluster) GetAndDeleteSet(keyName string) []interface{} { log.Debug("Getting raw key set: ", keyName) r.ensureConnection() log.Debug("keyName is: ", keyName) @@ -469,7 +464,7 @@ func (r *RedisCluster) GetAndDeleteSet(keyName string) []interface{} { delCmd.Cmd = "DEL" delCmd.Args = []interface{}{fixedKey} - redVal, err := redis.Values(GetRelevantClusterReference(r.IsCache).DoTransaction([]rediscluster.ClusterTransaction{lrange, delCmd})) + redVal, err := redis.Values(r.singleton().DoTransaction([]rediscluster.ClusterTransaction{lrange, delCmd})) if err != nil { log.Error("Multi command failed: ", err) return nil @@ -487,22 +482,22 @@ func (r *RedisCluster) GetAndDeleteSet(keyName string) []interface{} { return vals } -func (r *RedisCluster) AppendToSet(keyName, value string) { +func (r RedisCluster) AppendToSet(keyName, value string) { log.Debug("Pushing to raw key list: ", keyName) log.Debug("Appending to fixed key list: ", r.fixKey(keyName)) r.ensureConnection() - _, err := GetRelevantClusterReference(r.IsCache).Do("RPUSH", r.fixKey(keyName), value) + _, err := r.singleton().Do("RPUSH", r.fixKey(keyName), value) if err != nil { log.Error("Error trying to delete keys: ", err) } } -func (r *RedisCluster) GetSet(keyName string) (map[string]string, error) { +func (r RedisCluster) GetSet(keyName string) (map[string]string, error) { log.Debug("Getting from key set: ", keyName) log.Debug("Getting from fixed key set: ", r.fixKey(keyName)) r.ensureConnection() - val, err := GetRelevantClusterReference(r.IsCache).Do("SMEMBERS", r.fixKey(keyName)) + val, err := r.singleton().Do("SMEMBERS", r.fixKey(keyName)) if err != nil { log.Error("Error trying to get key set:", err) return nil, err @@ -517,22 +512,22 @@ func (r *RedisCluster) GetSet(keyName string) (map[string]string, error) { return vals, nil } -func (r *RedisCluster) AddToSet(keyName, value string) { +func (r RedisCluster) AddToSet(keyName, value string) { log.Debug("Pushing to raw key set: ", keyName) log.Debug("Pushing to fixed key set: ", r.fixKey(keyName)) r.ensureConnection() - _, err := GetRelevantClusterReference(r.IsCache).Do("SADD", r.fixKey(keyName), value) + _, err := r.singleton().Do("SADD", r.fixKey(keyName), value) if err != nil { log.Error("Error trying to append keys: ", err) } } -func (r *RedisCluster) RemoveFromSet(keyName, value string) { +func (r RedisCluster) RemoveFromSet(keyName, value string) { log.Debug("Removing from raw key set: ", keyName) log.Debug("Removing from fixed key set: ", r.fixKey(keyName)) r.ensureConnection() - _, err := GetRelevantClusterReference(r.IsCache).Do("SREM", r.fixKey(keyName), value) + _, err := r.singleton().Do("SREM", r.fixKey(keyName), value) if err != nil { log.Error("Error trying to remove keys: ", err) @@ -540,7 +535,7 @@ func (r *RedisCluster) RemoveFromSet(keyName, value string) { } // SetRollingWindow will append to a sorted set in redis and extract a timed window of values -func (r *RedisCluster) SetRollingWindow(keyName string, per int64, value_override string, pipeline bool) (int, []interface{}) { +func (r RedisCluster) SetRollingWindow(keyName string, per int64, value_override string, pipeline bool) (int, []interface{}) { log.Debug("Incrementing raw key: ", keyName) r.ensureConnection() @@ -574,9 +569,9 @@ func (r *RedisCluster) SetRollingWindow(keyName string, per int64, value_overrid var redVal []interface{} var err error if pipeline { - redVal, err = redis.Values(GetRelevantClusterReference(r.IsCache).DoPipeline([]rediscluster.ClusterTransaction{ZREMRANGEBYSCORE, ZRANGE, ZADD, EXPIRE})) + redVal, err = redis.Values(r.singleton().DoPipeline([]rediscluster.ClusterTransaction{ZREMRANGEBYSCORE, ZRANGE, ZADD, EXPIRE})) } else { - redVal, err = redis.Values(GetRelevantClusterReference(r.IsCache).DoTransaction([]rediscluster.ClusterTransaction{ZREMRANGEBYSCORE, ZRANGE, ZADD, EXPIRE})) + redVal, err = redis.Values(r.singleton().DoTransaction([]rediscluster.ClusterTransaction{ZREMRANGEBYSCORE, ZRANGE, ZADD, EXPIRE})) } if err != nil { log.Error("Multi command failed: ", err)