Skip to content

Commit

Permalink
Fix marathon events delay, now fetched from event data
Browse files Browse the repository at this point in the history
  • Loading branch information
tomez committed Mar 10, 2017
1 parent 7a2fe39 commit f243a73
Show file tree
Hide file tree
Showing 10 changed files with 38 additions and 32 deletions.
3 changes: 3 additions & 0 deletions apps/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package apps
import (
"encoding/json"
"strings"

"github.com/allegro/marathon-consul/time"
)

type Task struct {
ID TaskID `json:"id"`
Timestamp time.Timestamp `json:"timestamp"`
TaskStatus string `json:"taskStatus"`
AppID AppID `json:"appId"`
Host string `json:"host"`
Expand Down
5 changes: 4 additions & 1 deletion events/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (fh *eventHandler) handleHealthyTask(body []byte) error {
log.WithError(err).Error("Body generated error")
return err
}
delay := taskHealthChange.Timestamp.Delay()
metrics.UpdateGauge("events.read.delay.current", delay)

appID := taskHealthChange.AppID
taskID := taskHealthChange.TaskID()
Expand Down Expand Up @@ -146,11 +148,12 @@ func (fh *eventHandler) handleHealthyTask(body []byte) error {

func (fh *eventHandler) handleStatusEvent(body []byte) error {
task, err := apps.ParseTask(body)

if err != nil {
log.WithError(err).WithField("Body", body).Error("Could not parse event body")
return err
}
delay := task.Timestamp.Delay()
metrics.UpdateGauge("events.read.delay.current", delay)

log.WithFields(log.Fields{
"Id": task.ID,
Expand Down
1 change: 0 additions & 1 deletion events/sse_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
// Event holds state of parsed fields from marathon EventStream
type SSEEvent struct {
Type string
Timestamp Timestamp
Body []byte
ID string
Delay string
Expand Down
2 changes: 1 addition & 1 deletion events/sse_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func TestEvent_IfEventIsEmptyReturnsFalse(t *testing.T) {
t.Parallel()
// given
event := &SSEEvent{Timestamp: Timestamp{},
event := &SSEEvent{
Type: "status_update_event",
Body: []byte(`{"id": "simpleId"}`),
ID: "id",
Expand Down
3 changes: 2 additions & 1 deletion events/task_health_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"regexp"

"github.com/allegro/marathon-consul/apps"
"github.com/allegro/marathon-consul/time"
)

type TaskHealthChange struct {
Timestamp string `json:"timestamp"`
Timestamp time.Timestamp `json:"timestamp"`
// Prefer TaskID() instead of ID
ID apps.TaskID `json:"id"`
InstanceID string `json:"instanceId"`
Expand Down
3 changes: 2 additions & 1 deletion events/task_health_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"encoding/json"
"testing"

"github.com/allegro/marathon-consul/time"
"github.com/stretchr/testify/assert"
)

var testHealthChange = &TaskHealthChange{
Timestamp: "2014-03-01T23:29:30.158Z",
Timestamp: time.StringToTimestamp("2014-03-01T23:29:30.158Z"),
ID: "my-app_0-1396592784349",
Alive: true,
AppID: "/my-app",
Expand Down
28 changes: 5 additions & 23 deletions events/web_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,13 @@ package events
import (
"encoding/json"
"errors"
"strings"
"time"
)

type Timestamp struct {
time.Time
}

func (t *Timestamp) UnmarshalJSON(b []byte) (err error) {
s := strings.Trim(string(b), "\"")
if s == "null" {
t.Time = time.Time{}
return
}
t.Time, err = time.Parse(time.RFC3339Nano, s)
return
}

func (t *Timestamp) String() string {
return t.Format(time.RFC3339Nano)
}
"github.com/allegro/marathon-consul/time"
)

type WebEvent struct {
Type string `json:"eventType"`
Timestamp Timestamp `json:"timestamp"`
Type string `json:"eventType"`
Timestamp time.Timestamp `json:"timestamp"`
}

func ParseEvent(jsonBlob []byte) (WebEvent, error) {
Expand All @@ -37,7 +19,7 @@ func ParseEvent(jsonBlob []byte) (WebEvent, error) {
return WebEvent{}, err
} else if webEvent.Type == "" {
return WebEvent{}, errors.New("Missing event type")
} else if webEvent.Timestamp.Unix() == (time.Time{}).Unix() {
} else if webEvent.Timestamp.Missing() {
return WebEvent{}, errors.New("Missing timestamp")
}
return webEvent, nil
Expand Down
2 changes: 0 additions & 2 deletions sse/sse_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func (h *SSEHandler) handle() {
}
}
metrics.Mark("events.read." + e.Type)
delay := time.Now().Unix() - e.Timestamp.Unix()
metrics.UpdateGauge("events.read.delay.current", delay)
if e.Type != events.StatusUpdateEventType && e.Type != events.HealthStatusChangedEventType {
log.Debugf("%s is not supported", e.Type)
metrics.Mark("events.read.drop")
Expand Down
21 changes: 21 additions & 0 deletions time/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ type Timestamp struct {
time.Time
}

func (t *Timestamp) nowFunc() time.Time {
ts := StringToTimestamp("2017-03-10T09:29:30.158Z")
return ts.Time
}

func (t *Timestamp) UnmarshalJSON(b []byte) (err error) {
s := strings.Trim(string(b), "\"")
if s == "null" {
Expand All @@ -25,10 +30,26 @@ func (t Timestamp) MarshalJSON() ([]byte, error) {
return json.Marshal(t.String())
}

func (t *Timestamp) Delay() int64 {
return t.nowFunc().Unix() - t.Unix()
}

func (t *Timestamp) String() string {
return t.Format(time.RFC3339Nano)
}

func (t *Timestamp) Missing() bool {
return t.Unix() == (time.Time{}).Unix()
}

func StringToTimestamp(date string) Timestamp {
t, err := time.Parse("2006-01-02T15:04:05.000Z", date)
if err != nil {
return Timestamp{Time: time.Time{}}
}
return Timestamp{Time: t}
}

type Interval struct {
time.Duration
}
Expand Down
2 changes: 0 additions & 2 deletions web/web_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func (h *EventHandler) Handle(w http.ResponseWriter, r *http.Request) {
}

metrics.Mark("events.requests." + e.Type)
delay := time.Now().Unix() - e.Timestamp.Unix()
metrics.UpdateGauge("events.requests.delay.current", delay)
log.WithFields(log.Fields{"EventType": e.Type, "OriginalTimestamp": e.Timestamp.String()}).Debug("Received event")

if e.Type != events.StatusUpdateEventType && e.Type != events.HealthStatusChangedEventType {
Expand Down

0 comments on commit f243a73

Please sign in to comment.