Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use polling model for workflow phase metric #4557

Merged
merged 7 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/aliyun/aliyun-oss-go-sdk v2.0.6+incompatible h1:ZDgadcjGIrbHMBLSqQVHkMOdNd/jF6bsSRJd/Ysxlos=
github.com/aliyun/aliyun-oss-go-sdk v2.0.6+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
Expand Down Expand Up @@ -1047,6 +1049,7 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc h1:TnonUr8u3himcMY0vSh23jFOXA+cnucl1gB6EQTReBI=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
64 changes: 19 additions & 45 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ import (
"github.com/argoproj/argo/workflow/util"
)

const enoughTimeForInformerSync = 1 * time.Second

const semaphoreConfigIndexName = "bySemaphoreConfigMap"

Comment on lines -58 to -61
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some pork-barrel style changes in this PR

// WorkflowController is the controller for workflow resources
type WorkflowController struct {
// namespace of the workflow controller
Expand Down Expand Up @@ -109,6 +105,7 @@ const (
workflowTemplateResyncPeriod = 20 * time.Minute
podResyncPeriod = 30 * time.Minute
clusterWorkflowTemplateResyncPeriod = 20 * time.Minute
enoughTimeForInformerSync = 1 * time.Second
)

// NewWorkflowController instantiates a new WorkflowController
Expand Down Expand Up @@ -170,7 +167,8 @@ var indexers = cache.Indexers{
indexes.ClusterWorkflowTemplateIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyClusterWorkflowTemplate),
indexes.CronWorkflowIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyCronWorkflow),
indexes.WorkflowTemplateIndex: indexes.MetaNamespaceLabelIndexFunc(common.LabelKeyWorkflowTemplate),
semaphoreConfigIndexName: workflowIndexerBySemaphoreKeys,
indexes.SemaphoreConfigIndexName: indexes.WorkflowSemaphoreKeysIndexFunc(),
indexes.WorkflowPhaseIndex: indexes.MetaLabelIndexFunc(common.LabelKeyPhase),
}

// Run starts an Workflow resource controller
Expand Down Expand Up @@ -200,7 +198,9 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in

go wfc.runTTLController(ctx)
go wfc.runCronController(ctx)

go wfc.metrics.RunServer(ctx)
go wait.Until(wfc.syncWorkflowPhaseMetrics, 5*time.Second, ctx.Done())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think 5 seconds is a good balance here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

So real-time for Prometheus means up to 15s old. Plus whatever delay the app has.

Every 15s would mean Prometheus would be up to 30s out of date. @jessesuen I'd like to do as little polling as possible.


wfc.createClusterWorkflowTemplateInformer(ctx)
wfc.waitForCacheSync(ctx)
Expand All @@ -220,20 +220,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
<-ctx.Done()
}

func workflowIndexerBySemaphoreKeys(obj interface{}) ([]string, error) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("cannot convert obj into unstructured.Unstructured in Indexer %s", semaphoreConfigIndexName)
return []string{}, nil
}
wf, err := util.FromUnstructured(un)
if err != nil {
log.Warnf("failed to convert to workflow from unstructured: %v", err)
return []string{}, nil
}
return wf.GetSemaphoreKeys(), nil
}

