Skip to content

Commit

Permalink
fix: do not read everything into memory on metrics collection when re…
Browse files Browse the repository at this point in the history
…quest/response body provided as stream
  • Loading branch information
Alantoo committed Mar 5, 2024
1 parent 7984e96 commit 391cb9c
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 4 deletions.
66 changes: 62 additions & 4 deletions otelfiber/fiber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package otelfiber

import (
"context"
"github.com/valyala/fasthttp"
"io"
"net/http"
"time"

Expand Down Expand Up @@ -31,6 +33,27 @@ const (
UnitMilliseconds = "ms"
)

var _ io.Reader = (*bodyStreamSizeReader)(nil)

type bodyStreamSizeReader struct {
impl io.Reader
histogram metric.Int64Histogram
measureOpt metric.MeasurementOption
ctx context.Context
read int64
}

func (b *bodyStreamSizeReader) Read(p []byte) (n int, err error) {
n, err = b.impl.Read(p)
b.read += int64(n)
if err == io.EOF {
b.histogram.Record(b.ctx, b.read, b.measureOpt)
b.read = 0
}

return n, err
}

// Middleware returns fiber handler which will trace incoming requests.
func Middleware(opts ...Option) fiber.Handler {
cfg := config{}
Expand Down Expand Up @@ -134,16 +157,46 @@ func Middleware(opts ...Option) fiber.Handler {
)

var (
requestSize int64 = -1
responseSize int64 = -1
requestSize int64
responseSize int64
)

if !request.IsBodyStream() {
requestSize = int64(len(request.Body()))
} else {
// NOTICE: we have to create response copy because underlying steam closed before change
copyReq := &fasthttp.Request{}
request.CopyTo(copyReq)
copyReq.SetBodyStream(&bodyStreamSizeReader{
impl: response.BodyStream(),
histogram: httpServerRequestSize,
measureOpt: metric.WithAttributes(append(
responseMetricAttrs,
responseAttrs...)...),
ctx: context.WithoutCancel(savedCtx),
read: 0,
}, -1)

request = copyReq
}

if !response.IsBodyStream() {
responseSize = int64(len(response.Body()))
} else {
// NOTICE: we have to create response copy because underlying steam closed before change
copyResp := &fasthttp.Response{}
response.CopyTo(copyResp)
copyResp.SetBodyStream(&bodyStreamSizeReader{
impl: response.BodyStream(),
histogram: httpServerResponseSize,
measureOpt: metric.WithAttributes(append(
responseMetricAttrs,
responseAttrs...)...),
ctx: context.WithoutCancel(savedCtx),
read: 0,
}, -1)

response = copyResp
}

defer func() {
Expand All @@ -153,8 +206,13 @@ func Middleware(opts ...Option) fiber.Handler {

httpServerActiveRequests.Add(savedCtx, -1, metric.WithAttributes(requestMetricsAttrs...))
httpServerDuration.Record(savedCtx, float64(time.Since(start).Microseconds())/1000, metric.WithAttributes(responseMetricAttrs...))
httpServerRequestSize.Record(savedCtx, requestSize, metric.WithAttributes(responseMetricAttrs...))
httpServerResponseSize.Record(savedCtx, responseSize, metric.WithAttributes(responseMetricAttrs...))
if !request.IsBodyStream() {
httpServerRequestSize.Record(savedCtx, requestSize, metric.WithAttributes(responseMetricAttrs...))
}

if !response.IsBodyStream() {
httpServerResponseSize.Record(savedCtx, responseSize, metric.WithAttributes(responseMetricAttrs...))
}

c.SetUserContext(savedCtx)
cancel()
Expand Down
77 changes: 77 additions & 0 deletions otelfiber/otelfiber_test/fiber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -446,3 +447,79 @@ func TestCustomAttributes(t *testing.T) {
assert.Contains(t, attr, attribute.String("http.route", "/user/:id"))
assert.Contains(t, attr, attribute.String("http.query_params", "foo=bar"))
}

type fakeFile interface {
io.Reader
}

var _ fakeFile = (*fakeFileImpl)(nil)

type fakeFileImpl struct {
done bool
pos int
}

const fakeFileLen = 4096 * 2

func (f *fakeFileImpl) Read(p []byte) (n int, err error) {
if f.done {
f.done = false
f.pos = 0
return 0, io.EOF
}

toRead := len(p)
if toRead > fakeFileLen-f.pos {
toRead = fakeFileLen - f.pos
}

for ix := 0; ix < toRead; ix++ {
p[ix] = byte(ix)
f.pos++
}

f.done = fakeFileLen-f.pos == 0
return toRead, nil
}

func TestStreamedResponseBody(t *testing.T) {
reader := metric.NewManualReader()
meterProvider := metric.NewMeterProvider(metric.WithReader(reader))
sr := new(tracetest.SpanRecorder)
tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))

app := fiber.New()
app.Use(
otelfiber.Middleware(
otelfiber.WithMeterProvider(meterProvider),
otelfiber.WithTracerProvider(tracerProvider),
),
)

app.Post("/streams", func(ctx *fiber.Ctx) error {
return ctx.SendStream(&fakeFileImpl{})
})

request := httptest.NewRequest(http.MethodPost, "/streams", nil)
resp, err := app.Test(request, 3000)

// do and verify the request
require.Equal(t, http.StatusOK, resp.StatusCode)

spans := sr.Ended()
require.Len(t, spans, 1)

metrics := metricdata.ResourceMetrics{}
err = reader.Collect(context.Background(), &metrics)
assert.NoError(t, err)

assert.Len(t, metrics.ScopeMetrics, 1)
scopeMetrics := metrics.ScopeMetrics[0].Metrics
for _, m := range scopeMetrics {
if m.Name == otelfiber.MetricNameHttpServerResponseSize {
histogram := m.Data.(metricdata.Histogram[int64])
assert.Len(t, histogram.DataPoints, 1)
assert.Equal(t, histogram.DataPoints[0].Sum, int64(fakeFileLen))
}
}
}

0 comments on commit 391cb9c

Please sign in to comment.