Skip to content

Commit

Permalink
event based deregistration fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Krawczyk committed Jan 7, 2016
1 parent edf4a2b commit e056782
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 27 deletions.
5 changes: 5 additions & 0 deletions apps/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ type App struct {
Tasks []tasks.Task `json:"tasks"`
}

func (app *App) IsConsulApp() bool {
value, ok := app.Labels["consul"]
return ok && value == "true"
}

func ParseApps(jsonBlob []byte) ([]*App, error) {
apps := &AppsResponse{}
err := json.Unmarshal(jsonBlob, apps)
Expand Down
28 changes: 28 additions & 0 deletions apps/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,31 @@ func TestParseApp(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, expected, app)
}

func TestConsulApp(t *testing.T) {
t.Parallel()

// when
app := &App{
Labels: map[string]string{"consul": "true"},
}

// then
assert.True(t, app.IsConsulApp())

// when
app = &App{
Labels: map[string]string{"consul": "false", "marathon": "true"},
}

// then
assert.False(t, app.IsConsulApp())

// when
app = &App{
Labels: map[string]string{},
}

// then
assert.False(t, app.IsConsulApp())
}
2 changes: 1 addition & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *Sync) registerMarathonApps(apps []*apps.App) {
healthCheck := app.HealthChecks
labels := app.Labels

if value, ok := app.Labels["consul"]; !ok || value != "true" {
if !app.IsConsulApp() {
log.WithField("Id", app.ID).Debug("App should not be registered in Consul")
continue
}
Expand Down
80 changes: 56 additions & 24 deletions web/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,22 @@ func (fh *EventHandler) handleTerminationEvent(w http.ResponseWriter, body []byt

// app_terminated_event only has one app in it, so we will just take care of
// it instead of looping
app := event.Apps()[0]
log.WithField("Id", app.ID).Info("Got TerminationEvent")
appId := event.Apps()[0].ID
log.WithField("Id", appId).Info("Got TerminationEvent")

app, err := fh.marathon.App(appId)
if err != nil {
log.WithField("Id", appId).WithError(err).Error("There was a problem obtaining app info")
fh.handleError(err, w)
return
}

if !app.IsConsulApp() {
err = fmt.Errorf("%s is not consul app. Missing consul:true label", app.ID)
log.WithField("Id", app.ID).WithError(err).Debug("Skipping app deregistration from Consul")
fh.handleBadRequest(err, w)
return
}

tasks, err := fh.marathon.Tasks(app.ID)
if err != nil {
Expand All @@ -97,7 +111,7 @@ func (fh *EventHandler) handleTerminationEvent(w http.ResponseWriter, body []byt
}
err = fmt.Errorf(errMessage)
fh.handleError(err, w)
log.WithError(err).WithField("Id", app.ID).Error("There where problems processing request")
log.WithError(err).WithField("Id", app.ID).Error("There were problems processing request")
} else {
w.WriteHeader(200)
fmt.Fprintln(w, "OK")
Expand Down Expand Up @@ -133,17 +147,15 @@ func (fh *EventHandler) handleHealthStatusEvent(w http.ResponseWriter, body []by
fh.handleError(err, w)
return
}
tasks := app.Tasks

if value, ok := app.Labels["consul"]; !ok || value != "true" {
if !app.IsConsulApp() {
err = fmt.Errorf("%s is not consul app. Missing consul:true label", app.ID)
log.WithField("Id", taskHealthChange.ID).WithError(err).Debug("App should not be registered in Consul")
log.WithField("Id", taskHealthChange.ID).WithError(err).Debug("Skipping app registration in Consul")
fh.handleBadRequest(err, w)
return
}

healthCheck := app.HealthChecks
labels := app.Labels
tasks := app.Tasks

task, err := findTaskById(taskHealthChange.ID, tasks)
if err != nil {
Expand All @@ -152,6 +164,9 @@ func (fh *EventHandler) handleHealthStatusEvent(w http.ResponseWriter, body []by
return
}

healthCheck := app.HealthChecks
labels := app.Labels

if service.IsTaskHealthy(task.HealthCheckResults) {
err := fh.service.Register(service.MarathonTaskToConsulService(task, healthCheck, labels))
if err != nil {
Expand Down Expand Up @@ -180,26 +195,43 @@ func findTaskById(id tasks.Id, tasks_ []tasks.Task) (tasks.Task, error) {
func (fh *EventHandler) handleStatusEvent(w http.ResponseWriter, body []byte) {
body = replaceTaskIdWithId(body)
task, err := tasks.ParseTask(body)

if err != nil {
log.WithError(err).WithField("Body", body).Error("[ERROR] body generated error")
log.WithError(err).WithField("Body", body).Error("Could not parse event body")
fh.handleBadRequest(err, w)
} else {
return
}

log.WithFields(log.Fields{
"Id": task.ID,
"TaskStatus": task.TaskStatus,
}).Info("Got StatusEvent")

switch task.TaskStatus {
case "TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST":
app, err := fh.marathon.App(task.AppID)
if err != nil {
log.WithField("Id", task.AppID).WithError(err).Error("There was a problem obtaining app info")
fh.handleError(err, w)
return
}

if !app.IsConsulApp() {
err = fmt.Errorf("%s is not consul app. Missing consul:true label", app.ID)
log.WithField("Id", task.ID).WithError(err).Debug("Not handling task event")
fh.handleBadRequest(err, w)
return
}

fh.service.Deregister(task.ID, task.Host)
w.WriteHeader(200)
fmt.Fprintln(w, "OK")
default:
log.WithFields(log.Fields{
"taskStatus": task.TaskStatus,
"Id": task.ID,
"TaskStatus": task.TaskStatus,
}).Info("Got StatusEvent")
switch task.TaskStatus {
case "TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST":
fh.service.Deregister(task.ID, task.Host)
w.WriteHeader(200)
fmt.Fprintln(w, "OK")
default:
log.WithFields(log.Fields{
"taskStatus": task.TaskStatus,
"Id": task.ID,
}).Info("not handling event")
fh.handleBadRequest(fmt.Errorf("Not Handling task %s with status %s", task.ID, task.TaskStatus), w)
}
}).Info("Not handling event")
fh.handleBadRequest(fmt.Errorf("Not Handling task %s with status %s", task.ID, task.TaskStatus), w)
}
}

Expand Down
69 changes: 67 additions & 2 deletions web/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/allegro/marathon-consul/consul"
"github.com/allegro/marathon-consul/events"
"github.com/allegro/marathon-consul/marathon"
Expand Down Expand Up @@ -98,6 +99,9 @@ func TestForwardHandler_HandleAppTerminatedEvent(t *testing.T) {
// given
app := ConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
a, _ := marathon.Apps()
log.Info("wut")
log.Info(a[0].Labels)
service := consul.NewConsulStub()
for _, task := range app.Tasks {
service.Register(consul.MarathonTaskToConsulService(task, app.HealthChecks, app.Labels))
Expand All @@ -118,6 +122,30 @@ func TestForwardHandler_HandleAppTerminatedEvent(t *testing.T) {
assert.Empty(t, services)
}

func TestForwardHandler_NotHandleNonConsulAppTerminatedEvent(t *testing.T) {
t.Parallel()
// given
app := NonConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
service := consul.NewConsulStub()
for _, task := range app.Tasks {
service.Register(consul.MarathonTaskToConsulService(task, app.HealthChecks, app.Labels))
}
handler := NewEventHandler(service, marathon)
body, _ := json.Marshal(events.AppTerminatedEvent{
Type: "app_terminated_event",
AppID: app.ID,
})
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))
// when
recorder := httptest.NewRecorder()
handler.Handle(recorder, req)
// then
assert.Equal(t, 400, recorder.Code)
assert.Equal(t, "/test/app is not consul app. Missing consul:true label\n", recorder.Body.String())
assert.Len(t, service.RegisteredServicesIds(), 3)
}

func TestForwardHandler_HandleAppInvalidBody(t *testing.T) {
t.Parallel()
// given
Expand Down Expand Up @@ -289,11 +317,48 @@ func TestForwardHandler_HandleStatusEventAboutDeadTask(t *testing.T) {
}
}

func TestForwardHandler_NotHandleStatusEventAboutNonConsulAppsDeadTask(t *testing.T) {
t.Parallel()
// given
app := NonConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
service := consul.NewConsulStub()
for _, task := range app.Tasks {
service.Register(consul.MarathonTaskToConsulService(task, app.HealthChecks, app.Labels))
}
handler := NewEventHandler(service, marathon)
taskStatuses := []string{"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST"}
for _, taskStatus := range taskStatuses {
body := `{
"slaveId":"85e59460-a99e-4f16-b91f-145e0ea595bd-S0",
"taskId":"` + app.Tasks[1].ID.String() + `",
"taskStatus":"` + taskStatus + `",
"message":"Command terminated with signal Terminated",
"appId":"/test/app",
"host":"localhost",
"ports":[
31372
],
"version":"2015-12-07T09:02:48.981Z",
"eventType":"status_update_event",
"timestamp":"2015-12-07T09:33:40.898Z"
}`
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))
// when
recorder := httptest.NewRecorder()
handler.Handle(recorder, req)
servicesIds := service.RegisteredServicesIds()
// then
assert.Equal(t, 400, recorder.Code)
assert.Equal(t, "/test/app is not consul app. Missing consul:true label\n", recorder.Body.String())
assert.Len(t, servicesIds, 3)
}
}

func TestForwardHandler_NotHandleHealthStatusEventWhenAppHasNotConsulLabel(t *testing.T) {
t.Parallel()
// given
app := ConsulApp("/test/app", 3)
app.Labels["consul"] = "false"
app := NonConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
service := consul.NewConsulStub()
handler := NewEventHandler(service, marathon)
Expand Down

0 comments on commit e056782

Please sign in to comment.