Skip to content

Commit

Permalink
Remove duplicated code from sse and web modules
Browse files Browse the repository at this point in the history
  • Loading branch information
tomez committed Mar 3, 2017
1 parent 3a0096c commit 3bfc3c2
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 699 deletions.
40 changes: 22 additions & 18 deletions web/event_handler.go → events/event_handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package web
package events

import (
"bytes"
Expand All @@ -7,28 +7,32 @@ import (

log "github.com/Sirupsen/logrus"
"github.com/allegro/marathon-consul/apps"
"github.com/allegro/marathon-consul/events"
"github.com/allegro/marathon-consul/marathon"
"github.com/allegro/marathon-consul/metrics"
"github.com/allegro/marathon-consul/service"
)

type event struct {
timestamp time.Time
eventType string
body []byte
type Event struct {
Timestamp time.Time
EventType string
Body []byte
}

type eventHandler struct {
id int
serviceRegistry service.ServiceRegistry
marathon marathon.Marathoner
eventQueue <-chan event
eventQueue <-chan Event
}

type stopEvent struct{}
type StopEvent struct{}

func newEventHandler(id int, serviceRegistry service.ServiceRegistry, marathon marathon.Marathoner, eventQueue <-chan event) *eventHandler {
const (
StatusUpdateEventType = "status_update_event"
HealthStatusChangedEventType = "health_status_changed_event"
)

func NewEventHandler(id int, serviceRegistry service.ServiceRegistry, marathon marathon.Marathoner, eventQueue <-chan Event) *eventHandler {
return &eventHandler{
id: id,
serviceRegistry: serviceRegistry,
Expand All @@ -37,18 +41,18 @@ func newEventHandler(id int, serviceRegistry service.ServiceRegistry, marathon m
}
}

func (fh *eventHandler) start() chan<- stopEvent {
var e event
func (fh *eventHandler) Start() chan<- StopEvent {
var e Event
process := func() {
err := fh.handleEvent(e.eventType, e.body)
err := fh.handleEvent(e.EventType, e.Body)
if err != nil {
metrics.Mark("events.processing.error")
} else {
metrics.Mark("events.processing.succes")
}
}

quitChan := make(chan stopEvent)
quitChan := make(chan StopEvent)
log.WithField("Id", fh.id).Println("Starting worker")
go func() {
for {
Expand All @@ -66,8 +70,8 @@ func (fh *eventHandler) start() chan<- stopEvent {
}
metrics.UpdateGauge("events.queue.util", utilization)

metrics.UpdateGauge("events.queue.delay_ns", time.Since(e.timestamp).Nanoseconds())
metrics.Time("events.processing."+e.eventType, process)
metrics.UpdateGauge("events.queue.delay_ns", time.Since(e.Timestamp).Nanoseconds())
metrics.Time("events.processing."+e.EventType, process)
case <-quitChan:
log.WithField("Id", fh.id).Info("Stopping worker")
}
Expand All @@ -81,9 +85,9 @@ func (fh *eventHandler) handleEvent(eventType string, body []byte) error {
body = replaceTaskIDWithID(body)

switch eventType {
case statusUpdateEventType:
case StatusUpdateEventType:
return fh.handleStatusEvent(body)
case healthStatusChangedEventType:
case HealthStatusChangedEventType:
return fh.handleHealthyTask(body)
default:
err := fmt.Errorf("Unsuported event type: %s", eventType)
Expand All @@ -93,7 +97,7 @@ func (fh *eventHandler) handleEvent(eventType string, body []byte) error {
}

func (fh *eventHandler) handleHealthyTask(body []byte) error {
taskHealthChange, err := events.ParseTaskHealthChange(body)
taskHealthChange, err := ParseTaskHealthChange(body)
if err != nil {
log.WithError(err).Error("Body generated error")
return err
Expand Down
38 changes: 19 additions & 19 deletions web/event_handler_test.go → events/event_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package web
package events

import (
"errors"
Expand All @@ -22,11 +22,11 @@ type handlerStubs struct {
// Creates eventHandler and returns nonbuffered event queue that has to be used to send events to handler and
// function that can be used as a synchronization point to wait until previous event has been processed.
// Under the hood synchronization function simply sends a stop signal to the handlers stopChan.
func testEventHandler(stubs handlerStubs) (chan<- event, func()) {
queue := make(chan event)
awaitChan := newEventHandler(0, stubs.serviceRegistry, stubs.marathon, queue).start()
func testEventHandler(stubs handlerStubs) (chan<- Event, func()) {
queue := make(chan Event)
awaitChan := NewEventHandler(0, stubs.serviceRegistry, stubs.marathon, queue).Start()

return queue, func() { awaitChan <- stopEvent{} }
return queue, func() { awaitChan <- StopEvent{} }
}

func TestEventHandler_NotHandleStatusEventWithInvalidBody(t *testing.T) {
Expand All @@ -50,7 +50,7 @@ func TestEventHandler_NotHandleStatusEventWithInvalidBody(t *testing.T) {
}`)

// when
queue <- event{eventType: "status_update_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "status_update_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestEventHandler_NotHandleStatusEventAboutStartingTask(t *testing.T) {
}`)

// when
queue <- event{eventType: "status_update_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "status_update_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestEventHandler_HandleStatusEventAboutDeadTask(t *testing.T) {
}`)

// when
queue <- event{eventType: "status_update_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "status_update_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestEventHandler_HandleStatusEventAboutDeadTaskErrOnDeregistration(t *testi
}`)

// when
queue <- event{eventType: "status_update_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "status_update_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestEventHandler_NotHandleStatusEventAboutNonConsulAppsDeadTask(t *testing.
}`)

// when
queue <- event{eventType: "status_update_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "status_update_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand All @@ -217,7 +217,7 @@ func TestEventHandler_NotHandleHealthStatusEventWhenAppHasNotConsulLabel(t *test
body := healthStatusChangeEventForTask("test_app.1")

// when
queue <- event{eventType: "health_status_changed_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "health_status_changed_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand All @@ -237,7 +237,7 @@ func TestEventHandler_HandleHealthStatusEvent(t *testing.T) {
body := healthStatusChangeEventForTask("test_app.1")

// when
queue <- event{eventType: "health_status_changed_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "health_status_changed_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand All @@ -261,7 +261,7 @@ func TestEventHandler_HandleHealthStatusEventWithErrorsOnRegistration(t *testing
body := healthStatusChangeEventForTask("test_app.1")

// when
queue <- event{eventType: "health_status_changed_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "health_status_changed_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand All @@ -281,7 +281,7 @@ func TestEventHandler_NotHandleHealthStatusEventForTaskWithNotAllHealthChecksPas
body := healthStatusChangeEventForTask("test_app.1")

// when
queue <- event{eventType: "health_status_changed_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "health_status_changed_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand All @@ -301,7 +301,7 @@ func TestEventHandler_NotHandleHealthStatusEventForTaskWithNoHealthCheck(t *test
body := healthStatusChangeEventForTask("/test/app.0")

// when
queue <- event{eventType: "health_status_changed_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "health_status_changed_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand All @@ -327,7 +327,7 @@ func TestEventHandler_NotHandleHealthStatusEventWhenTaskIsNotAlive(t *testing.T)
}`)

// when
queue <- event{eventType: "health_status_changed_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "health_status_changed_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand All @@ -353,7 +353,7 @@ func TestEventHandler_NotHandleHealthStatusEventWhenBodyIsInvalid(t *testing.T)
}`)

// when
queue <- event{eventType: "health_status_changed_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "health_status_changed_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand All @@ -379,7 +379,7 @@ func TestEventHandler_HandleHealthStatusEventReturn202WhenMarathonReturnedError(
}`)

// when
queue <- event{eventType: "health_status_changed_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "health_status_changed_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand All @@ -398,7 +398,7 @@ func TestEventHandler_HandleHealthStatusEventWhenTaskIsNotInMarathon(t *testing.
body := healthStatusChangeEventForTask("unknown.1")

// when
queue <- event{eventType: "health_status_changed_event", timestamp: time.Now(), body: body}
queue <- Event{EventType: "health_status_changed_event", Timestamp: time.Now(), Body: body}
awaitFunc()

// then
Expand Down
20 changes: 10 additions & 10 deletions events/events.go → events/web_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@ func (t *Timestamp) String() string {
return t.Format(time.RFC3339Nano)
}

type Event struct {
type WebEvent struct {
Type string `json:"eventType"`
Timestamp Timestamp `json:"timestamp"`
}

func ParseEvent(jsonBlob []byte) (Event, error) {
event := Event{}
err := json.Unmarshal(jsonBlob, &event)
func ParseEvent(jsonBlob []byte) (WebEvent, error) {
webEvent := WebEvent{}
err := json.Unmarshal(jsonBlob, &webEvent)
if err != nil {
return Event{}, err
} else if event.Type == "" {
return Event{}, errors.New("Missing event type")
} else if event.Timestamp.Unix() == (time.Time{}).Unix() {
return Event{}, errors.New("Missing timestamp")
return WebEvent{}, err
} else if webEvent.Type == "" {
return WebEvent{}, errors.New("Missing event type")
} else if webEvent.Timestamp.Unix() == (time.Time{}).Unix() {
return WebEvent{}, errors.New("Missing timestamp")
}
return event, nil
return webEvent, nil
}
14 changes: 7 additions & 7 deletions events/events_test.go → events/web_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,39 @@ import (
"github.com/stretchr/testify/assert"
)

func TestEventTypeValid(t *testing.T) {
func TestWebEventTypeValid(t *testing.T) {
t.Parallel()
out, err := ParseEvent([]byte(`{"eventType":"test","timestamp": "2014-03-01T23:29:30.158Z"}`))
assert.Nil(t, err)
assert.Equal(t, "2014-03-01T23:29:30.158Z", out.Timestamp.String())
}

func TestEventTypeInvalid(t *testing.T) {
func TestWebEventTypeInvalid(t *testing.T) {
t.Parallel()

out, err := ParseEvent([]byte(`{}`))
assert.Error(t, err)
assert.Equal(t, out, Event{})
assert.Equal(t, out, WebEvent{})
}

func TestTimestampInvalid(t *testing.T) {
t.Parallel()
out, err := ParseEvent([]byte(`{"eventType":"test","timestamp": "invalid"}`))
assert.Error(t, err)
assert.Equal(t, out, Event{})
assert.Equal(t, out, WebEvent{})
}

func TestMissingTimestamp(t *testing.T) {
t.Parallel()
out, err := ParseEvent([]byte(`{"eventType":"test"}`))
assert.EqualError(t, err, "Missing timestamp")
assert.Equal(t, out, Event{})
assert.Equal(t, out, WebEvent{})
}

func TestEventTypeInvalidJson(t *testing.T) {
func TestWebEventTypeInvalidJson(t *testing.T) {
t.Parallel()

out, err := ParseEvent([]byte(`not a json`))
assert.Error(t, err)
assert.Equal(t, out, Event{})
assert.Equal(t, out, WebEvent{})
}

0 comments on commit 3bfc3c2

Please sign in to comment.