Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 0.2.0 #13

Merged
merged 8 commits into from
Sep 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
/protologbeat.test
protologbeat.local.yml
*.pyc
/logs
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@

### Version 0.2.0
- Added support for receiving GELF format messages by setting `enable_gelf: true`
- Fixed issue where (more commonly when getting high volumes of messages) the `processMessage` goroutine might have it's byte buffer modified before it actually executes with the original payload/message. (Related to original PR [#8](https://github.com/hartfordfive/protologbeat/pull/8), credit to [vcostet](https://github.com/vcostet))


### Version 0.1.1
- Added Dockerfile and seperate `protologbeat-docker.yml` config file to be used by docker image
- Updated default `protologbeat.yml` to have bare-minimum config values
Expand Down
16 changes: 7 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,25 @@ RUN set -ex ;\
# Install dependencies
apk --no-cache add gettext libc6-compat curl ;\
# Hotfix for libc compat
ln -s /lib /lib64

RUN cd /tmp ;\
ln -s /lib /lib64 ;\
cd /tmp ;\
mkdir -p /opt/protologbeat/conf ;\
mkdir -p /opt/protologbeat/ssl ;\
curl -L https://github.com/hartfordfive/protologbeat/releases/download/${VERSION}/protologbeat-${VERSION}-linux-x86_64.tar.gz --output protologbeat-${VERSION}-linux-x86_64.tar.gz ;\
tar -xvzf protologbeat-${VERSION}-linux-x86_64.tar.gz ;\
mv /tmp/protologbeat-${VERSION}-linux-x86_64 /opt/protologbeat/protologbeat ;\
rm -rf protologbeat-${VERSION}-linux-x86_64 && rm protologbeat-${VERSION}-linux-x86_64.tar.gz
rm -rf protologbeat-${VERSION}-linux-x86_64 && rm protologbeat-${VERSION}-linux-x86_64.tar.gz ;\
# Fix permissions
chown -R protologbeat:protologbeat /opt/protologbeat ;\
chmod 750 /opt/protologbeat ;\
chmod 700 /opt/protologbeat/ssl

ENV PATH=/opt/protologbeat:$PATH

COPY protologbeat-docker.yml /opt/protologbeat/conf/protologbeat.yml
COPY protologbeat.template-es2x.json /opt/protologbeat
COPY protologbeat.template.json /opt/protologbeat

# Fix permissions
RUN chown -R protologbeat:protologbeat /opt/protologbeat ;\
chmod 750 /opt/protologbeat ;\
chmod 700 /opt/protologbeat/ssl

