forked from elastic/beats
/
docker_json.go
65 lines (54 loc) · 1.28 KB
/
docker_json.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package reader
import (
"bytes"
"encoding/json"
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/pkg/errors"
)
// DockerJSON is a Reader that decodes each message produced by the wrapped
// reader as a Docker JSON-file log entry, optionally filtering by stream.
type DockerJSON struct {
	// reader is the underlying source of raw JSON log lines.
	reader Reader
	// stream filter, `all`, `stderr` or `stdout`
	stream string
}
// dockerLog is one decoded entry of Docker's JSON-file logging driver output.
type dockerLog struct {
	Timestamp string `json:"time"`   // parsed as RFC3339 by Next
	Log       string `json:"log"`    // the raw log line content
	Stream    string `json:"stream"` // compared against the configured stream filter
}
// NewDockerJSON wraps r so that each message it produces is decoded as a
// Docker JSON-file log entry; only entries matching stream are returned
// (use "all" to keep every stream).
func NewDockerJSON(r Reader, stream string) *DockerJSON {
	dj := &DockerJSON{reader: r, stream: stream}
	return dj
}
// Next reads from the wrapped reader until it finds a message that decodes
// as a Docker JSON log entry and matches the configured stream filter. The
// returned message carries the decoded log text as its content, the parsed
// Docker timestamp, and a "stream" field.
func (p *DockerJSON) Next() (Message, error) {
	for {
		msg, err := p.reader.Next()
		if err != nil {
			return msg, err
		}

		var entry dockerLog
		if decErr := json.NewDecoder(bytes.NewReader(msg.Content)).Decode(&entry); decErr != nil {
			return msg, errors.Wrap(decErr, "decoding docker JSON")
		}

		// Drop entries from streams the filter excludes and keep reading.
		if p.stream != "all" && entry.Stream != p.stream {
			continue
		}

		// Docker writes timestamps in RFC3339 format.
		ts, parseErr := time.Parse(time.RFC3339, entry.Timestamp)
		if parseErr != nil {
			return msg, errors.Wrap(parseErr, "parsing docker timestamp")
		}

		msg.AddFields(common.MapStr{
			"stream": entry.Stream,
		})
		msg.Content = []byte(entry.Log)
		msg.Ts = ts
		return msg, nil
	}
}