Skip to content

Commit

Permalink
Address more comments and add logic in stale controller
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Ding <dingyang@vmware.com>
  • Loading branch information
Dyanngg committed Mar 8, 2022
1 parent a3bf01a commit 7c97846
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 56 deletions.
6 changes: 3 additions & 3 deletions docs/multicluster/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ To achieve such ACNP copy-span, admins can, in the acting leader cluster of a Mu
create a ResourceExport of kind `AntreaClusterNetworkPolicy` which contains the ClusterNetworkPolicy spec
they wish to be replicated. The ResourceExport should be created in the Namespace which implements the
Common Area of the ClusterSet. In future releases, some additional tooling may become available to
automate the creation of such ResourceExport and make ACNP replication across cluster eaiser.
automate the creation of such ResourceExport and make ACNP replication across clusters eaiser.

```yaml
apiVersion: multicluster.crd.antrea.io/v1alpha1
Expand Down Expand Up @@ -135,7 +135,7 @@ The above sample spec will create an ACNP in each member cluster which implement
isolation for that cluster.

Note that because the Tier that an ACNP refers to must exist before the ACNP is applied, an importing
cluster may fail to create the ACNP to be replicated, if the tier in the ResourceExport spec cannot be
cluster may fail to create the ACNP to be replicated, if the Tier in the ResourceExport spec cannot be
found in that particular cluster. The ACNP creation status of each member cluster will be reported back
to the Common Area as K8s Events, and can be checked by describing the ResourceImport of the original
ResourceExport:
Expand Down Expand Up @@ -172,6 +172,6 @@ Spec:
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal ACNPImportSucceeded 2m11s resourceimport-controller ACNP successfully created in the importing cluster test-cluster-east
Normal ACNPImportSucceeded 2m11s resourceimport-controller ACNP successfully synced in the importing cluster test-cluster-east
Warning ACNPImportFailed 2m11s resourceimport-controller ACNP Tier does not exist in the importing cluster test-cluster-west
```
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ func (r *ResourceImportReconciler) handleResImpUpdateForClusterNetworkPolicy(ctx
}
if !acnpNotFound {
if _, ok := acnp.Annotations[common.AntreaMCACNPAnnotation]; !ok {
err := errors.New("unable to import Antrea ClusterNetworkPolicy which conflicts with existing one")
msg := "unable to import Antrea ClusterNetworkPolicy which conflicts with existing one in cluster " + r.localClusterID
err := errors.New(msg)
klog.ErrorS(err, "", "acnp", klog.KObj(acnp))
return ctrl.Result{}, err
return ctrl.Result{}, r.reportStatusEvent(msg, ctx, resImp)
}
}
acnpObj := getMCAntreaClusterPolicy(resImp)
Expand All @@ -78,57 +79,32 @@ func (r *ResourceImportReconciler) handleResImpUpdateForClusterNetworkPolicy(ctx
// Create or update the ACNP if necessary.
if acnpNotFound {
if err = r.localClusterClient.Create(ctx, acnpObj, &client.CreateOptions{}); err != nil {
klog.ErrorS(err, "failed to create imported Antrea ClusterNetworkPolicy", "acnp", klog.KObj(acnpObj))
return ctrl.Result{}, err
msg := "failed to create imported Antrea ClusterNetworkPolicy in cluster " + r.localClusterID
klog.ErrorS(err, msg, "acnp", klog.KObj(acnpObj))
return ctrl.Result{}, r.reportStatusEvent(msg, ctx, resImp)
}
} else if !apiequality.Semantic.DeepEqual(acnp.Spec, acnpObj.Spec) {
acnp.Spec = acnpObj.Spec
if err = r.localClusterClient.Update(ctx, acnp, &client.UpdateOptions{}); err != nil {
klog.ErrorS(err, "failed to update imported Antrea ClusterNetworkPolicy", "acnp", klog.KObj(acnpObj))
return ctrl.Result{}, err
msg := "failed to update imported Antrea ClusterNetworkPolicy in cluster " + r.localClusterID
klog.ErrorS(err, msg, "acnp", klog.KObj(acnpObj))
return ctrl.Result{}, r.reportStatusEvent(msg, ctx, resImp)
}
}
} else if tierNotFound && !acnpNotFound {
// The ACNP Tier does not exist, and the policy cannot be realized in this particular importing member cluster.
// If there is an ACNP previously created via import (which has a valid Tier by then), it should be cleaned up.
if err = r.localClusterClient.Delete(ctx, acnpObj, &client.DeleteOptions{}); err != nil {
klog.ErrorS(err, "failed to delete imported Antrea ClusterNetworkPolicy that no longer has a valid Tier for the current cluster", "acnp", klog.KObj(acnpObj))
return ctrl.Result{}, err
msg := "failed to delete imported Antrea ClusterNetworkPolicy that no longer has a valid Tier for cluster " + r.localClusterID
klog.ErrorS(err, msg, "acnp", klog.KObj(acnpObj))
return ctrl.Result{}, r.reportStatusEvent(msg, ctx, resImp)
}
}

