Skip to content

Commit

Permalink
Fix #88 - Recheck if service should be deregistered
Browse files Browse the repository at this point in the history
When service is marked to be deregisterd during sync a
sk marathon for it tasks to recheck if this task was
not started after sync procedure started. Prevents
deregistartion of tasks started after sync asked for all
marathon services. It also minimize call to marathon for a price
of not deregistering tasks that was killed during sync. This will
be handled with events or next sync.
  • Loading branch information
janisz committed Feb 28, 2017
1 parent 14fcb04 commit 3790002
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 16 deletions.
54 changes: 38 additions & 16 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,25 +124,38 @@ func (s *Sync) resolveHostname() error {
func (s *Sync) deregisterConsulServicesNotFoundInMarathon(marathonApps []*apps.App, services []*service.Service) (deregisterCount int, errorCount int) {
runningTasks := marathonTaskIdsSet(marathonApps)
for _, service := range services {
taskIDInTag, err := service.TaskId()
taskIDNotFoundInTag := err != nil
if taskIDNotFoundInTag {
logFields := log.Fields{
"Id": service.ID,
"Address": service.RegisteringAgentAddress,
"Sync": true,
}
if taskIDInTag, err := service.TaskId(); err != nil {
log.WithField("Id", service.ID).WithError(err).
Warn("Couldn't extract marathon task id, deregistering since sync should have reregistered it already")
}

if _, isRunning := runningTasks[taskIDInTag]; !isRunning || taskIDNotFoundInTag {
err := s.serviceRegistry.Deregister(service)
if err != nil {
log.WithError(err).WithFields(log.Fields{
"Id": service.ID,
"Address": service.RegisteringAgentAddress,
"Sync": true,
}).Error("Can't deregister service")
if err := s.serviceRegistry.Deregister(service); err != nil {
log.WithError(err).WithFields(logFields).Error("Can't deregister service")
errorCount++
} else {
deregisterCount++
}
} else if _, isRunning := runningTasks[taskIDInTag]; !isRunning {
// Check latest marathon state to prevent deregistration of live service.
tasks, err := s.marathon.Tasks(taskIDInTag.AppID())
if err != nil {
log.WithError(err).WithFields(logFields).
Error("Can't get fresh info about app tasks. Will deregister this service.")
}

taskIsRunning := findTask(taskIDInTag, tasks)

if !taskIsRunning {
if err := s.serviceRegistry.Deregister(service); err != nil {
log.WithError(err).WithFields(logFields).Error("Can't deregister service")
errorCount++
} else {
deregisterCount++
}
}
} else {
log.WithField("Id", service.ID).Debug("Service is running")
}
Expand All @@ -161,10 +174,10 @@ func (s *Sync) registerAppTasksNotFoundInConsul(marathonApps []*apps.App, servic
for _, task := range app.Tasks {
registrations := registrationsUnderTaskIds[task.ID]
logFields := log.Fields{
"Id": task.ID,
"HasRegistrations": registrations,
"Id": task.ID,
"HasRegistrations": registrations,
"ExpectedRegistrations": expectedRegistrations,
"Sync": true,
"Sync": true,
}
if registrations < expectedRegistrations {
if registrations != 0 {
Expand Down Expand Up @@ -211,3 +224,12 @@ func marathonTaskIdsSet(marathonApps []*apps.App) map[apps.TaskID]struct{} {
}
return tasksSet
}

func findTask(id apps.TaskID, tasks []*apps.Task) bool {
for _, t := range tasks {
if t.ID == id {
return true
}
}
return false
}
44 changes: 44 additions & 0 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,16 +385,60 @@ func TestSync_WithDeregisteringProblems(t *testing.T) {
for _, s := range allServices {
consulStub.FailDeregisterForID(s.ID)
}
sync := newSyncWithDefaultConfig(marathon, consulStub)

// when
err := sync.SyncServices()
services, _ := consulStub.GetAllServices()

// then
assert.NoError(t, err)
assert.Len(t, services, 1)
}

func TestSync_WithDeregisteringFallback(t *testing.T) {
t.Parallel()
// given
marathon := marathon.MarathonerStubForApps()
consulStub := consul.NewConsulStub()
marathonApp := ConsulApp("/test/app", 1)
for _, task := range marathonApp.Tasks {
consulStub.Register(&task, marathonApp)
}
marathon.TasksStub = map[apps.AppID][]*apps.Task{
apps.AppID("/test/app"): []*apps.Task{&marathonApp.Tasks[0]},
}
sync := newSyncWithDefaultConfig(marathon, consulStub)

// when
err := sync.SyncServices()
services, _ := consulStub.GetAllServices()

// then
assert.NoError(t, err)
assert.Len(t, services, 1)
}

func TestSync_WithDeregisteringFallbackError(t *testing.T) {
t.Parallel()
// given
marathon := marathon.MarathonerStubForApps()
consulStub := consul.NewConsulStub()
marathonApp := ConsulApp("/test/app", 1)
for _, task := range marathonApp.Tasks {
consulStub.Register(&task, marathonApp)
}
sync := newSyncWithDefaultConfig(marathon, consulStub)

// when
err := sync.SyncServices()
services, _ := consulStub.GetAllServices()

// then
assert.NoError(t, err)
assert.Len(t, services, 0)
}

func TestSync_WithMarathonProblems(t *testing.T) {
t.Parallel()
// given
Expand Down

0 comments on commit 3790002

Please sign in to comment.