Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Application dependencies #15280

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions assets/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -5858,6 +5858,37 @@
}
}
},
"v1alpha1ApplicationDependency": {
"type": "object",
"title": "ApplicationDependency specifies dependency settings for an Application",
"properties": {
"blockOnEmpty": {
"type": "boolean",
"title": "BlockOnEmpty specifies whether to block sync when the list of applications determined by the selector is empty"
},
"refreshDependencies": {
"type": "boolean",
"title": "RefreshDependencies specifies whether all dependencies should be refreshed before starting a sync"
},
"selectors": {
"type": "array",
"title": "Selectors specifies conditions for matching application's dependencies",
"items": {
"$ref": "#/definitions/v1alpha1ApplicationSelector"
}
},
"syncDelay": {
"type": "integer",
"format": "int64",
"title": "SyncDelay specifies the duration in seconds to wait before starting to sync when dependencies are defined"
},
"timeout": {
"type": "integer",
"format": "int64",
"title": "Timeout specifies the maximum duration in seconds to wait on dependencies before the sync fails"
}
}
},
"v1alpha1ApplicationDestination": {
"type": "object",
"title": "ApplicationDestination holds information about the application's destination",
Expand Down Expand Up @@ -5925,6 +5956,22 @@
}
}
},
"v1alpha1ApplicationSelector": {
"type": "object",
"title": "ApplicationSelector specifies which applications this Application depends on",
jannfis marked this conversation as resolved.
Show resolved Hide resolved
"properties": {
"labelSelector": {
"$ref": "#/definitions/v1LabelSelector"
},
"namePattern": {
"type": "array",
"title": "NamePattern selects applications by matching their names",
"items": {
"type": "string"
}
}
}
},
"v1alpha1ApplicationSet": {
"type": "object",
"title": "ApplicationSet is a set of Application resources\n+genclient\n+genclient:noStatus\n+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object\n+kubebuilder:resource:path=applicationsets,shortName=appset;appsets\n+kubebuilder:subresource:status",
Expand Down Expand Up @@ -6531,6 +6578,9 @@
"description": "ApplicationSpec represents desired application state. Contains link to repository with application definition and additional parameters link definition revision.",
"type": "object",
"properties": {
"dependsOn": {
"$ref": "#/definitions/v1alpha1ApplicationDependency"
},
"destination": {
"$ref": "#/definitions/v1alpha1ApplicationDestination"
},
Expand Down Expand Up @@ -7538,6 +7588,10 @@
"type": "object",
"title": "OperationState contains information about state of a running operation",
"properties": {
"blockedOnEmpty": {
"type": "boolean",
"title": "BlockedOnEmpty is true when the application is waiting for any dependency to be created"
},
"finishedAt": {
"$ref": "#/definitions/v1Time"
},
Expand All @@ -7562,6 +7616,13 @@
},
"syncResult": {
"$ref": "#/definitions/v1alpha1SyncOperationResult"
},
"waitingFor": {
"type": "array",
"title": "WaitingFor specifies a list of applications that this operation is waiting for",
jannfis marked this conversation as resolved.
Show resolved Hide resolved
"items": {
"$ref": "#/definitions/v1alpha1SyncDependency"
}
}
}
},
Expand Down Expand Up @@ -8867,6 +8928,20 @@
}
}
},
"v1alpha1SyncDependency": {
"type": "object",
"properties": {
"applicationName": {
"type": "string"
},
"applicationNamespace": {
"type": "string"
},
"refreshedAt": {
"$ref": "#/definitions/v1Time"
}
}
},
"v1alpha1SyncOperation": {
"description": "SyncOperation contains details about a sync operation.",
"type": "object",
Expand Down
2 changes: 1 addition & 1 deletion cmd/argocd/commands/admin/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func reconcileApplications(
)

appStateManager := controller.NewAppStateManager(
argoDB, appClientset, repoServerClient, namespace, kubeutil.NewKubectl(), settingsMgr, stateCache, projInformer, server, cache, time.Second, argo.NewResourceTracking(), false, 0, serverSideDiff)
argoDB, appClientset, repoServerClient, namespace, kubeutil.NewKubectl(), settingsMgr, stateCache, projInformer, appLister, server, cache, time.Second, argo.NewResourceTracking(), false, 0, serverSideDiff)

