Skip to content

Commit

Permalink
Change Logstash appender to use io.Writer (#57)
Browse files Browse the repository at this point in the history
Change Logstash appender to use simpler io.Writer interface instead of
an extensive net.Conn interface, whose functions were not needed at all.
This allows the use of decorators from the xio package (added an example
of their usage).
  • Loading branch information
medzin committed Jan 10, 2018
1 parent ced25f6 commit 40ebde8
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 31 deletions.
6 changes: 5 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,11 @@ func (e *Executor) createOptionsForLogstashServiceLogScrapping(taskInfo mesos.Ta
scr := &scraper.LogFmt{
KeyFilter: filter,
}
apr, err := appender.NewLogstash(appender.LogstashAddressFromEnv())
writer, err := appender.LogstashWriterFromEnv()
if err != nil {
return nil, fmt.Errorf("cannot configure service log scraping: %s", err)
}
apr, err := appender.NewLogstash(writer)
if err != nil {
return nil, fmt.Errorf("cannot configure service log scraping: %s", err)
}
Expand Down
24 changes: 24 additions & 0 deletions servicelog/appender/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package appender_test

import (
"log"
"net"

"github.com/allegro/mesos-executor/servicelog/appender"
)

func ExampleNewLogstash() {
// create writer that will be used to send logs
writer, err := net.DialUDP("udp", nil, nil)
if err != nil {
log.Fatal(err)
}

// create appender with desired options
_, err = appender.NewLogstash(writer,
appender.LogstashRateLimit(100),
appender.LogstashSizeLimit(16000))
if err != nil {
log.Fatal(err)
}
}
53 changes: 31 additions & 22 deletions servicelog/appender/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package appender
import (
"encoding/json"
"fmt"
"io"
"net"

"github.com/allegro/mesos-executor/xio"
"github.com/kelseyhightower/envconfig"
log "github.com/sirupsen/logrus"

Expand All @@ -24,7 +26,7 @@ type logstashConfig struct {
type logstashEntry map[string]interface{}

type logstash struct {
conn net.Conn
writer io.Writer
}

func (l logstash) Append(entries <-chan servicelog.Entry) {
Expand Down Expand Up @@ -58,7 +60,7 @@ func (l logstash) sendEntry(entry servicelog.Entry) error {
return fmt.Errorf("unable to marshal log entry: %s", err)
}
log.WithField("entry", string(bytes)).Debug("Sending log entry to Logstash")
if _, err = l.conn.Write(bytes); err != nil {
if _, err = l.writer.Write(bytes); err != nil {
return fmt.Errorf("unable to write to Logstash server: %s", err)
}
return nil
Expand All @@ -75,9 +77,12 @@ func (l logstash) marshal(entry logstashEntry) ([]byte, error) {
return bytes, nil
}

// NewLogstash creates new appender that will send log entries to Logstash.
func NewLogstash(options ...func(*logstash) error) (Appender, error) {
l := &logstash{}
// NewLogstash creates new appender that will send log entries to Logstash using
// passed writer.
func NewLogstash(writer io.Writer, options ...func(*logstash) error) (Appender, error) {
l := &logstash{
writer: writer,
}
for _, option := range options {
if err := option(l); err != nil {
return nil, fmt.Errorf("invalid config option: %s", err)
Expand All @@ -86,27 +91,31 @@ func NewLogstash(options ...func(*logstash) error) (Appender, error) {
return l, nil
}

// LogstashAddress sets the connection details for the Logstash appender.
func LogstashAddress(protocol, address string) func(*logstash) error {
// LogstashWriterFromEnv creates the connection from the environment variables
// for the Logstash appender.
func LogstashWriterFromEnv() (io.Writer, error) {
config := &logstashConfig{}
err := envconfig.Process(logstashConfigPrefix, config)
if err != nil {
return nil, fmt.Errorf("unable to get address from env: %s", err)
}
return net.Dial(config.Protocol, config.Address)
}

// LogstashRateLimit adds rate limiting to logs sending. Logs send in higher rate
// (log lines per seconds) will be discarded.
func LogstashRateLimit(limit int) func(*logstash) error {
return func(l *logstash) error {
conn, err := net.Dial(protocol, address)
if err != nil {
return fmt.Errorf("unable to connect to Logstash server: %s", err)
}
l.conn = conn
l.writer = xio.DecorateWriter(l.writer, xio.RateLimit(limit))
return nil
}
}

// LogstashAddressFromEnv sets the connection details from the environment
// variables for the Logstash appender.
func LogstashAddressFromEnv() func(*logstash) error {
config := &logstashConfig{}
err := envconfig.Process(logstashConfigPrefix, config)
if err != nil {
return func(l *logstash) error {
return fmt.Errorf("unable to get address from env: %s", err)
}
// LogstashSizeLimit adds size limiting to logs sending. Logs that exceeds passed
// size (in bytes) will be discarded.
func LogstashSizeLimit(size int) func(*logstash) error {
return func(l *logstash) error {
l.writer = xio.DecorateWriter(l.writer, xio.SizeLimit(size))
return nil
}
return LogstashAddress(config.Protocol, config.Address)
}
13 changes: 5 additions & 8 deletions servicelog/appender/logstash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ func TestIfSendsLogsToLogstash(t *testing.T) {
}()

entries := make(chan servicelog.Entry)
logstash, err := NewLogstash(LogstashAddress("tcp", ln.Addr().String()))
writer, err := net.Dial("tcp", ln.Addr().String())
require.NoError(t, err)
logstash, err := NewLogstash(writer)
require.NoError(t, err)

go logstash.Append(entries)
Expand All @@ -56,17 +58,12 @@ func TestIfFormatsLogsCorrectly(t *testing.T) {
assert.Equal(t, "my logger", formattedEntry["logger"])
}

func TestIfFailsToStartWithInvalidLogstashConfiguration(t *testing.T) {
_, err := NewLogstash(LogstashAddress("invalid", "!@#$"))
assert.Error(t, err)
}

func TestIfFailsToStartWithInvalidLogstashConfigurationInEnv(t *testing.T) {
func TestIfFailsToCreateWriterWithInvalidConfigurationInEnv(t *testing.T) {
os.Setenv("ALLEGRO_EXECUTOR_SERVICELOG_PROTOCOL", "invalid")
os.Setenv("ALLEGRO_EXECUTOR_SERVICELOG_ADDRESS", "!@#$")
defer os.Unsetenv("ALLEGRO_EXECUTOR_SERVICELOG_PROTOCOL")
defer os.Unsetenv("ALLEGRO_EXECUTOR_SERVICELOG_ADDRESS")

_, err := NewLogstash(LogstashAddressFromEnv())
_, err := LogstashWriterFromEnv()
assert.Error(t, err)
}

0 comments on commit 40ebde8

Please sign in to comment.