Skip to content

Commit

Permalink
Updated to use fabio.properties ConcurrentConsulRequests
Browse files Browse the repository at this point in the history
  • Loading branch information
galen0624 committed Oct 16, 2018
1 parent 3235cb9 commit c0fe13b
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 16 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,5 @@ type Consul struct {
CheckTLSSkipVerify bool
CheckDeregisterCriticalServiceAfter string
ChecksRequired string
ConcurrentConsulRequests int
}
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var defaultConfig = &Config{
CheckScheme: "http",
CheckDeregisterCriticalServiceAfter: "90m",
ChecksRequired: "one",
ConcurrentConsulRequests: 10,
},
Timeout: 10 * time.Second,
Retry: 500 * time.Millisecond,
Expand Down
1 change: 1 addition & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.StringVar(&cfg.Registry.Consul.KVPath, "registry.consul.kvpath", defaultConfig.Registry.Consul.KVPath, "consul KV path for manual overrides")
f.StringVar(&cfg.Registry.Consul.NoRouteHTMLPath, "registry.consul.noroutehtmlpath", defaultConfig.Registry.Consul.NoRouteHTMLPath, "consul KV path for HTML returned when no route is found")
f.StringVar(&cfg.Registry.Consul.TagPrefix, "registry.consul.tagprefix", defaultConfig.Registry.Consul.TagPrefix, "prefix for consul tags")
f.IntVar(&cfg.Registry.Consul.ConcurrentConsulRequests, "registry.consul.concurrentrequests", defaultConfig.Registry.Consul.ConcurrentConsulRequests, "number of concurrent requests querying consul")
f.BoolVar(&cfg.Registry.Consul.Register, "registry.consul.register.enabled", defaultConfig.Registry.Consul.Register, "register fabio in consul")
f.StringVar(&cfg.Registry.Consul.ServiceAddr, "registry.consul.register.addr", defaultConfig.Registry.Consul.ServiceAddr, "service registration address")
f.StringVar(&cfg.Registry.Consul.ServiceName, "registry.consul.register.name", defaultConfig.Registry.Consul.ServiceName, "service registration name")
Expand Down
9 changes: 9 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,15 @@
#
# registry.consul.checksRequired = one

# registry.consul.concurrentrequests sets the number of concurrent requests used
# to query consul.
#
# values must be an integer >= 1
#
# The default is
#
# registry.consul.concurrentrequests = 10


# glob.matching.disabled disables glob matching on route lookups
# If glob matching is enabled there is a performance decrease
Expand Down
7 changes: 4 additions & 3 deletions registry/consul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ type be struct {

func NewBackend(cfg *config.Consul) (registry.Backend, error) {
// create a reusable client
var httpTrans = &http.Transport{MaxIdleConnsPerHost:1000, MaxIdleConns:1000}
var httpClient = &http.Client{Transport:httpTrans}
// Updated default Idle Connections setting to avoid TIME_WAIT issues
var httpTrans = &http.Transport{MaxIdleConnsPerHost: cfg.ConcurrentConsulRequests *2, MaxIdleConns: cfg.ConcurrentConsulRequests *2}
var httpClient = &http.Client{Transport: httpTrans}

c, err := api.NewClient(&api.Config{Address: cfg.Addr, Scheme: cfg.Scheme, Token: cfg.Token, HttpClient:httpClient})
c, err := api.NewClient(&api.Config{Address: cfg.Addr, Scheme: cfg.Scheme, Token: cfg.Token, HttpClient: httpClient})

if err != nil {
return nil, err
Expand Down
24 changes: 11 additions & 13 deletions registry/consul/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ import (

// Channel used to pass data to serviceConfig when using Go Routines
type ServiceChannel struct {
Client *api.Client
Name string
Passing map[string]bool
Client *api.Client
Name string
Passing map[string]bool
TagPrefix string
}


// watchServices monitors the consul health checks and creates a new configuration
// on every change.
func watchServices(client *api.Client, config *config.Consul, svcConfig chan string) {
Expand All @@ -40,15 +39,14 @@ func watchServices(client *api.Client, config *config.Consul, svcConfig chan str
}

log.Printf("[DEBUG] consul: Health changed to #%d", meta.LastIndex)
svcConfig <- servicesConfig(client, passingServices(checks, config.ServiceStatus, strict), config.TagPrefix)
svcConfig <- servicesConfig(client, passingServices(checks, config.ServiceStatus, strict), config.TagPrefix, config.ConcurrentConsulRequests)
lastIndex = meta.LastIndex
}
}


// servicesConfig determines which service instances have passing health checks
// and then finds the ones which have tags with the right prefix to build the config from.
func servicesConfig(client *api.Client, checks []*api.HealthCheck, tagPrefix string) string {
func servicesConfig(client *api.Client, checks []*api.HealthCheck, tagPrefix string, concurrentRequests int) string {
// map service name to list of service passing for which the health check is ok
m := map[string]map[string]bool{}
for _, check := range checks {
Expand All @@ -64,21 +62,21 @@ func servicesConfig(client *api.Client, checks []*api.HealthCheck, tagPrefix str
}

//Create Buffered Channel
serviceChan := make(chan ServiceChannel, 100)
serviceChan := make(chan ServiceChannel, concurrentRequests)

//Create Wait Group
var wg sync.WaitGroup
//config is where the update strings are stored
var config []string

//Spin up 100 Go Routines for getting service info from Consul
for i := 1; i <= 100; i++ {
//Spin up Go Routines for getting service info from Consul
for i := 1; i <= concurrentRequests; i++ {
wg.Add(1)
go serviceConfig(serviceChan, &wg, &config)
}
//Call serviceConfig Go Routines for every service
for name, passing := range m {
serviceChan <- ServiceChannel {Client:client, Name:name, Passing:passing, TagPrefix:tagPrefix}
serviceChan <- ServiceChannel{Client: client, Name: name, Passing: passing, TagPrefix: tagPrefix}
}

close(serviceChan)
Expand All @@ -94,7 +92,7 @@ func servicesConfig(client *api.Client, checks []*api.HealthCheck, tagPrefix str
func serviceConfig(ch chan ServiceChannel, wg *sync.WaitGroup, config *[]string) {

defer wg.Done()
for service := range ch{
for service := range ch {
if service.Name == "" || len(service.Passing) == 0 {
return
}
Expand Down Expand Up @@ -193,4 +191,4 @@ func serviceConfig(ch chan ServiceChannel, wg *sync.WaitGroup, config *[]string)
}
}
}
}
}

0 comments on commit c0fe13b

Please sign in to comment.