Skip to content

Commit

Permalink
Getting Consul services with retries
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Krawczyk committed Feb 5, 2016
1 parent 732c7f0 commit fd3b9d9
Show file tree
Hide file tree
Showing 12 changed files with 329 additions and 125 deletions.
15 changes: 0 additions & 15 deletions apps/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package apps

import (
"encoding/json"
"net"
"fmt"
)

type Task struct {
Expand Down Expand Up @@ -54,16 +52,3 @@ func (t *Task) IsHealthy() bool {
}
return register
}

func (t *Task) HostToIPv4() (net.IP, error) {
IPs, err := net.LookupIP(t.Host)
if err != nil {
return nil, err
}
for _, IP := range IPs {
if IP.To4() != nil {
return IP, nil
}
}
return nil, fmt.Errorf("Could not resolve host to IPv4")
}
70 changes: 0 additions & 70 deletions apps/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,73 +107,3 @@ func TestId_String(t *testing.T) {
t.Parallel()
assert.Equal(t, "id", TaskId("id").String())
}

func TestHostToIPv4_NoHost(t *testing.T) {
t.Parallel()

// given
task := &Task{}

// when
ip, err := task.HostToIPv4()

// then
assert.Nil(t, ip)
assert.Error(t, err)
}

func TestHostToIPv4_ProvidedIPv4(t *testing.T) {
t.Parallel()

// given
task := &Task{Host: "127.1.1.12"}

// when
ip, err := task.HostToIPv4()

// then
assert.Equal(t, "127.1.1.12", ip.String())
assert.NoError(t, err)
}

func TestHostToIPv4_ProvidedIPv6(t *testing.T) {
t.Parallel()

// given
task := &Task{Host: "2001:cdba:0000:0000:0000:0000:3257:9652"}

// when
ip, err := task.HostToIPv4()

// then
assert.Nil(t, ip)
assert.Error(t, err)
}

func TestHostToIPv4_IsResolvable(t *testing.T) {
t.Parallel()

// given
task := &Task{Host: "localhost"}

// when
ip, err := task.HostToIPv4()

// then
assert.Equal(t, "127.0.0.1", ip.String())
assert.NoError(t, err)
}

func TestHostToIPv4_NotResolvable(t *testing.T) {
t.Parallel()

// given
task := &Task{Host: "no.such.localhost"}

// when
ip, err := task.HostToIPv4()

// then
assert.Nil(t, ip)
assert.Error(t, err)
}
66 changes: 49 additions & 17 deletions consul/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"crypto/tls"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/utils"
consulapi "github.com/hashicorp/consul/api"
"math/rand"
"net/http"
"sync"
)

type Agents interface {
GetAgent(string) (*consulapi.Client, error)
GetAnyAgent() (*consulapi.Client, error)
GetAgent(agentAddress string) (agent *consulapi.Client, err error)
GetAnyAgent() (agent *consulapi.Client, agentAddress string, err error)
RemoveAgent(agentAddress string)
}

type ConcurrentAgents struct {
Expand All @@ -28,14 +31,15 @@ func NewAgents(config *ConsulConfig) *ConcurrentAgents {
}
}

