Skip to content

Commit

Permalink
fix(kafka/cluster): clusterpolicy update + delete
Browse files Browse the repository at this point in the history
Signed-off-by: Charel Baum (external expert on behalf of DB Netz AG) <charel.baum-extern@deutschebahn.com>
  • Loading branch information
Charel Baum (external expert on behalf of DB Netz AG) committed Oct 18, 2023
1 parent dd7f4a0 commit 67ef42d
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 16 deletions.
6 changes: 6 additions & 0 deletions apis/kafka/generator-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@ resources:
errors:
404:
code: NotFoundException
fields:
ClusterPolicyVersion:
is_read_only: true
from:
operation: GetClusterPolicy
path: CurrentVersion
2 changes: 2 additions & 0 deletions apis/kafka/v1alpha1/zz_cluster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions apis/kafka/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions package/crds/kafka.aws.crossplane.io_clusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,9 @@ spec:
clusterARN:
description: The Amazon Resource Name (ARN) of the cluster.
type: string
clusterPolicyVersion:
description: Cluster policy version.
type: string
state:
description: The state of the cluster. The possible states are
ACTIVE, CREATING, DELETING, FAILED, HEALING, MAINTENANCE, REBOOTING_BROKER,
Expand Down
37 changes: 21 additions & 16 deletions pkg/controller/kafka/cluster/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,47 +361,51 @@ func (u *hooks) isUpToDate(ctx context.Context, wanted *svcapitypes.Cluster, cur
return false, diff, nil
}

clusterPolicyState, diff, err := u.getClusterPolicyState(ctx, wanted)
clusterPolicyState, clusterPolicyVersion, diff, err := u.getClusterPolicyState(ctx, wanted)
u.cache.clusterPolicyState = clusterPolicyState
// write clusterPolicy currentVersion into status to be used in potential update and to be visible for user
wanted.Status.AtProvider.ClusterPolicyVersion = clusterPolicyVersion
if clusterPolicyState != subResourceOK {
return false, diff, err
}

return true, "", nil
}

func (u *hooks) getClusterPolicyState(ctx context.Context, wanted *svcapitypes.Cluster) (subResourceState, string, error) {
func (u *hooks) getClusterPolicyState(ctx context.Context, wanted *svcapitypes.Cluster) (subResourceState, *string, string, error) {
res, err := u.client.GetClusterPolicyWithContext(ctx, &svcsdk.GetClusterPolicyInput{
ClusterArn: pointer.String(meta.GetExternalName(wanted)),
})
if IsNotFound(err) {
if wanted.Spec.ForProvider.ClusterPolicy == nil {
return subResourceOK, "spec.forProvider.clusterPolicy", nil
return subResourceOK, nil, "", nil
}
return subResourceNeedsUpdate, "spec.forProvider.clusterPolicy", nil
return subResourceNeedsUpdate, nil, "spec.forProvider.clusterPolicy", nil
}
if err != nil {
return subResourceOK, "", errors.Wrap(err, errGetClusterPolicy)
return subResourceOK, nil, "", errorutils.Wrap(err, errGetClusterPolicy)
}

if res.Policy == nil {
if wanted.Spec.ForProvider.ClusterPolicy == nil {
return subResourceNeedsDeletion, "spec.forProvider.clusterPolicy", nil
return subResourceOK, nil, "", nil
}
return subResourceOK, "spec.forProvider.clusterPolicy", nil
return subResourceNeedsUpdate, nil, "spec.forProvider.clusterPolicy", nil
} else if wanted.Spec.ForProvider.ClusterPolicy == nil {
return subResourceNeedsDeletion, nil, "spec.forProvider.clusterPolicy", nil
}

currentPolicy, err := policy.ParsePolicyString(*res.Policy)
if err != nil {
return subResourceOK, "", errors.Wrap(err, errParseClusterPolicy)
return subResourceOK, nil, "", errorutils.Wrap(err, errParseClusterPolicy)
}
wantedPolicy := policy.ConvertResourcePolicyToPolicy(wanted.Spec.ForProvider.ClusterPolicy)

equal, diff := policy.ArePoliciesEqal(wantedPolicy, &currentPolicy)
if !equal {
return subResourceNeedsUpdate, diff, nil
return subResourceNeedsUpdate, res.CurrentVersion, diff, nil
}
return subResourceOK, diff, nil
return subResourceOK, nil, diff, nil
}

func isConnectivityInfoUpToDate(wanted *svcapitypes.CustomBrokerNodeGroupInfo, current *svcsdk.BrokerNodeGroupInfo) (bool, string) {
Expand Down Expand Up @@ -981,28 +985,29 @@ func (u *hooks) update(ctx context.Context, mg resource.Managed) (managed.Extern
CurrentVersion: currentVersion,
})
if err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, errUpdateConnectivity)
return managed.ExternalUpdate{}, errorutils.Wrap(err, errUpdateConnectivity)
}
}

if u.cache.clusterPolicyState == subResourceNeedsUpdate {
policyRaw, err := policy.ConvertResourcePolicyToPolicyString(wanted.ClusterPolicy)
if err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, errMarshalClusterPolicy)
return managed.ExternalUpdate{}, errorutils.Wrap(err, errMarshalClusterPolicy)
}
_, err = u.client.PutClusterPolicyWithContext(ctx, &svcsdk.PutClusterPolicyInput{
ClusterArn: &currentARN,
Policy: policyRaw,
ClusterArn: &currentARN,
CurrentVersion: cr.Status.AtProvider.ClusterPolicyVersion,
Policy: policyRaw,
})
if err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, errPutClusterPolicy)
return managed.ExternalUpdate{}, errorutils.Wrap(err, errPutClusterPolicy)
}
} else if u.cache.clusterPolicyState == subResourceNeedsDeletion {
_, err := u.client.DeleteClusterPolicyWithContext(ctx, &svcsdk.DeleteClusterPolicyInput{
ClusterArn: &currentARN,
})
if err != nil {
return managed.ExternalUpdate{}, errors.Wrap(err, errDeleteClusterPolicy)
return managed.ExternalUpdate{}, errorutils.Wrap(err, errDeleteClusterPolicy)
}
}
}
Expand Down

0 comments on commit 67ef42d

Please sign in to comment.