Skip to content

Commit

Permalink
Split large spans into multiple export messages (#36)
Browse files Browse the repository at this point in the history
* Split large spans into multiple export messages

* Clean up code, proper logging

* Move variables to serializer constructor

* Use channels to concurrently serialize and export, added unit tests

* Added test case for 2 mid-size spans being fitted into two separate exports
  • Loading branch information
Kwintenvdb committed Oct 10, 2023
1 parent 4ad3740 commit fe1d3a6
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 62 deletions.
7 changes: 4 additions & 3 deletions core/trace/dt_span_enricher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel"
trace "go.opentelemetry.io/otel/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"

"github.com/dynatrace-oss/opentelemetry-exporter-go/core/internal/fw4"
)

func createTracer() trace.Tracer {
tp, _ := newDtTracerProviderWithTestExporter()
func createTracer(opts ...sdktrace.TracerProviderOption) trace.Tracer {
tp, _ := newDtTracerProviderWithTestExporter(opts...)
otel.SetTracerProvider(tp)
return otel.Tracer("Test tracer")
}
Expand Down
62 changes: 34 additions & 28 deletions core/trace/dt_span_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,19 @@ const (

const cSpansPath = "/odin/v1/spans"

const cMaxSizeWarning = 1 * 1024 * 1024 // 1 MB
const cMaxSizeSend = 64 * 1024 * 1024 // 64 MB

var errNotAuthorizedRequest = errors.New("Span Exporter is not authorized to send spans")

type dtSpanExporter interface {
export(ctx context.Context, t exportType, spans dtSpanSet) error
}

type dtSpanExporterImpl struct {
logger *logger.ComponentLogger
config *configuration.DtConfiguration
dialer *net.Dialer
client *http.Client
disabled bool
logger *logger.ComponentLogger
config *configuration.DtConfiguration
dialer *net.Dialer
client *http.Client
serializer *dtSpanSerializer
disabled bool
}

func newDtSpanExporter(config *configuration.DtConfiguration) dtSpanExporter {
Expand All @@ -67,7 +65,8 @@ func newDtSpanExporter(config *configuration.DtConfiguration) dtSpanExporter {
DialContext: d.DialContext,
},
},
disabled: false,
serializer: newSpanSerializer(config.Tenant, config.AgentId, config.QualifiedTenantId()),
disabled: false,
}

return exporter
Expand All @@ -86,26 +85,34 @@ func (e *dtSpanExporterImpl) export(ctx context.Context, t exportType, spans dtS

e.logger.Debugf("Serialize %d spans to export", len(spans))

start := time.Now()
// TODO: In order to support large amounts of spans, implement a splitting algorithm
// so that we can send spans in batches whose sizes do not exceed cMaxSizeSend.
serializedSpans, err := serializeSpans(spans, e.config.Tenant, e.config.AgentId, e.config.QualifiedTenantId())
if err != nil {
return err
}

e.logger.Debugf("Serialization process took %s", time.Since(start))

serializedSpansLen := len(serializedSpans)
if serializedSpansLen > cMaxSizeSend {
errMsg := fmt.Sprintf("skip exporting, serialized spans reached %d bytes. Maximum allowed size is %d bytes",
serializedSpansLen, cMaxSizeSend)
return errors.New(errMsg)
} else if serializedSpansLen > cMaxSizeWarning {
e.logger.Warnf("Size of serialized spans reached %d bytes", serializedSpansLen)
// Asynchronously serialize spans and export each chunk (every SpanExport message) as soon as it is done.
// In most cases, only a single SpanExport should be received on the channel unless we are dealing with large spans
// or resources.
exportChannel := make(chan exportData)
errorChannel := make(chan error)
serializeCtx, cancel := context.WithCancel(ctx)
go func() {
defer cancel()
e.serializer.serializeSpans(spans, exportChannel, errorChannel)
}()

for {
select {
case export := <-exportChannel:
err := e.doExportRequest(ctx, t, export)
if err != nil {
return err
}
case err := <-errorChannel:
return err
case <-serializeCtx.Done():
return nil
}
}
}

reqBody := bytes.NewReader(serializedSpans)
func (e *dtSpanExporterImpl) doExportRequest(ctx context.Context, t exportType, spanExport exportData) error {
reqBody := bytes.NewReader(spanExport)
req, err := e.newRequest(ctx, reqBody)
if err != nil {
return err
Expand All @@ -124,7 +131,6 @@ func (e *dtSpanExporterImpl) export(ctx context.Context, t exportType, spans dtS
} else if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return errors.New("unexpected response code: " + strconv.Itoa(resp.StatusCode))
}

return nil
}

Expand Down
179 changes: 178 additions & 1 deletion core/trace/dt_span_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"

"github.com/dynatrace-oss/opentelemetry-exporter-go/core/configuration"
"github.com/dynatrace-oss/opentelemetry-exporter-go/core/internal/version"
Expand Down Expand Up @@ -143,4 +148,176 @@ func TestDtSpanExporterUpdateHttpClientTimeouts(t *testing.T) {
require.Equal(t, exporter.client.Timeout, time.Millisecond*time.Duration(configuration.DefaultRegularExportConnTimeoutMs+configuration.DefaultRegularExportDataTimeoutMs))
}

// TODO: add unit tests for exporter.export function when Span Enricher and Span Serializer will be implemented
func TestSpanExportWithoutErrors(t *testing.T) {
numRequests := 0
testServer, config := createTestServerAndConfig(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
numRequests++
rw.Write([]byte(`Ok`)) //nolint:errcheck
}))
defer testServer.Close()

exporter := newDtSpanExporter(config).(*dtSpanExporterImpl)
tracer := createTracer()

_, span1 := tracer.Start(context.Background(), "span1")
_, span2 := tracer.Start(context.Background(), "span2")
_, span3 := tracer.Start(context.Background(), "span3")

spans := makeSpanSet(span1, span2, span3)
err := exporter.export(context.Background(), exportTypeForceFlush, spans)

require.NoError(t, err)
require.Equal(t, 1, numRequests, "spans are small enough so only one export request is expected")
}

