Skip to content

Commit

Permalink
fix(aws-sqs): Respect scaleOnInFlight value (kedacore#4358)
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <jorge_turrado@hotmail.es>
  • Loading branch information
JorTurFer committed Apr 13, 2023
1 parent c56c3cc commit 93e04bd
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 13 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### Fixes

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))
- **AWS SQS Scaler**: Respect `scaleOnInFlight` value ([#4276](https://github.com/kedacore/keda/issue/4276))

### Deprecations

Expand Down
19 changes: 12 additions & 7 deletions pkg/scalers/aws_sqs_queue_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ const (
defaultScaleOnInFlight = true
)

var awsSqsQueueMetricNames = []string{
var awsSqsQueueMetricNamesForScalingInFlight = []string{
"ApproximateNumberOfMessages",
"ApproximateNumberOfMessagesNotVisible",
}

var awsSqsQueueMetricNamesForNotScalingInFlight = []string{
"ApproximateNumberOfMessages",
}

type awsSqsQueueScaler struct {
metricType v2.MetricTargetType
metadata *awsSqsQueueMetadata
Expand All @@ -45,6 +49,7 @@ type awsSqsQueueMetadata struct {
awsAuthorization awsAuthorizationMetadata
scalerIndex int
scaleOnInFlight bool
awsSqsQueueMetricNames []string
}

// NewAwsSqsQueueScaler creates a new awsSqsQueueScaler
Expand Down Expand Up @@ -104,10 +109,10 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs
}
}

if !meta.scaleOnInFlight {
awsSqsQueueMetricNames = []string{
"ApproximateNumberOfMessages",
}
if meta.scaleOnInFlight {
meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForScalingInFlight
} else {
meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForNotScalingInFlight
}

if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" {
Expand Down Expand Up @@ -198,7 +203,7 @@ func (s *awsSqsQueueScaler) GetMetricsAndActivity(ctx context.Context, metricNam
// Get SQS Queue Length
func (s *awsSqsQueueScaler) getAwsSqsQueueLength() (int64, error) {
input := &sqs.GetQueueAttributesInput{
AttributeNames: aws.StringSlice(awsSqsQueueMetricNames),
AttributeNames: aws.StringSlice(s.metadata.awsSqsQueueMetricNames),
QueueUrl: aws.String(s.metadata.queueURL),
}

Expand All @@ -208,7 +213,7 @@ func (s *awsSqsQueueScaler) getAwsSqsQueueLength() (int64, error) {
}

var approximateNumberOfMessages int64
for _, awsSqsQueueMetric := range awsSqsQueueMetricNames {
for _, awsSqsQueueMetric := range s.metadata.awsSqsQueueMetricNames {
metricValue, err := strconv.ParseInt(*output.Attributes[awsSqsQueueMetric], 10, 32)
if err != nil {
return -1, err
Expand Down
48 changes: 43 additions & 5 deletions pkg/scalers/aws_sqs_queue_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,43 @@ var awsSQSMetricIdentifiers = []awsSQSMetricIdentifier{
{&testAWSSQSMetadata[1], 1, "s1-aws-sqs-DeleteArtifactQ"},
}

var awsSQSGetMetricTestData = []*awsSqsQueueMetadata{
{queueURL: testAWSSQSProperQueueURL},
{queueURL: testAWSSQSErrorQueueURL},
{queueURL: testAWSSQSBadDataQueueURL},
var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "false"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaleOnInFlight disabled"},
{map[string]string{
"queueURL": testAWSSQSProperQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "true"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"not error with scaleOnInFlight enabled"},
{map[string]string{
"queueURL": testAWSSQSErrorQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "false"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"error queue"},
{map[string]string{
"queueURL": testAWSSQSBadDataQueueURL,
"queueLength": "1",
"awsRegion": "eu-west-1",
"scaleOnInFlight": "true"},
testAWSSQSAuthentication,
testAWSSQSEmptyResolvedEnv,
false,
"bad data"},
}

func TestSQSParseMetadata(t *testing.T) {
Expand Down Expand Up @@ -343,8 +376,13 @@ func TestAWSSQSGetMetricSpecForScaling(t *testing.T) {
}

func TestAWSSQSScalerGetMetrics(t *testing.T) {
for _, meta := range awsSQSGetMetricTestData {
for index, testData := range awsSQSGetMetricTestData {
meta, err := parseAwsSqsQueueMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: testData.authParams, ScalerIndex: index}, logr.Discard())
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
scaler := awsSqsQueueScaler{"", meta, &mockSqs{}, logr.Discard()}

value, _, err := scaler.GetMetricsAndActivity(context.Background(), "MetricName")
switch meta.queueURL {
case testAWSSQSErrorQueueURL:
Expand Down

0 comments on commit 93e04bd

Please sign in to comment.