Skip to content

Commit

Permalink
feat(metrics): Report controller error counters via metrics. Closes #…
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 committed Jun 3, 2020
1 parent 8831e4e commit edfa5b9
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 3 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
Expand Up @@ -133,7 +133,7 @@ func (wfc *WorkflowController) runTTLController(ctx context.Context) {
}

// runCronController builds a cron controller from this WorkflowController's
// clientset, REST config, namespace, managed namespace and instance ID, then
// runs it with ctx.
func (wfc *WorkflowController) runCronController(ctx context.Context) {
// NOTE(review): the two constructor lines below appear to be the removed and
// added sides of this diff hunk; the second one additionally passes
// &wfc.metrics to the cron controller.
cronController := cron.NewCronController(wfc.wfclientset, wfc.restConfig, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID)
cronController := cron.NewCronController(wfc.wfclientset, wfc.restConfig, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, &wfc.metrics)
cronController.Run(ctx)
}

Expand Down
1 change: 1 addition & 0 deletions workflow/controller/operator.go
Expand Up @@ -164,6 +164,7 @@ func (woc *wfOperationCtx) operate() {
} else {
woc.markWorkflowPhase(wfv1.NodeError, true, fmt.Sprintf("%v", r))
}
woc.controller.metrics.OperationPanic()
woc.log.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
}
}()
Expand Down
35 changes: 35 additions & 0 deletions workflow/controller/operator_test.go
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -3692,3 +3694,36 @@ func TestNoOnExitWhenSkipped(t *testing.T) {
woc.operate()
assert.Nil(t, woc.getNodeByName("B.onExit"))
}

// TestPanicMetric verifies that a panic recovered inside operate is reported
// through the controller's metrics: after the panicking call, the collected
// metrics must include one whose description mentions "OperationPanic" and
// whose counter value is exactly 1.
func TestPanicMetric(t *testing.T) {
	wf := unmarshalWF(noOnExitWhenSkipped)
	woc := newWoc(*wf)

	// This should make the call to "operate" panic
	woc.preExecutionNodePhases = nil
	woc.operate()

	// The channel is unbuffered, so Collect runs in its own goroutine while
	// this goroutine drains it; closing the channel afterwards ends the range
	// loop below.
	metricsChan := make(chan prometheus.Metric)
	go func() {
		defer close(metricsChan)
		woc.controller.metrics.Collect(metricsChan)
	}()

	seen := false
	// Keep draining even after a match so the collecting goroutine can finish.
	for metric := range metricsChan {
		if !strings.Contains(metric.Desc().String(), "OperationPanic") {
			continue
		}
		seen = true
		var writtenMetric dto.Metric
		if err := metric.Write(&writtenMetric); assert.NoError(t, err) {
			// The generated getters are nil-safe, so a metric that is not a
			// counter fails the assertion instead of panicking the test.
			assert.Equal(t, float64(1), writtenMetric.GetCounter().GetValue())
		}
	}
	assert.True(t, seen, "no metric describing OperationPanic was collected")
}
6 changes: 5 additions & 1 deletion workflow/cron/controller.go
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/argoproj/argo/pkg/client/informers/externalversions"
extv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/metrics"
"github.com/argoproj/argo/workflow/util"
)

Expand All @@ -40,6 +41,7 @@ type Controller struct {
cronWfInformer extv1alpha1.CronWorkflowInformer
cronWfQueue workqueue.RateLimitingInterface
restConfig *rest.Config
metrics *metrics.Metrics
}

const (
Expand All @@ -54,6 +56,7 @@ func NewCronController(
namespace string,
managedNamespace string,
instanceId string,
metrics *metrics.Metrics,
) *Controller {
return &Controller{
wfClientset: wfclientset,
Expand All @@ -66,6 +69,7 @@ func NewCronController(
nameEntryIDMapLock: &sync.Mutex{},
wfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
cronWfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
metrics: metrics,
}
}

Expand Down Expand Up @@ -142,7 +146,7 @@ func (cc *Controller) processNextCronItem() bool {
return true
}

cronWorkflowOperationCtx, err := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.wfLister)
cronWorkflowOperationCtx, err := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.wfLister, cc.metrics)
if err != nil {
log.Error(err)
return true
Expand Down
6 changes: 5 additions & 1 deletion workflow/cron/operator.go
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/argoproj/argo/pkg/client/clientset/versioned"
typed "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/metrics"
"github.com/argoproj/argo/workflow/util"
)

Expand All @@ -28,9 +29,10 @@ type cronWfOperationCtx struct {
wfLister util.WorkflowLister
cronWfIf typed.CronWorkflowInterface
log *log.Entry
metrics *metrics.Metrics
}

func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, wfLister util.WorkflowLister) (*cronWfOperationCtx, error) {
func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, wfLister util.WorkflowLister, metrics *metrics.Metrics) (*cronWfOperationCtx, error) {
return &cronWfOperationCtx{
name: cronWorkflow.ObjectMeta.Name,
cronWf: cronWorkflow,
Expand All @@ -42,6 +44,7 @@ func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset vers
"workflow": cronWorkflow.ObjectMeta.Name,
"namespace": cronWorkflow.ObjectMeta.Namespace,
}),
metrics: metrics,
}, nil
}

