Skip to content

Commit

Permalink
Thread-safe LB and SD in place
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Sep 1, 2016
1 parent c0eebd0 commit 59ec10e
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 50 deletions.
3 changes: 3 additions & 0 deletions api_definition_manager.go
Expand Up @@ -124,6 +124,9 @@ type APISpec struct {
JSVM *JSVM
ResponseChain *[]TykResponseHandler
RoundRobin *RoundRobin
LastGoodHostList *tykcommon.HostList
HasRun bool
ServiceRefreshInProgress bool
}

// APIDefinitionLoader will load an Api definition from a storage system. It has two methods LoadDefinitionsFromMongo()
Expand Down
3 changes: 2 additions & 1 deletion host_checker_manager.go
Expand Up @@ -403,7 +403,8 @@ func (hc *HostCheckerManager) GetListFromService(APIID string) ([]HostData, erro

// The returned data is a string, so lets unmarshal it:
checkTargets := make([]tykcommon.HostCheckObject, 0)
decodeErr := json.Unmarshal([]byte(data.(string)), &checkTargets)
thisData, _ := data.GetIndex(0)
decodeErr := json.Unmarshal([]byte(thisData), &checkTargets)

if decodeErr != nil {
log.WithFields(logrus.Fields{
Expand Down
6 changes: 6 additions & 0 deletions main.go
Expand Up @@ -650,6 +650,12 @@ func loadApps(APISpecs *[]*APISpec, Muxer *mux.Router) {
}).Error("Culdn't parse target URL: ", err)
}

// Set up LB targets:
if referenceSpec.Proxy.EnableLoadBalancing {
thisSL := tykcommon.NewHostListFromList(referenceSpec.Proxy.Targets)
referenceSpec.Proxy.StructuredTargetList = *thisSL
}

if !skip {

listenPaths[referenceSpec.Proxy.ListenPath] = append(listenPaths[referenceSpec.Proxy.ListenPath], referenceSpec.Domain)
Expand Down
16 changes: 9 additions & 7 deletions round_robin.go
@@ -1,18 +1,20 @@
package main

import (
"github.com/TykTechnologies/tykcommon"
)

type RoundRobin struct {
pos int
max int
cur int
}

func (r *RoundRobin) SetMax(rp interface{}) {
switch rp.(type) {
case *[]string:
r.max = len(*rp.(*[]string)) - 1
case []string:
r.max = len(rp.([]string)) - 1
}
func (r *RoundRobin) SetMax(rp *tykcommon.HostList) {

// r.max = len(*rp.(*[]string)) - 1
r.max = rp.Len() - 1


if r.max < 0 {
r.max = 0
Expand Down
36 changes: 21 additions & 15 deletions service_discovery.go
Expand Up @@ -109,7 +109,7 @@ func (s *ServiceDiscovery) GetNestedObject(item *gabs.Container) string {
subContainer := gabs.Container{}
switch parentData.(type) {
default:
log.Info("Get Nested Object: parentData is not a string")
log.Debug("Get Nested Object: parentData is not a string")
return ""
case string:
}
Expand All @@ -119,7 +119,7 @@ func (s *ServiceDiscovery) GetNestedObject(item *gabs.Container) string {
hostnameData := s.decodeToNameSpace(s.dataPath, &subContainer)
switch hostnameData.(type) {
default:
log.Info("Get Nested Object: hostname is not a string")
log.Debug("Get Nested Object: hostname is not a string")
return ""
case string:
}
Expand All @@ -133,12 +133,12 @@ func (s *ServiceDiscovery) GetObject(item *gabs.Container) string {
hostnameData := s.decodeToNameSpace(s.dataPath, item)
switch hostnameData.(type) {
default:
log.Info("Get Object: hostname is not a string")
log.Debug("Get Object: hostname is not a string")
return ""
case string:
}
hostname := hostnameData.(string)
log.Info("get object hostname: ", hostname)
log.Debug("get object hostname: ", hostname)
// Get the port
s.GetPortFromObject(&hostname, item)
return hostname
Expand Down Expand Up @@ -181,7 +181,7 @@ func (s *ServiceDiscovery) GetSubObjectFromList(objList *gabs.Container) *[]stri

switch parentData.(type) {
default:
log.Info("parentData is not a string")
log.Debug("parentData is not a string")
return &hostList
case string:
}
Expand All @@ -206,7 +206,7 @@ func (s *ServiceDiscovery) GetSubObjectFromList(objList *gabs.Container) *[]stri
log.Debug("Not a list")
switch parentData.(type) {
default:
log.Info("parentData is not a string")
log.Debug("parentData is not a string")
case string:
s.ParseObject(parentData.(string), &subContainer)
thisSet = s.decodeToNameSpaceAsArray(s.dataPath, objList)
Expand All @@ -227,7 +227,7 @@ func (s *ServiceDiscovery) GetSubObjectFromList(objList *gabs.Container) *[]stri
hostList = append(hostList, hostname)
}
} else {
log.Info("Set is nil")
log.Debug("Set is nil")
}
return &hostList
}
Expand Down Expand Up @@ -256,10 +256,10 @@ func (s *ServiceDiscovery) ParseObject(contents string, jsonParsed *gabs.Contain
return pErr
}

func (s *ServiceDiscovery) ProcessRawData(rawData string) (interface{}, error) {
func (s *ServiceDiscovery) ProcessRawData(rawData string) (*tykcommon.HostList, error) {
var jsonParsed gabs.Container

var hostlist *[]string
hostlist := tykcommon.NewHostList()

if s.endpointReturnsList {
// Convert to an object
Expand All @@ -274,8 +274,9 @@ func (s *ServiceDiscovery) ProcessRawData(rawData string) (interface{}, error) {
// Treat JSON as a list and then apply the data path
if s.isTargetList {
// Get all values
hostlist = s.GetSubObjectFromList(&jsonParsed)
log.Debug("Host list:", hostlist)
asList := s.GetSubObjectFromList(&jsonParsed)
log.Debug("Host list:", asList)
hostlist.Set(*asList)
return hostlist, nil
}

Expand All @@ -287,7 +288,8 @@ func (s *ServiceDiscovery) ProcessRawData(rawData string) (interface{}, error) {
break
}

return host, nil
hostlist.Set([]string{host})
return hostlist, nil
}

// It's an object
Expand All @@ -296,17 +298,21 @@ func (s *ServiceDiscovery) ProcessRawData(rawData string) (interface{}, error) {
// It's a list object
log.Debug("It's a target list - getting sub object from list")
log.Debug("Passing in: ", jsonParsed)
hostlist = s.GetSubObjectFromList(&jsonParsed)

asList := s.GetSubObjectFromList(&jsonParsed)
hostlist.Set(*asList)
log.Debug("Got from object: ", hostlist)
return hostlist, nil
}

// It's a single object
host := s.GetSubObject(&jsonParsed)
return host, nil
hostlist.Set([]string{host})

return hostlist, nil
}

func (s *ServiceDiscovery) GetTarget(serviceURL string) (interface{}, error) {
func (s *ServiceDiscovery) GetTarget(serviceURL string) (*tykcommon.HostList, error) {
// Get the data
rawData, err := s.getServiceData(serviceURL)
if err != nil {
Expand Down
95 changes: 68 additions & 27 deletions tyk_reverse_proxy_clone.go
Expand Up @@ -21,16 +21,56 @@ import (
"strings"
"sync"
"time"
"github.com/TykTechnologies/tykcommon"

)

var ServiceCache *cache.Cache

func GetURLFromService(spec *APISpec) (interface{}, error) {
sd := ServiceDiscovery{}
sd.New(&spec.Proxy.ServiceDiscovery)
data, err := sd.GetTarget(spec.Proxy.ServiceDiscovery.QueryEndpoint)
func GetURLFromService(spec *APISpec) (*tykcommon.HostList, error) {

doCacheRefresh := func () (*tykcommon.HostList, error) {
log.Debug("--> Refreshing")
spec.ServiceRefreshInProgress = true
sd := ServiceDiscovery{}
sd.New(&spec.Proxy.ServiceDiscovery)
data, err := sd.GetTarget(spec.Proxy.ServiceDiscovery.QueryEndpoint)
if err == nil {
// Set the cached value
ServiceCache.Set(spec.APIID, data, cache.DefaultExpiration)
// Stash it too
spec.LastGoodHostList = data
spec.HasRun = true
spec.ServiceRefreshInProgress = false
return data, err
}
spec.ServiceRefreshInProgress = false
return nil, err
}

// First time? Refresh the cache and return that
if !spec.HasRun {
log.Debug("First run! Setting cache")
return doCacheRefresh()
}

return data, err
// Not first run - check the cache
cachedServiceData, found := ServiceCache.Get(spec.APIID)
if !found {
if spec.ServiceRefreshInProgress {
// Are we already refreshing the cache? skip and return last good conf
log.Debug("Cache expired! But service refresh in progress")
return spec.LastGoodHostList, nil
} else {
// Refresh the spec
log.Debug("Cache expired! Refreshing...")
return doCacheRefresh()
}

}

log.Debug("Returning from cache.")
return cachedServiceData.(*tykcommon.HostList), nil
}

func EnsureTransport(host string) string {
Expand All @@ -47,35 +87,32 @@ func EnsureTransport(host string) string {
return host
}

func GetNextTarget(targetData interface{}, spec *APISpec, tryCount int) string {
func GetNextTarget(targetData *tykcommon.HostList, spec *APISpec, tryCount int) string {
if spec.Proxy.EnableLoadBalancing {
log.Debug("[PROXY] [LOAD BALANCING] Load balancer enabled, getting upstream target")
// Use a list
// Use a HostList
spec.RoundRobin.SetMax(targetData)
var td []string

switch targetData.(type) {
case *[]string:
td = *targetData.(*[]string)
case []string:
log.Warning("Raw array found!")
td = targetData.([]string)
}

pos := spec.RoundRobin.GetPos()
if pos > (len(td) - 1) {
if pos > (targetData.Len() - 1) {
// problem
spec.RoundRobin.SetMax(td)
spec.RoundRobin.SetMax(targetData)
pos = 0
}

thisHost := EnsureTransport(td[pos])
gotHost, err := targetData.GetIndex(pos)
if err != nil {
log.Error("[PROXY] [LOAD BALANCING] ", err)
return gotHost
}

thisHost := EnsureTransport(gotHost)

// Check hosts against uptime tests
if spec.Proxy.CheckHostAgainstUptimeTests {
if !GlobalHostChecker.IsHostDown(thisHost) {
// Don't overdo it
if tryCount < len(td) {
if tryCount < targetData.Len() {
// Host is down, skip
return GetNextTarget(targetData, spec, tryCount+1)
}
Expand All @@ -87,7 +124,13 @@ func GetNextTarget(targetData interface{}, spec *APISpec, tryCount int) string {
}
// Use standard target - might still be service data
log.Debug("TARGET DATA:", targetData)
return EnsureTransport(targetData.(string))

gotHost, err := targetData.GetIndex(0)
if err != nil {
log.Error("[PROXY] ", err)
return gotHost
}
return EnsureTransport(gotHost)
}

// TykNewSingleHostReverseProxy returns a new ReverseProxy that rewrites
Expand All @@ -99,7 +142,7 @@ func GetNextTarget(targetData interface{}, spec *APISpec, tryCount int) string {
func TykNewSingleHostReverseProxy(target *url.URL, spec *APISpec) *ReverseProxy {
// initalise round robin
spec.RoundRobin = &RoundRobin{}
spec.RoundRobin.SetMax(&[]string{})
spec.RoundRobin.SetMax(tykcommon.NewHostList())

if spec.Proxy.ServiceDiscovery.UseDiscoveryService {
log.Debug("[PROXY] Service discovery enabled")
Expand All @@ -123,8 +166,7 @@ func TykNewSingleHostReverseProxy(target *url.URL, spec *APISpec) *ReverseProxy
} else {
// No error, replace the target
if spec.Proxy.EnableLoadBalancing {
var targetPtr *[]string = tempTargetURL.(*[]string)
remote, err := url.Parse(GetNextTarget(targetPtr, spec, 0))
remote, err := url.Parse(GetNextTarget(tempTargetURL, spec, 0))
if err != nil {
log.Error("[PROXY] [SERVICE DISCOVERY] Couldn't parse target URL:", err)
} else {
Expand All @@ -133,8 +175,7 @@ func TykNewSingleHostReverseProxy(target *url.URL, spec *APISpec) *ReverseProxy
targetQuery = target.RawQuery
}
} else {
var targetPtr string = tempTargetURL.(string)
remote, err := url.Parse(GetNextTarget(targetPtr, spec, 0))
remote, err := url.Parse(GetNextTarget(tempTargetURL, spec, 0))
if err != nil {
log.Error("[PROXY] [SERVICE DISCOVERY] Couldn't parse target URL:", err)
} else {
Expand All @@ -152,7 +193,7 @@ func TykNewSingleHostReverseProxy(target *url.URL, spec *APISpec) *ReverseProxy
// no override, better check if LB is enabled
if spec.Proxy.EnableLoadBalancing {
// it is, lets get that target data
lbRemote, lbErr := url.Parse(GetNextTarget(&spec.Proxy.TargetList, spec, 0))
lbRemote, lbErr := url.Parse(GetNextTarget(&spec.Proxy.StructuredTargetList, spec, 0))
if lbErr != nil {
log.Error("[PROXY] [LOAD BALANCING] Couldn't parse target URL:", lbErr)
} else {
Expand Down

0 comments on commit 59ec10e

Please sign in to comment.