Skip to content

Commit

Permalink
Merge pull request #1973 from MisterMX/fix/sqs-queue-policy-diff
Browse files Browse the repository at this point in the history
fix(sqs): Structured policy compare
  • Loading branch information
MisterMX committed Dec 18, 2023
2 parents 51d618c + 6924091 commit 0009150
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 25 deletions.
56 changes: 39 additions & 17 deletions pkg/clients/sqs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package sqs
import (
"context"
"encoding/json"
"errors"
"strconv"
"strings"
"time"
Expand All @@ -30,10 +29,12 @@ import (
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/crossplane-contrib/provider-aws/apis/sqs/v1beta1"
"github.com/crossplane-contrib/provider-aws/pkg/utils/pointer"
policyutils "github.com/crossplane-contrib/provider-aws/pkg/utils/policy"
)

const (
Expand Down Expand Up @@ -180,47 +181,48 @@ func LateInitialize(in *v1beta1.QueueParameters, attributes map[string]string, t
}

// IsUpToDate checks whether there is a change in any of the modifiable fields.
func IsUpToDate(p v1beta1.QueueParameters, attributes map[string]string, tags map[string]string) bool { //nolint:gocyclo
func IsUpToDate(p v1beta1.QueueParameters, attributes map[string]string, tags map[string]string) (bool, string, error) { //nolint:gocyclo
if len(p.Tags) != len(tags) {
return false
return false, "", nil
}

for k, v := range p.Tags {
pVal, ok := tags[k]
if !ok || !strings.EqualFold(pVal, v) {
return false
return false, "", nil
}
}

if aws.ToInt64(p.DelaySeconds) != toInt64(attributes[v1beta1.AttributeDelaySeconds]) {
return false
return false, "", nil
}
if aws.ToInt64(p.KMSDataKeyReusePeriodSeconds) != toInt64(attributes[v1beta1.AttributeKmsDataKeyReusePeriodSeconds]) {
return false
return false, "", nil
}
if aws.ToInt64(p.MaximumMessageSize) != toInt64(attributes[v1beta1.AttributeMaximumMessageSize]) {
return false
return false, "", nil
}
if aws.ToInt64(p.MessageRetentionPeriod) != toInt64(attributes[v1beta1.AttributeMessageRetentionPeriod]) {
return false
return false, "", nil
}
if aws.ToInt64(p.ReceiveMessageWaitTimeSeconds) != toInt64(attributes[v1beta1.AttributeReceiveMessageWaitTimeSeconds]) {
return false
return false, "", nil
}
if aws.ToInt64(p.VisibilityTimeout) != toInt64(attributes[v1beta1.AttributeVisibilityTimeout]) {
return false
return false, "", nil
}
if !cmp.Equal(aws.ToString(p.KMSMasterKeyID), attributes[v1beta1.AttributeKmsMasterKeyID]) {
return false
return false, "", nil
}
if !cmp.Equal(aws.ToString(p.Policy), attributes[v1beta1.AttributePolicy]) {
return false
isPolicyUpToDate, policyDiff, err := isSQSPolicyUpToDate(pointer.StringValue(p.Policy), attributes[v1beta1.AttributePolicy])
if !isPolicyUpToDate {
return false, "Policy: " + policyDiff, errors.Wrap(err, "policy")
}
if attributes[v1beta1.AttributeContentBasedDeduplication] != "" && strconv.FormatBool(aws.ToBool(p.ContentBasedDeduplication)) != attributes[v1beta1.AttributeContentBasedDeduplication] {
return false
return false, "", nil
}
if attributes[v1beta1.AttributeSqsManagedSseEnabled] != "" && strconv.FormatBool(aws.ToBool(p.SqsManagedSseEnabled)) != attributes[v1beta1.AttributeSqsManagedSseEnabled] {
return false
return false, "", nil
}
if p.RedrivePolicy != nil {
r := map[string]interface{}{
Expand All @@ -230,11 +232,31 @@ func IsUpToDate(p v1beta1.QueueParameters, attributes map[string]string, tags ma
val, err := json.Marshal(r)
if err == nil {
if string(val) != attributes[v1beta1.AttributeRedrivePolicy] {
return false
return false, "", nil
}
}
}
return true
return true, "", nil
}

// isSQSPolicyUpToDate determines whether a SQS queue policy needs to be updated
func isSQSPolicyUpToDate(specPolicyStr, currPolicyStr string) (bool, string, error) {
if specPolicyStr == "" {
return currPolicyStr == "", "", nil
} else if currPolicyStr == "" {
return false, "", nil
}

currPolicy, err := policyutils.ParsePolicyString(currPolicyStr)
if err != nil {
return false, "", errors.Wrap(err, "current policy")
}
specPolicy, err := policyutils.ParsePolicyString(specPolicyStr)
if err != nil {
return false, "", errors.Wrap(err, "spec policy")
}
equalPolicies, diff := policyutils.ArePoliciesEqal(&currPolicy, &specPolicy)
return equalPolicies, diff, nil
}

// TagsDiff returns the tags added and removed from spec when compared to the AWS SQS tags.
Expand Down
66 changes: 59 additions & 7 deletions pkg/clients/sqs/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ limitations under the License.
package sqs

import (
_ "embed"
"strconv"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1"
"github.com/crossplane/crossplane-runtime/pkg/reconciler/managed"
"github.com/crossplane/crossplane-runtime/pkg/test"
"github.com/google/go-cmp/cmp"

"github.com/crossplane-contrib/provider-aws/apis/sqs/v1beta1"
Expand Down Expand Up @@ -126,16 +128,31 @@ func TestLateInitialize(t *testing.T) {
}
}

var (
//go:embed testdata/queue_policy.min.json
testPolicyRawMin string

//go:embed testdata/queue_policy.json
testPolicyRaw string

//go:embed testdata/queue_policy2.json
testPolicy2Raw string
)

func TestIsUpToDate(t *testing.T) {
type args struct {
p v1beta1.QueueParameters
attributes map[string]string
tags map[string]string
}
type want struct {
isUpToDate bool
err error
}

cases := map[string]struct {
args args
want bool
want want
}{
"SameFields": {
args: args{
Expand All @@ -146,7 +163,9 @@ func TestIsUpToDate(t *testing.T) {
v1beta1.AttributeKmsMasterKeyID: kmsMasterKeyID,
},
},
want: true,
want: want{
isUpToDate: true,
},
},
"DifferentFields": {
args: args{
Expand All @@ -155,7 +174,35 @@ func TestIsUpToDate(t *testing.T) {
},
attributes: map[string]string{},
},
want: false,
want: want{
isUpToDate: false,
},
},
"SamePolicy": {
args: args{
p: v1beta1.QueueParameters{
Policy: &testPolicyRaw,
},
attributes: map[string]string{
v1beta1.AttributePolicy: testPolicyRawMin,
},
},
want: want{
isUpToDate: true,
},
},
"DifferentPolicy": {
args: args{
p: v1beta1.QueueParameters{
Policy: &testPolicy2Raw,
},
attributes: map[string]string{
v1beta1.AttributePolicy: testPolicyRawMin,
},
},
want: want{
isUpToDate: false,
},
},
"Tags": {
args: args{
Expand All @@ -168,15 +215,20 @@ func TestIsUpToDate(t *testing.T) {
tagKey: tagValue,
},
},
want: true,
want: want{
isUpToDate: true,
},
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
got := IsUpToDate(tc.args.p, tc.args.attributes, tc.args.tags)
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("r: -want, +got:\n%s", diff)
isUpToDate, _, err := IsUpToDate(tc.args.p, tc.args.attributes, tc.args.tags)
if diff := cmp.Diff(tc.want.isUpToDate, isUpToDate); diff != "" {
t.Errorf("isUpToDate: -want, +got:\n%s", diff)
}
if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" {
t.Errorf("error: -want, +got:\n%s", diff)
}
})
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/clients/sqs/testdata/queue_policy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"Version": "2012-10-17",
"Id": "allow-s3",
"Statement": [
{
"Sid": "allow-s3",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": [
"SQS:SendMessage"
],
"Resource": "*",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:*:*:*"
}
}
}
]
}
1 change: 1 addition & 0 deletions pkg/clients/sqs/testdata/queue_policy.min.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"Version":"2012-10-17","Id":"allow-s3","Statement":[{"Sid":"allow-s3","Effect":"Allow","Principal":{"Service":"s3.amazonaws.com"},"Action":"SQS:SendMessage","Resource":"*","Condition":{"ArnLike":{"aws:SourceArn":"arn:aws:s3:*:*:*"}}}]}
23 changes: 23 additions & 0 deletions pkg/clients/sqs/testdata/queue_policy2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"Version": "2012-10-17",
"Id": "allow-s3",
"Statement": [
{
"Sid": "allow-s3",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": [
"SQS:SendMessage",
"sqs:ReceiveMessage"
],
"Resource": "*",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:*:*:*"
}
}
}
]
}
8 changes: 7 additions & 1 deletion pkg/controller/sqs/queue/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
errGetQueueURLFailed = "cannot get Queue URL"
errListQueueTagsFailed = "cannot list Queue tags"
errUpdateFailed = "failed to update the Queue resource"
errIsUpToDate = "cannot check if resource is up to date"
)

// SetupQueue adds a controller that reconciles Queue.
Expand Down Expand Up @@ -152,10 +153,15 @@ func (e *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
cr.Status.SetConditions(xpv1.Available())

cr.Status.AtProvider = sqs.GenerateQueueObservation(*getURLOutput.QueueUrl, resAttributes.Attributes)
isUpToDate, diff, err := sqs.IsUpToDate(cr.Spec.ForProvider, resAttributes.Attributes, resTags.Tags)
if err != nil {
return managed.ExternalObservation{}, errors.Wrap(err, errIsUpToDate)
}

return managed.ExternalObservation{
ResourceExists: true,
ResourceUpToDate: sqs.IsUpToDate(cr.Spec.ForProvider, resAttributes.Attributes, resTags.Tags),
ResourceUpToDate: isUpToDate,
Diff: diff,
ConnectionDetails: sqs.GetConnectionDetails(*cr),
}, nil
}
Expand Down

0 comments on commit 0009150

Please sign in to comment.