From e0f2b8cb45b11e8b24d88eebdcfd2c905e7b580c Mon Sep 17 00:00:00 2001 From: Tomasz Janiszewski Date: Mon, 25 Apr 2016 11:40:26 +0200 Subject: [PATCH] Fixes #78 | Async events handler --- README.md | 4 +- config/config.go | 9 +- config/config_test.go | 7 +- debian/config.json | 4 +- main.go | 5 +- marathon/marathon_stub.go | 25 +- marathon/marathon_stub_test.go | 3 + utils/apps.go | 2 +- web/config.go | 7 + web/event_handler.go | 229 ++++++++---------- web/event_handler_test.go | 410 ++++++++++++++++++++------------- web/web.go | 30 +++ web/web_handler.go | 58 +++++ 13 files changed, 491 insertions(+), 302 deletions(-) create mode 100644 web/config.go create mode 100644 web/web.go create mode 100644 web/web_handler.go diff --git a/README.md b/README.md index 3204229..67158eb 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,7 @@ consul-ssl-verify | `true` | Verify certificates when connec consul-token | | The Consul ACL token consul-tag | `marathon` | Common tag name added to every service registered in Consul, should be unique for every Marathon-cluster connected to Consul consul-timeout | `3s` | Time limit for requests made by the Consul HTTP client. A Timeout of zero means no timeout +events-queue-size | `1000` | Size of events queue listen | `:4000` | Accept connections at this address log-level | `info` | Log level: panic, fatal, error, warn, info, or debug log-format | `text` | Log format: JSON, text @@ -132,9 +133,10 @@ metrics-location | | Graphite URL (used when metrics metrics-prefix | `default` | Metrics prefix (resolved to `.` by default) metrics-target | `stdout` | Metrics destination `stdout` or `graphite` (empty string disables metrics) sync-enabled | `true` | Enable Marathon-consul scheduled sync +sync-force | `false` | Force leadership-independent Marathon-consul sync (run always) sync-interval | `15m0s` | Marathon-consul sync [interval](https://golang.org/pkg/time/#Duration) **Note:** While using file configuration intervals should be provided in *nanoseconds* sync-leader | `:8080` | Marathon cluster-wide node name (defaults to `:8080`), the sync will run only if the node is the current [Marathon-leader](https://mesosphere.github.io/marathon/docs/rest-api.html#get-v2-leader) -sync-force | `false` | Force leadership-independent Marathon-consul sync (run always) +workers-pool-size | `10` | Number of concurrent workers processing events ### Endpoints diff --git a/config/config.go b/config/config.go index e692084..fdd3b2e 100644 --- a/config/config.go +++ b/config/config.go @@ -12,14 +12,13 @@ import ( "github.com/allegro/marathon-consul/marathon" "github.com/allegro/marathon-consul/metrics" "github.com/allegro/marathon-consul/sync" + "github.com/allegro/marathon-consul/web" flag "github.com/ogier/pflag" ) type Config struct { - Consul consul.ConsulConfig - Web struct { - Listen string - } + Consul consul.ConsulConfig + Web web.Config Sync sync.Config Marathon marathon.Config Metrics metrics.Config @@ -75,6 +74,8 @@ func (config *Config) parseFlags() { // Web flag.StringVar(&config.Web.Listen, "listen", ":4000", "accept connections at this address") + flag.IntVar(&config.Web.QueueSize, "events-queue-size", 1000, "size of events queue") + flag.IntVar(&config.Web.WorkersCount, "workers-pool-size", 10, "number of concurrent workers processing events") // Sync flag.BoolVar(&config.Sync.Enabled, "sync-enabled", true, "Enable Marathon-consul scheduled sync") diff --git a/config/config_test.go b/config/config_test.go index 4377c5f..5961bb5 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -10,6 +10,7 @@ import ( "github.com/allegro/marathon-consul/marathon" "github.com/allegro/marathon-consul/metrics" "github.com/allegro/marathon-consul/sync" + "github.com/allegro/marathon-consul/web" "github.com/stretchr/testify/assert" ) @@ -99,7 +100,11 @@ func TestConfig_ShouldBeMergedWithFileDefaultsAndFlags(t *testing.T) { RequestRetries: 5, AgentFailuresTolerance: 3, }, - Web: struct{ Listen string }{Listen: ":4000"}, + Web: web.Config{ + Listen: ":4000", + QueueSize: 1000, + WorkersCount: 10, + }, Sync: sync.Config{ Interval: 15 * time.Minute, Enabled: true, diff --git a/debian/config.json b/debian/config.json index eaa7410..a602949 100644 --- a/debian/config.json +++ b/debian/config.json @@ -17,7 +17,9 @@ "RequestRetries": 5 }, "Web": { - "Listen": ":4000" + "Listen": ":4000", + "QueueSize": 1000, + "WorkersCount": 10 }, "Sync": { "Enabled": true, diff --git a/main.go b/main.go index 56e92e3..dfa9efb 100644 --- a/main.go +++ b/main.go @@ -37,9 +37,12 @@ func main() { sync := sync.New(config.Sync, remote, service) go sync.StartSyncServicesJob() + handler, stop := web.NewHandler(config.Web, remote, service) + defer stop() + // set up routes http.HandleFunc("/health", web.HealthHandler) - http.HandleFunc("/events", web.NewEventHandler(service, remote).Handle) + http.HandleFunc("/events", handler) log.WithField("Port", config.Web.Listen).Info("Listening") log.Fatal(http.ListenAndServe(config.Web.Listen, nil)) diff --git a/marathon/marathon_stub.go b/marathon/marathon_stub.go index b067fd5..b623f9e 100644 --- a/marathon/marathon_stub.go +++ b/marathon/marathon_stub.go @@ -7,17 +7,20 @@ import ( ) type MarathonerStub struct { - AppsStub []*apps.App - AppStub map[apps.AppId]*apps.App - TasksStub map[apps.AppId][]*apps.Task - leader string + AppsStub []*apps.App + AppStub map[apps.AppId]*apps.App + TasksStub map[apps.AppId][]*apps.Task + leader string + interactions bool } -func (m MarathonerStub) ConsulApps() ([]*apps.App, error) { +func (m *MarathonerStub) ConsulApps() ([]*apps.App, error) { + m.interactions = true return m.AppsStub, nil } -func (m MarathonerStub) App(id apps.AppId) (*apps.App, error) { +func (m *MarathonerStub) App(id apps.AppId) (*apps.App, error) { + m.interactions = true if app, ok := m.AppStub[id]; ok { return app, nil } else { @@ -25,7 +28,8 @@ func (m MarathonerStub) App(id apps.AppId) (*apps.App, error) { } } -func (m MarathonerStub) Tasks(appId apps.AppId) ([]*apps.Task, error) { +func (m *MarathonerStub) Tasks(appId apps.AppId) ([]*apps.Task, error) { + m.interactions = true if app, ok := m.TasksStub[appId]; ok { return app, nil } else { @@ -33,10 +37,15 @@ func (m MarathonerStub) Tasks(appId apps.AppId) ([]*apps.Task, error) { } } -func (m MarathonerStub) Leader() (string, error) { +func (m *MarathonerStub) Leader() (string, error) { + m.interactions = true return m.leader, nil } +func (m MarathonerStub) Interactions() bool { + return m.interactions +} + func MarathonerStubWithLeaderForApps(leader string, args ...*apps.App) *MarathonerStub { stub := MarathonerStubForApps(args...) stub.leader = leader diff --git a/marathon/marathon_stub_test.go b/marathon/marathon_stub_test.go index d74885c..5b5a0de 100644 --- a/marathon/marathon_stub_test.go +++ b/marathon/marathon_stub_test.go @@ -12,9 +12,12 @@ func TestMarathonStub(t *testing.T) { t.Parallel() // given m := marathon.MarathonerStubWithLeaderForApps("some.host:1234", utils.ConsulApp("/test/app", 3)) + // then + assert.False(t, m.Interactions()) // when leader, _ := m.Leader() // then + assert.True(t, m.Interactions()) assert.Equal(t, "some.host:1234", leader) // when apps, _ := m.ConsulApps() diff --git a/utils/apps.go b/utils/apps.go index b02883f..04aac94 100644 --- a/utils/apps.go +++ b/utils/apps.go @@ -2,9 +2,9 @@ package utils import ( "fmt" + "strings" "github.com/allegro/marathon-consul/apps" - "strings" ) func ConsulApp(name string, instances int) *apps.App { diff --git a/web/config.go b/web/config.go new file mode 100644 index 0000000..93ebc3e --- /dev/null +++ b/web/config.go @@ -0,0 +1,7 @@ +package web + +type Config struct { + Listen string + QueueSize int + WorkersCount int +} diff --git a/web/event_handler.go b/web/event_handler.go index ca8820b..c789328 100644 --- a/web/event_handler.go +++ b/web/event_handler.go @@ -3,8 +3,8 @@ package web import ( "bytes" "fmt" - "io/ioutil" - "net/http" + "strings" + "time" log "github.com/Sirupsen/logrus" "github.com/allegro/marathon-consul/apps" @@ -14,68 +14,83 @@ import ( "github.com/allegro/marathon-consul/metrics" ) -type EventHandler struct { - service service.ConsulServices - marathon marathon.Marathoner +type event struct { + timestamp time.Time + eventType string + body []byte } -func NewEventHandler(service service.ConsulServices, marathon marathon.Marathoner) *EventHandler { - return &EventHandler{ - service: service, - marathon: marathon, - } -} - -func (fh *EventHandler) Handle(w http.ResponseWriter, r *http.Request) { - metrics.Time("events.response", func() { - fh.handle(w, r) - }) +type eventHandler struct { + id int + service service.ConsulServices + marathon marathon.Marathoner + eventQueue <-chan event } -func (fh *EventHandler) handle(w http.ResponseWriter, r *http.Request) { +type stopEvent struct{} - body, err := ioutil.ReadAll(r.Body) - if err != nil { - log.WithError(err).Debug("Malformed request") - fh.handleBadRequest(err, w) - return +func newEventHandler(id int, service service.ConsulServices, marathon marathon.Marathoner, eventQueue <-chan event) *eventHandler { + return &eventHandler{ + id: id, + service: service, + marathon: marathon, + eventQueue: eventQueue, } - log.WithField("Body", string(body)).Debug("Received request") +} - eventType, err := events.EventType(body) - if err != nil { - fh.handleBadRequest(err, w) - return +func (fh *eventHandler) Start() chan<- stopEvent { + var event event + process := func() { + err := fh.handleEvent(event.eventType, event.body) + if err != nil { + metrics.Mark("events.processing.error") + } else { + metrics.Mark("events.processing.succes") + } } - fh.markEventRequest(eventType) + quitChan := make(chan stopEvent) + log.WithField("Id", fh.id).Println("Starting worker") + go func() { + for { + select { + case event = <-fh.eventQueue: + metrics.Mark(fmt.Sprintf("events.handler.%d", fh.id)) + metrics.UpdateGauge("events.queue.len", int64(len(fh.eventQueue))) + metrics.UpdateGauge("events.queue.delay_ns", time.Since(event.timestamp).Nanoseconds()) + metrics.Time("events.processing."+event.eventType, process) + case <-quitChan: + log.WithField("Id", fh.id).Info("Stopping worker") + } + } + }() + return quitChan +} - log.WithField("EventType", eventType).Debug("Received event") +func (fh *eventHandler) handleEvent(eventType string, body []byte) error { + + body = replaceTaskIdWithId(body) switch eventType { case "status_update_event": - fh.handleStatusEvent(w, body) + return fh.handleStatusEvent(body) case "health_status_changed_event": - fh.handleHealthStatusEvent(w, body) + return fh.handleHealthStatusEvent(body) case "deployment_info": - fh.handleDeploymentInfo(w, body) + return fh.handleDeploymentInfo(body) case "deployment_step_success": - fh.handleDeploymentStepSuccess(w, body) + return fh.handleDeploymentStepSuccess(body) default: log.WithField("EventType", eventType).Debug("Not handled event type") - fh.okResponse(w) + return nil } - - fh.markSuccess() } -func (fh *EventHandler) handleHealthStatusEvent(w http.ResponseWriter, body []byte) { - body = replaceTaskIdWithId(body) +func (fh *eventHandler) handleHealthStatusEvent(body []byte) error { taskHealthChange, err := events.ParseTaskHealthChange(body) if err != nil { log.WithError(err).Error("Body generated error") - fh.handleBadRequest(err, w) - return + return err } log.WithFields( @@ -85,25 +100,21 @@ func (fh *EventHandler) handleHealthStatusEvent(w http.ResponseWriter, body []by }).Info("Got HealthStatusEvent") if !taskHealthChange.Alive { - err := fmt.Errorf("Task %s is not healthy. Not registering", taskHealthChange.ID) - log.WithField("Id", taskHealthChange.ID).WithError(err).Debug("Task is not healthy. Not registering") - fh.okResponse(w) - return + log.WithField("Id", taskHealthChange.ID).Debug("Task is not alive. Not registering") + return nil } appId := taskHealthChange.AppID app, err := fh.marathon.App(appId) if err != nil { log.WithField("Id", taskHealthChange.ID).WithError(err).Error("There was a problem obtaining app info") - fh.handleError(err, w) - return + return err } if !app.IsConsulApp() { err = fmt.Errorf("%s is not consul app. Missing consul label", app.ID) log.WithField("Id", taskHealthChange.ID).WithError(err).Debug("Skipping app registration in Consul") - fh.okResponse(w) - return + return nil } tasks := app.Tasks @@ -111,42 +122,29 @@ func (fh *EventHandler) handleHealthStatusEvent(w http.ResponseWriter, body []by task, err := findTaskById(taskHealthChange.ID, tasks) if err != nil { log.WithField("Id", taskHealthChange.ID).WithError(err).Error("Task not found") - fh.handleError(err, w) - return + return err } if task.IsHealthy() { err := fh.service.Register(&task, app) if err != nil { log.WithField("Id", task.ID).WithError(err).Error("There was a problem registering task") - fh.handleError(err, w) + return err } else { - fh.okResponse(w) + return nil } } else { - err := fmt.Errorf("Task %s is not healthy. Not registering", task.ID) - log.WithField("Id", task.ID).WithError(err).Debug("Task is not healthy. Not registering") - fh.okResponse(w) - } -} - -func findTaskById(id apps.TaskId, tasks_ []apps.Task) (apps.Task, error) { - for _, task := range tasks_ { - if task.ID == id { - return task, nil - } + log.WithField("Id", task.ID).Debug("Task is not healthy. Not registering") + return nil } - return apps.Task{}, fmt.Errorf("Task %s not found", id) } -func (fh *EventHandler) handleStatusEvent(w http.ResponseWriter, body []byte) { - body = replaceTaskIdWithId(body) +func (fh *eventHandler) handleStatusEvent(body []byte) error { task, err := apps.ParseTask(body) if err != nil { log.WithError(err).WithField("Body", body).Error("Could not parse event body") - fh.handleBadRequest(err, w) - return + return err } log.WithFields(log.Fields{ @@ -159,71 +157,64 @@ func (fh *EventHandler) handleStatusEvent(w http.ResponseWriter, body []byte) { err = fh.service.Deregister(task.ID, task.Host) if err != nil { log.WithField("Id", task.ID).WithError(err).Error("There was a problem deregistering task") - fh.handleError(err, w) - return + return err + } else { + return nil } default: log.WithFields(log.Fields{ "Id": task.ID, "taskStatus": task.TaskStatus, }).Debug("Not handled task status") + return nil } - fh.okResponse(w) } -/* - This handler is used when an application is stopped -*/ -func (fh *EventHandler) handleDeploymentInfo(w http.ResponseWriter, body []byte) { - body = replaceTaskIdWithId(body) +//This handler is used when an application is stopped +func (fh *eventHandler) handleDeploymentInfo(body []byte) error { deploymentEvent, err := events.ParseDeploymentEvent(body) if err != nil { log.WithError(err).WithField("Body", body).Error("Could not parse event body") - fh.handleBadRequest(err, w) - return + return err } errors := []error{} for _, app := range deploymentEvent.StoppedConsulApps() { - for _, error := range fh.deregisterAllAppServices(app) { - errors = append(errors, error) + for _, err = range fh.deregisterAllAppServices(app) { + errors = append(errors, err) } } if len(errors) > 0 { - fh.handleError(fh.mergeDeregistrationErrors(errors), w) - return + return fh.mergeDeregistrationErrors(errors) + } else { + return nil } - fh.okResponse(w) } -/* - This handler is used when an application is restarted and renamed -*/ -func (fh *EventHandler) handleDeploymentStepSuccess(w http.ResponseWriter, body []byte) { - body = replaceTaskIdWithId(body) +//This handler is used when an application is restarted and renamed +func (fh *eventHandler) handleDeploymentStepSuccess(body []byte) error { deploymentEvent, err := events.ParseDeploymentEvent(body) if err != nil { log.WithError(err).WithField("Body", body).Error("Could not parse event body") - fh.handleBadRequest(err, w) - return + return err } errors := []error{} for _, app := range deploymentEvent.RenamedConsulApps() { - for _, error := range fh.deregisterAllAppServices(app) { - errors = append(errors, error) + for _, err = range fh.deregisterAllAppServices(app) { + errors = append(errors, err) } } if len(errors) > 0 { - fh.handleError(fh.mergeDeregistrationErrors(errors), w) - return + return fh.mergeDeregistrationErrors(errors) + } else { + return nil } - fh.okResponse(w) } -func (fh *EventHandler) deregisterAllAppServices(app *apps.App) []error { +func (fh *eventHandler) deregisterAllAppServices(app *apps.App) []error { errors := []error{} serviceName := app.ConsulServiceName() @@ -256,7 +247,16 @@ func (fh *EventHandler) deregisterAllAppServices(app *apps.App) []error { return errors } -func (fh *EventHandler) mergeDeregistrationErrors(errors []error) error { +func findTaskById(id apps.TaskId, tasks_ []apps.Task) (apps.Task, error) { + for _, task := range tasks_ { + if task.ID == id { + return task, nil + } + } + return apps.Task{}, fmt.Errorf("Task %s not found", id) +} + +func (fh *eventHandler) mergeDeregistrationErrors(errors []error) error { errMessage := fmt.Sprintf("%d errors occured handling service deregistration", len(errors)) for i, err := range errors { errMessage = fmt.Sprintf("%s\n%d: %s", errMessage, i, err.Error()) @@ -264,37 +264,10 @@ func (fh *EventHandler) mergeDeregistrationErrors(errors []error) error { return fmt.Errorf(errMessage) } +// for every other use of Tasks, Marathon uses the "id" field for the task ID. +// Here, it uses "taskId", with most of the other fields being equal. We'll +// just swap "taskId" for "id" in the body so that we can successfully parse +// incoming events. func replaceTaskIdWithId(body []byte) []byte { - // for every other use of Tasks, Marathon uses the "id" field for the task ID. - // Here, it uses "taskId", with most of the other fields being equal. We'll - // just swap "taskId" for "id" in the body so that we can successfully parse - // incoming events. return bytes.Replace(body, []byte("taskId"), []byte("id"), -1) } - -func (fh *EventHandler) markEventRequest(event string) { - metrics.Mark("events.requests." + event) -} - -func (fh *EventHandler) markSuccess() { - metrics.Mark("events.response.success") -} - -func (fh *EventHandler) handleError(err error, w http.ResponseWriter) { - metrics.Mark("events.response.error.500") - w.WriteHeader(500) - log.WithError(err).Debug("Returning 500 due to error") - fmt.Fprintln(w, err.Error()) -} - -func (fh *EventHandler) handleBadRequest(err error, w http.ResponseWriter) { - metrics.Mark("events.response.error.400") - w.WriteHeader(400) - log.WithError(err).Debug("Returning 400 due to malformed request") - fmt.Fprintln(w, err.Error()) -} - -func (fh *EventHandler) okResponse(w http.ResponseWriter) { - w.WriteHeader(200) - fmt.Fprintln(w, "OK") -} diff --git a/web/event_handler_test.go b/web/event_handler_test.go index 8487fb0..d7915f9 100644 --- a/web/event_handler_test.go +++ b/web/event_handler_test.go @@ -16,26 +16,29 @@ import ( "github.com/stretchr/testify/assert" ) -func TestForwardHandler_NotHandleUnknownEventType(t *testing.T) { +func TestWebHandler_NotHandleUnknownEventType(t *testing.T) { t.Parallel() // given - handler := NewEventHandler(nil, nil) + queue := make(chan event) + handler := newWebHandler(queue) + stopChan := newEventHandler(0, nil, nil, queue).Start() req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{"eventType":"test_event"}`))) // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) + stopChan <- stopEvent{} // then - assert.Equal(t, 200, recorder.Code) + assertAccepted(t, recorder) } -func TestForwardHandler_HandleRadderError(t *testing.T) { +func TestWebHandler_HandleRadderError(t *testing.T) { t.Parallel() // given - handler := NewEventHandler(nil, nil) + handler := newWebHandler(nil) req, _ := http.NewRequest("POST", "/events", BadReader{}) // when @@ -47,11 +50,11 @@ func TestForwardHandler_HandleRadderError(t *testing.T) { assert.Equal(t, "Some error\n", recorder.Body.String()) } -func TestForwardHandler_HandleEmptyBody(t *testing.T) { +func TestWebHandler_HandleEmptyBody(t *testing.T) { t.Parallel() // given - handler := NewEventHandler(nil, nil) + handler := newWebHandler(nil) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte{})) // when @@ -63,11 +66,11 @@ func TestForwardHandler_HandleEmptyBody(t *testing.T) { assert.Equal(t, "unexpected end of JSON input\n", recorder.Body.String()) } -func TestForwardHandler_NotHandleMalformedEventType(t *testing.T) { +func TestWebHandler_NotHandleMalformedEventType(t *testing.T) { t.Parallel() // given - handler := NewEventHandler(nil, nil) + handler := newWebHandler(nil) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{eventType:"test_event"}`))) // when @@ -79,11 +82,11 @@ func TestForwardHandler_NotHandleMalformedEventType(t *testing.T) { assert.Equal(t, "invalid character 'e' looking for beginning of object key string\n", recorder.Body.String()) } -func TestForwardHandler_HandleMalformedEventType(t *testing.T) { +func TestWebHandler_HandleMalformedEventType(t *testing.T) { t.Parallel() // given - handler := NewEventHandler(nil, nil) + handler := newWebHandler(nil) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{eventType:"test_event"}`))) // when @@ -95,11 +98,11 @@ func TestForwardHandler_HandleMalformedEventType(t *testing.T) { assert.Equal(t, "invalid character 'e' looking for beginning of object key string\n", recorder.Body.String()) } -func TestForwardHandler_NotHandleInvalidEventType(t *testing.T) { +func TestWebHandler_NotHandleInvalidEventType(t *testing.T) { t.Parallel() // given - handler := NewEventHandler(nil, nil) + handler := newWebHandler(nil) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{"eventType":[1,2]}`))) // when @@ -111,24 +114,27 @@ func TestForwardHandler_NotHandleInvalidEventType(t *testing.T) { assert.Equal(t, "json: cannot unmarshal array into Go value of type string\n", recorder.Body.String()) } -func TestForwardHandler_HandleAppInvalidBody(t *testing.T) { +func TestWebHandler_HandleAppInvalidBody(t *testing.T) { t.Parallel() // given - handler := NewEventHandler(nil, nil) + queue := make(chan event) + handler := newWebHandler(queue) + stopChan := newEventHandler(0, nil, nil, queue).Start() body := `{"type": "app_terminated_event", "appID": 123}` req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) + stopChan <- stopEvent{} // then assert.Equal(t, 400, recorder.Code) assert.Equal(t, "no event\n", recorder.Body.String()) } -func TestForwardHandler_HandleDeploymentInfoWithStopApplicationAction(t *testing.T) { +func TestWebHandler_HandleDeploymentInfoWithStopApplicationAction(t *testing.T) { t.Parallel() // given @@ -136,66 +142,72 @@ func TestForwardHandler_HandleDeploymentInfoWithStopApplicationAction(t *testing marathon := marathon.MarathonerStubForApps() service := newConsulStubWithApplicationsTasksRegistered(app) assert.Len(t, service.RegisteredServicesIds(), 3) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(app)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 200, recorder.Code) - assert.Len(t, service.RegisteredServicesIds(), 0) + assertAccepted(t, recorder) + assert.Empty(t, service.RegisteredServicesIds()) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionForMultipleApps(t *testing.T) { +func TestWebHandler_handleDeploymentInfoWithStopApplicationForOneApp(t *testing.T) { t.Parallel() // given - app1 := ConsulApp("/test/app", 3) - app2 := ConsulApp("/test/otherapp", 2) + green := ConsulApp("/test/app.green", 3) + green.Labels[apps.MARATHON_CONSUL_LABEL] = "app" + blue := ConsulApp("/test/app.blue", 2) + blue.Labels[apps.MARATHON_CONSUL_LABEL] = "app" marathon := marathon.MarathonerStubForApps() - service := newConsulStubWithApplicationsTasksRegistered(app1, app2) + service := newConsulStubWithApplicationsTasksRegistered(green, blue) assert.Len(t, service.RegisteredServicesIds(), 5) - handler := NewEventHandler(service, marathon) - body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(app1, app2)) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) + body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(green)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 200, recorder.Code) - assert.Len(t, service.RegisteredServicesIds(), 0) + assertAccepted(t, recorder) + assert.Len(t, service.RegisteredServicesIds(), 2) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentInfoWithStopApplicationForOneApp(t *testing.T) { +func TestWebHandler_HandleDeploymentInfoWithStopApplicationActionForMultipleApps(t *testing.T) { t.Parallel() // given - green := ConsulApp("/test/app.green", 3) - green.Labels[apps.MARATHON_CONSUL_LABEL] = "app" - blue := ConsulApp("/test/app.blue", 2) - blue.Labels[apps.MARATHON_CONSUL_LABEL] = "app" + app1 := ConsulApp("/test/app", 3) + app2 := ConsulApp("/test/otherapp", 2) marathon := marathon.MarathonerStubForApps() - service := newConsulStubWithApplicationsTasksRegistered(green, blue) + service := newConsulStubWithApplicationsTasksRegistered(app1, app2) assert.Len(t, service.RegisteredServicesIds(), 5) - handler := NewEventHandler(service, marathon) - body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(green)) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) + body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(app1, app2)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 200, recorder.Code) - assert.Len(t, service.RegisteredServicesIds(), 2) + assertAccepted(t, recorder) + assert.Len(t, service.RegisteredServicesIds(), 0) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionForMultipleAppsAndProblemsDeregisteringOne(t *testing.T) { +func TestWebHandler_HandleDeploymentInfoWithStopApplicationActionForMultipleAppsAndProblemsDeregisteringOne(t *testing.T) { t.Parallel() // given @@ -205,20 +217,22 @@ func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionForMultiple service := newConsulStubWithApplicationsTasksRegistered(app1, app2) service.ErrorServices["test_app.1"] = fmt.Errorf("Cannot deregister service") assert.Len(t, service.RegisteredServicesIds(), 5) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(app1, app2)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 500, recorder.Code) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 1) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionForMultipleAppsAndProblemsGettingServicesForOne(t *testing.T) { +func TestWebHandler_HandleDeploymentInfoWithStopApplicationActionForMultipleAppsAndProblemsGettingServicesForOne(t *testing.T) { t.Parallel() // given @@ -228,20 +242,22 @@ func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionForMultiple service := newConsulStubWithApplicationsTasksRegistered(app1, app2) service.ErrorGetServices["test.app"] = fmt.Errorf("Something went terribly wrong!") assert.Len(t, service.RegisteredServicesIds(), 5) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(app1, app2)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 500, recorder.Code) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 3) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionWithNoServicesRegistered(t *testing.T) { +func TestWebHandler_HandleDeploymentInfoWithStopApplicationActionWithNoServicesRegistered(t *testing.T) { t.Parallel() // given @@ -249,20 +265,22 @@ func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionWithNoServi marathon := marathon.MarathonerStubForApps() service := consul.NewConsulStub() assert.Len(t, service.RegisteredServicesIds(), 0) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(app)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 200, recorder.Code) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 0) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentInfoWithInvalidBody(t *testing.T) { +func TestWebHandler_HandleDeploymentInfoWithInvalidBody(t *testing.T) { t.Parallel() // given @@ -270,19 +288,21 @@ func TestForwardHandler_HandleDeploymentInfoWithInvalidBody(t *testing.T) { marathon := marathon.MarathonerStubForApps() service := newConsulStubWithApplicationsTasksRegistered(app) assert.Len(t, service.RegisteredServicesIds(), 3) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{"eventType":"deployment_info", "Plan": 123}`))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 400, recorder.Code) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 3) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionForNonConsulApp(t *testing.T) { +func TestWebHandler_HandleDeploymentInfoWithStopApplicationActionForNonConsulApp(t *testing.T) { t.Parallel() // given @@ -290,20 +310,22 @@ func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionForNonConsu marathon := marathon.MarathonerStubForApps() service := newConsulStubWithApplicationsTasksRegistered(app) assert.Len(t, service.RegisteredServicesIds(), 3) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(app)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 200, recorder.Code) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 3) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionForCustomServiceName(t *testing.T) { +func TestWebHandler_HandleDeploymentInfoWithStopApplicationActionForCustomServiceName(t *testing.T) { t.Parallel() // given @@ -312,20 +334,22 @@ func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionForCustomSe marathon := marathon.MarathonerStubForApps() service := newConsulStubWithApplicationsTasksRegistered(app) assert.Len(t, service.RegisteredServicesIds(), 3) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(app)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 200, recorder.Code) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 0) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_NotHandleDeploymentInfoWithScaleApplicationAction(t *testing.T) { +func TestWebHandler_NotHandleDeploymentInfoWithScaleApplicationAction(t *testing.T) { t.Parallel() // given @@ -333,7 +357,7 @@ func TestForwardHandler_NotHandleDeploymentInfoWithScaleApplicationAction(t *tes marathon := marathon.MarathonerStubForApps() service := newConsulStubWithApplicationsTasksRegistered(app) assert.Len(t, service.RegisteredServicesIds(), 3) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) deploymentInfo := deploymentInfoWithStopApplicationActionForApps(app) deploymentInfo.CurrentStep.Actions[0].Type = "ScaleApplication" body, _ := json.Marshal(deploymentInfo) @@ -341,14 +365,16 @@ func TestForwardHandler_NotHandleDeploymentInfoWithScaleApplicationAction(t *tes // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 200, recorder.Code) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 3) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionAndProblemsDeregistering(t *testing.T) { +func TestWebHandler_HandleDeploymentInfoWithStopApplicationActionAndProblemsDeregistering(t *testing.T) { t.Parallel() // given @@ -357,18 +383,19 @@ func TestForwardHandler_HandleDeploymentInfoWithStopApplicationActionAndProblems service := newConsulStubWithApplicationsTasksRegistered(app) service.ErrorServices["test_app.1"] = fmt.Errorf("Cannot deregister service") assert.Len(t, service.RegisteredServicesIds(), 3) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body, _ := json.Marshal(deploymentInfoWithStopApplicationActionForApps(app)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 500, recorder.Code) - assert.Equal(t, "1 errors occured handling service deregistration\n0: Cannot deregister service\n", recorder.Body.String()) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 1) + assert.False(t, marathon.Interactions()) } func deploymentInfoWithStopApplicationActionForApps(applications ...*apps.App) *events.DeploymentEvent { @@ -391,7 +418,7 @@ func deploymentInfoWithStopApplicationActionForApps(applications ...*apps.App) * return deploymentInfo } -func TestForwardHandler_HandleDeploymentStepSuccessWithRestartAndRenameApplicationAction(t *testing.T) { +func TestWebHandler_HandleDeploymentStepSuccessWithRestartAndRenameApplicationAction(t *testing.T) { t.Parallel() // given @@ -399,20 +426,22 @@ func TestForwardHandler_HandleDeploymentStepSuccessWithRestartAndRenameApplicati marathon := marathon.MarathonerStubForApps() service := newConsulStubWithApplicationsTasksRegistered(app) assert.Len(t, service.RegisteredServicesIds(), 3) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body, _ := json.Marshal(deploymentStepSuccessWithRestartAndRenameApplicationActionForApps(app)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 200, recorder.Code) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 0) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentStepSuccessWithInvalidBody(t *testing.T) { +func TestWebHandler_HandleDeploymentStepSuccessWithInvalidBody(t *testing.T) { t.Parallel() // given @@ -420,19 +449,21 @@ func TestForwardHandler_HandleDeploymentStepSuccessWithInvalidBody(t *testing.T) marathon := marathon.MarathonerStubForApps() service := newConsulStubWithApplicationsTasksRegistered(app) assert.Len(t, service.RegisteredServicesIds(), 3) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{"eventType":"deployment_step_success", "Plan": 123}`))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 400, recorder.Code) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 3) + assert.False(t, marathon.Interactions()) } -func TestForwardHandler_HandleDeploymentStepSuccessWithRestartApplicationActionForMultipleAppsAndProblemsDeregisteringOne(t *testing.T) { +func TestWebHandler_HandleDeploymentStepSuccessWithRestartApplicationActionForMultipleAppsAndProblemsDeregisteringOne(t *testing.T) { t.Parallel() // given @@ -442,18 +473,19 @@ func TestForwardHandler_HandleDeploymentStepSuccessWithRestartApplicationActionF service := newConsulStubWithApplicationsTasksRegistered(app1, app2) service.ErrorServices["test_app.1"] = fmt.Errorf("Cannot deregister service") assert.Len(t, service.RegisteredServicesIds(), 5) - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body, _ := json.Marshal(deploymentStepSuccessWithRestartAndRenameApplicationActionForApps(app1, app2)) req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() // then - assert.Equal(t, 500, recorder.Code) - assert.Equal(t, "1 errors occured handling service deregistration\n0: Cannot deregister service\n", recorder.Body.String()) + assertAccepted(t, recorder) assert.Len(t, service.RegisteredServicesIds(), 1) + assert.False(t, marathon.Interactions()) } func deploymentStepSuccessWithRestartAndRenameApplicationActionForApps(applications ...*apps.App) *events.DeploymentEvent { @@ -483,11 +515,13 @@ func deploymentStepSuccessWithRestartAndRenameApplicationActionForApps(applicati return deploymentInfo } -func TestForwardHandler_NotHandleStatusEventWithInvalidBody(t *testing.T) { +func TestWebHandler_NotHandleStatusEventWithInvalidBody(t *testing.T) { t.Parallel() // given - handler := NewEventHandler(nil, nil) + queue := make(chan event) + handler := newWebHandler(queue) + stopChan := newEventHandler(0, nil, nil, queue).Start() body := `{ "slaveId":"85e59460-a99e-4f16-b91f-145e0ea595bd-S0", "taskId":"python_simple.4a7e99d0-9cc1-11e5-b4d8-0a0027000004", @@ -505,19 +539,20 @@ func TestForwardHandler_NotHandleStatusEventWithInvalidBody(t *testing.T) { // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) + stopChan <- stopEvent{} // then - assert.Equal(t, 400, recorder.Code) - assert.Equal(t, "json: cannot unmarshal number into Go value of type []int\n", - recorder.Body.String()) + assertAccepted(t, recorder) } -func TestForwardHandler_NotHandleStatusEventAboutStartingTask(t *testing.T) { +func TestWebHandler_NotHandleStatusEventAboutStartingTask(t *testing.T) { t.Parallel() // given services := consul.NewConsulStub() - handler := NewEventHandler(services, nil) + queue := make(chan event) + stopChan := newEventHandler(0, services, nil, queue).Start() + handler := newWebHandler(queue) ignoredTaskStatuses := []string{"TASK_STAGING", "TASK_STARTING", "TASK_RUNNING", "unknown"} for _, taskStatus := range ignoredTaskStatuses { body := `{ @@ -539,25 +574,28 @@ func TestForwardHandler_NotHandleStatusEventAboutStartingTask(t *testing.T) { // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) + stopChan <- stopEvent{} // then - assert.Equal(t, 200, recorder.Code) - assert.Len(t, services.RegisteredServicesIds(), 0) + assert.Equal(t, 202, recorder.Code) + assert.Equal(t, "OK\n", recorder.Body.String()) + assert.Empty(t, services.RegisteredServicesIds()) } } -func TestForwardHandler_HandleStatusEventAboutDeadTask(t *testing.T) { +func TestWebHandler_HandleStatusEventAboutDeadTask(t *testing.T) { t.Parallel() - - // given - app := ConsulApp("/test/app", 3) - service := consul.NewConsulStub() - for _, task := range app.Tasks { - service.Register(&task, app) - } - handler := NewEventHandler(service, nil) taskStatuses := []string{"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST"} for _, taskStatus := range taskStatuses { + // given + app := ConsulApp("/test/app", 3) + service := consul.NewConsulStub() + for _, task := range app.Tasks { + service.Register(&task, app) + } + queue := make(chan event) + stopChan := newEventHandler(0, service, nil, queue).Start() + handler := newWebHandler(queue) body := `{ "slaveId":"85e59460-a99e-4f16-b91f-145e0ea595bd-S0", "taskId":"` + app.Tasks[1].ID.String() + `", @@ -577,10 +615,11 @@ func TestForwardHandler_HandleStatusEventAboutDeadTask(t *testing.T) { // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) + stopChan <- stopEvent{} servicesIds := service.RegisteredServicesIds() // then - assert.Equal(t, 200, recorder.Code) + assert.Equal(t, 202, recorder.Code) assert.Equal(t, "OK\n", recorder.Body.String()) assert.Len(t, servicesIds, 2) assert.NotContains(t, servicesIds, app.Tasks[1].ID) @@ -589,13 +628,50 @@ func TestForwardHandler_HandleStatusEventAboutDeadTask(t *testing.T) { } } -func TestForwardHandler_NotHandleStatusEventAboutNonConsulAppsDeadTask(t *testing.T) { +func TestWebHandler_HandleStatusEventAboutDeadTaskErrOnDeregistration(t *testing.T) { + t.Parallel() + + // given + service := consul.NewConsulStub() + service.ErrorServices[apps.TaskId("task.1")] = fmt.Errorf("Cannot deregister task") + queue := make(chan event) + stopChan := newEventHandler(0, service, nil, queue).Start() + handler := newWebHandler(queue) + body := `{ + "slaveId":"85e59460-a99e-4f16-b91f-145e0ea595bd-S0", + "taskId":"task.1", + "taskStatus":"TASK_KILLED", + "message":"Command terminated with signal Terminated", + "appId":"/test/app", + "host":"localhost", + "ports":[ + 31372 + ], + "version":"2015-12-07T09:02:48.981Z", + "eventType":"status_update_event", + "timestamp":"2015-12-07T09:33:40.898Z" + }` + req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) + + // when + recorder := httptest.NewRecorder() + handler.Handle(recorder, req) + stopChan <- stopEvent{} + + // then + assertAccepted(t, recorder) + assert.Empty(t, service.RegisteredServicesIds()) +} + +func TestWebHandler_NotHandleStatusEventAboutNonConsulAppsDeadTask(t *testing.T) { t.Parallel() // given app := NonConsulApp("/test/app", 3) service := consul.NewConsulStub() - handler := NewEventHandler(service, nil) + queue := make(chan event) + stopChan := newEventHandler(0, service, nil, queue).Start() + handler := newWebHandler(queue) taskStatuses := []string{"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST"} for _, taskStatus := range taskStatuses { body := `{ @@ -617,57 +693,61 @@ func TestForwardHandler_NotHandleStatusEventAboutNonConsulAppsDeadTask(t *testin // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) + stopChan <- stopEvent{} // then - assert.Equal(t, 200, recorder.Code) + assert.Equal(t, 202, recorder.Code) + assert.Equal(t, "OK\n", recorder.Body.String()) } } -func TestForwardHandler_NotHandleHealthStatusEventWhenAppHasNotConsulLabel(t *testing.T) { +func TestWebHandler_NotHandleHealthStatusEventWhenAppHasNotConsulLabel(t *testing.T) { t.Parallel() // given app := NonConsulApp("/test/app", 3) marathon := marathon.MarathonerStubForApps(app) - service := consul.NewConsulStub() - handler := NewEventHandler(service, marathon) + queue := make(chan event) + stopChan := newEventHandler(0, nil, marathon, queue).Start() + handler := newWebHandler(queue) body := healthStatusChangeEventForTask("test_app.1") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) - servicesIds := service.RegisteredServicesIds() + stopChan <- stopEvent{} // then - assert.Equal(t, 200, recorder.Code) - assert.Len(t, servicesIds, 0) + assertAccepted(t, recorder) + assert.True(t, marathon.Interactions()) } -func TestForwardHandler_HandleHealthStatusEvent(t *testing.T) { +func TestWebHandler_HandleHealthStatusEvent(t *testing.T) { t.Parallel() // given app := ConsulApp("/test/app", 3) marathon := marathon.MarathonerStubForApps(app) service := consul.NewConsulStub() - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body := healthStatusChangeEventForTask("test_app.1") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) + handle(recorder, req) + stop() servicesIds := service.RegisteredServicesIds() // then - assert.Equal(t, 200, recorder.Code) - assert.Equal(t, "OK\n", recorder.Body.String()) + assertAccepted(t, recorder) assert.Len(t, servicesIds, 1) assert.Contains(t, servicesIds, app.Tasks[1].ID.String()) + assert.True(t, marathon.Interactions()) } -func TestForwardHandler_HandleHealthStatusEventWithErrorsOnRegistration(t *testing.T) { +func TestWebHandler_HandleHealthStatusEventWithErrorsOnRegistration(t *testing.T) { t.Parallel() // given @@ -675,71 +755,74 @@ func TestForwardHandler_HandleHealthStatusEventWithErrorsOnRegistration(t *testi marathon := marathon.MarathonerStubForApps(app) service := consul.NewConsulStub() service.ErrorServices[app.Tasks[1].ID] = fmt.Errorf("Cannot register task") - handler := NewEventHandler(service, marathon) + handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service) body := healthStatusChangeEventForTask("test_app.1") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() - handler.Handle(recorder, req) - servicesIds := service.RegisteredServicesIds() + handle(recorder, req) + stop() // then - assert.Equal(t, 500, recorder.Code) - assert.Equal(t, "Cannot register task\n", recorder.Body.String()) - assert.Len(t, servicesIds, 0) + assertAccepted(t, recorder) + assert.Empty(t, service.RegisteredServicesIds()) + assert.True(t, marathon.Interactions()) } -func TestForwardHandler_NotHandleHealthStatusEventForTaskWithNotAllHealthChecksPassed(t *testing.T) { +func TestWebHandler_NotHandleHealthStatusEventForTaskWithNotAllHealthChecksPassed(t *testing.T) { t.Parallel() // given app := ConsulApp("/test/app", 3) app.Tasks[1].HealthCheckResults = []apps.HealthCheckResult{{Alive: true}, {Alive: false}} marathon := marathon.MarathonerStubForApps(app) - service := consul.NewConsulStub() - handler := NewEventHandler(service, marathon) + queue := make(chan event) + stopChan := newEventHandler(0, nil, marathon, queue).Start() + handler := newWebHandler(queue) body := healthStatusChangeEventForTask("test_app.1") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) - servicesIds := service.RegisteredServicesIds() + stopChan <- stopEvent{} // then - assert.Equal(t, 200, recorder.Code) - assert.Len(t, servicesIds, 0) + assertAccepted(t, recorder) + assert.True(t, marathon.Interactions()) } -func TestForwardHandler_NotHandleHealthStatusEventForTaskWithNoHealthCheck(t *testing.T) { +func TestWebHandler_NotHandleHealthStatusEventForTaskWithNoHealthCheck(t *testing.T) { t.Parallel() // given app := ConsulApp("/test/app", 1) app.Tasks[0].HealthCheckResults = []apps.HealthCheckResult{} marathon := marathon.MarathonerStubForApps(app) - service := consul.NewConsulStub() - handler := NewEventHandler(service, marathon) - body := healthStatusChangeEventForTask("test_app.0") + queue := make(chan event) + stopChan := newEventHandler(0, nil, marathon, queue).Start() + handler := newWebHandler(queue) + body := healthStatusChangeEventForTask("/test/app.0") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) - servicesIds := service.RegisteredServicesIds() + stopChan <- stopEvent{} // then - assert.Equal(t, 200, recorder.Code) - assert.Len(t, servicesIds, 0) + assertAccepted(t, recorder) + assert.True(t, marathon.Interactions()) } -func TestForwardHandler_NotHandleHealthStatusEventWhenTaskIsNotAlive(t *testing.T) { +func TestWebHandler_NotHandleHealthStatusEventWhenTaskIsNotAlive(t *testing.T) { t.Parallel() // given - services := consul.NewConsulStub() - handler := NewEventHandler(services, nil) + queue := make(chan event) + stopChan := newEventHandler(0, nil, nil, queue).Start() + handler := newWebHandler(queue) body := `{ "appId":"/test/app", "taskId":"test_app.1", @@ -753,17 +836,19 @@ func TestForwardHandler_NotHandleHealthStatusEventWhenTaskIsNotAlive(t *testing. // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) + stopChan <- stopEvent{} // then - assert.Equal(t, 200, recorder.Code) - assert.Len(t, services.RegisteredServicesIds(), 0) + assertAccepted(t, recorder) } -func TestForwardHandler_NotHandleHealthStatusEventWhenBodyIsInvalid(t *testing.T) { +func TestWebHandler_NotHandleHealthStatusEventWhenBodyIsInvalid(t *testing.T) { t.Parallel() // given - handler := NewEventHandler(nil, nil) + queue := make(chan event) + stopChan := newEventHandler(0, nil, nil, queue).Start() + handler := newWebHandler(queue) body := `{ "appId":"/test/app", "taskId":"test_app.1", @@ -777,21 +862,23 @@ func TestForwardHandler_NotHandleHealthStatusEventWhenBodyIsInvalid(t *testing.T // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) + stopChan <- stopEvent{} // then - assert.Equal(t, 400, recorder.Code) - assert.Equal(t, "json: cannot unmarshal number into Go value of type string\n", recorder.Body.String()) + assertAccepted(t, recorder) } -func TestForwardHandler_HandleHealthStatusEventReturn500WhenMarathonReturnedError(t *testing.T) { +func TestWebHandler_HandleHealthStatusEventReturn202WhenMarathonReturnedError(t *testing.T) { t.Parallel() // given app := ConsulApp("/test/app", 3) marathon := marathon.MarathonerStubForApps(app) - handler := NewEventHandler(nil, marathon) + queue := make(chan event) + stopChan := newEventHandler(0, nil, marathon, queue).Start() + handler := newWebHandler(queue) body := `{ - "appId":"/unknown", + "appId":"unknown", "taskId":"unknown.1", "version":"2015-12-07T09:02:48.981Z", "alive":true, @@ -803,29 +890,33 @@ func TestForwardHandler_HandleHealthStatusEventReturn500WhenMarathonReturnedErro // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) + stopChan <- stopEvent{} // then - assert.Equal(t, 500, recorder.Code) - assert.Equal(t, "app not found\n", recorder.Body.String()) + assertAccepted(t, recorder) + assert.True(t, marathon.Interactions()) } -func TestForwardHandler_HandleHealthStatusEventWhenTaskIsNotInMarathon(t *testing.T) { +func TestWebHandler_HandleHealthStatusEventWhenTaskIsNotInMarathon(t *testing.T) { t.Parallel() // given app := ConsulApp("/test/app", 3) marathon := marathon.MarathonerStubForApps(app) - handler := NewEventHandler(nil, marathon) + queue := make(chan event) + stopChan := newEventHandler(0, nil, marathon, queue).Start() + handler := newWebHandler(queue) body := healthStatusChangeEventForTask("unknown.1") req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body))) // when recorder := httptest.NewRecorder() handler.Handle(recorder, req) + stopChan <- stopEvent{} // then - assert.Equal(t, 500, recorder.Code) - assert.Equal(t, "Task unknown.1 not found\n", recorder.Body.String()) + assertAccepted(t, recorder) + assert.True(t, marathon.Interactions()) } func newConsulStubWithApplicationsTasksRegistered(applications ...*apps.App) *consul.ConsulStub { @@ -854,3 +945,8 @@ func healthStatusChangeEventForTask(taskId string) string { "timestamp":"2015-12-07T09:33:50.069Z" }` } + +func assertAccepted(t *testing.T, recorder *httptest.ResponseRecorder) { + assert.Equal(t, 202, recorder.Code) + assert.Equal(t, "OK\n", recorder.Body.String()) +} diff --git a/web/web.go b/web/web.go new file mode 100644 index 0000000..396c59c --- /dev/null +++ b/web/web.go @@ -0,0 +1,30 @@ +package web + +import ( + "net/http" + + service "github.com/allegro/marathon-consul/consul" + "github.com/allegro/marathon-consul/marathon" +) + +type Stop func() +type Handler func(w http.ResponseWriter, r *http.Request) + +func NewHandler(config Config, marathon marathon.Marathoner, service service.ConsulServices) (Handler, Stop) { + + stopChannels := make([]chan<- stopEvent, config.WorkersCount, config.WorkersCount) + eventQueue := make(chan event, config.QueueSize) + for i := 0; i < config.WorkersCount; i++ { + handler := newEventHandler(i, service, marathon, eventQueue) + stopChannels[i] = handler.Start() + } + return newWebHandler(eventQueue).Handle, stop(stopChannels) +} + +func stop(channels []chan<- stopEvent) Stop { + return func() { + for _, channel := range channels { + channel <- stopEvent{} + } + } +} diff --git a/web/web_handler.go b/web/web_handler.go new file mode 100644 index 0000000..0788504 --- /dev/null +++ b/web/web_handler.go @@ -0,0 +1,58 @@ +package web + +import ( + "fmt" + "io/ioutil" + "net/http" + + "time" + + log "github.com/Sirupsen/logrus" + "github.com/allegro/marathon-consul/events" + "github.com/allegro/marathon-consul/metrics" +) + +type WebHandler struct { + eventQueue chan event +} + +func newWebHandler(eventQueue chan event) *WebHandler { + return &WebHandler{eventQueue: eventQueue} +} + +func (h *WebHandler) Handle(w http.ResponseWriter, r *http.Request) { + metrics.Time("events.response", func() { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + log.WithError(err).Debug("Malformed request") + handleBadRequest(err, w) + return + } + log.WithField("Body", string(body)).Debug("Received request") + + eventType, err := events.EventType(body) + if err != nil { + handleBadRequest(err, w) + return + } + + log.WithField("EventType", eventType).Debug("Received event") + metrics.Mark("events.requests." + eventType) + + h.eventQueue <- event{eventType: eventType, body: body, timestamp: time.Now()} + accepted(w) + }) +} + +func handleBadRequest(err error, w http.ResponseWriter) { + metrics.Mark("events.response.error.400") + w.WriteHeader(http.StatusBadRequest) + log.WithError(err).Debug("Returning 400 due to malformed request") + fmt.Fprintln(w, err.Error()) +} + +func accepted(w http.ResponseWriter) { + metrics.Mark("events.response.202") + w.WriteHeader(http.StatusAccepted) + fmt.Fprintln(w, "OK") +}