Skip to content

Commit

Permalink
Support for PreSync, Sync, PostSync resource hooks (#350)
Browse files Browse the repository at this point in the history
* Rewrite controller sync logic to support workflow-based sync

* Redesign hook implementation to support generic resources as hooks
  • Loading branch information
jessesuen committed Jul 7, 2018
1 parent bf2fe9d commit d633f6b
Show file tree
Hide file tree
Showing 20 changed files with 2,585 additions and 792 deletions.
26 changes: 24 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ required = [
name = "github.com/grpc-ecosystem/grpc-gateway"
version = "v1.3.1"

[[constraint]]
# override argo outdated dependency
[[override]]
branch = "release-1.10"
name = "k8s.io/api"

[[override]]
branch = "release-1.10"
name = "k8s.io/apimachinery"

[[constraint]]
name = "k8s.io/apiextensions-apiserver"
branch = "release-1.10"
Expand All @@ -28,7 +33,7 @@ required = [
branch = "release-1.10"
name = "k8s.io/code-generator"

[[constraint]]
[[override]]
branch = "release-7.0"
name = "k8s.io/client-go"

Expand Down
32 changes: 30 additions & 2 deletions cmd/argocd/commands/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ func NewApplicationGetCommand(clientOpts *argocdclient.ClientOptions) *cobra.Com
duration = time.Second * time.Duration(time.Now().UTC().Unix()-opState.StartedAt.Unix())
}
fmt.Printf(format, " Duration:", duration)
fmt.Printf(format, " Phase:", opState.Phase)
if opState.Message != "" {
fmt.Printf(format, " Message:", opState.Message)
}
printHooks(opState)
}
if showParams {
printParams(app)
Expand Down Expand Up @@ -546,7 +546,7 @@ func NewApplicationListCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co
var fmtStr string
headers := []interface{}{"NAME", "CLUSTER", "NAMESPACE", "PROJECT", "STATUS", "HEALTH"}
if output == "wide" {
fmtStr = "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n"
fmtStr = "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n"
headers = append(headers, "ENV", "REPO", "PATH", "TARGET")
} else {
fmtStr = "%s\t%s\t%s\t%s\t%s\t%s\n"
Expand Down Expand Up @@ -757,6 +757,8 @@ func NewApplicationSyncCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co
prune bool
dryRun bool
timeout uint
strategy string
force bool
)
var command = &cobra.Command{
Use: "sync APPNAME",
Expand All @@ -775,6 +777,16 @@ func NewApplicationSyncCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co
Revision: revision,
Prune: prune,
}
switch strategy {
case "", "apply":
syncReq.Strategy = &argoappv1.SyncStrategy{Apply: &argoappv1.SyncStrategyApply{}}
syncReq.Strategy.Apply.Force = force
case "hook":
syncReq.Strategy = &argoappv1.SyncStrategy{Hook: &argoappv1.SyncStrategyHook{}}
syncReq.Strategy.Hook.Force = force
default:
log.Fatalf("Unknown sync strategy: '%s'", strategy)
}
_, err := appIf.Sync(context.Background(), &syncReq)
errors.CheckError(err)
status, err := waitUntilOperationCompleted(appIf, appName, timeout)
Expand All @@ -790,6 +802,8 @@ func NewApplicationSyncCommand(clientOpts *argocdclient.ClientOptions) *cobra.Co
command.Flags().BoolVar(&prune, "prune", false, "Allow deleting unexpected resources")
command.Flags().StringVar(&revision, "revision", "", "Sync to a specific revision. Preserves parameter overrides")
command.Flags().UintVar(&timeout, "timeout", defaultCheckTimeoutSeconds, "Time out after this many seconds")
command.Flags().StringVar(&strategy, "strategy", "", "Sync strategy (one of: apply|hook)")
command.Flags().BoolVar(&force, "force", false, "Use a force apply")
return command
}

Expand Down Expand Up @@ -959,6 +973,8 @@ func printOperationResult(appName string, opState *argoappv1.OperationState) err
fmt.Printf(printOpFmtStr, "Message:", opState.Message)
}
if syncRes != nil {
printHooks(opState)

w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Printf("\n")
fmt.Fprintf(w, "KIND\tNAME\tMESSAGE\n")
Expand All @@ -977,6 +993,18 @@ func printOperationResult(appName string, opState *argoappv1.OperationState) err
return nil
}

