forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
state.go
149 lines (121 loc) · 3.63 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package file
import (
"os"
"sync"
"time"
"github.com/elastic/beats/libbeat/logp"
)
// State is used to communicate the reading state of a file.
// Source, Offset, FileStateOS, Timestamp and TTL are serialized to JSON;
// Finished and Fileinfo carry the `json:"-"` tag and are skipped.
type State struct {
// Source is filled from the path argument in NewState.
Source string `json:"source"`
// NOTE(review): presumably the read offset in bytes - confirm against the harvester.
Offset int64 `json:"offset"`
Finished bool `json:"-"` // harvester state; Cleanup only drops states that are Finished
Fileinfo os.FileInfo `json:"-"` // the file info
// FileStateOS is what findPrevious compares (via IsSame) to match states.
// It has no json tag, so it is encoded under its Go field name.
FileStateOS StateOS
// Timestamp is set to time.Now() in NewState and again on every States.Update.
Timestamp time.Time `json:"timestamp"`
// TTL is evaluated by States.Cleanup: a negative value never expires,
// 0 expires immediately, a positive value is the maximum age since Timestamp.
TTL time.Duration `json:"ttl"`
}
// NewState creates a new file state for the file at path.
// The returned state is not finished, is stamped with the current time and
// carries a negative TTL, which Cleanup never treats as expired.
func NewState(fileInfo os.FileInfo, path string) State {
	var st State
	st.Source = path
	st.Fileinfo = fileInfo
	st.FileStateOS = GetOSState(fileInfo)
	st.Timestamp = time.Now()
	st.TTL = -1 * time.Second // By default, a state has an infinite ttl
	return st
}
// IsEmpty reports whether the state is equal to the zero value of State.
func (s *State) IsEmpty() bool {
	var zero State
	return zero == *s
}
// States handles list of FileState.
// The methods defined on *States in this file take mutex before they read or
// modify states (findPrevious relies on its callers holding it).
type States struct {
states []State // the tracked file states
mutex sync.Mutex // guards states
}
// NewStates returns a new States container holding an empty, non-nil list
// of states.
func NewStates() *States {
	container := new(States)
	container.states = make([]State, 0)
	return container
}
// Update stores newState after stamping it with the current time.
// If a state with a matching FileStateOS already exists it is replaced,
// otherwise newState is appended to the list.
func (s *States) Update(newState State) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	pos, _ := s.findPrevious(newState)
	newState.Timestamp = time.Now()

	if pos < 0 {
		// No existing state found, add new one
		s.states = append(s.states, newState)
		logp.Debug("prospector", "New state added for %s", newState.Source)
		return
	}

	s.states[pos] = newState
}
// FindPrevious returns the stored state whose FileStateOS matches the one of
// newState. If there is no such state, an empty State is returned.
func (s *States) FindPrevious(newState State) State {
	// TODO: This currently blocks writing updates every time state is fetched. Should be improved for performance
	s.mutex.Lock()
	defer s.mutex.Unlock()

	_, previous := s.findPrevious(newState)
	return previous
}
// findPrevious returns the index and a copy of the first stored state whose
// FileStateOS matches the one of newState.
// In case no previous state exists, index -1 and an empty State are returned.
// It does not lock; the callers in this file hold s.mutex while calling it.
func (s *States) findPrevious(newState State) (int, State) {
	// TODO: This could be made potentially more performance by using an index (harvester id) and only use iteration as fall back
	for i := 0; i < len(s.states); i++ {
		candidate := s.states[i]
		// FileStateOS is used for comparison as FileInfo identifiers can only be fetched for existing files
		if candidate.FileStateOS.IsSame(newState.FileStateOS) {
			return i, candidate
		}
	}

	return -1, State{}
}
// Cleanup removes expired states from the list.
//
// A state counts as expired when its TTL is 0, or when its TTL is positive
// and more than TTL has passed since its Timestamp. States with a negative
// TTL never expire. An expired state is only dropped if it is Finished;
// otherwise it is kept and an error is logged.
func (s *States) Cleanup() {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	currentTime := time.Now()
	states := s.states[:0] // filter in place, reusing the backing array

	for _, state := range s.states {
		ttl := state.TTL
		expired := ttl == 0 || (ttl > 0 && currentTime.Sub(state.Timestamp) > ttl)

		if expired {
			if state.Finished {
				logp.Debug("state", "State removed for %v because of older: %v", state.Source, ttl)
				continue // drop state
			}
			logp.Err("State for %s should have been dropped, but couldn't as state is not finished.", state.Source)
		}

		states = append(states, state) // in-place copy old state
	}

	// The slots behind the kept states still hold stale copies. Zero them so
	// the Fileinfo and Source values of dropped states are not kept alive by
	// the shared backing array.
	for i := len(states); i < len(s.states); i++ {
		s.states[i] = State{}
	}

	s.states = states
}
// Count returns the number of states currently held.
func (s *States) Count() int {
	s.mutex.Lock()
	total := len(s.states)
	s.mutex.Unlock()

	return total
}
// GetStates returns a copy of the file states. The returned slice has its
// own backing array, so modifying its elements does not affect the list held
// by s.
func (s *States) GetStates() []State {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	snapshot := make([]State, 0, len(s.states))
	snapshot = append(snapshot, s.states...)
	return snapshot
}
// SetStates overwrites all internal states with a copy of the given states
// array.
//
// The slice is copied rather than stored directly: Update and Cleanup write
// into the internal backing array, which would otherwise silently rewrite
// the caller's slice, and later writes by the caller would bypass the mutex.
func (s *States) SetStates(states []State) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	owned := make([]State, len(states))
	copy(owned, states)
	s.states = owned
}
// Copy creates a new States object that holds a copy of the current states.
func (s *States) Copy() *States {
	// GetStates takes the lock and returns a non-nil slice with its own
	// backing array, so the clone does not share storage with s.
	return &States{
		states: s.GetStates(),
	}
}