Skip to content

Commit

Permalink
Fix KeySpace RPC updated
Browse files Browse the repository at this point in the history
Added new PubSub message which should invalidate cache on all gateways

Additionally SessionCache keys are now hashed, to simplify their
invalidation.
  • Loading branch information
buger committed Mar 29, 2018
1 parent 8504c41 commit 482bb5a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 4 deletions.
11 changes: 8 additions & 3 deletions middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
"github.com/TykTechnologies/tyk/user"
)

Expand Down Expand Up @@ -282,9 +283,13 @@ func (t BaseMiddleware) ApplyPolicies(key string, session *user.SessionState) er
func (t BaseMiddleware) CheckSessionAndIdentityForValidKey(key string) (user.SessionState, bool) {
// Try and get the session from the session store
log.Debug("Querying local cache")
cacheKey := key
if config.Global.HashKeys {
cacheKey = storage.HashStr(key)
}
// Check in-memory cache
if !config.Global.LocalSessionCache.DisableCacheSessionState {
cachedVal, found := SessionCache.Get(key)
cachedVal, found := SessionCache.Get(cacheKey)
if found {
log.Debug("--> Key found in local cache")
session := cachedVal.(user.SessionState)
Expand All @@ -301,7 +306,7 @@ func (t BaseMiddleware) CheckSessionAndIdentityForValidKey(key string) (user.Ses
if found {
// If exists, assume it has been authorized and pass on
// cache it
go SessionCache.Set(key, session, cache.DefaultExpiration)
go SessionCache.Set(cacheKey, session, cache.DefaultExpiration)

// Check for a policy, if there is a policy, pull it and overwrite the session values
if err := t.ApplyPolicies(key, &session); err != nil {
Expand All @@ -319,7 +324,7 @@ func (t BaseMiddleware) CheckSessionAndIdentityForValidKey(key string) (user.Ses
log.Info("Recreating session for key: ", key)

// cache it
go SessionCache.Set(key, session, cache.DefaultExpiration)
go SessionCache.Set(cacheKey, session, cache.DefaultExpiration)

// Check for a policy, if there is a policy, pull it and overwrite the session values
if err := t.ApplyPolicies(key, &session); err != nil {
Expand Down
9 changes: 8 additions & 1 deletion mw_jwt.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
cache "github.com/pmylund/go-cache"

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
"github.com/TykTechnologies/tyk/user"
)

Expand Down Expand Up @@ -302,8 +304,13 @@ func (k *JWTMiddleware) processCentralisedJWT(r *http.Request, token *jwt.Token)
log.WithError(err).Error("Could not apply new policy from JWT to session")
return errors.New("Key not authorized: could not apply new policy"), 403
}

cacheKey := sessionID
if config.Global.HashKeys {
cacheKey = storage.HashStr(sessionID)
}
// update session in cache
go SessionCache.Set(sessionID, session, cache.DefaultExpiration)
go SessionCache.Set(cacheKey, session, cache.DefaultExpiration)
}
}

Expand Down
17 changes: 17 additions & 0 deletions redis_signals.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
NoticeGatewayConfigResponse NotificationCommand = "NoticeGatewayConfigResponse"
NoticeGatewayDRLNotification NotificationCommand = "NoticeGatewayDRLNotification"
NoticeGatewayLENotification NotificationCommand = "NoticeGatewayLENotification"
KeySpaceUpdateNotification NotificationCommand = "KeySpaceUpdateNotification"
)

// Notification is a type that encodes a message published to a pub sub channel (shared between implementations)
Expand Down Expand Up @@ -111,6 +112,8 @@ func handleRedisEvent(v interface{}, handled func(NotificationCommand), reloaded
"prefix": "pub-sub",
}).Info("Reloading endpoints")
reloadURLStructure(reloaded)
case KeySpaceUpdateNotification:
handleKeySpaceEventCacheFlush(notif.Payload)
default:
log.WithFields(logrus.Fields{
"prefix": "pub-sub",
Expand All @@ -123,6 +126,20 @@ func handleRedisEvent(v interface{}, handled func(NotificationCommand), reloaded
}
}

func handleKeySpaceEventCacheFlush(payload string) {
keys := strings.Split(payload, ",")

for _, key := range keys {
splitKeys := strings.Split(key, ":")
if len(splitKeys) > 1 {
key = splitKeys[0]
}

RPCGlobalCache.Delete("apikey-" + key)
SessionCache.Delete(key)
}
}

var redisInsecureWarn sync.Once
var notificationVerifier goverify.Verifier

Expand Down
10 changes: 10 additions & 0 deletions rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,7 @@ func (r *RPCStorageHandler) ProcessKeySpaceChanges(keys []string) {
for _, key := range keys {
splitKeys := strings.Split(key, ":")
if len(splitKeys) > 1 {
key = splitKeys[0]
if splitKeys[1] == "hashed" {
log.Info("--> removing cached (hashed) key: ", splitKeys[0])
handleDeleteHashedKey(splitKeys[0], "")
Expand All @@ -997,7 +998,16 @@ func (r *RPCStorageHandler) ProcessKeySpaceChanges(keys []string) {
handleDeleteKey(key, "-1")
}

SessionCache.Delete(key)
RPCGlobalCache.Delete(r.KeyPrefix + key)
}

// Notify rest of gateways in cluster to flush cache
n := Notification{
Command: KeySpaceUpdateNotification,
Payload: strings.Join(keys, ","),
}
MainNotifier.Notify(n)
}

func (r *RPCStorageHandler) DeleteScanMatch(pattern string) bool {
Expand Down

0 comments on commit 482bb5a

Please sign in to comment.