func TestSpanExportWithoutErrors_MultipleExports(t *testing.T) {
largeString := strings.Repeat("r", 1024*1024) // 1 MB

numRequests := 0
testServer, config := createTestServerAndConfig(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
numRequests++
rw.Write([]byte(`Ok`)) //nolint:errcheck
}))
defer testServer.Close()

exporter := newDtSpanExporter(config).(*dtSpanExporterImpl)
tracer := createTracer()

_, span1 := tracer.Start(context.Background(), "span1",
trace.WithAttributes(attribute.String("large-attr-key", largeString)))
_, span2 := tracer.Start(context.Background(), "span2")
_, span3 := tracer.Start(context.Background(), "span3")

(span1.(*dtSpan)).metadata.sendState = sendStateSpanEnded

spans := makeSpanSet(span1, span2, span3)
err := exporter.export(context.Background(), exportTypeForceFlush, spans)

require.NoError(t, err)
require.Equal(t, 2, numRequests, "since the first span exceeds the warning size, 2 exports must be done")
}

func TestSpanExportWithoutErrors_DroppedSpans(t *testing.T) {
largeString := strings.Repeat("r", 64*1024*1024) // 64 MB

numRequests := 0
testServer, config := createTestServerAndConfig(func(rw http.ResponseWriter, req *http.Request) {
numRequests++
rw.Write([]byte(`Ok`)) //nolint:errcheck
})
defer testServer.Close()

exporter := newDtSpanExporter(config).(*dtSpanExporterImpl)
tracer := createTracer()

_, span1 := tracer.Start(context.Background(), "span1",
trace.WithAttributes(attribute.String("large-attr-key", largeString)))
_, span2 := tracer.Start(context.Background(), "span2")
_, span3 := tracer.Start(context.Background(), "span3")

(span1.(*dtSpan)).metadata.sendState = sendStateSpanEnded

spans := makeSpanSet(span1, span2, span3)
err := exporter.export(context.Background(), exportTypeForceFlush, spans)

require.NoError(t, err)
require.Equal(t, 1, numRequests, "the first span exceeds the maximum size, it must be dropped -> 1 export")
}

func TestSpanExportWithError_ResourceTooBig(t *testing.T) {
numRequests := 0
testServer, config := createTestServerAndConfig(func(rw http.ResponseWriter, req *http.Request) {
numRequests++
rw.Write([]byte(`Ok`)) //nolint:errcheck
})
defer testServer.Close()

exporter := newDtSpanExporter(config).(*dtSpanExporterImpl)
largeString := strings.Repeat("r", 64*1024*1024) // 64 MB
tracer := createTracer(sdktrace.WithResource(resource.NewSchemaless(attribute.String("large-string", largeString))))

_, span1 := tracer.Start(context.Background(), "span1")
_, span2 := tracer.Start(context.Background(), "span2")
_, span3 := tracer.Start(context.Background(), "span3")

spans := makeSpanSet(span1, span2, span3)
err := exporter.export(context.Background(), exportTypeForceFlush, spans)

require.ErrorContains(t, err, "resource too big")
require.Equal(t, 0, numRequests, "the resource is too big -> 0 exports")
}

