From 794f91291204d76cb67256edc6f4b91e6dcc099f Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Wed, 24 Jul 2019 14:04:21 -0700 Subject: [PATCH] Issue #1940 - Wait for CRD creation during sync process --- controller/sync.go | 96 +++++++++++++++++++++++++++++++--------------- 1 file changed, 66 insertions(+), 30 deletions(-) diff --git a/controller/sync.go b/controller/sync.go index 1fdd5075f5b3c..f94c22c4586fe 100644 --- a/controller/sync.go +++ b/controller/sync.go @@ -7,11 +7,15 @@ import ( "sort" "strings" "sync" + "time" log "github.com/sirupsen/logrus" + "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" @@ -27,22 +31,27 @@ import ( "github.com/argoproj/argo-cd/util/resource" ) +const ( + crdReadinessTimeout = time.Duration(3) * time.Second +) + type syncContext struct { - resourceOverrides map[string]v1alpha1.ResourceOverride - appName string - proj *v1alpha1.AppProject - compareResult *comparisonResult - config *rest.Config - dynamicIf dynamic.Interface - disco discovery.DiscoveryInterface - kubectl kube.Kubectl - namespace string - server string - syncOp *v1alpha1.SyncOperation - syncRes *v1alpha1.SyncOperationResult - syncResources []v1alpha1.SyncOperationResource - opState *v1alpha1.OperationState - log *log.Entry + resourceOverrides map[string]v1alpha1.ResourceOverride + appName string + proj *v1alpha1.AppProject + compareResult *comparisonResult + config *rest.Config + dynamicIf dynamic.Interface + disco discovery.DiscoveryInterface + extensionsclientset *clientset.Clientset + kubectl kube.Kubectl + namespace string + server string + syncOp *v1alpha1.SyncOperation + syncRes *v1alpha1.SyncOperationResult + syncResources []v1alpha1.SyncOperationResource + opState *v1alpha1.OperationState + log *log.Entry // lock to protect concurrent updates of the result list lock sync.Mutex } @@ -137,6 +146,13 @@ func (m *appStateManager) SyncAppState(app *v1alpha1.Application, state *v1alpha return } + extensionsclientset, err := clientset.NewForConfig(restConfig) + if err != nil { + state.Phase = v1alpha1.OperationError + state.Message = fmt.Sprintf("Failed to initialize extensions client: %v", err) + return + } + proj, err := argo.GetAppProject(&app.Spec, listersv1alpha1.NewAppProjectLister(m.projInformer.GetIndexer()), m.namespace) if err != nil { state.Phase = v1alpha1.OperationError @@ -152,21 +168,22 @@ func (m *appStateManager) SyncAppState(app *v1alpha1.Application, state *v1alpha } syncCtx := syncContext{ - resourceOverrides: resourceOverrides, - appName: app.Name, - proj: proj, - compareResult: compareResult, - config: restConfig, - dynamicIf: dynamicIf, - disco: disco, - kubectl: m.kubectl, - namespace: app.Spec.Destination.Namespace, - server: app.Spec.Destination.Server, - syncOp: &syncOp, - syncRes: syncRes, - syncResources: syncResources, - opState: state, - log: log.WithFields(log.Fields{"application": app.Name}), + resourceOverrides: resourceOverrides, + appName: app.Name, + proj: proj, + compareResult: compareResult, + config: restConfig, + dynamicIf: dynamicIf, + disco: disco, + extensionsclientset: extensionsclientset, + kubectl: m.kubectl, + namespace: app.Spec.Destination.Namespace, + server: app.Spec.Destination.Server, + syncOp: &syncOp, + syncRes: syncRes, + syncResources: syncResources, + opState: state, + log: log.WithFields(log.Fields{"application": app.Name}), } if state.Phase == v1alpha1.OperationTerminating { @@ -502,6 +519,22 @@ func (sc *syncContext) setOperationPhase(phase v1alpha1.OperationPhase, message sc.opState.Message = message } +// ensureCRDReady waits until specified CRD is ready (established condition is true). Method is best effort - it does not fail even if CRD is not ready without timeout. +func (sc *syncContext) ensureCRDReady(name string) { + _ = wait.PollImmediate(time.Duration(100)*time.Millisecond, crdReadinessTimeout, func() (bool, error) { + crd, err := sc.extensionsclientset.ApiextensionsV1beta1().CustomResourceDefinitions().Get(name, metav1.GetOptions{}) + if err != nil { + return false, err + } + for _, condition := range crd.Status.Conditions { + if condition.Type == v1beta1.Established { + return condition.Status == v1beta1.ConditionTrue, nil + } + } + return false, nil + }) +} + // applyObject performs a `kubectl apply` of a single resource func (sc *syncContext) applyObject(targetObj *unstructured.Unstructured, dryRun bool, force bool) (v1alpha1.ResultCode, string) { validate := !resource.HasAnnotationOption(targetObj, common.AnnotationSyncOptions, "Validate=false") @@ -509,6 +542,9 @@ func (sc *syncContext) applyObject(targetObj *unstructured.Unstructured, dryRun if err != nil { return v1alpha1.ResultCodeSyncFailed, err.Error() } + if kube.IsCRD(targetObj) && !dryRun { + sc.ensureCRDReady(targetObj.GetName()) + } return v1alpha1.ResultCodeSynced, message }