diff --git a/otelfiber/fiber.go b/otelfiber/fiber.go index a5dfd521..afe05ced 100644 --- a/otelfiber/fiber.go +++ b/otelfiber/fiber.go @@ -2,6 +2,8 @@ package otelfiber import ( "context" + "github.com/valyala/fasthttp" + "io" "net/http" "time" @@ -31,6 +33,27 @@ const ( UnitMilliseconds = "ms" ) +var _ io.Reader = (*bodyStreamSizeReader)(nil) + +type bodyStreamSizeReader struct { + impl io.Reader + histogram metric.Int64Histogram + measureOpt metric.MeasurementOption + ctx context.Context + read int64 +} + +func (b *bodyStreamSizeReader) Read(p []byte) (n int, err error) { + n, err = b.impl.Read(p) + b.read += int64(n) + if err == io.EOF { + b.histogram.Record(b.ctx, b.read, b.measureOpt) + b.read = 0 + } + + return n, err +} + // Middleware returns fiber handler which will trace incoming requests. func Middleware(opts ...Option) fiber.Handler { cfg := config{} @@ -96,7 +119,8 @@ func Middleware(opts ...Option) fiber.Handler { copy(responseMetricAttrs, requestMetricsAttrs) reqHeader := make(http.Header) - c.Request().Header.VisitAll(func(k, v []byte) { + request := c.Request() + request.Header.VisitAll(func(k, v []byte) { reqHeader.Add(string(k), string(v)) }) @@ -126,13 +150,54 @@ func Middleware(opts ...Option) fiber.Handler { } // extract common attributes from response + response := c.Response() responseAttrs := append( - semconv.HTTPAttributesFromHTTPStatusCode(c.Response().StatusCode()), + semconv.HTTPAttributesFromHTTPStatusCode(response.StatusCode()), semconv.HTTPRouteKey.String(c.Route().Path), // no need to copy c.Route().Path: route strings should be immutable across app lifecycle ) - requestSize := int64(len(c.Request().Body())) - responseSize := int64(len(c.Response().Body())) + var ( + requestSize int64 + responseSize int64 + ) + + if !request.IsBodyStream() { + requestSize = int64(len(request.Body())) + } else { + // NOTICE: we have to create response copy because underlying steam closed before change + copyReq := &fasthttp.Request{} + request.CopyTo(copyReq) + copyReq.SetBodyStream(&bodyStreamSizeReader{ + impl: response.BodyStream(), + histogram: httpServerRequestSize, + measureOpt: metric.WithAttributes(append( + responseMetricAttrs, + responseAttrs...)...), + ctx: context.WithoutCancel(savedCtx), + read: 0, + }, -1) + + request = copyReq + } + + if !response.IsBodyStream() { + responseSize = int64(len(response.Body())) + } else { + // NOTICE: we have to create response copy because underlying steam closed before change + copyResp := &fasthttp.Response{} + response.CopyTo(copyResp) + copyResp.SetBodyStream(&bodyStreamSizeReader{ + impl: response.BodyStream(), + histogram: httpServerResponseSize, + measureOpt: metric.WithAttributes(append( + responseMetricAttrs, + responseAttrs...)...), + ctx: context.WithoutCancel(savedCtx), + read: 0, + }, -1) + + response = copyResp + } defer func() { responseMetricAttrs = append( @@ -141,8 +206,13 @@ func Middleware(opts ...Option) fiber.Handler { httpServerActiveRequests.Add(savedCtx, -1, metric.WithAttributes(requestMetricsAttrs...)) httpServerDuration.Record(savedCtx, float64(time.Since(start).Microseconds())/1000, metric.WithAttributes(responseMetricAttrs...)) - httpServerRequestSize.Record(savedCtx, requestSize, metric.WithAttributes(responseMetricAttrs...)) - httpServerResponseSize.Record(savedCtx, responseSize, metric.WithAttributes(responseMetricAttrs...)) + if !request.IsBodyStream() { + httpServerRequestSize.Record(savedCtx, requestSize, metric.WithAttributes(responseMetricAttrs...)) + } + + if !response.IsBodyStream() { + httpServerResponseSize.Record(savedCtx, responseSize, metric.WithAttributes(responseMetricAttrs...)) + } c.SetUserContext(savedCtx) cancel() @@ -155,7 +225,7 @@ func Middleware(opts ...Option) fiber.Handler { )...) span.SetName(cfg.SpanNameFormatter(c)) - spanStatus, spanMessage := semconv.SpanStatusFromHTTPStatusCodeAndSpanKind(c.Response().StatusCode(), oteltrace.SpanKindServer) + spanStatus, spanMessage := semconv.SpanStatusFromHTTPStatusCodeAndSpanKind(response.StatusCode(), oteltrace.SpanKindServer) span.SetStatus(spanStatus, spanMessage) //Propagate tracing context as headers in outbound response diff --git a/otelfiber/otelfiber_test/fiber_test.go b/otelfiber/otelfiber_test/fiber_test.go index 130802b4..69d7e639 100644 --- a/otelfiber/otelfiber_test/fiber_test.go +++ b/otelfiber/otelfiber_test/fiber_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "net/http" "net/http/httptest" "testing" @@ -447,6 +448,82 @@ func TestCustomAttributes(t *testing.T) { assert.Contains(t, attr, attribute.String("http.query_params", "foo=bar")) } +type fakeFile interface { + io.Reader +} + +var _ fakeFile = (*fakeFileImpl)(nil) + +type fakeFileImpl struct { + done bool + pos int +} + +const fakeFileLen = 4096 * 2 + +func (f *fakeFileImpl) Read(p []byte) (n int, err error) { + if f.done { + f.done = false + f.pos = 0 + return 0, io.EOF + } + + toRead := len(p) + if toRead > fakeFileLen-f.pos { + toRead = fakeFileLen - f.pos + } + + for ix := 0; ix < toRead; ix++ { + p[ix] = byte(ix) + f.pos++ + } + + f.done = fakeFileLen-f.pos == 0 + return toRead, nil +} + +func TestStreamedResponseBody(t *testing.T) { + reader := metric.NewManualReader() + meterProvider := metric.NewMeterProvider(metric.WithReader(reader)) + sr := new(tracetest.SpanRecorder) + tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + + app := fiber.New() + app.Use( + otelfiber.Middleware( + otelfiber.WithMeterProvider(meterProvider), + otelfiber.WithTracerProvider(tracerProvider), + ), + ) + + app.Post("/streams", func(ctx *fiber.Ctx) error { + return ctx.SendStream(&fakeFileImpl{}) + }) + + request := httptest.NewRequest(http.MethodPost, "/streams", nil) + resp, err := app.Test(request, 3000) + + // do and verify the request + require.Equal(t, http.StatusOK, resp.StatusCode) + + spans := sr.Ended() + require.Len(t, spans, 1) + + metrics := metricdata.ResourceMetrics{} + err = reader.Collect(context.Background(), &metrics) + assert.NoError(t, err) + + assert.Len(t, metrics.ScopeMetrics, 1) + scopeMetrics := metrics.ScopeMetrics[0].Metrics + for _, m := range scopeMetrics { + if m.Name == otelfiber.MetricNameHttpServerResponseSize { + histogram := m.Data.(metricdata.Histogram[int64]) + assert.Len(t, histogram.DataPoints, 1) + assert.Equal(t, histogram.DataPoints[0].Sum, int64(fakeFileLen)) + } + } +} + func TestOutboundTracingPropagation(t *testing.T) { sr := new(tracetest.SpanRecorder) provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))