func TestSpanExportWithoutError_LargeResource(t *testing.T) {
numRequests := 0
testServer, config := createTestServerAndConfig(func(rw http.ResponseWriter, req *http.Request) {
numRequests++
rw.Write([]byte(`Ok`)) //nolint:errcheck
})
defer testServer.Close()

exporter := newDtSpanExporter(config).(*dtSpanExporterImpl)
largeString := strings.Repeat("r", 1024*1024) // 1 MB
tracer := createTracer(sdktrace.WithResource(resource.NewSchemaless(attribute.String("large-string", largeString))))

_, span1 := tracer.Start(context.Background(), "span1")
_, span2 := tracer.Start(context.Background(), "span2")
_, span3 := tracer.Start(context.Background(), "span3")

spans := makeSpanSet(span1, span2, span3)
err := exporter.export(context.Background(), exportTypeForceFlush, spans)

require.NoError(t, err)
require.Equal(t, 3, numRequests, "large resource attached to each SpanExport -> split into 3 requests")
}

func TestSpanExportWithoutError_MidSizedSpans(t *testing.T) {
numRequests := 0
testServer, config := createTestServerAndConfig(func(rw http.ResponseWriter, req *http.Request) {
numRequests++
rw.Write([]byte(`Ok`)) //nolint:errcheck
})
defer testServer.Close()

exporter := newDtSpanExporter(config).(*dtSpanExporterImpl)
tracer := createTracer()

largeString := strings.Repeat("r", 1024*512) // 500 KB
_, span1 := tracer.Start(context.Background(), "span1",
trace.WithAttributes(attribute.String("large-attr-key", largeString)))
_, span2 := tracer.Start(context.Background(), "span2",
trace.WithAttributes(attribute.String("large-attr-key", largeString)))

(span1.(*dtSpan)).metadata.sendState = sendStateSpanEnded
(span2.(*dtSpan)).metadata.sendState = sendStateSpanEnded

spans := makeSpanSet(span1, span2)
err := exporter.export(context.Background(), exportTypeForceFlush, spans)

require.NoError(t, err)
require.Equal(t, 2, numRequests, "the spans fit individually into 1 export under warning size but 2 exceeds the warning size -> 2 exports")
}

func makeSpanSet(spans ...trace.Span) dtSpanSet {
spanSet := make(dtSpanSet)
for _, span := range spans {
spanSet[span.(*dtSpan)] = struct{}{}
}
return spanSet
}

func createTestServerAndConfig(handler http.HandlerFunc) (*httptest.Server, *configuration.DtConfiguration) {
testServer := httptest.NewServer(handler)
config := &configuration.DtConfiguration{
ClusterId: -1234,
Tenant: "testDtTenant",
AgentId: 10,
BaseUrl: testServer.URL,
AuthToken: "testDtToken",
SpanProcessingIntervalMs: configuration.DefaultSpanProcessingIntervalMs,
LoggingDestination: configuration.LoggingDestination_Stdout,
LoggingFlags: "SpanExporter=true,SpanProcessor=true,TracerProvider=true",
RumClientIpHeaders: nil,
DebugAddStackOnStart: false,
}
return testServer, config
}
5 changes: 3 additions & 2 deletions core/trace/dt_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/stretchr/testify/require"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"

"github.com/dynatrace-oss/opentelemetry-exporter-go/core/configuration"
Expand Down Expand Up @@ -81,13 +82,13 @@ func newTestExporter(o testExporterOptions) *testExporter {
}

// newDtTracerProviderWithTestExporter create Dynatrace Tracer Provider with testExporter
func newDtTracerProviderWithTestExporter() (*DtTracerProvider, *testExporter) {
func newDtTracerProviderWithTestExporter(opts ...sdktrace.TracerProviderOption) (*DtTracerProvider, *testExporter) {
defaultTextExporterOptions := testExporterOptions{
iterationIntervalMs: 500,
numIterations: 1,
}

if tp, err := NewTracerProvider(); err == nil {
if tp, err := NewTracerProvider(opts...); err == nil {
exporter := newTestExporter(defaultTextExporterOptions)
tp.processor.exporter = exporter

Expand Down

0 comments on commit fe1d3a6

Please sign in to comment.