Expand Down Expand Up @@ -310,5 +313,6 @@ func (woc *cronWfOperationCtx) reportCronWorkflowError(errString string) {
Message: errString,
Status: v1.ConditionTrue,
})
woc.metrics.CronWorkflowSubmissionError()
woc.persistUpdate()
}
3 changes: 3 additions & 0 deletions workflow/cron/operator_test.go
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/pkg/client/clientset/versioned/fake"
"github.com/argoproj/argo/workflow/metrics"
"github.com/argoproj/argo/workflow/util"
)

Expand Down Expand Up @@ -167,13 +168,15 @@ func TestCronWorkflowConditionSubmissionError(t *testing.T) {
assert.NoError(t, err)

cs := fake.NewSimpleClientset()
testMetrics := metrics.New(metrics.ServerConfig{}, metrics.ServerConfig{})
woc := &cronWfOperationCtx{
wfClientset: cs,
wfClient: cs.ArgoprojV1alpha1().Workflows(""),
cronWfIf: cs.ArgoprojV1alpha1().CronWorkflows(""),
wfLister: &fakeLister{},
cronWf: &cronWf,
log: logrus.WithFields(logrus.Fields{}),
metrics: &testMetrics,
}
woc.Run()

Expand Down
20 changes: 20 additions & 0 deletions workflow/metrics/metrics.go
Expand Up @@ -40,6 +40,7 @@ type Metrics struct {
workflowsProcessed prometheus.Counter
workflowsByPhase map[v1alpha1.NodePhase]prometheus.Gauge
operationDurations prometheus.Histogram
errors map[ErrorCause]prometheus.Counter
customMetrics map[string]metric

// Used to quickly check if a metric desc is already used by the system
Expand All @@ -55,6 +56,7 @@ func New(metricsConfig, telemetryConfig ServerConfig) Metrics {
workflowsProcessed: newCounter("workflows_processed_count", "Number of workflow updates processed", nil),
workflowsByPhase: getWorkflowPhaseGauges(),
operationDurations: newHistogram("operation_duration_seconds", "Histogram of durations of operations", nil, []float64{0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.5, 3.0}),
errors: getErrorCounters(),
customMetrics: make(map[string]metric),
defaultMetricDescs: make(map[string]bool),
}
Expand All @@ -74,6 +76,9 @@ func (m Metrics) allMetrics() []prometheus.Metric {
for _, metric := range m.workflowsByPhase {
allMetrics = append(allMetrics, metric)
}
for _, metric := range m.errors {
allMetrics = append(allMetrics, metric)
}
for _, metric := range m.customMetrics {
allMetrics = append(allMetrics, metric.metric)
}
Expand Down Expand Up @@ -114,3 +119,18 @@ func (m Metrics) UpsertCustomMetric(key string, newMetric prometheus.Metric) err
m.customMetrics[key] = metric{metric: newMetric, lastUpdated: time.Now()}
return nil
}

// ErrorCause names a category of controller error. Its string value is used
// as the key into the per-cause error counters.
type ErrorCause string

const (
	// ErrorCauseOperationPanic is the cause counted by Metrics.OperationPanic.
	ErrorCauseOperationPanic = ErrorCause("OperationPanic")
	// ErrorCauseCronWorkflowSubmissionError is the cause counted by
	// Metrics.CronWorkflowSubmissionError.
	ErrorCauseCronWorkflowSubmissionError = ErrorCause("CronWorkflowSubmissionError")
)

// OperationPanic increments the error counter stored under
// ErrorCauseOperationPanic.
func (m Metrics) OperationPanic() {
	panicCounter := m.errors[ErrorCauseOperationPanic]
	panicCounter.Inc()
}

// CronWorkflowSubmissionError increments the error counter stored under
// ErrorCauseCronWorkflowSubmissionError.
func (m Metrics) CronWorkflowSubmissionError() {
	submissionCounter := m.errors[ErrorCauseCronWorkflowSubmissionError]
	submissionCounter.Inc()
}
16 changes: 16 additions & 0 deletions workflow/metrics/util.go
Expand Up @@ -150,6 +150,22 @@ func getWorkflowPhaseGauges() map[wfv1.NodePhase]prometheus.Gauge {
}
}

// getErrorCounters returns one freshly created counter per known ErrorCause.
// Every counter shares the metric name "error_count" and is distinguished by a
// constant "cause" label holding the ErrorCause's string value.
func getErrorCounters() map[ErrorCause]prometheus.Counter {
	causes := []ErrorCause{
		ErrorCauseOperationPanic,
		ErrorCauseCronWorkflowSubmissionError,
	}

	counters := make(map[ErrorCause]prometheus.Counter, len(causes))
	for _, cause := range causes {
		counters[cause] = prometheus.NewCounter(prometheus.CounterOpts{
			Namespace:   argoNamespace,
			Subsystem:   workflowsSubsystem,
			Name:        "error_count",
			Help:        "Number of errors encountered by the controller by cause",
			ConstLabels: map[string]string{"cause": string(cause)},
		})
	}
	return counters
}

// IsValidMetricName reports whether name, converted to a model.LabelValue, is
// accepted by model.IsValidMetricName.
func IsValidMetricName(name string) bool {
	candidate := model.LabelValue(name)
	return model.IsValidMetricName(candidate)
}

0 comments on commit edfa5b9

Please sign in to comment.