forked from cloudfoundry/blackbox
/
file_watcher.go
119 lines (97 loc) · 2.59 KB
/
file_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package blackbox
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"strings"
"time"
"github.com/CrunchyData/blackbox/syslog"
"github.com/tedsuo/ifrit/grouper"
)
// POLL_INTERVAL is how long Watch sleeps between two consecutive scans of the
// source directory.
const POLL_INTERVAL time.Duration = 5 * time.Second
// fileWatcher periodically scans a source directory for log files and
// registers a tailing member with a dynamic group for every matching file it
// has not registered yet.
type fileWatcher struct {
	// logger receives the watcher's diagnostics and fatal errors.
	logger *log.Logger

	// sourceDir is the root directory scanned by Watch.
	sourceDir string
	// logSuffix is the suffix a file name must end with to be picked up.
	logSuffix string

	// dynamicGroupClient is queried for already-known members and receives
	// newly created ones through its Inserter channel.
	dynamicGroupClient grouper.DynamicClient

	// drain, hostname and structuredData are passed unchanged to
	// syslog.NewDrainer whenever a member is built for a log file.
	drain          syslog.Drain
	hostname       string
	structuredData string
}
// NewFileWatcher returns a fileWatcher that stores the given arguments as-is.
// Nothing is scanned or started until Watch is called on the result.
func NewFileWatcher(
	logger *log.Logger,
	sourceDir string,
	logSuffix string,
	dynamicGroupClient grouper.DynamicClient,
	drain syslog.Drain,
	hostname string,
	structuredData string,
) *fileWatcher {
	watcher := new(fileWatcher)

	watcher.logger = logger
	watcher.sourceDir = sourceDir
	watcher.logSuffix = logSuffix
	watcher.dynamicGroupClient = dynamicGroupClient
	watcher.drain = drain
	watcher.hostname = hostname
	watcher.structuredData = structuredData

	return watcher
}
// Watch scans f.sourceDir in an endless loop, sleeping POLL_INTERVAL between
// scans. Every entry of the source directory that resolves to a directory is
// treated as a tag directory and handed to findLogsToWatch; other entries are
// ignored.
//
// Watch never returns. If the source directory itself cannot be listed it
// terminates the process via logger.Fatalf. An individual entry that cannot be
// stat'ed (for example because it was removed between the listing and the
// stat, or because it is a dangling symlink) is logged and skipped, so a
// single bad entry does not bring the whole watcher down.
func (f *fileWatcher) Watch() {
	for {
		logDirs, err := ioutil.ReadDir(f.sourceDir)
		if err != nil {
			f.logger.Fatalf("could not list directories in source dir: %s\n", err)
		}

		for _, logDir := range logDirs {
			tag := logDir.Name()
			tagDirPath := filepath.Join(f.sourceDir, tag)

			// os.Stat follows symlinks, unlike the info returned by ReadDir,
			// so a symlink pointing at a directory is still scanned.
			fileInfo, err := os.Stat(tagDirPath)
			if err != nil {
				// The entry may have vanished since ReadDir; try again on the
				// next poll instead of exiting.
				f.logger.Printf("skipping '%s' (failed to determine if path is directory): %s\n", tagDirPath, err)
				continue
			}

			if !fileInfo.IsDir() {
				continue
			}

			f.findLogsToWatch(tag, tagDirPath, fileInfo)
		}

		time.Sleep(POLL_INTERVAL)
	}
}
// findLogsToWatch walks filePath recursively. For each non-directory whose
// name ends in f.logSuffix and which the dynamic group client does not report
// as found, a new member is built and sent on the client's Inserter channel.
//
// tag is only used in the log message emitted when a directory cannot be
// listed; such a directory is skipped. file must be the FileInfo describing
// filePath.
func (f *fileWatcher) findLogsToWatch(tag string, filePath string, file os.FileInfo) {
	if file.IsDir() {
		entries, err := ioutil.ReadDir(filePath)
		if err != nil {
			f.logger.Printf("skipping log dir '%s' (could not list files): %s\n", tag, err)
			return
		}

		for i := range entries {
			entry := entries[i]
			f.findLogsToWatch(tag, filepath.Join(filePath, entry.Name()), entry)
		}
		return
	}

	// Regular entry: only files carrying the configured suffix are of interest.
	if !strings.HasSuffix(file.Name(), f.logSuffix) {
		return
	}

	// Members are keyed by file path; do not insert the same path twice.
	if _, known := f.dynamicGroupClient.Get(filePath); known {
		return
	}

	f.dynamicGroupClient.Inserter() <- f.memberForFile(filePath)
}
// memberForFile builds the group member responsible for logfilePath.
//
// A new drainer is created for the file from the watcher's drain, hostname and
// structured data. The member's runner is a Tailer for the file, and the
// member's name is the file path. The Tailer's tag is the directory containing
// the file, expressed relative to f.sourceDir, so a file in a nested directory
// gets a tag with several path elements.
//
// The process is terminated via logger.Fatalf if the drainer cannot be created
// or the relative path cannot be computed.
func (f *fileWatcher) memberForFile(logfilePath string) grouper.Member {
	drainer, err := syslog.NewDrainer(f.logger, f.drain, f.hostname, f.structuredData)
	if err != nil {
		f.logger.Fatalf("could not drain to syslog: %s\n", err)
	}

	logfileDir := filepath.Dir(logfilePath)

	tag, err := filepath.Rel(f.sourceDir, logfileDir)
	if err != nil {
		f.logger.Fatalf("could not compute tag from file path %s: %s\n", logfilePath, err)
	}

	tailer := &Tailer{
		Path:    logfilePath,
		Tag:     tag,
		Drainer: drainer,
	}

	// Keyed fields keep this literal valid if grouper.Member ever gains or
	// reorders fields, and satisfy go vet's composites check.
	return grouper.Member{Name: tailer.Path, Runner: tailer}
}