sync multi destination sends
DylanLovesCoffee committed Jun 23, 2024
1 parent 8e9775f commit c964a62
Showing 6 changed files with 110 additions and 51 deletions.
16 changes: 14 additions & 2 deletions pkg/logs/client/http/sync_destination.go
@@ -7,6 +7,7 @@
package http

import (
"sync"
"time"

"github.com/DataDog/datadog-agent/comp/logs/agent/config"
@@ -19,20 +20,23 @@ import (

// SyncDestination sends a payload over HTTP and does not retry.
type SyncDestination struct {
destination *Destination
destination *Destination
senderDoneChan chan *sync.WaitGroup
}

// NewSyncDestination returns a new synchronous Destination.
func NewSyncDestination(endpoint config.Endpoint,
contentType string,
destinationsContext *client.DestinationsContext,
senderDoneChan chan *sync.WaitGroup,
telemetryName string,
cfg pkgconfigmodel.Reader) *SyncDestination {

return newSyncDestination(endpoint,
contentType,
destinationsContext,
time.Second*10,
senderDoneChan,
telemetryName,
cfg)
}
@@ -41,11 +45,13 @@ func newSyncDestination(endpoint config.Endpoint,
contentType string,
destinationsContext *client.DestinationsContext,
timeout time.Duration,
senderDoneChan chan *sync.WaitGroup,
telemetryName string,
cfg pkgconfigmodel.Reader) *SyncDestination {

return &SyncDestination{
destination: newDestination(endpoint, contentType, destinationsContext, timeout, 1, false, telemetryName, cfg),
destination: newDestination(endpoint, contentType, destinationsContext, timeout, 1, false, telemetryName, cfg),
senderDoneChan: senderDoneChan,
}
}

@@ -81,6 +87,12 @@ func (d *SyncDestination) run(input chan *message.Payload, output chan *message.
metrics.TlmDestinationErrors.Inc()
log.Debugf("Could not send payload: %v", err)
}

if d.senderDoneChan != nil {
senderDoneWg := <-d.senderDoneChan
senderDoneWg.Done()
}

metrics.LogsSent.Add(int64(len(p.Messages)))
metrics.TlmLogsSent.Add(float64(len(p.Messages)))
output <- p
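
In serverless mode the synchronous destination is handed a channel of *sync.WaitGroup: after each send it receives the per-payload WaitGroup that the sender pushed onto senderDoneChan and marks one unit done, which is how the sender learns that this destination has finished. A minimal sketch of that handshake — the types and names below are simplified stand-ins, not the agent's real ones:

```go
package main

import (
	"fmt"
	"sync"
)

// syncDestination mimics the serverless SyncDestination: after each send it
// receives the payload's WaitGroup from the sender and marks one unit done.
type syncDestination struct {
	senderDoneChan chan *sync.WaitGroup
}

func (d *syncDestination) send(payload string) {
	fmt.Println("sent:", payload)
	if d.senderDoneChan != nil {
		wg := <-d.senderDoneChan // handed over by the sender, one per accepted payload
		wg.Done()
	}
}

func main() {
	doneChan := make(chan *sync.WaitGroup)
	dest := &syncDestination{senderDoneChan: doneChan}

	var wg sync.WaitGroup
	wg.Add(1)               // sender side: account for one destination send
	go dest.send("payload") // destination sends, then acknowledges via the WaitGroup
	doneChan <- &wg         // sender hands the WaitGroup to the destination
	wg.Wait()               // returns once the destination called Done
	fmt.Println("destination acknowledged the send")
}
```
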
52 changes: 34 additions & 18 deletions pkg/logs/pipeline/pipeline.go
@@ -9,6 +9,7 @@ package pipeline
import (
"context"
"fmt"
"sync"

"github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface"
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
@@ -25,11 +26,13 @@ import (

// Pipeline processes and sends messages to the backend
type Pipeline struct {
InputChan chan *message.Message
flushChan chan struct{}
processor *processor.Processor
strategy sender.Strategy
sender *sender.Sender
InputChan chan *message.Message
flushChan chan struct{}
processor *processor.Processor
strategy sender.Strategy
sender *sender.Sender
serverless bool
flushDoneChan chan struct{}
}

// NewPipeline returns a new Pipeline
@@ -44,7 +47,14 @@ func NewPipeline(outputChan chan *message.Payload,
hostname hostnameinterface.Component,
cfg pkgconfigmodel.Reader) *Pipeline {

mainDestinations := getDestinations(endpoints, destinationsContext, pipelineID, serverless, status, cfg)
var senderDoneChan chan *sync.WaitGroup
var flushDoneChan chan struct{}
if serverless {
senderDoneChan = make(chan *sync.WaitGroup)
flushDoneChan = make(chan struct{})
}

mainDestinations := getDestinations(endpoints, destinationsContext, pipelineID, serverless, senderDoneChan, status, cfg)

strategyInput := make(chan *message.Message, config.ChanSize)
senderInput := make(chan *message.Payload, 1) // Only buffer 1 message since payloads can be large
@@ -63,18 +73,20 @@ func NewPipeline(outputChan chan *message.Payload,
encoder = processor.RawEncoder
}

strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, pipelineID)
logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations, config.DestinationPayloadChanSize)
strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, flushDoneChan, pipelineID)
logsSender = sender.NewSender(cfg, senderInput, outputChan, mainDestinations, config.DestinationPayloadChanSize, senderDoneChan, flushDoneChan)

inputChan := make(chan *message.Message, config.ChanSize)
processor := processor.New(inputChan, strategyInput, processingRules, encoder, diagnosticMessageReceiver, hostname, pipelineID)

return &Pipeline{
InputChan: inputChan,
flushChan: flushChan,
processor: processor,
strategy: strategy,
sender: logsSender,
InputChan: inputChan,
flushChan: flushChan,
processor: processor,
strategy: strategy,
sender: logsSender,
serverless: serverless,
flushDoneChan: flushDoneChan,
}
}

@@ -96,25 +108,29 @@ func (p *Pipeline) Stop() {
func (p *Pipeline) Flush(ctx context.Context) {
p.flushChan <- struct{}{}
p.processor.Flush(ctx) // flush messages in the processor into the sender

if p.serverless {
<-p.flushDoneChan
}
}

func getDestinations(endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, pipelineID int, serverless bool, status statusinterface.Status, cfg pkgconfigmodel.Reader) *client.Destinations {
func getDestinations(endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, pipelineID int, serverless bool, senderDoneChan chan *sync.WaitGroup, status statusinterface.Status, cfg pkgconfigmodel.Reader) *client.Destinations {
reliable := []client.Destination{}
additionals := []client.Destination{}

if endpoints.UseHTTP {
for i, endpoint := range endpoints.GetReliableEndpoints() {
telemetryName := fmt.Sprintf("logs_%d_reliable_%d", pipelineID, i)
if serverless {
reliable = append(reliable, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, telemetryName, cfg))
reliable = append(reliable, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, telemetryName, cfg))
} else {
reliable = append(reliable, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, true, telemetryName, cfg))
}
}
for i, endpoint := range endpoints.GetUnReliableEndpoints() {
telemetryName := fmt.Sprintf("logs_%d_unreliable_%d", pipelineID, i)
if serverless {
additionals = append(additionals, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, telemetryName, cfg))
additionals = append(additionals, http.NewSyncDestination(endpoint, http.JSONContentType, destinationsContext, senderDoneChan, telemetryName, cfg))
} else {
additionals = append(additionals, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, telemetryName, cfg))
}
@@ -132,13 +148,13 @@ func getDestinations(endpoints *config.Endpoints, destinationsContext *client.De
}

//nolint:revive // TODO(AML) Fix revive linter
func getStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, endpoints *config.Endpoints, serverless bool, pipelineID int) sender.Strategy {
func getStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, endpoints *config.Endpoints, serverless bool, flushDoneChan chan struct{}, pipelineID int) sender.Strategy {
if endpoints.UseHTTP || serverless {
encoder := sender.IdentityContentType
if endpoints.Main.UseCompression {
encoder = sender.NewGzipContentEncoding(endpoints.Main.CompressionLevel)
}
return sender.NewBatchStrategy(inputChan, outputChan, flushChan, sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder)
return sender.NewBatchStrategy(inputChan, outputChan, flushChan, serverless, flushDoneChan, sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", encoder)
}
return sender.NewStreamStrategy(inputChan, outputChan, sender.IdentityContentType)
}
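
The pipeline only allocates senderDoneChan and flushDoneChan when running serverless, threads them through getDestinations, getStrategy, and sender.NewSender, and makes Flush block on flushDoneChan after triggering the flush; outside serverless both channels stay nil and the regular agent path skips the extra synchronization. A rough, runnable sketch of that sequencing, using simplified placeholder types rather than the agent's own:

```go
package main

import "fmt"

// pipeline is a stand-in for the logs Pipeline: in serverless mode Flush
// triggers the batch strategy and then blocks until the sender confirms that
// the flushed payload reached every destination.
type pipeline struct {
	flushChan     chan struct{}
	flushDoneChan chan struct{}
	serverless    bool
}

func (p *pipeline) Flush() {
	p.flushChan <- struct{}{} // ask the batch strategy to flush its buffer
	if p.serverless {
		<-p.flushDoneChan // wait for the sender's acknowledgement
	}
}

func main() {
	p := &pipeline{
		flushChan:     make(chan struct{}),
		flushDoneChan: make(chan struct{}),
		serverless:    true,
	}

	// Stand-in for the strategy and sender: drain the flush request, deliver
	// the payload, then acknowledge on flushDoneChan.
	go func() {
		<-p.flushChan
		fmt.Println("buffer flushed and payload delivered")
		p.flushDoneChan <- struct{}{}
	}()

	p.Flush()
	fmt.Println("Flush returned only after the delivery was acknowledged")
}
```
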
21 changes: 16 additions & 5 deletions pkg/logs/sender/batch_strategy.go
@@ -22,10 +22,12 @@ var (

// batchStrategy contains all the logic to send logs in batch.
type batchStrategy struct {
inputChan chan *message.Message
outputChan chan *message.Payload
flushChan chan struct{}
buffer *MessageBuffer
inputChan chan *message.Message
outputChan chan *message.Payload
flushChan chan struct{}
serverless bool
flushDoneChan chan struct{}
buffer *MessageBuffer
// pipelineName provides a name for the strategy to differentiate it from other instances in other internal pipelines
pipelineName string
serializer Serializer
@@ -39,18 +41,22 @@ type batchStrategy struct {
func NewBatchStrategy(inputChan chan *message.Message,
outputChan chan *message.Payload,
flushChan chan struct{},
serverless bool,
flushDoneChan chan struct{},
serializer Serializer,
batchWait time.Duration,
maxBatchSize int,
maxContentSize int,
pipelineName string,
contentEncoding ContentEncoding) Strategy {
return newBatchStrategyWithClock(inputChan, outputChan, flushChan, serializer, batchWait, maxBatchSize, maxContentSize, pipelineName, clock.New(), contentEncoding)
return newBatchStrategyWithClock(inputChan, outputChan, flushChan, serverless, flushDoneChan, serializer, batchWait, maxBatchSize, maxContentSize, pipelineName, clock.New(), contentEncoding)
}

func newBatchStrategyWithClock(inputChan chan *message.Message,
outputChan chan *message.Payload,
flushChan chan struct{},
serverless bool,
flushDoneChan chan struct{},
serializer Serializer,
batchWait time.Duration,
maxBatchSize int,
@@ -63,6 +69,8 @@ func newBatchStrategyWithClock(inputChan chan *message.Message,
inputChan: inputChan,
outputChan: outputChan,
flushChan: flushChan,
serverless: serverless,
flushDoneChan: flushDoneChan,
buffer: NewMessageBuffer(maxBatchSize, maxContentSize),
serializer: serializer,
batchWait: batchWait,
@@ -133,6 +141,9 @@ func (s *batchStrategy) processMessage(m *message.Message, outputChan chan *mess
// to the next stage of the pipeline.
func (s *batchStrategy) flushBuffer(outputChan chan *message.Payload) {
if s.buffer.IsEmpty() {
if s.serverless {
s.flushDoneChan <- struct{}{}
}
return
}
messages := s.buffer.GetMessages()
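
flushBuffer now acknowledges a serverless flush even when the buffer is empty; without that early signal, Pipeline.Flush would block forever on flushDoneChan whenever a flush races an already-drained buffer. A small self-contained sketch of that path, with hypothetical names:

```go
package main

import "fmt"

// batchStrategy is a stand-in for the real strategy: a serverless flush must
// be acknowledged even when there is nothing buffered, otherwise the caller
// waiting on flushDoneChan would never be released.
type batchStrategy struct {
	serverless    bool
	flushDoneChan chan struct{}
	messages      []string
}

func (s *batchStrategy) flushBuffer() {
	if len(s.messages) == 0 {
		if s.serverless {
			s.flushDoneChan <- struct{}{} // acknowledge the no-op flush
		}
		return
	}
	// ... serialize the batch and hand it to the sender; in the real code the
	// sender signals flushDoneChan once every destination has acknowledged ...
}

func main() {
	s := &batchStrategy{serverless: true, flushDoneChan: make(chan struct{})}

	go s.flushBuffer() // empty buffer: the flush is acknowledged immediately
	<-s.flushDoneChan  // the blocked Flush call would unblock here
	fmt.Println("empty-buffer flush acknowledged")
}
```
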
12 changes: 6 additions & 6 deletions pkg/logs/sender/batch_strategy_test.go
@@ -20,7 +20,7 @@ func TestBatchStrategySendsPayloadWhenBufferIsFull(t *testing.T) {
output := make(chan *message.Payload)
flushChan := make(chan struct{})

s := NewBatchStrategy(input, output, flushChan, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{})
s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{})
s.Start()

message1 := message.NewMessage([]byte("a"), nil, "", 0)
@@ -52,7 +52,7 @@ func TestBatchStrategySendsPayloadWhenBufferIsOutdated(t *testing.T) {
timerInterval := 100 * time.Millisecond

clk := clock.NewMock()
s := newBatchStrategyWithClock(input, output, flushChan, LineSerializer, timerInterval, 100, 100, "test", clk, &identityContentType{})
s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, timerInterval, 100, 100, "test", clk, &identityContentType{})
s.Start()

for round := 0; round < 3; round++ {
@@ -77,7 +77,7 @@ func TestBatchStrategySendsPayloadWhenClosingInput(t *testing.T) {
flushChan := make(chan struct{})

clk := clock.NewMock()
s := newBatchStrategyWithClock(input, output, flushChan, LineSerializer, 100*time.Millisecond, 2, 2, "test", clk, &identityContentType{})
s := newBatchStrategyWithClock(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", clk, &identityContentType{})
s.Start()

message := message.NewMessage([]byte("a"), nil, "", 0)
@@ -102,7 +102,7 @@ func TestBatchStrategyShouldNotBlockWhenStoppingGracefully(t *testing.T) {
output := make(chan *message.Payload)
flushChan := make(chan struct{})

s := NewBatchStrategy(input, output, flushChan, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{})
s := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, 100*time.Millisecond, 2, 2, "test", &identityContentType{})
s.Start()
message := message.NewMessage([]byte{}, nil, "", 0)

@@ -126,7 +126,7 @@ func TestBatchStrategySynchronousFlush(t *testing.T) {

// batch size is large so it will not flush until we trigger it manually
// flush time is large so it won't automatically trigger during this test
strategy := NewBatchStrategy(input, output, flushChan, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{})
strategy := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{})
strategy.Start()

// all of these messages will get buffered
@@ -171,7 +171,7 @@ func TestBatchStrategyFlushChannel(t *testing.T) {

// batch size is large so it will not flush until we trigger it manually
// flush time is large so it won't automatically trigger during this test
strategy := NewBatchStrategy(input, output, flushChan, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{})
strategy := NewBatchStrategy(input, output, flushChan, false, nil, LineSerializer, time.Hour, 100, 100, "test", &identityContentType{})
strategy.Start()

// all of these messages will get buffered
46 changes: 33 additions & 13 deletions pkg/logs/sender/sender.go
@@ -7,6 +7,7 @@ package sender

import (
"strconv"
"sync"
"time"

pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
@@ -29,23 +30,27 @@ var (
// the auditor or block the pipeline if they fail. There will always be at
// least 1 reliable destination (the main destination).
type Sender struct {
config pkgconfigmodel.Reader
inputChan chan *message.Payload
outputChan chan *message.Payload
destinations *client.Destinations
done chan struct{}
bufferSize int
config pkgconfigmodel.Reader
inputChan chan *message.Payload
outputChan chan *message.Payload
destinations *client.Destinations
done chan struct{}
bufferSize int
senderDoneChan chan *sync.WaitGroup
flushDoneChan chan struct{}
}

// NewSender returns a new sender.
func NewSender(config pkgconfigmodel.Reader, inputChan chan *message.Payload, outputChan chan *message.Payload, destinations *client.Destinations, bufferSize int) *Sender {
func NewSender(config pkgconfigmodel.Reader, inputChan chan *message.Payload, outputChan chan *message.Payload, destinations *client.Destinations, bufferSize int, senderDoneChan chan *sync.WaitGroup, flushDoneChan chan struct{}) *Sender {
return &Sender{
config: config,
inputChan: inputChan,
outputChan: outputChan,
destinations: destinations,
done: make(chan struct{}),
bufferSize: bufferSize,
config: config,
inputChan: inputChan,
outputChan: outputChan,
destinations: destinations,
done: make(chan struct{}),
bufferSize: bufferSize,
senderDoneChan: senderDoneChan,
flushDoneChan: flushDoneChan,
}
}

@@ -69,12 +74,17 @@ func (s *Sender) run() {

for payload := range s.inputChan {
var startInUse = time.Now()
senderDoneWg := &sync.WaitGroup{}

sent := false
for !sent {
for _, destSender := range reliableDestinations {
if destSender.Send(payload) {
sent = true
if s.senderDoneChan != nil {
senderDoneWg.Add(1)
s.senderDoneChan <- senderDoneWg
}
}
}

@@ -102,11 +112,21 @@ func (s *Sender) run() {
if !destSender.NonBlockingSend(payload) {
tlmPayloadsDropped.Inc("false", strconv.Itoa(i))
tlmMessagesDropped.Add(float64(len(payload.Messages)), "false", strconv.Itoa(i))
if s.senderDoneChan != nil {
senderDoneWg.Add(1)
s.senderDoneChan <- senderDoneWg
}
}
}

inUse := float64(time.Since(startInUse) / time.Millisecond)
tlmSendWaitTime.Add(inUse)

if s.senderDoneChan != nil {
senderDoneWg.Wait()
// In serverless mode, wait until the payload has reached every destination before acknowledging the flush
s.flushDoneChan <- struct{}{}
}
}

// Cleanup the destinations
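
The sender is the coordination point for the whole handshake: for every payload it creates a fresh WaitGroup, adds one unit each time it hands the payload to a destination (pushing the WaitGroup onto senderDoneChan so that destination can call Done), waits for the group to drain, and only then signals flushDoneChan so the serverless Flush can return. A condensed, runnable sketch of that flow — names and structure are simplified assumptions, not the agent's actual implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// End-to-end sketch of the serverless send/flush handshake with simplified,
// hypothetical names: the sender builds one WaitGroup per payload, adds a unit
// for each destination it hands the payload to, and signals flushDoneChan once
// every destination has called Done.
func main() {
	senderDoneChan := make(chan *sync.WaitGroup)
	flushDoneChan := make(chan struct{})
	payloads := make(chan string, 1)

	// Destination side: receive the per-payload WaitGroup and acknowledge the send.
	go func() {
		for wg := range senderDoneChan {
			wg.Done()
		}
	}()

	// Sender side: account for each destination send, wait for all of them,
	// then report that the flush completed.
	go func() {
		for p := range payloads {
			wg := &sync.WaitGroup{}
			fmt.Println("sending:", p)
			wg.Add(1)            // one destination accepted the payload
			senderDoneChan <- wg // hand the WaitGroup to that destination
			wg.Wait()            // blocks until the destination calls Done
			flushDoneChan <- struct{}{}
		}
	}()

	payloads <- "serverless log batch"
	<-flushDoneChan // this is where Pipeline.Flush blocks in serverless mode
	fmt.Println("flush acknowledged after every destination finished")
}
```
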