func (wfc *WorkflowController) waitForCacheSync(ctx context.Context) {
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(ctx.Done(), wfc.wfInformer.HasSynced, wfc.wftmplInformer.Informer().HasSynced, wfc.podInformer.HasSynced) {
Expand Down Expand Up @@ -317,15 +303,15 @@ func (wfc *WorkflowController) runConfigMapWatcher(stopCh <-chan struct{}) {

// notifySemaphoreConfigUpdate will notify semaphore config update to pending workflows
func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap) {
wfs, err := wfc.wfInformer.GetIndexer().ByIndex(semaphoreConfigIndexName, fmt.Sprintf("%s/%s", cm.Namespace, cm.Name))
wfs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.SemaphoreConfigIndexName, fmt.Sprintf("%s/%s", cm.Namespace, cm.Name))
if err != nil {
log.Errorf("failed get the workflow from informer. %v", err)
}

for _, obj := range wfs {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("received object from indexer %s is not an unstructured", semaphoreConfigIndexName)
log.Warnf("received object from indexer %s is not an unstructured", indexes.SemaphoreConfigIndexName)
continue
}
wf, err := util.FromUnstructured(un)
Expand Down Expand Up @@ -694,21 +680,6 @@ func getWfPriority(obj interface{}) (int32, time.Time) {
return int32(priority), un.GetCreationTimestamp().Time
}

func getWfPhase(obj interface{}) wfv1.NodePhase {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
return ""
}
phase, hasPhase, err := unstructured.NestedString(un.Object, "status", "phase")
if err != nil {
return ""
}
if !hasPhase {
return wfv1.NodePending
}
return wfv1.NodePhase(phase)
}

func (wfc *WorkflowController) addWorkflowInformerHandlers() {
wfc.wfInformer.AddEventHandler(
cache.FilteringResourceEventHandler{
Expand Down Expand Up @@ -765,18 +736,10 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() {
},
)
wfc.wfInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
wf := obj.(*unstructured.Unstructured)
wfc.metrics.WorkflowAdded(string(wf.GetUID()), getWfPhase(obj))
},
UpdateFunc: func(old, new interface{}) {
wf := new.(*unstructured.Unstructured)
wfc.metrics.WorkflowUpdated(string(wf.GetUID()), getWfPhase(old), getWfPhase(new))
},
DeleteFunc: func(obj interface{}) {
wf, ok := obj.(*unstructured.Unstructured)
if ok { // maybe cache.DeletedFinalStateUnknown
wfc.metrics.WorkflowDeleted(string(wf.GetUID()), getWfPhase(obj))
wfc.metrics.StopRealtimeMetricsForKey(string(wf.GetUID()))
simster7 marked this conversation as resolved.
Show resolved Hide resolved
}
},
})
Expand Down Expand Up @@ -987,3 +950,14 @@ func (wfc *WorkflowController) releaseAllWorkflowLocks(obj interface{}) {
func (wfc *WorkflowController) isArchivable(wf *wfv1.Workflow) bool {
return wfc.archiveLabelSelector.Matches(labels.Set(wf.Labels))
}

func (wfc *WorkflowController) syncWorkflowPhaseMetrics() {
for _, phase := range []wfv1.NodePhase{wfv1.NodePending, wfv1.NodeRunning, wfv1.NodeSucceeded, wfv1.NodeFailed, wfv1.NodeError} {
objs, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.WorkflowPhaseIndex, string(phase))
if err != nil {
log.WithError(err).Errorf("failed to list workflows by '%s'", phase)
continue
}
wfc.metrics.SetWorkflowPhaseGauge(phase, len(objs))
}
}
2 changes: 2 additions & 0 deletions workflow/controller/indexes/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ const (
ClusterWorkflowTemplateIndex = "clusterworkflowtemplate"
CronWorkflowIndex = "cronworkflow"
WorkflowTemplateIndex = "workflowtemplate"
WorkflowPhaseIndex = "workflow.phase"
SemaphoreConfigIndexName = "bySemaphoreConfigMap"
)
34 changes: 34 additions & 0 deletions workflow/controller/indexes/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,48 @@ package indexes
import (
"fmt"

"github.com/prometheus/common/log"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"

"github.com/argoproj/argo/workflow/util"
)

func MetaNamespaceLabelIndex(namespace, label string) string {
return namespace + "/" + label
}

func MetaLabelIndexFunc(label string) cache.IndexFunc {
simster7 marked this conversation as resolved.
Show resolved Hide resolved
return func(obj interface{}) ([]string, error) {
v, err := meta.Accessor(obj)
if err != nil {
return []string{}, fmt.Errorf("object has no meta: %v", err)
}
if value, exists := v.GetLabels()[label]; exists {
return []string{value}, nil
} else {
return []string{}, nil
}
}
}

func WorkflowSemaphoreKeysIndexFunc() cache.IndexFunc {
simster7 marked this conversation as resolved.
Show resolved Hide resolved
return func(obj interface{}) ([]string, error) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("cannot convert obj into unstructured.Unstructured in Indexer %s", SemaphoreConfigIndexName)
return []string{}, nil
}
wf, err := util.FromUnstructured(un)
if err != nil {
log.Warnf("failed to convert to workflow from unstructured: %v", err)
return []string{}, nil
}
return wf.GetSemaphoreKeys(), nil
}
}

func MetaNamespaceLabelIndexFunc(label string) cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
v, err := meta.Accessor(obj)
Expand Down
46 changes: 10 additions & 36 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,53 +122,20 @@ func (m *Metrics) allMetrics() []prometheus.Metric {
return allMetrics
}

func (m *Metrics) WorkflowAdded(key string, phase v1alpha1.NodePhase) {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, exists := m.workflows[key]; exists {
return
}
m.workflows[key] = []string{}
if _, ok := m.workflowsByPhase[phase]; ok {
m.workflowsByPhase[phase].Inc()
}
}

func (m *Metrics) WorkflowUpdated(key string, fromPhase, toPhase v1alpha1.NodePhase) {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, exists := m.workflows[key]; !exists || fromPhase == toPhase {
return
}
if _, ok := m.workflowsByPhase[fromPhase]; ok {
m.workflowsByPhase[fromPhase].Dec()
}
if _, ok := m.workflowsByPhase[toPhase]; ok {
m.workflowsByPhase[toPhase].Inc()
}
}

