Skip to content

Commit

Permalink
More efficient deregistering by task
Browse files Browse the repository at this point in the history
  • Loading branch information
pbetkier committed Aug 19, 2016
1 parent 885200d commit 780fb4b
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 48 deletions.
96 changes: 69 additions & 27 deletions consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,42 +57,50 @@ func (c *Consul) getServicesUsingProviderWithRetriesOnAgentFailure(provide Servi
}

func (c *Consul) getServicesUsingAgent(name string, agent *consulapi.Client) ([]*service.Service, error) {
datacenters, err := agent.Catalog().Datacenters()
dcAwareQueries, err := dcAwareQueriesForAllDcs(agent)
if err != nil {
return nil, err
}
var allServices []*service.Service

for _, dc := range datacenters {
dcAwareQuery := &consulapi.QueryOptions{
Datacenter: dc,
}
for _, dcAwareQuery := range dcAwareQueries {
allConsulServices, _, err := agent.Catalog().Service(name, c.config.Tag, dcAwareQuery)
if err != nil {
return nil, err
}
for _, consulService := range allConsulServices {
allServices = append(allServices, consulServiceToService(consulService))
}
allServices = append(allServices, consulServicesToServices(allConsulServices)...)
}
return allServices, nil
}

func dcAwareQueriesForAllDcs(agent *consulapi.Client) ([]*consulapi.QueryOptions, error) {
datacenters, err := agent.Catalog().Datacenters()
if err != nil {
return nil, err
}

var queries []*consulapi.QueryOptions
for _, dc := range datacenters {
queries = append(queries, &consulapi.QueryOptions{
Datacenter: dc,
})
}

return queries, nil
}

func (c *Consul) GetAllServices() ([]*service.Service, error) {
return c.getServicesUsingProviderWithRetriesOnAgentFailure(c.getAllServices)
}

func (c *Consul) getAllServices(agent *consulapi.Client) ([]*service.Service, error) {
datacenters, err := agent.Catalog().Datacenters()
dcAwareQueries, err := dcAwareQueriesForAllDcs(agent)
if err != nil {
return nil, err
}
var allInstances []*service.Service

for _, dc := range datacenters {
dcAwareQuery := &consulapi.QueryOptions{
Datacenter: dc,
}
for _, dcAwareQuery := range dcAwareQueries {
consulServices, _, err := agent.Catalog().Services(dcAwareQuery)
if err != nil {
return nil, err
Expand All @@ -103,9 +111,7 @@ func (c *Consul) getAllServices(agent *consulapi.Client) ([]*service.Service, er
if err != nil {
return nil, err
}
for _, consulServiceInstance := range consulServiceInstances {
allInstances = append(allInstances, consulServiceToService(consulServiceInstance))
}
allInstances = append(allInstances, consulServicesToServices(consulServiceInstances)...)
}
}
}
Expand All @@ -121,6 +127,14 @@ func consulServiceToService(consulService *consulapi.CatalogService) *service.Se
}
}

func consulServicesToServices(consulServices []*consulapi.CatalogService) []*service.Service {
var allServices []*service.Service
for _, c := range consulServices {
allServices = append(allServices, consulServiceToService(c))
}
return allServices
}

func contains(slice []string, search string) bool {
for _, element := range slice {
if element == search {
Expand Down Expand Up @@ -169,25 +183,53 @@ func (c *Consul) register(service *consulapi.AgentServiceRegistration) error {
}

func (c *Consul) DeregisterByTask(taskId apps.TaskId, agentAddress string) error {
service, err := c.findServiceByTaskId(taskId)
services, err := c.findServicesByTaskId(taskId)
if err != nil {
return err
} else if len(services) == 0 {
return errors.New(fmt.Sprintf("Couldn't find any service matching task id %s", taskId))
}
return c.Deregister(service)
return c.deregisterMultipleServices(services, taskId)
}

func (c *Consul) findServiceByTaskId(searchedTaskId apps.TaskId) (*service.Service, error) {
services, err := c.GetAllServices()
if err != nil {
return nil, err
}
func (c *Consul) deregisterMultipleServices(services []*service.Service, taskId apps.TaskId) error {
var deregisterErrors []error
for _, s := range services {
taskId, err := s.TaskId()
if err == nil && taskId == searchedTaskId {
return s, nil
deregisterErr := c.Deregister(s)
if deregisterErr != nil {
deregisterErrors = append(deregisterErrors, deregisterErr)
}
}
return nil, errors.New(fmt.Sprintf("Couldn't find service matching task id %s", searchedTaskId))

return utils.MergeErrorsOrNil(deregisterErrors, fmt.Sprintf("deregistering by task %s", taskId))
}

func (c *Consul) findServicesByTaskId(searchedTaskId apps.TaskId) ([]*service.Service, error) {
return c.getServicesUsingProviderWithRetriesOnAgentFailure(func(agent *consulapi.Client) ([]*service.Service, error) {
dcAwareQueries, err := dcAwareQueriesForAllDcs(agent)
if err != nil {
return nil, err
}

var allFound []*service.Service
searchedTag := service.MarathonTaskTag(searchedTaskId)
for _, dcAwareQuery := range dcAwareQueries {
consulServices, _, err := agent.Catalog().Services(dcAwareQuery)
if err != nil {
return nil, err
}
for consulService, tags := range consulServices {
if contains(tags, searchedTag) {
instancesForTask, _, err := agent.Catalog().Service(consulService, searchedTag, dcAwareQuery)
if err != nil {
return nil, err
}
allFound = append(allFound, consulServicesToServices(instancesForTask)...)
}
}
}
return allFound, nil
})
}

func (c *Consul) Deregister(toDeregister *service.Service) error {
Expand Down
53 changes: 53 additions & 0 deletions consul/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,59 @@ func TestDeregisterServicesByTask_shouldReturnErrorOnFailure(t *testing.T) {
assert.Error(t, err)
}

func TestDeregisterServicesByTask_shouldReturnErrorOnServiceMatchingTaskNotFound(t *testing.T) {
t.Parallel()
server := CreateConsulTestServer(t)
defer server.Stop()

consul := ConsulClientAtServer(server)
consul.config.Tag = "marathon"

// given
app := utils.ConsulApp("serviceA", 1)
task := app.Tasks[0]

server.AddService("serviceA", "passing", []string{"marathon", service.MarathonTaskTag(task.ID)})
server.AddService("serviceB", "passing", []string{"marathon", service.MarathonTaskTag(apps.TaskId("other"))})
services, _ := consul.GetAllServices()
assert.Len(t, services, 2)

// when
err := consul.DeregisterByTask("non-existing", "")

// then
assert.Error(t, err)
services, _ = consul.GetAllServices()
assert.Len(t, services, 2)
}

func TestDeregisterServicesByTask_shouldDeregisterAllMatchingServicesWhenMultipleMatchGivenTaskId(t *testing.T) {
t.Parallel()
server := CreateConsulTestServer(t)
defer server.Stop()

consul := ConsulClientAtServer(server)
consul.config.Tag = "marathon"

// given
app := utils.ConsulApp("serviceA", 1)
task := app.Tasks[0]

server.AddService("serviceA", "passing", []string{"marathon", service.MarathonTaskTag(task.ID)})
server.AddService("serviceA-bis", "passing", []string{"marathon", service.MarathonTaskTag(task.ID)})
server.AddService("serviceB", "passing", []string{"marathon", service.MarathonTaskTag(apps.TaskId("other"))})
services, _ := consul.GetAllServices()
assert.Len(t, services, 3)

// when
err := consul.DeregisterByTask(task.ID, "")

// then
assert.NoError(t, err)
services, _ = consul.GetAllServices()
assert.Len(t, services, 1)
}

func TestAddAgentsFromApp(t *testing.T) {
t.Parallel()
server := CreateConsulTestServer(t)
Expand Down
6 changes: 3 additions & 3 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package service
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/allegro/marathon-consul/apps"
"github.com/stretchr/testify/assert"
)

func TestMarathonTaskTAg(t *testing.T) {
Expand All @@ -16,7 +16,7 @@ func TestServiceTaskId(t *testing.T) {
t.Parallel()
// given
service := Service{
ID: "123",
ID: "123",
Name: "abc",
RegisteringAgentAddress: "localhost",
Tags: []string{MarathonTaskTag("my-task")},
Expand All @@ -34,7 +34,7 @@ func TestServiceTaskId_NoMarathonTaskTag(t *testing.T) {
t.Parallel()
// given
service := Service{
ID: "123",
ID: "123",
Name: "abc",
RegisteringAgentAddress: "localhost",
Tags: []string{},
Expand Down
15 changes: 15 additions & 0 deletions utils/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package utils

import "fmt"

func MergeErrorsOrNil(errors []error, description string) error {
if len(errors) == 0 {
return nil
}

errMessage := fmt.Sprintf("%d errors occured %s", len(errors), description)
for i, err := range errors {
errMessage = fmt.Sprintf("%s\n%d: %s", errMessage, i+1, err.Error())
}
return fmt.Errorf(errMessage)
}
26 changes: 26 additions & 0 deletions utils/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package utils

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestErrors_shouldMergeErrors(t *testing.T) {
t.Parallel()
// given
errs := []error{errors.New("first"), errors.New("second")}

// when
err := MergeErrorsOrNil(errs, "testing")

// then
assert.EqualError(t, err, "2 errors occured testing\n1: first\n2: second")
}

func TestErrors_shouldReturnNilForEmptyErrors(t *testing.T) {
t.Parallel()
// expect
assert.NoError(t, MergeErrorsOrNil([]error{}, "testing"))
}
21 changes: 3 additions & 18 deletions web/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/allegro/marathon-consul/marathon"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/service"
"github.com/allegro/marathon-consul/utils"
)

type event struct {
Expand Down Expand Up @@ -195,11 +196,7 @@ func (fh *eventHandler) handleDeploymentInfo(body []byte) error {
errors = append(errors, err)
}
}
if len(errors) > 0 {
return fh.mergeDeregistrationErrors(errors)
} else {
return nil
}
return utils.MergeErrorsOrNil(errors, "handling deregistration")
}

//This handler is used when an application is restarted and renamed
Expand All @@ -217,11 +214,7 @@ func (fh *eventHandler) handleDeploymentStepSuccess(body []byte) error {
errors = append(errors, err)
}
}
if len(errors) > 0 {
return fh.mergeDeregistrationErrors(errors)
} else {
return nil
}
return utils.MergeErrorsOrNil(errors, "handling deregistration")
}

func (fh *eventHandler) deregisterAllAppServices(app *apps.App) []error {
Expand Down Expand Up @@ -277,14 +270,6 @@ func findTaskById(id apps.TaskId, tasks_ []apps.Task) (apps.Task, error) {
return apps.Task{}, fmt.Errorf("Task %s not found", id)
}

func (fh *eventHandler) mergeDeregistrationErrors(errors []error) error {
errMessage := fmt.Sprintf("%d errors occured handling service deregistration", len(errors))
for i, err := range errors {
errMessage = fmt.Sprintf("%s\n%d: %s", errMessage, i, err.Error())
}
return fmt.Errorf(errMessage)
}

// for every other use of Tasks, Marathon uses the "id" field for the task ID.
// Here, it uses "taskId", with most of the other fields being equal. We'll
// just swap "taskId" for "id" in the body so that we can successfully parse
Expand Down

0 comments on commit 780fb4b

Please sign in to comment.