Skip to content

Commit

Permalink
processor: support span keepalive
Browse files Browse the repository at this point in the history
Signed-off-by: Andrea Luzzardi <andrea@luzzardi.com>
  • Loading branch information
aluzzardi committed Mar 13, 2024
1 parent c3c7f15 commit 2fc011f
Showing 1 changed file with 45 additions and 15 deletions.
60 changes: 45 additions & 15 deletions batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const (
DefaultScheduleDelay = 5000
DefaultExportTimeout = 30000
DefaultMaxExportBatchSize = 512

defaultSpanKeepAlive = 30 * time.Second
)

// BatchSpanProcessorOption configures a BatchSpanProcessor.
Expand Down Expand Up @@ -63,14 +65,20 @@ type batchSpanProcessor struct {
queue chan trace.ReadOnlySpan
dropped uint32

batch []trace.ReadOnlySpan
batchMutex sync.Mutex
batchSpans map[otrace.SpanID]struct{}
timer *time.Timer
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
stopped atomic.Bool
batch []trace.ReadOnlySpan
batchMutex sync.Mutex
batchSpans map[otrace.SpanID]struct{}
inProgressSpans map[otrace.SpanID]*inProgressSpan
timer *time.Timer
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
stopped atomic.Bool
}

type inProgressSpan struct {
trace.ReadOnlySpan
UpdatedAt time.Time
}

var _ trace.SpanProcessor = (*batchSpanProcessor)(nil)
Expand Down Expand Up @@ -101,13 +109,14 @@ func NewBatchSpanProcessor(exporter trace.SpanExporter, options ...BatchSpanProc
opt(&o)
}
bsp := &batchSpanProcessor{
e: exporter,
o: o,
batch: make([]trace.ReadOnlySpan, 0, o.MaxExportBatchSize),
batchSpans: make(map[otrace.SpanID]struct{}),
timer: time.NewTimer(o.BatchTimeout),
queue: make(chan trace.ReadOnlySpan, o.MaxQueueSize),
stopCh: make(chan struct{}),
e: exporter,
o: o,
batch: make([]trace.ReadOnlySpan, 0, o.MaxExportBatchSize),
batchSpans: make(map[otrace.SpanID]struct{}),
inProgressSpans: make(map[otrace.SpanID]*inProgressSpan),
timer: time.NewTimer(o.BatchTimeout),
queue: make(chan trace.ReadOnlySpan, o.MaxQueueSize),
stopCh: make(chan struct{}),
}

bsp.stopWait.Add(1)
Expand Down Expand Up @@ -269,6 +278,27 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
defer cancel()
}

// Update in progress spans
for _, span := range bsp.batch {
if span.EndTime().IsZero() {
bsp.inProgressSpans[span.SpanContext().SpanID()] = &inProgressSpan{
ReadOnlySpan: span,
UpdatedAt: time.Now(),
}
} else {
delete(bsp.inProgressSpans, span.SpanContext().SpanID())
}
}

// add in progress spans that are not part of the batch
for _, span := range bsp.inProgressSpans {
// ignore spans that were recently updated
if span.UpdatedAt.IsZero() || span.UpdatedAt.Before(time.Now().Add(-defaultSpanKeepAlive)) {
bsp.addToBatch(span.ReadOnlySpan)
span.UpdatedAt = time.Now()
}
}

if l := len(bsp.batch); l > 0 {
err := bsp.e.ExportSpans(ctx, bsp.batch)

Expand Down

0 comments on commit 2fc011f

Please sign in to comment.