appsList, err := appClientset.ArgoprojV1alpha1().Applications(namespace).List(ctx, v1.ListOptions{LabelSelector: selector})
if err != nil {
Expand Down
145 changes: 139 additions & 6 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ const (
defaultDeploymentInformerResyncDuration = 10 * time.Second
// orphanedIndex contains application which monitor orphaned resources by namespace
orphanedIndex = "orphaned"
// refreshAfterForDependencies defines the interval for refresh while waiting for dependency application
// TODO: Value is a first shot. Should probably be configurable.
refreshAfterForDependencies time.Duration = 2 * time.Second
)

type CompareWith int
Expand Down Expand Up @@ -277,7 +280,7 @@ func NewApplicationController(
}
}
stateCache := statecache.NewLiveStateCache(db, appInformer, ctrl.settingsMgr, kubectl, ctrl.metricsServer, ctrl.handleObjectUpdated, clusterSharding, argo.NewResourceTracking())
appStateManager := NewAppStateManager(db, applicationClientset, repoClientset, namespace, kubectl, ctrl.settingsMgr, stateCache, projInformer, ctrl.metricsServer, argoCache, ctrl.statusRefreshTimeout, argo.NewResourceTracking(), persistResourceHealth, repoErrorGracePeriod, serverSideDiff)
appStateManager := NewAppStateManager(db, applicationClientset, repoClientset, namespace, kubectl, ctrl.settingsMgr, stateCache, projInformer, appLister, ctrl.metricsServer, argoCache, ctrl.statusRefreshTimeout, argo.NewResourceTracking(), persistResourceHealth, repoErrorGracePeriod, serverSideDiff)
ctrl.appInformer = appInformer
ctrl.appLister = appLister
ctrl.projInformer = projInformer
Expand Down Expand Up @@ -1244,6 +1247,49 @@ func (ctrl *ApplicationController) setAppCondition(app *appv1.Application, condi
}
}

// refreshDependencies refreshes the dependencies for a given application
// and persists the results in the application's spec.
func (ctrl *ApplicationController) refreshDependencies(app *appv1.Application, state *appv1.OperationState) {
logCtx := log.WithField("application", app.QualifiedName())
for i, dep := range state.WaitingFor {

a, err := ctrl.appLister.Applications(dep.ApplicationNamespace).Get(dep.ApplicationName)
if err != nil {
logCtx.Errorf("Could not retrieve dependency %s: %v", dep.ApplicationName, err)
continue
}

// For now, we need to ensure that the dependencies and the dependant
// belong to the same AppProject. This will likely change in a future
// iteration.
if app.Spec.GetProject() != a.Spec.GetProject() {
logCtx.Infof("Not refreshing dependency app %s because AppProject does not match: is %s, must be %s", a.QualifiedName(), a.Spec.GetProject(), app.Spec.GetProject())
continue
}

// Either an operation has been requested or is already in progress.
// We do not request a new one.
if a.Operation != nil || (a.Status.OperationState != nil && a.Status.OperationState.Phase == synccommon.OperationRunning) {
logCtx.Debugf("Dependency %s: Operation already in progress, not going to trigger another one", dep.QualifiedName())
continue
}

// If the dependency has never refreshed, refresh it now
if dep.RefreshedAt.IsZero() {
logCtx.Infof("Requesting refresh for app %s to check dependencies", dep.QualifiedName())
ctrl.requestAppRefresh(dep.QualifiedName(), CompareWithLatest.Pointer(), nil)
dep.RefreshedAt = &metav1.Time{Time: time.Now()}
} else {
logCtx.Debugf("Already requested a refresh for dependency %s", dep.QualifiedName())
}

state.WaitingFor[i] = dep
}

// Persist the dependencies' states in the resource
ctrl.setOperationState(app, state)
}

func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Application) {
logCtx := log.WithField("application", app.QualifiedName())
var state *appv1.OperationState
Expand Down Expand Up @@ -1300,6 +1346,9 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
state.Message = err.Error()
} else {
ctrl.appStateManager.SyncAppState(app, state)
if state != nil && len(state.WaitingFor) > 0 {
ctrl.refreshDependencies(app, state)
}
}

// Check whether application is allowed to use project
Expand Down Expand Up @@ -1327,6 +1376,24 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
// SyncAppState will operate in a Terminating phase, allowing the worker to perform
// cleanup (e.g. delete jobs, workflows, etc...)
}

