Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more metrics to Filebeat harvesters #13395

Merged
merged 10 commits into from
Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add dashboards for the ActiveMQ Filebeat module. {pull}14880[14880]
- Add STAN Metricbeat module. {pull}14839[14839]
- Add new fileset googlecloud/audit for ingesting Google Cloud Audit logs. {pull}15200[15200]
- Expose more metrics of harvesters (e.g. `read_offset`, `start_time`). {pull}13395[13395]

*Heartbeat*
- Add non-privileged icmp on linux and darwin(mac). {pull}13795[13795] {issue}11498[11498]
Expand Down
72 changes: 72 additions & 0 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (

var (
harvesterMetrics = monitoring.Default.NewRegistry("filebeat.harvester")
filesMetrics = harvesterMetrics.NewRegistry("files")

harvesterStarted = monitoring.NewInt(harvesterMetrics, "started")
harvesterClosed = monitoring.NewInt(harvesterMetrics, "closed")
Expand Down Expand Up @@ -100,9 +101,22 @@ type Harvester struct {
outletFactory OutletFactory
publishState func(file.State) bool

metrics *harvesterProgressMetrics

onTerminate func()
}

// stores the metrics of the harvester
type harvesterProgressMetrics struct {
metricsRegistry *monitoring.Registry
filename *monitoring.String
started *monitoring.String
lastPublished *monitoring.Timestamp
lastPublishedEventTimestamp *monitoring.Timestamp
currentSize *monitoring.Int
readOffset *monitoring.Int
}

// NewHarvester creates a new harvester
func NewHarvester(
config *common.Config,
Expand Down Expand Up @@ -179,11 +193,43 @@ func (h *Harvester) Setup() error {
return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
}

h.metrics = newHarvesterProgressMetrics(h.id.String())
h.metrics.filename.Set(h.source.Name())
h.metrics.started.Set(common.Time(time.Now()).String())
h.metrics.readOffset.Set(h.state.Offset)
err = h.updateCurrentSize()
if err != nil {
return err
}

logp.Debug("harvester", "Harvester setup successful. Line terminator: %d", h.config.LineTerminator)

return nil
}

func newHarvesterProgressMetrics(id string) *harvesterProgressMetrics {
r := filesMetrics.NewRegistry(id)
return &harvesterProgressMetrics{
metricsRegistry: r,
filename: monitoring.NewString(r, "name"),
started: monitoring.NewString(r, "start_time"),
lastPublished: monitoring.NewTimestamp(r, "last_event_published_time"),
lastPublishedEventTimestamp: monitoring.NewTimestamp(r, "last_event_timestamp"),
currentSize: monitoring.NewInt(r, "size"),
readOffset: monitoring.NewInt(r, "read_offset"),
}
}

func (h *Harvester) updateCurrentSize() error {
fInfo, err := h.source.Stat()
if err != nil {
return err
}

h.metrics.currentSize.Set(fInfo.Size())
return nil
}

// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
// Allow for some cleanup on termination
Expand Down Expand Up @@ -250,6 +296,8 @@ func (h *Harvester) Run() error {

logp.Info("Harvester started for file: %s", h.state.Source)

go h.monitorFileSize()

for {
select {
case <-h.done:
Expand Down Expand Up @@ -298,13 +346,37 @@ func (h *Harvester) Run() error {

// Update state of harvester as successfully sent
h.state = state

// Update metics of harvester as event was sent
h.metrics.readOffset.Set(state.Offset)
h.metrics.lastPublished.Set(time.Now())
h.metrics.lastPublishedEventTimestamp.Set(message.Ts)
}
}

func (h *Harvester) monitorFileSize() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for {
select {
case <-h.done:
return
case <-ticker.C:
err := h.updateCurrentSize()
if err != nil {
logp.Err("Error updating file size: %v; File: %v", err, h.state.Source)
}
}
}
}

// stop is intended for internal use and closed the done channel to stop execution
func (h *Harvester) stop() {
h.stopOnce.Do(func() {
close(h.done)

filesMetrics.Remove(h.id.String())
})
}

Expand Down
56 changes: 56 additions & 0 deletions libbeat/monitoring/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"math"
"strconv"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/atomic"
)

Expand Down Expand Up @@ -232,3 +234,57 @@ func fullName(r *Registry, name string) string {
}
return r.name + "." + name
}

// Timestamp is a timestamp variable satisfying the Var interface.
type Timestamp struct {
mu sync.RWMutex
ts time.Time
cached string
}

// NewTimestamp creates and registeres a new timestamp variable.
func NewTimestamp(r *Registry, name string, opts ...Option) *Timestamp {
if r == nil {
r = Default
}

v := &Timestamp{}
addVar(r, name, opts, v, makeExpvar(func() string {
return v.toString()
kvch marked this conversation as resolved.
Show resolved Hide resolved

}))
return v
}

func (v *Timestamp) Set(t time.Time) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Timestamp.Set should have comment or be unexported

v.mu.Lock()
defer v.mu.Unlock()

v.ts = t
v.cached = ""
}

func (v *Timestamp) Get() time.Time {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Timestamp.Get should have comment or be unexported

v.mu.RLock()
defer v.mu.RUnlock()

return v.ts
}

func (v *Timestamp) Visit(_ Mode, vs Visitor) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Timestamp.Visit should have comment or be unexported

vs.OnString(v.toString())
kvch marked this conversation as resolved.
Show resolved Hide resolved
}

func (v *Timestamp) toString() string {
v.mu.RLock()
defer v.mu.RUnlock()

if v.ts.IsZero() {
return ""
}

if v.cached == "" {
v.cached = v.ts.Format(common.TsLayout)
}
return v.cached
}