Prospector now detects file rotation while it is not running

reported in issue #144
DRY - Restructure prospector
driskell authored and jordansissel committed Mar 15, 2014
1 parent 849003a commit 2edd2ddff80ecea44ae6b6aec886346e82e931e7
Showing with 131 additions and 96 deletions.
  1. +16 −0 filecompare.go
  2. +6 −0 filecompare_windows.go
  3. +13 −10 logstash-forwarder.go
  4. +96 −86 prospector.go
filecompare.go
@@ -41,3 +41,19 @@ func is_file_renamed(file string, info os.FileInfo, fileinfo map[string]Prospect
}
return ""
}
func is_file_renamed_resumelist(file string, info os.FileInfo, initial map[string]*FileState) string {
// NOTE(driskell): What about using golang's func os.SameFile(fi1, fi2 FileInfo) bool instead?
stat := info.Sys().(*syscall.Stat_t)
for kf, ki := range initial {
if kf == file {
continue
}
if stat.Dev == ki.Device && stat.Ino == ki.Inode {
return kf
}
}
return ""
}
filecompare_windows.go
@@ -20,3 +20,9 @@ func is_file_renamed(file string, info os.FileInfo, fileinfo map[string]Prospect
// NOTE(driskell): What about using golang's func os.SameFile(fi1, fi2 FileInfo) bool?
return ""
}
func is_file_renamed_resumelist(file string, info os.FileInfo, initial map[string]*FileState) string {
// Can we detect if a file was renamed on Windows?
// NOTE(driskell): What about using golang's func os.SameFile(fi1, fi2 FileInfo) bool?
return ""
}
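
The NOTE in both variants asks about os.SameFile. As a point of comparison only (not part of this commit), here is a minimal, hypothetical sketch of a portable rename check built on os.SameFile, assuming the caller keeps the last os.FileInfo seen for each tracked path, as ProspectorInfo in prospector.go does; it would not cover the resume-list case, which only has the registrar's raw Device/Inode numbers:

package main

import (
	"fmt"
	"os"
)

// Hypothetical portable variant of is_file_renamed: os.SameFile hides the
// platform-specific identity check (device and inode on Unix), so one
// implementation could in principle serve both the Unix and Windows builds.
func is_file_renamed_portable(file string, info os.FileInfo, known map[string]os.FileInfo) string {
	for kf, ki := range known {
		if kf == file {
			continue // same path, not a rename
		}
		if os.SameFile(info, ki) {
			return kf // same underlying file seen earlier under another name
		}
	}
	return ""
}

func main() {
	known := make(map[string]os.FileInfo)
	if fi, err := os.Stat("old.log"); err == nil {
		known["old.log"] = fi
	}
	if fi, err := os.Stat("new.log"); err == nil {
		fmt.Println("renamed from:", is_file_renamed_portable("new.log", fi, known))
	}
}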
logstash-forwarder.go
@@ -37,8 +37,10 @@ func main() {
return
}
statereturn_chan := make(chan *FileState)
statereturns := 0
resumeinfo := &ProspectorResume{}
resumeinfo.resave = make(chan *FileState)
prospector_waits := 0
event_chan := make(chan *FileEvent, 16)
publisher_chan := make(chan []*FileEvent, 1)
@@ -63,7 +65,7 @@ func main() {
}
// Load the previous log file locations now, for use in prospector
historical_state := make(map[string]*FileState)
resumeinfo.files = make(map[string]*FileState)
history, err := os.Open(".logstash-forwarder")
if err == nil {
wd, err := os.Getwd()
@@ -73,25 +75,26 @@ func main() {
log.Printf("Loading registrar data from %s/.logstash-forwarder\n", wd)
decoder := json.NewDecoder(history)
decoder.Decode(&historical_state)
decoder.Decode(&resumeinfo.files)
history.Close()
}
// Prospect the globs/paths given on the command line and launch harvesters
for _, fileconfig := range config.Files {
go Prospect(fileconfig, historical_state, statereturn_chan, event_chan)
statereturns++
prospector := &Prospector{FileConfig: fileconfig}
go prospector.Prospect(resumeinfo, event_chan)
prospector_waits++
}
// Now determine which states we need to re-save by pulling the events from the prospectors
// When we hit a nil source, a prospector has finished, so we decrease the expected count
log.Printf("Waiting for %d prospectors to process loaded registrar data\n", statereturns)
log.Printf("Waiting for %d prospectors to process loaded registrar data\n", prospector_waits)
new_state := make(map[string]*FileState)
for event := range statereturn_chan {
for event := range resumeinfo.resave {
if event.Source == nil {
statereturns--
if statereturns == 0 {
prospector_waits--
if prospector_waits == 0 {
break
}
continue
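
For reference, a minimal self-contained sketch of the sentinel pattern used in the loop above: each prospector replays its resume data on the resave channel and then sends an event with a nil Source, so the registrar-saving loop knows when every prospector has finished (State here is a stand-in for the repository's FileState):

package main

import (
	"fmt"
	"log"
)

// State stands in for FileState; only the nil-Source sentinel matters here.
type State struct {
	Source *string
	Offset int64
}

func main() {
	resave := make(chan *State)
	waits := 2 // number of prospectors we launched

	// Each prospector re-sends the states it resumed, then a nil-Source
	// sentinel meaning "I have finished replaying my resume data".
	for i := 0; i < waits; i++ {
		go func(n int) {
			src := fmt.Sprintf("example-%d.log", n)
			resave <- &State{Source: &src, Offset: 123}
			resave <- &State{Source: nil}
		}(i)
	}

	saved := make(map[string]*State)
	for event := range resave {
		if event.Source == nil {
			waits--
			if waits == 0 {
				break
			}
			continue
		}
		saved[*event.Source] = event
	}
	log.Printf("collected %d resumed states\n", len(saved))
}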
prospector.go
@@ -7,100 +7,71 @@ import (
"time"
)
type ProspectorResume struct {
files map[string]*FileState
resave chan *FileState
}
type ProspectorInfo struct {
fileinfo os.FileInfo /* the file info */
harvester chan int64 /* the harvester will send an event with its offset when it closes */
last_seen uint32 /* iteration number at which we last saw this file */
}
func Prospect(fileconfig FileConfig, historical_state map[string]*FileState, statereturn chan *FileState, output chan *FileEvent) {
fileinfo := make(map[string]ProspectorInfo)
type Prospector struct {
FileConfig FileConfig
fileinfo map[string]ProspectorInfo
iteration uint32
}
func (p *Prospector) Prospect(resumelist *ProspectorResume, output chan *FileEvent) {
p.fileinfo = make(map[string]ProspectorInfo)
// Handle any "-" (stdin) paths
for i, path := range fileconfig.Paths {
for i, path := range p.FileConfig.Paths {
if path == "-" {
harvester := Harvester{Path: path, FileConfig: fileconfig}
// Offset and Initial never get used when path is "-"
harvester := Harvester{Path: path, FileConfig: p.FileConfig}
go harvester.Harvest(output)
// Remove it from the file list
fileconfig.Paths = append(fileconfig.Paths[:i], fileconfig.Paths[i+1:]...)
p.FileConfig.Paths = append(p.FileConfig.Paths[:i], p.FileConfig.Paths[i+1:]...)
}
}
// Use the registrar db to reopen any files at their last positions
resume_tracking(fileconfig, historical_state, fileinfo, statereturn, output)
// Now let's do one quick scan to pick up new files - flag true so new files obey from-beginning
for _, path := range p.FileConfig.Paths {
p.scan(path, output, resumelist)
}
// This signals we finished considering the previous state
event := &FileState{
Source: nil,
}
statereturn <- event
resumelist.resave <- event
var iteration uint32 = 0
for {
for _, path := range fileconfig.Paths {
prospector_scan(iteration, path, fileconfig, fileinfo, output)
for _, path := range p.FileConfig.Paths {
// Scan - flag false so new files always start at beginning
p.scan(path, output, nil)
}
// Defer next scan for a bit.
time.Sleep(10 * time.Second) // Make this tunable
// Clear out files that disappeared
for file, lastinfo := range fileinfo {
if lastinfo.last_seen < iteration {
for file, lastinfo := range p.fileinfo {
if lastinfo.last_seen < p.iteration {
log.Printf("No longer tracking file that hasn't been seen for a while: %s\n", file)
delete(fileinfo, file)
delete(p.fileinfo, file)
}
}
iteration++ // Overflow is allowed
p.iteration++ // Overflow is allowed
}
} /* Prospect */
func resume_tracking(fileconfig FileConfig, historical_state map[string]*FileState, fileinfo map[string]ProspectorInfo, statereturn chan *FileState, output chan *FileEvent) {
// Start up with any registrar data.
for path, state := range historical_state {
// if the file is the same inode/device as we last saw,
// start a harvester on it at the last known position
info, err := os.Stat(path)
if err != nil { continue }
if is_file_same(path, info, state) {
// same file, seek to last known position
for _, pathglob := range fileconfig.Paths {
match, _ := filepath.Match(pathglob, path)
if match {
// If we've already seen this in another file entry, ignore
if _, is_known := fileinfo[path]; is_known {
break
}
log.Printf("Resuming harvester on a previously harvested file: %s\n", path)
newinfo := ProspectorInfo{fileinfo: info, harvester: make(chan int64, 1)}
harvester := &Harvester{Path: path, FileConfig: fileconfig, Offset: state.Offset, FinishChan: newinfo.harvester}
go harvester.Harvest(output)
fileinfo[path] = newinfo
// Throw an event downstream so we re-save this resume information to the registrar state
// Registrar will not save until it receives events (null source) that state each prospector has finished resuming files
ino, dev := file_ids(&info)
event_source := path // We need a copy of path since we change it in the loop above
event := &FileState{
Source: &event_source,
Offset: state.Offset,
Inode: ino,
Device: dev,
}
statereturn <- event
break
}
}
}
}
}
func prospector_scan(iteration uint32, path string, fileconfig FileConfig,
fileinfo map[string]ProspectorInfo,
output chan *FileEvent) {
func (p *Prospector) scan(path string, output chan *FileEvent, resumelist *ProspectorResume) {
//log.Printf("Prospecting %s\n", path)
// Evaluate the path as a wildcards/shell glob
@@ -113,11 +84,6 @@ func prospector_scan(iteration uint32, path string, fileconfig FileConfig,
// To keep the old inode/dev reference if we see a file has renamed, in case it was also renamed prior
missingfiles := make(map[string]os.FileInfo)
// If the glob matches nothing, use the path itself as a literal.
if len(matches) == 0 && path == "-" {
matches = append(matches, path)
}
// Check any matched files to see if we need to start a harvester
for _, file := range matches {
// Stat the file, following any symlinks.
@@ -133,46 +99,64 @@ func prospector_scan(iteration uint32, path string, fileconfig FileConfig,
continue
}
// Check the current info against fileinfo[file]
lastinfo, is_known := fileinfo[file]
// Check the current info against p.fileinfo[file]
lastinfo, is_known := p.fileinfo[file]
newinfo := lastinfo
// Conditions for starting a new harvester:
// - file path hasn't been seen before
// - the file's inode or device changed
if !is_known {
// Create a new prospector info with the stat info for comparison
newinfo = ProspectorInfo{fileinfo: info, harvester: make(chan int64, 1), last_seen: iteration}
if time.Since(info.ModTime()) > fileconfig.deadtime {
// Old file, skip it, but push offset of 0 so we obey from_beginning if this file changes and needs picking up
log.Printf("Skipping file (older than dead time of %v): %s\n", fileconfig.deadtime, file)
newinfo.harvester <- 0
} else if previous := is_file_renamed(file, info, fileinfo, missingfiles); previous != "" {
newinfo = ProspectorInfo{fileinfo: info, harvester: make(chan int64, 1), last_seen: p.iteration}
if time.Since(info.ModTime()) > p.FileConfig.deadtime {
// Call the calculator - it will process resume state if there is one
offset, is_resuming := p.calculate_resume(file, info, resumelist)
// Are we resuming a dead file? We have to resume even if dead so we catch any old updates to the file
// This is safe as the harvester, once it hits the EOF and a timeout, will stop harvesting
// Once we detect changes again we can resume another harvester again - this keeps number of go routines to a minimum
if is_resuming {
log.Printf("Resuming harvester on a previously harvested file: %s\n", file)
harvester := &Harvester{Path: file, FileConfig: p.FileConfig, Offset: offset, FinishChan: newinfo.harvester}
go harvester.Harvest(output)
} else {
// Old file, skip it, but push offset of 0 so we obey from_beginning if this file changes and needs picking up
log.Printf("Skipping file (older than dead time of %v): %s\n", p.FileConfig.deadtime, file)
newinfo.harvester <- 0
}
} else if previous := is_file_renamed(file, info, p.fileinfo, missingfiles); previous != "" {
// This file was simply renamed (known inode+dev) - link the same harvester channel as the old file
log.Printf("File rename was detected: %s -> %s\n", previous, file)
newinfo.harvester = fileinfo[previous].harvester
newinfo.harvester = p.fileinfo[previous].harvester
} else {
// Most likely a new file. Harvest it!
log.Printf("Launching harvester on new file: %s\n", file)
// Call the calculator - it will process resume state if there is one
offset, is_resuming := p.calculate_resume(file, info, resumelist)
// Are we resuming a file or is this a completely new file?
if is_resuming {
log.Printf("Resuming harvester on a previously harvested file: %s\n", file)
} else {
log.Printf("Launching harvester on new file: %s\n", file)
}
harvester := &Harvester{Path: file, FileConfig: fileconfig, FinishChan: newinfo.harvester}
// Launch the harvester
harvester := &Harvester{Path: file, FileConfig: p.FileConfig, Offset: offset, FinishChan: newinfo.harvester}
go harvester.Harvest(output)
}
} else {
// Update the fileinfo information used for future comparisons, and the last_seen counter
newinfo.fileinfo = info
newinfo.last_seen = iteration
newinfo.last_seen = p.iteration
// NOTE(driskell): What about using golang's func os.SameFile(fi1, fi2 FileInfo) bool instead?
if !is_fileinfo_same(lastinfo.fileinfo, info) {
if previous := is_file_renamed(file, info, fileinfo, missingfiles); previous != "" {
if previous := is_file_renamed(file, info, p.fileinfo, missingfiles); previous != "" {
// This file was renamed from another file we know - link the same harvester channel as the old file
log.Printf("File rename was detected: %s -> %s\n", previous, file)
newinfo.harvester = fileinfo[previous].harvester
newinfo.harvester = p.fileinfo[previous].harvester
} else {
// File is not the same file we saw previously, it must have rotated and is a new file
log.Printf("Launching harvester on rotated file: %s\n", file)
@@ -181,27 +165,53 @@ func prospector_scan(iteration uint32, path string, fileconfig FileConfig,
newinfo.harvester = make(chan int64, 1)
// Start a harvester on the path
harvester := &Harvester{Path: file, FileConfig: fileconfig, FinishChan: newinfo.harvester}
harvester := &Harvester{Path: file, FileConfig: p.FileConfig, FinishChan: newinfo.harvester}
go harvester.Harvest(output)
}
// Keep the old file in missingfiles so we don't rescan it if it was renamed and we've not yet reached the new filename
// We only need to keep it for the remainder of this iteration then we can assume it was deleted and forget about it
missingfiles[file] = lastinfo.fileinfo
} else if len(newinfo.harvester) != 0 && time.Since(info.ModTime()) < fileconfig.deadtime {
} else if len(newinfo.harvester) != 0 && time.Since(info.ModTime()) < p.FileConfig.deadtime {
// NOTE(driskell): If dead time is less than the prospector interval, this stops working
// Resume harvesting of an old file we've stopped harvesting from
log.Printf("Resuming harvester on an old file that was just modified: %s\n", file)
// Start a harvester on the path; an old file was just modified and it doesn't have a harvester
// The offset to continue from will be stored in the harvester channel - so take that to use and also clear the channel
harvester := &Harvester{Path: file, FileConfig: fileconfig, Offset: <-newinfo.harvester, FinishChan: newinfo.harvester}
harvester := &Harvester{Path: file, FileConfig: p.FileConfig, Offset: <-newinfo.harvester, FinishChan: newinfo.harvester}
go harvester.Harvest(output)
}
}
// Track the stat data for this file for later comparison to check for
// rotation/etc
fileinfo[file] = newinfo
p.fileinfo[file] = newinfo
} // for each file matched by the glob
}
func (p *Prospector) calculate_resume(file string, info os.FileInfo, resumelist *ProspectorResume) (int64, bool) {
if resumelist != nil {
if last_state, is_found := resumelist.files[file]; is_found && is_file_same(file, info, last_state) {
// We're resuming - throw the last state back downstream so we resave it
// And return the offset - also force harvest in case the file is old and we're about to skip it
resumelist.resave <- last_state
return last_state.Offset, true
}
if previous := is_file_renamed_resumelist(file, info, resumelist.files); previous != "" {
// File has rotated between shutdown and startup
// We return last state downstream, with a modified event source with the new file name
// And return the offset - also force harvest in case the file is old and we're about to skip it
log.Printf("Detected rotation on a previously harvested file: %s -> %s\n", previous, file)
event := resumelist.files[previous]
event.Source = &file
resumelist.resave <- event
return event.Offset, true
}
}
// New file so just start from an automatic position if initial scan, or the beginning if subsequent scans
// The caller will know which to do
return 0, false
}
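
One detail worth calling out above: the per-file harvester channel is a buffered chan int64 with capacity 1 used as a one-slot mailbox. A finished harvester deposits its final offset there, len(...) != 0 tells the prospector it has stopped, and receiving from it both yields the resume offset and clears the slot. A minimal stand-alone sketch of that hand-off, outside the Harvester/Prospector types (the 10-second scan interval is shortened here only to keep the example quick):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Capacity 1: the worker can deposit its final offset and exit without
	// waiting for anyone to read it.
	finish := make(chan int64, 1)

	go func() {
		// ... harvest until EOF plus a dead-time timeout, then report where we stopped ...
		finish <- 4096
	}()

	time.Sleep(10 * time.Millisecond) // stands in for the prospector's scan interval

	// A non-empty channel means the worker stopped; receiving drains the slot
	// and yields the offset to resume from if the file changes again.
	if len(finish) != 0 {
		offset := <-finish
		fmt.Println("resuming at offset", offset)
	}
}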
