Skip to content

Commit

Permalink
Improve RPC stability in case of master failures
Browse files Browse the repository at this point in the history
tbd
  • Loading branch information
buger committed Feb 20, 2018
1 parent 37801ce commit 26c7a12
Show file tree
Hide file tree
Showing 14 changed files with 399 additions and 188 deletions.
8 changes: 7 additions & 1 deletion api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,14 @@ func (a APIDefinitionLoader) FromDashboardService(endpoint, secret string) []*AP

// FromCloud will connect and download ApiDefintions from a Mongo DB instance.
func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec {
if rpcEmergencyMode {
return LoadDefinitionsFromRPCBackup()
}

store := RPCStorageHandler{UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString}
store.Connect()
if !store.Connect() {
return nil
}

// enable segments
var tags []string
Expand Down
75 changes: 0 additions & 75 deletions api_definition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/garyburd/redigo/redis"
"github.com/lonelycode/gorpc"

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/config"
Expand Down Expand Up @@ -191,80 +190,6 @@ func TestIgnored(t *testing.T) {
})
}

func startRPCMock(dispatcher *gorpc.Dispatcher) *gorpc.Server {
configMu.Lock()
defer configMu.Unlock()

config.Global.SlaveOptions.UseRPC = true
config.Global.SlaveOptions.RPCKey = "test_org"
config.Global.SlaveOptions.APIKey = "test"

server := gorpc.NewTCPServer("127.0.0.1:0", dispatcher.NewHandlerFunc())
list := &customListener{}
server.Listener = list
server.LogError = gorpc.NilErrorLogger

if err := server.Start(); err != nil {
panic(err)
}
config.Global.SlaveOptions.ConnectionString = list.L.Addr().String()

return server
}

func stopRPCMock(server *gorpc.Server) {
config.Global.SlaveOptions.ConnectionString = ""
config.Global.SlaveOptions.RPCKey = ""
config.Global.SlaveOptions.APIKey = ""
config.Global.SlaveOptions.UseRPC = false

server.Listener.Close()
server.Stop()

RPCCLientSingleton.Stop()
RPCClientIsConnected = false
RPCCLientSingleton = nil
RPCFuncClientSingleton = nil
}

func TestSyncAPISpecsRPCFailure(t *testing.T) {
// Mock RPC
dispatcher := gorpc.NewDispatcher()
dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) {
return "malformed json", nil
})
dispatcher.AddFunc("Login", func(clientAddr, userKey string) bool {
return true
})

rpc := startRPCMock(dispatcher)
defer stopRPCMock(rpc)

count := syncAPISpecs()
if count != 0 {
t.Error("Should return empty value for malformed rpc response", apiSpecs)
}
}

func TestSyncAPISpecsRPCSuccess(t *testing.T) {
// Mock RPC
dispatcher := gorpc.NewDispatcher()
dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) {
return "[{}]", nil
})
dispatcher.AddFunc("Login", func(clientAddr, userKey string) bool {
return true
})

rpc := startRPCMock(dispatcher)
defer stopRPCMock(rpc)

count := syncAPISpecs()
if count != 1 {
t.Error("Should return array with one spec", apiSpecs)
}
}

func TestSyncAPISpecsDashboardSuccess(t *testing.T) {
// Mock Dashboard
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
2 changes: 0 additions & 2 deletions api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,6 @@ func loadApps(specs []*APISpec, muxer *mux.Router) {
}).Info("Initialised API Definitions")

if config.Global.SlaveOptions.UseRPC {
//log.Warning("TODO: PUT THE KEEPALIVE WATCHER BACK")
startRPCKeepaliveWatcher(rpcAuthStore)
startRPCKeepaliveWatcher(rpcOrgStore)
}
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type AnalyticsConfigConfig struct {
GeoIPDBLocation string `json:"geo_ip_db_path"`
NormaliseUrls NormalisedURLConfig `json:"normalise_urls"`
PoolSize int `json:"pool_size"`
StorageExpirationTime int `json:"storage_expiration_time"`
ignoredIPsCompiled map[string]bool
}

