Skip to content

Commit

Permalink
Sync support for multi-registrations
Browse files Browse the repository at this point in the history
  • Loading branch information
pbetkier committed Sep 6, 2016
1 parent 741d90e commit 8e6cea8
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 12 deletions.
13 changes: 13 additions & 0 deletions apps/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,19 @@ type RegistrationIntent struct {
Tags []string
}

func (app *App) RegistrationIntentsNumber() int {
if !app.IsConsulApp() {
return 0
}

definitions := app.findConsulPortDefinitions()
if len(definitions) == 0 {
return 1
}

return len(definitions)
}

func (app *App) RegistrationIntents(task *Task, nameSeparator string) []*RegistrationIntent {
commonTags := labelsToTags(app.Labels)

Expand Down
5 changes: 5 additions & 0 deletions consul/consul_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ func (c *ConsulStub) RegisterWithoutMarathonTaskTag(task *apps.Task, app *apps.A
}
}

func (c *ConsulStub) RegisterOnlyFirstRegistrationIntent(task *apps.Task, app *apps.App) {
serviceRegistrations, _ := c.consul.marathonTaskToConsulServices(task, app)
c.services[service.ServiceId(serviceRegistrations[0].ID)] = serviceRegistrations[0]
}

func (c *ConsulStub) ServiceNames(app *apps.App) []string {
return c.consul.ServiceNames(app)
}
Expand Down
16 changes: 10 additions & 6 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,19 @@ func (s *Sync) deregisterConsulServicesNotFoundInMarathon(marathonApps []*apps.A
}

func (s *Sync) registerAppTasksNotFoundInConsul(marathonApps []*apps.App, services []*service.Service) {
registeredTaskIds := s.taskIdsInConsulServices(services)
registrationsUnderTaskIds := s.taskIdsInConsulServices(services)
for _, app := range marathonApps {
if !app.IsConsulApp() {
log.WithField("Id", app.ID).Debug("Not a Consul app, skipping registration")
continue
}
expectedRegistrations := app.RegistrationIntentsNumber()
for _, task := range app.Tasks {
if _, ok := registeredTaskIds[task.ID]; !ok {
if registrations := registrationsUnderTaskIds[task.ID]; registrations != expectedRegistrations {
if registrations != 0 {
log.WithField("Id", task.ID).WithField("HasRegistrations", registrations).
WithField("ExpectedRegistrations", expectedRegistrations).Info("Registering missing service registrations")
}
if task.IsHealthy() {
err := s.serviceRegistry.Register(&task, app)
if err != nil {
Expand All @@ -166,12 +171,11 @@ func (s *Sync) registerAppTasksNotFoundInConsul(marathonApps []*apps.App, servic
}
}

func (s *Sync) taskIdsInConsulServices(services []*service.Service) map[apps.TaskId]struct{} {
servicesSet := make(map[apps.TaskId]struct{})
var exists struct{}
func (s *Sync) taskIdsInConsulServices(services []*service.Service) map[apps.TaskId]int {
servicesSet := make(map[apps.TaskId]int)
for _, service := range services {
if taskId, err := service.TaskId(); err == nil {
servicesSet[taskId] = exists
servicesSet[taskId] += 1
}
}
return servicesSet
Expand Down
22 changes: 22 additions & 0 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,28 @@ func TestSync_WithRegisteringProblems(t *testing.T) {
assert.Len(t, services, 2)
}

func TestSync_ShouldRegisterMissingRegistrationInMultiregistrationScenario(t *testing.T) {
t.Parallel()
// given
app := ConsulAppMultipleRegistrations("/test/app", 1, 2)
marathon := marathon.MarathonerStubForApps(app)
consul := consul.NewConsulStub()

consul.RegisterOnlyFirstRegistrationIntent(&app.Tasks[0], app)
services, _ := consul.GetAllServices()
assert.Len(t, services, 1)

sync := newSyncWithDefaultConfig(marathon, consul)

// when
err := sync.SyncServices()

// then
services, _ = consul.GetAllServices()
assert.NoError(t, err)
assert.Len(t, services, 2)
}

func TestSync_WithDeregisteringProblems(t *testing.T) {
t.Parallel()
// given
Expand Down
31 changes: 25 additions & 6 deletions utils/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,32 @@ import (
)

func ConsulApp(name string, instances int) *apps.App {
return app(name, instances, true, 0)
return app(name, instances, 1, true, 0)
}

func ConsulAppWithUnhealthyInstances(name string, instances int, unhealthyInstances int) *apps.App {
return app(name, instances, true, unhealthyInstances)
return app(name, instances, 1, true, unhealthyInstances)
}

func ConsulAppMultipleRegistrations(name string, instances int, registrations int) *apps.App {
return app(name, instances, registrations, true, 0)
}

func NonConsulApp(name string, instances int) *apps.App {
return app(name, instances, false, 0)
return app(name, instances, 1, false, 0)
}

func app(name string, instances int, consul bool, unhealthyInstances int) *apps.App {
func app(name string, instances int, registrationsPerInstance int, consul bool, unhealthyInstances int) *apps.App {
var appTasks []apps.Task
for i := 0; i < instances; i++ {
var ports []int
for j := 1; j <= registrationsPerInstance; j++ {
ports = append(ports, 8080 + (i * j) + j - 1)
}
task := apps.Task{
AppID: apps.AppId(name),
ID: apps.TaskId(fmt.Sprintf("%s.%d", strings.Replace(strings.Trim(name, "/"), "/", "_", -1), i)),
Ports: []int{8080 + i},
Ports: ports,
Host: "localhost",
}
if unhealthyInstances > 0 {
Expand All @@ -45,9 +53,20 @@ func app(name string, instances int, consul bool, unhealthyInstances int) *apps.
labels[apps.MARATHON_CONSUL_LABEL] = "true"
}

return &apps.App{
app := &apps.App{
ID: apps.AppId(name),
Tasks: appTasks,
Labels: labels,
}

if registrationsPerInstance > 1 {
for i := 0; i < registrationsPerInstance; i++ {
app.PortDefinitions = append(app.PortDefinitions, apps.PortDefinition{
Port: 0,
Labels: map[string]string{"consul": ""},
})
}
}

return app
}

0 comments on commit 8e6cea8

Please sign in to comment.