Skip to content

Commit

Permalink
Feat: watch event listener in controller for a faster reconcile (kube…
Browse files Browse the repository at this point in the history
…vela#131)

Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
  • Loading branch information
FogDong committed Mar 2, 2023
1 parent 584bfb3 commit d3cce43
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 19 deletions.
14 changes: 8 additions & 6 deletions charts/vela-workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ helm install --create-namespace -n vela-system workflow kubevela/vela-workflow -

### KubeVela workflow parameters

| Name | Description | Value |
| -------------------------------------- | ------------------------------------------------------ | ------- |
| `workflow.enableSuspendOnFailure` | Enable suspend on workflow failure | `false` |
| `workflow.backoff.maxTime.waitState` | The max backoff time of workflow in a wait condition | `60` |
| `workflow.backoff.maxTime.failedState` | The max backoff time of workflow in a failed condition | `300` |
| `workflow.step.errorRetryTimes` | The max retry times of a failed workflow step | `10` |
| Name | Description | Value |
| -------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `workflow.enableSuspendOnFailure` | Enable the capability of suspend an failed workflow automatically | `false` |
| `workflow.enablePatchStatusAtOnce` | Enable the capability of patch status at once | `false` |
| `workflow.enableWatchEventListener` | Enable the capability of watch event listener for a faster reconcile, note that you need to install [kube-trigger](https://github.com/kubevela/kube-trigger) first to use this feature | `false` |
| `workflow.backoff.maxTime.waitState` | The max backoff time of workflow in a wait condition | `60` |
| `workflow.backoff.maxTime.failedState` | The max backoff time of workflow in a failed condition | `300` |
| `workflow.step.errorRetryTimes` | The max retry times of a failed workflow step | `10` |


### KubeVela workflow backup parameters
Expand Down
2 changes: 2 additions & 0 deletions charts/vela-workflow/templates/workflow-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ spec:
- "--max-workflow-wait-backoff-time={{ .Values.workflow.backoff.maxTime.waitState }}"
- "--max-workflow-failed-backoff-time={{ .Values.workflow.backoff.maxTime.failedState }}"
- "--max-workflow-step-error-retry-times={{ .Values.workflow.step.errorRetryTimes }}"
- "--feature-gates=EnableWatchEventListener={{- .Values.enableWatchEventListener | toString -}}"
- "--feature-gates=EnablePatchStatusAtOnce={{- .Values.enablePatchStatusAtOnce | toString -}}"
- "--feature-gates=EnableSuspendOnFailure={{- .Values.workflow.enableSuspendOnFailure | toString -}}"
- "--feature-gates=EnableBackupWorkflowRecord={{- .Values.backup.enabled | toString -}}"
{{ if .Values.backup.enable }}
Expand Down
6 changes: 5 additions & 1 deletion charts/vela-workflow/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ ignoreWorkflowWithoutControllerRequirement: false

## @section KubeVela workflow parameters

## @param workflow.enableSuspendOnFailure Enable suspend on workflow failure
## @param workflow.enableSuspendOnFailure Enable the capability of suspend an failed workflow automatically
## @param workflow.enablePatchStatusAtOnce Enable the capability of patch status at once
## @param workflow.enableWatchEventListener Enable the capability of watch event listener for a faster reconcile, note that you need to install [kube-trigger](https://github.com/kubevela/kube-trigger) first to use this feature
## @param workflow.backoff.maxTime.waitState The max backoff time of workflow in a wait condition
## @param workflow.backoff.maxTime.failedState The max backoff time of workflow in a failed condition
## @param workflow.step.errorRetryTimes The max retry times of a failed workflow step
workflow:
enableSuspendOnFailure: false
enablePatchStatusAtOnce: false
enableWatchEventListener: false
backoff:
maxTime:
waitState: 60
Expand Down
5 changes: 5 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"

triggerv1alpha1 "github.com/kubevela/kube-trigger/api/v1alpha1"
velaclient "github.com/kubevela/pkg/controller/client"
"github.com/kubevela/pkg/multicluster"

Expand Down Expand Up @@ -181,6 +182,10 @@ func main() {
)
restConfig.UserAgent = userAgent

if feature.DefaultMutableFeatureGate.Enabled(features.EnableWatchEventListener) {
utilruntime.Must(triggerv1alpha1.AddToScheme(scheme))
}

leaderElectionID := fmt.Sprintf("workflow-%s", strings.ToLower(strings.ReplaceAll(version.VelaVersion, ".", "-")))
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
Expand Down
30 changes: 27 additions & 3 deletions controllers/workflowrun_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,26 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/util/feature"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
ctrlEvent "sigs.k8s.io/controller-runtime/pkg/event"
ctrlHandler "sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

triggerv1alpha1 "github.com/kubevela/kube-trigger/api/v1alpha1"
monitorContext "github.com/kubevela/pkg/monitor/context"

"github.com/kubevela/workflow/api/condition"
"github.com/kubevela/workflow/api/v1alpha1"
wfContext "github.com/kubevela/workflow/pkg/context"
"github.com/kubevela/workflow/pkg/cue/packages"
"github.com/kubevela/workflow/pkg/executor"
"github.com/kubevela/workflow/pkg/features"
"github.com/kubevela/workflow/pkg/generator"
"github.com/kubevela/workflow/pkg/monitor/metrics"
"github.com/kubevela/workflow/pkg/types"
Expand Down Expand Up @@ -195,16 +202,27 @@ func (r *WorkflowRunReconciler) matchControllerRequirement(wr *v1alpha1.Workflow

// SetupWithManager sets up the controller with the Manager.
func (r *WorkflowRunReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
builder := ctrl.NewControllerManagedBy(mgr)
if feature.DefaultMutableFeatureGate.Enabled(features.EnableWatchEventListener) {
builder = builder.Watches(&source.Kind{
Type: &triggerv1alpha1.EventListener{},
}, ctrlHandler.EnqueueRequestsFromMapFunc(findObjectForEventListener))
}
return builder.
WithOptions(controller.Options{
MaxConcurrentReconciles: r.ConcurrentReconciles,
}).
WithEventFilter(predicate.Funcs{
// filter the changes in workflow status
// let workflow handle its reconcile
UpdateFunc: func(e ctrlEvent.UpdateEvent) bool {
new := e.ObjectNew.DeepCopyObject().(*v1alpha1.WorkflowRun)
old := e.ObjectOld.DeepCopyObject().(*v1alpha1.WorkflowRun)
new, isNewWR := e.ObjectNew.DeepCopyObject().(*v1alpha1.WorkflowRun)
old, isOldWR := e.ObjectOld.DeepCopyObject().(*v1alpha1.WorkflowRun)

// if the object is a event listener, reconcile the controller
if !isNewWR || !isOldWR {
return true
}

// if the workflow is finished, skip the reconcile
if new.Status.Finished {
Expand Down Expand Up @@ -278,3 +296,9 @@ func timeReconcile(wr *v1alpha1.WorkflowRun) func() {
metrics.WorkflowRunReconcileTimeHistogram.WithLabelValues(beginPhase, string(wr.Status.Phase)).Observe(v)
}
}

func findObjectForEventListener(object client.Object) []reconcile.Request {
return []reconcile.Request{{
NamespacedName: k8stypes.NamespacedName{Name: object.GetName(), Namespace: object.GetNamespace()},
}}
}
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/google/go-cmp v0.5.9
github.com/hashicorp/go-version v1.6.0
github.com/kubevela/kube-trigger v0.0.1-alpha.2.0.20230206040152-1f8885e6d9b4
github.com/kubevela/pkg v0.0.0-20230206074514-7c05c32743e8
github.com/oam-dev/kubevela v1.7.2
github.com/onsi/ginkgo v1.16.5
Expand Down Expand Up @@ -87,6 +88,7 @@ require (
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/frankban/quicktest v1.11.3 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/getkin/kin-openapi v0.94.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
Expand Down Expand Up @@ -140,7 +142,7 @@ require (
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/mitchellh/hashstructure/v2 v2.0.1 // indirect
github.com/mitchellh/hashstructure/v2 v2.0.2 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/moby/locker v1.0.1 // indirect
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
Expand Down Expand Up @@ -203,15 +205,15 @@ require (
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.4.0 // indirect
golang.org/x/net v0.3.0 // indirect
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 // indirect
golang.org/x/oauth2 v0.0.0-20220722155238-128564f6959c // indirect
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde // indirect
golang.org/x/sys v0.3.0 // indirect
golang.org/x/term v0.3.0 // indirect
golang.org/x/text v0.5.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220628213854-d9e0b6570c03 // indirect
google.golang.org/grpc v1.48.0 // indirect
google.golang.org/protobuf v1.28.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
16 changes: 10 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,9 @@ github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNy
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/franela/goblin v0.0.0-20210519012713-85d372ac71e2/go.mod h1:VzmDKDJVZI3aJmnRI9VjAn9nJ8qPPsN1fqzr9dqInIo=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.10.2 h1:19ARM85nVi4xH7xPXuc5eM/udya5ieh7b/Sv+d844Tk=
github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
Expand Down Expand Up @@ -792,6 +793,8 @@ github.com/kr/pty v1.1.8 h1:AkaSdXYQOWeaO3neb8EM634ahkXXe3jYbVh/F9lq+GI=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubevela/kube-trigger v0.0.1-alpha.2.0.20230206040152-1f8885e6d9b4 h1:nz8HtrL3LlT2lJzhG5RclgXCZ9yhU6Yo8co/4tqVRuU=
github.com/kubevela/kube-trigger v0.0.1-alpha.2.0.20230206040152-1f8885e6d9b4/go.mod h1:l0pcbDNxDd/UkEgeqJSXUFVcZ8K2MRRNwX6c9ySM/HM=
github.com/kubevela/pkg v0.0.0-20230206074514-7c05c32743e8 h1:jWkEQVVovRqONGoJ+WHzDlsvJQEkmTMvcer40UbsEXw=
github.com/kubevela/pkg v0.0.0-20230206074514-7c05c32743e8/go.mod h1:zJTitvYbj1Vg4l4FvqjDRJEjufT6GRKs8m+fY3V9d3E=
github.com/kubevela/prism v1.7.0-alpha.1 h1:oeZFn1Oy6gxSSFzMTfsWjLOCKaaooMVm1JGNK4j4Mlo=
Expand Down Expand Up @@ -891,8 +894,8 @@ github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUb
github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0=
github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure/v2 v2.0.1 h1:L60q1+q7cXE4JeEJJKMnh2brFIe3rZxCihYAB61ypAY=
github.com/mitchellh/hashstructure/v2 v2.0.1/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
Expand Down Expand Up @@ -1533,8 +1536,8 @@ golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j
golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb/go.mod h1:jaDAt6Dkxork7LmZnYtzbRWj0W47D86a3TGe0YHBvmE=
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2 h1:+jnHzr9VPj32ykQVai5DNahi9+NSp7yYuCsl5eAQtL0=
golang.org/x/oauth2 v0.0.0-20220622183110-fd043fe589d2/go.mod h1:jaDAt6Dkxork7LmZnYtzbRWj0W47D86a3TGe0YHBvmE=
golang.org/x/oauth2 v0.0.0-20220722155238-128564f6959c h1:q3gFqPqH7NVofKo3c3yETAP//pPI+G5mvB7qqj1Y5kY=
golang.org/x/oauth2 v0.0.0-20220722155238-128564f6959c/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -1933,8 +1936,9 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gGKlE2+asNV9m7xrywl36YYNnBG5ZQ0r/BOOxqPpmk=
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc/go.mod h1:m7x9LTH6d71AHyAX77c9yqWCCa3UKHcVEj9y7hAtKDk=
Expand Down
3 changes: 3 additions & 0 deletions pkg/features/controller_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ const (
EnableBackupWorkflowRecord featuregate.Feature = "EnableBackupWorkflowRecord"
// EnablePatchStatusAtOnce enable patch status at once
EnablePatchStatusAtOnce featuregate.Feature = "EnablePatchStatusAtOnce"
// EnableWatchEventListener enable watch event listener
EnableWatchEventListener featuregate.Feature = "EnableWatchEventListener"
)

var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
EnableSuspendOnFailure: {Default: false, PreRelease: featuregate.Alpha},
EnableBackupWorkflowRecord: {Default: false, PreRelease: featuregate.Alpha},
EnablePatchStatusAtOnce: {Default: false, PreRelease: featuregate.Alpha},
EnableWatchEventListener: {Default: false, PreRelease: featuregate.Alpha},
}

func init() {
Expand Down

0 comments on commit d3cce43

Please sign in to comment.