Skip to content

Commit

Permalink
Merge 8156e63 into 60fdced
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Nov 5, 2018
2 parents 60fdced + 8156e63 commit 5c1e522
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 54 deletions.
15 changes: 8 additions & 7 deletions api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,14 @@ func (a APIDefinitionLoader) FromDashboardService(endpoint, secret string) ([]*A
}

// FromCloud will connect and download ApiDefintions from a Mongo DB instance.
func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec {
func (a APIDefinitionLoader) FromRPC(orgId string) ([]*APISpec, error) {
if rpc.IsEmergencyMode() {
return LoadDefinitionsFromRPCBackup()
}

store := RPCStorageHandler{}
if !store.Connect() {
return nil
return nil, errors.New("Can't connect RPC layer")
}

// enable segments
Expand All @@ -364,18 +364,19 @@ func (a APIDefinitionLoader) FromRPC(orgId string) []*APISpec {
//store.Disconnect()

if rpc.LoadCount() > 0 {
saveRPCDefinitionsBackup(apiCollection)
if err := saveRPCDefinitionsBackup(apiCollection); err != nil {
return nil, err
}
}

return a.processRPCDefinitions(apiCollection)
}

func (a APIDefinitionLoader) processRPCDefinitions(apiCollection string) []*APISpec {
func (a APIDefinitionLoader) processRPCDefinitions(apiCollection string) ([]*APISpec, error) {

var apiDefs []*apidef.APIDefinition
if err := json.Unmarshal([]byte(apiCollection), &apiDefs); err != nil {
log.Error("Failed decode: ", err)
return nil
return nil, err
}

var specs []*APISpec
Expand All @@ -396,7 +397,7 @@ func (a APIDefinitionLoader) processRPCDefinitions(apiCollection string) []*APIS
specs = append(specs, spec)
}

return specs
return specs, nil
}

func (a APIDefinitionLoader) ParseDefinition(r io.Reader) *apidef.APIDefinition {
Expand Down
41 changes: 26 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func buildConnStr(resource string) string {
return config.Global().DBAppConfOptions.ConnectionString + resource
}

func syncAPISpecs() int {
func syncAPISpecs() (int, error) {
loader := APIDefinitionLoader{}

apisMu.Lock()
Expand All @@ -241,7 +241,7 @@ func syncAPISpecs() int {
tmpSpecs, err := loader.FromDashboardService(connStr, config.Global().NodeSecret)
if err != nil {
log.Error("failed to load API specs: ", err)
return 0
return 0, err
}

apiSpecs = tmpSpecs
Expand All @@ -250,7 +250,11 @@ func syncAPISpecs() int {
} else if config.Global().SlaveOptions.UseRPC {
mainLog.Debug("Using RPC Configuration")

apiSpecs = loader.FromRPC(config.Global().SlaveOptions.RPCKey)
var err error
apiSpecs, err = loader.FromRPC(config.Global().SlaveOptions.RPCKey)
if err != nil {
return 0, err
}
} else {
apiSpecs = loader.FromDir(config.Global().AppPath)
}
Expand All @@ -269,10 +273,10 @@ func syncAPISpecs() int {
}
}

return len(apiSpecs)
return len(apiSpecs), nil
}

func syncPolicies() int {
func syncPolicies() (count int, err error) {
var pols map[string]user.Policy

mainLog.Info("Loading policies")
Expand All @@ -288,15 +292,14 @@ func syncPolicies() int {
mainLog.Info("Using Policies from Dashboard Service")

pols = LoadPoliciesFromDashboard(connStr, config.Global().NodeSecret, config.Global().Policies.AllowExplicitPolicyID)

case "rpc":
mainLog.Debug("Using Policies from RPC")
pols = LoadPoliciesFromRPC(config.Global().SlaveOptions.RPCKey)
pols, err = LoadPoliciesFromRPC(config.Global().SlaveOptions.RPCKey)
default:
// this is the only case now where we need a policy record name
if config.Global().Policies.PolicyRecordName == "" {
mainLog.Debug("No policy record name defined, skipping...")
return 0
return 0, nil
}
pols = LoadPoliciesFromFile(config.Global().Policies.PolicyRecordName)
}
Expand All @@ -311,7 +314,7 @@ func syncPolicies() int {
policiesByID = pols
}

return len(pols)
return len(pols), err
}

// stripSlashes removes any trailing slashes from the request's URL
Expand Down Expand Up @@ -605,14 +608,22 @@ func doReload() {
}

// Load the API Policies
syncPolicies()
if _, err := syncPolicies(); err != nil {
mainLog.Error("Error during syncing policies:", err.Error())
return
}

// load the specs
count := syncAPISpecs()
// skip re-loading only if dashboard service reported 0 APIs
// and current registry had 0 APIs
if count == 0 && apisByIDLen() == 0 {
mainLog.Warning("No API Definitions found, not reloading")
if count, err := syncAPISpecs(); err != nil {
mainLog.Error("Error during syncing apis:", err.Error())
return
} else {
// skip re-loading only if dashboard service reported 0 APIs
// and current registry had 0 APIs
if count == 0 && apisByIDLen() == 0 {
mainLog.Warning("No API Definitions found, not reloading")
return
}
}

// We have updated specs, lets load those...
Expand Down
2 changes: 1 addition & 1 deletion mw_jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func prepareGenericJWTSession(testName string, method string, claimName string,
var sessionFunc JwtCreator
switch method {
default:
log.Warningf("Signing method '%s' is not recognised, defaulting to HMAC signature")
log.Warningf("Signing method '%s' is not recognised, defaulting to HMAC signature", method)
method = HMACSign
fallthrough
case HMACSign:
Expand Down
13 changes: 8 additions & 5 deletions policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"os"
Expand Down Expand Up @@ -157,14 +158,14 @@ func parsePoliciesFromRPC(list string) (map[string]user.Policy, error) {
return policies, nil
}

func LoadPoliciesFromRPC(orgId string) map[string]user.Policy {
func LoadPoliciesFromRPC(orgId string) (map[string]user.Policy, error) {
if rpc.IsEmergencyMode() {
return LoadPoliciesFromRPCBackup()
}

store := &RPCStorageHandler{}
if !store.Connect() {
return nil
return nil, errors.New("Policies backup: Failed connecting to database")
}

rpcPolicies := store.GetPolicies(orgId)
Expand All @@ -175,10 +176,12 @@ func LoadPoliciesFromRPC(orgId string) map[string]user.Policy {
log.WithFields(logrus.Fields{
"prefix": "policy",
}).Error("Failed decode: ", err, rpcPolicies)
return nil
return nil, err
}

saveRPCPoliciesBackup(rpcPolicies)
if err := saveRPCPoliciesBackup(rpcPolicies); err != nil {
return nil, err
}

return policies
return policies, nil
}
48 changes: 28 additions & 20 deletions rpc_backup_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"io"
"strings"

Expand All @@ -28,7 +30,7 @@ func getTagListAsString() string {
return tagList
}

func LoadDefinitionsFromRPCBackup() []*APISpec {
func LoadDefinitionsFromRPCBackup() ([]*APISpec, error) {
tagList := getTagListAsString()
checkKey := BackupApiKeyBase + tagList

Expand All @@ -37,24 +39,26 @@ func LoadDefinitionsFromRPCBackup() []*APISpec {
log.Info("[RPC] --> Loading API definitions from backup")

if !connected {
log.Error("[RPC] --> RPC Backup recovery failed: redis connection failed")
return nil
return nil, errors.New("[RPC] --> RPC Backup recovery failed: redis connection failed")
}

secret := rightPad2Len(config.Global().Secret, "=", 32)
cryptoText, err := store.GetKey(checkKey)
apiListAsString := decrypt([]byte(secret), cryptoText)

if err != nil {
log.Error("[RPC] --> Failed to get node backup (", checkKey, "): ", err)
return nil
return nil, errors.New("[RPC] --> Failed to get node backup (" + checkKey + "): " + err.Error())
}

a := APIDefinitionLoader{}
return a.processRPCDefinitions(apiListAsString)
}

func saveRPCDefinitionsBackup(list string) {
func saveRPCDefinitionsBackup(list string) error {
if !json.Valid([]byte(list)) {
return errors.New("--> RPC Backup save failure: wrong format, skipping.")
}

log.Info("Storing RPC Definitions backup")
tagList := getTagListAsString()

Expand All @@ -66,19 +70,20 @@ func saveRPCDefinitionsBackup(list string) {
log.Info("--> Connected to DB")

if !connected {
log.Error("--> RPC Backup save failed: redis connection failed")
return
return errors.New("--> RPC Backup save failed: redis connection failed")
}

secret := rightPad2Len(config.Global().Secret, "=", 32)
cryptoText := encrypt([]byte(secret), list)
err := store.SetKey(BackupApiKeyBase+tagList, cryptoText, -1)
if err != nil {
log.Error("Failed to store node backup: ", err)
return errors.New("Failed to store node backup: " + err.Error())
}

return nil
}

func LoadPoliciesFromRPCBackup() map[string]user.Policy {
func LoadPoliciesFromRPCBackup() (map[string]user.Policy, error) {
tagList := getTagListAsString()
checkKey := BackupPolicyKeyBase + tagList

Expand All @@ -88,30 +93,32 @@ func LoadPoliciesFromRPCBackup() map[string]user.Policy {
log.Info("[RPC] Loading Policies from backup")

if !connected {
log.Error("[RPC] --> RPC Policy Backup recovery failed: redis connection failed")
return nil
return nil, errors.New("[RPC] --> RPC Policy Backup recovery failed: redis connection failed")
}

secret := rightPad2Len(config.Global().Secret, "=", 32)
cryptoText, err := store.GetKey(checkKey)
listAsString := decrypt([]byte(secret), cryptoText)

if err != nil {
log.Error("[RPC] --> Failed to get node policy backup (", checkKey, "): ", err)
return nil
return nil, errors.New("[RPC] --> Failed to get node policy backup (" + checkKey + "): " + err.Error())
}

if policies, err := parsePoliciesFromRPC(listAsString); err != nil {
log.WithFields(logrus.Fields{
"prefix": "policy",
}).Error("Failed decode: ", err)
return nil
return nil, err
} else {
return policies
return policies, nil
}
}

func saveRPCPoliciesBackup(list string) {
func saveRPCPoliciesBackup(list string) error {
if !json.Valid([]byte(list)) {
return errors.New("--> RPC Backup save failure: wrong format, skipping.")
}

log.Info("Storing RPC policies backup")
tagList := getTagListAsString()

Expand All @@ -123,16 +130,17 @@ func saveRPCPoliciesBackup(list string) {
log.Info("--> Connected to DB")

if !connected {
log.Error("--> RPC Backup save failed: redis connection failed")
return
return errors.New("--> RPC Backup save failed: redis connection failed")
}

secret := rightPad2Len(config.Global().Secret, "=", 32)
cryptoText := encrypt([]byte(secret), list)
err := store.SetKey(BackupPolicyKeyBase+tagList, cryptoText, -1)
if err != nil {
log.Error("Failed to store node backup: ", err)
return errors.New("Failed to store node backup: " + err.Error())
}

return nil
}

// encrypt string to base64 crypto using AES
Expand Down
12 changes: 6 additions & 6 deletions rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestSyncAPISpecsRPCFailure(t *testing.T) {
rpc := startRPCMock(dispatcher)
defer stopRPCMock(rpc)

count := syncAPISpecs()
count, _ := syncAPISpecs()
if count != 0 {
t.Error("Should return empty value for malformed rpc response", apiSpecs)
}
Expand Down Expand Up @@ -103,12 +103,12 @@ func TestSyncAPISpecsRPCSuccess(t *testing.T) {
ts := newTykTestServer()
defer ts.Close()

apiBackup := LoadDefinitionsFromRPCBackup()
apiBackup, _ := LoadDefinitionsFromRPCBackup()
if len(apiBackup) != 1 {
t.Fatal("Should have APIs in backup")
}

policyBackup := LoadPoliciesFromRPCBackup()
policyBackup, _ := LoadPoliciesFromRPCBackup()
if len(policyBackup) != 1 {
t.Fatal("Should have Policies in backup")
}
Expand All @@ -118,7 +118,7 @@ func TestSyncAPISpecsRPCSuccess(t *testing.T) {
{Path: "/sample", Headers: authHeaders, Code: 200},
}...)

count := syncAPISpecs()
count, _ := syncAPISpecs()
if count != 1 {
t.Error("Should return array with one spec", apiSpecs)
}
Expand Down Expand Up @@ -191,11 +191,11 @@ func TestSyncAPISpecsRPCSuccess(t *testing.T) {
{Path: "/sample", Headers: notCachedAuth, Code: 200},
}...)

if count := syncAPISpecs(); count != 2 {
if count, _ := syncAPISpecs(); count != 2 {
t.Error("Should fetch latest specs", count)
}

if count := syncPolicies(); count != 2 {
if count, _ := syncPolicies(); count != 2 {
t.Error("Should fetch latest policies", count)
}
})
Expand Down

0 comments on commit 5c1e522

Please sign in to comment.