From 77e939aafa21c68b49dc15a294f320ff6cac6c2e Mon Sep 17 00:00:00 2001 From: Al Lefebvre Date: Wed, 10 May 2017 11:00:38 -0400 Subject: [PATCH 1/7] Adding current changes --- config/config.go | 2 ++ protolog/loglistener.go | 50 ++++++++++++++++++++++++++++++++++++++++- protologbeat_test.go | 41 +++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 protologbeat_test.go diff --git a/config/config.go b/config/config.go index 5276c0d3..9672a721 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,7 @@ type Config struct { Protocol string `config:"protocol"` MaxMsgSize int `config:"max_msg_size"` JsonMode bool `config:"json_mode"` + EnableGelf bool `config:"enable_gelf"` DefaultEsLogType string `config:"default_es_log_type"` MergeFieldsToRoot bool `config:"merge_fields_to_root"` EnableSyslogFormatOnly bool `config:"enable_syslog_format_only"` @@ -28,6 +29,7 @@ var DefaultConfig = Config{ Protocol: "udp", MaxMsgSize: 4096, JsonMode: false, + EnableGelf: false, DefaultEsLogType: "protologbeat", MergeFieldsToRoot: false, EnableSyslogFormatOnly: false, diff --git a/protolog/loglistener.go b/protolog/loglistener.go index deead53e..441756d4 100644 --- a/protolog/loglistener.go +++ b/protolog/loglistener.go @@ -3,12 +3,14 @@ package protolog import ( "fmt" "net" + "strconv" "strings" "time" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/Graylog2/go-gelf/gelf" "github.com/hartfordfive/protologbeat/config" "github.com/pquerna/ffjson/ffjson" "github.com/xeipuuv/gojsonschema" @@ -25,7 +27,7 @@ func NewLogListener(cfg config.Config) *LogListener { ll := &LogListener{ config: cfg, } - if ll.config.EnableJsonValidation { + if !ll.config.EnableGelf && ll.config.EnableJsonValidation { ll.jsonSchema = map[string]gojsonschema.JSONLoader{} for name, path := range ll.config.JsonSchema { logp.Info("Loading JSON schema %s from %s", name, path) @@ -46,6 +48,8 @@ func (ll *LogListener) Start(logEntriesRecieved chan common.MapStr, logEntriesEr if ll.config.Protocol == "tcp" { ll.startTCP(ll.config.Protocol, address) + } else if ll.config.EnableGelf { + ll.startGELF(address) } else { ll.startUDP(ll.config.Protocol, address) } @@ -111,6 +115,25 @@ func (ll *LogListener) startUDP(proto string, address string) { } } +func (ll *LogListener) startGELF(address string) { + + gr, err := gelf.NewReader(address) + if err != nil { + logp.Err("Error starting GELF listener on %s: %v", address, err.Error()) + ll.logEntriesError <- true + } + + for { + msg, err := gr.ReadMessage() + if err != nil { + logp.Err("Could not read GELF message: %v", err) + } else { + go ll.processGelfMessage(msg) + } + } + +} + func (ll *LogListener) Shutdown() { close(ll.logEntriesError) close(ll.logEntriesRecieved) @@ -203,3 +226,28 @@ PreSend: ll.logEntriesRecieved <- event } + +func (ll *LogListener) processGelfMessage(msg *gelf.Message) { + + event := common.MapStr{} + event["gelf"] = map[string]interface{}{"version": msg.Version} + event["host"] = msg.Host + event["type"] = ll.config.DefaultEsLogType + event["short_message"] = msg.Short + event["full_message"] = msg.Full + + // 1 ms = 1000000 ns + millisec := msg.TimeUnix - float64(int64(msg.TimeUnix)) + ms := fmt.Sprintf("%.4f", millisec) + msf, err := strconv.ParseFloat(ms, 64) + if err != nil { + event["@timestamp"] = common.Time(time.Now()) + } else { + event["@timestamp"] = common.Time(time.Unix(int64(msg.TimeUnix), int64(msf)*1000000)) + } + + event["level"] = msg.Level + event["facility"] = msg.Facility + ll.logEntriesRecieved <- event + +} diff --git a/protologbeat_test.go b/protologbeat_test.go new file mode 100644 index 00000000..db494039 --- /dev/null +++ b/protologbeat_test.go @@ -0,0 +1,41 @@ +package main + +import ( + "testing" + + "github.com/Graylog2/go-gelf/gelf" + "github.com/hartfordfive/protologbeat/config" + "github.com/hartfordfive/protologbeat/protolog" +) + +func TestGreylogReceive(t *testing.T) { + + var logEntriesRecieved chan common.MapStr + var logEntriesError chan bool + + ll := protolog.NewLogListener(config.Config{EnableGelf: true, Port: 6000}) + + go func(logs chan common.MapStr, errs chan bool) { + ll.Start(logs, errs) + }(logEntriesRecieved, logEntriesErrors) + + var event common.MapStr + + gelf.CompressGzip + gw, err := gelf.NewWriter("127.0.0.1:6000") + if err != nil { + return nil, fmt.Errorf("NewWriter: %s", err) + } + gw.CompressionType = gelf.CompressGzip + + for { + select { + case <-logEntriesErrors: + t.Errorf("Error receiving GELF format message") + case event = <-logEntriesRecieved: + if _, ok := event["@timestamp"]; ok { + + } + } + } +} From 84889e56d38b3344a8bfc5bc00668a662e17894d Mon Sep 17 00:00:00 2001 From: Al Lefebvre Date: Sun, 14 May 2017 07:01:49 -0400 Subject: [PATCH 2/7] Adding support for GELF formated input messages --- CHANGELOG.md | 3 +++ README.md | 1 + protologbeat.full.yml | 5 +++- protologbeat_test.go | 54 +++++++++++++++++++++++++++++++++++++------ 4 files changed, 55 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c9a2081..cf9c531c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ +### Version 0.2.0 +- Added support for receiving GELF format messages by setting `enable_gelf: true` + ### Version 0.1.1 - Added Dockerfile and seperate `protologbeat-docker.yml` config file to be used by docker image - Updated default `protologbeat.yml` to have bare-minimum config values diff --git a/README.md b/README.md index 5120a102..a48cf3f6 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ Ensure that this folder is at the following location: - `protolog.merge_fields_to_root` : When **json_mode** enabled, wether to merge parsed fields to the root level. (Default = false) - `protologbeat.default_es_log_type`: Elasticsearch type to assign to an event if one isn't specified (Default: protologbeat) - `protologbeat.enable_syslog_format_only` : Boolean value indicating if only syslog messages should be accepted. (Default = false) +- `protologbeat.enable_gelf` : Boolean value indiciating if process should in mode to only accept [GELF formated messages](http://docs.graylog.org/en/2.2/pages/gelf.html) - `protologbeat.enable_json_validation` : Boolean value indicating if JSON schema validation should be applied for `json` format messages (Default = false) - `protologbeat.validate_all_json_types` : When json_mode enabled, indicates if ALL types must have a schema specified. Log entries with types that have no schema will not be published. (Default = false) - `protologbeat.json_schema` : A hash consisting of the Elasticsearch type as the key, and the absolute local schema file path as the value. diff --git a/protologbeat.full.yml b/protologbeat.full.yml index eac755a8..7dbd2c11 100644 --- a/protologbeat.full.yml +++ b/protologbeat.full.yml @@ -25,8 +25,11 @@ protologbeat: #merge_fields_to_root: true # Set process to only act as a syslog message reciever - #enable_syslog_format_only: true + #enable_syslog_format_only: false + # Set process to receive only GELF formated messages + #enable_gelf: false + # Enable json validation with schemas #enable_json_validation: false diff --git a/protologbeat_test.go b/protologbeat_test.go index db494039..7f745ea7 100644 --- a/protologbeat_test.go +++ b/protologbeat_test.go @@ -2,18 +2,25 @@ package main import ( "testing" + "time" - "github.com/Graylog2/go-gelf/gelf" + "github.com/elastic/beats/libbeat/common" "github.com/hartfordfive/protologbeat/config" "github.com/hartfordfive/protologbeat/protolog" + + "github.com/Graylog2/go-gelf/gelf" + "github.com/stretchr/testify/assert" ) func TestGreylogReceive(t *testing.T) { var logEntriesRecieved chan common.MapStr - var logEntriesError chan bool + var logEntriesErrors chan bool + + logEntriesRecieved = make(chan common.MapStr, 1) + logEntriesErrors = make(chan bool, 1) - ll := protolog.NewLogListener(config.Config{EnableGelf: true, Port: 6000}) + ll := protolog.NewLogListener(config.Config{EnableGelf: true, Port: 6000, DefaultEsLogType: "graylog"}) go func(logs chan common.MapStr, errs chan bool) { ll.Start(logs, errs) @@ -21,21 +28,54 @@ func TestGreylogReceive(t *testing.T) { var event common.MapStr - gelf.CompressGzip gw, err := gelf.NewWriter("127.0.0.1:6000") if err != nil { - return nil, fmt.Errorf("NewWriter: %s", err) + t.Errorf("NewWriter: %s", err) + return } gw.CompressionType = gelf.CompressGzip + expectedVersion := "1.1" + expectedHost := "localhost" + expectedShort := "This is a test message for protologbeat" + expectedFull := "This is the full message expected for the test of gelf input." + expectedTs := float64(time.Now().Unix()) + expectedLevel := int32(6) + expectedFacility := "local6" + exepectedType := "graylog" + + if err := gw.WriteMessage(&gelf.Message{ + Version: expectedVersion, + Host: expectedHost, + Short: expectedShort, + Full: expectedFull, + TimeUnix: expectedTs, + Level: expectedLevel, + Facility: expectedFacility, + Extra: map[string]interface{}{"type": exepectedType}, + }); err != nil { + t.Errorf("Could not write message to GELF listener: %v", err) + return + } + for { select { case <-logEntriesErrors: t.Errorf("Error receiving GELF format message") + return case event = <-logEntriesRecieved: - if _, ok := event["@timestamp"]; ok { - + if _, ok := event["@timestamp"]; !ok { + t.Errorf("Message missing timestamp field!: %v", event) + return } + assert.Equal(t, event["gelf"].(map[string]interface{})["version"], expectedVersion, "Version should be the same") + assert.Equal(t, event["host"], expectedHost, "Host should be the same") + assert.Equal(t, event["short_message"], expectedShort, "Short message should be the same") + assert.Equal(t, event["full_message"], expectedFull, "Host should be the same") + assert.Equal(t, event["level"], expectedLevel, "Host should be the same") + assert.Equal(t, event["facility"], expectedFacility, "Host should be the same") + assert.Equal(t, event["type"], exepectedType, "Host should be the same") + return } } } From 14d527b86061d2dcb02cfe89edd40093d5cc01e7 Mon Sep 17 00:00:00 2001 From: Al Lefebvre Date: Mon, 15 May 2017 09:13:12 -0400 Subject: [PATCH 3/7] Adding small changes & tests --- protolog/loglistener.go | 16 +++++++++++----- protologbeat_test.go | 4 ++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/protolog/loglistener.go b/protolog/loglistener.go index 441756d4..87b68bc6 100644 --- a/protolog/loglistener.go +++ b/protolog/loglistener.go @@ -123,6 +123,8 @@ func (ll *LogListener) startGELF(address string) { ll.logEntriesError <- true } + logp.Info("Listening for GELF encoded messages on %s...", address) + for { msg, err := gr.ReadMessage() if err != nil { @@ -237,13 +239,17 @@ func (ll *LogListener) processGelfMessage(msg *gelf.Message) { event["full_message"] = msg.Full // 1 ms = 1000000 ns - millisec := msg.TimeUnix - float64(int64(msg.TimeUnix)) - ms := fmt.Sprintf("%.4f", millisec) - msf, err := strconv.ParseFloat(ms, 64) - if err != nil { + if msg.TimeUnix == 0 { event["@timestamp"] = common.Time(time.Now()) } else { - event["@timestamp"] = common.Time(time.Unix(int64(msg.TimeUnix), int64(msf)*1000000)) + millisec := msg.TimeUnix - float64(int64(msg.TimeUnix)) + ms := fmt.Sprintf("%.4f", millisec) + msf, err := strconv.ParseFloat(ms, 64) + if err != nil { + event["@timestamp"] = common.Time(time.Now()) + } else { + event["@timestamp"] = common.Time(time.Unix(int64(msg.TimeUnix), int64(msf)*1000000)) + } } event["level"] = msg.Level diff --git a/protologbeat_test.go b/protologbeat_test.go index 7f745ea7..3f3154fc 100644 --- a/protologbeat_test.go +++ b/protologbeat_test.go @@ -20,7 +20,7 @@ func TestGreylogReceive(t *testing.T) { logEntriesRecieved = make(chan common.MapStr, 1) logEntriesErrors = make(chan bool, 1) - ll := protolog.NewLogListener(config.Config{EnableGelf: true, Port: 6000, DefaultEsLogType: "graylog"}) + ll := protolog.NewLogListener(config.Config{EnableGelf: true, Port: 12000, DefaultEsLogType: "graylog"}) go func(logs chan common.MapStr, errs chan bool) { ll.Start(logs, errs) @@ -28,7 +28,7 @@ func TestGreylogReceive(t *testing.T) { var event common.MapStr - gw, err := gelf.NewWriter("127.0.0.1:6000") + gw, err := gelf.NewWriter("127.0.0.1:12000") if err != nil { t.Errorf("NewWriter: %s", err) return From 8d327784286ff0fcd1f49ed40a37c5e5758e75d1 Mon Sep 17 00:00:00 2001 From: Al Lefebvre Date: Tue, 23 May 2017 16:27:25 -0400 Subject: [PATCH 4/7] Adding current changes --- Dockerfile | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2aadbc96..1aa7cea7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,15 +13,18 @@ RUN set -ex ;\ # Install dependencies apk --no-cache add gettext libc6-compat curl ;\ # Hotfix for libc compat - ln -s /lib /lib64 - -RUN cd /tmp ;\ + ln -s /lib /lib64 ;\ + cd /tmp ;\ mkdir -p /opt/protologbeat/conf ;\ mkdir -p /opt/protologbeat/ssl ;\ curl -L https://github.com/hartfordfive/protologbeat/releases/download/${VERSION}/protologbeat-${VERSION}-linux-x86_64.tar.gz --output protologbeat-${VERSION}-linux-x86_64.tar.gz ;\ tar -xvzf protologbeat-${VERSION}-linux-x86_64.tar.gz ;\ mv /tmp/protologbeat-${VERSION}-linux-x86_64 /opt/protologbeat/protologbeat ;\ - rm -rf protologbeat-${VERSION}-linux-x86_64 && rm protologbeat-${VERSION}-linux-x86_64.tar.gz + rm -rf protologbeat-${VERSION}-linux-x86_64 && rm protologbeat-${VERSION}-linux-x86_64.tar.gz ;\ + # Fix permissions + chown -R protologbeat:protologbeat /opt/protologbeat ;\ + chmod 750 /opt/protologbeat ;\ + chmod 700 /opt/protologbeat/ssl ENV PATH=/opt/protologbeat:$PATH @@ -29,11 +32,6 @@ COPY protologbeat-docker.yml /opt/protologbeat/conf/protologbeat.yml COPY protologbeat.template-es2x.json /opt/protologbeat COPY protologbeat.template.json /opt/protologbeat -# Fix permissions -RUN chown -R protologbeat:protologbeat /opt/protologbeat ;\ - chmod 750 /opt/protologbeat ;\ - chmod 700 /opt/protologbeat/ssl - WORKDIR /opt/protologbeat USER protologbeat From fd6e810848ef8c290946a06f83bbb7fdfecfcc03 Mon Sep 17 00:00:00 2001 From: Al Lefebvre Date: Fri, 21 Jul 2017 10:51:05 -0400 Subject: [PATCH 5/7] Updated processMessage func to accept string instead of byte buffer that could be outdated --- .gitignore | 1 + CHANGELOG.md | 1 + _samples/logger.lua | 4 ++-- _samples/logger.py | 6 +++--- protolog/loglistener.go | 36 +++++++++++++++++------------------- protologbeat-docker.yml | 2 ++ 6 files changed, 26 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index a886fd12..47e41e11 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ /protologbeat.test protologbeat.local.yml *.pyc +/logs diff --git a/CHANGELOG.md b/CHANGELOG.md index cf9c531c..6d2bfdc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ### Version 0.2.0 - Added support for receiving GELF format messages by setting `enable_gelf: true` +- Fixed issue where (more commonly when getting high volumes of messages) the `processMessage` goroutine might have it's byte buffer modified before it actually executes with the original payload/message. (Related to original PR #8) ### Version 0.1.1 - Added Dockerfile and seperate `protologbeat-docker.yml` config file to be used by docker image diff --git a/_samples/logger.lua b/_samples/logger.lua index b8ed7a2b..cb12a5ce 100644 --- a/_samples/logger.lua +++ b/_samples/logger.lua @@ -34,8 +34,8 @@ function Logger:sendMsg(msg) end -- Start logger client to send plain-text formated message to protologbeat listening on UDP host/port -logger = Logger.init('127.0.0.1', 6000, "udp", "plain") -logger:sendMsg('This is a sample message sent from the Lua logger.') +--logger = Logger.init('127.0.0.1', 6000, "udp", "plain") +--logger:sendMsg('This is a sample message sent from the Lua logger.') -- Start logger client to send json formated message to protologbeat listening on TCP host/port --logger = Logger.init('127.0.0.1', 6000, "tcp", "json") diff --git a/_samples/logger.py b/_samples/logger.py index f54db5ba..571b4f3e 100644 --- a/_samples/logger.py +++ b/_samples/logger.py @@ -37,9 +37,9 @@ def send_message(self, msg): # Initializing udp connection and sending a plaintext message -l = Logger('127.0.0.1', 6000) -l.enable_debug() -l.send_message('This is a sample plaintext message to be sent via udp') +#l = Logger('127.0.0.1', 6000) +#l.enable_debug() +#l.send_message('This is a sample plaintext message to be sent via udp') # Initializing tcp connection and sending a json-encoded message #l = Logger('127.0.0.1', 6000, 'tcp', 'json') diff --git a/protolog/loglistener.go b/protolog/loglistener.go index 87b68bc6..efae199e 100644 --- a/protolog/loglistener.go +++ b/protolog/loglistener.go @@ -76,19 +76,19 @@ func (ll *LogListener) startTCP(proto string, address string) { continue } - go func() { - buffer := make([]byte, ll.config.MaxMsgSize) - length, err := conn.Read(buffer) - if err != nil { - e, ok := err.(net.Error) - if ok && e.Timeout() { - logp.Err("Timeout reading from socket: %v", err) - ll.logEntriesError <- true - return - } + buffer := make([]byte, ll.config.MaxMsgSize) + + length, err := conn.Read(buffer) + if err != nil { + e, ok := err.(net.Error) + if ok && e.Timeout() { + logp.Err("Timeout reading from socket: %v", err) + ll.logEntriesError <- true + return } - go ll.processMessage(buffer, length) - }() + } + go ll.processMessage(strings.TrimSpace(string(buffer[:length]))) + } } @@ -111,7 +111,10 @@ func (ll *LogListener) startUDP(proto string, address string) { logp.Err("Error reading from buffer: %v", err.Error()) continue } - go ll.processMessage(buffer, length) + if length == 0 { + return + } + go ll.processMessage(strings.TrimSpace(string(buffer[:length]))) } } @@ -141,13 +144,8 @@ func (ll *LogListener) Shutdown() { close(ll.logEntriesRecieved) } -func (ll *LogListener) processMessage(buffer []byte, length int) { - - if length == 0 { - return - } +func (ll *LogListener) processMessage(logData string) { - logData := strings.TrimSpace(string(buffer[:length])) if logData == "" { logp.Err("Event is empty") return diff --git a/protologbeat-docker.yml b/protologbeat-docker.yml index 2b8cc275..681d291f 100644 --- a/protologbeat-docker.yml +++ b/protologbeat-docker.yml @@ -7,6 +7,8 @@ protologbeat: port: ${PORT:6000} protocol: ${PROTOCOL:udp} max_msg_size: ${MAX_MSG_SIZE:4096} + enable_gelf: ${ENABLE_GELF:false} + enable_syslog_format_only: ${ENABLE_SYSLOG:false} default_es_log_type: ${DEFAULT_ES_TYPE:protologbeat} From ba2f1beb883851440504d66346d04ac7476acd60 Mon Sep 17 00:00:00 2001 From: Al Lefebvre Date: Fri, 21 Jul 2017 10:51:34 -0400 Subject: [PATCH 6/7] Added testing script to generate events --- _samples/generator.py | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 _samples/generator.py diff --git a/_samples/generator.py b/_samples/generator.py new file mode 100644 index 00000000..4e3586f4 --- /dev/null +++ b/_samples/generator.py @@ -0,0 +1,8 @@ +from logger import Logger +import time, random + +l = Logger('127.0.0.1', 6000, 'udp', 'json') + +for i in range(10000): + l.send_message({'message': 'This is JSON encoded message #{}'.format(i), 'type': 'generator_test', 'id': int(i), 'log_level': 'INFO'}) + time.sleep(random.uniform(0.0001, 0.0010)) \ No newline at end of file From 460dec9eb0c1b411f0ca0e3eb3d08244b1af95af Mon Sep 17 00:00:00 2001 From: Al Lefebvre Date: Tue, 5 Sep 2017 11:22:31 -0400 Subject: [PATCH 7/7] Adding current changes --- CHANGELOG.md | 3 ++- protolog/loglistener.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d2bfdc6..d8570b2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,8 @@ ### Version 0.2.0 - Added support for receiving GELF format messages by setting `enable_gelf: true` -- Fixed issue where (more commonly when getting high volumes of messages) the `processMessage` goroutine might have it's byte buffer modified before it actually executes with the original payload/message. (Related to original PR #8) +- Fixed issue where (more commonly when getting high volumes of messages) the `processMessage` goroutine might have it's byte buffer modified before it actually executes with the original payload/message. (Related to original PR [#8](https://github.com/hartfordfive/protologbeat/pull/8), credit to [vcostet](https://github.com/vcostet)) + ### Version 0.1.1 - Added Dockerfile and seperate `protologbeat-docker.yml` config file to be used by docker image diff --git a/protolog/loglistener.go b/protolog/loglistener.go index efae199e..cb55f257 100644 --- a/protolog/loglistener.go +++ b/protolog/loglistener.go @@ -103,9 +103,9 @@ func (ll *LogListener) startUDP(proto string, address string) { defer l.Close() logp.Info("Now listening for logs via %s on %s", ll.config.Protocol, address) - buffer := make([]byte, ll.config.MaxMsgSize) for { + buffer := make([]byte, ll.config.MaxMsgSize) length, _, err := l.ReadFrom(buffer) if err != nil { logp.Err("Error reading from buffer: %v", err.Error())