Skip to content

Commit

Permalink
Merge pull request #1375 from ruflin/fb-registry
Browse files Browse the repository at this point in the history
Fix filebeat file rotation registrar issue #1281
  • Loading branch information
Steffen Siering committed Apr 18, 2016
2 parents 08204a2 + 8bbc264 commit a4d9086
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d
*Topbeat*

*Filebeat*
- Improvements in registrar dealing with file rotation. {pull}1281[1281]

*Winlogbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func makeEvents(name string, n int) []*input.FileEvent {
DocumentType: "log",
Bytes: 100,
Offset: int64(i),
Source: &name,
Source: name,
}
events = append(events, event)
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan c
go prospector.Run(&c.wg)
}

logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.State))
logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.getStateCopy()))

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type Prospector struct {
}

type Prospectorer interface {
Run()
Init()
Run()
}

func NewProspector(prospectorConfig cfg.ProspectorConfig, registrar *Registrar, channel chan *input.FileEvent) (*Prospector, error) {
Expand Down
12 changes: 11 additions & 1 deletion filebeat/crawler/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ func (p *ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input

// Start a new harvester on the path
h.Start()
p.Prospector.registrar.Persist <- h.GetState()

}

// Keep the old file in missingFiles so we don't rescan it if it was renamed and we've not yet reached the new filename
Expand All @@ -239,6 +241,8 @@ func (p *ProspectorLog) checkExistingFile(h *harvester.Harvester, newFile *input
// Start a harvester on the path; a file was just modified and it doesn't have a harvester
// The offset to continue from will be stored in the harvester channel - so take that to use and also clear the channel
p.resumeHarvesting(h, <-h.Stat.Return)
p.Prospector.registrar.Persist <- h.GetState()

} else {
logp.Debug("prospector", "Not harvesting, file didn't change: %s", h.Path)
}
Expand All @@ -252,14 +256,20 @@ func (p *ProspectorLog) continueExistingFile(h *harvester.Harvester, previousFil

lastinfo := p.prospectorList[previousFile]
h.Stat.Continue(&lastinfo)

// Update state because of file rotation
p.Prospector.registrar.Persist <- h.GetState()
}

// Start / resume harvester with a predefined offset
func (p *ProspectorLog) resumeHarvesting(h *harvester.Harvester, offset int64) {

logp.Debug("prospector", "Start / resuming harvester of file: %s", h.Path)
h.Offset = offset
h.SetOffset(offset)
h.Start()

// Update state because of file rotation
p.Prospector.registrar.Persist <- h.GetState()
}

// Check if the given file was renamed. If file is known but with different path,
Expand Down
62 changes: 49 additions & 13 deletions filebeat/crawler/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
Expand All @@ -17,7 +18,8 @@ type Registrar struct {
// Path to the Registry File
registryFile string
// Map with all file paths inside and the corresponding state
State map[string]*FileState
state map[string]*FileState
stateMutex sync.Mutex
// Channel used by the prospector and crawler to send FileStates to be persisted
Persist chan *input.FileState

Expand All @@ -39,7 +41,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) {
func (r *Registrar) Init() error {
// Init state
r.Persist = make(chan *FileState)
r.State = make(map[string]*FileState)
r.state = map[string]*FileState{}
r.Channel = make(chan []*FileEvent, 1)

// Set to default in case it is not set
Expand All @@ -66,11 +68,13 @@ func (r *Registrar) Init() error {
// loadState fetches the previous reading state from the configure RegistryFile file
// The default file is `registry` in the data path.
func (r *Registrar) LoadState() {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
if existing, e := os.Open(r.registryFile); e == nil {
defer existing.Close()
logp.Info("Loading registrar data from %s", r.registryFile)
decoder := json.NewDecoder(existing)
decoder.Decode(&r.State)
decoder.Decode(&r.state)
}
}

Expand All @@ -87,8 +91,9 @@ func (r *Registrar) Run() {
return
// Treats new log files to persist with higher priority then new events
case state := <-r.Persist:
r.State[*state.Source] = state
logp.Debug("prospector", "Registrar will re-save state for %s", *state.Source)
source := state.Source
r.setState(source, state)
logp.Debug("prospector", "Registrar will re-save state for %s", source)
case events := <-r.Channel:
r.processEvents(events)
}
Expand All @@ -111,7 +116,7 @@ func (r *Registrar) processEvents(events []*FileEvent) {
continue
}

r.State[*event.Source] = event.GetState()
r.setState(event.Source, event.GetState())
}
}

Expand All @@ -122,7 +127,7 @@ func (r *Registrar) Stop() {
}

func (r *Registrar) GetFileState(path string) (*FileState, bool) {
state, exist := r.State[path]
state, exist := r.getState(path)
return state, exist
}

Expand All @@ -138,12 +143,14 @@ func (r *Registrar) writeRegistry() error {
}

encoder := json.NewEncoder(file)
encoder.Encode(r.State)

state := r.getStateCopy()
encoder.Encode(state)

// Directly close file because of windows
file.Close()

logp.Info("Registry file updated. %d states written.", len(r.State))
logp.Info("Registry file updated. %d states written.", len(state))

return SafeFileRotate(r.registryFile, tempfile)
}
Expand All @@ -157,7 +164,6 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo
logp.Debug("registrar", "Same file as before found. Fetch the state and persist it.")
// We're resuming - throw the last state back downstream so we resave it
// And return the offset - also force harvest in case the file is old and we're about to skip it
r.Persist <- lastState
return lastState.Offset, true
}