statusEvent := &corev1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: randName(acnpImportStatusPrefix + r.localClusterID + "-"),
Namespace: resImp.Namespace,
},
InvolvedObject: corev1.ObjectReference{
APIVersion: resourceImportAPIVersion,
Kind: resourceImportKind,
Name: resImp.Name,
Namespace: resImp.Namespace,
UID: resImp.GetUID(),
},
FirstTimestamp: metav1.Now(),
LastTimestamp: metav1.Now(),
ReportingController: acnpEventReportingController,
ReportingInstance: acnpEventReportingInstance,
Action: "reconciled",
}
if tierNotFound {
statusEvent.Type = corev1.EventTypeWarning
statusEvent.Reason = acnpImportFailed
statusEvent.Message = "ACNP Tier does not exist in the importing cluster " + r.localClusterID
} else {
statusEvent.Type = corev1.EventTypeNormal
statusEvent.Reason = acnpImportSucceeded
statusEvent.Message = "ACNP successfully created in the importing cluster " + r.localClusterID
msg := "ACNP Tier does not exist in the importing cluster " + r.localClusterID
return ctrl.Result{}, r.reportStatusEvent(msg, ctx, resImp)
}
if err = r.remoteCommonArea.Create(ctx, statusEvent, &client.CreateOptions{}); err != nil {
klog.ErrorS(err, "failed to create acnp import event for resourceimport", "resImp", klog.KObj(resImp))
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
return ctrl.Result{}, r.reportStatusEvent("", ctx, resImp)
}

func (r *ResourceImportReconciler) handleResImpDeleteForClusterNetworkPolicy(ctx context.Context, resImp *multiclusterv1alpha1.ResourceImport) (ctrl.Result, error) {
Expand Down Expand Up @@ -170,6 +146,41 @@ func getMCAntreaClusterPolicy(resImp *multiclusterv1alpha1.ResourceImport) *v1al
}
}

func (r *ResourceImportReconciler) reportStatusEvent(errMsg string, ctx context.Context, resImp *multiclusterv1alpha1.ResourceImport) error {
statusEvent := &corev1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: randName(acnpImportStatusPrefix + r.localClusterID + "-"),
Namespace: resImp.Namespace,
},
InvolvedObject: corev1.ObjectReference{
APIVersion: resourceImportAPIVersion,
Kind: resourceImportKind,
Name: resImp.Name,
Namespace: resImp.Namespace,
UID: resImp.GetUID(),
},
FirstTimestamp: metav1.Now(),
LastTimestamp: metav1.Now(),
ReportingController: acnpEventReportingController,
ReportingInstance: acnpEventReportingInstance,
Action: "reconciled",
}
if errMsg != "" {
statusEvent.Type = corev1.EventTypeWarning
statusEvent.Reason = acnpImportFailed
statusEvent.Message = errMsg
} else {
statusEvent.Type = corev1.EventTypeNormal
statusEvent.Reason = acnpImportSucceeded
statusEvent.Message = "ACNP successfully synced in the importing cluster " + r.localClusterID
}
if err := r.remoteCommonArea.Create(ctx, statusEvent, &client.CreateOptions{}); err != nil {
klog.ErrorS(err, "failed to create acnp import event for resourceimport", "resImp", klog.KObj(resImp))
return err
}
return nil
}

