Skip to content

Commit

Permalink
Fix filebeat registry meta being nil vs empty (#7632)
Browse files Browse the repository at this point in the history
Filebeat introduces a meta field to registry entries in 6.3.1. The meta field is used to distuingish different log streams in docker files. For other input types the meta field must be null. Unfortunately the input loader did initialize the meta field with an empty dictionary. This leads to failing matches of old and new registry entries. Due to the match failing, old entries will not be removed, and filebeat will handle all files as new files on startup (old logs are send again).

Users will observe duplicate entries in the reigstry file. One entry with "meta": null and one entry with "meta": {}. The entry with "meta": {} will be used by filebeat. The null-entry will not be used by filebeat, but is kept in the registry file, cause it has now active owner (yet).

Improvements provided by this PR:

* when matching states consider an empty map and a null-map to be equivalent
* update input loader to create a null map for old state -> registry entries will be compatible on upgrade
* Add checks in critical places replacing an empty map with a null-map
* Add support to fix registry entries on load. states from corrupted 6.3.1 files will be merged into one single state on load 
* introduce unit tests for loading different registry formats
* introduce system tests validating output and registry when upgrading filebeat from an older version

Closes: #7634
  • Loading branch information
Steffen Siering authored and tsg committed Jul 19, 2018
1 parent 5eaafff commit c558984
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -103,6 +103,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Fix offset field pointing at end of a line. {issue}6514[6514]
- Fix an issue when parsing ISO8601 dates with timezone definition {issue}7367[7367]
- Fix Grok pattern of MongoDB module. {pull}7568[7568]
- Fix registry duplicates and log resending on upgrade. {issue}7634[7634]

*Heartbeat*
- Fix race due to updates of shared a map, that was not supposed to be shared between multiple go-routines. {issue}6616[6616]
Expand Down
3 changes: 3 additions & 0 deletions filebeat/input/docker/input.go
Expand Up @@ -73,6 +73,9 @@ func NewInput(

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
if context.Meta == nil {
context.Meta = map[string]string{}
}
context.Meta["stream"] = config.Containers.Stream
}

Expand Down
7 changes: 5 additions & 2 deletions filebeat/input/file/state.go
Expand Up @@ -44,6 +44,9 @@ type State struct {

// NewState creates a new file state
func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State {
if len(meta) == 0 {
meta = nil
}
return State{
Fileinfo: fileInfo,
Source: path,
Expand All @@ -60,7 +63,7 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin
func (s *State) ID() string {
// Generate id on first request. This is needed as id is not set when converting back from json
if s.Id == "" {
if s.Meta == nil {
if len(s.Meta) == 0 {
s.Id = s.FileStateOS.String()
} else {
hashValue, _ := hashstructure.Hash(s.Meta, nil)
Expand Down Expand Up @@ -91,6 +94,6 @@ func (s *State) IsEqual(c *State) bool {
func (s *State) IsEmpty() bool {
return s.FileStateOS == file.StateOS{} &&
s.Source == "" &&
s.Meta == nil &&
len(s.Meta) == 0 &&
s.Timestamp.IsZero()
}
2 changes: 1 addition & 1 deletion filebeat/input/input.go
Expand Up @@ -96,7 +96,7 @@ func New(
Done: input.done,
BeatDone: input.beatDone,
DynamicFields: dynFields,
Meta: map[string]string{},
Meta: nil,
}
var ipt Input
ipt, err = f(conf, outlet, context)
Expand Down
11 changes: 10 additions & 1 deletion filebeat/input/log/input.go
Expand Up @@ -93,6 +93,11 @@ func NewInput(
// can be forwarded correctly to the registrar.
stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone)

meta := context.Meta
if len(meta) == 0 {
meta = nil
}

p := &Input{
config: defaultConfig,
cfg: cfg,
Expand All @@ -101,7 +106,7 @@ func NewInput(
stateOutlet: stateOut,
states: file.NewStates(),
done: context.Done,
meta: context.Meta,
meta: meta,
}

if err := cfg.Unpack(&p.config); err != nil {
Expand Down Expand Up @@ -687,6 +692,10 @@ func (p *Input) updateState(state file.State) error {
state.TTL = p.config.CleanInactive
}

if len(state.Meta) == 0 {
state.Meta = nil
}

// Update first internal state
p.states.Update(state)

Expand Down
104 changes: 98 additions & 6 deletions filebeat/registrar/registrar.go
Expand Up @@ -20,6 +20,7 @@ package registrar
import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -132,20 +133,111 @@ func (r *Registrar) loadStates() error {

logp.Info("Loading registrar data from %s", r.registryFile)

decoder := json.NewDecoder(f)
states := []file.State{}
err = decoder.Decode(&states)
states, err := readStatesFrom(f)
if err != nil {
return fmt.Errorf("Error decoding states: %s", err)
return err
}

states = resetStates(states)
r.states.SetStates(states)
logp.Info("States Loaded from registrar: %+v", len(states))

return nil
}

func readStatesFrom(in io.Reader) ([]file.State, error) {
states := []file.State{}
decoder := json.NewDecoder(in)
if err := decoder.Decode(&states); err != nil {
return nil, fmt.Errorf("Error decoding states: %s", err)
}

states = fixStates(states)
states = resetStates(states)
return states, nil
}

// fixStates cleans up the regsitry states when updating from an older version
// of filebeat potentially writing invalid entries.
func fixStates(states []file.State) []file.State {
if len(states) == 0 {
return states
}

// we use a map of states here, so to identify and merge duplicate entries.
idx := map[string]*file.State{}
for i := range states {
state := &states[i]
fixState(state)

id := state.ID()
old, exists := idx[id]
if !exists {
idx[id] = state
} else {
mergeStates(old, state) // overwrite the entry in 'old'
}
}

if len(idx) == len(states) {
return states
}

i := 0
newStates := make([]file.State, len(idx))
for _, state := range idx {
newStates[i] = *state
i++
}
return newStates
}

// fixState updates a read state to fullfil required invariantes:
// - "Meta" must be nil if len(Meta) == 0
func fixState(st *file.State) {
if len(st.Meta) == 0 {
st.Meta = nil
}
}

// mergeStates merges 2 states by trying to determine the 'newer' state.
// The st state is overwritten with the updated fields.
func mergeStates(st, other *file.State) {
st.Finished = st.Finished || other.Finished
if st.Offset < other.Offset { // always select the higher offset
st.Offset = other.Offset
}

// update file meta-data. As these are updated concurrently by the
// prospectors, select the newer state based on the update timestamp.
var meta, metaOld, metaNew map[string]string
if st.Timestamp.Before(other.Timestamp) {
st.Source = other.Source
st.Timestamp = other.Timestamp
st.TTL = other.TTL
st.FileStateOS = other.FileStateOS

metaOld, metaNew = st.Meta, other.Meta
} else {
metaOld, metaNew = other.Meta, st.Meta
}

if len(metaOld) == 0 || len(metaNew) == 0 {
meta = metaNew
} else {
meta = map[string]string{}
for k, v := range metaOld {
meta[k] = v
}
for k, v := range metaNew {
meta[k] = v
}
}

if len(meta) == 0 {
meta = nil
}
st.Meta = meta
}

// resetStates sets all states to finished and disable TTL on restart
// For all states covered by an input, TTL will be overwritten with the input value
func resetStates(states []file.State) []file.State {
Expand Down

0 comments on commit c558984

Please sign in to comment.