Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
WIP: track and reuse empty slots in HTTP probe
Browse files Browse the repository at this point in the history
  • Loading branch information
Nerijus Bendziunas committed Mar 18, 2021
1 parent adab69a commit 6dc5ba0
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
33 changes: 30 additions & 3 deletions probes/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type Probe struct {
waitGroup sync.WaitGroup

requestBody []byte
slots []string
}

type probeResult struct {
Expand Down Expand Up @@ -237,6 +238,8 @@ func (p *Probe) Init(name string, opts *options.Options) error {
}
p.l.Infof("Targets update interval: %v", p.targetsUpdateInterval)

p.slots = make([]string, int(p.opts.Interval/p.gapBetweenTargets()))

return nil
}

Expand Down Expand Up @@ -454,6 +457,16 @@ func (p *Probe) gapBetweenTargets() time.Duration {
return interTargetGap
}

func (p *Probe) findFreeSlot() (int, error) {
// TODO: don't sweep whole range every time
for i, t := range p.slots {
if t == "" {
return i, nil
}
}
return 0, fmt.Errorf("no free slots")
}

// updateTargetsAndStartProbes refreshes targets and starts probe loop for
// new targets and cancels probe loops for targets that are no longer active.
// Note that this function is not concurrency safe. It is never called
Expand Down Expand Up @@ -483,12 +496,18 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context, dataChan chan *
continue
}
cancelF()

for i, target := range p.slots {
if target == targetKey {
p.slots[i] = ""
}
}

updatedTargets[targetKey] = "DELETE"
delete(p.cancelFuncs, targetKey)
}

gapBetweenTargets := p.gapBetweenTargets()
var startWaitTime time.Duration

// Start probe loop for new targets.
for key, target := range activeTargets {
Expand All @@ -498,19 +517,27 @@ func (p *Probe) updateTargetsAndStartProbes(ctx context.Context, dataChan chan *
}
updatedTargets[key] = "ADD"

slot, err := p.findFreeSlot()

if err != nil {
p.l.Errorf("no free slots for %s", key)
break
}

probeCtx, cancelF := context.WithCancel(ctx)
p.waitGroup.Add(1)

startWaitTime := gapBetweenTargets * time.Duration(slot)

go func(target endpoint.Endpoint, waitTime time.Duration) {
defer p.waitGroup.Done()
// Wait for wait time + some jitter before starting this probe loop.
time.Sleep(waitTime + time.Duration(rand.Int63n(gapBetweenTargets.Microseconds()/10))*time.Microsecond)
p.startForTarget(probeCtx, target, dataChan)
}(target, startWaitTime)

startWaitTime += gapBetweenTargets

p.cancelFuncs[key] = cancelF
p.slots[slot] = target.Key()
}
}

Expand Down
16 changes: 12 additions & 4 deletions probes/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,16 @@ func testProbeWithLargeBody(t *testing.T, bodySize int) {
func TestMultipleTargetsMultipleRequests(t *testing.T) {
testTargets := []string{"test.com", "fail-test.com", "fails-to-resolve.com"}
reqPerProbe := int64(3)
intervalBetweenTargetsMsec := 1
opts := &options.Options{
Targets: targets.StaticTargets(strings.Join(testTargets, ",")),
Interval: 10 * time.Millisecond,
StatsExportInterval: 20 * time.Millisecond,
ProbeConf: &configpb.ProbeConf{RequestsPerProbe: proto.Int32(int32(reqPerProbe))},
LogMetrics: func(_ *metrics.EventMetrics) {},
ProbeConf: &configpb.ProbeConf{
RequestsPerProbe: proto.Int32(int32(reqPerProbe)),
IntervalBetweenTargetsMsec: proto.Int32(int32(intervalBetweenTargetsMsec)),
},
LogMetrics: func(_ *metrics.EventMetrics) {},
}

p := &Probe{}
Expand Down Expand Up @@ -343,12 +347,16 @@ func compareNumberOfMetrics(t *testing.T, ems []*metrics.EventMetrics, targets [
func TestUpdateTargetsAndStartProbes(t *testing.T) {
testTargets := [2]string{"test1.com", "test2.com"}
reqPerProbe := int64(3)
intervalBetweenTargetsMsec := 1
opts := &options.Options{
Targets: targets.StaticTargets(fmt.Sprintf("%s,%s", testTargets[0], testTargets[1])),
Interval: 10 * time.Millisecond,
StatsExportInterval: 20 * time.Millisecond,
ProbeConf: &configpb.ProbeConf{RequestsPerProbe: proto.Int32(int32(reqPerProbe))},
LogMetrics: func(_ *metrics.EventMetrics) {},
ProbeConf: &configpb.ProbeConf{
RequestsPerProbe: proto.Int32(int32(reqPerProbe)),
IntervalBetweenTargetsMsec: proto.Int32(int32(intervalBetweenTargetsMsec)),
},
LogMetrics: func(_ *metrics.EventMetrics) {},
}
p := &Probe{}
p.Init("http_test", opts)
Expand Down

0 comments on commit 6dc5ba0

Please sign in to comment.