feat(metrics): Add node-level resources duration as Argo variable for metrics. Closes #3110 #3161

Merged
merged 2 commits on Jun 4, 2020
1 change: 1 addition & 0 deletions docs/variables.md
@@ -55,6 +55,7 @@ step.
| `inputs.parameters.<NAME>` | Input parameter of the metric-emitting template |
| `outputs.parameters.<NAME>` | Output parameter of the metric-emitting template |
| `outputs.result` | Output result of the metric-emitting template |
| `resourcesDuration` | Resources duration as a string. Can also be indexed for a selected resource, if available (one of `resourcesDuration.cpu` or `resourcesDuration.memory`). For more info, see the [Resource Duration](resource-duration.md) doc. |

### Realtime Metrics

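For context, here is a minimal, hypothetical sketch (not part of this diff) of how the new variable could be referenced from a template's `metrics` block. The template name, metric name, label keys, and container are illustrative assumptions; per the test further below, the indexed values render as duration strings such as `33s` and `24s`, so they are attached here as labels rather than used as a numeric gauge value.

```yaml
# Illustrative only: a hypothetical template (under a workflow's `templates:` list)
# emitting the new resourcesDuration variables as Prometheus metric labels.
- name: sleep
  metrics:
    prometheus:
      - name: step_resources_duration_info        # hypothetical metric name
        help: "Resources duration of the step, reported as labels"
        labels:
          - key: cpu_resources_duration           # e.g. "33s" (duration string)
            value: "{{resourcesDuration.cpu}}"
          - key: memory_resources_duration        # e.g. "24s"
            value: "{{resourcesDuration.memory}}"
        counter:
          value: "1"                              # counts completed runs; durations ride on the labels
  container:
    image: alpine:3.7
    command: [sleep, "5"]
```

Raw duration strings make for high-cardinality label values, which Prometheus generally discourages; the snippet is only meant to show the variable substitution introduced by this change.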
6 changes: 6 additions & 0 deletions workflow/common/common.go
@@ -136,6 +136,12 @@ const (
LocalVarPodName = "pod.name"
// LocalVarRetries is a step level variable that references the retries number if retryStrategy is specified
LocalVarRetries = "retries"
// LocalVarDuration is a step level variable (currently only available in metric emission) that tracks the duration of the step
LocalVarDuration = "duration"
// LocalVarStatus is a step level variable (currently only available in metric emission) that tracks the status of the step
LocalVarStatus = "status"
// LocalVarResourcesDuration is a step level variable (currently only available in metric emission) that tracks the resources duration of the step
LocalVarResourcesDuration = "resourcesDuration"

KubeConfigDefaultMountPath = "/kube/config"
KubeConfigDefaultVolumeName = "kubeconfig"
1 change: 0 additions & 1 deletion workflow/controller/operator_test.go
@@ -2671,7 +2671,6 @@ func TestStepsOnExitFailures(t *testing.T) {
woc.operate()
woc.operate()

fmt.Println(woc.globalParams)
assert.Contains(t, woc.globalParams[common.GlobalVarWorkflowFailures], `[{\"displayName\":\"exit-handlers\",\"message\":\"Unexpected pod phase for exit-handlers: \",\"templateName\":\"intentional-fail\",\"phase\":\"Error\",\"podName\":\"exit-handlers\"`)
}

17 changes: 12 additions & 5 deletions workflow/controller/steps.go
@@ -469,19 +469,19 @@ func (woc *wfOperationCtx) prepareMetricScope(node *wfv1.NodeStatus) (map[string
localScope := woc.globalParams.DeepCopy()

if node.Fulfilled() {
localScope["duration"] = fmt.Sprintf("%f", node.FinishedAt.Sub(node.StartedAt.Time).Seconds())
realTimeScope["duration"] = func() float64 {
localScope[common.LocalVarDuration] = fmt.Sprintf("%f", node.FinishedAt.Sub(node.StartedAt.Time).Seconds())
realTimeScope[common.LocalVarDuration] = func() float64 {
return node.FinishedAt.Sub(node.StartedAt.Time).Seconds()
}
} else {
localScope["duration"] = fmt.Sprintf("%f", time.Since(node.StartedAt.Time).Seconds())
realTimeScope["duration"] = func() float64 {
localScope[common.LocalVarDuration] = fmt.Sprintf("%f", time.Since(node.StartedAt.Time).Seconds())
realTimeScope[common.LocalVarDuration] = func() float64 {
return time.Since(node.StartedAt.Time).Seconds()
}
}

if node.Phase != "" {
localScope["status"] = string(node.Phase)
localScope[common.LocalVarStatus] = string(node.Phase)
}

if node.Inputs != nil {
@@ -501,5 +501,12 @@ func (woc *wfOperationCtx) prepareMetricScope(node *wfv1.NodeStatus) (map[string
}
}

+ if node.ResourcesDuration != nil {
+ localScope[common.LocalVarResourcesDuration] = node.ResourcesDuration.String()
+ for name, duration := range node.ResourcesDuration {
+ localScope[fmt.Sprintf("%s.%s", common.LocalVarResourcesDuration, name)] = duration.String()
+ }
+ }

return localScope, realTimeScope
}
48 changes: 48 additions & 0 deletions workflow/controller/steps_test.go
@@ -3,10 +3,12 @@ package controller
import (
"testing"

"github.com/ghodss/yaml"
"github.com/stretchr/testify/assert"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test"
"github.com/argoproj/argo/workflow/common"
)

// TestStepsFailedRetries ensures a steps template will recognize exhausted retries
@@ -119,3 +121,49 @@ func TestStepsWithParamAndGlobalParam(t *testing.T) {
woc.operate()
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)
}

func TestResourceDurationMetric(t *testing.T) {
var nodeStatus = `
boundaryID: many-items-z26lj
displayName: sleep(4:four)
finishedAt: "2020-06-02T16:04:50Z"
hostNodeName: minikube
id: many-items-z26lj-3491220632
name: many-items-z26lj[0].sleep(4:four)
outputs:
artifacts:
- archiveLogs: true
name: main-logs
s3:
accessKeySecret:
key: accesskey
name: my-minio-cred
bucket: my-bucket
endpoint: minio:9000
insecure: true
key: many-items-z26lj/many-items-z26lj-3491220632/main.log
secretKeySecret:
key: secretkey
name: my-minio-cred
exitCode: "0"
phase: Succeeded
resourcesDuration:
cpu: 33
memory: 24
startedAt: "2020-06-02T16:04:21Z"
templateName: sleep
templateScope: local/many-items-z26lj
type: Pod
`

woc := wfOperationCtx{globalParams: make(common.Parameters)}
var node wfv1.NodeStatus
err := yaml.Unmarshal([]byte(nodeStatus), &node)
if assert.NoError(t, err) {
localScope, _ := woc.prepareMetricScope(&node)
assert.Contains(t, localScope["resourcesDuration"], "24s*(100Mi memory)")
assert.Contains(t, localScope["resourcesDuration"], "33s*(1 cpu)")
assert.Equal(t, "33s", localScope["resourcesDuration.cpu"])
assert.Equal(t, "24s", localScope["resourcesDuration.memory"])
}
}