Skip to content

Commit

Permalink
make receiving events nonblocking even when queue is full
Browse files Browse the repository at this point in the history
  • Loading branch information
adamdubiel committed Dec 25, 2016
1 parent df6a8a7 commit 7dffeb4
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 33 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,7 @@ bump:
git add .goxc.json
git commit -m "Bumped version"

format:
go fmt $(PACKAGES)

.PHONY: all build test xcompile package dist
2 changes: 1 addition & 1 deletion install_consul.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ hash consul 2>/dev/null || {
os="windows"
arch="386"
fi
version="0.7.0"
version="0.7.2"
archive="consul_${version}_${os}_${arch}.zip"

mkdir -p bin
Expand Down
30 changes: 20 additions & 10 deletions marathon/marathon_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,56 @@ import (
"errors"

"github.com/allegro/marathon-consul/apps"
"sync"
)

type MarathonerStub struct {
AppsStub []*apps.App
AppStub map[apps.AppID]*apps.App
TasksStub map[apps.AppID][]*apps.Task
leader string
interactions bool
AppsStub []*apps.App
AppStub map[apps.AppID]*apps.App
TasksStub map[apps.AppID][]*apps.Task
leader string
interactionsMu sync.RWMutex
interactions bool
}

func (m *MarathonerStub) ConsulApps() ([]*apps.App, error) {
m.interactions = true
m.noteInteraction()
return m.AppsStub, nil
}

func (m *MarathonerStub) App(id apps.AppID) (*apps.App, error) {
m.interactions = true
m.noteInteraction()
if app, ok := m.AppStub[id]; ok {
return app, nil
}
return nil, errors.New("app not found")
}

func (m *MarathonerStub) Tasks(appID apps.AppID) ([]*apps.Task, error) {
m.interactions = true
m.noteInteraction()
if app, ok := m.TasksStub[appID]; ok {
return app, nil
}
return nil, errors.New("app not found")
}

func (m *MarathonerStub) Leader() (string, error) {
m.interactions = true
m.noteInteraction()
return m.leader, nil
}

func (m MarathonerStub) Interactions() bool {
func (m *MarathonerStub) Interactions() bool {
m.interactionsMu.RLock()
defer m.interactionsMu.RUnlock()
return m.interactions
}

func (m *MarathonerStub) noteInteraction() {
m.interactionsMu.Lock()
defer m.interactionsMu.Unlock()
m.interactions = true
}

func MarathonerStubWithLeaderForApps(leader string, args ...*apps.App) *MarathonerStub {
stub := MarathonerStubForApps(args...)
stub.leader = leader
Expand Down
15 changes: 10 additions & 5 deletions web/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ type event struct {
}

type eventHandler struct {
id int
serviceRegistry service.ServiceRegistry
marathon marathon.Marathoner
eventQueue <-chan event
id int
serviceRegistry service.ServiceRegistry
marathon marathon.Marathoner
eventQueue <-chan event
eventQueueCapacity int
}

type stopEvent struct{}
Expand Down Expand Up @@ -55,7 +56,11 @@ func (fh *eventHandler) start() chan<- stopEvent {
select {
case e = <-fh.eventQueue:
metrics.Mark(fmt.Sprintf("events.handler.%d", fh.id))
metrics.UpdateGauge("events.queue.len", int64(len(fh.eventQueue)))

ql := int64(len(fh.eventQueue))
metrics.UpdateGauge("events.queue.len", ql)
metrics.UpdateGauge("events.queue.util", 100*(ql/int64(cap(fh.eventQueue))))

metrics.UpdateGauge("events.queue.delay_ns", time.Since(e.timestamp).Nanoseconds())
metrics.Time("events.processing."+e.eventType, process)
case <-quitChan:
Expand Down
32 changes: 16 additions & 16 deletions web/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestWebHandler_DropUnknownEventType(t *testing.T) {
t.Parallel()

// given
queue := make(chan event)
queue := make(chan event, 10)
handler := newWebHandler(queue, maxEventSize)
stopChan := newEventHandler(0, nil, nil, queue).start()
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(`{"eventType":"test_event"}`)))
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestWebHandler_DropAppInvalidBody(t *testing.T) {
t.Parallel()

// given
queue := make(chan event)
queue := make(chan event, 10)
handler := newWebHandler(queue, maxEventSize)
stopChan := newEventHandler(0, nil, nil, queue).start()
body := `{"type": "app_terminated_event", "appID": 123}`
Expand All @@ -135,7 +135,7 @@ func TestWebHandler_NotHandleStatusEventWithInvalidBody(t *testing.T) {
t.Parallel()

// given
queue := make(chan event)
queue := make(chan event, 10)
handler := newWebHandler(queue, maxEventSize)
stopChan := newEventHandler(0, nil, nil, queue).start()
body := `{
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestWebHandler_NotHandleStatusEventAboutStartingTask(t *testing.T) {

// given
services := consul.NewConsulStub()
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, services, nil, queue).start()
handler := newWebHandler(queue, maxEventSize)
ignoredTaskStatuses := []string{"TASK_STAGING", "TASK_STARTING", "TASK_RUNNING", "unknown"}
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestWebHandler_HandleStatusEventAboutDeadTask(t *testing.T) {
for _, task := range app.Tasks {
service.Register(&task, app)
}
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, service, nil, queue).start()
handler := newWebHandler(queue, maxEventSize)
body := `{
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestWebHandler_HandleStatusEventAboutDeadTaskErrOnDeregistration(t *testing
// given
service := consul.NewConsulStub()
service.FailDeregisterByTaskForID("task.1")
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, service, nil, queue).start()
handler := newWebHandler(queue, maxEventSize)
body := `{
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestWebHandler_NotHandleStatusEventAboutNonConsulAppsDeadTask(t *testing.T)
// given
app := NonConsulApp("/test/app", 3)
service := consul.NewConsulStub()
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, service, nil, queue).start()
handler := newWebHandler(queue, maxEventSize)
taskStatuses := []string{"TASK_FINISHED", "TASK_FAILED", "TASK_KILLED", "TASK_LOST"}
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestWebHandler_NotHandleHealthStatusEventWhenAppHasNotConsulLabel(t *testin
// given
app := NonConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, marathon, queue).start()
handler := newWebHandler(queue, maxEventSize)
body := healthStatusChangeEventForTask("test_app.1")
Expand All @@ -346,7 +346,7 @@ func TestWebHandler_HandleHealthStatusEvent(t *testing.T) {
app := ConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
service := consul.NewConsulStub()
handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service)
handle, stop := NewHandler(Config{WorkersCount: 1, QueueSize: 10}, marathon, service)
body := healthStatusChangeEventForTask("test_app.1")
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))

Expand All @@ -371,7 +371,7 @@ func TestWebHandler_HandleHealthStatusEventWithErrorsOnRegistration(t *testing.T
marathon := marathon.MarathonerStubForApps(app)
service := consul.NewConsulStub()
service.FailRegisterForID(app.Tasks[1].ID)
handle, stop := NewHandler(Config{WorkersCount: 1}, marathon, service)
handle, stop := NewHandler(Config{WorkersCount: 1, QueueSize: 10}, marathon, service)
body := healthStatusChangeEventForTask("test_app.1")
req, _ := http.NewRequest("POST", "/events", bytes.NewBuffer([]byte(body)))

Expand All @@ -393,7 +393,7 @@ func TestWebHandler_NotHandleHealthStatusEventForTaskWithNotAllHealthChecksPasse
app := ConsulApp("/test/app", 3)
app.Tasks[1].HealthCheckResults = []apps.HealthCheckResult{{Alive: true}, {Alive: false}}
marathon := marathon.MarathonerStubForApps(app)
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, marathon, queue).start()
handler := newWebHandler(queue, maxEventSize)
body := healthStatusChangeEventForTask("test_app.1")
Expand All @@ -416,7 +416,7 @@ func TestWebHandler_NotHandleHealthStatusEventForTaskWithNoHealthCheck(t *testin
app := ConsulApp("/test/app", 1)
app.Tasks[0].HealthCheckResults = []apps.HealthCheckResult{}
marathon := marathon.MarathonerStubForApps(app)
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, marathon, queue).start()
handler := newWebHandler(queue, maxEventSize)
body := healthStatusChangeEventForTask("/test/app.0")
Expand All @@ -436,7 +436,7 @@ func TestWebHandler_NotHandleHealthStatusEventWhenTaskIsNotAlive(t *testing.T) {
t.Parallel()

// given
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, nil, queue).start()
handler := newWebHandler(queue, maxEventSize)
body := `{
Expand All @@ -462,7 +462,7 @@ func TestWebHandler_NotHandleHealthStatusEventWhenBodyIsInvalid(t *testing.T) {
t.Parallel()

// given
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, nil, queue).start()
handler := newWebHandler(queue, maxEventSize)
body := `{
Expand Down Expand Up @@ -490,7 +490,7 @@ func TestWebHandler_HandleHealthStatusEventReturn202WhenMarathonReturnedError(t
// given
app := ConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, marathon, queue).start()
handler := newWebHandler(queue, maxEventSize)
body := `{
Expand Down Expand Up @@ -519,7 +519,7 @@ func TestWebHandler_HandleHealthStatusEventWhenTaskIsNotInMarathon(t *testing.T)
// given
app := ConsulApp("/test/app", 3)
marathon := marathon.MarathonerStubForApps(app)
queue := make(chan event)
queue := make(chan event, 10)
stopChan := newEventHandler(0, nil, marathon, queue).start()
handler := newWebHandler(queue, maxEventSize)
body := healthStatusChangeEventForTask("unknown.1")
Expand Down
6 changes: 5 additions & 1 deletion web/web_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ func (h *EventHandler) Handle(w http.ResponseWriter, r *http.Request) {
return
}

h.eventQueue <- event{eventType: e.Type, body: body, timestamp: time.Now()}
select {
case h.eventQueue <- event{eventType: e.Type, body: body, timestamp: time.Now()}:
default:
metrics.Mark("events.queue.drop")
}
accept(w)

})
Expand Down

0 comments on commit 7dffeb4

Please sign in to comment.