Skip to content

Commit

Permalink
chore: Handling panic in go routines (#5489)
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 authored and simster7 committed Apr 5, 2021
1 parent 631e55d commit 051413e
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 3 deletions.
3 changes: 3 additions & 0 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/argoproj/pkg/stats"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"

// load authentication plugin for obtaining credentials from cloud providers.
Expand Down Expand Up @@ -51,6 +52,8 @@ func NewRootCommand() *cobra.Command {
Use: CLIName,
Short: "workflow-controller is the controller to operate on workflows",
RunE: func(c *cobra.Command, args []string) error {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

cli.SetLogLevel(logLevel)
cli.SetGLogLevel(glogLevel)
stats.RegisterStackDumper()
Expand Down
3 changes: 3 additions & 0 deletions config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -74,6 +75,8 @@ func (cc *controller) parseConfigMap(cm *apiv1.ConfigMap) (interface{}, error) {
}

func (cc *controller) Run(stopCh <-chan struct{}, onChange func(config interface{}) error) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

restClient := cc.kubeclientset.CoreV1().RESTClient()
resource := "configmaps"
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", cc.configMap))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ require (
k8s.io/client-go v0.17.8
k8s.io/code-generator v0.17.5
k8s.io/kube-openapi v0.0.0-20200410145947-bcb3869e6f29
k8s.io/utils v0.0.0-20200327001022-6496210b90e8
k8s.io/utils v0.0.0-20210305010621-2afb4311ab10
sigs.k8s.io/controller-tools v0.3.0
sigs.k8s.io/yaml v1.2.0
upper.io/db.v3 v3.6.3+incompatible
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logr/logr v0.1.0 h1:M1Tv3VzNlEHg6uyACnRdtrploV2P7wZqH8BoQMtz0cg=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI=
github.com/go-openapi/analysis v0.17.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik=
Expand Down Expand Up @@ -1143,14 +1144,16 @@ k8s.io/klog v0.0.0-20181102134211-b9b56d5dfc92/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUc
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.0.0 h1:Foj74zO6RbjjP4hBEKjnYtjjAhGg4jNynUdYF6fJrok=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/kube-openapi v0.0.0-20191107075043-30be4d16710a/go.mod h1:1TqjTSzOxsLGIKfj0lK8EeCP7K1iUG65v09OM0/WG5E=
k8s.io/kube-openapi v0.0.0-20200316234421-82d701f24f9d h1:jocF7XFucw2pEiv2wS7wk2FRFCjDFGV1oa4TMs0SAT0=
k8s.io/kube-openapi v0.0.0-20200316234421-82d701f24f9d/go.mod h1:F+5wygcW0wmRTnM3cOgIqGivxkwSWIWT5YdsDbeAOaU=
k8s.io/kube-openapi v0.0.0-20200410145947-bcb3869e6f29 h1:NeQXVJ2XFSkRoPzRo8AId01ZER+j8oV4SZADT4iBOXQ=
k8s.io/kube-openapi v0.0.0-20200410145947-bcb3869e6f29/go.mod h1:F+5wygcW0wmRTnM3cOgIqGivxkwSWIWT5YdsDbeAOaU=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20200327001022-6496210b90e8 h1:6JFbaLjRyBz8K2Jvt+pcT+N3vvwMZfg8MfVENwe9aag=
k8s.io/utils v0.0.0-20200327001022-6496210b90e8/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20210305010621-2afb4311ab10 h1:u5rPykqiCpL+LBfjRkXvnK71gOgIdmq3eHUEkPrbeTI=
k8s.io/utils v0.0.0-20210305010621-2afb4311ab10/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
Expand Down
11 changes: 11 additions & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -152,6 +153,7 @@ func (wfc *WorkflowController) newThrottler() sync.Throttler {
// runTTLController runs the workflow TTL controller
func (wfc *WorkflowController) runTTLController(ctx context.Context, workflowTTLWorkers int) {
ttlCtrl := ttlcontroller.NewController(wfc.wfclientset, wfc.wfInformer)
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
err := ttlCtrl.Run(ctx.Done(), workflowTTLWorkers)
if err != nil {
panic(err)
Expand All @@ -160,6 +162,7 @@ func (wfc *WorkflowController) runTTLController(ctx context.Context, workflowTTL

// runCronController builds the CronWorkflow controller from this controller's
// clients and settings and then calls its Run method with ctx.
//
// The crash handler is deferred before the controller is constructed so that a
// panic raised inside cron.NewCronController is reported too, not only one
// raised by Run.
func (wfc *WorkflowController) runCronController(ctx context.Context) {
	// HandleCrash already invokes every handler in runtimeutil.PanicHandlers;
	// passing that slice again as additional handlers would run each one twice
	// (e.g. logging the same panic two times).
	defer runtimeutil.HandleCrash()

	cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager)
	cronController.Run(ctx)
}

Expand All @@ -173,6 +176,7 @@ var indexers = cache.Indexers{

// Run starts a Workflow resource controller
func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podWorkers int) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
defer wfc.wfQueue.ShutDown()
defer wfc.podQueue.ShutDown()

Expand Down Expand Up @@ -217,6 +221,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
for i := 0; i < podWorkers; i++ {
go wait.Until(wfc.podWorker, time.Second, ctx.Done())
}

<-ctx.Done()
}

Expand Down Expand Up @@ -274,6 +279,8 @@ func (wfc *WorkflowController) createSynchronizationManager() error {
}

func (wfc *WorkflowController) runConfigMapWatcher(stopCh <-chan struct{}) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

retryWatcher, err := apiwatch.NewRetryWatcher("1", &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return wfc.kubeclientset.CoreV1().ConfigMaps(wfc.managedNamespace).Watch(metav1.ListOptions{})
Expand Down Expand Up @@ -406,6 +413,7 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{})
log.WithFields(log.Fields{"err": err, "value": value}).Fatal("Failed to parse WORKFLOW_GC_PERIOD")
}
}
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
log.Infof("Performing periodic GC every %v", periodicity)
ticker := time.NewTicker(periodicity)
for {
Expand Down Expand Up @@ -462,6 +470,7 @@ func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan st
log.WithFields(log.Fields{"err": err, "value": value}).Fatal("Failed to parse ARCHIVED_WORKFLOW_GC_PERIOD")
}
}
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
if wfc.Config.Persistence == nil {
log.Info("Persistence disabled - so archived workflow GC disabled - you must restart the controller if you enable this")
return
Expand Down Expand Up @@ -493,6 +502,7 @@ func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan st
}

// runWorker calls processNextItem in a loop and returns once it reports false.
func (wfc *WorkflowController) runWorker() {
	// HandleCrash already invokes every handler in runtimeutil.PanicHandlers;
	// passing that slice again as additional handlers would run each one twice
	// (e.g. logging the same panic two times).
	defer runtimeutil.HandleCrash()

	for wfc.processNextItem() {
	}
}
Expand Down Expand Up @@ -949,6 +959,7 @@ func (wfc *WorkflowController) isArchivable(wf *wfv1.Workflow) bool {
}

func (wfc *WorkflowController) syncWorkflowPhaseMetrics() {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
for _, phase := range []wfv1.NodePhase{wfv1.NodePending, wfv1.NodeRunning, wfv1.NodeSucceeded, wfv1.NodeFailed, wfv1.NodeError} {
objs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.WorkflowPhaseIndex, string(phase))
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
Expand Down Expand Up @@ -71,6 +72,7 @@ func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic
}

func (cc *Controller) Run(ctx context.Context) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
defer cc.cronWfQueue.ShutDown()
defer cc.wfQueue.ShutDown()
log.Infof("Starting CronWorkflow controller")
Expand Down Expand Up @@ -109,6 +111,8 @@ func (cc *Controller) runCronWorker() {
}

func (cc *Controller) processNextCronItem() bool {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

key, quit := cc.cronWfQueue.Get()
if quit {
return false
Expand Down Expand Up @@ -207,6 +211,7 @@ func (cc *Controller) addCronWorkflowInformerHandler() {
}

func (cc *Controller) syncAll() {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
log.Debug("Syncing all CronWorkflows")

workflows, err := cc.wfLister.List()
Expand Down
3 changes: 3 additions & 0 deletions workflow/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
)

// RunServer starts a metrics server
func (m *Metrics) RunServer(ctx context.Context) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

if !m.metricsConfig.Enabled {
// If metrics aren't enabled, return
return
Expand Down

0 comments on commit 051413e

Please sign in to comment.