Skip to content

Commit

Permalink
fix(controller): use update instead of patche (#876)
Browse files Browse the repository at this point in the history
Signed-off-by: cndoit18 <cndoit18@outlook.com>
  • Loading branch information
cndoit18 committed May 23, 2022
1 parent 95ff926 commit ac6d4d1
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 11 deletions.
5 changes: 1 addition & 4 deletions pkg/controller/apply.go
Expand Up @@ -12,8 +12,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

var force = true

func (c *Controller) Apply(ctx context.Context, res *Resource) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "apply")
defer span.Finish()
Expand Down Expand Up @@ -47,8 +45,7 @@ func (c *Controller) Apply(ctx context.Context, res *Resource) error {

l.Info("apply changing", "key", key, "kind", resource.GetObjectKind().GroupVersionKind())

if err := c.Client.Patch(ctx, resource, client.Apply, &client.PatchOptions{
Force: &force,
if err := c.Client.Update(ctx, resource, &client.UpdateOptions{
FieldManager: application.GetName(ctx),
}); err != nil {
l.Error(err, "Cannot deploy resource", "key", key, "kind", resource.GetObjectKind().GroupVersionKind())
Expand Down
11 changes: 7 additions & 4 deletions pkg/controller/common.go
Expand Up @@ -20,6 +20,7 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -38,10 +39,11 @@ type Controller struct {

deletableResources map[schema.GroupVersionKind]struct{}

ConfigStore *configstore.Store
rm ResourceManager
Log logr.Logger
Scheme *runtime.Scheme
ConfigStore *configstore.Store
rm ResourceManager
Log logr.Logger
Scheme *runtime.Scheme
DiscoveryClient *discovery.DiscoveryClient
}

func NewController(ctx context.Context, base controllers.Controller, rm ResourceManager, config *configstore.Store) *Controller {
Expand All @@ -68,6 +70,7 @@ func NewController(ctx context.Context, base controllers.Controller, rm Resource
func (c *Controller) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
c.Client = mgr.GetClient()
c.Scheme = mgr.GetScheme()
c.DiscoveryClient = discovery.NewDiscoveryClientForConfigOrDie(mgr.GetConfig())

return nil
}
Expand Down
90 changes: 87 additions & 3 deletions pkg/controller/resource.go
Expand Up @@ -20,6 +20,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/version"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -71,7 +72,7 @@ func (c *Controller) Changed(ctx context.Context, depManager *checksum.Dependenc
return false, nil
}

func (c *Controller) ProcessFunc(ctx context.Context, resource runtime.Object, dependencies ...graph.Resource) func(context.Context, graph.Resource) error { // nolint:funlen
func (c *Controller) ProcessFunc(ctx context.Context, resource runtime.Object, dependencies ...graph.Resource) func(context.Context, graph.Resource) error { // nolint:funlen,gocognit
depManager := checksum.New(c.Scheme)

depManager.Add(ctx, owner.Get(ctx), true)
Expand Down Expand Up @@ -137,10 +138,93 @@ func (c *Controller) ProcessFunc(ctx context.Context, resource runtime.Object, d
return nil
})

err = c.applyAndCheck(ctx, r)
info, err := c.DiscoveryClient.ServerVersion()
if err != nil {
return errors.Wrap(err, "failed to get server version")
}

if version.MustParseGeneric(info.String()).AtLeast(version.MustParseGeneric("v1.22.0")) {
return errors.Wrapf(
c.applyAndCheck(ctx, r),
"apply %s (%s/%s)", gvk, namespace, name,
)
}

res.mutable.AppendMutation(func(ctx context.Context, resource runtime.Object) error {
newSvc, ok := resource.(*corev1.Service)
if !ok {
return nil
}

oldSvc := &corev1.Service{}
err := c.Client.Get(ctx, client.ObjectKeyFromObject(newSvc), oldSvc)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}

return err
}

// copied from https://github.com/kubernetes/kubernetes/blob/076168b84d0af4ad65cb5664fc1cef40f837e9dc/pkg/registry/core/service/strategy.go#L318
if newSvc.Spec.ClusterIP == "" {
newSvc.Spec.ClusterIP = oldSvc.Spec.ClusterIP
}

if len(newSvc.Spec.ClusterIPs) == 0 {
newSvc.Spec.ClusterIPs = oldSvc.Spec.ClusterIPs
}

if needsNodePort(oldSvc) && needsNodePort(newSvc) {
// Map NodePorts by name. The user may have changed other properties
// of the port, but we won't see that here.
np := map[string]int32{}
for i := range oldSvc.Spec.Ports {
p := &oldSvc.Spec.Ports[i]
np[p.Name] = p.NodePort
}
for i := range newSvc.Spec.Ports {
p := &newSvc.Spec.Ports[i]
if p.NodePort == 0 {
p.NodePort = np[p.Name]
}
}
}

return errors.Wrapf(err, "apply %s (%s/%s)", gvk, namespace, name)
if needsHCNodePort(oldSvc) && needsHCNodePort(newSvc) {
if newSvc.Spec.HealthCheckNodePort == 0 {
newSvc.Spec.HealthCheckNodePort = oldSvc.Spec.HealthCheckNodePort
}
}

return nil
})

return errors.Wrapf(
c.applyAndCheck(ctx, r),
"apply %s (%s/%s)", gvk, namespace, name,
)
}
}

func needsHCNodePort(svc *corev1.Service) bool {
if svc.Spec.Type != corev1.ServiceTypeLoadBalancer {
return false
}

if svc.Spec.ExternalTrafficPolicy != corev1.ServiceExternalTrafficPolicyTypeLocal {
return false
}

return true
}

func needsNodePort(svc *corev1.Service) bool {
if svc.Spec.Type == corev1.ServiceTypeNodePort || svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
return true
}

return false
}

func (c *Controller) AddUnsctructuredToManage(ctx context.Context, resource *unstructured.Unstructured, dependencies ...graph.Resource) (graph.Resource, error) { // nolint:interfacer
Expand Down

0 comments on commit ac6d4d1

Please sign in to comment.