Expand Down
6 changes: 6 additions & 0 deletions ldap_auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,16 @@ func (l *LDAPStorageHandler) GetRawKey(filter string) (string, error) {
return "", nil
}

func (l *LDAPStorageHandler) SetExp(cn string, exp int64) error {
log.Warning("Not implementated")
return nil
}

func (l *LDAPStorageHandler) GetExp(cn string) (int64, error) {
log.Warning("Not implementated")
return 0, nil
}

func (l *LDAPStorageHandler) GetKeys(filter string) []string {
log.Warning("Not implementated")
s := []string{}
Expand Down
3 changes: 3 additions & 0 deletions lint/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const confSchema = `{
"pool_size": {
"type": "integer"
},
"storage_expiration_time": {
"type": "integer"
},
"type": {
"type": "string"
}
Expand Down
40 changes: 26 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ func apisByIDLen() int {
return len(apisByID)
}

var redisPurgeOnce sync.Once
var rpcPurgeOnce sync.Once

// Create all globals and init connection handlers
func setupGlobals() {
reloadMu.Lock()
Expand Down Expand Up @@ -152,13 +155,19 @@ func setupGlobals() {
analytics.Store = &analyticsStore
analytics.Init()

redisPurgeOnce.Do(func(){
redisPurger := RedisPurger{Store: &analyticsStore}
go redisPurger.PurgeLoop(1 * time.Second)
})

if config.Global.AnalyticsConfig.Type == "rpc" {
log.Debug("Using RPC cache purge")

purger := RPCPurger{Store: &analyticsStore}
purger.Connect()
analytics.Clean = &purger
go analytics.Clean.PurgeLoop(10 * time.Second)
rpcPurgeOnce.Do(func(){
purger := RPCPurger{Store: &analyticsStore}
purger.Connect()
go purger.PurgeLoop(10 * time.Second)
})
}
}

Expand Down Expand Up @@ -274,7 +283,7 @@ func syncAPISpecs() int {
return len(apiSpecs)
}

func syncPolicies() {
func syncPolicies() int {
var pols map[string]user.Policy

log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -308,7 +317,7 @@ func syncPolicies() {
log.WithFields(logrus.Fields{
"prefix": "main",
}).Debug("No policy record name defined, skipping...")
return
return 0
}
pols = LoadPoliciesFromFile(config.Global.Policies.PolicyRecordName)
}
Expand All @@ -321,11 +330,13 @@ func syncPolicies() {
}).Infof(" - %s", id)
}

policiesMu.Lock()
defer policiesMu.Unlock()
if len(pols) > 0 {
policiesMu.Lock()
policiesByID = pols
policiesMu.Unlock()
}

return len(pols)
}

// stripSlashes removes any trailing slashes from the request's URL
Expand Down Expand Up @@ -676,9 +687,9 @@ func doReload() {

mainRouter = newRouter

// Unset these
rpcEmergencyModeLoaded = false
rpcEmergencyMode = false
// // Unset these
// rpcEmergencyModeLoaded = false
// rpcEmergencyMode = false
}

// startReloadChan and reloadDoneChan are used by the two reload loops
Expand Down Expand Up @@ -1275,6 +1286,8 @@ func handleDashboardRegistration() {
go DashService.StartBeating()
}

var drlOnce sync.Once

func startDRL() {
switch {
case config.Global.ManagementNode:
Expand Down Expand Up @@ -1311,6 +1324,8 @@ func listen(l, controlListener net.Listener, err error) {
writeTimeout = config.Global.HttpServerOptions.WriteTimeout
}

drlOnce.Do(startDRL)

// Handle reload when SIGUSR2 is received
if err != nil {
// Listen on a TCP or a UNIX domain socket (TCP here).
Expand All @@ -1321,8 +1336,6 @@ func listen(l, controlListener net.Listener, err error) {
// handle dashboard registration and nonces if available
handleDashboardRegistration()

startDRL()

if !rpcEmergencyMode {
count := syncAPISpecs()
if count > 0 {
Expand Down Expand Up @@ -1396,7 +1409,6 @@ func listen(l, controlListener net.Listener, err error) {
os.Setenv("TYK_SERVICE_NONCE", "")
os.Setenv("TYK_SERVICE_NODEID", "")
}
startDRL()

// Resume accepting connections in a new goroutine.
if !rpcEmergencyMode {
Expand Down
38 changes: 27 additions & 11 deletions policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,29 +139,45 @@ func LoadPoliciesFromDashboard(endpoint, secret string, allowExplicit bool) map[
return policies
}

func LoadPoliciesFromRPC(orgId string) map[string]user.Policy {
func parsePoliciesFromRPC(list string) (map[string]user.Policy, error) {
var dbPolicyList []user.Policy

if err := json.Unmarshal([]byte(list), &dbPolicyList); err != nil {
return nil, err
}

policies := make(map[string]user.Policy, len(dbPolicyList))

for _, p := range dbPolicyList {
p.ID = p.MID.Hex()
policies[p.MID.Hex()] = p
}

return policies, nil
}

func LoadPoliciesFromRPC(orgId string) map[string]user.Policy {
if rpcEmergencyMode {
return LoadPoliciesFromRPCBackup()
}

store := &RPCStorageHandler{UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString}
store.Connect()
if !store.Connect() {
return nil
}

rpcPolicies := store.GetPolicies(orgId)

//store.Disconnect()
policies, err := parsePoliciesFromRPC(rpcPolicies)

if err := json.Unmarshal([]byte(rpcPolicies), &dbPolicyList); err != nil {
if err != nil {
log.WithFields(logrus.Fields{
"prefix": "policy",
}).Error("Failed decode: ", err)
}).Error("Failed decode: ", err, rpcPolicies)
return nil
}

policies := make(map[string]user.Policy, len(dbPolicyList))

for _, p := range dbPolicyList {
p.ID = p.MID.Hex()
policies[p.MID.Hex()] = p
}
saveRPCPoliciesBackup(rpcPolicies)

return policies
}
31 changes: 31 additions & 0 deletions rpc_analytics_purger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"gopkg.in/vmihailenco/msgpack.v2"

"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/storage"
)

Expand Down Expand Up @@ -41,6 +42,11 @@ func (r RPCPurger) PurgeLoop(sleep time.Duration) {

// PurgeCache will pull the data from the in-memory store and drop it into the specified MongoDB collection
func (r *RPCPurger) PurgeCache() {
if _, err := RPCFuncClientSingleton.Call("Ping", nil); err != nil {
log.Error("Failed to ping RPC: ", err)
return
}

analyticsValues := r.Store.GetAndDeleteSet(analyticsKeyName)
if len(analyticsValues) == 0 {
return
Expand Down Expand Up @@ -70,3 +76,28 @@ func (r *RPCPurger) PurgeCache() {
}

}

type RedisPurger struct {
Store storage.Handler
}

func (r RedisPurger) PurgeLoop(sleep time.Duration) {
for {
time.Sleep(sleep)
r.PurgeCache()
}
}

func (r *RedisPurger) PurgeCache() {
configMu.Lock()
expireAfter := config.Global.AnalyticsConfig.StorageExpirationTime
configMu.Unlock()
if expireAfter == 0 {
expireAfter = 60 // 1 minute
}

exp, _ := r.Store.GetExp(analyticsKeyName)
if exp <= 0 {
r.Store.SetExp(analyticsKeyName, int64(expireAfter))
}
}
Loading

0 comments on commit 26c7a12

Please sign in to comment.