forwarder retry queue payloads max size (#6440)
Add `forwarder_retry_queue_payloads_max_size`
ogaca-dd committed Oct 21, 2020
1 parent 2dc6ef9 commit b3ff778
Showing 9 changed files with 159 additions and 59 deletions.
3 changes: 2 additions & 1 deletion pkg/config/config.go
@@ -302,7 +302,8 @@ func InitConfig(config Config) {
// Forwarder
config.BindEnvAndSetDefault("additional_endpoints", map[string][]string{})
config.BindEnvAndSetDefault("forwarder_timeout", 20)
config.BindEnvAndSetDefault("forwarder_retry_queue_max_size", 30)
config.BindEnvAndSetDefault("forwarder_retry_queue_max_size", 0)
config.BindEnvAndSetDefault("forwarder_retry_queue_payloads_max_size", 30*megaByte)
config.BindEnvAndSetDefault("forwarder_connection_reset_interval", 0) // in seconds, 0 means disabled
config.BindEnvAndSetDefault("forwarder_apikey_validation_interval", DefaultAPIKeyValidationInterval) // in minutes
config.BindEnvAndSetDefault("forwarder_num_workers", 1)
8 changes: 7 additions & 1 deletion pkg/config/config_template.yaml
@@ -163,13 +163,19 @@ api_key:
#
# forwarder_timeout: 20

## @param forwarder_retry_queue_max_size - integer - optional - default: 30
## @param forwarder_retry_queue_max_size - integer - optional - default: 0
## The forwarder retries failed requests. Use this setting to change the
## maximum length of the forwarder's retry queue (each request in the queue
## takes no more than 2MB in memory)
#
# forwarder_retry_queue_max_size: 30

## @param forwarder_retry_queue_payloads_max_size - integer - optional - default: 31457280 (30MB)
## The maximum size, in bytes, of all the payloads in the forwarder's retry queue combined.
## If "forwarder_retry_queue_max_size" is greater than 0, this parameter is ignored.
#
# forwarder_retry_queue_payloads_max_size: 31457280

## @param forwarder_num_workers - integer - optional - default: 1
## The number of workers used by the forwarder.
#
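(Editor's note: the interaction between `forwarder_retry_queue_max_size` and `forwarder_retry_queue_payloads_max_size` documented above can be summarized as follows. This is a hedged, self-contained Go sketch, not code from this commit; the helper name `effectiveRetryLimits` is hypothetical, but the precedence matches the `NewOptions` change shown later in this diff.)

// Editor's sketch: when the deprecated count-based limit is set (> 0),
// it takes precedence and the new byte-based limit is disabled.
package main

import "fmt"

func effectiveRetryLimits(retryQueueMaxSize, retryQueuePayloadsMaxSize int) (countLimit, byteLimit int) {
	if retryQueueMaxSize > 0 {
		// Legacy behaviour: cap the retry queue by number of transactions.
		return retryQueueMaxSize, 0
	}
	// New default behaviour: cap the retry queue by total payload bytes.
	return 0, retryQueuePayloadsMaxSize
}

func main() {
	fmt.Println(effectiveRetryLimits(0, 31457280))  // 0 31457280 -> byte-based limit applies
	fmt.Println(effectiveRetryLimits(30, 31457280)) // 30 0       -> count-based limit applies
}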
6 changes: 4 additions & 2 deletions pkg/forwarder/README.md
@@ -79,8 +79,10 @@ transactions first and then (when the workers have time) we retry the erroneous
ones (newest transactions are retried first).

We start dropping transactions (oldest first) when the number of transactions
in the retry queue is bigger than `forwarder_retry_queue_max_size` (see the
agent configuration).
in the retry queue is bigger than `forwarder_retry_queue_max_size`, if
`forwarder_retry_queue_max_size` is greater than zero; otherwise, when the sum
of all the payload sizes is bigger than `forwarder_retry_queue_payloads_max_size`
(see the agent configuration).
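(Editor's note: a minimal sketch of the drop rule described in the paragraph above; it is not the agent's actual code, which lives in `retryTransactions` in pkg/forwarder/domain_forwarder.go as shown later in this diff, and the flat helper below is hypothetical.)

// Editor's sketch: a simplified restatement of the drop rule described above.
package main

import "fmt"

// shouldRequeue reports whether one more transaction fits in the retry queue:
// the count-based limit applies when it is configured (> 0), otherwise the
// byte-based limit on the sum of all queued payload sizes applies.
func shouldRequeue(queueLen, queuedBytes, payloadSize, maxCount, maxBytes int) bool {
	if maxCount > 0 {
		return queueLen < maxCount
	}
	return queuedBytes+payloadSize <= maxBytes
}

func main() {
	const maxBytes = 30 * 1024 * 1024 // the new 30MB default
	fmt.Println(shouldRequeue(100, 29*1024*1024, 1024*1024, 0, maxBytes)) // true: 30MB in total still fits
	fmt.Println(shouldRequeue(100, 30*1024*1024, 1024*1024, 0, maxBytes)) // false: 31MB exceeds the cap
}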

Disclaimer: using multiple API keys with the **Datadog** backend will multiply
your billing! Most customers will only use one API key.
81 changes: 51 additions & 30 deletions pkg/forwarder/domain_forwarder.go
@@ -43,32 +43,34 @@ func initDomainForwarderExpvars() {
// HTTP and retrying them if needed. One domainForwarder is created per HTTP
// backend.
type domainForwarder struct {
isRetrying int32
domain string
numberOfWorkers int
highPrio chan Transaction // use to receive new transactions
lowPrio chan Transaction // use to retry transactions
requeuedTransaction chan Transaction
stopRetry chan bool
stopConnectionReset chan bool
workers []*Worker
retryQueue []Transaction
retryQueueLimit int
connectionResetInterval time.Duration
internalState uint32
m sync.Mutex // To control Start/Stop races
isRetrying int32
domain string
numberOfWorkers int
highPrio chan Transaction // use to receive new transactions
lowPrio chan Transaction // use to retry transactions
requeuedTransaction chan Transaction
stopRetry chan bool
stopConnectionReset chan bool
workers []*Worker
retryQueue []Transaction
retryQueueLimit int
retryQueueAllPayloadsMaxSize int
connectionResetInterval time.Duration
internalState uint32
m sync.Mutex // To control Start/Stop races

blockedList *blockedEndpoints
}

func newDomainForwarder(domain string, numberOfWorkers int, retryQueueLimit int, connectionResetInterval time.Duration) *domainForwarder {
func newDomainForwarder(domain string, numberOfWorkers int, retryQueueLimit int, retryQueueAllPayloadsMaxSize int, connectionResetInterval time.Duration) *domainForwarder {
return &domainForwarder{
domain: domain,
numberOfWorkers: numberOfWorkers,
retryQueueLimit: retryQueueLimit,
connectionResetInterval: connectionResetInterval,
internalState: Stopped,
blockedList: newBlockedEndpoints(),
domain: domain,
numberOfWorkers: numberOfWorkers,
retryQueueLimit: retryQueueLimit,
retryQueueAllPayloadsMaxSize: retryQueueAllPayloadsMaxSize,
connectionResetInterval: connectionResetInterval,
internalState: Stopped,
blockedList: newBlockedEndpoints(),
}
}

@@ -97,6 +97,7 @@ func (f *domainForwarder) retryTransactions(retryBefore time.Time) {
droppedWorkerBusy := 0

sort.Sort(byCreatedTimeAndPriority(f.retryQueue))
totalPayloadsSize := 0

for _, t := range f.retryQueue {
if !f.blockedList.isBlock(t.GetTarget()) {
@@ -109,14 +112,25 @@ func (f *domainForwarder) retryTransactions(retryBefore time.Time) {
transactionsDropped.Add(1)
tlmTxDropped.Inc(f.domain)
}
} else if len(newQueue) < f.retryQueueLimit {
newQueue = append(newQueue, t)
transactionsRequeued.Add(1)
tlmTxRequeud.Inc(f.domain)
} else {
droppedRetryQueueFull++
transactionsDropped.Add(1)
tlmTxDropped.Inc(f.domain)
var retry bool
newTotalPayloadsSize := totalPayloadsSize + t.GetPayloadSize()
if f.retryQueueAllPayloadsMaxSize > 0 {
retry = newTotalPayloadsSize <= f.retryQueueAllPayloadsMaxSize
} else {
retry = len(newQueue) < f.retryQueueLimit
}

if retry {
newQueue = append(newQueue, t)
transactionsRequeued.Add(1)
tlmTxRequeud.Inc(f.domain)
totalPayloadsSize = newTotalPayloadsSize
} else {
droppedRetryQueueFull++
transactionsDropped.Add(1)
tlmTxDropped.Inc(f.domain)
}
}
}

@@ -125,8 +139,15 @@ func (f *domainForwarder) retryTransactions(retryBefore time.Time) {
tlmTxRetryQueueSize.Set(float64(len(f.retryQueue)), f.domain)

if droppedRetryQueueFull+droppedWorkerBusy > 0 {
log.Errorf("Dropped %d transactions in this retry attempt: %d for exceeding the retry queue size limit of %d, %d because the workers are too busy",
droppedRetryQueueFull+droppedWorkerBusy, droppedRetryQueueFull, f.retryQueueLimit, droppedWorkerBusy)
var errorMessage string
if f.retryQueueAllPayloadsMaxSize > 0 {
errorMessage = fmt.Sprintf("the retry queue payloads size limit of %d", f.retryQueueAllPayloadsMaxSize)
} else {
errorMessage = fmt.Sprintf("the retry queue size limit of %d", f.retryQueueLimit)
}

log.Errorf("Dropped %d transactions in this retry attempt: %d for exceeding %s, %d because the workers are too busy",
droppedRetryQueueFull+droppedWorkerBusy, droppedRetryQueueFull, errorMessage, droppedWorkerBusy)
}
}

58 changes: 46 additions & 12 deletions pkg/forwarder/domain_forwarder_test.go
@@ -14,7 +14,7 @@ import (
)

func TestNewDomainForwarder(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 120*time.Second)
forwarder := newDomainForwarder("test", 1, 10, 0, 120*time.Second)

assert.NotNil(t, forwarder)
assert.Equal(t, 1, forwarder.numberOfWorkers)
@@ -32,7 +32,7 @@ func TestNewDomainForwarder(t *testing.T) {
}

func TestDomainForwarderStart(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 0)
forwarder := newDomainForwarder("test", 1, 10, 0, 0)
err := forwarder.Start()

assert.Nil(t, err)
@@ -51,14 +51,14 @@ func TestDomainForwarderStart(t *testing.T) {
}

func TestDomainForwarderInit(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 0)
forwarder := newDomainForwarder("test", 1, 10, 0, 0)
forwarder.init()
assert.Len(t, forwarder.workers, 0)
assert.Len(t, forwarder.retryQueue, 0)
}

func TestDomainForwarderStop(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 0)
forwarder := newDomainForwarder("test", 1, 10, 0, 0)
forwarder.Stop(false) // this should be a noop
forwarder.Start()
assert.Equal(t, Started, forwarder.State())
@@ -69,7 +69,7 @@ func TestDomainForwarderStop(t *testing.T) {
}

func TestDomainForwarderStop_WithConnectionReset(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 120*time.Second)
forwarder := newDomainForwarder("test", 1, 10, 0, 120*time.Second)
forwarder.Stop(false) // this should be a noop
forwarder.Start()
assert.Equal(t, Started, forwarder.State())
@@ -80,20 +80,21 @@ func TestDomainForwarderStop_WithConnectionReset(t *testing.T) {
}

func TestDomainForwarderSubmitIfStopped(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 0)
forwarder := newDomainForwarder("test", 1, 10, 0, 0)

require.NotNil(t, forwarder)
assert.NotNil(t, forwarder.sendHTTPTransactions(nil))
}

func TestDomainForwarderSendHTTPTransactions(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 0)
forwarder := newDomainForwarder("test", 1, 10, 0, 0)
tr := newTestTransaction()

// fw is stopped, we should get an error
err := forwarder.sendHTTPTransactions(tr)
assert.NotNil(t, err)

defer forwarder.Stop(false)
forwarder.Start()
// Stopping the worker for the TestRequeueTransaction
forwarder.workers[0].Stop(false)
@@ -102,18 +103,21 @@ func TestDomainForwarderSendHTTPTransactions(t *testing.T) {
assert.Nil(t, err)
transactionToProcess := <-forwarder.highPrio
assert.Equal(t, tr, transactionToProcess)

// Reset `forwarder.workers` otherwise `defer forwarder.Stop(false)` will timeout.
forwarder.workers = nil
}

func TestRequeueTransaction(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 0)
forwarder := newDomainForwarder("test", 1, 10, 0, 0)
tr := NewHTTPTransaction()
assert.Len(t, forwarder.retryQueue, 0)
forwarder.requeueTransaction(tr)
assert.Len(t, forwarder.retryQueue, 1)
}

func TestRetryTransactions(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 0)
forwarder := newDomainForwarder("test", 1, 10, 0, 0)
forwarder.init()
forwarder.retryQueueLimit = 1

@@ -144,7 +148,7 @@ func TestRetryTransactions(t *testing.T) {
}

func TestForwarderRetry(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 0)
forwarder := newDomainForwarder("test", 1, 10, 0, 0)
forwarder.Start()
defer forwarder.Stop(false)

@@ -163,6 +167,7 @@ func TestForwarderRetry(t *testing.T) {
ready.On("GetCreatedAt").Return(time.Now()).Times(1)
notReady.On("GetCreatedAt").Return(time.Now()).Times(1)
notReady.On("GetTarget").Return("blocked").Times(1)
notReady.On("GetPayloadSize").Return(0).Times(1)

forwarder.retryTransactions(time.Now())
<-ready.processed
@@ -176,7 +181,7 @@ func TestForwarderRetry(t *testing.T) {
}

func TestForwarderRetryLifo(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 0)
forwarder := newDomainForwarder("test", 1, 10, 0, 0)
forwarder.init()

transaction1 := newTestTransaction()
@@ -205,7 +210,7 @@ func TestForwarderRetryLifo(t *testing.T) {
}

func TestForwarderRetryLimitQueue(t *testing.T) {
forwarder := newDomainForwarder("test", 1, 10, 0)
forwarder := newDomainForwarder("test", 1, 10, 0, 0)
forwarder.init()

forwarder.retryQueueLimit = 1
@@ -220,9 +225,11 @@ func TestForwarderRetryLimitQueue(t *testing.T) {

transaction1.On("GetCreatedAt").Return(time.Now()).Times(1)
transaction1.On("GetTarget").Return("blocked").Times(1)
transaction1.On("GetPayloadSize").Return(0).Times(1)

transaction2.On("GetCreatedAt").Return(time.Now().Add(1 * time.Minute)).Times(1)
transaction2.On("GetTarget").Return("blocked").Times(1)
transaction2.On("GetPayloadSize").Return(0).Times(1)

forwarder.retryTransactions(time.Now())

@@ -234,3 +241,30 @@ func TestForwarderRetryLimitQueue(t *testing.T) {
// assert that the oldest transaction was dropped
assert.Equal(t, transaction2, forwarder.retryQueue[0])
}

func TestDomainForwarderRetryQueueAllPayloadsMaxSize(t *testing.T) {
oldFlushInterval := flushInterval
defer func() { flushInterval = oldFlushInterval }()
flushInterval = 1 * time.Minute

forwarder := newDomainForwarder("test", 0, 10, 1+2, 0)
forwarder.blockedList.close("blocked")
forwarder.blockedList.errorPerEndpoint["blocked"].until = time.Now().Add(1 * time.Minute)

defer forwarder.Stop(true)
forwarder.Start()

for _, payloadSize := range []int{4, 3, 2, 1} {
tr := newTestTransaction()
tr.On("GetPayloadSize").Return(payloadSize)
tr.On("GetTarget").Return("blocked")
tr.On("GetCreatedAt").Return(time.Now().Add(time.Duration(-payloadSize) * time.Second))
forwarder.retryQueue = append(forwarder.retryQueue, tr)
}

forwarder.retryTransactions(time.Now())

require.Len(t, forwarder.retryQueue, 2)
require.Equal(t, 1, forwarder.retryQueue[0].GetPayloadSize())
require.Equal(t, 2, forwarder.retryQueue[1].GetPayloadSize())
}
37 changes: 24 additions & 13 deletions pkg/forwarder/forwarder.go
@@ -182,12 +182,13 @@ var _ Forwarder = &DefaultForwarder{}

// Options contain the configuration options for the DefaultForwarder
type Options struct {
NumberOfWorkers int
RetryQueueSize int
DisableAPIKeyChecking bool
APIKeyValidationInterval time.Duration
KeysPerDomain map[string][]string
ConnectionResetInterval time.Duration
NumberOfWorkers int
RetryQueueSize int
RetryQueuePayloadsTotalMaxSize int
DisableAPIKeyChecking bool
APIKeyValidationInterval time.Duration
KeysPerDomain map[string][]string
ConnectionResetInterval time.Duration
}

// NewOptions creates new Options with default values
@@ -201,14 +202,24 @@ func NewOptions(keysPerDomain map[string][]string) *Options {
)
validationInterval = config.DefaultAPIKeyValidationInterval
}
const forwarderRetryQueueMaxSizeKey = "forwarder_retry_queue_max_size"
const forwarderRetryQueuePayloadsMaxSizeKey = "forwarder_retry_queue_payloads_max_size"
retryQueueSize := config.Datadog.GetInt(forwarderRetryQueueMaxSizeKey)
retryQueuePayloadsTotalMaxSize := config.Datadog.GetInt(forwarderRetryQueuePayloadsMaxSizeKey)

if retryQueueSize > 0 {
log.Warnf("'%s' is deprecated. It is recommended to use '%s' as it takes the payload sizes into account.", forwarderRetryQueueMaxSizeKey, forwarderRetryQueuePayloadsMaxSizeKey)
retryQueuePayloadsTotalMaxSize = 0
}

return &Options{
NumberOfWorkers: config.Datadog.GetInt("forwarder_num_workers"),
RetryQueueSize: config.Datadog.GetInt("forwarder_retry_queue_max_size"),
DisableAPIKeyChecking: false,
APIKeyValidationInterval: time.Duration(validationInterval) * time.Minute,
KeysPerDomain: keysPerDomain,
ConnectionResetInterval: time.Duration(config.Datadog.GetInt("forwarder_connection_reset_interval")) * time.Second,
NumberOfWorkers: config.Datadog.GetInt("forwarder_num_workers"),
RetryQueueSize: retryQueueSize,
RetryQueuePayloadsTotalMaxSize: retryQueuePayloadsTotalMaxSize,
DisableAPIKeyChecking: false,
APIKeyValidationInterval: time.Duration(validationInterval) * time.Minute,
KeysPerDomain: keysPerDomain,
ConnectionResetInterval: time.Duration(config.Datadog.GetInt("forwarder_connection_reset_interval")) * time.Second,
}
}

@@ -244,7 +255,7 @@ func NewDefaultForwarder(options *Options) *DefaultForwarder {
log.Errorf("No API keys for domain '%s', dropping domain ", domain)
} else {
f.keysPerDomains[domain] = keys
f.domainForwarders[domain] = newDomainForwarder(domain, options.NumberOfWorkers, options.RetryQueueSize, options.ConnectionResetInterval)
f.domainForwarders[domain] = newDomainForwarder(domain, options.NumberOfWorkers, options.RetryQueueSize, options.RetryQueuePayloadsTotalMaxSize, options.ConnectionResetInterval)
}
}

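(Editor's note: for context, a hedged sketch of how a caller might opt into the new byte-based limit through the `Options` struct above; the import path, the API-key placeholder, and the override value are assumptions based on the repository layout at the time, not part of this commit.)

package main

import (
	"github.com/DataDog/datadog-agent/pkg/forwarder"
)

func main() {
	keysPerDomain := map[string][]string{
		"https://app.datadoghq.com": {"<api_key>"}, // placeholder API key
	}
	options := forwarder.NewOptions(keysPerDomain)
	// Keep RetryQueueSize at 0 so the byte-based limit stays in effect, and
	// lower the retry queue cap from the 30MB default to 10MB of payloads.
	options.RetryQueuePayloadsTotalMaxSize = 10 * 1024 * 1024
	f := forwarder.NewDefaultForwarder(options)
	_ = f // the forwarder would normally be started with f.Start()
}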
4 changes: 4 additions & 0 deletions pkg/forwarder/test_common.go
@@ -54,6 +54,10 @@ func (t *testTransaction) GetPriority() TransactionPriority {
return TransactionPriorityNormal
}

func (t *testTransaction) GetPayloadSize() int {
return t.Called().Get(0).(int)
}

// Compile-time checking to ensure that MockedForwarder implements Forwarder
var _ Forwarder = &MockedForwarder{}