func (m *Metrics) WorkflowDeleted(key string, phase v1alpha1.NodePhase) {
func (m *Metrics) StopRealtimeMetricsForKey(key string) {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, exists := m.workflows[key]; !exists {
return
}
m.StopRealtimeMetricsForKey(key)
delete(m.workflows, key)
if _, ok := m.workflowsByPhase[phase]; ok {
m.workflowsByPhase[phase].Dec()
}
}

func (m *Metrics) StopRealtimeMetricsForKey(key string) {
realtimeMetrics := m.workflows[key]
for _, metric := range realtimeMetrics {
delete(m.customMetrics, metric)
}

delete(m.workflows, key)
}

func (m *Metrics) OperationCompleted(durationSeconds float64) {
Expand Down Expand Up @@ -210,6 +177,13 @@ func (m *Metrics) UpsertCustomMetric(key string, ownerKey string, newMetric prom
return nil
}

func (m *Metrics) SetWorkflowPhaseGauge(phase v1alpha1.NodePhase, num int) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.workflowsByPhase[phase].Set(float64(num))
}

type ErrorCause string

const (
Expand Down
59 changes: 3 additions & 56 deletions workflow/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,56 +48,9 @@ func TestMetrics(t *testing.T) {
}
m := New(config, config)

m.WorkflowAdded("wf", v1alpha1.NodeRunning)
var metric dto.Metric
err := m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(1), *metric.Gauge.Value)
}

// Test that we don't double add
m.WorkflowAdded("wf", v1alpha1.NodeRunning)
err = m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(1), *metric.Gauge.Value)
}

m.WorkflowUpdated("wf", v1alpha1.NodeRunning, v1alpha1.NodeSucceeded)
err = m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
}
err = m.workflowsByPhase[v1alpha1.NodeSucceeded].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(1), *metric.Gauge.Value)
}

m.WorkflowDeleted("wf", v1alpha1.NodeSucceeded)
err = m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
}

// Test that we don't double delete
m.WorkflowDeleted("wf", v1alpha1.NodeSucceeded)
err = m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
}

// Test that we don't update workflows that we're not tracking
m.WorkflowUpdated("does-not-exist", v1alpha1.NodeRunning, v1alpha1.NodeSucceeded)
err = m.workflowsByPhase[v1alpha1.NodeRunning].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
}
err = m.workflowsByPhase[v1alpha1.NodeSucceeded].Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, float64(0), *metric.Gauge.Value)
}

m.OperationCompleted(0.05)
err = m.operationDurations.Write(&metric)
err := m.operationDurations.Write(&metric)
if assert.NoError(t, err) {
assert.Equal(t, uint64(1), *metric.Histogram.Bucket[0].CumulativeCount)
}
Expand Down Expand Up @@ -190,33 +143,27 @@ func TestRealTimeMetricDeletion(t *testing.T) {
}
m := New(config, config)

m.WorkflowAdded("123", v1alpha1.NodeRunning)
rtMetric, err := ConstructRealTimeGaugeMetric(&v1alpha1.Prometheus{Name: "name", Help: "hello"}, func() float64 { return 0.0 })
assert.NoError(t, err)
assert.Empty(t, m.workflows["123"])
assert.Len(t, m.customMetrics, 0)

err = m.UpsertCustomMetric("metrickey", "123", rtMetric, true)
assert.NoError(t, err)
assert.NotEmpty(t, m.workflows["123"])
assert.Len(t, m.customMetrics, 1)

m.WorkflowDeleted("123", v1alpha1.NodeRunning)
m.StopRealtimeMetricsForKey("123")
assert.Empty(t, m.workflows["123"])
assert.Len(t, m.customMetrics, 0)

m.WorkflowAdded("456", v1alpha1.NodeRunning)
metric, err := ConstructOrUpdateMetric(nil, &v1alpha1.Prometheus{Name: "name", Help: "hello", Gauge: &v1alpha1.Gauge{Value: "1"}})
assert.NoError(t, err)
assert.Empty(t, m.workflows["456"])
assert.Len(t, m.customMetrics, 0)

err = m.UpsertCustomMetric("metrickey", "456", metric, false)
assert.NoError(t, err)
assert.Empty(t, m.workflows["456"])
assert.Len(t, m.customMetrics, 1)

m.WorkflowDeleted("456", v1alpha1.NodeRunning)
m.StopRealtimeMetricsForKey("456")
assert.Empty(t, m.workflows["456"])
assert.Len(t, m.customMetrics, 1)
}