Skip to content

Commit

Permalink
[probes.external] Parse stdout as soon as it is available. (#708)
Browse files Browse the repository at this point in the history
- Generate metrics from external probe's stdout as soon as stdout becomes available. This helps in situations where external probe process runs for a bit but it keeps outputting metrics much more frequently, say 1 min interval with output every 10s (see #691 for one such request but it has been asked earlier as well, at least once more).

- With this change stderr will also be read and logged as soon as it's available.

Add and improve testing:
- Don't rely on timeout for testing. That makes it unreliable on CI.
- Reduce external probe process runs for the TestProbeOnceMode test.
- Remove wait for the command exit.
   - Change in #547 appears wrong as it was causing wait to be called on
      only for non-windows platforms, while issue was on windows. It seems
      that the issue was temporary and fixed by itself.
  • Loading branch information
manugarg committed Apr 2, 2024
1 parent 24587fb commit bb142c0
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 110 deletions.
14 changes: 14 additions & 0 deletions metrics/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ func LabelsMapByTarget(ems []*metrics.EventMetrics) map[string]map[string]string
return lmap
}

func EventMetricsByTargetMetric(ems []*metrics.EventMetrics) map[string]map[string][]*metrics.EventMetrics {
emMap := make(map[string]map[string][]*metrics.EventMetrics)
for _, em := range ems {
target := em.Label("dst")
if emMap[target] == nil {
emMap[target] = make(map[string][]*metrics.EventMetrics)
}
for _, k := range em.MetricsKeys() {
emMap[target][k] = append(emMap[target][k], em)
}
}
return emMap
}

type MetricsMap map[string]map[string][]metrics.Value

// MetricsMapByTarget rearranges a list of metrics into a map of map.
Expand Down
29 changes: 29 additions & 0 deletions metrics/testutils/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,32 @@ func TestMetricsMapFilterAndLastValueInt64(t *testing.T) {
})
}
}

func TestEventMetricsByTargetMetric(t *testing.T) {
ems := []*metrics.EventMetrics{
metrics.NewEventMetrics(time.Now()).
AddMetric("success", metrics.NewInt(99)).
AddMetric("total", metrics.NewInt(100)).
AddLabel("dst", "target1"),
metrics.NewEventMetrics(time.Now()).
AddMetric("success", metrics.NewInt(98)).
AddMetric("total", metrics.NewInt(100)).
AddLabel("dst", "target2"),
metrics.NewEventMetrics(time.Now()).
AddMetric("success", metrics.NewInt(99)).
AddMetric("total", metrics.NewInt(101)).
AddLabel("dst", "target1"),
}
emsMap := EventMetricsByTargetMetric(ems)

wantEMs := map[string][]*metrics.EventMetrics{
"target1": {ems[0], ems[2]},
"target2": {ems[1]},
}
for _, tgt := range []string{"target1", "target2"} {
for _, m := range []string{"success", "total"} {
assert.Len(t, emsMap[tgt][m], len(wantEMs[tgt]), "number of ems mismatch")
assert.Equal(t, wantEMs[tgt], emsMap[tgt][m], "eventmetrics mismatch")
}
}
}
80 changes: 64 additions & 16 deletions probes/external/external.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2023 The Cloudprober Authors.
// Copyright 2017-2024 The Cloudprober Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,10 +24,13 @@ over stdin/stdout for each probe cycle.
package external

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"sort"
Expand Down Expand Up @@ -82,9 +85,6 @@ type Probe struct {
results map[string]*result // probe results keyed by targets
dataChan chan *metrics.EventMetrics

// This is used for overriding run command logic for testing.
runCommandFunc func(ctx context.Context, cmd string, args, envVars []string) ([]byte, []byte, error)

// default payload metrics that we clone from to build per-target payload
// metrics.
payloadParser *payload.Parser
Expand Down Expand Up @@ -255,6 +255,43 @@ func (p *Probe) processProbeResult(ps *probeStatus, result *result) {
}
}

func (p *Probe) setupStreaming(c *exec.Cmd, target endpoint.Endpoint) error {
stdout := make(chan []byte)
stdoutR, err := c.StdoutPipe()
if err != nil {
return err
}
stderrR, err := c.StderrPipe()
if err != nil {
return err
}
go func() {
defer close(stdout)
defer stdoutR.Close()
scanner := bufio.NewScanner(stdoutR)
for scanner.Scan() {
stdout <- scanner.Bytes()
}
}()
go func() {
defer stderrR.Close()
scanner := bufio.NewScanner(stderrR)
for scanner.Scan() {
p.l.Warningf("Stderr: %s", scanner.Text())
}
}()

go func() {
for line := range stdout {
for _, em := range p.payloadParser.PayloadMetrics(string(line), target.Name) {
p.opts.RecordMetrics(target, em, p.dataChan, options.WithNoAlert())
}
}
}()

return nil
}

func (p *Probe) runOnceProbe(ctx context.Context) {
var wg sync.WaitGroup

Expand All @@ -278,33 +315,44 @@ func (p *Probe) runOnceProbe(ctx context.Context) {
result.total++
startTime := time.Now()

var stdout, stderr []byte
var err error
if p.runCommandFunc != nil {
stdout, stderr, err = p.runCommandFunc(ctx, p.cmdName, args, p.envVars)
c := exec.CommandContext(ctx, p.cmdName, args...)
if p.envVars != nil {
c.Env = append(append(c.Env, os.Environ()...), p.envVars...)
}

var stdoutBuf, stderrBuf bytes.Buffer

if p.c.GetOutputAsMetrics() && !p.c.GetDisableStreamingOutputMetrics() {
if err := p.setupStreaming(c, target); err != nil {
p.l.Errorf("Error setting up stdout/stderr pipe: %v", err)
return
}
} else {
stdout, stderr, err = p.runCommand(ctx, p.cmdName, args, p.envVars)
c.Stdout, c.Stderr = &stdoutBuf, &stderrBuf
}

err := p.runCommand(ctx, c)

success := true
if err != nil {
success = false
stdout, stderr := stdoutBuf.String(), stderrBuf.String()
stderrout := ""
if stdout != "" || stderr != "" {
stderrout = fmt.Sprintf(" Stdout: %s, Stderr: %s", stdout, stderr)
}
if exitErr, ok := err.(*exec.ExitError); ok {
p.l.Errorf("external probe process died with the status: %s. Stderr: %s", exitErr.Error(), stderr)
p.l.Errorf("external probe process died with the status: %s.%s", exitErr.Error(), stderrout)
} else {
p.l.Errorf("Error executing the external program. Err: %v", err)
}
} else {
if len(stderr) != 0 {
p.l.Warningf("Stderr: %s", stderr)
p.l.Errorf("Error executing the external program. Err: %v.%s", err, stderrout)
}
}

p.processProbeResult(&probeStatus{
target: target,
success: success,
payload: stdoutBuf.String(),
latency: time.Since(startTime),
payload: string(stdout),
}, result)
}(target, p.results[target.Key()])
}
Expand Down
Loading

0 comments on commit bb142c0

Please sign in to comment.