-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
event.go
170 lines (146 loc) · 4.37 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package module
import (
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/metricbeat/mb"
)
const (
defaultType = "metricsets"
)
// EventBuilder is used for building MetricSet events. MetricSets generate a
// data in the form of a common.MapStr. This builder transforms that data into
// a complete event and applies any Module-level filtering.
type EventBuilder struct {
ModuleName string
MetricSetName string
Host string
StartTime time.Time
FetchDuration time.Duration
Event common.MapStr
fetchErr error
filters *processors.Processors
metadata common.EventMetadata
}
// Build builds an event from MetricSet data and applies the Module-level
// filters.
func (b EventBuilder) Build() (common.MapStr, error) {
// event may be nil when there was an error fetching.
event := b.Event
if event == nil {
event = common.MapStr{} // TODO (akroh): do we want to send an empty event field?
}
// Get and remove meta fields from the event created by the MetricSet.
indexName := getIndex(event, "")
typeName := getType(event, defaultType)
timestamp := getTimestamp(event, common.Time(b.StartTime))
// Apply filters.
if b.filters != nil {
if event = b.filters.Run(event); event == nil {
return nil, nil
}
}
// Checks if additional meta information is provided by the MetricSet under the key ModuleData
// This is based on the convention that each MetricSet can provide module data under the key ModuleData
moduleData, moudleDataExists := event[mb.ModuleData]
if moudleDataExists {
delete(event, mb.ModuleData)
}
metricsetData := common.MapStr{
"module": b.ModuleName,
"name": b.MetricSetName,
"rtt": b.FetchDuration.Nanoseconds() / int64(time.Microsecond),
}
namespace := b.MetricSetName
if n, ok := event["_namespace"]; ok {
delete(event, "_namespace")
namespace = n.(string)
// TODO: check if namespace does not already exist
metricsetData["namespace"] = namespace
}
event = common.MapStr{
"@timestamp": timestamp,
"type": typeName,
common.EventMetadataKey: b.metadata,
b.ModuleName: common.MapStr{
namespace: event,
},
"metricset": metricsetData,
}
// In case meta data exists, it is added on the module level
if moudleDataExists {
if _, ok := moduleData.(common.MapStr); ok {
event[b.ModuleName].(common.MapStr).Update(moduleData.(common.MapStr))
}
}
// Overwrite default index if set.
if indexName != "" {
event["beat"] = common.MapStr{
"index": indexName,
}
}
// Adds host name to event.
if b.Host != "" {
event["metricset"].(common.MapStr)["host"] = b.Host
}
// Adds error to event in case error happened
if b.fetchErr != nil {
event["error"] = b.fetchErr.Error()
}
return event, nil
}
func getIndex(event common.MapStr, indexName string) string {
// Set index from event if set
if _, ok := event["index"]; ok {
indexName, ok = event["index"].(string)
if !ok {
logp.Err("Index couldn't be overwritten because event index is not string")
}
delete(event, "index")
}
return indexName
}
func getType(event common.MapStr, typeName string) string {
// Set type from event if set
if _, ok := event["type"]; ok {
typeName, ok = event["type"].(string)
if !ok {
logp.Err("Type couldn't be overwritten because event type is not string")
}
delete(event, "type")
}
return typeName
}
func getTimestamp(event common.MapStr, timestamp common.Time) common.Time {
// Set timestamp from event if set, move it to the top level
// If not set, timestamp is created
if _, ok := event["@timestamp"]; ok {
timestamp, ok = event["@timestamp"].(common.Time)
if !ok {
logp.Err("Timestamp couldn't be overwritten because event @timestamp is not common.Time")
}
delete(event, "@timestamp")
}
return timestamp
}
// createEvent creates a new event from the fetched MetricSet data.
func createEvent(
msw *metricSetWrapper,
event common.MapStr,
fetchErr error,
start time.Time,
elapsed time.Duration,
) (common.MapStr, error) {
return EventBuilder{
ModuleName: msw.Module().Name(),
MetricSetName: msw.Name(),
Host: msw.Host(),
StartTime: start,
FetchDuration: elapsed,
Event: event,
fetchErr: fetchErr,
filters: msw.module.filters,
metadata: msw.module.Config().EventMetadata,
}.Build()
}