forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 1
/
schema.go
124 lines (103 loc) · 3.35 KB
/
schema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package schema
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)
// Schema describes how a map[string]interface{} object can be parsed and converted into
// an event. The conversions can be described using an (optionally nested) common.MapStr
// that contains Conv objects.
type Schema map[string]Mapper
// Mapper interface represents a valid type to be used in a schema.
type Mapper interface {
// Map applies the Mapper conversion on the data and adds the result
// to the event on the key.
Map(key string, event common.MapStr, data map[string]interface{}) *Errors
HasKey(key string) bool
}
// A Conv object represents a conversion mechanism from the data map to the event map.
type Conv struct {
Func Converter // Convertor function
Key string // The key in the data map
Optional bool // Whether to log errors if the key is not found
}
// Convertor function type
type Converter func(key string, data map[string]interface{}) (interface{}, error)
// Map applies the conversion on the data and adds the result
// to the event on the key.
func (conv Conv) Map(key string, event common.MapStr, data map[string]interface{}) *Errors {
value, err := conv.Func(conv.Key, data)
if err != nil {
err := NewError(key, err.Error())
if conv.Optional {
err.SetType(OptionalType)
} else {
logp.Err("Error on field '%s': %v", key, err)
}
errs := NewErrors()
errs.AddError(err)
return errs
} else {
event[key] = value
}
return nil
}
func (conv Conv) HasKey(key string) bool {
return conv.Key == key
}
// implements Mapper interface for structure
type Object map[string]Mapper
func (o Object) Map(key string, event common.MapStr, data map[string]interface{}) *Errors {
subEvent := common.MapStr{}
errs := applySchemaToEvent(subEvent, data, o)
event[key] = subEvent
return errs
}
func (o Object) HasKey(key string) bool {
return hasKey(key, o)
}
// ApplyTo adds the fields extracted from data, converted using the schema, to the
// event map.
func (s Schema) ApplyTo(event common.MapStr, data map[string]interface{}) (common.MapStr, *Errors) {
errors := applySchemaToEvent(event, data, s)
return event, errors
}
// Apply converts the fields extracted from data, using the schema, into a new map and reports back the errors.
func (s Schema) Apply(data map[string]interface{}) (common.MapStr, *Errors) {
return s.ApplyTo(common.MapStr{}, data)
}
// HasKey checks if the key is part of the schema
func (s Schema) HasKey(key string) bool {
return hasKey(key, s)
}
func hasKey(key string, mappers map[string]Mapper) bool {
for _, mapper := range mappers {
if mapper.HasKey(key) {
return true
}
}
return false
}
func applySchemaToEvent(event common.MapStr, data map[string]interface{}, conversions map[string]Mapper) *Errors {
errs := NewErrors()
for key, mapper := range conversions {
errors := mapper.Map(key, event, data)
errs.AddErrors(errors)
}
return errs
}
// SchemaOption is for adding optional parameters to the conversion
// functions
type SchemaOption func(c Conv) Conv
// The optional flag suppresses the error message in case the key
// doesn't exist or results in an error.
func Optional(c Conv) Conv {
c.Optional = true
return c
}
// setOptions adds the optional flags to the Conv object
func SetOptions(c Conv, opts []SchemaOption) Conv {
for _, opt := range opts {
c = opt(c)
}
return c
}