Skip to content

Commit

Permalink
Fix marathon events delay, now fetched from event data
Browse files Browse the repository at this point in the history
  • Loading branch information
tomez committed Mar 10, 2017
1 parent 7a2fe39 commit 1cac59b
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 33 deletions.
8 changes: 7 additions & 1 deletion apps/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@ package apps
import (
"encoding/json"
"strings"

"github.com/allegro/marathon-consul/time"
)

type Task struct {
ID TaskID `json:"id"`
ID TaskID `json:"id"`
// Timestamp field is not a part of a Marathon task object.
// It's only present in StatusUpdateEventType and we are using this struct for decoding it.
// As well as for Marathon Task.
Timestamp time.Timestamp `json:"timestamp"`
TaskStatus string `json:"taskStatus"`
AppID AppID `json:"appId"`
Host string `json:"host"`
Expand Down
3 changes: 3 additions & 0 deletions apps/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"testing"

"github.com/allegro/marathon-consul/time"
"github.com/stretchr/testify/assert"
)

Expand All @@ -13,6 +14,7 @@ func TestParseTask(t *testing.T) {

testTask := &Task{
ID: "my-app_0-1396592784349",
Timestamp: time.Timestamp{},
AppID: "/my-app",
Host: "slave-1234.acme.org",
Ports: []int{31372},
Expand Down Expand Up @@ -40,6 +42,7 @@ func TestParseTasks(t *testing.T) {
expectedTasks := []Task{
{
ID: "test.47de43bd-1a81-11e5-bdb6-e6cb6734eaf8",
Timestamp: time.Timestamp{},
AppID: "/test",
Host: "192.168.2.114",
Ports: []int{31315},
Expand Down
5 changes: 4 additions & 1 deletion events/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (fh *eventHandler) handleHealthyTask(body []byte) error {
log.WithError(err).Error("Body generated error")
return err
}
delay := taskHealthChange.Timestamp.Delay()
metrics.UpdateGauge("events.read.delay.current", int64(delay))

appID := taskHealthChange.AppID
taskID := taskHealthChange.TaskID()
Expand Down Expand Up @@ -146,11 +148,12 @@ func (fh *eventHandler) handleHealthyTask(body []byte) error {

func (fh *eventHandler) handleStatusEvent(body []byte) error {
task, err := apps.ParseTask(body)

if err != nil {
log.WithError(err).WithField("Body", body).Error("Could not parse event body")
return err
}
delay := task.Timestamp.Delay()
metrics.UpdateGauge("events.read.delay.current", int64(delay))

log.WithFields(log.Fields{
"Id": task.ID,
Expand Down
1 change: 0 additions & 1 deletion events/sse_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
// Event holds state of parsed fields from marathon EventStream
type SSEEvent struct {
Type string
Timestamp Timestamp
Body []byte
ID string
Delay string
Expand Down
2 changes: 1 addition & 1 deletion events/sse_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func TestEvent_IfEventIsEmptyReturnsFalse(t *testing.T) {
t.Parallel()
// given
event := &SSEEvent{Timestamp: Timestamp{},
event := &SSEEvent{
Type: "status_update_event",
Body: []byte(`{"id": "simpleId"}`),
ID: "id",
Expand Down
3 changes: 2 additions & 1 deletion events/task_health_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"regexp"

"github.com/allegro/marathon-consul/apps"
"github.com/allegro/marathon-consul/time"
)

type TaskHealthChange struct {
Timestamp string `json:"timestamp"`
Timestamp time.Timestamp `json:"timestamp"`
// Prefer TaskID() instead of ID
ID apps.TaskID `json:"id"`
InstanceID string `json:"instanceId"`
Expand Down
12 changes: 11 additions & 1 deletion events/task_health_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,22 @@ package events
import (
"encoding/json"
"testing"
gotime "time"

"github.com/allegro/marathon-consul/time"
"github.com/stretchr/testify/assert"
)

func StringToTimestamp(date string) time.Timestamp {
t, err := gotime.Parse("2006-01-02T15:04:05.000Z", date)
if err != nil {
return time.Timestamp{Time: gotime.Time{}}
}
return time.Timestamp{Time: t}
}

var testHealthChange = &TaskHealthChange{
Timestamp: "2014-03-01T23:29:30.158Z",
Timestamp: StringToTimestamp("2014-03-01T23:29:30.158Z"),
ID: "my-app_0-1396592784349",
Alive: true,
AppID: "/my-app",
Expand Down
28 changes: 5 additions & 23 deletions events/web_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,13 @@ package events
import (
"encoding/json"
"errors"
"strings"
"time"
)

type Timestamp struct {
time.Time
}

func (t *Timestamp) UnmarshalJSON(b []byte) (err error) {
s := strings.Trim(string(b), "\"")
if s == "null" {
t.Time = time.Time{}
return
}
t.Time, err = time.Parse(time.RFC3339Nano, s)
return
}

func (t *Timestamp) String() string {
return t.Format(time.RFC3339Nano)
}
"github.com/allegro/marathon-consul/time"
)

type WebEvent struct {
Type string `json:"eventType"`
Timestamp Timestamp `json:"timestamp"`
Type string `json:"eventType"`
Timestamp time.Timestamp `json:"timestamp"`
}

func ParseEvent(jsonBlob []byte) (WebEvent, error) {
Expand All @@ -37,7 +19,7 @@ func ParseEvent(jsonBlob []byte) (WebEvent, error) {
return WebEvent{}, err
} else if webEvent.Type == "" {
return WebEvent{}, errors.New("Missing event type")
} else if webEvent.Timestamp.Unix() == (time.Time{}).Unix() {
} else if webEvent.Timestamp.Missing() {
return WebEvent{}, errors.New("Missing timestamp")
}
return webEvent, nil
Expand Down
2 changes: 0 additions & 2 deletions sse/sse_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func (h *SSEHandler) handle() {
}
}
metrics.Mark("events.read." + e.Type)
delay := time.Now().Unix() - e.Timestamp.Unix()
metrics.UpdateGauge("events.read.delay.current", delay)
if e.Type != events.StatusUpdateEventType && e.Type != events.HealthStatusChangedEventType {
log.Debugf("%s is not supported", e.Type)
metrics.Mark("events.read.drop")
Expand Down
8 changes: 8 additions & 0 deletions time/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,18 @@ func (t Timestamp) MarshalJSON() ([]byte, error) {
return json.Marshal(t.String())
}

func (t *Timestamp) Delay() time.Duration {
return time.Now().Sub(t.Time)
}

func (t *Timestamp) String() string {
return t.Format(time.RFC3339Nano)
}

func (t *Timestamp) Missing() bool {
return t.Unix() == (time.Time{}).Unix()
}

type Interval struct {
time.Duration
}
Expand Down
2 changes: 0 additions & 2 deletions web/web_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func (h *EventHandler) Handle(w http.ResponseWriter, r *http.Request) {
}

metrics.Mark("events.requests." + e.Type)
delay := time.Now().Unix() - e.Timestamp.Unix()
metrics.UpdateGauge("events.requests.delay.current", delay)
log.WithFields(log.Fields{"EventType": e.Type, "OriginalTimestamp": e.Timestamp.String()}).Debug("Received event")

if e.Type != events.StatusUpdateEventType && e.Type != events.HealthStatusChangedEventType {
Expand Down

0 comments on commit 1cac59b

Please sign in to comment.