Skip to content

Commit

Permalink
Add raw fields feature to metricset (#3001)
Browse files Browse the repository at this point in the history
Each metricset can add raw fields under the `raw` namespace in addition to the existing fields. The raw fields should contain the raw data coming the service itself. Fields are not changed in any way.

As a first example, raw fields were added to mysql status.
  • Loading branch information
ruflin authored and andrewkroh committed Nov 18, 2016
1 parent 647ed86 commit 0b33ee3
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff]
- Add a sample Redis Kibana dashboard. {pull}2916[2916]
- Add support for MongoDB 3.4 and WiredTiger metrics. {pull}2999[2999]
- Add kafkka module with partition metricset. {pull}2969[2969]
- Add raw config option for mysql/status metricset. {pull}3001[3001]

*Packetbeat*

Expand Down
3 changes: 3 additions & 0 deletions metricbeat/_meta/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ metricbeat.modules:
# Password of hosts. Empty by default
#password: test

# By setting raw to true, all raw fields from the status metricset will be added to the event.
#raw: false

#-------------------------------- Nginx Module -------------------------------
#- module: nginx
#metricsets: ["stubstatus"]
Expand Down
5 changes: 5 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (b *BaseMetricSet) Host() string {
// Configuration types

// ModuleConfig is the base configuration data for all Modules.
//
// The Raw config option is used to enable raw fields in a metricset. This means
// the metricset fetches not only the predefined fields but add alls raw data under
// the raw namespace to the event.
type ModuleConfig struct {
Hosts []string `config:"hosts"`
Period time.Duration `config:"period" validate:"positive"`
Expand All @@ -111,6 +115,7 @@ type ModuleConfig struct {
MetricSets []string `config:"metricsets" validate:"required"`
Enabled bool `config:"enabled"`
Filters processors.PluginConfig `config:"filters"`
Raw bool `config:"raw"`

common.EventMetadata `config:",inline"` // Fields and tags to add to events.
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ metricbeat.modules:
# Password of hosts. Empty by default
#password: test

# By setting raw to true, all raw fields from the status metricset will be added to the event.
#raw: false

#-------------------------------- Nginx Module -------------------------------
#- module: nginx
#metricsets: ["stubstatus"]
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/mysql/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@

# Password of hosts. Empty by default
#password: test

# By setting raw to true, all raw fields from the status metricset will be added to the event.
#raw: false
9 changes: 9 additions & 0 deletions metricbeat/module/mysql/status/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,12 @@
The MySQL `status` metricset collects data from MySQL by running a
http://dev.mysql.com/doc/refman/5.7/en/show-status.html[`SHOW GLOBAL STATUS;`]
SQL query. This query returns a large number of metrics.


==== raw config option

experimental[]

The MySQL Status Metricset supports the `raw` config option. When enabled, in addition to the existing data structure, all fields available from the mysql service throgh `"SHOW /*!50002 GLOBAL */ STATUS;"` will be added to the event.

These fields will be added under the namespace `mysql.status.raw`. The fields can vary from one MySQL instance to an other and no guarantees are provided for the mapping of the fields as the mapping happens dynamically. This option is intended for enhance use cases.
13 changes: 13 additions & 0 deletions metricbeat/module/mysql/status/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,16 @@ func eventMapping(status map[string]string) common.MapStr {
}
return schema.Apply(source)
}

func rawEventMapping(status map[string]string) common.MapStr {
source := common.MapStr{}
for key, val := range status {
// Only adds events which are not in the mapping
if schema.HasKey(key) {
continue
}

source[key] = val
}
return source
}
9 changes: 7 additions & 2 deletions metricbeat/module/mysql/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch fetches status messages from a mysql host.
func (m *MetricSet) Fetch() (event common.MapStr, err error) {
func (m *MetricSet) Fetch() (common.MapStr, error) {
if m.db == nil {
var err error
m.db, err = mysql.NewDB(m.dsn)
Expand All @@ -77,7 +77,12 @@ func (m *MetricSet) Fetch() (event common.MapStr, err error) {
return nil, err
}

return eventMapping(status), nil
event := eventMapping(status)

if m.Module().Config().Raw {
event["raw"] = rawEventMapping(status)
}
return event, nil
}

// loadStatus loads all status entries from the given database into an array.
Expand Down
31 changes: 28 additions & 3 deletions metricbeat/module/mysql/status/status_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func TestFetch(t *testing.T) {
f := mbtest.NewEventFetcher(t, getConfig())
f := mbtest.NewEventFetcher(t, getConfig(false))
event, err := f.Fetch()
if !assert.NoError(t, err) {
t.FailNow()
Expand All @@ -34,19 +34,44 @@ func TestFetch(t *testing.T) {
assert.True(t, openStreams == 0)
}

func TestFetchRaw(t *testing.T) {
f := mbtest.NewEventFetcher(t, getConfig(true))
event, err := f.Fetch()
if !assert.NoError(t, err) {
t.FailNow()
}

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)

// Check event fields
cachedThreads := event["threads"].(common.MapStr)["cached"].(int64)
assert.True(t, cachedThreads >= 0)

rawData := event["raw"].(common.MapStr)

// Make sure field was removed from raw fields as in schema
_, exists := rawData["Threads_cached"]
assert.False(t, exists)

// Check a raw field if it is available
_, exists = rawData["Slow_launch_threads"]
assert.True(t, exists)
}

func TestData(t *testing.T) {
f := mbtest.NewEventFetcher(t, getConfig())
f := mbtest.NewEventFetcher(t, getConfig(false))

err := mbtest.WriteEvent(f, t)
if err != nil {
t.Fatal("write", err)
}
}

func getConfig() map[string]interface{} {
func getConfig(raw bool) map[string]interface{} {
return map[string]interface{}{
"module": "mysql",
"metricsets": []string{"status"},
"hosts": []string{mysql.GetMySQLEnvDSN()},
"raw": raw,
}
}
8 changes: 8 additions & 0 deletions metricbeat/schema/mapstriface/mapstriface.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ func (convMap ConvMap) Map(key string, event common.MapStr, data map[string]inte
event[key] = subEvent
}

func (convMap ConvMap) HasKey(key string) bool {
if convMap.Key == key {
return true
}

return convMap.Schema.HasKey(key)
}

func Dict(key string, s schema.Schema, opts ...DictSchemaOption) ConvMap {
return dictSetOptions(ConvMap{Key: key, Schema: s}, opts)
}
Expand Down
24 changes: 24 additions & 0 deletions metricbeat/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Mapper interface {
// Map applies the Mapper conversion on the data and adds the result
// to the event on the key.
Map(key string, event common.MapStr, data map[string]interface{})

HasKey(key string) bool
}

// A Conv object represents a conversion mechanism from the data map to the event map.
Expand All @@ -40,6 +42,10 @@ func (conv Conv) Map(key string, event common.MapStr, data map[string]interface{
}
}

func (conv Conv) HasKey(key string) bool {
return conv.Key == key
}

// implements Mapper interface for structure
type Object map[string]Mapper

Expand All @@ -49,6 +55,10 @@ func (o Object) Map(key string, event common.MapStr, data map[string]interface{}
event[key] = subEvent
}

func (o Object) HasKey(key string) bool {
return hasKey(key, o)
}

// ApplyTo adds the fields extracted from data, converted using the schema, to the
// event map.
func (s Schema) ApplyTo(event common.MapStr, data map[string]interface{}) common.MapStr {
Expand All @@ -61,6 +71,20 @@ func (s Schema) Apply(data map[string]interface{}) common.MapStr {
return s.ApplyTo(common.MapStr{}, data)
}

// HasKey checks if the key is part of the schema
func (s Schema) HasKey(key string) bool {
return hasKey(key, s)
}

func hasKey(key string, mappers map[string]Mapper) bool {
for _, mapper := range mappers {
if mapper.HasKey(key) {
return true
}
}
return false
}

func applySchemaToEvent(event common.MapStr, data map[string]interface{}, conversions map[string]Mapper) {
for key, mapper := range conversions {
mapper.Map(key, event, data)
Expand Down
19 changes: 19 additions & 0 deletions metricbeat/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ func TestSchema(t *testing.T) {
})
}

func TestHasKey(t *testing.T) {
schema := Schema{
"test": Conv{Key: "Test", Func: nop},
"test_obj": Object{
"test_a": Conv{Key: "TestA", Func: nop},
"test_b": Conv{Key: "TestB", Func: nop},
},
}

assert.True(t, schema.HasKey("Test"))
assert.True(t, schema.HasKey("TestA"))
assert.True(t, schema.HasKey("TestB"))
assert.False(t, schema.HasKey("test"))
assert.False(t, schema.HasKey("test_obj"))
assert.False(t, schema.HasKey("test_a"))
assert.False(t, schema.HasKey("test_b"))
assert.False(t, schema.HasKey("other"))
}

func test(key string, opts ...SchemaOption) Conv {
return SetOptions(Conv{Key: key, Func: nop}, opts)
}
Expand Down

0 comments on commit 0b33ee3

Please sign in to comment.