Skip to content

Commit

Permalink
make receiving events nonblocking even when queue is full
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdubiel committed Dec 22, 2016
1 parent df6a8a7 commit b49bdc9
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 42 deletions.
26 changes: 16 additions & 10 deletions web/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@ type event struct {
}

type eventHandler struct {
id int
serviceRegistry service.ServiceRegistry
marathon marathon.Marathoner
eventQueue <-chan event
id int
serviceRegistry service.ServiceRegistry
marathon marathon.Marathoner
eventQueue <-chan event
eventQueueCapacity int
}

type stopEvent struct{}

func newEventHandler(id int, serviceRegistry service.ServiceRegistry, marathon marathon.Marathoner, eventQueue <-chan event) *eventHandler {
func newEventHandler(id int, serviceRegistry service.ServiceRegistry, marathon marathon.Marathoner, eventQueue <-chan event, eventQueueCapacity int) *eventHandler {
return &eventHandler{
id: id,
serviceRegistry: serviceRegistry,
marathon: marathon,
eventQueue: eventQueue,
id: id,
serviceRegistry: serviceRegistry,
marathon: marathon,
eventQueue: eventQueue,
eventQueueCapacity: eventQueueCapacity,
}
}

Expand All @@ -55,7 +57,11 @@ func (fh *eventHandler) start() chan<- stopEvent {
select {
case e = <-fh.eventQueue:
metrics.Mark(fmt.Sprintf("events.handler.%d", fh.id))
metrics.UpdateGauge("events.queue.len", int64(len(fh.eventQueue)))

ql := int64(len(fh.eventQueue))
metrics.UpdateGauge("events.queue.len", ql)
metrics.UpdateGauge("events.queue.util", 100*(ql/int64(fh.eventQueueCapacity)))

metrics.UpdateGauge("events.queue.delay_ns", time.Since(e.timestamp).Nanoseconds())
metrics.Time("events.processing."+e.eventType, process)
case <-quitChan:
Expand Down
60 changes: 30 additions & 30 deletions web/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func TestWebHandler_DropUnknownEventType(t *testing.T) {
t.Parallel()

// given
queue := make(chan event)
queue := make(chan event, 10)
handler := newWebHandler(queue, maxEventSize)
stopChan := newEventHandler(0, nil, nil, queue).start()
stopChan := newEventHandler(0, nil, nil, queue, 10).start()
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{"eventType":"test_event"}`)))

// when
Expand Down Expand Up @@ -116,9 +116,9 @@ func TestWebHandler_DropAppInvalidBody(t *testing.T) {
t.Parallel()

// given
queue := make(chan event)
queue := make(chan event, 10)
handler := newWebHandler(queue, maxEventSize)
stopChan := newEventHandler(0, nil, nil, queue).start()
stopChan := newEventHandler(0, nil, nil, queue, 10).start()
body := `{"type": "app_terminated_event", "appID": 123}`
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))

Expand All @@ -135,9 +135,9 @@ func TestWebHandler_NotHandleStatusEventWithInvalidBody(t *testing.T) {
t.Parallel()

// given
queue := make(chan event)
queue := make(chan event, 10)
handler := newWebHandler(queue, maxEventSize)
stopChan := newEventHandler(0, nil, nil, queue).start()
stopChan := newEventHandler(0, nil, nil, queue, 10).start()
body := `{
"slaveId":"85e59460-a99e-4f16-b91f-145e0ea595bd-S0",
"taskId":"python_simple.4a7e99d0-9cc1-11e5-b4d8-0a0027000004",
Expand Down Expand Up @@ -166,8 +166,8 @@ func TestWebHandler_NotHandleStatusEventAboutStartingTask(t *testing.T) {

// given
services := consul.NewConsulStub()
queue := make(chan event)
stopChan := newEventHandler(0, services, nil, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, services, nil, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
ignoredTaskStatuses := []string{"TASK_STAGING", "TASK_STARTING", "TASK_RUNNING", "unknown"}
for _, taskStatus := range ignoredTaskStatuses {
Expand Down Expand Up @@ -209,8 +209,8 @@ func TestWebHandler_HandleStatusEventAboutDeadTask(t *testing.T) {
for _, task := range app.Tasks {
service.Register(&task, app)
}
queue := make(chan event)
stopChan := newEventHandler(0, service, nil, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, service, nil, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
body := `{
"slaveId":"85e59460-a99e-4f16-b91f-145e0ea595bd-S0",
Expand Down Expand Up @@ -250,8 +250,8 @@ func TestWebHandler_HandleStatusEventAboutDeadTaskErrOnDeregistration(t *testing
// given
service := consul.NewConsulStub()
service.FailDeregisterByTaskForID("task.1")
queue := make(chan event)
stopChan := newEventHandler(0, service, nil, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, service, nil, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
body := `{
"slaveId":"85e59460-a99e-4f16-b91f-145e0ea595bd-S0",
Expand Down Expand Up @@ -285,8 +285,8 @@ func TestWebHandler_NotHandleStatusEventAboutNonConsulAppsDeadTask(t *testing.T)
// given
app := NonConsulApp("/test/app", 3)
service := consul.NewConsulStub()
queue := make(chan event)
stopChan := newEventHandler(0, service, nil, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, service, nil, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
taskStatuses := []string{"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST"}
for _, taskStatus := range taskStatuses {
Expand Down Expand Up @@ -323,8 +323,8 @@ func TestWebHandler_NotHandleHealthStatusEventWhenAppHasNotConsulLabel(t *testin
// given
app := NonConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
queue := make(chan event)
stopChan := newEventHandler(0, nil, marathon, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, marathon, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
body := healthStatusChangeEventForTask("test_app.1")
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))
Expand All @@ -346,7 +346,7 @@ func TestWebHandler_HandleHealthStatusEvent(t *testing.T) {
app := ConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
service := consul.NewConsulStub()
handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service)
handle, stop := NewHandler(Config{WorkersCount: 1, QueueSize: 10}, marathon, service)
body := healthStatusChangeEventForTask("test_app.1")
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))

Expand All @@ -371,7 +371,7 @@ func TestWebHandler_HandleHealthStatusEventWithErrorsOnRegistration(t *testing.T
marathon := marathon.MarathonerStubForApps(app)
service := consul.NewConsulStub()
service.FailRegisterForID(app.Tasks[1].ID)
handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service)
handle, stop := NewHandler(Config{WorkersCount: 1, QueueSize: 10}, marathon, service)
body := healthStatusChangeEventForTask("test_app.1")
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))

Expand All @@ -393,8 +393,8 @@ func TestWebHandler_NotHandleHealthStatusEventForTaskWithNotAllHealthChecksPasse
app := ConsulApp("/test/app", 3)
app.Tasks[1].HealthCheckResults = []apps.HealthCheckResult{{Alive: true}, {Alive: false}}
marathon := marathon.MarathonerStubForApps(app)
queue := make(chan event)
stopChan := newEventHandler(0, nil, marathon, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, marathon, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
body := healthStatusChangeEventForTask("test_app.1")
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))
Expand All @@ -416,8 +416,8 @@ func TestWebHandler_NotHandleHealthStatusEventForTaskWithNoHealthCheck(t *testin
app := ConsulApp("/test/app", 1)
app.Tasks[0].HealthCheckResults = []apps.HealthCheckResult{}
marathon := marathon.MarathonerStubForApps(app)
queue := make(chan event)
stopChan := newEventHandler(0, nil, marathon, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, marathon, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
body := healthStatusChangeEventForTask("/test/app.0")
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))
Expand All @@ -436,8 +436,8 @@ func TestWebHandler_NotHandleHealthStatusEventWhenTaskIsNotAlive(t *testing.T) {
t.Parallel()

// given
queue := make(chan event)
stopChan := newEventHandler(0, nil, nil, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, nil, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
body := `{
"appId":"/test/app",
Expand All @@ -462,8 +462,8 @@ func TestWebHandler_NotHandleHealthStatusEventWhenBodyIsInvalid(t *testing.T) {
t.Parallel()

// given
queue := make(chan event)
stopChan := newEventHandler(0, nil, nil, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, nil, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
body := `{
"appId":"/test/app",
Expand All @@ -490,8 +490,8 @@ func TestWebHandler_HandleHealthStatusEventReturn202WhenMarathonReturnedError(t
// given
app := ConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
queue := make(chan event)
stopChan := newEventHandler(0, nil, marathon, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, marathon, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
body := `{
"appId":"unknown",
Expand Down Expand Up @@ -519,8 +519,8 @@ func TestWebHandler_HandleHealthStatusEventWhenTaskIsNotInMarathon(t *testing.T)
// given
app := ConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
queue := make(chan event)
stopChan := newEventHandler(0, nil, marathon, queue).start()
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, marathon, queue, 10).start()
handler := newWebHandler(queue, maxEventSize)
body := healthStatusChangeEventForTask("unknown.1")
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))
Expand Down
2 changes: 1 addition & 1 deletion web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func NewHandler(config Config, marathon marathon.Marathoner, serviceOperations s
stopChannels := make([]chan<- stopEvent, config.WorkersCount, config.WorkersCount)
eventQueue := make(chan event, config.QueueSize)
for i := 0; i < config.WorkersCount; i++ {
handler := newEventHandler(i, serviceOperations, marathon, eventQueue)
handler := newEventHandler(i, serviceOperations, marathon, eventQueue, config.QueueSize)
stopChannels[i] = handler.start()
}
return newWebHandler(eventQueue, config.MaxEventSize).Handle, stop(stopChannels)
Expand Down
6 changes: 5 additions & 1 deletion web/web_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ func (h *EventHandler) Handle(w http.ResponseWriter, r *http.Request) {
return
}

h.eventQueue <- event{eventType: e.Type, body: body, timestamp: time.Now()}
select {
case h.eventQueue <- event{eventType: e.Type, body: body, timestamp: time.Now()}:
default:
metrics.Mark("events.queue.drop")
}
accept(w)

})
Expand Down

0 comments on commit b49bdc9

Please sign in to comment.