Skip to content

Commit

Permalink
Unify initialization code in kafka metricsets (#8705)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoriano committed Oct 29, 2018
1 parent abe7440 commit 72018e6
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package consumergroup
package kafka

import (
"fmt"
Expand All @@ -31,9 +31,6 @@ type metricsetConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ClientID string `config:"client_id"`

Groups []string `config:"groups"`
Topics []string `config:"topics"`
}

var defaultConfig = metricsetConfig{
Expand Down
59 changes: 20 additions & 39 deletions metricbeat/module/kafka/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@
package consumergroup

import (
"crypto/tls"
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/kafka"
Expand All @@ -40,9 +36,8 @@ func init() {

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*kafka.MetricSet

broker *kafka.Broker
topics nameSet
groups nameSet
}
Expand All @@ -59,56 +54,42 @@ var debugf = logp.MakeDebug("kafka")
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The kafka consumergroup metricset is beta")

config := defaultConfig
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
opts := kafka.MetricSetOptions{
Version: "0.9.0.0",
}

var tls *tls.Config
tlsCfg, err := tlscommon.LoadTLSConfig(config.TLS)
ms, err := kafka.NewMetricSet(base, opts)
if err != nil {
return nil, err
}
if tlsCfg != nil {
tls = tlsCfg.BuildModuleConfig("")
}

timeout := base.Module().Config().Timeout

cfg := kafka.BrokerSettings{
MatchID: true,
DialTimeout: timeout,
ReadTimeout: timeout,
ClientID: config.ClientID,
Retries: config.Retries,
Backoff: config.Backoff,
TLS: tls,
Username: config.Username,
Password: config.Password,

// consumer groups API requires at least 0.9.0.0
Version: kafka.Version("0.9.0.0"),
config := struct {
Groups []string `config:"groups"`
Topics []string `config:"topics"`
}{}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}

return &MetricSet{
BaseMetricSet: base,
broker: kafka.NewBroker(base.Host(), cfg),
groups: makeNameSet(config.Groups...),
topics: makeNameSet(config.Topics...),
MetricSet: ms,
groups: makeNameSet(config.Groups...),
topics: makeNameSet(config.Topics...),
}, nil
}

// Fetch consumer group metrics from kafka
func (m *MetricSet) Fetch(r mb.ReporterV2) {
if err := m.broker.Connect(); err != nil {
r.Error(errors.Wrap(err, "broker connection failed"))
broker, err := m.Connect()
if err != nil {
r.Error(err)
return
}
defer m.broker.Close()
defer broker.Close()

brokerInfo := common.MapStr{
"id": m.broker.ID(),
"address": m.broker.AdvertisedAddr(),
"id": broker.ID(),
"address": broker.AdvertisedAddr(),
}

emitEvent := func(event common.MapStr) {
Expand All @@ -131,7 +112,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
MetricSetFields: event,
})
}
err := fetchGroupInfo(emitEvent, m.broker, m.groups.pred(), m.topics.pred())
err = fetchGroupInfo(emitEvent, broker, m.groups.pred(), m.topics.pred())
if err != nil {
r.Error(err)
}
Expand Down
80 changes: 80 additions & 0 deletions metricbeat/module/kafka/metricset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
"crypto/tls"

"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/metricbeat/mb"
)

// MetricSet is the base metricset for all Kafka metricsets
type MetricSet struct {
mb.BaseMetricSet
broker *Broker
}

// MetricSetOptions are the options of a Kafka metricset
type MetricSetOptions struct {
Version string
}

// NewMetricSet creates a base metricset for Kafka metricsets
func NewMetricSet(base mb.BaseMetricSet, options MetricSetOptions) (*MetricSet, error) {
config := defaultConfig
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}

tlsCfg, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
return nil, err
}

var tls *tls.Config
if tlsCfg != nil {
tls = tlsCfg.BuildModuleConfig("")
}

timeout := base.Module().Config().Timeout
cfg := BrokerSettings{
MatchID: true,
DialTimeout: timeout,
ReadTimeout: timeout,
ClientID: config.ClientID,
Retries: config.Retries,
Backoff: config.Backoff,
TLS: tls,
Username: config.Username,
Password: config.Password,
Version: Version(options.Version),
}

return &MetricSet{
BaseMetricSet: base,
broker: NewBroker(base.Host(), cfg),
}, nil

}

// Connect connects with a kafka broker
func (m *MetricSet) Connect() (*Broker, error) {
err := m.broker.Connect()
return m.broker, err
}
52 changes: 0 additions & 52 deletions metricbeat/module/kafka/partition/config.go

This file was deleted.

0 comments on commit 72018e6

Please sign in to comment.