diff --git a/pkg/clients/sqs/queue.go b/pkg/clients/sqs/queue.go index 2c71ace249..cdbb2ac328 100644 --- a/pkg/clients/sqs/queue.go +++ b/pkg/clients/sqs/queue.go @@ -19,7 +19,6 @@ package sqs import ( "context" "encoding/json" - "errors" "strconv" "strings" "time" @@ -30,10 +29,12 @@ import ( xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" "github.com/google/go-cmp/cmp" + "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/crossplane-contrib/provider-aws/apis/sqs/v1beta1" "github.com/crossplane-contrib/provider-aws/pkg/utils/pointer" + policyutils "github.com/crossplane-contrib/provider-aws/pkg/utils/policy" ) const ( @@ -180,47 +181,48 @@ func LateInitialize(in *v1beta1.QueueParameters, attributes map[string]string, t } // IsUpToDate checks whether there is a change in any of the modifiable fields. -func IsUpToDate(p v1beta1.QueueParameters, attributes map[string]string, tags map[string]string) bool { //nolint:gocyclo +func IsUpToDate(p v1beta1.QueueParameters, attributes map[string]string, tags map[string]string) (bool, string, error) { //nolint:gocyclo if len(p.Tags) != len(tags) { - return false + return false, "", nil } for k, v := range p.Tags { pVal, ok := tags[k] if !ok || !strings.EqualFold(pVal, v) { - return false + return false, "", nil } } if aws.ToInt64(p.DelaySeconds) != toInt64(attributes[v1beta1.AttributeDelaySeconds]) { - return false + return false, "", nil } if aws.ToInt64(p.KMSDataKeyReusePeriodSeconds) != toInt64(attributes[v1beta1.AttributeKmsDataKeyReusePeriodSeconds]) { - return false + return false, "", nil } if aws.ToInt64(p.MaximumMessageSize) != toInt64(attributes[v1beta1.AttributeMaximumMessageSize]) { - return false + return false, "", nil } if aws.ToInt64(p.MessageRetentionPeriod) != toInt64(attributes[v1beta1.AttributeMessageRetentionPeriod]) { - return false + return false, "", nil } if aws.ToInt64(p.ReceiveMessageWaitTimeSeconds) != toInt64(attributes[v1beta1.AttributeReceiveMessageWaitTimeSeconds]) { - return false + return false, "", nil } if aws.ToInt64(p.VisibilityTimeout) != toInt64(attributes[v1beta1.AttributeVisibilityTimeout]) { - return false + return false, "", nil } if !cmp.Equal(aws.ToString(p.KMSMasterKeyID), attributes[v1beta1.AttributeKmsMasterKeyID]) { - return false + return false, "", nil } - if !cmp.Equal(aws.ToString(p.Policy), attributes[v1beta1.AttributePolicy]) { - return false + isPolicyUpToDate, policyDiff, err := isSQSPolicyUpToDate(pointer.StringValue(p.Policy), attributes[v1beta1.AttributePolicy]) + if !isPolicyUpToDate { + return false, "Policy: " + policyDiff, errors.Wrap(err, "policy") } if attributes[v1beta1.AttributeContentBasedDeduplication] != "" && strconv.FormatBool(aws.ToBool(p.ContentBasedDeduplication)) != attributes[v1beta1.AttributeContentBasedDeduplication] { - return false + return false, "", nil } if attributes[v1beta1.AttributeSqsManagedSseEnabled] != "" && strconv.FormatBool(aws.ToBool(p.SqsManagedSseEnabled)) != attributes[v1beta1.AttributeSqsManagedSseEnabled] { - return false + return false, "", nil } if p.RedrivePolicy != nil { r := map[string]interface{}{ @@ -230,11 +232,31 @@ func IsUpToDate(p v1beta1.QueueParameters, attributes map[string]string, tags ma val, err := json.Marshal(r) if err == nil { if string(val) != attributes[v1beta1.AttributeRedrivePolicy] { - return false + return false, "", nil } } } - return true + return true, "", nil +} + +// isSQSPolicyUpToDate determines whether a SQS queue policy needs to be updated +func isSQSPolicyUpToDate(specPolicyStr, currPolicyStr string) (bool, string, error) { + if specPolicyStr == "" { + return currPolicyStr == "", "", nil + } else if currPolicyStr == "" { + return false, "", nil + } + + currPolicy, err := policyutils.ParsePolicyString(currPolicyStr) + if err != nil { + return false, "", errors.Wrap(err, "current policy") + } + specPolicy, err := policyutils.ParsePolicyString(specPolicyStr) + if err != nil { + return false, "", errors.Wrap(err, "spec policy") + } + equalPolicies, diff := policyutils.ArePoliciesEqal(&currPolicy, &specPolicy) + return equalPolicies, diff, nil } // TagsDiff returns the tags added and removed from spec when compared to the AWS SQS tags. diff --git a/pkg/clients/sqs/queue_test.go b/pkg/clients/sqs/queue_test.go index 8c184cca05..b2a17a30ca 100644 --- a/pkg/clients/sqs/queue_test.go +++ b/pkg/clients/sqs/queue_test.go @@ -17,12 +17,14 @@ limitations under the License. package sqs import ( + _ "embed" "strconv" "testing" "github.com/aws/aws-sdk-go-v2/aws" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" + "github.com/crossplane/crossplane-runtime/pkg/test" "github.com/google/go-cmp/cmp" "github.com/crossplane-contrib/provider-aws/apis/sqs/v1beta1" @@ -126,16 +128,31 @@ func TestLateInitialize(t *testing.T) { } } +var ( + //go:embed testdata/queue_policy.min.json + testPolicyRawMin string + + //go:embed testdata/queue_policy.json + testPolicyRaw string + + //go:embed testdata/queue_policy2.json + testPolicy2Raw string +) + func TestIsUpToDate(t *testing.T) { type args struct { p v1beta1.QueueParameters attributes map[string]string tags map[string]string } + type want struct { + isUpToDate bool + err error + } cases := map[string]struct { args args - want bool + want want }{ "SameFields": { args: args{ @@ -146,7 +163,9 @@ func TestIsUpToDate(t *testing.T) { v1beta1.AttributeKmsMasterKeyID: kmsMasterKeyID, }, }, - want: true, + want: want{ + isUpToDate: true, + }, }, "DifferentFields": { args: args{ @@ -155,7 +174,35 @@ func TestIsUpToDate(t *testing.T) { }, attributes: map[string]string{}, }, - want: false, + want: want{ + isUpToDate: false, + }, + }, + "SamePolicy": { + args: args{ + p: v1beta1.QueueParameters{ + Policy: &testPolicyRaw, + }, + attributes: map[string]string{ + v1beta1.AttributePolicy: testPolicyRawMin, + }, + }, + want: want{ + isUpToDate: true, + }, + }, + "DifferentPolicy": { + args: args{ + p: v1beta1.QueueParameters{ + Policy: &testPolicy2Raw, + }, + attributes: map[string]string{ + v1beta1.AttributePolicy: testPolicyRawMin, + }, + }, + want: want{ + isUpToDate: false, + }, }, "Tags": { args: args{ @@ -168,15 +215,20 @@ func TestIsUpToDate(t *testing.T) { tagKey: tagValue, }, }, - want: true, + want: want{ + isUpToDate: true, + }, }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { - got := IsUpToDate(tc.args.p, tc.args.attributes, tc.args.tags) - if diff := cmp.Diff(tc.want, got); diff != "" { - t.Errorf("r: -want, +got:\n%s", diff) + isUpToDate, _, err := IsUpToDate(tc.args.p, tc.args.attributes, tc.args.tags) + if diff := cmp.Diff(tc.want.isUpToDate, isUpToDate); diff != "" { + t.Errorf("isUpToDate: -want, +got:\n%s", diff) + } + if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { + t.Errorf("error: -want, +got:\n%s", diff) } }) } diff --git a/pkg/clients/sqs/testdata/queue_policy.json b/pkg/clients/sqs/testdata/queue_policy.json new file mode 100644 index 0000000000..485cfdbe6a --- /dev/null +++ b/pkg/clients/sqs/testdata/queue_policy.json @@ -0,0 +1,22 @@ +{ + "Version": "2012-10-17", + "Id": "allow-s3", + "Statement": [ + { + "Sid": "allow-s3", + "Effect": "Allow", + "Principal": { + "Service": "s3.amazonaws.com" + }, + "Action": [ + "SQS:SendMessage" + ], + "Resource": "*", + "Condition": { + "ArnLike": { + "aws:SourceArn": "arn:aws:s3:*:*:*" + } + } + } + ] +} diff --git a/pkg/clients/sqs/testdata/queue_policy.min.json b/pkg/clients/sqs/testdata/queue_policy.min.json new file mode 100644 index 0000000000..cb00d82d08 --- /dev/null +++ b/pkg/clients/sqs/testdata/queue_policy.min.json @@ -0,0 +1 @@ +{"Version":"2012-10-17","Id":"allow-s3","Statement":[{"Sid":"allow-s3","Effect":"Allow","Principal":{"Service":"s3.amazonaws.com"},"Action":"SQS:SendMessage","Resource":"*","Condition":{"ArnLike":{"aws:SourceArn":"arn:aws:s3:*:*:*"}}}]} diff --git a/pkg/clients/sqs/testdata/queue_policy2.json b/pkg/clients/sqs/testdata/queue_policy2.json new file mode 100644 index 0000000000..d29ba0b56d --- /dev/null +++ b/pkg/clients/sqs/testdata/queue_policy2.json @@ -0,0 +1,23 @@ +{ + "Version": "2012-10-17", + "Id": "allow-s3", + "Statement": [ + { + "Sid": "allow-s3", + "Effect": "Allow", + "Principal": { + "Service": "s3.amazonaws.com" + }, + "Action": [ + "SQS:SendMessage", + "sqs:ReceiveMessage" + ], + "Resource": "*", + "Condition": { + "ArnLike": { + "aws:SourceArn": "arn:aws:s3:*:*:*" + } + } + } + ] +} diff --git a/pkg/controller/sqs/queue/controller.go b/pkg/controller/sqs/queue/controller.go index 3ca13ac061..d3d14c5052 100644 --- a/pkg/controller/sqs/queue/controller.go +++ b/pkg/controller/sqs/queue/controller.go @@ -53,6 +53,7 @@ const ( errGetQueueURLFailed = "cannot get Queue URL" errListQueueTagsFailed = "cannot list Queue tags" errUpdateFailed = "failed to update the Queue resource" + errIsUpToDate = "cannot check if resource is up to date" ) // SetupQueue adds a controller that reconciles Queue. @@ -152,10 +153,15 @@ func (e *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex cr.Status.SetConditions(xpv1.Available()) cr.Status.AtProvider = sqs.GenerateQueueObservation(*getURLOutput.QueueUrl, resAttributes.Attributes) + isUpToDate, diff, err := sqs.IsUpToDate(cr.Spec.ForProvider, resAttributes.Attributes, resTags.Tags) + if err != nil { + return managed.ExternalObservation{}, errors.Wrap(err, errIsUpToDate) + } return managed.ExternalObservation{ ResourceExists: true, - ResourceUpToDate: sqs.IsUpToDate(cr.Spec.ForProvider, resAttributes.Attributes, resTags.Tags), + ResourceUpToDate: isUpToDate, + Diff: diff, ConnectionDetails: sqs.GetConnectionDetails(*cr), }, nil }