forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 4
/
scanner.go
178 lines (151 loc) · 4.43 KB
/
scanner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package file_integrity
import (
"errors"
"math"
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/juju/ratelimit"
"github.com/elastic/beats/libbeat/logp"
)
// scannerID is used as a global monotonically increasing counter for assigning
// a unique name to each scanner instance for logging purposes. Use
// atomic.AddUint32() to get a new value; never read or write it directly.
var scannerID uint32
// scanner performs a one-shot scan of the configured file paths, emitting an
// Event for each file found. It implements the EventProducer interface.
type scanner struct {
	fileCount   uint64            // Number of files scanned so far (access atomically).
	byteCount   uint64            // Total bytes observed so far (access atomically).
	tokenBucket *ratelimit.Bucket // Throttles read/hash rate; nil when no limit is configured.

	done   <-chan struct{} // Closed by the caller to stop the scan early.
	eventC chan Event      // Receives scan results; closed when the scan completes.

	log    *logp.Logger // Logger tagged with this scanner's unique scanner_id.
	config Config
}
// NewFileSystemScanner returns an EventProducer that performs a scan over the
// file paths given in the configuration. Each instance is assigned a unique
// scanner_id for logging.
func NewFileSystemScanner(c Config) (EventProducer, error) {
	id := atomic.AddUint32(&scannerID, 1)
	s := &scanner{
		config: c,
		eventC: make(chan Event, 1),
	}
	s.log = logp.NewLogger(moduleName).With("scanner_id", id)
	return s, nil
}
// Start starts the EventProducer. The provided done channel can be used to stop
// the EventProducer prematurely. The returned Event channel will be closed when
// scanning is complete. The channel must be drained otherwise the scanner will
// block.
func (s *scanner) Start(done <-chan struct{}) (<-chan Event, error) {
	s.done = done

	if s.config.ScanRateBytesPerSec > 0 {
		// NOTE(review): the structured fields log the parsed byte values while
		// the Debugf args log ScanRatePerSec/MaxFileSize — presumably the
		// human-readable (unparsed) forms of the same settings; confirm in Config.
		s.log.With(
			"bytes_per_sec", s.config.ScanRateBytesPerSec,
			"capacity_bytes", s.config.MaxFileSizeBytes).
			Debugf("Creating token bucket with rate %v/sec and capacity %v",
				s.config.ScanRatePerSec,
				s.config.MaxFileSize)

		s.tokenBucket = ratelimit.NewBucketWithRate(
			float64(s.config.ScanRateBytesPerSec)/2., // Fill Rate. NOTE(review): halved relative to the configured rate — verify this is intentional.
			int64(s.config.MaxFileSizeBytes))         // Max Capacity
		// Drain the bucket so the scan starts throttled rather than with a
		// full burst allowance.
		s.tokenBucket.TakeAvailable(math.MaxInt64)
	}

	// The goroutine closes s.eventC when the scan finishes.
	go s.scan()
	return s.eventC, nil
}
// scan walks each configured path, generating an event for every file found,
// then logs summary metrics (duration, counts, throughput) for the whole run.
// It closes eventC when finished.
func (s *scanner) scan() {
	s.log.Debugw("File system scanner is starting", "file_path", s.config.Paths)
	defer s.log.Debug("File system scanner is stopping")
	defer close(s.eventC)

	started := time.Now()
	for _, p := range s.config.Paths {
		// Resolve symlinks to ensure we have an absolute path.
		resolved, err := filepath.EvalSymlinks(p)
		if err != nil {
			s.log.Warnw("Failed to scan", "file_path", p, "error", err)
			continue
		}

		if walkErr := s.walkDir(resolved); walkErr != nil {
			s.log.Warnw("Failed to scan", "file_path", resolved, "error", walkErr)
		}
	}

	elapsed := time.Since(started)
	totalBytes := atomic.LoadUint64(&s.byteCount)
	totalFiles := atomic.LoadUint64(&s.fileCount)
	s.log.Infow("File system scan completed",
		"took", elapsed,
		"file_count", totalFiles,
		"total_bytes", totalBytes,
		"bytes_per_sec", float64(totalBytes)/float64(elapsed)*float64(time.Second),
		"files_per_sec", float64(totalFiles)/float64(elapsed)*float64(time.Second),
	)
}
// walkDir walks the tree rooted at dir, producing an event for every
// non-excluded entry. It stops early (without error) when the done channel is
// closed. Per-entry errors are logged and skipped; only a fatal walk error is
// returned.
func (s *scanner) walkDir(dir string) error {
	// Sentinel returned from the callback to abort filepath.Walk when done
	// closes; translated back to nil before returning.
	errDone := errors.New("done")
	startTime := time.Now()
	err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// Entries that vanished mid-walk are expected; only log other errors.
			if !os.IsNotExist(err) {
				s.log.Warnw("Scanner is skipping a path because of an error",
					"file_path", path, "error", err)
			}
			return nil
		}

		if s.config.IsExcludedPath(path) {
			if info.IsDir() {
				// Excluded directory: skip its entire subtree.
				return filepath.SkipDir
			}
			return nil
		}

		// Restart the clock after this entry so the next event's rtt covers
		// only the work (and send blocking) for that entry.
		defer func() { startTime = time.Now() }()

		event := s.newScanEvent(path, info, err)
		event.rtt = time.Since(startTime)
		select {
		case s.eventC <- event:
		case <-s.done:
			return errDone
		}

		// Throttle reading and hashing rate.
		if event.Info != nil && len(event.Hashes) > 0 {
			s.throttle(event.Info.Size)
		}

		// Always traverse into the start dir.
		if !info.IsDir() || dir == path {
			return nil
		}

		// Only step into directories if recursion is enabled.
		// Skip symlinks to dirs.
		m := info.Mode()
		if !s.config.Recursive || m&os.ModeSymlink > 0 {
			return filepath.SkipDir
		}

		return nil
	})
	// errDone signals a deliberate early stop, not a failure.
	if err == errDone {
		err = nil
	}
	return err
}
// throttle blocks until the token bucket has capacity for fileSize bytes, or
// until the scanner's done channel is closed, whichever comes first. It is a
// no-op when no scan rate limit is configured (tokenBucket == nil).
func (s *scanner) throttle(fileSize uint64) {
	if s.tokenBucket == nil {
		return
	}

	wait := s.tokenBucket.Take(int64(fileSize))
	if wait > 0 {
		timer := time.NewTimer(wait)
		// Stop the timer so its resources are released promptly if done
		// fires first; the original leaked the timer until it expired.
		defer timer.Stop()
		select {
		case <-timer.C:
		case <-s.done:
		}
	}
}
// newScanEvent builds a scan-sourced Event for the given file and records the
// file and byte counts in the scanner's metrics.
func (s *scanner) newScanEvent(path string, info os.FileInfo, err error) Event {
	// Update metrics.
	atomic.AddUint64(&s.fileCount, 1)

	e := NewEventFromFileInfo(path, info, err, None, SourceScan,
		s.config.MaxFileSizeBytes, s.config.HashTypes)
	if e.Info != nil {
		atomic.AddUint64(&s.byteCount, e.Info.Size)
	}
	return e
}