/
jsonlog.go
119 lines (102 loc) · 2.89 KB
/
jsonlog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main
import (
"encoding/json"
"log"
"path/filepath"
"time"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)
func captureJSONLogs(groupName string, retention int, done chan struct{}) error {
if retention == 0 {
retention = 3
log.Printf("Missing retention for %s; defaulting to %d days", groupName, retention)
}
stop := make(chan struct{}, 1)
go func() {
<-done
stop <- struct{}{}
}()
cwl, err := NewCloudWatchLog(cloudwatchlogs.New(session.Must(session.NewSession())), groupName)
if err != nil {
return err
}
collectionsLock.Lock()
eventCollection := Collections[groupName]
if eventCollection == nil {
eventCollection, err = OpenEventCollection(filepath.Join(DataDir, groupName+".lm2"))
if err != nil {
if err == ErrDoesNotExist {
eventCollection, err = CreateEventCollection(filepath.Join(DataDir, groupName+".lm2"))
}
if err != nil {
collectionsLock.Unlock()
return err
}
}
Collections[groupName] = eventCollection
}
collectionsLock.Unlock()
eventCollection.SetRetention(retention)
const regularBatchSize = 5 * 60 * 1000 // 5 minutes
const catchupBatchSize = 72 * regularBatchSize // 6 hours
nextBatchStart := cwl.LastTimestamp()
if nextBatchStart == 0 {
nextBatchStart = (time.Now().Unix() - 86400) * 1000
nextBatchStart = (nextBatchStart / regularBatchSize) * regularBatchSize
}
currentBatch := []Event{}
timer := time.NewTimer(0)
for {
select {
case <-timer.C:
case <-stop:
log.Println("Stopping", groupName)
eventCollection.col.Close()
return nil
}
batchSize := int64(regularBatchSize)
if now := time.Now().Unix() * 1000; now-nextBatchStart > 6*regularBatchSize {
batchSize = now - nextBatchStart
batchSize = (batchSize / regularBatchSize) * regularBatchSize
if batchSize > catchupBatchSize {
batchSize = catchupBatchSize
}
}
batchEnd := nextBatchStart + batchSize - 1
logEvents, err := cwl.GetLogEvents(nextBatchStart, batchEnd)
if err != nil {
return err
}
for _, e := range logEvents {
event := Event{}
err = json.Unmarshal([]byte(*e.Message), &event)
if err == nil {
timestamp := time.Unix(*e.Timestamp/1000, (*e.Timestamp%1000)*1000000)
event["_ts"] = timestamp.Format(time.RFC3339Nano)
event["_tag"] = *e.LogStreamName
currentBatch = append(currentBatch, event)
}
}
events := currentBatch
if len(logEvents) > 0 {
log.Printf("Logs group %s: aggregated %d events", groupName, len(logEvents))
err = eventCollection.StoreEvents(events)
if err != nil {
return err
}
err := cwl.SetLastTimestamp(nextBatchStart)
if err != nil {
return err
}
}
next := time.Unix((batchEnd+regularBatchSize)/1000, 0)
if time.Now().After(next) {
timer.Reset(0)
} else {
timer.Reset(next.Sub(time.Now()) + time.Second)
}
nextBatchStart = batchEnd + 1
currentBatch = currentBatch[:0]
}
}