func printHooks(opState *argoappv1.OperationState) {
if len(opState.HookResources) > 0 {
fmt.Printf("\n")
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintf(w, "HOOK\tKIND\tNAME\tSTATUS\tMESSAGE\n")
for _, hookStatus := range opState.HookResources {
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", hookStatus.Type, hookStatus.Kind, hookStatus.Name, hookStatus.Status, hookStatus.Message)
}
_ = w.Flush()
}
}

// NewApplicationManifestsCommand returns a new instance of an `argocd app manifests` command
func NewApplicationManifestsCommand(clientOpts *argocdclient.ClientOptions) *cobra.Command {
var (
Expand Down
3 changes: 3 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ var (
// AnnotationConnectionModifiedAt contains timestamp when connection state had been modified
AnnotationConnectionModifiedAt = MetadataPrefix + "/connection-modified-at"

// AnnotationHook contains the hook type of a resource
AnnotationHook = MetadataPrefix + "/hook"

// LabelKeyApplicationControllerInstanceID is the label which allows to separate application among multiple running application controllers.
LabelKeyApplicationControllerInstanceID = application.ApplicationFullName + "/controller-instanceid"

Expand Down
78 changes: 33 additions & 45 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"runtime/debug"
"sync"
"time"

"github.com/argoproj/argo-cd/common"
appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
appinformers "github.com/argoproj/argo-cd/pkg/client/informers/externalversions"
"github.com/argoproj/argo-cd/util/db"
"github.com/argoproj/argo-cd/util/kube"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -28,6 +23,13 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo-cd/common"
appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned"
appinformers "github.com/argoproj/argo-cd/pkg/client/informers/externalversions"
"github.com/argoproj/argo-cd/util/db"
"github.com/argoproj/argo-cd/util/kube"
)

const (
Expand Down Expand Up @@ -310,22 +312,21 @@ func (ctrl *ApplicationController) setAppCondition(app *appv1.Application, condi
}

func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Application) {
state := appv1.OperationState{Phase: appv1.OperationRunning, Operation: *app.Operation, StartedAt: metav1.Now()}
var state *appv1.OperationState
// Recover from any unexpected panics and automatically set the status to be failed
defer func() {
if r := recover(); r != nil {
log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
// TODO: consider adding Error OperationStatus in addition to Failed
state.Phase = appv1.OperationError
if rerr, ok := r.(error); ok {
state.Message = rerr.Error()
} else {
state.Message = fmt.Sprintf("%v", r)
}
ctrl.setOperationState(app.Name, state, app.Operation)
ctrl.setOperationState(app, state, app.Operation)
}
}()
if app.Status.OperationState != nil && !app.Status.OperationState.Phase.Completed() {
if isOperationRunning(app) {
// If we get here, we are about process an operation but we notice it is already Running.
// We need to detect if the controller crashed before completing the operation, or if the
// the app object we pulled off the informer is simply stale and doesn't reflect the fact
Expand All @@ -336,45 +337,23 @@ func (ctrl *ApplicationController) processRequestedAppOperation(app *appv1.Appli
log.Errorf("Failed to retrieve latest application state: %v", err)
return
}
if freshApp.Status.OperationState == nil || freshApp.Status.OperationState.Phase.Completed() {
if !isOperationRunning(freshApp) {
log.Infof("Skipping operation on stale application state (%s)", app.ObjectMeta.Name)
return
}
log.Warnf("Found interrupted application operation %s %v", app.ObjectMeta.Name, app.Status.OperationState)
app = freshApp
state = app.Status.OperationState.DeepCopy()
log.Infof("Resuming in-progress operation %s %v", app.ObjectMeta.Name, state)
} else {
ctrl.setOperationState(app.Name, state, app.Operation)
state = &appv1.OperationState{Phase: appv1.OperationRunning, Operation: *app.Operation, StartedAt: metav1.Now()}
ctrl.setOperationState(app, state, app.Operation)
log.Infof("Initialized new operation %s %v", app.ObjectMeta.Name, state)
}

if app.Operation.Sync != nil {
opRes := ctrl.appStateManager.SyncAppState(app, app.Operation.Sync.Revision, nil, app.Operation.Sync.DryRun, app.Operation.Sync.Prune)
state.Phase = opRes.Phase
state.Message = opRes.Message
state.SyncResult = opRes.SyncResult
} else if app.Operation.Rollback != nil {
var deploymentInfo *appv1.DeploymentInfo
for _, info := range app.Status.History {
if info.ID == app.Operation.Rollback.ID {
deploymentInfo = &info
break
}
}
if deploymentInfo == nil {
state.Phase = appv1.OperationFailed
state.Message = fmt.Sprintf("application %s does not have deployment with id %v", app.Name, app.Operation.Rollback.ID)
} else {
opRes := ctrl.appStateManager.SyncAppState(app, deploymentInfo.Revision, &deploymentInfo.ComponentParameterOverrides, app.Operation.Rollback.DryRun, app.Operation.Rollback.Prune)
state.Phase = opRes.Phase
state.Message = opRes.Message
state.RollbackResult = opRes.SyncResult
}
} else {
state.Phase = appv1.OperationFailed
state.Message = "Invalid operation request"
}
ctrl.setOperationState(app.Name, state, app.Operation)
ctrl.appStateManager.SyncAppState(app, state)
ctrl.setOperationState(app, state, app.Operation)
}

func (ctrl *ApplicationController) setOperationState(appName string, state appv1.OperationState, operation *appv1.Operation) {
func (ctrl *ApplicationController) setOperationState(app *appv1.Application, state *appv1.OperationState, operation *appv1.Operation) {
retryUntilSucceed(func() error {
var inProgressOpValue *appv1.Operation
if state.Phase == "" {
Expand All @@ -391,6 +370,11 @@ func (ctrl *ApplicationController) setOperationState(appName string, state appv1
state.FinishedAt = &nowTime
}

if reflect.DeepEqual(app.Operation, inProgressOpValue) && reflect.DeepEqual(app.Status.OperationState, state) {
log.Infof("no updates necessary to '%s' skipping patch", app.Name)
return nil
}

patch, err := json.Marshal(map[string]interface{}{
"status": map[string]interface{}{
"operationState": state,
Expand All @@ -401,11 +385,11 @@ func (ctrl *ApplicationController) setOperationState(appName string, state appv1
return err
}
appClient := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ctrl.namespace)
_, err = appClient.Patch(appName, types.MergePatchType, patch)
_, err = appClient.Patch(app.Name, types.MergePatchType, patch)
if err != nil {
return err
}
log.Infof("updated '%s' operation (phase: %s)", appName, state.Phase)
log.Infof("updated '%s' operation (phase: %s)", app.Name, state.Phase)
return nil
}, "Update application operation state", context.Background(), updateOperationStateTimeout)
}
Expand Down Expand Up @@ -463,7 +447,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo
}

func (ctrl *ApplicationController) tryRefreshAppStatus(app *appv1.Application) (*appv1.ComparisonResult, *[]appv1.ComponentParameter, *appv1.HealthStatus, error) {
comparisonResult, manifestInfo, err := ctrl.appStateManager.CompareAppState(app)
comparisonResult, manifestInfo, err := ctrl.appStateManager.CompareAppState(app, "", nil)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -558,3 +542,7 @@ func newApplicationInformer(
)
return informer
}

func isOperationRunning(app *appv1.Application) bool {
return app.Status.OperationState != nil && !app.Status.OperationState.Phase.Completed()
}

0 comments on commit d633f6b

Please sign in to comment.