Add kafka consumer groups metricset (#3240)
- mark kafka module as experimental
- add new metricset
- includes unit tests only
- fix kafka API version settings
Steffen Siering authored and andrewkroh committed Jan 3, 2017
1 parent d22444a commit 5b973a0
Showing 21 changed files with 1,305 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Export number of cores for cpu module. {pull}3192[3192]
- Experimental Prometheus module. {pull}3202[3202]
- Add system socket module that reports all TCP sockets. {pull}3246[3246]
- Kafka consumer groups metricset. {pull}3240[3240]

*Packetbeat*

4 changes: 3 additions & 1 deletion libbeat/outputs/kafka/kafka.go
@@ -91,7 +91,9 @@ var (
"0.10.0.0": sarama.V0_10_0_0,
"0.10.0.1": sarama.V0_10_0_1,
"0.10.0": sarama.V0_10_0_1,
"0.10": sarama.V0_10_0_1,
"0.10.1.0": sarama.V0_10_1_0,
"0.10.1": sarama.V0_10_1_0,
"0.10": sarama.V0_10_1_0,
}
)
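The fix above changes what a truncated version string resolves to: each partial version now maps to the newest patch release of that line, so `"0.10"` means 0.10.1.0 rather than 0.10.0.1. A minimal sketch of that lookup, using stand-in constants rather than the real `sarama.V0_10_*` identifiers:

```go
package main

import "fmt"

// kafkaVersion stands in for sarama's Kafka version type; the real
// output plugin uses sarama.V0_10_0_1, sarama.V0_10_1_0, and so on.
type kafkaVersion string

const (
	v0_10_0_1 kafkaVersion = "0.10.0.1"
	v0_10_1_0 kafkaVersion = "0.10.1.0"
)

// kafkaVersions mirrors the table in libbeat/outputs/kafka/kafka.go:
// a truncated version string resolves to the newest patch release of
// that line.
var kafkaVersions = map[string]kafkaVersion{
	"0.10.0.1": v0_10_0_1,
	"0.10.0":   v0_10_0_1,
	"0.10.1.0": v0_10_1_0,
	"0.10.1":   v0_10_1_0,
	"0.10":     v0_10_1_0,
}

func resolveVersion(s string) (kafkaVersion, bool) {
	v, ok := kafkaVersions[s]
	return v, ok
}

func main() {
	v, _ := resolveVersion("0.10")
	fmt.Println(v) // 0.10.1.0 after this commit
}
```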

101 changes: 101 additions & 0 deletions metricbeat/docs/fields.asciidoc
@@ -2401,6 +2401,107 @@ experimental[]
[float]
== consumergroup Fields

consumergroup

[float]
== broker Fields

Broker the consumer group information has been read from (the broker handling the consumer group).

[float]
=== kafka.consumergroup.broker.id

type: long

Broker ID

[float]
=== kafka.consumergroup.broker.address

type: keyword

Broker address

[float]
=== kafka.consumergroup.id

type: keyword

Consumer group ID

[float]
=== kafka.consumergroup.topic

type: keyword

Topic name

[float]
=== kafka.consumergroup.partition

type: long

Partition ID

[float]
=== kafka.consumergroup.offset

type: long

Consumer offset into the partition being read

[float]
=== kafka.consumergroup.meta

type: text

Custom consumer metadata string

[float]
=== kafka.consumergroup.error.code

type: long

Kafka consumer/partition error code

[float]
== client Fields

The client assigned to read events from the partition

[float]
=== kafka.consumergroup.client.id

type: keyword

Client ID (the Kafka client.id setting)

[float]
=== kafka.consumergroup.client.host

type: keyword

Client host

[float]
=== kafka.consumergroup.client.member_id

type: keyword

Internal consumer group member ID

[float]
== partition Fields
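The field list above maps directly onto the nested document the metricset emits. A sketch of what one consumer-group event might look like, with a plain `map[string]interface{}` standing in for beats' `common.MapStr` and all values invented for illustration:

```go
package main

import "fmt"

// MapStr stands in for beats' common.MapStr (a map[string]interface{}).
type MapStr map[string]interface{}

// consumerGroupEvent builds an example document using the field names
// documented for kafka.consumergroup.*; the values are made up.
func consumerGroupEvent() MapStr {
	return MapStr{
		"id":        "my-group", // kafka.consumergroup.id
		"topic":     "events",   // kafka.consumergroup.topic
		"partition": int64(0),
		"offset":    int64(1042), // consumer offset into the partition
		"meta":      "",          // custom consumer metadata
		"error":     MapStr{"code": int64(0)},
		"broker": MapStr{
			"id":      int64(1),
			"address": "localhost:9092",
		},
		"client": MapStr{
			"id":        "consumer-1", // the Kafka client.id setting
			"host":      "10.0.0.5",
			"member_id": "consumer-1-e6f3",
		},
	}
}

func main() {
	e := consumerGroupEvent()
	fmt.Println(e["topic"], e["offset"])
}
```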
6 changes: 6 additions & 0 deletions metricbeat/docs/modules/kafka.asciidoc
@@ -5,6 +5,8 @@ This file is generated! See scripts/docs_collector.py
[[metricbeat-module-kafka]]
== kafka Module

experimental[]

This is the kafka Module.


@@ -51,7 +53,11 @@ metricbeat.modules:

The following metricsets are available:

* <<metricbeat-metricset-kafka-consumergroup,consumergroup>>

* <<metricbeat-metricset-kafka-partition,partition>>

include::kafka/consumergroup.asciidoc[]

include::kafka/partition.asciidoc[]

19 changes: 19 additions & 0 deletions metricbeat/docs/modules/kafka/consumergroup.asciidoc
@@ -0,0 +1,19 @@
////
This file is generated! See scripts/docs_collector.py
////

[[metricbeat-metricset-kafka-consumergroup]]
include::../../../module/kafka/consumergroup/_meta/docs.asciidoc[]


==== Fields

For a description of each field in the metricset, see the
<<exported-fields-kafka,exported fields>> section.

Here is an example document generated by this metricset:

[source,json]
----
include::../../../module/kafka/consumergroup/_meta/data.json[]
----
1 change: 1 addition & 0 deletions metricbeat/include/list.go
@@ -25,6 +25,7 @@ import (
_ "github.com/elastic/beats/metricbeat/module/haproxy/info"
_ "github.com/elastic/beats/metricbeat/module/haproxy/stat"
_ "github.com/elastic/beats/metricbeat/module/kafka"
_ "github.com/elastic/beats/metricbeat/module/kafka/consumergroup"
_ "github.com/elastic/beats/metricbeat/module/kafka/partition"
_ "github.com/elastic/beats/metricbeat/module/mongodb"
_ "github.com/elastic/beats/metricbeat/module/mongodb/status"
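The blank import added above is all it takes to enable the new metricset: each metricset package registers itself from its `init()` function, so importing it for side effects makes it available by name. A minimal sketch of that pattern, with illustrative names rather than the actual `metricbeat/mb` API:

```go
package main

import "fmt"

// factory stands in for a metricset constructor.
type factory func() string

// registry stands in for the shared metricset registry that the
// blank-imported packages populate.
var registry = map[string]factory{}

func addMetricSet(module, name string, f factory) {
	registry[module+"/"+name] = f
}

// init models what module/kafka/consumergroup's and
// module/kafka/partition's own init() functions would conceptually do
// when the packages are imported for side effects.
func init() {
	addMetricSet("kafka", "consumergroup", func() string { return "consumergroup" })
	addMetricSet("kafka", "partition", func() string { return "partition" })
}

func main() {
	f := registry["kafka/consumergroup"]
	fmt.Println(f())
}
```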
65 changes: 65 additions & 0 deletions metricbeat/metricbeat.template-es2x.json
@@ -1311,6 +1311,71 @@
},
"kafka": {
"properties": {
"consumergroup": {
"properties": {
"broker": {
"properties": {
"address": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
},
"id": {
"type": "long"
}
}
},
"client": {
"properties": {
"host": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
},
"id": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
},
"member_id": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
}
}
},
"error": {
"properties": {
"code": {
"type": "long"
}
}
},
"id": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
},
"meta": {
"index": "analyzed",
"norms": {
"enabled": false
},
"type": "string"
},
"offset": {
"type": "long"
},
"partition": {
"type": "long"
},
"topic": {
"ignore_above": 1024,
"index": "not_analyzed",
"type": "string"
}
}
},
"partition": {
"properties": {
"broker": {
56 changes: 56 additions & 0 deletions metricbeat/metricbeat.template.json
@@ -1318,6 +1318,62 @@
},
"kafka": {
"properties": {
"consumergroup": {
"properties": {
"broker": {
"properties": {
"address": {
"ignore_above": 1024,
"type": "keyword"
},
"id": {
"type": "long"
}
}
},
"client": {
"properties": {
"host": {
"ignore_above": 1024,
"type": "keyword"
},
"id": {
"ignore_above": 1024,
"type": "keyword"
},
"member_id": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"error": {
"properties": {
"code": {
"type": "long"
}
}
},
"id": {
"ignore_above": 1024,
"type": "keyword"
},
"meta": {
"norms": false,
"type": "text"
},
"offset": {
"type": "long"
},
"partition": {
"type": "long"
},
"topic": {
"ignore_above": 1024,
"type": "keyword"
}
}
},
"partition": {
"properties": {
"broker": {
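The commit touches two templates because the same logical field is expressed differently per Elasticsearch major version: a keyword-style field is a `not_analyzed` `string` on ES 2.x but the dedicated `keyword` type on ES 5.x (and `meta` is an analyzed `string` versus `text`). An illustrative helper showing the split; beats actually generates these templates from `fields.yml` with its own tooling:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// keywordMapping returns the mapping fragment for a keyword-style
// field, depending on the target Elasticsearch major version. This is
// a sketch, not the beats template generator.
func keywordMapping(esMajor int) map[string]interface{} {
	if esMajor < 5 {
		// ES 2.x has no keyword type; use a not_analyzed string.
		return map[string]interface{}{
			"type":         "string",
			"index":        "not_analyzed",
			"ignore_above": 1024,
		}
	}
	// ES 5.x replaced not_analyzed strings with the keyword type.
	return map[string]interface{}{
		"type":         "keyword",
		"ignore_above": 1024,
	}
}

func main() {
	b, _ := json.Marshal(keywordMapping(5))
	fmt.Println(string(b))
}
```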
2 changes: 2 additions & 0 deletions metricbeat/module/kafka/_meta/docs.asciidoc
@@ -1,4 +1,6 @@
== kafka Module

experimental[]

This is the kafka Module.
