Skip to content

Commit

Permalink
[probes.http] Provide a way to report latency breakdown. (#699)
Browse files Browse the repository at this point in the history
* Add a new field latency_breakdown to the HTTP probe configuration. With this field, following additional latency measurements can be added:

- dns_latency
- connect_latency
- tls_handshake_latency
- request_write_latency
- first_byte_latency

* Update surfacers to ensure that these new latency metrics are parsed as latency metric and are assigned the correct unit.
  • Loading branch information
manugarg committed Mar 25, 2024
1 parent 527a9eb commit cdfa944
Show file tree
Hide file tree
Showing 17 changed files with 669 additions and 160 deletions.
10 changes: 10 additions & 0 deletions metrics/eventmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func NewEventMetrics(ts time.Time) *EventMetrics {
// metric with the same name exists already, new metric is ignored. AddMetric
// returns the receiver EventMetrics to allow for the chaining of these calls,
// for example:
//
// em := metrics.NewEventMetrics(time.Now()).
// AddMetric("sent", &prr.sent).
// AddMetric("rcvd", &prr.rcvd).
Expand Down Expand Up @@ -104,6 +105,7 @@ func (em *EventMetrics) MetricsKeys() []string {
// label with the same name exists already, new label is ignored. AddLabel
// returns the receiver EventMetrics to allow for the chaining of these calls,
// for example:
//
// em := metrics.NewEventMetrics(time.Now()).
// AddMetric("sent", &prr.sent).
// AddLabel("ptype", "http").
Expand Down Expand Up @@ -262,3 +264,11 @@ func (em *EventMetrics) Key() string {
}
return strings.Join(keys, ",")
}

// LatencyUnitToString returns the string representation of the latency unit.
func LatencyUnitToString(latencyUnit time.Duration) string {
if latencyUnit == 0 || latencyUnit == time.Microsecond {
return "us"
}
return latencyUnit.String()[1:]
}
17 changes: 17 additions & 0 deletions metrics/eventmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func newEventMetrics(sent, rcvd, rtt int64, respCodes map[string]int64) *EventMetrics {
Expand Down Expand Up @@ -204,3 +206,18 @@ func TestAllocsPerRun(t *testing.T) {

t.Logf("Average allocations per run: ForNew=%v, ForString=%v", newAvg, stringAvg)
}

func TestLatencyUnitToString(t *testing.T) {
tests := map[time.Duration]string{
0: "us",
time.Second: "s",
time.Millisecond: "ms",
time.Microsecond: "us",
time.Nanosecond: "ns",
}
for latencyUnit, want := range tests {
t.Run(want, func(t *testing.T) {
assert.Equal(t, want, LatencyUnitToString(latencyUnit), "LatencyUnitToString()")
})
}
}
106 changes: 96 additions & 10 deletions probes/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,18 @@ type Probe struct {
requestBody *httpreq.RequestBody
}

type latencyDetails struct {
dnsLatency, connectLatency, tlsLatency, reqWriteLatency, firstByteLatency metrics.LatencyValue
}

type probeResult struct {
total, success, timeouts int64
connEvent int64
latency metrics.LatencyValue
respCodes *metrics.Map[int64]
respBodies *metrics.Map[int64]
validationFailure *metrics.Map[int64]
latencyBreakdown *latencyDetails
sslEarliestExpirationSeconds int64
}

Expand Down Expand Up @@ -247,26 +252,57 @@ func isClientTimeout(err error) bool {
return false
}

func (p *Probe) addLatency(latency metrics.LatencyValue, start time.Time) {
latency.AddFloat64(time.Since(start).Seconds() / p.opts.LatencyUnit.Seconds())
}

// httpRequest executes an HTTP request and updates the provided result struct.
func (p *Probe) doHTTPRequest(req *http.Request, client *http.Client, targetName string, result *probeResult, resultMu *sync.Mutex) {
req = p.prepareRequest(req)

start := time.Now()

trace := &httptrace.ClientTrace{}

if lb := result.latencyBreakdown; lb != nil {
if lb.dnsLatency != nil {
trace.DNSDone = func(_ httptrace.DNSDoneInfo) { p.addLatency(lb.dnsLatency, start) }
}
if lb.connectLatency != nil {
trace.ConnectDone = func(_, _ string, _ error) { p.addLatency(lb.connectLatency, start) }
}
if lb.tlsLatency != nil {
trace.TLSHandshakeDone = func(_ tls.ConnectionState, _ error) { p.addLatency(lb.tlsLatency, start) }
}
if lb.reqWriteLatency != nil {
trace.WroteRequest = func(_ httptrace.WroteRequestInfo) { p.addLatency(lb.reqWriteLatency, start) }
}
if lb.firstByteLatency != nil {
trace.GotFirstResponseByte = func() { p.addLatency(lb.firstByteLatency, start) }
}
}

var connEvent atomic.Int32

if p.c.GetKeepAlive() {
trace := &httptrace.ClientTrace{
ConnectDone: func(_, addr string, err error) {
connEvent.Add(1)
if err != nil {
p.l.Warning("Error establishing a new connection to: ", addr, ". Err: ", err.Error())
return
}
p.l.Info("Established a new connection to: ", addr)
},
oldConnectDone := trace.ConnectDone
trace.ConnectDone = func(network, addr string, err error) {
connEvent.Add(1)
if oldConnectDone != nil {
oldConnectDone(network, addr, err)
}
if err != nil {
p.l.Warning("Error establishing a new connection to: ", addr, ". Err: ", err.Error())
return
}
p.l.Info("Established a new connection to: ", addr)
}
}

if trace != nil {
req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
}

start := time.Now()
resp, err := client.Do(req)
latency := time.Since(start)

Expand Down Expand Up @@ -332,6 +368,36 @@ func (p *Probe) doHTTPRequest(req *http.Request, client *http.Client, targetName
}
}

func (p *Probe) parseLatencyBreakdown(baseLatencyValue metrics.LatencyValue) *latencyDetails {
if len(p.c.GetLatencyBreakdown()) == 0 {
return nil
}
lbMap := make(map[configpb.ProbeConf_LatencyBreakdown]bool)
for _, l := range p.c.GetLatencyBreakdown() {
lbMap[l] = true
}

all := lbMap[configpb.ProbeConf_ALL_LATENCIES]

ld := &latencyDetails{}
if all || lbMap[configpb.ProbeConf_DNS_LATENCY] {
ld.dnsLatency = baseLatencyValue.Clone().(metrics.LatencyValue)
}
if all || lbMap[configpb.ProbeConf_CONNECT_LATENCY] {
ld.connectLatency = baseLatencyValue.Clone().(metrics.LatencyValue)
}
if all || lbMap[configpb.ProbeConf_TLS_HANDSHAKE_LATENCY] {
ld.tlsLatency = baseLatencyValue.Clone().(metrics.LatencyValue)
}
if all || lbMap[configpb.ProbeConf_REQ_WRITE_LATENCY] {
ld.reqWriteLatency = baseLatencyValue.Clone().(metrics.LatencyValue)
}
if all || lbMap[configpb.ProbeConf_FIRST_BYTE_LATENCY] {
ld.firstByteLatency = baseLatencyValue.Clone().(metrics.LatencyValue)
}
return ld
}

func (p *Probe) runProbe(ctx context.Context, target endpoint.Endpoint, clients []*http.Client, req *http.Request, result *probeResult) {
reqCtx, cancelReqCtx := context.WithTimeout(ctx, p.opts.Timeout)
defer cancelReqCtx()
Expand Down Expand Up @@ -376,6 +442,8 @@ func (p *Probe) newResult() *probeResult {
result.latency = metrics.NewFloat(0)
}

result.latencyBreakdown = p.parseLatencyBreakdown(result.latency)

if p.c.GetExportResponseAsMetrics() {
result.respBodies = metrics.NewMap("resp")
}
Expand Down Expand Up @@ -403,6 +471,24 @@ func (p *Probe) exportMetrics(ts time.Time, result *probeResult, target endpoint
em.AddMetric("validation_failure", result.validationFailure)
}

if result.latencyBreakdown != nil {
if dl := result.latencyBreakdown.dnsLatency; dl != nil {
em.AddMetric("dns_latency", dl.Clone())
}
if cl := result.latencyBreakdown.connectLatency; cl != nil {
em.AddMetric("connect_latency", cl.Clone())
}
if tl := result.latencyBreakdown.tlsLatency; tl != nil {
em.AddMetric("tls_handshake_latency", tl.Clone())
}
if rwl := result.latencyBreakdown.reqWriteLatency; rwl != nil {
em.AddMetric("req_write_latency", rwl.Clone())
}
if fbl := result.latencyBreakdown.firstByteLatency; fbl != nil {
em.AddMetric("first_byte_latency", fbl.Clone())
}
}

em.AddLabel("ptype", "http").AddLabel("probe", p.name).AddLabel("dst", target.Name)
p.opts.RecordMetrics(target, em, dataChan)

Expand Down
Loading

0 comments on commit cdfa944

Please sign in to comment.