Skip to content

Commit

Permalink
Merging to release-4: TT-6968 GroupLogin synchronization force (#4605) (
Browse files Browse the repository at this point in the history
#4609)

TT-6968 GroupLogin synchronization force (#4605)

[changelog]
fixed: force synchronization for the edge group when
`slave_options.synchroniser_enabled` is set to true.

Co-authored-by: Tomas Buchaillot <tombuchaillot89@gmail.com>
  • Loading branch information
buger and tbuchaillot committed Jan 3, 2023
1 parent c96726b commit ddfa82a
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 8 deletions.
5 changes: 3 additions & 2 deletions apidef/rpc.go
Expand Up @@ -16,8 +16,9 @@ type DefRequest struct {
}

type GroupLoginRequest struct {
UserKey string
GroupID string
UserKey string
GroupID string
ForceSync bool
}

type GroupKeySpaceRequest struct {
Expand Down
21 changes: 15 additions & 6 deletions gateway/rpc_storage_handler.go
Expand Up @@ -132,19 +132,28 @@ func (r *RPCStorageHandler) Connect() bool {
rpcConfig,
r.SuppressRegister,
dispatcherFuncs,
func(userKey string, groupID string) interface{} {
return apidef.GroupLoginRequest{
UserKey: userKey,
GroupID: groupID,
}
},
r.getGroupLoginCallback(r.Gw.GetConfig().SlaveOptions.SynchroniserEnabled),
func() {
r.Gw.reloadURLStructure(nil)
},
r.DoReload,
)
}

func (r *RPCStorageHandler) getGroupLoginCallback(synchroniserEnabled bool) func(userKey string, groupID string) interface{} {
groupLoginCallbackFn := func(userKey string, groupID string) interface{} {
return apidef.GroupLoginRequest{
UserKey: userKey,
GroupID: groupID,
}
}
if synchroniserEnabled {
forcer := rpc.NewSyncForcer(r.Gw.RedisController)
groupLoginCallbackFn = forcer.GroupLoginCallback
}
return groupLoginCallbackFn
}

func (r *RPCStorageHandler) hashKey(in string) string {
if !r.HashKeys {
// Not hashing? Return the raw key
Expand Down
48 changes: 48 additions & 0 deletions gateway/rpc_storage_handler_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"testing"

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"

"github.com/lonelycode/osin"
Expand Down Expand Up @@ -281,3 +282,50 @@ func TestRPCUpdateKey(t *testing.T) {
})
}
}

func TestGetGroupLoginCallback(t *testing.T) {
tcs := []struct {
testName string
syncEnabled bool
givenKey string
givenGroup string
expectedCallbackResponse interface{}
}{
{
testName: "sync disabled",
syncEnabled: false,
givenKey: "key",
givenGroup: "group",
expectedCallbackResponse: apidef.GroupLoginRequest{UserKey: "key", GroupID: "group"},
},
{
testName: "sync enabled",
syncEnabled: true,
givenKey: "key",
givenGroup: "group",
expectedCallbackResponse: apidef.GroupLoginRequest{UserKey: "key", GroupID: "group", ForceSync: true},
},
}

for _, tc := range tcs {
t.Run(tc.testName, func(t *testing.T) {
g := StartTest(func(globalConf *config.Config) {
globalConf.SlaveOptions.SynchroniserEnabled = tc.syncEnabled
})
defer g.Close()
defer g.Gw.GlobalSessionManager.Store().DeleteAllKeys()

rpcListener := RPCStorageHandler{
KeyPrefix: "rpc.listener.",
SuppressRegister: true,
Gw: g.Gw,
}

fn := rpcListener.getGroupLoginCallback(tc.syncEnabled)
groupLogin, ok := fn(tc.givenKey, tc.givenGroup).(apidef.GroupLoginRequest)
assert.True(t, ok)
assert.Equal(t, tc.expectedCallbackResponse, groupLogin)
})
}

}
45 changes: 45 additions & 0 deletions rpc/synchronization_forcer.go
@@ -0,0 +1,45 @@
package rpc

import (
"errors"

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/storage"
)

type SyncronizerForcer struct {
store *storage.RedisCluster
}

//NewSyncForcer returns a new syncforcer with a connected redis with a key prefix synchronizer-group- for group synchronization control.
func NewSyncForcer(redisController *storage.RedisController) *SyncronizerForcer {
sf := &SyncronizerForcer{}

sf.store = &storage.RedisCluster{KeyPrefix: "synchronizer-group-", RedisController: redisController}
sf.store.Connect()

return sf
}

// GroupLoginCallback checks if the groupID key exists in the storage to turn on/off ForceSync param.
// If the the key doesn't exists in the storage, it creates it and set ForceSync to true
func (sf *SyncronizerForcer) GroupLoginCallback(userKey string, groupID string) interface{} {
shouldForce := false

_, err := sf.store.GetKey(groupID)
if err != nil && errors.Is(err, storage.ErrKeyNotFound) {
shouldForce = true

err = sf.store.SetKey(groupID, "", 0)
if err != nil {
Log.Error("error setting syncforcer key", err)
}
Log.Info("Forcing MDCB synchronization for group:", groupID)
}

return apidef.GroupLoginRequest{
UserKey: userKey,
GroupID: groupID,
ForceSync: shouldForce,
}
}
60 changes: 60 additions & 0 deletions rpc/synchronization_forcer_test.go
@@ -0,0 +1,60 @@
package rpc

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

var rc *storage.RedisController

func init() {
conf := config.Default

rc = storage.NewRedisController(context.Background())
go rc.ConnectToRedis(context.Background(), nil, &conf)
for {
if rc.Connected() {
break
}

time.Sleep(10 * time.Millisecond)
}
}

func TestNewSyncForcer(t *testing.T) {
sf := NewSyncForcer(rc)

assert.True(t, sf.store.ControllerInitiated())
assert.Equal(t, "synchronizer-group-", sf.store.KeyPrefix)

assert.Equal(t, true, sf.store.RedisController.Connected())
}

func TestGroupLoginCallback(t *testing.T) {
sf := NewSyncForcer(rc)
defer sf.store.DeleteAllKeys()

key := "key"
groupID := "group"

//first time, it should force since the group key doesn't exists
groupLogin, ok := sf.GroupLoginCallback(key, groupID).(apidef.GroupLoginRequest)
assert.True(t, ok)
assert.Equal(t, true, groupLogin.ForceSync)
assert.Equal(t, key, groupLogin.UserKey)
assert.Equal(t, groupID, groupLogin.GroupID)

//second time, it shouldn't force since the group key already exists
groupLogin, ok = sf.GroupLoginCallback(key, groupID).(apidef.GroupLoginRequest)
assert.True(t, ok)
assert.Equal(t, false, groupLogin.ForceSync)
assert.Equal(t, key, groupLogin.UserKey)
assert.Equal(t, groupID, groupLogin.GroupID)
}

0 comments on commit ddfa82a

Please sign in to comment.