Skip to content

Commit

Permalink
Improve RPC stability in case of master failures
Browse files Browse the repository at this point in the history
tbd
  • Loading branch information
buger committed Feb 18, 2018
1 parent 37801ce commit 2ffc8ca
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 120 deletions.
8 changes: 7 additions & 1 deletion api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,14 @@ func (a APIDefinitionLoader) FromDashboardService(endpoint, secret string) []*AP

// FromRPC will connect to the RPC server and download API definitions; in RPC emergency mode it loads them from the RPC backup instead.
func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec {
if rpcEmergencyMode {
return LoadDefinitionsFromRPCBackup()
}

store := RPCStorageHandler{UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString}
store.Connect()
if !store.Connect() {
return nil
}

// enable segments
var tags []string
Expand Down
114 changes: 105 additions & 9 deletions api_definition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -195,9 +196,13 @@ func startRPCMock(dispatcher *gorpc.Dispatcher) *gorpc.Server {
configMu.Lock()
defer configMu.Unlock()

GlobalRPCCallTimeout = 100 * time.Millisecond
config.Global.SlaveOptions.UseRPC = true
config.Global.SlaveOptions.RPCKey = "test_org"
config.Global.SlaveOptions.APIKey = "test"
config.Global.Policies.PolicySource = "rpc"
config.Global.SlaveOptions.CallTimeout = 1
config.Global.SlaveOptions.RPCPoolSize = 2

server := gorpc.NewTCPServer("127.0.0.1:0", dispatcher.NewHandlerFunc())
list := &customListener{}
Expand All @@ -217,14 +222,20 @@ func stopRPCMock(server *gorpc.Server) {
config.Global.SlaveOptions.RPCKey = ""
config.Global.SlaveOptions.APIKey = ""
config.Global.SlaveOptions.UseRPC = false
config.Global.Policies.PolicySource = ""

server.Listener.Close()
server.Stop()
if server != nil {
server.Listener.Close()
server.Stop()
}

RPCCLientSingleton.Stop()
RPCClientIsConnected = false
RPCCLientSingleton = nil
RPCFuncClientSingleton = nil
rpcLoadCount = 0
rpcEmergencyMode = false
rpcEmergencyModeLoaded = false
}

func TestSyncAPISpecsRPCFailure(t *testing.T) {
Expand All @@ -250,19 +261,104 @@ func TestSyncAPISpecsRPCSuccess(t *testing.T) {
// Mock RPC
dispatcher := gorpc.NewDispatcher()
dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) {
return "[{}]", nil
return "[" + sampleAPI + "]", nil
})
dispatcher.AddFunc("GetPolicies", func(clientAddr string, orgid string) (string, error) {
return `[{"_id":"507f191e810c19729de860ea", "rate":1, "per":1}]`, nil
})
dispatcher.AddFunc("Login", func(clientAddr, userKey string) bool {
return true
})
dispatcher.AddFunc("GetKey", func(clientAddr, key string) (string, error) {
return "", fmt.Errorf("Not found")
})

rpc := startRPCMock(dispatcher)
defer stopRPCMock(rpc)
t.Run("RPC is live", func(t *testing.T) {
rpc := startRPCMock(dispatcher)
defer stopRPCMock(rpc)
ts := newTykTestServer()
defer ts.Close()

count := syncAPISpecs()
if count != 1 {
t.Error("Should return array with one spec", apiSpecs)
}
apiBackup := LoadDefinitionsFromRPCBackup()
if len(apiBackup) != 1 {
t.Fatal("Should have APIs in backup")
}

policyBackup := LoadPoliciesFromRPCBackup()
if len(policyBackup) != 1 {
t.Fatal("Should have Policies in backup")
}

ts.Run(t, []test.TestCase{
{Path: "/sample", Code: 200},
}...)

count := syncAPISpecs()
if count != 1 {
t.Error("Should return array with one spec", apiSpecs)
}
})

t.Run("RPC down, load backup", func(t *testing.T) {
// Point rpc to non existent address
config.Global.SlaveOptions.ConnectionString = testHttpFailure
config.Global.SlaveOptions.UseRPC = true
config.Global.SlaveOptions.RPCKey = "test_org"
config.Global.SlaveOptions.APIKey = "test"
config.Global.Policies.PolicySource = "rpc"

// RPC layer is down
ts := newTykTestServer()
defer ts.Close()

// Wait for backup to load
time.Sleep(100 * time.Millisecond)

// Still should work!
ts.Run(t, []test.TestCase{
{Path: "/sample", Code: 200},
}...)

stopRPCMock(nil)
})

t.Run("RPC is back", func(t *testing.T) {
rpcEmergencyModeLoaded = false
rpcEmergencyMode = false

dispatcher := gorpc.NewDispatcher()
dispatcher.AddFunc("GetApiDefinitions", func(clientAddr string, dr *DefRequest) (string, error) {
return "[" + sampleAPI + "," + sampleAPI + "]", nil
})
dispatcher.AddFunc("GetPolicies", func(clientAddr string, orgid string) (string, error) {
return `[{"_id":"507f191e810c19729de860ea", "rate":1, "per":1}, {"_id":"507f191e810c19729de860eb", "rate":1, "per":1}]`, nil
})
dispatcher.AddFunc("Login", func(clientAddr, userKey string) bool {
return true
})
dispatcher.AddFunc("GetKey", func(clientAddr, key string) (string, error) {
return "", fmt.Errorf("Not found")
})
// Back to live
rpc := startRPCMock(dispatcher)
defer stopRPCMock(rpc)
ts := newTykTestServer()
defer ts.Close()

time.Sleep(100 * time.Millisecond)

ts.Run(t, []test.TestCase{
{Path: "/sample", Code: 200},
}...)

if count := syncAPISpecs(); count != 2 {
t.Error("Should fetch latest specs", count)
}

if count := syncPolicies(); count != 2 {
t.Error("Should fetch latest policies", count)
}
})
}

func TestSyncAPISpecsDashboardSuccess(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,6 @@ func loadApps(specs []*APISpec, muxer *mux.Router) {
}).Info("Initialised API Definitions")

if config.Global.SlaveOptions.UseRPC {
//log.Warning("TODO: PUT THE KEEPALIVE WATCHER BACK")
startRPCKeepaliveWatcher(rpcAuthStore)
startRPCKeepaliveWatcher(rpcOrgStore)
}
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type AnalyticsConfigConfig struct {
GeoIPDBLocation string `json:"geo_ip_db_path"`
NormaliseUrls NormalisedURLConfig `json:"normalise_urls"`
PoolSize int `json:"pool_size"`
StorageExpirationTime int `json:"storage_expiration_time"`
ignoredIPsCompiled map[string]bool
}

Expand Down
6 changes: 6 additions & 0 deletions ldap_auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,16 @@ func (l *LDAPStorageHandler) GetRawKey(filter string) (string, error) {
return "", nil
}

// SetExp is a stub for the LDAP storage handler: it logs a warning that the
// operation is not implemented and returns nil. Both cn and exp are ignored,
// so nothing is stored.
func (l *LDAPStorageHandler) SetExp(cn string, exp int64) error {
log.Warning("Not implementated")
return nil
}

// GetExp is a stub for the LDAP storage handler: it logs a warning that the
// operation is not implemented and always returns 0 with a nil error. The cn
// argument is ignored.
func (l *LDAPStorageHandler) GetExp(cn string) (int64, error) {
log.Warning("Not implementated")
return 0, nil
}

func (l *LDAPStorageHandler) GetKeys(filter string) []string {
log.Warning("Not implementated")
s := []string{}
Expand Down
3 changes: 3 additions & 0 deletions lint/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const confSchema = `{
"pool_size": {
"type": "integer"
},
"storage_expiration_time": {
"type": "integer"
},
"type": {
"type": "string"
}
Expand Down
29 changes: 17 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,15 @@ func setupGlobals() {
analytics.Store = &analyticsStore
analytics.Init()

redisPurger := RedisPurger{Store: &analyticsStore}
go redisPurger.PurgeLoop(1 * time.Second)

if config.Global.AnalyticsConfig.Type == "rpc" {
log.Debug("Using RPC cache purge")

purger := RPCPurger{Store: &analyticsStore}
purger.Connect()
analytics.Clean = &purger
go analytics.Clean.PurgeLoop(10 * time.Second)
go purger.PurgeLoop(10 * time.Second)
}
}

Expand Down Expand Up @@ -274,7 +276,7 @@ func syncAPISpecs() int {
return len(apiSpecs)
}

func syncPolicies() {
func syncPolicies() int {
var pols map[string]user.Policy

log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -308,7 +310,7 @@ func syncPolicies() {
log.WithFields(logrus.Fields{
"prefix": "main",
}).Debug("No policy record name defined, skipping...")
return
return 0
}
pols = LoadPoliciesFromFile(config.Global.Policies.PolicyRecordName)
}
Expand All @@ -321,11 +323,13 @@ func syncPolicies() {
}).Infof(" - %s", id)
}

policiesMu.Lock()
defer policiesMu.Unlock()
if len(pols) > 0 {
policiesMu.Lock()
policiesByID = pols
policiesMu.Unlock()
}

return len(pols)
}

// stripSlashes removes any trailing slashes from the request's URL
Expand Down Expand Up @@ -676,9 +680,9 @@ func doReload() {

mainRouter = newRouter

// Unset these
rpcEmergencyModeLoaded = false
rpcEmergencyMode = false
// // Unset these
// rpcEmergencyModeLoaded = false
// rpcEmergencyMode = false
}

// startReloadChan and reloadDoneChan are used by the two reload loops
Expand Down Expand Up @@ -1275,6 +1279,8 @@ func handleDashboardRegistration() {
go DashService.StartBeating()
}

var drlOnce sync.Once

func startDRL() {
switch {
case config.Global.ManagementNode:
Expand Down Expand Up @@ -1311,6 +1317,8 @@ func listen(l, controlListener net.Listener, err error) {
writeTimeout = config.Global.HttpServerOptions.WriteTimeout
}

drlOnce.Do(startDRL)

// Handle reload when SIGUSR2 is received
if err != nil {
// Listen on a TCP or a UNIX domain socket (TCP here).
Expand All @@ -1321,8 +1329,6 @@ func listen(l, controlListener net.Listener, err error) {
// handle dashboard registration and nonces if available
handleDashboardRegistration()

startDRL()

if !rpcEmergencyMode {
count := syncAPISpecs()
if count > 0 {
Expand Down Expand Up @@ -1396,7 +1402,6 @@ func listen(l, controlListener net.Listener, err error) {
os.Setenv("TYK_SERVICE_NONCE", "")
os.Setenv("TYK_SERVICE_NODEID", "")
}
startDRL()

// Resume accepting connections in a new goroutine.
if !rpcEmergencyMode {
Expand Down
38 changes: 27 additions & 11 deletions policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,29 +139,45 @@ func LoadPoliciesFromDashboard(endpoint, secret string, allowExplicit bool) map[
return policies
}

func LoadPoliciesFromRPC(orgId string) map[string]user.Policy {
// parsePoliciesFromRPC decodes list, a JSON array of policies, and returns
// the policies indexed by the hex form of each policy's MID. Every returned
// policy has its ID field set to that same hex string.
//
// If list cannot be unmarshalled into []user.Policy, the map is nil and the
// unmarshalling error is returned unchanged.
func parsePoliciesFromRPC(list string) (map[string]user.Policy, error) {
	var decoded []user.Policy
	err := json.Unmarshal([]byte(list), &decoded)
	if err != nil {
		return nil, err
	}

	byID := make(map[string]user.Policy, len(decoded))
	for i := range decoded {
		// Work on a copy so the decoded slice itself is left untouched,
		// exactly as a range-by-value loop would.
		pol := decoded[i]
		hexID := pol.MID.Hex()
		pol.ID = hexID
		byID[hexID] = pol
	}

	return byID, nil
}

// LoadPoliciesFromRPC returns the policies for orgId keyed by policy ID.
// When rpcEmergencyMode is set it returns LoadPoliciesFromRPCBackup() without
// contacting the RPC server. Otherwise it connects through an
// RPCStorageHandler, fetches the raw policy list with GetPolicies, and hands
// that raw list to saveRPCPoliciesBackup before returning the decoded map.
// It returns nil when the connection fails or the list cannot be decoded.
// NOTE(review): this span comes from a rendered diff and appears to interleave
// removed and added lines (e.g. both Connect calls, both decode paths) —
// confirm against the committed file before relying on statement order.
func LoadPoliciesFromRPC(orgId string) map[string]user.Policy {
if rpcEmergencyMode {
return LoadPoliciesFromRPCBackup()
}

store := &RPCStorageHandler{UserKey: config.Global.SlaveOptions.APIKey, Address: config.Global.SlaveOptions.ConnectionString}
store.Connect()
if !store.Connect() {
return nil
}

rpcPolicies := store.GetPolicies(orgId)

//store.Disconnect()
policies, err := parsePoliciesFromRPC(rpcPolicies)

if err := json.Unmarshal([]byte(rpcPolicies), &dbPolicyList); err != nil {
if err != nil {
log.WithFields(logrus.Fields{
"prefix": "policy",
}).Error("Failed decode: ", err)
}).Error("Failed decode: ", err, rpcPolicies)
return nil
}

policies := make(map[string]user.Policy, len(dbPolicyList))

for _, p := range dbPolicyList {
p.ID = p.MID.Hex()
policies[p.MID.Hex()] = p
}
saveRPCPoliciesBackup(rpcPolicies)

return policies
}
Loading

0 comments on commit 2ffc8ca

Please sign in to comment.