Skip to content

Commit

Permalink
apply resources to child clusters with method PATCH (#666)
Browse files Browse the repository at this point in the history
Signed-off-by: Mars <abstractmj@qq.com>
  • Loading branch information
abstractmj committed Apr 17, 2023
1 parent e156bf0 commit b18c297
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/mattbaird/jsonpatch v0.0.0-20200820163806-098863c1fc24
github.com/pkg/errors v0.9.1
github.com/rancher/remotedialer v0.2.6-0.20210318171128-d1ebd5202be4
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.6.1
Expand Down Expand Up @@ -129,7 +130,6 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
Expand Down
6 changes: 5 additions & 1 deletion pkg/known/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const (
// This is useful when you want to create a CR, but you don't want to declare a CRD in the parent kube-apiserver.
SkipValidatingAnnotation = "apps.clusternet.io/skip-validating"

//ObjectOwnedByDescriptionAnnotation is the name of an annotation which contains the description of the object owned by
// ObjectOwnedByDescriptionAnnotation is the name of an annotation which contains the description of the object owned by
ObjectOwnedByDescriptionAnnotation = "apps.clusternet.io/owned-by-description"

// LastAppliedConfigAnnotation is the annotation used to store the previous
// configuration of a resource for use in a three way diff by UpdateApplyAnnotation.
LastAppliedConfigAnnotation = "clusternet.io/last-applied-configuration"
)
117 changes: 111 additions & 6 deletions pkg/utils/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ package utils
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"reflect"
"strings"
"sync"

"github.com/mattbaird/jsonpatch"
"github.com/pkg/errors"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/registry"
Expand All @@ -39,15 +39,20 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/mergepatch"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
kubectlscheme "k8s.io/kubectl/pkg/scheme"

appsapi "github.com/clusternet/clusternet/pkg/apis/apps/v1alpha1"
clusterapi "github.com/clusternet/clusternet/pkg/apis/clusters/v1beta1"
Expand Down Expand Up @@ -377,13 +382,25 @@ func ApplyDescription(ctx context.Context, clusternetClient *clusternetclientset
recorder.Event(desc, corev1.EventTypeWarning, "FailedMarshalingResource", msg)
continue
}

annotations := resource.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
// remove kubectl last-applied-config annotation
delete(annotations, corev1.LastAppliedConfigAnnotation)
annotations[known.ObjectOwnedByDescriptionAnnotation] = desc.Namespace + "." + desc.Name
resource.SetAnnotations(annotations)
trimedObject, err := resource.MarshalJSON()
if err != nil {
allErrs = append(allErrs, err)
msg := fmt.Sprintf("failed to unmarshal resource: %v", err)
klog.ErrorDepth(5, msg)
recorder.Event(desc, corev1.EventTypeWarning, "FailedMarshalingResource", msg)
continue
}
// add clusternet-agent last-applied-config annotation
annotations[known.LastAppliedConfigAnnotation] = string(trimedObject)
resource.SetAnnotations(annotations)
wg.Add(1)
go func(resource *unstructured.Unstructured) {
defer wg.Done()
Expand Down Expand Up @@ -543,8 +560,7 @@ func ApplyResourceWithRetry(ctx context.Context, dynamicClient dynamic.Interface
}
if ResourceNeedResync(resource, curObj, ignoreAdd) {
// try to update resource
_, lastError = dynamicClient.Resource(restMapping.Resource).Namespace(resource.GetNamespace()).
Update(context.TODO(), resource, metav1.UpdateOptions{})
lastError = doApplyPatch(ctx, dynamicClient, restMapper, resource, curObj)
if lastError == nil {
return true, nil
}
Expand All @@ -563,8 +579,8 @@ func ApplyResourceWithRetry(ctx context.Context, dynamicClient dynamic.Interface
setNestedField(resourceCopy, getNestedString(curObj.Object, fields...), fields...)
}
// update with immutable values applied
_, lastError = dynamicClient.Resource(restMapping.Resource).Namespace(resourceCopy.GetNamespace()).
Update(context.TODO(), resourceCopy, metav1.UpdateOptions{})
// try to update resource
lastError = doApplyPatch(ctx, dynamicClient, restMapper, resource, curObj)
if lastError == nil {
return true, nil
}
Expand All @@ -578,6 +594,95 @@ func ApplyResourceWithRetry(ctx context.Context, dynamicClient dynamic.Interface
return lastError
}

func getOriginalConfiguration(current *unstructured.Unstructured) []byte {
annots := current.GetAnnotations()
if annots == nil {
return nil
}

original, ok := annots[known.LastAppliedConfigAnnotation]
if !ok {
return nil
}
return []byte(original)
}

func doApplyPatch(
ctx context.Context,
dynamicClient dynamic.Interface, restMapper meta.RESTMapper,
target, current *unstructured.Unstructured) error {
curData, err := json.Marshal(current)
if err != nil {
return errors.Wrap(err, "serializing current configuration")
}
newData, err := json.Marshal(target.Object)
if err != nil {
return errors.Wrap(err, "serializing target configuration")
}
originalData := getOriginalConfiguration(current)
restMapping, err := restMapper.RESTMapping(target.GroupVersionKind().GroupKind(), target.GroupVersionKind().Version)
if err != nil {
return errors.Wrap(err, "please check whether the advertised apiserver of current child cluster is accessible")
}

// Refer to the implementation of kubectl patcher
var patchType types.PatchType
var patch []byte
var lookupPatchMeta strategicpatch.LookupPatchMeta

versionedObject, err := kubectlscheme.Scheme.New(restMapping.GroupVersionKind)
switch {
case pkgruntime.IsNotRegisteredError(err):
// fall back to generic JSON merge patch
patchType = types.MergePatchType
preconditions := []mergepatch.PreconditionFunc{mergepatch.RequireKeyUnchanged("apiVersion"),
mergepatch.RequireKeyUnchanged("kind"), mergepatch.RequireMetadataKeyUnchanged("name")}
patch, err = jsonmergepatch.CreateThreeWayJSONMergePatch(originalData, newData, curData, preconditions...)
if err != nil {
if mergepatch.IsPreconditionFailed(err) {
return fmt.Errorf("%s", "At least one of apiVersion, kind and name was changed")
}
klog.Errorf("create jsonmergepath failed for gvk %s resource %s/%s, err %s",
restMapping.GroupVersionKind, target.GetNamespace(), target.GetName(), err.Error())
return err
}
case err != nil:
klog.Errorf("getting instance of versioned object for %v, err %s", restMapping.GroupVersionKind, err.Error())
return fmt.Errorf("getting instance of versioned object for %v, err %s",
restMapping.GroupVersionKind, err.Error())
case err == nil:
// Compute a three way strategic merge patch to send to server.
// TODO: Try to use openapi first if the openapi spec is available
patchType = types.StrategicMergePatchType
lookupPatchMeta, err = strategicpatch.NewPatchMetaFromStruct(versionedObject)
if err != nil {
klog.Errorf("get patch meta failed for gvk %s resource %s/%s, err %s",
restMapping.GroupVersionKind, target.GetNamespace(), target.GetName(), err.Error())
return err
}
patch, err = strategicpatch.CreateThreeWayMergePatch(originalData, newData, curData, lookupPatchMeta, true)
if err != nil {
klog.Errorf("create three way merge patch failed for gvk %s resource %s/%s, err %s",
restMapping.GroupVersionKind, target.GetNamespace(), target.GetName(), err.Error())
return err
}
}

if string(patch) == "{}" {
return nil
}
// try to update resource
_, err = dynamicClient.Resource(restMapping.Resource).Namespace(target.GetNamespace()).
Patch(ctx, target.GetName(), patchType, patch, metav1.PatchOptions{})
if err != nil {
klog.Errorf("call patch resource %s/%s failed, patch type %s, err %s",
target.GetNamespace(), target.GetName(), patchType, err.Error())
// return original error
return err
}
return nil
}

func DeleteResourceWithRetry(ctx context.Context, dynamicClient dynamic.Interface, restMapper meta.RESTMapper, resource *unstructured.Unstructured) error {
deletePropagationBackground := metav1.DeletePropagationBackground

Expand Down

0 comments on commit b18c297

Please sign in to comment.