// If the operation is in progress, and we are blocked on waiting
// for dependencies, immediately refresh the application so that
// we enter a refresh cycle.
if freshApp.Status.OperationState != nil && freshApp.Status.OperationState.Phase == synccommon.OperationRunning {
refresh := false
if freshApp.Status.OperationState.BlockedOnEmpty {
logCtx.Infof("Requesting app refresh for blocked app")
refresh = true
} else if len(freshApp.Status.OperationState.WaitingFor) > 0 {
logCtx.Infof("Requesting app refresh because waiting on dependencies")
refresh = true
}
if refresh {
ctrl.appRefreshQueue.AddAfter(fmt.Sprintf("%s/%d", freshApp.QualifiedName(), ComparisonWithNothing), refreshAfterForDependencies)
ctrl.appOperationQueue.AddAfter(freshApp.QualifiedName(), refreshAfterForDependencies)
}
}
}
} else if state.Phase == synccommon.OperationFailed || state.Phase == synccommon.OperationError {
if !terminating && (state.RetryCount < state.Operation.Retry.Limit || state.Operation.Retry.Limit < 0) {
Expand Down Expand Up @@ -1389,11 +1456,20 @@ func (ctrl *ApplicationController) setOperationState(app *appv1.Application, sta
logCtx.Errorf("error marshaling json: %v", err)
return
}
if app.Status.OperationState != nil && app.Status.OperationState.FinishedAt != nil && state.FinishedAt == nil {
patchJSON, err = jsonpatch.MergeMergePatches(patchJSON, []byte(`{"status": {"operationState": {"finishedAt": null}}}`))
if err != nil {
logCtx.Errorf("error merging operation state patch: %v", err)
return
if app.Status.OperationState != nil {
if app.Status.OperationState.FinishedAt != nil && state.FinishedAt == nil {
patchJSON, err = jsonpatch.MergeMergePatches(patchJSON, []byte(`{"status": {"operationState": {"finishedAt": null}}}`))
if err != nil {
logCtx.Errorf("error merging operation state patch: %v", err)
return
}
}
if len(app.Status.OperationState.WaitingFor) > 0 && len(state.WaitingFor) == 0 {
patchJSON, err = jsonpatch.MergeMergePatches(patchJSON, []byte(`{"status": {"operationState": {"waitingFor": null}}}`))
if err != nil {
logCtx.Errorf("error merging operation state patch: %v", err)
return
}
}
}

Expand Down Expand Up @@ -1660,6 +1736,60 @@ func currentSourceEqualsSyncedSource(app *appv1.Application) bool {
return app.Spec.Source.Equals(&app.Status.Sync.ComparedTo.Source)
}

// shouldRefreshForDependency returns whether an app should be refreshed for a
// change in the state of one of its dependencies.
func (ctrl *ApplicationController) shouldRefreshForDependency(app *appv1.Application) bool {
logCtx := log.WithField("application", app.QualifiedName())

// No need to refresh when we're not waiting for dependencies to sync
if app.Spec.DependsOn == nil || !app.IsWaiting() {
logCtx.Debugf("not waiting for dependencies, skipping refresh")
return false
}

// If we're waiting for any dependency to be created, we need refresh to
// see whether they're created by now.
if app.Status.OperationState.BlockedOnEmpty {
return true
}
jannfis marked this conversation as resolved.
Show resolved Hide resolved

needRefresh := false
numWaiting := len(app.Status.OperationState.WaitingFor)
numRemoved := 0
for _, syncDep := range app.Status.OperationState.WaitingFor {
depApp, err := ctrl.appLister.Applications(syncDep.ApplicationNamespace).Get(syncDep.ApplicationName)
if err != nil {
// The application could have been deleted meanwhile, it's not any
// of the dependencies anymore.
if apierr.IsNotFound(err) {
logCtx.Infof("Dependency application %s has been removed", syncDep.QualifiedName())
numRemoved += 1
continue
} else {
logCtx.Warnf("Error getting sync dependency %s: %v", syncDep.QualifiedName(), err)
return false
}
}

// Project of dependency must match
if app.Spec.GetProject() != depApp.Spec.GetProject() {
continue
}

// If there was a state change for one of the dependencies, we want to
// trigger a refresh.
if depApp.Status.Health.Status == health.HealthStatusHealthy && depApp.Status.Sync.Status == appv1.SyncStatusCodeSynced {
logCtx.Debugf("Sync dependency %s has become healthy and synced", syncDep.QualifiedName())
needRefresh = true
}
}

// We need to refresh when either one of our dependencies changed state to
// healthy, or when all dependencies we were waiting for have been
// removed meanwhile.
return needRefresh || (numWaiting > 0 && numRemoved == numWaiting)
}

// needRefreshAppStatus answers if application status needs to be refreshed.
// Returns true if application never been compared, has changed or comparison result has expired.
// Additionally, it returns whether full refresh was requested or not.
Expand Down Expand Up @@ -1707,6 +1837,9 @@ func (ctrl *ApplicationController) needRefreshAppStatus(app *appv1.Application,
} else if requested, level := ctrl.isRefreshRequested(app.QualifiedName()); requested {
compareWith = level
reason = "controller refresh requested"
} else if ctrl.shouldRefreshForDependency(app) {
compareWith = ComparisonWithNothing
reason = "refreshing for change in dependencies' status"
}
}

Expand Down
Loading
Loading