Skip to content

Commit

Permalink
Add counter metrics for logs dropped by Logstash appender (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
medzin committed Jan 11, 2018
1 parent 40ebde8 commit fdc2d6c
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions servicelog/appender/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/allegro/mesos-executor/xio"
"github.com/kelseyhightower/envconfig"
"github.com/rcrowley/go-metrics"
log "github.com/sirupsen/logrus"

"github.com/allegro/mesos-executor/servicelog"
Expand All @@ -27,17 +28,20 @@ type logstashEntry map[string]interface{}

type logstash struct {
writer io.Writer

droppedBecauseOfSize metrics.Counter
droppedBecauseOfRate metrics.Counter
}

func (l logstash) Append(entries <-chan servicelog.Entry) {
func (l *logstash) Append(entries <-chan servicelog.Entry) {
for entry := range entries {
if err := l.sendEntry(entry); err != nil {
log.WithError(err).Warn("Error appending logs.")
}
}
}

func (l logstash) formatEntry(entry servicelog.Entry) logstashEntry {
func (l *logstash) formatEntry(entry servicelog.Entry) logstashEntry {
formattedEntry := logstashEntry{}
formattedEntry["@timestamp"] = entry["time"]
formattedEntry["@version"] = logstashVersion
Expand All @@ -53,20 +57,28 @@ func (l logstash) formatEntry(entry servicelog.Entry) logstashEntry {
return formattedEntry
}

func (l logstash) sendEntry(entry servicelog.Entry) error {
func (l *logstash) sendEntry(entry servicelog.Entry) error {
formattedEntry := l.formatEntry(entry)
bytes, err := l.marshal(formattedEntry)
if err != nil {
return fmt.Errorf("unable to marshal log entry: %s", err)
}
log.WithField("entry", string(bytes)).Debug("Sending log entry to Logstash")
if _, err = l.writer.Write(bytes); err != nil {
if err == xio.ErrSizeLimitExceeded {
l.droppedBecauseOfSize.Inc(1)
return nil // returning this error will spam stdout with errors
}
if err == xio.ErrRateLimitExceeded {
l.droppedBecauseOfRate.Inc(1)
return nil // returning this error will spam stdout with errors
}
return fmt.Errorf("unable to write to Logstash server: %s", err)
}
return nil
}

func (l logstash) marshal(entry logstashEntry) ([]byte, error) {
func (l *logstash) marshal(entry logstashEntry) ([]byte, error) {
bytes, err := json.Marshal(entry)
if err != nil {
return nil, err
Expand All @@ -81,7 +93,9 @@ func (l logstash) marshal(entry logstashEntry) ([]byte, error) {
// passed writer.
func NewLogstash(writer io.Writer, options ...func(*logstash) error) (Appender, error) {
l := &logstash{
writer: writer,
writer: writer,
droppedBecauseOfRate: metrics.GetOrRegisterCounter("servicelog.logstash.dropped.RateExceeded", metrics.DefaultRegistry),
droppedBecauseOfSize: metrics.GetOrRegisterCounter("servicelog.logstash.dropped.SizeExceeded", metrics.DefaultRegistry),
}
for _, option := range options {
if err := option(l); err != nil {
Expand Down

0 comments on commit fdc2d6c

Please sign in to comment.