func randSeq(n int) string {
b := make([]rune, n)
for i := range b {
Expand Down
62 changes: 48 additions & 14 deletions multicluster/controllers/multicluster/stale_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
"antrea.io/antrea/multicluster/controllers/multicluster/common"
"antrea.io/antrea/multicluster/controllers/multicluster/commonarea"
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
)

// StaleController will clean up ServiceImport and MC Service if no corresponding ResourceImport
Expand Down Expand Up @@ -68,17 +69,29 @@ func (c *StaleController) cleanup() error {
if *c.remoteCommonAreaManager == nil {
return errors.New("ClusterSet has not been initialized properly, no available remote common area")
}

remoteCluster, err := getRemoteCommonArea(c.remoteCommonAreaManager)
if err != nil {
return err
}

localClusterID := string((*c.remoteCommonAreaManager).GetLocalClusterID())
if len(localClusterID) == 0 {
return errors.New("localClusterID is not initialized, retry later")
}
resImpList := &mcsv1alpha1.ResourceImportList{}
if err := remoteCluster.List(ctx, resImpList, &client.ListOptions{Namespace: remoteCluster.GetNamespace()}); err != nil {
return err
}
if err := c.cleanupStaleServiceResources(remoteCluster, localClusterID, resImpList); err != nil {
return err
}
if err := c.cleanupACNPResources(resImpList); err != nil {
return err
}
return nil
}

func (c *StaleController) cleanupStaleServiceResources(remoteCluster commonarea.RemoteCommonArea,
localClusterID string, resImpList *mcsv1alpha1.ResourceImportList) error {
svcImpList := &k8smcsv1alpha1.ServiceImportList{}
if err := c.List(ctx, svcImpList, &client.ListOptions{}); err != nil {
return err
Expand All @@ -89,11 +102,6 @@ func (c *StaleController) cleanup() error {
return err
}

resImpList := &mcsv1alpha1.ResourceImportList{}
if err := remoteCluster.List(ctx, resImpList, &client.ListOptions{Namespace: remoteCluster.GetNamespace()}); err != nil {
return err
}

svcImpItems := svcImpList.Items
var mcsSvcItems []corev1.Service
for _, svc := range svcList.Items {
Expand All @@ -104,12 +112,11 @@ func (c *StaleController) cleanup() error {

for _, resImp := range resImpList.Items {
for k, svc := range mcsSvcItems {
if svc.Name == common.AntreaMCSPrefix+resImp.Spec.Name && svc.Namespace == resImp.Spec.Namespace {
if resImp.Spec.Kind == common.ServiceKind && svc.Name == common.AntreaMCSPrefix+resImp.Spec.Name && svc.Namespace == resImp.Spec.Namespace {
// Set the valid Service item as empty Service, then all left non-empty items should be removed.
mcsSvcItems[k] = corev1.Service{}
}
}

for n, svcImp := range svcImpItems {
if svcImp.Name == resImp.Spec.Name && svcImp.Namespace == resImp.Spec.Namespace {
svcImpItems[n] = k8smcsv1alpha1.ServiceImport{}
Expand All @@ -120,18 +127,17 @@ func (c *StaleController) cleanup() error {
for _, svc := range mcsSvcItems {
s := svc
if s.Name != "" {
klog.InfoS("clean up Service", "service", klog.KObj(&s))
klog.InfoS("Cleaning up stale Service", "service", klog.KObj(&s))
if err := c.Client.Delete(ctx, &s, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
}

for _, svcImp := range svcImpItems {
si := svcImp
if si.Name != "" {
klog.InfoS("clean up ServiceImport", "serviceimport", klog.KObj(&si))
if err = c.Client.Delete(ctx, &si, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
klog.InfoS("Cleaning up stale ServiceImport", "serviceimport", klog.KObj(&si))
if err := c.Client.Delete(ctx, &si, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
Expand Down Expand Up @@ -167,7 +173,7 @@ func (c *StaleController) cleanup() error {
for _, r := range resExpItems {
re := r
if re.Name != "" {
klog.InfoS("clean up ResourceExport", "ResourceExport", klog.KObj(&re))
klog.InfoS("Cleaning up ResourceExport", "ResourceExport", klog.KObj(&re))
if err := remoteCluster.Delete(ctx, &re, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return err
}
Expand All @@ -176,6 +182,34 @@ func (c *StaleController) cleanup() error {
return nil
}

func (c *StaleController) cleanupACNPResources(resImpList *mcsv1alpha1.ResourceImportList) error {
acnpList := &crdv1alpha1.ClusterNetworkPolicyList{}
if err := c.List(ctx, acnpList, &client.ListOptions{}); err != nil {
return err
}
staleMCACNPItems := map[string]crdv1alpha1.ClusterNetworkPolicy{}
for _, acnp := range acnpList.Items {
if _, ok := acnp.Annotations[common.AntreaMCACNPAnnotation]; ok {
staleMCACNPItems[acnp.Name] = acnp
}
}
for _, resImp := range resImpList.Items {
if resImp.Spec.Kind == common.AntreaClusterNetworkPolicyKind {
acnpNameFromResImp := common.AntreaMCSPrefix + resImp.Spec.Name
if _, ok := staleMCACNPItems[acnpNameFromResImp]; ok {
delete(staleMCACNPItems, acnpNameFromResImp)
}
}
}
for _, acnp := range staleMCACNPItems {
klog.InfoS("Cleaning up stale ACNP", "acnp", klog.KObj(&acnp))
if err := c.Client.Delete(ctx, &acnp, &client.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
return nil
}

// Enqueue will be called after StaleController is initialized.
func (c *StaleController) Enqueue() {
// The key can be anything as we only have single item.
Expand Down
Loading

0 comments on commit 7c97846

Please sign in to comment.