Skip to content

Commit

Permalink
Merge 5a8b97b into cba098b
Browse files Browse the repository at this point in the history
  • Loading branch information
joshblakeley authored Aug 7, 2018
2 parents cba098b + 5a8b97b commit 19b392b
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 13 deletions.
33 changes: 33 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,39 @@ func handleGetAllKeys(filter, apiID string) (interface{}, int) {
return sessionsObj, http.StatusOK
}

func handleAddKey(keyName, hashedName, sessionString, apiID string) {
mw := BaseMiddleware{
Spec: &APISpec{
SessionManager: FallbackKeySesionManager,
},
}

sess := user.SessionState{}
json.Unmarshal([]byte(sessionString), &sess)

sess.LastUpdated = strconv.Itoa(int(time.Now().Unix()))
var err error
if config.Global().HashKeys {
err = mw.Spec.SessionManager.UpdateSession(hashedName, &sess, 0, true)
} else {
err = mw.Spec.SessionManager.UpdateSession(keyName, &sess, 0, false)
}
if err != nil {
log.WithFields(logrus.Fields{
"prefix": "api",
"key": keyName,
"status": "fail",
"err": err,
}).Error("Failed to update key.")
}

log.WithFields(logrus.Fields{
"prefix": "RPC",
"key": obfuscateKey(keyName),
"status": "ok",
}).Info("Updated hashed key in slave storage.")
}

func handleDeleteKey(keyName, apiID string) (interface{}, int) {
if apiID == "-1" {
// Go through ALL managed API's and delete the key
Expand Down
89 changes: 76 additions & 13 deletions rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,30 +999,93 @@ func (r *RPCStorageHandler) CheckForKeyspaceChanges(orgId string) {
}
}

func (r *RPCStorageHandler) getKeyForCreation(keyName string) (string, string, error) {
//we want the key with this prefix from the master
newKeyName := "apikey-" + storage.HashStr(keyName)
//is the key in master redis
value, err := RPCFuncClientSingleton.CallTimeout("GetKey", newKeyName, GlobalRPCCallTimeout)
if err != nil {
emitRPCErrorEventKv(
rpcFuncClientSingletonCall,
"GetKey",
err,
map[string]string{
"keyName": keyName,
"fixedKeyName": newKeyName,
},
)
if r.IsAccessError(err) {
if r.Login() {
return r.getKeyForCreation(keyName)
}
}

log.Debug("Error trying to get value:", err)
return "", "", storage.ErrKeyNotFound
}

if config.Global().SlaveOptions.EnableRPCCache {
// Cache key
RPCGlobalCache.Set(newKeyName, value, cache.DefaultExpiration)
}
//return hash key without prefix so it doesnt get double prefixed in redis
return value.(string), newKeyName[7:], nil
}
func (r *RPCStorageHandler) getSessionAndCreate(keyName string) {
sessionString, hashedKeyName, err := r.getKeyForCreation(keyName)
if err != nil {
log.Error("Key not found in master - skipping")
}
handleAddKey(keyName, hashedKeyName, sessionString, "-1")
}
func (r *RPCStorageHandler) ProcessKeySpaceChanges(keys []string) {
var deletePayload []string
for _, key := range keys {
//split key from keyspace event type i.e. key:create
splitKeys := strings.Split(key, ":")
if len(splitKeys) > 1 {
key = splitKeys[0]
if splitKeys[1] == "hashed" {
switch splitKeys[1] {
//both treated the same currently
case "create", "createhashed":
r.getSessionAndCreate(splitKeys[0])
case "update":
log.Info("--> removing cached key: ", key)
handleDeleteKey(key, "-1")
SessionCache.Delete(key)
RPCGlobalCache.Delete(r.KeyPrefix + key)
deletePayload = append(deletePayload, splitKeys[0])
r.getSessionAndCreate(splitKeys[0])
case "updatehashed":
log.Info("--> removing cached (hashed) key: ", splitKeys[0])
handleDeleteHashedKey(splitKeys[0], "")
SessionCache.Delete(key)
RPCGlobalCache.Delete(r.KeyPrefix + key)
deletePayload = append(deletePayload, splitKeys[0])
r.getSessionAndCreate(splitKeys[0])
case "delete":
log.Info("--> removing cached key: ", key)
handleDeleteKey(key, "-1")
SessionCache.Delete(key)
RPCGlobalCache.Delete(r.KeyPrefix + key)
deletePayload = append(deletePayload, splitKeys[0])
case "deletehashed":
log.Info("--> removing cached (hashed) key: ", splitKeys[0])
handleDeleteHashedKey(splitKeys[0], "")
SessionCache.Delete(key)
RPCGlobalCache.Delete(r.KeyPrefix + key)
deletePayload = append(deletePayload, splitKeys[0])
}
} else {
log.Info("--> removing cached key: ", key)
handleDeleteKey(key, "-1")
}

SessionCache.Delete(key)
RPCGlobalCache.Delete(r.KeyPrefix + key)
}

// Notify rest of gateways in cluster to flush cache
n := Notification{
Command: KeySpaceUpdateNotification,
Payload: strings.Join(keys, ","),
if len(deletePayload) > 0 {
// Notify rest of gateways in cluster to flush cache
n := Notification{
Command: KeySpaceUpdateNotification,
Payload: strings.Join(deletePayload, ","),
}
MainNotifier.Notify(n)
}
MainNotifier.Notify(n)
}

func (r *RPCStorageHandler) DeleteScanMatch(pattern string) bool {
Expand Down

0 comments on commit 19b392b

Please sign in to comment.