func (a *ConcurrentAgents) GetAnyAgent() (*consulapi.Client, error) {
func (a *ConcurrentAgents) GetAnyAgent() (*consulapi.Client, string, error) {
a.lock.Lock()
defer a.lock.Unlock()

if len(a.agents) > 0 {
return a.agents[a.getRandomAgentHost()], nil
host := a.getRandomAgentHost()
return a.agents[host], host, nil
}
return nil, fmt.Errorf("No Consul client available in agents cache")
return nil, "", fmt.Errorf("No Consul client available in agents cache")
}

func (a *ConcurrentAgents) getRandomAgentHost() string {
Expand All @@ -47,18 +51,45 @@ func (a *ConcurrentAgents) getRandomAgentHost() string {
return hosts[idx]
}

func (a *ConcurrentAgents) GetAgent(agentHost string) (*consulapi.Client, error) {
func (a *ConcurrentAgents) RemoveAgent(agentAddress string) {
a.lock.Lock()
defer a.lock.Unlock()
if agent, ok := a.agents[agentHost]; ok {

if IP, err := utils.HostToIPv4(agentAddress); err != nil {
log.WithError(err).Error("Could not remove agent from cache")
} else {
agentAddress = IP.String()
if _, ok := a.agents[agentAddress]; ok {
log.WithField("Address", agentAddress).Info("Removing agent from cache")
delete(a.agents, agentAddress)
a.updateAgentsCacheSizeMetricValue()
} else {
log.WithField("Address", agentAddress).Info("Agent not found in cache")
}
}
}

func (a *ConcurrentAgents) GetAgent(agentAddress string) (*consulapi.Client, error) {
a.lock.Lock()
defer a.lock.Unlock()

IP, err := utils.HostToIPv4(agentAddress)
if err != nil {
return nil, err
}
agentAddress = IP.String()

if agent, ok := a.agents[agentAddress]; ok {
return agent, nil
}

newAgent, err := a.createAgent(agentHost)
newAgent, err := a.createAgent(agentAddress)
if err != nil {
return nil, err
}
a.addAgent(agentHost, newAgent)
a.addAgent(agentAddress, newAgent)

a.updateAgentsCacheSizeMetricValue()
return newAgent, nil
}

Expand All @@ -67,9 +98,6 @@ func (a *ConcurrentAgents) addAgent(agentHost string, agent *consulapi.Client) {
}

func (a *ConcurrentAgents) createAgent(host string) (*consulapi.Client, error) {
if host == "" {
return nil, fmt.Errorf("Invalid agent address for Consul client")
}
config := consulapi.DefaultConfig()

config.Address = fmt.Sprintf("%s:%s", host, a.config.Port)
Expand Down Expand Up @@ -99,13 +127,17 @@ func (a *ConcurrentAgents) createAgent(host string) (*consulapi.Client, error) {
}

log.WithFields(log.Fields{
"Address": config.Address,
"Scheme": config.Scheme,
"Timeout": config.HttpClient.Timeout,
"BasicAuthEnabled": a.config.Auth.Enabled,
"TokenEnabled": a.config.Token != "",
"Address": config.Address,
"Scheme": config.Scheme,
"Timeout": config.HttpClient.Timeout,
"BasicAuthEnabled": a.config.Auth.Enabled,
"TokenEnabled": a.config.Token != "",
"SslVerificationEnabled": a.config.SslVerify,
}).Debug("Creating Consul client")

return consulapi.NewClient(config)
}

func (a *ConcurrentAgents) updateAgentsCacheSizeMetricValue() {
metrics.UpdateGauge("consul.agents.cache.size", int64(len(a.agents)))
}
60 changes: 54 additions & 6 deletions consul/agents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,84 @@ func TestGetAgent(t *testing.T) {
agents := NewAgents(&ConsulConfig{})

// when
agent, _ := agents.GetAgent("http://127.0.0.1")
agent, err := agents.GetAgent("127.0.0.1")

// then
assert.NotNil(t, agent)
assert.NoError(t, err)
}

func TestGetAnyAgent_SingleAgentAvailable(t *testing.T) {
t.Parallel()
// given
agents := NewAgents(&ConsulConfig{})

// when
agents.GetAgent("127.0.0.1") // create
agent, address, err := agents.GetAnyAgent()

// then
assert.NotNil(t, agent)
assert.Equal(t, "127.0.0.1", address)
assert.NoError(t, err)
}

func TestGetAnyAgent(t *testing.T) {
t.Parallel()
// given
agents := NewAgents(&ConsulConfig{})
agent1, _ := agents.GetAgent("http://127.0.0.1")
agent2, _ := agents.GetAgent("http://127.0.0.2")
agent3, _ := agents.GetAgent("http://127.0.0.3")
agent1, _ := agents.GetAgent("127.0.0.1")
agent2, _ := agents.GetAgent("127.0.0.2")
agent3, _ := agents.GetAgent("127.0.0.3")

// when
anyAgent, _ := agents.GetAnyAgent()
anyAgent, _, _ := agents.GetAnyAgent()

// then
assert.Contains(t, []*consulapi.Client{agent1, agent2, agent3}, anyAgent)
}

func TestGetAgent_ShouldResolveAddressToIP(t *testing.T) {
t.Parallel()
// given
agents := NewAgents(&ConsulConfig{})

// when
agent1, _ := agents.GetAgent("127.0.0.1")
agent2, _ := agents.GetAgent("localhost")

// then
assert.Equal(t, agent1, agent2)
}

func TestGetAnyAgent_shouldFailOnNoAgentAvailable(t *testing.T) {
t.Parallel()
// given
agents := NewAgents(&ConsulConfig{})

// when
anyAgent, err := agents.GetAnyAgent()
anyAgent, _, err := agents.GetAnyAgent()

// then
assert.Nil(t, anyAgent)
assert.NotNil(t, err)
}

func TestRemoveAgent(t *testing.T) {
t.Parallel()
// given
agents := NewAgents(&ConsulConfig{})
agents.GetAgent("127.0.0.1")
agent2, _ := agents.GetAgent("127.0.0.2")

// when
agents.RemoveAgent("127.0.0.1")

// then
for i := 0; i < 10; i++ {
agent, address, err := agents.GetAnyAgent()
assert.Equal(t, agent, agent2)
assert.Equal(t, "127.0.0.2", address)
assert.NoError(t, err)
}
}
38 changes: 30 additions & 8 deletions consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/allegro/marathon-consul/apps"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/utils"
consulapi "github.com/hashicorp/consul/api"
"net/url"
)
Expand All @@ -22,6 +23,8 @@ type Consul struct {
config ConsulConfig
}

type ServicesProvider func(*consulapi.Client) ([]*consulapi.CatalogService, error)

func New(config ConsulConfig) *Consul {
return &Consul{
agents: NewAgents(&config),
Expand All @@ -30,10 +33,29 @@ func New(config ConsulConfig) *Consul {
}

func (c *Consul) GetServices(name string) ([]*consulapi.CatalogService, error) {
agent, err := c.agents.GetAnyAgent()
if err != nil {
return nil, err
return c.getServicesUsingProviderWithRetriesOnAgentFailure(func(agent *consulapi.Client) ([]*consulapi.CatalogService, error) {
return c.getServicesUsingAgent(name, agent)
})
}

func (c *Consul) getServicesUsingProviderWithRetriesOnAgentFailure(provide ServicesProvider) ([]*consulapi.CatalogService, error) {
var services []*consulapi.CatalogService = nil
for {
agent, agentAddress, err := c.agents.GetAnyAgent()
if err != nil {
return nil, err
}
if services, err = provide(agent); err != nil {
log.WithError(err).WithField("Address", agentAddress).Error("An error occurred getting services from Consul, retrying with another agent")
c.agents.RemoveAgent(agentAddress)
} else {
break
}
}
return services, nil
}

func (c *Consul) getServicesUsingAgent(name string, agent *consulapi.Client) ([]*consulapi.CatalogService, error) {
datacenters, err := agent.Catalog().Datacenters()
if err != nil {
return nil, err
Expand All @@ -54,10 +76,10 @@ func (c *Consul) GetServices(name string) ([]*consulapi.CatalogService, error) {
}

func (c *Consul) GetAllServices() ([]*consulapi.CatalogService, error) {
agent, err := c.agents.GetAnyAgent()
if err != nil {
return nil, err
}
return c.getServicesUsingProviderWithRetriesOnAgentFailure(c.getAllServices)
}

func (c *Consul) getAllServices(agent *consulapi.Client) ([]*consulapi.CatalogService, error) {
datacenters, err := agent.Catalog().Datacenters()
if err != nil {
return nil, err
Expand Down Expand Up @@ -163,7 +185,7 @@ func (c *Consul) GetAgent(agentAddress string) (*consulapi.Client, error) {
}

func (c *Consul) marathonTaskToConsulService(task *apps.Task, app *apps.App) (*consulapi.AgentServiceRegistration, error) {
IP, err := task.HostToIPv4()
IP, err := utils.HostToIPv4(task.Host)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit fd3b9d9

Please sign in to comment.