Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding aws-s3 metric for delayed time #34306

Merged
merged 22 commits into from Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -176,6 +176,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014]
- Add support for polling system UDP stats for UDP input metrics. {pull}34070[34070]
- Add support for recognizing the log level in Elasticsearch JVM logs {pull}34159[34159]
- Add metric `sqs_lag_time` for aws-s3 input. {pull}34306[34306]
- Add metrics for TCP packet processing. {pull}34333[34333]
- Add metrics for unix socket packet processing. {pull}34335[34335]
- Add beta `take over` mode for `filestream` for simple migration from `log` inputs {pull}34292[34292]
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Expand Up @@ -733,6 +733,7 @@ observe the activity of the input.
| `sqs_messages_returned_total` | Number of SQS messages returned to queue (happens on errors implicitly after visibility timeout passes).
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received, expressed in nanoseconds.
| `s3_objects_requested_total` | Number of S3 objects downloaded.
| `s3_objects_listed_total` | Number of S3 objects returned by list operations.
| `s3_objects_processed_total` | Number of S3 objects that matched file_selectors rules.
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Expand Up @@ -190,6 +190,7 @@ func TestInputRunSQS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
}

func TestInputRunS3(t *testing.T) {
Expand Down Expand Up @@ -426,4 +427,5 @@ func TestInputRunSNS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0)
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/interfaces.go
Expand Up @@ -138,7 +138,7 @@ func (a *awsSQSAPI) ReceiveMessage(ctx context.Context, maxMessages int) ([]type
MaxNumberOfMessages: int32(min(maxMessages, sqsMaxNumberOfMessagesLimit)),
VisibilityTimeout: int32(a.visibilityTimeout.Seconds()),
WaitTimeSeconds: int32(a.longPollWaitTime.Seconds()),
AttributeNames: []types.QueueAttributeName{sqsApproximateReceiveCountAttribute},
AttributeNames: []types.QueueAttributeName{sqsApproximateReceiveCountAttribute, sqsSentTimestampAttribute},
})
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/input/awss3/metrics.go
Expand Up @@ -23,6 +23,7 @@ type inputMetrics struct {
sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted.
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.

s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded.
s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed.
Expand Down Expand Up @@ -50,6 +51,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri
sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"),
sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"),
sqsMessageProcessingTime: metrics.NewUniformSample(1024),
sqsLagTime: metrics.NewUniformSample(1024),
s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"),
s3ObjectsAckedTotal: monitoring.NewUint(reg, "s3_objects_acked_total"),
s3ObjectsListedTotal: monitoring.NewUint(reg, "s3_objects_listed_total"),
Expand All @@ -61,6 +63,8 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri
}
adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "sqs_lag_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sqsLagTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
return out
Expand Down
34 changes: 28 additions & 6 deletions x-pack/filebeat/input/awss3/sqs_s3_event.go
Expand Up @@ -27,6 +27,7 @@ import (

const (
sqsApproximateReceiveCountAttribute = "ApproximateReceiveCount"
sqsSentTimestampAttribute = "SentTimestamp"
sqsInvalidParameterValueErrorCode = "InvalidParameterValue"
sqsReceiptHandleIsInvalidErrCode = "ReceiptHandleIsInvalid"
)
Expand Down Expand Up @@ -133,6 +134,18 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message
keepaliveWg.Add(1)
go p.keepalive(keepaliveCtx, log, &keepaliveWg, msg)

receiveCount := getSQSReceiveCount(msg.Attributes)
if receiveCount == 1 {
// Only contribute to the sqs_lag_time histogram on the first message
// to avoid skewing the metric when processing retries.
if s, found := msg.Attributes[sqsSentTimestampAttribute]; found {
if sentTimeMillis, err := strconv.ParseInt(s, 10, 64); err == nil {
sentTime := time.UnixMilli(sentTimeMillis)
p.metrics.sqsLagTime.Update(time.Since(sentTime).Nanoseconds())
}
}
}

handles, processingErr := p.processS3Events(ctx, log, *msg.Body)

// Stop keepalive routine before changing visibility.
Expand All @@ -155,12 +168,10 @@ func (p *sqsS3EventProcessor) ProcessSQS(ctx context.Context, msg *types.Message
if p.maxReceiveCount > 0 && !errors.Is(processingErr, &nonRetryableError{}) {
// Prevent poison pill messages from consuming all workers. Check how
// many times this message has been received before making a disposition.
if v, found := msg.Attributes[sqsApproximateReceiveCountAttribute]; found {
if receiveCount, err := strconv.Atoi(v); err == nil && receiveCount >= p.maxReceiveCount {
processingErr = nonRetryableErrorWrap(fmt.Errorf(
"sqs ApproximateReceiveCount <%v> exceeds threshold %v: %w",
receiveCount, p.maxReceiveCount, processingErr))
}
if receiveCount >= p.maxReceiveCount {
processingErr = nonRetryableErrorWrap(fmt.Errorf(
"sqs ApproximateReceiveCount <%v> exceeds threshold %v: %w",
receiveCount, p.maxReceiveCount, processingErr))
}
}

kgeller marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -350,3 +361,14 @@ func (p *sqsS3EventProcessor) finalizeS3Objects(handles []s3ObjectHandler) error
}
return multierr.Combine(errs...)
}

// getSQSReceiveCount looks up the ApproximateReceiveCount entry in the given
// SQS message attribute map and returns it as an int.
//
// It returns -1 when the attribute is absent or when its value cannot be
// parsed as a base-10 integer, so callers can tell "unknown" apart from a
// real count.
func getSQSReceiveCount(attributes map[string]string) int {
	raw, ok := attributes[sqsApproximateReceiveCountAttribute]
	if !ok {
		// Attribute was not present on the message.
		return -1
	}

	count, err := strconv.Atoi(raw)
	if err != nil {
		// Attribute present but not a valid integer.
		return -1
	}
	return count
}