Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #1940 - Wait for CRD creation during sync process #1999

Merged
merged 1 commit into from
Jul 24, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 66 additions & 30 deletions controller/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"sort"
"strings"
"sync"
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
Expand All @@ -27,22 +31,27 @@ import (
"github.com/argoproj/argo-cd/util/resource"
)

const (
crdReadinessTimeout = time.Duration(3) * time.Second
)

type syncContext struct {
resourceOverrides map[string]v1alpha1.ResourceOverride
appName string
proj *v1alpha1.AppProject
compareResult *comparisonResult
config *rest.Config
dynamicIf dynamic.Interface
disco discovery.DiscoveryInterface
kubectl kube.Kubectl
namespace string
server string
syncOp *v1alpha1.SyncOperation
syncRes *v1alpha1.SyncOperationResult
syncResources []v1alpha1.SyncOperationResource
opState *v1alpha1.OperationState
log *log.Entry
resourceOverrides map[string]v1alpha1.ResourceOverride
appName string
proj *v1alpha1.AppProject
compareResult *comparisonResult
config *rest.Config
dynamicIf dynamic.Interface
disco discovery.DiscoveryInterface
extensionsclientset *clientset.Clientset
kubectl kube.Kubectl
namespace string
server string
syncOp *v1alpha1.SyncOperation
syncRes *v1alpha1.SyncOperationResult
syncResources []v1alpha1.SyncOperationResource
opState *v1alpha1.OperationState
log *log.Entry
// lock to protect concurrent updates of the result list
lock sync.Mutex
}
Expand Down Expand Up @@ -137,6 +146,13 @@ func (m *appStateManager) SyncAppState(app *v1alpha1.Application, state *v1alpha
return
}

extensionsclientset, err := clientset.NewForConfig(restConfig)
if err != nil {
state.Phase = v1alpha1.OperationError
state.Message = fmt.Sprintf("Failed to initialize extensions client: %v", err)
return
}

proj, err := argo.GetAppProject(&app.Spec, listersv1alpha1.NewAppProjectLister(m.projInformer.GetIndexer()), m.namespace)
if err != nil {
state.Phase = v1alpha1.OperationError
Expand All @@ -152,21 +168,22 @@ func (m *appStateManager) SyncAppState(app *v1alpha1.Application, state *v1alpha
}

syncCtx := syncContext{
resourceOverrides: resourceOverrides,
appName: app.Name,
proj: proj,
compareResult: compareResult,
config: restConfig,
dynamicIf: dynamicIf,
disco: disco,
kubectl: m.kubectl,
namespace: app.Spec.Destination.Namespace,
server: app.Spec.Destination.Server,
syncOp: &syncOp,
syncRes: syncRes,
syncResources: syncResources,
opState: state,
log: log.WithFields(log.Fields{"application": app.Name}),
resourceOverrides: resourceOverrides,
appName: app.Name,
proj: proj,
compareResult: compareResult,
config: restConfig,
dynamicIf: dynamicIf,
disco: disco,
extensionsclientset: extensionsclientset,
kubectl: m.kubectl,
namespace: app.Spec.Destination.Namespace,
server: app.Spec.Destination.Server,
syncOp: &syncOp,
syncRes: syncRes,
syncResources: syncResources,
opState: state,
log: log.WithFields(log.Fields{"application": app.Name}),
}

if state.Phase == v1alpha1.OperationTerminating {
Expand Down Expand Up @@ -502,13 +519,32 @@ func (sc *syncContext) setOperationPhase(phase v1alpha1.OperationPhase, message
sc.opState.Message = message
}

// ensureCRDReady waits until specified CRD is ready (established condition is true). Method is best effort - it does not fail even if CRD is not ready without timeout.
func (sc *syncContext) ensureCRDReady(name string) {
_ = wait.PollImmediate(time.Duration(100)*time.Millisecond, crdReadinessTimeout, func() (bool, error) {
crd, err := sc.extensionsclientset.ApiextensionsV1beta1().CustomResourceDefinitions().Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
for _, condition := range crd.Status.Conditions {
if condition.Type == v1beta1.Established {
return condition.Status == v1beta1.ConditionTrue, nil
}
}
return false, nil
})
}

// applyObject performs a `kubectl apply` of a single resource
func (sc *syncContext) applyObject(targetObj *unstructured.Unstructured, dryRun bool, force bool) (v1alpha1.ResultCode, string) {
validate := !resource.HasAnnotationOption(targetObj, common.AnnotationSyncOptions, "Validate=false")
message, err := sc.kubectl.ApplyResource(sc.config, targetObj, targetObj.GetNamespace(), dryRun, force, validate)
if err != nil {
return v1alpha1.ResultCodeSyncFailed, err.Error()
}
if kube.IsCRD(targetObj) && !dryRun {
sc.ensureCRDReady(targetObj.GetName())
}
return v1alpha1.ResultCodeSynced, message
}

Expand Down