-
Notifications
You must be signed in to change notification settings - Fork 0
/
jsonbeat.go
104 lines (87 loc) · 2.66 KB
/
jsonbeat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package beater
import (
"encoding/json"
"fmt"
"time"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/Akagi201/jsonbeat/config"
"github.com/hpcloud/tail"
)
// Jsonbeat implements the Beater interface. It tails a file of JSON log
// lines and publishes each decoded line as an event.
type Jsonbeat struct {
	// done is closed by Stop and handed to the tailing goroutine started
	// by Run as its stop signal.
	done chan struct{}
	// config holds the configuration options for the Beat, as unpacked in New.
	config config.Config
	// client is the publisher that sends events to the defined output.
	// It is assigned in Run and closed in Stop.
	client publisher.Client
}
// New creates the Jsonbeat beater.
//
// It starts from config.DefaultConfig and unpacks cfg on top of it. When
// unpacking fails, a nil beater and an error that includes the underlying
// cause are returned. The b argument is not used here.
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
	// Named "settings" so the imported config package is not shadowed.
	settings := config.DefaultConfig
	err := cfg.Unpack(&settings)
	if err != nil {
		return nil, fmt.Errorf("Error reading config file: %v", err)
	}

	return &Jsonbeat{
		done:   make(chan struct{}),
		config: settings,
	}, nil
}
// Run implements the Beater Run interface.
//
// It connects the publisher client, starts a goroutine that tails the file at
// bt.config.Path, and then blocks until that goroutine reports that it has
// finished. Run always returns nil.
func (bt *Jsonbeat) Run(b *beat.Beat) error {
	logp.Info("jsonbeat is running! Hit CTRL-C to stop it.")

	bt.client = b.Publisher.Connect()

	// Every option is spelled out, including the ones left at their zero
	// value, so the tailing behaviour can be read off in one place.
	tailCfg := tail.Config{
		Follow:      true,
		ReOpen:      true,
		MustExist:   false,
		Poll:        false,
		MaxLineSize: 0,
	}

	// finished receives one value when tailFile returns; bt.done is passed
	// as the stop signal.
	finished := make(chan struct{})
	go bt.tailFile(bt.config.Path, tailCfg, finished, bt.done)

	<-finished
	return nil
}
// Stop implements the Beater Stop interface.
//
// It closes the publisher client first and then closes bt.done, the channel
// that Run passes to the tailing goroutine as its stop signal.
//
// NOTE(review): bt.client is only assigned in Run, so this presumably relies
// on Stop being called after Run has connected the client — confirm with the
// caller. Closing bt.done a second time would panic, so Stop is assumed to be
// called at most once — TODO confirm.
func (bt *Jsonbeat) Stop() {
	bt.client.Close()
	close(bt.done)
}
// tailFile follows filename using the given tail configuration, decodes every
// line as a JSON object and publishes it through bt.client.
//
// Parameters:
//   - filename: path of the file to follow.
//   - config:   options passed unchanged to tail.TailFile.
//   - done:     receives exactly one value when tailFile returns, whichever
//     path it returns by; the caller is expected to be receiving.
//   - stop:     when this channel is closed (or receives), tailing is stopped
//     and the function returns.
//
// The loop waits on stop and on the line channel at the same time, so a stop
// request is honoured even while the file is idle and no lines arrive.
func (bt *Jsonbeat) tailFile(filename string, config tail.Config, done chan struct{}, stop chan struct{}) {
	// Always report completion so the goroutine waiting on done is released.
	defer func() {
		done <- struct{}{}
	}()

	t, err := tail.TailFile(filename, config)
	if err != nil {
		logp.Err("Start tail file failed, err: %v", err)
		return
	}

	for {
		select {
		case <-stop:
			if err := t.Stop(); err != nil {
				logp.Err("Stop tail file failed, err: %v", err)
			}
			return

		case line, ok := <-t.Lines:
			if !ok {
				// The line channel was closed by the tailer; collect the
				// reason it ended, if any.
				if err := t.Wait(); err != nil {
					logp.Err("Tail file blocking goroutine stopped, err: %v", err)
				}
				return
			}
			if line.Err != nil {
				// The tailer reported a problem instead of file content;
				// its text is not a log line, so do not try to decode it.
				logp.Err("Read line from tail failed, err: %v", line.Err)
				continue
			}
			bt.publishLine(line.Text)
		}
	}
}

// publishLine decodes one JSON log line, normalises its "@timestamp" field
// and publishes the resulting event. Lines that are not JSON objects are
// logged and dropped.
func (bt *Jsonbeat) publishLine(text string) {
	event := make(common.MapStr)
	if err := json.Unmarshal([]byte(text), &event); err != nil {
		logp.Err("Unmarshal json log failed, err: %v", err)
		return
	}
	if event == nil {
		// A literal JSON null decodes without error but resets the map to
		// nil; writing the timestamp into it below would panic.
		logp.Err("Skipping json log line that is not an object: %q", text)
		return
	}

	logTime, err := parseEventTime(event["@timestamp"])
	if err != nil {
		// Missing, non-string or unparsable timestamp: fall back to the
		// time the line was processed.
		logp.Err("Unmarshal json log @timestamp failed, time string: %v, err: %v", event["@timestamp"], err)
		logTime = time.Now()
	}
	event["@timestamp"] = common.Time(logTime)

	bt.client.PublishEvent(event)
	logp.Info("Event sent")
}

// parseEventTime converts a decoded "@timestamp" value into a time.Time.
// The value must be a string in RFC 3339 form, with or without fractional
// seconds (for example "2017-03-13T07:13:30.172Z"); anything else yields an
// error and the zero time.
func parseEventTime(raw interface{}) (time.Time, error) {
	s, ok := raw.(string)
	if !ok {
		return time.Time{}, fmt.Errorf("@timestamp is %T, not a string", raw)
	}
	return time.Parse(time.RFC3339Nano, s)
}