Skip to content

Commit

Permalink
Add optional rate/size limiting config (#62)
Browse files Browse the repository at this point in the history
Add optional rate and size limiting config in executor environment for
service log scraping.
  • Loading branch information
medzin authored Jan 17, 2018
1 parent 1856688 commit 20bcc31
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 18 deletions.
6 changes: 1 addition & 5 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,11 +400,7 @@ func (e *Executor) createOptionsForLogstashServiceLogScrapping(taskInfo mesos.Ta
scr := &scraper.LogFmt{
KeyFilter: filter,
}
writer, err := appender.LogstashWriterFromEnv()
if err != nil {
return nil, fmt.Errorf("cannot configure service log scraping: %s", err)
}
apr, err := appender.NewLogstash(writer)
apr, err := appender.LogstashAppenderFromEnv()
if err != nil {
return nil, fmt.Errorf("cannot configure service log scraping: %s", err)
}
Expand Down
26 changes: 19 additions & 7 deletions servicelog/appender/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ const (
)

type logstashConfig struct {
Protocol string `required:"true"`
Address string `required:"true"`
Protocol string `required:"true"`
Address string `required:"true"`
RateLimit int `split_words:"true"`
SizeLimit int `split_words:"true"`
}

type logstashEntry map[string]interface{}
Expand Down Expand Up @@ -105,15 +107,25 @@ func NewLogstash(writer io.Writer, options ...func(*logstash) error) (Appender,
return l, nil
}

// LogstashWriterFromEnv creates the connection from the environment variables
// for the Logstash appender.
func LogstashWriterFromEnv() (io.Writer, error) {
// LogstashAppenderFromEnv creates the appender from the environment variables.
func LogstashAppenderFromEnv() (Appender, error) {
config := &logstashConfig{}
err := envconfig.Process(logstashConfigPrefix, config)
if err != nil {
return nil, fmt.Errorf("unable to get address from env: %s", err)
return nil, fmt.Errorf("unable to get config from env: %s", err)
}
return net.Dial(config.Protocol, config.Address)
baseWriter, err := net.Dial(config.Protocol, config.Address)
if err != nil {
return nil, fmt.Errorf("invalid logstash connection data: %s", err)
}
var options []func(*logstash) error
if config.RateLimit > 0 {
options = append(options, LogstashRateLimit(config.RateLimit))
}
if config.SizeLimit > 0 {
options = append(options, LogstashSizeLimit(config.SizeLimit))
}
return NewLogstash(baseWriter, options...)
}

// LogstashRateLimit adds rate limiting to logs sending. Logs send in higher rate
Expand Down
38 changes: 32 additions & 6 deletions servicelog/appender/logstash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package appender

import (
"bufio"
"fmt"
"net"
"os"
"testing"
Expand Down Expand Up @@ -58,12 +59,37 @@ func TestIfFormatsLogsCorrectly(t *testing.T) {
assert.Equal(t, "my logger", formattedEntry["logger"])
}

func TestIfFailsToCreateWriterWithInvalidConfigurationInEnv(t *testing.T) {
os.Setenv("ALLEGRO_EXECUTOR_SERVICELOG_PROTOCOL", "invalid")
os.Setenv("ALLEGRO_EXECUTOR_SERVICELOG_ADDRESS", "!@#$")
defer os.Unsetenv("ALLEGRO_EXECUTOR_SERVICELOG_PROTOCOL")
defer os.Unsetenv("ALLEGRO_EXECUTOR_SERVICELOG_ADDRESS")
func TestIfFailsToCreateAppenderWithInvalidRequiredConfigurationInEnv(t *testing.T) {
os.Setenv("ALLEGRO_EXECUTOR_SERVICELOG_LOGSTASH_PROTOCOL", "invalid")
os.Setenv("ALLEGRO_EXECUTOR_SERVICELOG_LOGSTASH_ADDRESS", "!@#$")
defer os.Unsetenv("ALLEGRO_EXECUTOR_SERVICELOG_LOGSTASH_PROTOCOL")
defer os.Unsetenv("ALLEGRO_EXECUTOR_SERVICELOG_LOGSTASH_ADDRESS")

_, err := LogstashWriterFromEnv()
_, err := LogstashAppenderFromEnv()
assert.Error(t, err)
}

func TestIfFailsToCreateAppenderWithInvalidOptionalConfigurationInEnv(t *testing.T) {
// valid required env
os.Setenv("ALLEGRO_EXECUTOR_SERVICELOG_LOGSTASH_PROTOCOL", "udp")
os.Setenv("ALLEGRO_EXECUTOR_SERVICELOG_LOGSTASH_ADDRESS", "localhost:8080")
defer os.Unsetenv("ALLEGRO_EXECUTOR_SERVICELOG_LOGSTASH_PROTOCOL")
defer os.Unsetenv("ALLEGRO_EXECUTOR_SERVICELOG_LOGSTASH_ADDRESS")

testCases := []struct {
envKey, envVal string
}{
{"ALLEGRO_EXECUTOR_SERVICELOG_LOGSTASH_RATE_LIMIT", "invalid"},
{"ALLEGRO_EXECUTOR_SERVICELOG_LOGSTASH_SIZE_LIMIT", "invalid"},
}

for _, tc := range testCases {
t.Run(fmt.Sprintf("envKey=%s;envVal=%s", tc.envKey, tc.envVal), func(t *testing.T) {
// invalid optional env
os.Setenv(tc.envKey, tc.envVal)
defer os.Unsetenv(tc.envKey)
_, err := LogstashAppenderFromEnv()
assert.Error(t, err)
})
}
}

0 comments on commit 20bcc31

Please sign in to comment.