WORKDIR /opt/protologbeat
USER protologbeat

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Ensure that this folder is at the following location:
- `protolog.merge_fields_to_root` : When **json_mode** enabled, wether to merge parsed fields to the root level. (Default = false)
- `protologbeat.default_es_log_type`: Elasticsearch type to assign to an event if one isn't specified (Default: protologbeat)
- `protologbeat.enable_syslog_format_only` : Boolean value indicating if only syslog messages should be accepted. (Default = false)
- `protologbeat.enable_gelf` : Boolean value indiciating if process should in mode to only accept [GELF formated messages](http://docs.graylog.org/en/2.2/pages/gelf.html)
- `protologbeat.enable_json_validation` : Boolean value indicating if JSON schema validation should be applied for `json` format messages (Default = false)
- `protologbeat.validate_all_json_types` : When json_mode enabled, indicates if ALL types must have a schema specified. Log entries with types that have no schema will not be published. (Default = false)
- `protologbeat.json_schema` : A hash consisting of the Elasticsearch type as the key, and the absolute local schema file path as the value.
Expand Down
8 changes: 8 additions & 0 deletions _samples/generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from logger import Logger
import time, random

l = Logger('127.0.0.1', 6000, 'udp', 'json')

for i in range(10000):
l.send_message({'message': 'This is JSON encoded message #{}'.format(i), 'type': 'generator_test', 'id': int(i), 'log_level': 'INFO'})
time.sleep(random.uniform(0.0001, 0.0010))
4 changes: 2 additions & 2 deletions _samples/logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ function Logger:sendMsg(msg)
end

-- Start logger client to send plain-text formated message to protologbeat listening on UDP host/port
logger = Logger.init('127.0.0.1', 6000, "udp", "plain")
logger:sendMsg('This is a sample message sent from the Lua logger.')
--logger = Logger.init('127.0.0.1', 6000, "udp", "plain")
--logger:sendMsg('This is a sample message sent from the Lua logger.')

-- Start logger client to send json formated message to protologbeat listening on TCP host/port
--logger = Logger.init('127.0.0.1', 6000, "tcp", "json")
Expand Down
6 changes: 3 additions & 3 deletions _samples/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ def send_message(self, msg):


# Initializing udp connection and sending a plaintext message
l = Logger('127.0.0.1', 6000)
l.enable_debug()
l.send_message('This is a sample plaintext message to be sent via udp')
#l = Logger('127.0.0.1', 6000)
#l.enable_debug()
#l.send_message('This is a sample plaintext message to be sent via udp')

# Initializing tcp connection and sending a json-encoded message
#l = Logger('127.0.0.1', 6000, 'tcp', 'json')
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Config struct {
Protocol string `config:"protocol"`
MaxMsgSize int `config:"max_msg_size"`
JsonMode bool `config:"json_mode"`
EnableGelf bool `config:"enable_gelf"`
DefaultEsLogType string `config:"default_es_log_type"`
MergeFieldsToRoot bool `config:"merge_fields_to_root"`
EnableSyslogFormatOnly bool `config:"enable_syslog_format_only"`
Expand All @@ -28,6 +29,7 @@ var DefaultConfig = Config{
Protocol: "udp",
MaxMsgSize: 4096,
JsonMode: false,
EnableGelf: false,
DefaultEsLogType: "protologbeat",
MergeFieldsToRoot: false,
EnableSyslogFormatOnly: false,
Expand Down
94 changes: 73 additions & 21 deletions protolog/loglistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package protolog
import (
"fmt"
"net"
"strconv"
"strings"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"github.com/Graylog2/go-gelf/gelf"
"github.com/hartfordfive/protologbeat/config"
"github.com/pquerna/ffjson/ffjson"
"github.com/xeipuuv/gojsonschema"
Expand All @@ -25,7 +27,7 @@ func NewLogListener(cfg config.Config) *LogListener {
ll := &LogListener{
config: cfg,
}
if ll.config.EnableJsonValidation {
if !ll.config.EnableGelf && ll.config.EnableJsonValidation {
ll.jsonSchema = map[string]gojsonschema.JSONLoader{}
for name, path := range ll.config.JsonSchema {
logp.Info("Loading JSON schema %s from %s", name, path)
Expand All @@ -46,6 +48,8 @@ func (ll *LogListener) Start(logEntriesRecieved chan common.MapStr, logEntriesEr

if ll.config.Protocol == "tcp" {
ll.startTCP(ll.config.Protocol, address)
} else if ll.config.EnableGelf {
ll.startGELF(address)
} else {
ll.startUDP(ll.config.Protocol, address)
}
Expand All @@ -72,19 +76,19 @@ func (ll *LogListener) startTCP(proto string, address string) {
continue
}

go func() {
buffer := make([]byte, ll.config.MaxMsgSize)
length, err := conn.Read(buffer)
if err != nil {
e, ok := err.(net.Error)
if ok && e.Timeout() {
logp.Err("Timeout reading from socket: %v", err)
ll.logEntriesError <- true
return
}
buffer := make([]byte, ll.config.MaxMsgSize)

length, err := conn.Read(buffer)
if err != nil {
e, ok := err.(net.Error)
if ok && e.Timeout() {
logp.Err("Timeout reading from socket: %v", err)
ll.logEntriesError <- true
return
}
go ll.processMessage(buffer, length)
}()
}
go ll.processMessage(strings.TrimSpace(string(buffer[:length])))

}
}

Expand All @@ -99,30 +103,49 @@ func (ll *LogListener) startUDP(proto string, address string) {
defer l.Close()

logp.Info("Now listening for logs via %s on %s", ll.config.Protocol, address)
buffer := make([]byte, ll.config.MaxMsgSize)

for {
buffer := make([]byte, ll.config.MaxMsgSize)
length, _, err := l.ReadFrom(buffer)
if err != nil {
logp.Err("Error reading from buffer: %v", err.Error())
continue
}
go ll.processMessage(buffer, length)
if length == 0 {
return
}
go ll.processMessage(strings.TrimSpace(string(buffer[:length])))
}
}

func (ll *LogListener) startGELF(address string) {

gr, err := gelf.NewReader(address)
if err != nil {
logp.Err("Error starting GELF listener on %s: %v", address, err.Error())
ll.logEntriesError <- true
}

logp.Info("Listening for GELF encoded messages on %s...", address)

for {
msg, err := gr.ReadMessage()
if err != nil {
logp.Err("Could not read GELF message: %v", err)
} else {
go ll.processGelfMessage(msg)
}
}

}

func (ll *LogListener) Shutdown() {
close(ll.logEntriesError)
close(ll.logEntriesRecieved)
}

func (ll *LogListener) processMessage(buffer []byte, length int) {

if length == 0 {
return
}
func (ll *LogListener) processMessage(logData string) {

logData := strings.TrimSpace(string(buffer[:length]))
if logData == "" {
logp.Err("Event is empty")
return
Expand Down Expand Up @@ -203,3 +226,32 @@ PreSend:

ll.logEntriesRecieved <- event
}

func (ll *LogListener) processGelfMessage(msg *gelf.Message) {

event := common.MapStr{}
event["gelf"] = map[string]interface{}{"version": msg.Version}
event["host"] = msg.Host
event["type"] = ll.config.DefaultEsLogType
event["short_message"] = msg.Short
event["full_message"] = msg.Full

// 1 ms = 1000000 ns
if msg.TimeUnix == 0 {
event["@timestamp"] = common.Time(time.Now())
} else {
millisec := msg.TimeUnix - float64(int64(msg.TimeUnix))
ms := fmt.Sprintf("%.4f", millisec)
msf, err := strconv.ParseFloat(ms, 64)
if err != nil {
event["@timestamp"] = common.Time(time.Now())
} else {
event["@timestamp"] = common.Time(time.Unix(int64(msg.TimeUnix), int64(msf)*1000000))
}
}

event["level"] = msg.Level
event["facility"] = msg.Facility
ll.logEntriesRecieved <- event

}
2 changes: 2 additions & 0 deletions protologbeat-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ protologbeat:
port: ${PORT:6000}
protocol: ${PROTOCOL:udp}
max_msg_size: ${MAX_MSG_SIZE:4096}
enable_gelf: ${ENABLE_GELF:false}
enable_syslog_format_only: ${ENABLE_SYSLOG:false}
default_es_log_type: ${DEFAULT_ES_TYPE:protologbeat}


Expand Down
5 changes: 4 additions & 1 deletion protologbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ protologbeat:
#merge_fields_to_root: true

# Set process to only act as a syslog message reciever
#enable_syslog_format_only: true
#enable_syslog_format_only: false

# Set process to receive only GELF formated messages
#enable_gelf: false

# Enable json validation with schemas
#enable_json_validation: false

Expand Down
81 changes: 81 additions & 0 deletions protologbeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package main

import (
"testing"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/hartfordfive/protologbeat/config"
"github.com/hartfordfive/protologbeat/protolog"

"github.com/Graylog2/go-gelf/gelf"
"github.com/stretchr/testify/assert"
)

func TestGreylogReceive(t *testing.T) {

var logEntriesRecieved chan common.MapStr
var logEntriesErrors chan bool

logEntriesRecieved = make(chan common.MapStr, 1)
logEntriesErrors = make(chan bool, 1)

ll := protolog.NewLogListener(config.Config{EnableGelf: true, Port: 12000, DefaultEsLogType: "graylog"})

go func(logs chan common.MapStr, errs chan bool) {
ll.Start(logs, errs)
}(logEntriesRecieved, logEntriesErrors)

var event common.MapStr

gw, err := gelf.NewWriter("127.0.0.1:12000")
if err != nil {
t.Errorf("NewWriter: %s", err)
return
}
gw.CompressionType = gelf.CompressGzip

expectedVersion := "1.1"
expectedHost := "localhost"
expectedShort := "This is a test message for protologbeat"
expectedFull := "This is the full message expected for the test of gelf input."
expectedTs := float64(time.Now().Unix())
expectedLevel := int32(6)
expectedFacility := "local6"
exepectedType := "graylog"

if err := gw.WriteMessage(&gelf.Message{
Version: expectedVersion,
Host: expectedHost,
Short: expectedShort,
Full: expectedFull,
TimeUnix: expectedTs,
Level: expectedLevel,
Facility: expectedFacility,
Extra: map[string]interface{}{"type": exepectedType},
}); err != nil {
t.Errorf("Could not write message to GELF listener: %v", err)
return
}

for {
select {
case <-logEntriesErrors:
t.Errorf("Error receiving GELF format message")
return
case event = <-logEntriesRecieved:
if _, ok := event["@timestamp"]; !ok {
t.Errorf("Message missing timestamp field!: %v", event)
return
}
assert.Equal(t, event["gelf"].(map[string]interface{})["version"], expectedVersion, "Version should be the same")
assert.Equal(t, event["host"], expectedHost, "Host should be the same")
assert.Equal(t, event["short_message"], expectedShort, "Short message should be the same")
assert.Equal(t, event["full_message"], expectedFull, "Host should be the same")
assert.Equal(t, event["level"], expectedLevel, "Host should be the same")
assert.Equal(t, event["facility"], expectedFacility, "Host should be the same")
assert.Equal(t, event["type"], exepectedType, "Host should be the same")
return
}
}
}