feat: Worker busy and active pod metrics (#4823)
Signed-off-by: Simon Behar <simbeh7@gmail.com>
simster7 committed Jan 7, 2021
1 parent 53110b6 commit 6b3ce50
Showing 13 changed files with 224 additions and 31 deletions.
22 changes: 18 additions & 4 deletions workflow/controller/controller.go
@@ -139,10 +139,10 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int
wfc.metrics = metrics.New(wfc.getMetricsServerConfig())

workqueue.SetProvider(wfc.metrics) // must execute SetProvider before we create the queues
wfc.wfQueue = workqueue.NewNamedRateLimitingQueue(&fixedItemIntervalRateLimiter{}, "workflow_queue")
wfc.wfQueue = wfc.metrics.RateLimiterWithBusyWorkers(&fixedItemIntervalRateLimiter{}, "workflow_queue")
wfc.throttler = wfc.newThrottler()
wfc.podQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod_queue")
wfc.podCleanupQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod_cleanup_queue")
wfc.podQueue = wfc.metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "pod_queue")
wfc.podCleanupQueue = wfc.metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "pod_cleanup_queue")

return &wfc, nil
}
@@ -153,7 +153,7 @@ func (wfc *WorkflowController) newThrottler() sync.Throttler {

// RunTTLController runs the workflow TTL controller
func (wfc *WorkflowController) runTTLController(ctx context.Context, workflowTTLWorkers int) {
ttlCtrl := ttlcontroller.NewController(wfc.wfclientset, wfc.wfInformer)
ttlCtrl := ttlcontroller.NewController(wfc.wfclientset, wfc.wfInformer, wfc.metrics)
err := ttlCtrl.Run(ctx.Done(), workflowTTLWorkers)
if err != nil {
panic(err)
@@ -239,6 +239,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
go wfc.runCronController(ctx)
go wfc.metrics.RunServer(ctx)
go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())
go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done())

for i := 0; i < wfWorkers; i++ {
go wait.Until(wfc.runWorker, time.Second, ctx.Done())
@@ -401,6 +402,7 @@ func (wfc *WorkflowController) processNextPodCleanupItem() bool {
return false
}
defer wfc.podCleanupQueue.Done(key)

namespace, podName, action := parsePodCleanupKey(key.(podCleanupKey))
logCtx := log.WithFields(log.Fields{"key": key, "action": action})
logCtx.Info("cleaning up pod")
@@ -853,6 +855,7 @@ func (wfc *WorkflowController) newPodInformer() cache.SharedIndexInformer {
source := wfc.newWorkflowPodWatch()
informer := cache.NewSharedIndexInformer(source, &apiv1.Pod{}, podResyncPeriod, cache.Indexers{
indexes.WorkflowIndex: indexes.MetaWorkflowIndexFunc,
indexes.PodPhaseIndex: indexes.PodPhaseIndexFunc,
})
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
@@ -991,3 +994,14 @@ func (wfc *WorkflowController) syncWorkflowPhaseMetrics() {
wfc.metrics.SetWorkflowPhaseGauge(phase, len(objs))
}
}

func (wfc *WorkflowController) syncPodPhaseMetrics() {
for _, phase := range []apiv1.PodPhase{apiv1.PodRunning, apiv1.PodPending} {
objs, err := wfc.podInformer.GetIndexer().ByIndex(indexes.PodPhaseIndex, string(phase))
if err != nil {
log.WithError(err).Error("failed to list active pods")
return
}
wfc.metrics.SetPodPhaseGauge(phase, len(objs))
}
}
1 change: 1 addition & 0 deletions workflow/controller/indexes/indexes.go
@@ -12,5 +12,6 @@ const (
WorkflowIndex = "workflow"
WorkflowTemplateIndex = "workflowtemplate"
WorkflowPhaseIndex = "workflow.phase"
PodPhaseIndex = "pod.phase"
SemaphoreConfigIndexName = "bySemaphoreConfigMap"
)
14 changes: 14 additions & 0 deletions workflow/controller/indexes/pod_index.go
@@ -0,0 +1,14 @@
package indexes

import (
corev1 "k8s.io/api/core/v1"
)

func PodPhaseIndexFunc(obj interface{}) ([]string, error) {
pod, ok := obj.(*corev1.Pod)

if !ok {
return nil, nil
}
return []string{string(pod.Status.Phase)}, nil
}
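
For context, a minimal sketch (not part of this commit) of how PodPhaseIndexFunc can be registered on a cache and queried by phase, mirroring what the controller does through its pod informer; the standalone indexer and the import path of the indexes package are assumptions for illustration:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"

	"github.com/argoproj/argo/workflow/controller/indexes"
)

func main() {
	// Register the new index function, then query it by phase,
	// just as syncPodPhaseMetrics does via ByIndex on the pod informer.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		indexes.PodPhaseIndex: indexes.PodPhaseIndexFunc,
	})
	_ = indexer.Add(&corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"},
		Status:     corev1.PodStatus{Phase: corev1.PodRunning},
	})
	running, _ := indexer.ByIndex(indexes.PodPhaseIndex, string(corev1.PodRunning))
	fmt.Println(len(running)) // 1
}
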
23 changes: 23 additions & 0 deletions workflow/controller/indexes/pod_index_test.go
@@ -0,0 +1,23 @@
package indexes

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
)

func TestMetaPodPhaseIndexFunc(t *testing.T) {
t.Run("NoPhase", func(t *testing.T) {
values, err := PodPhaseIndexFunc(&corev1.Pod{})
assert.NoError(t, err)
assert.Equal(t, []string{""}, values)
})
t.Run("Phase", func(t *testing.T) {
values, err := PodPhaseIndexFunc(&corev1.Pod{
Status: corev1.PodStatus{Phase: corev1.PodRunning},
})
assert.NoError(t, err)
assert.ElementsMatch(t, values, []string{string(corev1.PodRunning)})
})
}
6 changes: 3 additions & 3 deletions workflow/controller/operator.go
@@ -924,7 +924,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
// node is not a pod, it is already complete, or it can be re-run.
continue
}
if _, ok := seenPods[nodeID]; !ok {
if seenPod, ok := seenPods[nodeID]; !ok {

// grace-period to allow informer sync
recentlyStarted := recentlyStarted(node)
@@ -948,8 +948,8 @@
} else {
// At this point we are certain that the pod associated with our node is running or has been run;
// it is safe to extract the k8s-node information given this knowledge.
if node.HostNodeName != seenPods[nodeID].Spec.NodeName {
node.HostNodeName = seenPods[nodeID].Spec.NodeName
if node.HostNodeName != seenPod.Spec.NodeName {
node.HostNodeName = seenPod.Spec.NodeName
woc.wf.Status.Nodes[nodeID] = node
woc.updated = true
}
4 changes: 2 additions & 2 deletions workflow/cron/controller.go
@@ -63,8 +63,8 @@ func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic
cron: newCronFacade(),
keyLock: sync.NewKeyLock(),
dynamicInterface: dynamicInterface,
wfQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "wf_cron_queue"),
cronWfQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cron_wf_queue"),
wfQueue: metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "wf_cron_queue"),
cronWfQueue: metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "cron_wf_queue"),
metrics: metrics,
eventRecorderManager: eventRecorderManager,
}
18 changes: 18 additions & 0 deletions workflow/metrics/metrics.go
@@ -7,6 +7,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
@@ -43,12 +44,14 @@ type Metrics struct {
telemetryConfig ServerConfig

workflowsProcessed prometheus.Counter
podsByPhase map[corev1.PodPhase]prometheus.Gauge
workflowsByPhase map[v1alpha1.NodePhase]prometheus.Gauge
workflows map[string][]string
operationDurations prometheus.Histogram
errors map[ErrorCause]prometheus.Counter
customMetrics map[string]metric
workqueueMetrics map[string]prometheus.Metric
workersBusy map[string]prometheus.Gauge

// Used to quickly check if a metric desc is already used by the system
defaultMetricDescs map[string]bool
@@ -72,12 +75,14 @@ func New(metricsConfig, telemetryConfig ServerConfig) *Metrics {
metricsConfig: metricsConfig,
telemetryConfig: telemetryConfig,
workflowsProcessed: newCounter("workflows_processed_count", "Number of workflow updates processed", nil),
podsByPhase: getPodPhaseGauges(),
workflowsByPhase: getWorkflowPhaseGauges(),
workflows: make(map[string][]string),
operationDurations: newHistogram("operation_duration_seconds", "Histogram of durations of operations", nil, []float64{0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.5, 3.0}),
errors: getErrorCounters(),
customMetrics: make(map[string]metric),
workqueueMetrics: make(map[string]prometheus.Metric),
workersBusy: make(map[string]prometheus.Gauge),
defaultMetricDescs: make(map[string]bool),
metricNameHelps: make(map[string]string),
logMetric: prometheus.NewCounterVec(prometheus.CounterOpts{
@@ -110,12 +115,18 @@ func (m *Metrics) allMetrics() []prometheus.Metric {
for _, metric := range m.workflowsByPhase {
allMetrics = append(allMetrics, metric)
}
for _, metric := range m.podsByPhase {
allMetrics = append(allMetrics, metric)
}
for _, metric := range m.errors {
allMetrics = append(allMetrics, metric)
}
for _, metric := range m.workqueueMetrics {
allMetrics = append(allMetrics, metric)
}
for _, metric := range m.workersBusy {
allMetrics = append(allMetrics, metric)
}
for _, metric := range m.customMetrics {
allMetrics = append(allMetrics, metric.metric)
}
@@ -184,6 +195,13 @@ func (m *Metrics) SetWorkflowPhaseGauge(phase v1alpha1.NodePhase, num int) {
m.workflowsByPhase[phase].Set(float64(num))
}

func (m *Metrics) SetPodPhaseGauge(phase corev1.PodPhase, num int) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.podsByPhase[phase].Set(float64(num))
}

type ErrorCause string

const (
26 changes: 14 additions & 12 deletions workflow/metrics/metrics_test.go
@@ -5,13 +5,23 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

func write(metric prometheus.Metric) dto.Metric {
var m dto.Metric
err := metric.Write(&m)
if err != nil {
panic(err)
}
return m
}

func TestServerConfig_SameServerAs(t *testing.T) {
a := ServerConfig{
Enabled: true,
@@ -48,16 +58,12 @@ func TestMetrics(t *testing.T) {
}
m := New(config, config)

var metric dto.Metric
m.OperationCompleted(0.05)
err := m.operationDurations.Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, uint64(1), *metric.Histogram.Bucket[0].CumulativeCount)
}
assert.Equal(t, uint64(1), *write(m.operationDurations).Histogram.Bucket[0].CumulativeCount)

assert.Nil(t, m.GetCustomMetric("does-not-exist"))

err = m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false)
err := m.UpsertCustomMetric("metric", "", newCounter("test", "test", nil), false)
if assert.NoError(t, err) {
assert.NotNil(t, m.GetCustomMetric("metric"))
}
@@ -67,7 +73,7 @@

badMetric, err := constructOrUpdateGaugeMetric(nil, &v1alpha1.Prometheus{
Name: "count",
Help: "Number of Workflows currently accessible by the controller by status",
Help: "Number of Workflows currently accessible by the controller by status (refreshed every 15s)",
Labels: []*v1alpha1.MetricLabel{{Key: "status", Value: "Running"}},
Gauge: &v1alpha1.Gauge{
Value: "1",
@@ -126,11 +132,7 @@ func TestWorkflowQueueMetrics(t *testing.T) {
wfQueue.Add("hello")

if assert.NotNil(t, m.workqueueMetrics["workflow_queue-adds"]) {
var metric dto.Metric
err := m.workqueueMetrics["workflow_queue-adds"].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, 1.0, *metric.Counter.Value)
}
assert.Equal(t, 1.0, *write(m.workqueueMetrics["workflow_queue-adds"]).Counter.Value)
}
}

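
As a usage note, a minimal test sketch (not part of this commit) of the new pod-phase gauge, written as if it lived in the metrics package; prometheus/client_golang's testutil helper and the bare ServerConfig are assumptions:

package metrics

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"
	corev1 "k8s.io/api/core/v1"
)

func TestSetPodPhaseGauge(t *testing.T) {
	config := ServerConfig{Enabled: true}
	m := New(config, config)

	// Setting the gauge should be readable back from the underlying Prometheus gauge.
	m.SetPodPhaseGauge(corev1.PodRunning, 3)
	assert.Equal(t, 3.0, testutil.ToFloat64(m.podsByPhase[corev1.PodRunning]))
}
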
44 changes: 36 additions & 8 deletions workflow/metrics/util.go
@@ -8,6 +8,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
v1 "k8s.io/api/core/v1"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)
@@ -153,22 +154,39 @@ func newHistogram(name, help string, labels map[string]string, buckets []float64
}

func getWorkflowPhaseGauges() map[wfv1.NodePhase]prometheus.Gauge {
getOptsByPahse := func(phase wfv1.NodePhase) prometheus.GaugeOpts {
getOptsByPhase := func(phase wfv1.NodePhase) prometheus.GaugeOpts {
return prometheus.GaugeOpts{
Namespace: argoNamespace,
Subsystem: workflowsSubsystem,
Name: "count",
Help: "Number of Workflows currently accessible by the controller by status",
Help: "Number of Workflows currently accessible by the controller by status (refreshed every 15s)",
ConstLabels: map[string]string{"status": string(phase)},
}
}
return map[wfv1.NodePhase]prometheus.Gauge{
wfv1.NodePending: prometheus.NewGauge(getOptsByPahse(wfv1.NodePending)),
wfv1.NodeRunning: prometheus.NewGauge(getOptsByPahse(wfv1.NodeRunning)),
wfv1.NodeSucceeded: prometheus.NewGauge(getOptsByPahse(wfv1.NodeSucceeded)),
wfv1.NodeSkipped: prometheus.NewGauge(getOptsByPahse(wfv1.NodeSkipped)),
wfv1.NodeFailed: prometheus.NewGauge(getOptsByPahse(wfv1.NodeFailed)),
wfv1.NodeError: prometheus.NewGauge(getOptsByPahse(wfv1.NodeError)),
wfv1.NodePending: prometheus.NewGauge(getOptsByPhase(wfv1.NodePending)),
wfv1.NodeRunning: prometheus.NewGauge(getOptsByPhase(wfv1.NodeRunning)),
wfv1.NodeSucceeded: prometheus.NewGauge(getOptsByPhase(wfv1.NodeSucceeded)),
wfv1.NodeFailed: prometheus.NewGauge(getOptsByPhase(wfv1.NodeFailed)),
wfv1.NodeError: prometheus.NewGauge(getOptsByPhase(wfv1.NodeError)),
}
}

func getPodPhaseGauges() map[v1.PodPhase]prometheus.Gauge {
getOptsByPhase := func(phase v1.PodPhase) prometheus.GaugeOpts {
return prometheus.GaugeOpts{
Namespace: argoNamespace,
Subsystem: workflowsSubsystem,
Name: "pods_count",
Help: "Number of Pods from Workflows currently accessible by the controller by status (refreshed every 15s)",
ConstLabels: map[string]string{"status": string(phase)},
}
}
return map[v1.PodPhase]prometheus.Gauge{
v1.PodPending: prometheus.NewGauge(getOptsByPhase(v1.PodPending)),
v1.PodRunning: prometheus.NewGauge(getOptsByPhase(v1.PodRunning)),
//v1.PodSucceeded: prometheus.NewGauge(getOptsByPhase(v1.PodSucceeded)),
//v1.PodFailed: prometheus.NewGauge(getOptsByPhase(v1.PodFailed)),
}
}

Expand All @@ -188,6 +206,16 @@ func getErrorCounters() map[ErrorCause]prometheus.Counter {
}
}

func getWorkersBusy(name string) prometheus.Gauge {
return prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: argoNamespace,
Subsystem: workflowsSubsystem,
Name: "workers_busy_count",
Help: "Number of workers currently busy",
ConstLabels: map[string]string{"worker_type": name},
})
}

func IsValidMetricName(name string) bool {
return model.IsValidMetricName(model.LabelValue(name))
}
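
For reference, a hedged note on the series these gauges export; the argoNamespace and workflowsSubsystem constants are not shown in this diff, so the fully qualified names below are assumptions:

// Assuming argoNamespace is "argo" and workflowsSubsystem is "workflows", the new
// gauges are exposed roughly as:
//   argo_workflows_pods_count{status="Pending"}
//   argo_workflows_pods_count{status="Running"}
//   argo_workflows_workers_busy_count{worker_type="workflow_queue"}
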
51 changes: 51 additions & 0 deletions workflow/metrics/work_queue.go
@@ -0,0 +1,51 @@
package metrics

import "k8s.io/client-go/util/workqueue"

type workersBusyRateLimiterWorkQueue struct {
workqueue.RateLimitingInterface
workerType string
metrics *Metrics
}

func (m *Metrics) RateLimiterWithBusyWorkers(workQueue workqueue.RateLimiter, queueName string) workqueue.RateLimitingInterface {
m.newWorker(queueName)
return workersBusyRateLimiterWorkQueue{
RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workQueue, queueName),
workerType: queueName,
metrics: m,
}
}

func (m *Metrics) newWorker(workerType string) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.workersBusy[workerType] = getWorkersBusy(workerType)
}

func (m *Metrics) workerBusy(workerType string) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.workersBusy[workerType].Inc()
}

func (m *Metrics) workerFree(workerType string) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.workersBusy[workerType].Dec()
}

func (w workersBusyRateLimiterWorkQueue) Get() (interface{}, bool) {
item, shutdown := w.RateLimitingInterface.Get()
w.metrics.workerBusy(w.workerType)
return item, shutdown

}

func (w workersBusyRateLimiterWorkQueue) Done(item interface{}) {
w.RateLimitingInterface.Done(item)
w.metrics.workerFree(w.workerType)
}
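
For illustration, a minimal sketch (not part of this commit) of a worker loop driving a queue built with RateLimiterWithBusyWorkers; the worker counts as busy from Get until Done, and the process callback is a placeholder:

package metrics

import "k8s.io/client-go/util/workqueue"

// runWorker drains the queue: each Get increments the workers_busy_count gauge
// for this queue and each Done decrements it again.
func runWorker(queue workqueue.RateLimitingInterface, process func(key interface{})) {
	for {
		key, shutdown := queue.Get()
		if shutdown {
			return
		}
		process(key)
		queue.Done(key)
	}
}
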
