Permalink
Browse files

MOD new options and various cleanups

ADD  harvester reader buffer size option
MOD  remove all log imports - use emit throughout
MOD  use fault() inst of log.Fatalf to be consistent
MOD  WIP beefup error handling in config and registrar
NOTE see REVU in registrar.go re registry write errors
NOP  gofmt affected files
  • Loading branch information...
Joubin Houshyar
Joubin Houshyar committed Jul 21, 2014
1 parent 2671f42 commit 44299dda0750005005fb9734d7b181f287722419
Showing with 141 additions and 116 deletions.
  1. +13 −8 config.go
  2. +7 −9 harvester.go
  3. +57 −42 logstash-forwarder.go
  4. +42 −23 registrar.go
  5. +7 −16 registrar_other.go
  6. +15 −18 registrar_windows.go
View
@@ -6,9 +6,15 @@ import (
"time"
)
const default_NetworkConfig_Timeout int64 = 15
const configFileSizeLimit = 10 << 20
const default_FileConfig_DeadTime string = "24h"
// defaultConfig bundles the fallback values that LoadConfig applies when
// the JSON config file omits them: the network timeout (in seconds) and
// the per-file "dead time" duration string. Kept as one anonymous-struct
// literal so all defaults live in a single place.
var defaultConfig = &struct {
netTimeout int64
fileDeadtime string
}{
netTimeout: 15,
fileDeadtime: "24h",
}
type Config struct {
Network NetworkConfig `json:network`
@@ -39,10 +45,9 @@ func LoadConfig(path string) (config Config, err error) {
}
fi, _ := config_file.Stat()
if fi.Size() > (10 << 20) {
emit("Config file too large? Aborting, just in case. '%s' is %d bytes\n",
path, fi)
return
if size := fi.Size(); size > (configFileSizeLimit) {
emit("config file (%q) size exceeds reasonable limit (%d) - aborting", path, size)
return // REVU: shouldn't this return an error, then?
}
buffer := make([]byte, fi.Size())
@@ -56,14 +61,14 @@ func LoadConfig(path string) (config Config, err error) {
}
if config.Network.Timeout == 0 {
config.Network.Timeout = default_NetworkConfig_Timeout
config.Network.Timeout = defaultConfig.netTimeout
}
config.Network.timeout = time.Duration(config.Network.Timeout) * time.Second
for k, _ := range config.Files {
if config.Files[k].DeadTime == "" {
config.Files[k].DeadTime = default_FileConfig_DeadTime
config.Files[k].DeadTime = defaultConfig.fileDeadtime
}
config.Files[k].deadtime, err = time.ParseDuration(config.Files[k].DeadTime)
if err != nil {
View
@@ -5,7 +5,6 @@ import (
"bytes"
"fmt"
"io"
"log"
"os" // for File and friends
"time"
)
@@ -36,7 +35,7 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
offset, _ := h.file.Seek(0, os.SEEK_CUR)
if h.Offset > 0 {
emit("harvest:%q position:%d (offset snapshot:%d)\n", h.Path, h.Offset, offset)
emit("harvest: %q position:%d (offset snapshot:%d)\n", h.Path, h.Offset, offset)
} else if options.tailOnRotate {
emit("harvest: (tailing) %q (offset snapshot:%d)\n", h.Path, offset)
} else {
@@ -45,8 +44,7 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
h.Offset = offset
// TODO(sissel): Make the buffer size tunable at start-time
reader := bufio.NewReaderSize(h.file, 16<<10) // 16kb buffer by default
reader := bufio.NewReaderSize(h.file, options.harvesterBufferSize) // 16kb buffer by default
buffer := new(bytes.Buffer)
var read_timeout = 10 * time.Second
@@ -60,18 +58,18 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
// Check to see if the file was truncated
info, _ := h.file.Stat()
if info.Size() < h.Offset {
log.Printf("File truncated, seeking to beginning: %s\n", h.Path)
emit("File truncated, seeking to beginning: %s\n", h.Path)
h.file.Seek(0, os.SEEK_SET)
h.Offset = 0
} else if age := time.Since(last_read_time); age > h.FileConfig.deadtime {
// if last_read_time was more than dead time, this file is probably
// dead. Stop watching it.
log.Printf("Stopping harvest of %s; last change was %v ago\n", h.Path, age)
emit("Stopping harvest of %s; last change was %v ago\n", h.Path, age)
return
}
continue
} else {
log.Printf("Unexpected state reading from %s; error: %s\n", h.Path, err)
emit("Unexpected state reading from %s; error: %s\n", h.Path, err)
return
}
}
@@ -105,7 +103,7 @@ func (h *Harvester) open() *os.File {
if err != nil {
// retry on failure.
log.Printf("Failed opening %s: %s\n", h.Path, err)
emit("Failed opening %s: %s\n", h.Path, err)
time.Sleep(5 * time.Second)
} else {
break
@@ -160,7 +158,7 @@ func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_tim
}
continue
} else {
log.Println(err)
emit("error: Harvester.readLine: %s", err.Error())
return nil, 0, err // TODO(sissel): don't do this?
}
}
View
@@ -18,31 +18,35 @@ var exitStat = struct {
}
var options = &struct {
configFile string
spoolSize uint64
cpuProfileFile string
idleTimeout time.Duration
useSyslog bool
tailOnRotate bool
debug bool
verbose bool
configFile string
spoolSize uint64
harvesterBufferSize int
cpuProfileFile string
idleTimeout time.Duration
useSyslog bool
tailOnRotate bool
debug bool
verbose bool
}{
spoolSize: 1024,
idleTimeout: time.Second * 5,
spoolSize: 1024,
harvesterBufferSize: 16 << 10,
idleTimeout: time.Second * 5,
}
func emitOptions() {
emit("\t--- options -------\n")
emit("\tconfig-file: %s\n", options.configFile)
emit("\tidle-timeout: %v\n", options.idleTimeout)
emit("\tconfig-file: %s\n", options.configFile)
emit("\tidle-timeout: %v\n", options.idleTimeout)
emit("\tspool-size: %d\n", options.spoolSize)
emit("\tharvester-buff-size: %d\n", options.harvesterBufferSize)
emit("\t--- flags ---------\n")
emit("\ttail (on-rotation): %t\n", options.tailOnRotate)
emit("\tuse-syslog: %t\n", options.useSyslog)
emit("\tverbose: %t\n", options.verbose)
emit("\tdebug: %t\n", options.debug)
emit("\ttail (on-rotation): %t\n", options.tailOnRotate)
emit("\tuse-syslog: %t\n", options.useSyslog)
emit("\tverbose: %t\n", options.verbose)
emit("\tdebug: %t\n", options.debug)
if runProfiler() {
emit("\t--- profile run ---\n")
emit("\tcpu-profile-file: %s\n", options.cpuProfileFile)
emit("\tcpu-profile-file: %s\n", options.cpuProfileFile)
}
}
@@ -56,13 +60,24 @@ func assertRequiredOptions() {
func init() {
flag.StringVar(&options.configFile, "config", options.configFile, "path to logstash-forwarder configuration file")
flag.StringVar(&options.cpuProfileFile, "cpuprofile", options.cpuProfileFile, "path to cpu profile output - note: exits on profile end.")
flag.Uint64Var(&options.spoolSize, "spool-size", options.spoolSize, "event count spool threshold - forces network flush")
flag.BoolVar(&options.useSyslog, "log-to-syslog", options.useSyslog, "log to syslog instead of stdout")
flag.Uint64Var(&options.spoolSize, "sv", options.spoolSize, "event count spool threshold - forces network flush")
flag.IntVar(&options.harvesterBufferSize, "harvest-buffer-size", options.harvesterBufferSize, "harvester reader buffer size")
flag.IntVar(&options.harvesterBufferSize, "hb", options.harvesterBufferSize, "harvester reader buffer size")
flag.BoolVar(&options.useSyslog, "log-to-syslog", options.useSyslog, "log to syslog instead of stdout") // deprecate this
flag.BoolVar(&options.useSyslog, "syslog", options.useSyslog, "log to syslog instead of stdout")
flag.BoolVar(&options.tailOnRotate, "tail", options.tailOnRotate, "always tail on log rotation -note: may skip entries ")
flag.BoolVar(&options.tailOnRotate, "t", options.tailOnRotate, "always tail on log rotation -note: may skip entries ")
flag.BoolVar(&options.verbose, "verbose", options.verbose, "operate in verbose mode - emits to log")
flag.BoolVar(&options.verbose, "v", options.verbose, "operate in verbose mode - emits to log")
flag.BoolVar(&options.debug, "debug", options.debug, "emit debg info (verbose must also be set)")
}
@@ -73,7 +88,7 @@ func main() {
if p == nil {
return
}
log.Fatalf("panic: %v\n", p)
fault("recovered panic: %v", p)
}()
flag.Parse()
@@ -93,9 +108,9 @@ func main() {
}()
}
config, err := LoadConfig(options.configFile)
if err != nil {
return
config, e := LoadConfig(options.configFile)
if e != nil {
fault("on LoadConfig: %s\n", e.Error())
}
event_chan := make(chan *FileEvent, 16)
@@ -113,49 +128,48 @@ func main() {
// - publisher: writes to the network, notifies registrar
// - registrar: records positions of files read
// Finally, prospector uses the registrar information, on restart, to
// determine where in each file to resume a harvester.
// determine where in each file to restart a harvester.
log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
if options.useSyslog {
configureSyslog()
}
resume := &ProspectorResume{}
resume.persist = make(chan *FileState)
restart := &ProspectorResume{}
restart.persist = make(chan *FileState)
// Load the previous log file locations now, for use in prospector
resume.files = make(map[string]*FileState)
history, err := os.Open(".logstash-forwarder")
if err == nil {
wd, err := os.Getwd()
if err != nil {
wd = ""
restart.files = make(map[string]*FileState)
if existing, e := os.Open(".logstash-forwarder"); e == nil {
defer existing.Close()
wd := ""
if wd, e = os.Getwd(); e != nil {
emit("WARNING: os.Getwd retuned unexpected error %s -- ignoring\n", e.Error())
}
emit("Loading registrar data from %s/.logstash-forwarder\n", wd)
decoder := json.NewDecoder(history)
decoder.Decode(&resume.files)
history.Close()
decoder := json.NewDecoder(existing)
decoder.Decode(&restart.files)
}
prospector_pending := 0
pendingProspectorCnt := 0
// Prospect the globs/paths given on the command line and launch harvesters
for _, fileconfig := range config.Files {
prospector := &Prospector{FileConfig: fileconfig}
go prospector.Prospect(resume, event_chan)
prospector_pending++
go prospector.Prospect(restart, event_chan)
pendingProspectorCnt++
}
// Now determine which states we need to persist by pulling the events from the prospectors
// When we hit a nil source a prospector had finished so we decrease the expected events
emit("Waiting for %d prospectors to initialise\n", prospector_pending)
emit("Waiting for %d prospectors to initialise\n", pendingProspectorCnt)
persist := make(map[string]*FileState)
for event := range resume.persist {
for event := range restart.persist {
if event.Source == nil {
prospector_pending--
if prospector_pending == 0 {
pendingProspectorCnt--
if pendingProspectorCnt == 0 {
break
}
continue
@@ -173,7 +187,7 @@ func main() {
// registrar records last acknowledged positions in all files.
Registrar(persist, registrar_chan)
} /* main */
}
// REVU: yes, this is a temp hack.
func emit(msgfmt string, args ...interface{}) {
@@ -186,6 +200,7 @@ func emit(msgfmt string, args ...interface{}) {
// fault reports an unrecoverable error: it delegates to exit with the
// process-wide faulted status code, which logs the formatted message and
// terminates the process. It never returns.
func fault(msgfmt string, args ...interface{}) {
exit(exitStat.faulted, msgfmt, args...)
}
func exit(stat int, msgfmt string, args ...interface{}) {
log.Printf(msgfmt, args...)
os.Exit(stat)
View
@@ -1,32 +1,51 @@
package main
import (
"log"
"os"
"encoding/json"
)
func Registrar(state map[string]*FileState, input chan []*FileEvent) {
for events := range input {
log.Printf("Registrar received %d events\n", len(events))
// Take the last event found for each file source
for _, event := range events {
// skip stdin
if *event.Source == "-" {
continue
}
for events := range input {
emit ("Registrar: precessing %d events\n", len(events))
// Take the last event found for each file source
for _, event := range events {
// skip stdin
if *event.Source == "-" {
continue
}
ino, dev := file_ids(event.fileinfo)
state[*event.Source] = &FileState{
Source: event.Source,
// take the offset + length of the line + newline char and
// save it as the new starting offset.
// This issues a problem, if the EOL is a CRLF! Then on start it read the LF again and generates a event with an empty line
Offset: event.Offset + int64(len(*event.Text)) + 1,
Inode: ino,
Device: dev,
}
//log.Printf("State %s: %d\n", *event.Source, event.Offset)
}
ino, dev := file_ids(event.fileinfo)
state[*event.Source] = &FileState{
Source: event.Source,
// take the offset + length of the line + newline char and
// save it as the new starting offset.
// This issues a problem, if the EOL is a CRLF! Then on start it read the LF again and generates a event with an empty line
Offset: event.Offset + int64(len(*event.Text)) + 1, // REVU: this is begging for BUGs
Inode: ino,
Device: dev,
}
//log.Printf("State %s: %d\n", *event.Source, event.Offset)
}
WriteRegistry(state, ".logstash-forwarder")
}
if e := writeRegistry(state, ".logstash-forwarder"); e != nil {
// REVU: but we should panic, or something, right?
emit("WARNING: (continuing) update of registry returned error: %s", e)
}
}
}
// writeRegistry serializes the harvester file-state map as JSON into a
// temporary file (path + ".new") and then hands off to onRegistryWrite,
// which promotes the temp file to the registry at path.
//
// Any error encountered — creating the temp file or encoding the state —
// is reported via emit and returned to the caller so the registrar can
// decide how to react (see REVU note at the call site).
func writeRegistry(state map[string]*FileState, path string) error {
	tempfile := path + ".new"
	file, e := os.Create(tempfile)
	if e != nil {
		emit("Failed to create tempfile (%s) for writing: %s\n", tempfile, e)
		return e
	}
	defer file.Close()

	// Fix: Encode can fail (e.g. short write / disk full). Previously the
	// error was silently dropped and a truncated registry could still be
	// promoted over the good one by onRegistryWrite.
	if e := json.NewEncoder(file).Encode(state); e != nil {
		emit("Failed to encode registry state to tempfile (%s): %s\n", tempfile, e)
		return e
	}

	return onRegistryWrite(path, tempfile)
}
Oops, something went wrong.

0 comments on commit 44299dd

Please sign in to comment.