Expand All @@ -168,8 +174,7 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo
logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath)

lastState, _ := r.GetFileState(previous)
lastState.Source = &filePath
r.Persist <- lastState
r.updateStateSource(lastState, filePath)
return lastState.Offset, true
}

Expand All @@ -189,7 +194,7 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo)

newState := input.GetOSFileState(&newFileInfo)

for oldFilePath, oldState := range r.State {
for oldFilePath, oldState := range r.getStateCopy() {

// Skipping when path the same
if oldFilePath == newFilePath {
Expand All @@ -205,3 +210,34 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo)

return "", fmt.Errorf("No previous file found")
}

func (r *Registrar) setState(path string, state *FileState) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
r.state[path] = state
}

func (r *Registrar) getState(path string) (*FileState, bool) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
state, exist := r.state[path]
return state, exist
}

func (r *Registrar) updateStateSource(state *FileState, path string) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
state.Source = path
}

func (r *Registrar) getStateCopy() map[string]FileState {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

copy := make(map[string]FileState)
for k, v := range r.state {
copy[k] = *v
}

return copy
}
2 changes: 1 addition & 1 deletion filebeat/harvester/filestat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package harvester

import "os"

// Contains statistic about file when it was last seend by the prospector
// Contains statistic about file when it was last seen by the prospector
type FileStat struct {
Fileinfo os.FileInfo /* the file info */
Return chan int64 /* the harvester will send an event with its offset when it closes */
Expand Down
5 changes: 3 additions & 2 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package harvester
import (
"fmt"
"regexp"
"sync"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
Expand All @@ -25,7 +26,8 @@ import (
type Harvester struct {
Path string /* the file path to harvest */
Config *config.HarvesterConfig
Offset int64
offset int64
offsetLock sync.Mutex
Stat *FileStat
SpoolerChan chan *input.FileEvent
encoding encoding.EncodingFactory
Expand Down Expand Up @@ -67,6 +69,5 @@ func NewHarvester(

func (h *Harvester) Start() {
// Starts harvester and picks the right type. In case type is not set, set it to defeault (log)

go h.Harvest()
}
2 changes: 1 addition & 1 deletion filebeat/harvester/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestExampleTest(t *testing.T) {

h := Harvester{
Path: "/var/log/",
Offset: 0,
offset: 0,
}

assert.Equal(t, "/var/log/", h.Path)
Expand Down
49 changes: 38 additions & 11 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (h *Harvester) Harvest() {
defer func() {
// On completion, push offset so we can continue where we left off if we relaunch on the same file
if h.Stat != nil {
h.Stat.Return <- h.Offset
h.Stat.Return <- h.GetOffset()
}

logp.Debug("harvester", "Stopping harvester for file: %s", h.Path)
Expand Down Expand Up @@ -123,8 +123,8 @@ func (h *Harvester) Harvest() {

logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)

h.Offset = 0
seeker.Seek(h.Offset, os.SEEK_SET)
h.SetOffset(0)
seeker.Seek(h.GetOffset(), os.SEEK_SET)
continue
}

Expand All @@ -136,10 +136,10 @@ func (h *Harvester) Harvest() {
event := &input.FileEvent{
EventMetadata: h.Config.EventMetadata,
ReadTime: ts,
Source: &h.Path,
Source: h.Path,
InputType: h.Config.InputType,
DocumentType: h.Config.DocumentType,
Offset: h.Offset,
Offset: h.GetOffset(),
Bytes: bytesRead,
Text: &text,
Fileinfo: &info,
Expand All @@ -151,7 +151,7 @@ func (h *Harvester) Harvest() {
}

// Set Offset
h.Offset += int64(bytesRead) // Update offset if complete line has been processed
h.SetOffset(h.GetOffset() + int64(bytesRead)) // Update offset if complete line has been processed
}
}

Expand Down Expand Up @@ -237,25 +237,52 @@ func (h *Harvester) openFile() (encoding.Encoding, error) {
func (h *Harvester) initFileOffset(file *os.File) error {
offset, err := file.Seek(0, os.SEEK_CUR)

if h.Offset > 0 {
if h.GetOffset() > 0 {
// continue from last known offset

logp.Debug("harvester",
"harvest: %q position:%d (offset snapshot:%d)", h.Path, h.Offset, offset)
_, err = file.Seek(h.Offset, os.SEEK_SET)
"harvest: %q position:%d (offset snapshot:%d)", h.Path, h.GetOffset(), offset)
_, err = file.Seek(h.GetOffset(), os.SEEK_SET)
} else if h.Config.TailFiles {
// tail file if file is new and tail_files config is set

logp.Debug("harvester",
"harvest: (tailing) %q (offset snapshot:%d)", h.Path, offset)
h.Offset, err = file.Seek(0, os.SEEK_END)
offset, err = file.Seek(0, os.SEEK_END)
h.SetOffset(offset)

} else {
// get offset from file in case of encoding factory was
// required to read some data.
logp.Debug("harvester", "harvest: %q (offset snapshot:%d)", h.Path, offset)
h.Offset = offset
h.SetOffset(offset)
}

return err
}

// GetState returns current state of harvester
func (h *Harvester) GetState() *input.FileState {

state := input.FileState{
Source: h.Path,
Offset: h.GetOffset(),
FileStateOS: input.GetOSFileState(&h.Stat.Fileinfo),
}

return &state
}

func (h *Harvester) SetOffset(offset int64) {
h.offsetLock.Lock()
defer h.offsetLock.Unlock()

h.offset = offset
}

func (h *Harvester) GetOffset() int64 {
h.offsetLock.Lock()
defer h.offsetLock.Unlock()

return h.offset
}
6 changes: 3 additions & 3 deletions filebeat/input/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type File struct {
type FileEvent struct {
common.EventMetadata
ReadTime time.Time
Source *string
Source string
InputType string
DocumentType string
Offset int64
Expand All @@ -32,8 +32,8 @@ type FileEvent struct {
}

type FileState struct {
Source *string `json:"source,omitempty"`
Offset int64 `json:"offset,omitempty"`
Source string `json:"source,omitempty"`
Offset int64 `json:"offset,omitempty"`
FileStateOS *FileStateOS
}

Expand Down
Loading

0 comments on commit a4d9086

Please sign in to comment.