Skip to content

Commit

Permalink
Add basic Logstash appender (#28)
Browse files Browse the repository at this point in the history
This commit adds a basic Logstash appender that can be used to send
scraped logs to Logstash server. Package log is renamed to servicelog
to better reflect its responsibility.
  • Loading branch information
medzin authored Dec 1, 2017
1 parent 4d1a9e4 commit 06cd2a8
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 25 deletions.
14 changes: 10 additions & 4 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
mesos "github.com/mesos/mesos-go/api/v1/lib"

osutil "github.com/allegro/mesos-executor/os"
"github.com/allegro/mesos-executor/servicelog/scraper"
)

// TaskExitState is a type describing reason of program execution interuption.
Expand Down Expand Up @@ -119,7 +120,7 @@ func (c *cancellableCommand) Stop(gracePeriod time.Duration) {
}

// NewCommand returns a new command based on passed CommandInfo.
func NewCommand(commandInfo mesos.CommandInfo, env []string) (Command, error) {
func NewCommand(commandInfo mesos.CommandInfo, env []string, scr scraper.Scraper) (Command, error) {
// TODO(janisz): Implement shell policy
// From: https://github.com/apache/mesos/blob/1.1.3/include/mesos/mesos.proto#L509-L521
// There are two ways to specify the command:
Expand All @@ -134,9 +135,14 @@ func NewCommand(commandInfo mesos.CommandInfo, env []string) (Command, error) {
// execlp(value, arguments(0), arguments(1), ...)).
cmd := exec.Command("sh", "-c", commandInfo.GetValue()) // #nosec
cmd.Env = combineExecutorAndTaskEnv(env, commandInfo.GetEnvironment())
// Redirect command output
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if scr != nil { // start scraping command logs if scraper is provided
writer := scraper.Pipe(scr)
cmd.Stdout = writer
cmd.Stderr = writer
} else { // or just redirect command output
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
// Set new group for a command
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

Expand Down
2 changes: 1 addition & 1 deletion command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func TestIfNewCancellableCommandReturnsCommand(t *testing.T) {
os.Environ()
commandInfo := newCommandInfo("./sleep 100", "ignored", false, []string{"ignored"}, map[string]string{"one": "1", "two": "2"})
command, err := NewCommand(commandInfo, nil)
command, err := NewCommand(commandInfo, nil, nil)
cmd := command.(*cancellableCommand).cmd

assert.NoError(t, err)
Expand Down
11 changes: 6 additions & 5 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,19 +320,20 @@ func (e *Executor) launchTask(taskInfo mesos.TaskInfo) (Command, error) {
validateCertificate := mesosutils.TaskInfo{TaskInfo: taskInfo}.GetLabelValue("validate-certificate")
if validateCertificate == "true" {
if certificate, err := GetCertFromEnvVariables(env); err != nil {
return nil, fmt.Errorf("Problem with certificate: %s", err)
return nil, fmt.Errorf("problem with certificate: %s", err)
} else if err := e.checkCert(certificate); err != nil {
return nil, fmt.Errorf("Problem with certificate: %s", err)
return nil, fmt.Errorf("problem with certificate: %s", err)
}
}

cmd, err := NewCommand(commandInfo, env)
// TODO(medzin): add ability to use service log scraper here
cmd, err := NewCommand(commandInfo, env, nil)
if err != nil {
return nil, fmt.Errorf("Cannot create command: %s", err)
return nil, fmt.Errorf("cannot create command: %s", err)
}

if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("Cannot start command: %s", err)
return nil, fmt.Errorf("cannot start command: %s", err)
}

go taskExitToEvent(cmd.Wait(), e.events)
Expand Down
2 changes: 1 addition & 1 deletion executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestIfLaunchesCommandAndSendsStateUpdatesWhenTaskRequireCertButNoCertIsGive
mock.AnythingOfType("mesos.TaskID"),
mesos.TASK_FAILED,
mock.MatchedBy(func(info state.OptionalInfo) bool {
return "Canot launch task: Problem with certificate: Missing certificate" == *info.Message
return "Canot launch task: problem with certificate: Missing certificate" == *info.Message
})).Once()

exec := new(Executor)
Expand Down
11 changes: 0 additions & 11 deletions log/scraper/scraper.go

This file was deleted.

8 changes: 8 additions & 0 deletions servicelog/appender/appender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package appender

import "github.com/allegro/mesos-executor/servicelog"

// Appender delivers service log entries to their destination.
type Appender interface {
Append(entries <-chan servicelog.Entry)
}
54 changes: 54 additions & 0 deletions servicelog/appender/logstash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package appender

import (
"encoding/json"
"fmt"
"net"
"time"

log "github.com/Sirupsen/logrus"

"github.com/allegro/mesos-executor/servicelog"
)

const (
logstashVersion = "1"
logstashDefaultTimeFormat = time.RFC3339Nano
)

type logstash struct {
conn net.Conn
}

func (l logstash) Append(entries <-chan servicelog.Entry) {
for entry := range entries {
if err := l.sendEntry(entry); err != nil {
log.WithError(err).Warn("Error appending logs.")
}
}
}

func (l logstash) sendEntry(entry servicelog.Entry) error {
// TODO(medzin): Move formatting logic to separate structure and extend it there
entry["@timestamp"] = time.Now().Format(logstashDefaultTimeFormat)
entry["@version"] = logstashVersion
bytes, err := json.Marshal(entry)
bytes = append(bytes, '\n')

if err != nil {
return fmt.Errorf("unable to marshal log entry: %s", err)
}
if _, err = l.conn.Write(bytes); err != nil {
return fmt.Errorf("unable to write to Logstash server: %s", err)
}
return nil
}

// NewLogstash creates new appender that will send log entries to Logstash.
func NewLogstash(protocol, address string) (Appender, error) {
conn, err := net.Dial(protocol, address)
if err != nil {
return nil, fmt.Errorf("unable to connect to Logstash server: %s", err)
}
return &logstash{conn: conn}, nil
}
39 changes: 39 additions & 0 deletions servicelog/appender/logstash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package appender

import (
"bufio"
"net"
"testing"

"github.com/allegro/mesos-executor/servicelog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestIfSendsLogsToLogstash(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

done := make(chan struct{})
go func() {
defer ln.Close()
conn, _ := ln.Accept()
reader := bufio.NewReader(conn)
bytes, _, err := reader.ReadLine()

require.NoError(t, err)
assert.Contains(t, string(bytes), "\"@version\":\"1\"")
assert.Contains(t, string(bytes), "@timestamp")

done <- struct{}{}
}()

entries := make(chan servicelog.Entry)
logstash, err := NewLogstash("tcp", ln.Addr().String())
require.NoError(t, err)

go logstash.Append(entries)

entries <- servicelog.Entry{}
<-done
}
4 changes: 4 additions & 0 deletions servicelog/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package servicelog

// Entry represents one scraped log line in flat key-value store.
type Entry map[string]string
8 changes: 5 additions & 3 deletions log/scraper/logfmt.go → servicelog/scraper/logfmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"io"

"github.com/go-logfmt/logfmt"

"github.com/allegro/mesos-executor/servicelog"
)

// LogFmt is a scraper for logs in logfmt format.
Expand All @@ -15,13 +17,13 @@ type LogFmt struct {
// StartScraping starts scraping logs in logfmt format from given reader and sends
// parsed entries to the returned unbuffered channel. Logs are scraped as long
// as the passed reader does not return an io.EOF error.
func (logFmt *LogFmt) StartScraping(reader io.Reader) <-chan LogEntry {
func (logFmt *LogFmt) StartScraping(reader io.Reader) <-chan servicelog.Entry {
decoder := logfmt.NewDecoder(reader)
logEntries := make(chan LogEntry)
logEntries := make(chan servicelog.Entry)

go func() {
for decoder.ScanRecord() {
logEntry := LogEntry{}
logEntry := servicelog.Entry{}

for decoder.ScanKeyval() {
key := string(decoder.Key())
Expand Down
File renamed without changes.
19 changes: 19 additions & 0 deletions servicelog/scraper/scraper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package scraper

import (
"io"

"github.com/allegro/mesos-executor/servicelog"
)

// Scraper in an interface for various scrapers that support different log formats.
type Scraper interface {
StartScraping(io.Reader) <-chan servicelog.Entry
}

// Pipe returns writer that can be used as a data provider for given scraper.
func Pipe(scraper Scraper) io.Writer {
reader, writer := io.Pipe()
scraper.StartScraping(reader)
return writer
}

0 comments on commit 06cd2a8

Please sign in to comment.