[probes.external] Refactor the external probe code (#695)
- Split out server handling code into a different file
- Add tests for serverutils
- Use context to cancel various loops (note: we still won't be able to cancel a pending read; see the sketch below the change summary).
manugarg committed Mar 22, 2024
1 parent c70d29b commit 527a9eb
Showing 6 changed files with 717 additions and 527 deletions.
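The third commit bullet notes that the context can stop the probe's loops but cannot interrupt a read that is already pending on the pipe. A minimal, self-contained sketch of that pattern (illustrative only; readLoop and the pipe setup below are not code from this commit):

package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"time"
)

func readLoop(ctx context.Context, r *bufio.Reader) {
	for {
		select {
		case <-ctx.Done():
			return // cancellation is only observed between reads
		default:
		}
		line, err := r.ReadString('\n') // a read blocked here cannot be canceled
		if err != nil {
			return // e.g. io.EOF once the writer closes the pipe
		}
		fmt.Print(line)
	}
}

func main() {
	pr, pw := io.Pipe()
	ctx, cancel := context.WithCancel(context.Background())
	go readLoop(ctx, bufio.NewReader(pr))

	fmt.Fprintln(pw, "reply 1")
	time.Sleep(10 * time.Millisecond)
	cancel()   // noticed only once the current read returns...
	pw.Close() // ...closing the pipe is what actually unblocks the reader
}

Closing the write end is what unblocks the reader, which is why the probe code below relies on the process's pipes closing (io.EOF / os.ErrClosed) rather than on the context alone.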
250 changes: 1 addition & 249 deletions probes/external/external.go
@@ -24,12 +24,10 @@ over stdin/stdout for each probe cycle.
package external

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"regexp"
"sort"
@@ -45,27 +43,13 @@ import (
"github.com/cloudprober/cloudprober/metrics/payload"
configpb "github.com/cloudprober/cloudprober/probes/external/proto"
serverpb "github.com/cloudprober/cloudprober/probes/external/proto"
"github.com/cloudprober/cloudprober/probes/external/serverutils"
"github.com/cloudprober/cloudprober/probes/options"
"github.com/cloudprober/cloudprober/targets/endpoint"
"github.com/google/shlex"
"google.golang.org/protobuf/proto"
)

var (
// TimeBetweenRequests is the time interval between probe requests for
// multiple targets. In server mode, probe requests for multiple targets are
// sent to the same external probe process. Sleeping between requests provides
// some time buffer for the probe process to dequeue the incoming requests and
// avoids filling up the communication pipe.
//
// Note that this value impacts the effective timeout for a target, as the
// timeout is applied to all the targets in aggregate. For example, the 100th
// target in the targets list will have an effective timeout of (timeout - 1ms).
// TODO(manugarg): Make sure that the last target in the list has an impact of
// less than 1% on its timeout.
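// (Worked example for the note above: with TimeBetweenRequests = 10µs, the
// 100th request is sent 99 × 10µs ≈ 1ms after the first one, hence the
// roughly 1ms reduction in its effective timeout.)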
TimeBetweenRequests = 10 * time.Microsecond
validLabelRe        = regexp.MustCompile(`@(target|address|port|probe|target\.label\.[^@]+)@`)
validLabelRe = regexp.MustCompile(`@(target|address|port|probe|target\.label\.[^@]+)@`)
)

type result struct {
@@ -196,118 +180,6 @@ type command interface {
Wait() error
}

// monitorCommand waits for the process to terminate and sets cmdRunning to
// false when that happens.
func (p *Probe) monitorCommand(startCtx context.Context, cmd command) error {
err := cmd.Wait()

// Skip logging an error message if the process was killed explicitly.
select {
case <-startCtx.Done():
return nil
default:
}

if exitErr, ok := err.(*exec.ExitError); ok {
return fmt.Errorf("external probe process died with the status: %s. Stderr: %s", exitErr.Error(), string(exitErr.Stderr))
}
return err
}

func (p *Probe) startCmdIfNotRunning(startCtx context.Context) error {
// Start the external probe command if it's not running already. Note that
// here we are trusting cmdRunning to be set correctly. It can be false for 4
// reasons:
// Correct reasons:
// 1) This is the first call and process has actually never been started.
// 2) Process died for some reason and monitor set cmdRunning to false.
// Incorrect reasons:
// 3) cmd.Start() started the process but still returned an error.
// 4) cmd.Wait() returned incorrectly, while the process was still running.
//
// 3 or 4 should never really happen, but managing processes can be tricky.
// Documenting here to help with debugging if we run into an issue.
p.cmdRunningMu.Lock()
defer p.cmdRunningMu.Unlock()
if p.cmdRunning {
return nil
}
p.l.Infof("Starting external command: %s %s", p.cmdName, strings.Join(p.cmdArgs, " "))
cmd := exec.CommandContext(startCtx, p.cmdName, p.cmdArgs...)
var err error
if p.cmdStdin, err = cmd.StdinPipe(); err != nil {
return err
}
if p.cmdStdout, err = cmd.StdoutPipe(); err != nil {
return err
}
if p.cmdStderr, err = cmd.StderrPipe(); err != nil {
return err
}
if len(p.envVars) > 0 {
cmd.Env = append(cmd.Env, p.envVars...)
}

go func() {
scanner := bufio.NewScanner(p.cmdStderr)
for scanner.Scan() {
p.l.Warningf("Stderr of %s: %s", cmd.Path, scanner.Text())
}
}()

if err = cmd.Start(); err != nil {
p.l.Errorf("error while starting the cmd: %s %s. Err: %v", cmd.Path, cmd.Args, err)
return fmt.Errorf("error while starting the cmd: %s %s. Err: %v", cmd.Path, cmd.Args, err)
}

doneChan := make(chan struct{})
// This goroutine waits for the process to terminate and sets cmdRunning to
// false when that happens.
go func() {
if err := p.monitorCommand(startCtx, cmd); err != nil {
p.l.Error(err.Error())
}
close(doneChan)
p.cmdRunningMu.Lock()
p.cmdRunning = false
p.cmdRunningMu.Unlock()
}()
go p.readProbeReplies(doneChan)
p.cmdRunning = true
return nil
}

func (p *Probe) readProbeReplies(done chan struct{}) error {
bufReader := bufio.NewReader(p.cmdStdout)
// This function runs in a background goroutine, reading probe replies from the
// probe server process's stdout and putting them on the probe's replyChan. Note
// that replyChan is a one-element channel; the idea is that we won't need any
// buffering other than the one provided by Unix pipes.
for {
select {
case <-done:
return nil
default:
}
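// Note (added for clarity): the done check above runs only between reads.
// If ReadProbeReply below is already blocked, this goroutine exits only
// when the pipe closes; this is the pending read that the commit message
// says we still can't cancel.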
rep, err := serverutils.ReadProbeReply(bufReader)
if err != nil {
// Return if external probe process pipe has closed. We get:
// io.EOF: when other process has closed the pipe.
// os.ErrClosed: when we have closed the pipe (through cmd.Wait()).
// *os.PathError: deferred close of the pipe.
_, isPathError := err.(*os.PathError)
if err == os.ErrClosed || err == io.EOF || isPathError {
p.l.Errorf("External probe process pipe is closed. Err: %s", err.Error())
return err
}
p.l.Errorf("Error reading probe reply: %s", err.Error())
continue
}
p.replyChan <- rep
}
}

func (p *Probe) labels(ep endpoint.Endpoint) map[string]string {
labels := make(map[string]string)
if p.labelKeys["probe"] {
@@ -336,37 +208,6 @@ func (p *Probe) labels(ep endpoint.Endpoint) map[string]string {
return labels
}

func (p *Probe) sendRequest(requestID int32, ep endpoint.Endpoint) error {
req := &serverpb.ProbeRequest{
RequestId: proto.Int32(requestID),
TimeLimit: proto.Int32(int32(p.opts.Timeout / time.Millisecond)),
Options: []*serverpb.ProbeRequest_Option{},
}
for _, opt := range p.c.GetOptions() {
value := opt.GetValue()
if len(p.labelKeys) != 0 { // If we're looking for substitutions.
res, found := strtemplate.SubstituteLabels(value, p.labels(ep))
if !found {
p.l.Warningf("Missing substitution in option %q", value)
} else {
value = res
}
}
req.Options = append(req.Options, &serverpb.ProbeRequest_Option{
Name: opt.Name,
Value: proto.String(value),
})
}

p.l.Debugf("Sending a probe request %v to the external probe server for target %v", requestID, ep.Name)
return serverutils.WriteMessage(req, p.cmdStdin)
}
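To make the @...@ option substitution in sendRequest concrete, here is a small sketch of strtemplate.SubstituteLabels as used above. The import path for strtemplate is assumed from the cloudprober tree and the option value is made up; only the function's usage is taken from this diff:

package main

import (
	"fmt"

	"github.com/cloudprober/cloudprober/common/strtemplate"
)

func main() {
	// Labels as Probe.labels() would build them for a target.
	labels := map[string]string{
		"target": "example.com",
		"port":   "8080",
	}
	// A hypothetical option value from a probe config.
	value := "--url=https://@target@:@port@/healthz"

	res, found := strtemplate.SubstituteLabels(value, labels)
	if !found {
		// Mirrors the "Missing substitution" warning in sendRequest.
		fmt.Printf("missing substitution in option %q\n", value)
		return
	}
	fmt.Println(res) // --url=https://example.com:8080/healthz
}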

type requestInfo struct {
target endpoint.Endpoint
timestamp time.Time
}

// probeStatus captures the single probe status. It's only used by runProbe
// functions to pass a probe's status to processProbeResult method.
type probeStatus struct {
@@ -414,95 +255,6 @@ func (p *Probe) processProbeResult(ps *probeStatus, result *result) {
}
}

func (p *Probe) runServerProbe(ctx, startCtx context.Context) {
outstandingReqs := make(map[int32]requestInfo)
var outstandingReqsMu sync.RWMutex
sendDoneCh := make(chan struct{})

if err := p.startCmdIfNotRunning(startCtx); err != nil {
p.l.Error(err.Error())
return
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

// Read probe replies until we have no outstanding requests or the context
// is done.
for {
select {
case <-ctx.Done():
p.l.Error(ctx.Err().Error())
return
case rep := <-p.replyChan:
outstandingReqsMu.Lock()
reqInfo, ok := outstandingReqs[rep.GetRequestId()]
if ok {
delete(outstandingReqs, rep.GetRequestId())
}
outstandingReqsMu.Unlock()
if !ok {
// Not our reply, could be from the last timed out probe.
p.l.Warningf("Got a reply that doesn't match any outstading request: Request id from reply: %v. Ignoring.", rep.GetRequestId())
continue
}
success := true
if rep.GetErrorMessage() != "" {
p.l.Errorf("Probe for target %v failed with error message: %s", reqInfo.target, rep.GetErrorMessage())
success = false
}
ps := &probeStatus{
target: reqInfo.target,
success: success,
latency: time.Since(reqInfo.timestamp),
payload: rep.GetPayload(),
}
p.processProbeResult(ps, p.results[reqInfo.target.Key()])
}

// If we are done sending requests, we can exit if we have no
// outstanding requests.
select {
case <-sendDoneCh:
if len(outstandingReqs) == 0 {
return
}
default:
}
}
}()

// Send probe requests
for _, target := range p.targets {
p.requestID++
p.results[target.Key()].total++
outstandingReqsMu.Lock()
outstandingReqs[p.requestID] = requestInfo{
target: target,
timestamp: time.Now(),
}
outstandingReqsMu.Unlock()
p.sendRequest(p.requestID, target)
time.Sleep(TimeBetweenRequests)
}

// Send signal to receiver loop that we are done sending requests.
close(sendDoneCh)

// Wait for receiver goroutine to exit.
wg.Wait()

// Handle the requests that we never received replies for: by this point,
// outstandingReqs contains only such requests.
outstandingReqsMu.Lock()
defer outstandingReqsMu.Unlock()
for _, req := range outstandingReqs {
p.processProbeResult(&probeStatus{target: req.target, success: false}, p.results[req.target.Key()])
}
}
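For context (not part of this diff): the external process on the other side of this exchange reads ProbeRequests from stdin and writes ProbeReplys to stdout. A minimal sketch of such a server follows; it assumes serverutils exposes a ReadProbeRequest counterpart to the ReadProbeReply used above and that WriteMessage accepts any proto message and writer, neither of which is confirmed by this diff:

package main

import (
	"bufio"
	"os"

	serverpb "github.com/cloudprober/cloudprober/probes/external/proto"
	"github.com/cloudprober/cloudprober/probes/external/serverutils"
	"google.golang.org/protobuf/proto"
)

func main() {
	stdin := bufio.NewReader(os.Stdin)
	for {
		// Assumed counterpart to the serverutils.ReadProbeReply call above.
		req, err := serverutils.ReadProbeRequest(stdin)
		if err != nil {
			return // pipe closed by the probe process
		}
		rep := &serverpb.ProbeReply{
			RequestId: proto.Int32(req.GetRequestId()),
			Payload:   proto.String("probe_count 1\n"),
		}
		if err := serverutils.WriteMessage(rep, os.Stdout); err != nil {
			return
		}
	}
}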

func (p *Probe) runOnceProbe(ctx context.Context) {
var wg sync.WaitGroup
