diff --git a/apps/app.go b/apps/app.go index 8904119..ec897b0 100644 --- a/apps/app.go +++ b/apps/app.go @@ -30,6 +30,11 @@ type App struct { Tasks []tasks.Task `json:"tasks"` } +func (app *App) IsConsulApp() bool { + value, ok := app.Labels["consul"] + return ok && value == "true" +} + func ParseApps(jsonBlob []byte) ([]*App, error) { apps := &AppsResponse{} err := json.Unmarshal(jsonBlob, apps) diff --git a/apps/app_test.go b/apps/app_test.go index 23d8436..a5f5623 100644 --- a/apps/app_test.go +++ b/apps/app_test.go @@ -86,3 +86,31 @@ func TestParseApp(t *testing.T) { assert.NoError(t, err) assert.Equal(t, expected, app) } + +func TestConsulApp(t *testing.T) { + t.Parallel() + + // when + app := &App{ + Labels: map[string]string{"consul": "true"}, + } + + // then + assert.True(t, app.IsConsulApp()) + + // when + app = &App{ + Labels: map[string]string{"consul": "false", "marathon": "true"}, + } + + // then + assert.False(t, app.IsConsulApp()) + + // when + app = &App{ + Labels: map[string]string{}, + } + + // then + assert.False(t, app.IsConsulApp()) +} diff --git a/sync/sync.go b/sync/sync.go index aee8ae3..7786d6d 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -70,7 +70,7 @@ func (s *Sync) registerMarathonApps(apps []*apps.App) { healthCheck := app.HealthChecks labels := app.Labels - if value, ok := app.Labels["consul"]; !ok || value != "true" { + if !app.IsConsulApp() { log.WithField("Id", app.ID).Debug("App should not be registered in Consul") continue } diff --git a/web/event_handler.go b/web/event_handler.go index c228b42..3d7d9d9 100644 --- a/web/event_handler.go +++ b/web/event_handler.go @@ -72,8 +72,22 @@ func (fh *EventHandler) handleTerminationEvent(w http.ResponseWriter, body []byt // app_terminated_event only has one app in it, so we will just take care of // it instead of looping - app := event.Apps()[0] - log.WithField("Id", app.ID).Info("Got TerminationEvent") + appId := event.Apps()[0].ID + log.WithField("Id", appId).Info("Got TerminationEvent") + + app, err := fh.marathon.App(appId) + if err != nil { + log.WithField("Id", appId).WithError(err).Error("There was a problem obtaining app info") + fh.handleError(err, w) + return + } + + if !app.IsConsulApp() { + err = fmt.Errorf("%s is not consul app. Missing consul:true label", app.ID) + log.WithField("Id", app.ID).WithError(err).Debug("Skipping app deregistration from Consul") + fh.handleBadRequest(err, w) + return + } tasks, err := fh.marathon.Tasks(app.ID) if err != nil { @@ -97,7 +111,7 @@ func (fh *EventHandler) handleTerminationEvent(w http.ResponseWriter, body []byt } err = fmt.Errorf(errMessage) fh.handleError(err, w) - log.WithError(err).WithField("Id", app.ID).Error("There where problems processing request") + log.WithError(err).WithField("Id", app.ID).Error("There were problems processing request") } else { w.WriteHeader(200) fmt.Fprintln(w, "OK") @@ -133,17 +147,15 @@ func (fh *EventHandler) handleHealthStatusEvent(w http.ResponseWriter, body []by fh.handleError(err, w) return } - tasks := app.Tasks - if value, ok := app.Labels["consul"]; !ok || value != "true" { + if !app.IsConsulApp() { err = fmt.Errorf("%s is not consul app. Missing consul:true label", app.ID) - log.WithField("Id", taskHealthChange.ID).WithError(err).Debug("App should not be registered in Consul") + log.WithField("Id", taskHealthChange.ID).WithError(err).Debug("Skipping app registration in Consul") fh.handleBadRequest(err, w) return } - healthCheck := app.HealthChecks - labels := app.Labels + tasks := app.Tasks task, err := findTaskById(taskHealthChange.ID, tasks) if err != nil { @@ -152,6 +164,9 @@ func (fh *EventHandler) handleHealthStatusEvent(w http.ResponseWriter, body []by return } + healthCheck := app.HealthChecks + labels := app.Labels + if service.IsTaskHealthy(task.HealthCheckResults) { err := fh.service.Register(service.MarathonTaskToConsulService(task, healthCheck, labels)) if err != nil { @@ -180,26 +195,43 @@ func findTaskById(id tasks.Id, tasks_ []tasks.Task) (tasks.Task, error) { func (fh *EventHandler) handleStatusEvent(w http.ResponseWriter, body []byte) { body = replaceTaskIdWithId(body) task, err := tasks.ParseTask(body) + if err != nil { - log.WithError(err).WithField("Body", body).Error("[ERROR] body generated error") + log.WithError(err).WithField("Body", body).Error("Could not parse event body") fh.handleBadRequest(err, w) - } else { + return + } + + log.WithFields(log.Fields{ + "Id": task.ID, + "TaskStatus": task.TaskStatus, + }).Info("Got StatusEvent") + + switch task.TaskStatus { + case "TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST": + app, err := fh.marathon.App(task.AppID) + if err != nil { + log.WithField("Id", task.AppID).WithError(err).Error("There was a problem obtaining app info") + fh.handleError(err, w) + return + } + + if !app.IsConsulApp() { + err = fmt.Errorf("%s is not consul app. Missing consul:true label", app.ID) + log.WithField("Id", task.ID).WithError(err).Debug("Not handling task event") + fh.handleBadRequest(err, w) + return + } + + fh.service.Deregister(task.ID, task.Host) + w.WriteHeader(200) + fmt.Fprintln(w, "OK") + default: log.WithFields(log.Fields{ + "taskStatus": task.TaskStatus, "Id": task.ID, - "TaskStatus": task.TaskStatus, - }).Info("Got StatusEvent") - switch task.TaskStatus { - case "TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST": - fh.service.Deregister(task.ID, task.Host) - w.WriteHeader(200) - fmt.Fprintln(w, "OK") - default: - log.WithFields(log.Fields{ - "taskStatus": task.TaskStatus, - "Id": task.ID, - }).Info("not handling event") - fh.handleBadRequest(fmt.Errorf("Not Handling task %s with status %s", task.ID, task.TaskStatus), w) - } + }).Info("Not handling event") + fh.handleBadRequest(fmt.Errorf("Not Handling task %s with status %s", task.ID, task.TaskStatus), w) } } diff --git a/web/event_handler_test.go b/web/event_handler_test.go index c8a5812..fb59e96 100644 --- a/web/event_handler_test.go +++ b/web/event_handler_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + log "github.com/Sirupsen/logrus" "github.com/allegro/marathon-consul/consul" "github.com/allegro/marathon-consul/events" "github.com/allegro/marathon-consul/marathon" @@ -98,6 +99,9 @@ func TestForwardHandler_HandleAppTerminatedEvent(t *testing.T) { // given app := ConsulApp("/test/app", 3) marathon := marathon.MarathonerStubForApps(app) + a, _ := marathon.Apps() + log.Info("wut") + log.Info(a[0].Labels) service := consul.NewConsulStub() for _, task := range app.Tasks { service.Register(consul.MarathonTaskToConsulService(task, app.HealthChecks, app.Labels)) @@ -118,6 +122,30 @@ func TestForwardHandler_HandleAppTerminatedEvent(t *testing.T) { assert.Empty(t, services) } +func TestForwardHandler_NotHandleNonConsulAppTerminatedEvent(t *testing.T) { + t.Parallel() + // given + app := NonConsulApp("/test/app", 3) + marathon := marathon.MarathonerStubForApps(app) + service := consul.NewConsulStub() + for _, task := range app.Tasks { + service.Register(consul.MarathonTaskToConsulService(task, app.HealthChecks, app.Labels)) + } + handler := NewEventHandler(service, marathon) + body, _ := json.Marshal(events.AppTerminatedEvent{ + Type: "app_terminated_event", + AppID: app.ID, + }) + req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) + // when + recorder := httptest.NewRecorder() + handler.Handle(recorder, req) + // then + assert.Equal(t, 400, recorder.Code) + assert.Equal(t, "/test/app is not consul app. Missing consul:true label\n", recorder.Body.String()) + assert.Len(t, service.RegisteredServicesIds(), 3) +} + func TestForwardHandler_HandleAppInvalidBody(t *testing.T) { t.Parallel() // given @@ -289,11 +317,48 @@ func TestForwardHandler_HandleStatusEventAboutDeadTask(t *testing.T) { } } +func TestForwardHandler_NotHandleStatusEventAboutNonConsulAppsDeadTask(t *testing.T) { + t.Parallel() + // given + app := NonConsulApp("/test/app", 3) + marathon := marathon.MarathonerStubForApps(app) + service := consul.NewConsulStub() + for _, task := range app.Tasks { + service.Register(consul.MarathonTaskToConsulService(task, app.HealthChecks, app.Labels)) + } + handler := NewEventHandler(service, marathon) + taskStatuses := []string{"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST"} + for _, taskStatus := range taskStatuses { + body := `{ + "slaveId":"85e59460-a99e-4f16-b91f-145e0ea595bd-S0", + "taskId":"` + app.Tasks[1].ID.String() + `", + "taskStatus":"` + taskStatus + `", + "message":"Command terminated with signal Terminated", + "appId":"/test/app", + "host":"localhost", + "ports":[ + 31372 + ], + "version":"2015-12-07T09:02:48.981Z", + "eventType":"status_update_event", + "timestamp":"2015-12-07T09:33:40.898Z" + }` + req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) + // when + recorder := httptest.NewRecorder() + handler.Handle(recorder, req) + servicesIds := service.RegisteredServicesIds() + // then + assert.Equal(t, 400, recorder.Code) + assert.Equal(t, "/test/app is not consul app. Missing consul:true label\n", recorder.Body.String()) + assert.Len(t, servicesIds, 3) + } +} + func TestForwardHandler_NotHandleHealthStatusEventWhenAppHasNotConsulLabel(t *testing.T) { t.Parallel() // given - app := ConsulApp("/test/app", 3) - app.Labels["consul"] = "false" + app := NonConsulApp("/test/app", 3) marathon := marathon.MarathonerStubForApps(app) service := consul.NewConsulStub() handler := NewEventHandler(service, marathon)