Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Handling panic in go routines #5489

Merged
merged 3 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/argoproj/pkg/stats"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"

// load authentication plugin for obtaining credentials from cloud providers.
Expand Down Expand Up @@ -58,6 +59,8 @@ func NewRootCommand() *cobra.Command {
Use: CLIName,
Short: "workflow-controller is the controller to operate on workflows",
RunE: func(c *cobra.Command, args []string) error {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

cli.SetLogLevel(logLevel)
cmdutil.SetGLogLevel(glogLevel)
cmdutil.SetLogFormatter(logFormat)
Expand Down
3 changes: 3 additions & 0 deletions config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -74,6 +75,8 @@ func (cc *controller) parseConfigMap(cm *apiv1.ConfigMap) (interface{}, error) {
}

func (cc *controller) Run(stopCh <-chan struct{}, onChange func(config interface{}) error) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

restClient := cc.kubeclientset.CoreV1().RESTClient()
resource := "configmaps"
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", cc.configMap))
Expand Down
20 changes: 20 additions & 0 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -157,6 +158,8 @@ func (wfc *WorkflowController) newThrottler() sync.Throttler {

// runTTLController runs the workflow TTL controller
func (wfc *WorkflowController) runTTLController(ctx context.Context, workflowTTLWorkers int) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

ttlCtrl := ttlcontroller.NewController(wfc.wfclientset, wfc.wfInformer, wfc.metrics)
err := ttlCtrl.Run(ctx.Done(), workflowTTLWorkers)
if err != nil {
Expand All @@ -165,6 +168,8 @@ func (wfc *WorkflowController) runTTLController(ctx context.Context, workflowTTL
}

// runCronController constructs the CronWorkflow controller from this workflow
// controller's clientsets, informer, namespaces, instance ID, metrics and
// event recorder manager, and then runs it with ctx.
func (wfc *WorkflowController) runCronController(ctx context.Context) {
	// HandleCrash always invokes every handler registered in
	// runtimeutil.PanicHandlers before any additional handlers it is given,
	// so passing that same slice as the additional handlers would run each
	// handler (and therefore log the panic) twice.
	defer runtimeutil.HandleCrash()

	cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.wfInformer, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager)
	cronController.Run(ctx)
}
Expand All @@ -180,6 +185,7 @@ var indexers = cache.Indexers{

// Run starts a Workflow resource controller
func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers, podWorkers, podCleanupWorkers int) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
defer wfc.wfQueue.ShutDown()
defer wfc.podQueue.ShutDown()
defer wfc.podCleanupQueue.ShutDown()
Expand Down Expand Up @@ -236,6 +242,8 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
RetryPeriod: env.LookupEnvDurationOr("LEADER_ELECTION_RETRY_PERIOD", 5*time.Second),
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

logCtx.Info("started leading")
ctx, cancel = context.WithCancel(ctx)

Expand Down Expand Up @@ -335,6 +343,8 @@ func (wfc *WorkflowController) createSynchronizationManager(ctx context.Context)
}

func (wfc *WorkflowController) runConfigMapWatcher(stopCh <-chan struct{}) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

ctx := context.Background()
retryWatcher, err := apiwatch.NewRetryWatcher("1", &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
Expand Down Expand Up @@ -504,6 +514,8 @@ func (wfc *WorkflowController) signalContainers(namespace string, podName string
}

func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{}) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

periodicity := env.LookupEnvDurationOr("WORKFLOW_GC_PERIOD", 5*time.Minute)
log.Infof("Performing periodic GC every %v", periodicity)
ticker := time.NewTicker(periodicity)
Expand Down Expand Up @@ -552,6 +564,8 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{})
}

func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan struct{}) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

periodicity := env.LookupEnvDurationOr("ARCHIVED_WORKFLOW_GC_PERIOD", 24*time.Hour)
if wfc.Config.Persistence == nil {
log.Info("Persistence disabled - so archived workflow GC disabled - you must restart the controller if you enable this")
Expand Down Expand Up @@ -584,6 +598,8 @@ func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan st
}

func (wfc *WorkflowController) runWorker() {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

ctx := context.Background()
for wfc.processNextItem(ctx) {
}
Expand Down Expand Up @@ -1049,6 +1065,8 @@ func (wfc *WorkflowController) isArchivable(wf *wfv1.Workflow) bool {
}

func (wfc *WorkflowController) syncWorkflowPhaseMetrics() {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

for _, phase := range []wfv1.NodePhase{wfv1.NodePending, wfv1.NodeRunning, wfv1.NodeSucceeded, wfv1.NodeFailed, wfv1.NodeError} {
keys, err := wfc.wfInformer.GetIndexer().IndexKeys(indexes.WorkflowPhaseIndex, string(phase))
errors.CheckError(err)
Expand All @@ -1065,6 +1083,8 @@ func (wfc *WorkflowController) syncWorkflowPhaseMetrics() {
}

func (wfc *WorkflowController) syncPodPhaseMetrics() {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

for _, phase := range []apiv1.PodPhase{apiv1.PodRunning, apiv1.PodPending} {
objs, err := wfc.podInformer.GetIndexer().IndexKeys(indexes.PodPhaseIndex, string(phase))
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
Expand Down Expand Up @@ -69,6 +70,7 @@ func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic
}

func (cc *Controller) Run(ctx context.Context) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
defer cc.cronWfQueue.ShutDown()
log.Infof("Starting CronWorkflow controller")
if cc.instanceId != "" {
Expand Down Expand Up @@ -102,6 +104,8 @@ func (cc *Controller) runCronWorker() {
}

func (cc *Controller) processNextCronItem(ctx context.Context) bool {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

key, quit := cc.cronWfQueue.Get()
if quit {
return false
Expand Down Expand Up @@ -200,6 +204,8 @@ func (cc *Controller) addCronWorkflowInformerHandler() {
}

func (cc *Controller) syncAll(ctx context.Context) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

log.Debug("Syncing all CronWorkflows")

workflows, err := cc.wfLister.List()
Expand Down
3 changes: 3 additions & 0 deletions workflow/metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
)

// RunServer starts a metrics server
func (m *Metrics) RunServer(ctx context.Context) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

if !m.metricsConfig.Enabled {
// If metrics aren't enabled, return
return
Expand Down
3 changes: 3 additions & 0 deletions workflow/sync/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

log "github.com/sirupsen/logrus"
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)
Expand Down Expand Up @@ -46,6 +47,8 @@ func (cm *Manager) getWorkflowKey(key string) (string, error) {
}

func (cm *Manager) CheckWorkflowExistence() {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

log.Debug("Check the workflow existence")
for _, lock := range cm.syncLockMap {
keys := lock.getCurrentHolders()
Expand Down