Skip to content

Commit

Permalink
Removing componentKey from wf metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>
  • Loading branch information
shivamkm07 committed Jan 11, 2024
1 parent 55ec256 commit 32aab9d
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 50 deletions.
28 changes: 13 additions & 15 deletions pkg/diagnostics/workflow_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ const (

WorkflowEvent = "event"
Timer = "timer"

WorkflowComponentName = "dapr"
)

type workflowMetrics struct {
Expand Down Expand Up @@ -95,45 +93,45 @@ func (w *workflowMetrics) Init(appID, namespace string) error {
w.namespace = namespace

return view.Register(
diagUtils.NewMeasureView(w.workflowOperationCount, []tag.Key{appIDKey, componentKey, namespaceKey, operationKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(w.workflowOperationLatency, []tag.Key{appIDKey, componentKey, namespaceKey, operationKey, statusKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(w.workflowExecutionCount, []tag.Key{appIDKey, componentKey, namespaceKey, workflowNameKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(w.activityExecutionCount, []tag.Key{appIDKey, componentKey, namespaceKey, activityNameKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(w.activityExecutionLatency, []tag.Key{appIDKey, componentKey, namespaceKey, activityNameKey, statusKey}, defaultLatencyDistribution))
diagUtils.NewMeasureView(w.workflowOperationCount, []tag.Key{appIDKey, namespaceKey, operationKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(w.workflowOperationLatency, []tag.Key{appIDKey, namespaceKey, operationKey, statusKey}, defaultLatencyDistribution),
diagUtils.NewMeasureView(w.workflowExecutionCount, []tag.Key{appIDKey, namespaceKey, workflowNameKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(w.activityExecutionCount, []tag.Key{appIDKey, namespaceKey, activityNameKey, statusKey}, view.Count()),
diagUtils.NewMeasureView(w.activityExecutionLatency, []tag.Key{appIDKey, namespaceKey, activityNameKey, statusKey}, defaultLatencyDistribution))
}

// WorkflowOperationEvent records one workflow operation request in the
// operation-count metric, tagged with the app ID, namespace, operation and
// status. When elapsed is greater than zero, it also records elapsed in the
// operation-latency metric under the same tags.
//
// Nothing is recorded when w.IsEnabled() reports false.
func (w *workflowMetrics) WorkflowOperationEvent(ctx context.Context, operation, component, status string, elapsed float64) {
func (w *workflowMetrics) WorkflowOperationEvent(ctx context.Context, operation, status string, elapsed float64) {
if !w.IsEnabled() {
return
}

stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowOperationCount.Name(), appIDKey, w.appID, componentKey, component, namespaceKey, w.namespace, operationKey, operation, statusKey, status), w.workflowOperationCount.M(1))
stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowOperationCount.Name(), appIDKey, w.appID, namespaceKey, w.namespace, operationKey, operation, statusKey, status), w.workflowOperationCount.M(1))

// A zero or negative elapsed records only the count above, not latency.
if elapsed > 0 {
stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowOperationLatency.Name(), appIDKey, w.appID, componentKey, component, namespaceKey, w.namespace, operationKey, operation, statusKey, status), w.workflowOperationLatency.M(elapsed))
stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowOperationLatency.Name(), appIDKey, w.appID, namespaceKey, w.namespace, operationKey, operation, statusKey, status), w.workflowOperationLatency.M(elapsed))
}
}

// WorkflowExecutionEvent records one workflow execution in the
// execution-count metric, tagged with the app ID, namespace, workflow name
// and status.
//
// Only a count is recorded: execution latency for workflows is not supported
// yet. Nothing is recorded when w.IsEnabled() reports false.
func (w *workflowMetrics) WorkflowExecutionEvent(ctx context.Context, component, workflowName, status string) {
func (w *workflowMetrics) WorkflowExecutionEvent(ctx context.Context, workflowName, status string) {
if !w.IsEnabled() {
return
}

stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowExecutionCount.Name(), appIDKey, w.appID, componentKey, component, namespaceKey, w.namespace, workflowNameKey, workflowName, statusKey, status), w.workflowExecutionCount.M(1))
stats.RecordWithTags(ctx, diagUtils.WithTags(w.workflowExecutionCount.Name(), appIDKey, w.appID, namespaceKey, w.namespace, workflowNameKey, workflowName, statusKey, status), w.workflowExecutionCount.M(1))
}

// ActivityExecutionEvent records one activity execution in the
// activity-execution-count metric, tagged with the app ID, namespace,
// activity name and status. When elapsed is greater than zero, it also
// records elapsed in the activity-execution-latency metric under the same
// tags.
//
// Nothing is recorded when w.IsEnabled() reports false.
func (w *workflowMetrics) ActivityExecutionEvent(ctx context.Context, component, activityName, status string, elapsed float64) {
func (w *workflowMetrics) ActivityExecutionEvent(ctx context.Context, activityName, status string, elapsed float64) {
if !w.IsEnabled() {
return
}

stats.RecordWithTags(ctx, diagUtils.WithTags(w.activityExecutionCount.Name(), appIDKey, w.appID, componentKey, component, namespaceKey, w.namespace, activityNameKey, activityName, statusKey, status), w.activityExecutionCount.M(1))
stats.RecordWithTags(ctx, diagUtils.WithTags(w.activityExecutionCount.Name(), appIDKey, w.appID, namespaceKey, w.namespace, activityNameKey, activityName, statusKey, status), w.activityExecutionCount.M(1))

// A zero or negative elapsed records only the count above, not latency.
if elapsed > 0 {
stats.RecordWithTags(ctx, diagUtils.WithTags(w.activityExecutionLatency.Name(), appIDKey, w.appID, componentKey, component, namespaceKey, w.namespace, activityNameKey, activityName, statusKey, status), w.activityExecutionLatency.M(elapsed))
stats.RecordWithTags(ctx, diagUtils.WithTags(w.activityExecutionLatency.Name(), appIDKey, w.appID, namespaceKey, w.namespace, activityNameKey, activityName, statusKey, status), w.activityExecutionLatency.M(elapsed))
}
}
38 changes: 19 additions & 19 deletions pkg/diagnostics/workflow_monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestOperations(t *testing.T) {
t.Run("Failed Create Operation request count", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), CreateWorkflow, componentName, StatusFailed, 0)
w.WorkflowOperationEvent(context.Background(), CreateWorkflow, StatusFailed, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -34,7 +34,7 @@ func TestOperations(t *testing.T) {
t.Run("Successful Create Operation request count", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), CreateWorkflow, componentName, StatusSuccess, 0)
w.WorkflowOperationEvent(context.Background(), CreateWorkflow, StatusSuccess, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -45,7 +45,7 @@ func TestOperations(t *testing.T) {
t.Run("Create Operation request latency", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), CreateWorkflow, componentName, StatusSuccess, 1)
w.WorkflowOperationEvent(context.Background(), CreateWorkflow, StatusSuccess, 1)

viewData, _ := view.RetrieveData(latencyMetricName)
v := view.Find(latencyMetricName)
Expand All @@ -60,7 +60,7 @@ func TestOperations(t *testing.T) {
t.Run("Failed Get Operation Request", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), GetWorkflow, componentName, StatusFailed, 0)
w.WorkflowOperationEvent(context.Background(), GetWorkflow, StatusFailed, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -71,7 +71,7 @@ func TestOperations(t *testing.T) {
t.Run("Successful Get Operation Request", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), GetWorkflow, componentName, StatusSuccess, 0)
w.WorkflowOperationEvent(context.Background(), GetWorkflow, StatusSuccess, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -82,7 +82,7 @@ func TestOperations(t *testing.T) {
t.Run("Get Operation request latency", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), GetWorkflow, componentName, StatusSuccess, 1)
w.WorkflowOperationEvent(context.Background(), GetWorkflow, StatusSuccess, 1)

viewData, _ := view.RetrieveData(latencyMetricName)
v := view.Find(latencyMetricName)
Expand All @@ -97,7 +97,7 @@ func TestOperations(t *testing.T) {
t.Run("Failed Add Event request", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), AddEvent, componentName, StatusFailed, 0)
w.WorkflowOperationEvent(context.Background(), AddEvent, StatusFailed, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -108,7 +108,7 @@ func TestOperations(t *testing.T) {
t.Run("Successful Add Event request", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), AddEvent, componentName, StatusSuccess, 0)
w.WorkflowOperationEvent(context.Background(), AddEvent, StatusSuccess, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -119,7 +119,7 @@ func TestOperations(t *testing.T) {
t.Run("Add Event Operation latency", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), AddEvent, componentName, StatusSuccess, 1)
w.WorkflowOperationEvent(context.Background(), AddEvent, StatusSuccess, 1)

viewData, _ := view.RetrieveData(latencyMetricName)
v := view.Find(latencyMetricName)
Expand All @@ -134,7 +134,7 @@ func TestOperations(t *testing.T) {
t.Run("Failed Purge workflow request", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), PurgeWorkflow, componentName, StatusFailed, 0)
w.WorkflowOperationEvent(context.Background(), PurgeWorkflow, StatusFailed, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -145,7 +145,7 @@ func TestOperations(t *testing.T) {
t.Run("Successful Purge workflow request", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), PurgeWorkflow, componentName, StatusSuccess, 0)
w.WorkflowOperationEvent(context.Background(), PurgeWorkflow, StatusSuccess, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -156,7 +156,7 @@ func TestOperations(t *testing.T) {
t.Run("Purge workflow Operation latency", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowOperationEvent(context.Background(), PurgeWorkflow, componentName, StatusSuccess, 1)
w.WorkflowOperationEvent(context.Background(), PurgeWorkflow, StatusSuccess, 1)

viewData, _ := view.RetrieveData(latencyMetricName)
v := view.Find(latencyMetricName)
Expand All @@ -177,7 +177,7 @@ func TestExecution(t *testing.T) {
t.Run("Failed with retryable error", func(t *testing.T) {
w := initWorkflowMetrics()

w.ActivityExecutionEvent(context.Background(), componentName, activityName, StatusRecoverable, 0)
w.ActivityExecutionEvent(context.Background(), activityName, StatusRecoverable, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -188,7 +188,7 @@ func TestExecution(t *testing.T) {
t.Run("Failed with not-retryable error", func(t *testing.T) {
w := initWorkflowMetrics()

w.ActivityExecutionEvent(context.Background(), componentName, activityName, StatusFailed, 0)
w.ActivityExecutionEvent(context.Background(), activityName, StatusFailed, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -199,7 +199,7 @@ func TestExecution(t *testing.T) {
t.Run("Successful activity execution", func(t *testing.T) {
w := initWorkflowMetrics()

w.ActivityExecutionEvent(context.Background(), componentName, activityName, StatusSuccess, 0)
w.ActivityExecutionEvent(context.Background(), activityName, StatusSuccess, 0)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -210,7 +210,7 @@ func TestExecution(t *testing.T) {
t.Run("activity execution latency", func(t *testing.T) {
w := initWorkflowMetrics()

w.ActivityExecutionEvent(context.Background(), componentName, activityName, StatusSuccess, 1)
w.ActivityExecutionEvent(context.Background(), activityName, StatusSuccess, 1)

viewData, _ := view.RetrieveData(latencyMetricName)
v := view.Find(latencyMetricName)
Expand All @@ -226,7 +226,7 @@ func TestExecution(t *testing.T) {
t.Run("Failed with retryable error", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowExecutionEvent(context.Background(), componentName, workflowName, StatusRecoverable)
w.WorkflowExecutionEvent(context.Background(), workflowName, StatusRecoverable)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -237,7 +237,7 @@ func TestExecution(t *testing.T) {
t.Run("Failed with not-retryable error", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowExecutionEvent(context.Background(), componentName, workflowName, StatusFailed)
w.WorkflowExecutionEvent(context.Background(), workflowName, StatusFailed)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand All @@ -248,7 +248,7 @@ func TestExecution(t *testing.T) {
t.Run("Successful workflow execution", func(t *testing.T) {
w := initWorkflowMetrics()

w.WorkflowExecutionEvent(context.Background(), componentName, workflowName, StatusSuccess)
w.WorkflowExecutionEvent(context.Background(), workflowName, StatusSuccess)

viewData, _ := view.RetrieveData(countMetricName)
v := view.Find(countMetricName)
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/wfengine/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (a *activityActor) executeActivity(ctx context.Context, actorID string, nam
// Record metrics on exit
defer func() {
if executionStatus != "" {
diag.DefaultWorkflowMonitoring.ActivityExecutionEvent(ctx, diag.WorkflowComponentName, activityName, executionStatus, elapsed)
diag.DefaultWorkflowMonitoring.ActivityExecutionEvent(ctx, activityName, executionStatus, elapsed)
}
}()
loop:
Expand Down
16 changes: 8 additions & 8 deletions pkg/runtime/wfengine/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ func (be *actorBackend) CreateOrchestrationInstance(ctx context.Context, e *back
elapsed := diag.ElapsedSince(start)
if err != nil {
// failed request to CREATE workflow, record count and latency metrics.
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.CreateWorkflow, diag.WorkflowComponentName, diag.StatusFailed, elapsed)
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.CreateWorkflow, diag.StatusFailed, elapsed)
return err
}
// successful request to CREATE workflow, record count and latency metrics.
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.CreateWorkflow, diag.WorkflowComponentName, diag.StatusSuccess, elapsed)
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.CreateWorkflow, diag.StatusSuccess, elapsed)
defer resp.Close()
return nil
}
Expand All @@ -198,11 +198,11 @@ func (be *actorBackend) GetOrchestrationMetadata(ctx context.Context, id api.Ins
elapsed := diag.ElapsedSince(start)
if err != nil {
// failed request to GET workflow Information, record count and latency metrics.
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.GetWorkflow, diag.WorkflowComponentName, diag.StatusFailed, elapsed)
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.GetWorkflow, diag.StatusFailed, elapsed)
return nil, err
}
// successful request to GET workflow information, record count and latency metrics.
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.GetWorkflow, diag.WorkflowComponentName, diag.StatusSuccess, elapsed)
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.GetWorkflow, diag.StatusSuccess, elapsed)

defer res.Close()
data := res.RawData()
Expand Down Expand Up @@ -257,11 +257,11 @@ func (be *actorBackend) AddNewOrchestrationEvent(ctx context.Context, id api.Ins
elapsed := diag.ElapsedSince(start)
if err != nil {
// failed request to ADD EVENT, record count and latency metrics.
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.AddEvent, diag.WorkflowComponentName, diag.StatusFailed, elapsed)
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.AddEvent, diag.StatusFailed, elapsed)
return err
}
// successful request to ADD EVENT, record count and latency metrics.
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.AddEvent, diag.WorkflowComponentName, diag.StatusSuccess, elapsed)
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.AddEvent, diag.StatusSuccess, elapsed)
defer resp.Close()
return nil
}
Expand Down Expand Up @@ -337,11 +337,11 @@ func (be *actorBackend) PurgeOrchestrationState(ctx context.Context, id api.Inst
elapsed := diag.ElapsedSince(start)
if err != nil {
// failed request to PURGE WORKFLOW, record latency and count metrics.
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.PurgeWorkflow, diag.WorkflowComponentName, diag.StatusFailed, elapsed)
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.PurgeWorkflow, diag.StatusFailed, elapsed)
return err
}
// successful request to PURGE WORKFLOW, record latency and count metrics.
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.PurgeWorkflow, diag.WorkflowComponentName, diag.StatusSuccess, elapsed)
diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.PurgeWorkflow, diag.StatusSuccess, elapsed)
defer resp.Close()
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/wfengine/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
defer func() {
if executionStatus != "" {
// execution latency for workflow is not supported yet.
diag.DefaultWorkflowMonitoring.WorkflowExecutionEvent(ctx, diag.WorkflowComponentName, workflowName, executionStatus)
diag.DefaultWorkflowMonitoring.WorkflowExecutionEvent(ctx, workflowName, executionStatus)
}
}()

Expand Down
12 changes: 6 additions & 6 deletions tests/integration/suite/daprd/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,9 @@ func (m *metrics) Run(t *testing.T, ctx context.Context) {

// Verify metrics
metrics := m.getMetrics(t, ctx)
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_operation_count|app_id:myapp|component:dapr|namespace:|operation:create_workflow|status:success"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_execution_count|app_id:myapp|component:dapr|namespace:|status:success|workflow_name:workflow"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_activity_execution_count|activity_name:activity_success|app_id:myapp|component:dapr|namespace:|status:success"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_operation_count|app_id:myapp|namespace:|operation:create_workflow|status:success"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_execution_count|app_id:myapp|namespace:|status:success|workflow_name:workflow"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_activity_execution_count|activity_name:activity_success|app_id:myapp|namespace:|status:success"]))
})
t.Run("failed workflow execution", func(t *testing.T) {
id, err := taskhubClient.ScheduleNewOrchestration(ctx, "workflow", api.WithInput("activity_failure"))
Expand All @@ -239,9 +239,9 @@ func (m *metrics) Run(t *testing.T, ctx context.Context) {

// Verify metrics
metrics := m.getMetrics(t, ctx)
assert.Equal(t, 2, int(metrics["dapr_runtime_workflow_operation_count|app_id:myapp|component:dapr|namespace:|operation:create_workflow|status:success"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_execution_count|app_id:myapp|component:dapr|namespace:|status:failed|workflow_name:workflow"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_activity_execution_count|activity_name:activity_failure|app_id:myapp|component:dapr|namespace:|status:failed"]))
assert.Equal(t, 2, int(metrics["dapr_runtime_workflow_operation_count|app_id:myapp|namespace:|operation:create_workflow|status:success"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_execution_count|app_id:myapp|namespace:|status:failed|workflow_name:workflow"]))
assert.Equal(t, 1, int(metrics["dapr_runtime_workflow_activity_execution_count|activity_name:activity_failure|app_id:myapp|namespace:|status:failed"]))
})
})
}
Expand Down

0 comments on commit 32aab9d

Please sign in to comment.