Skip to content

Commit

Permalink
Merge pull request coinbase#133 from christopherb-stripe/christopherb…
Browse files Browse the repository at this point in the history
…/add-workflow-tag-metric

Add workflow tag to metrics emitted from the activity task processor
  • Loading branch information
DeRauk authored Feb 4, 2022
2 parents 30342ae + 0d7e637 commit 101ec13
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
4 changes: 2 additions & 2 deletions lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def process
start_time = Time.now

Temporal.logger.debug("Processing Activity task", metadata.to_h)
Temporal.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name, namespace: namespace)
Temporal.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name, namespace: namespace, workflow: metadata.workflow_name)

context = Activity::Context.new(connection, metadata)

Expand All @@ -46,7 +46,7 @@ def process
respond_failed(error)
ensure
time_diff_ms = ((Time.now - start_time) * 1000).round
Temporal.metrics.timing('activity_task.latency', time_diff_ms, activity: activity_name, namespace: namespace)
Temporal.metrics.timing('activity_task.latency', time_diff_ms, activity: activity_name, namespace: namespace, workflow: metadata.workflow_name)
Temporal.logger.debug("Activity task processed", metadata.to_h.merge(execution_time: time_diff_ms))
end

Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/workflow/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def poll_loop

time_diff_ms = ((Time.now - last_poll_time) * 1000).round
Temporal.metrics.timing('workflow_poller.time_since_last_poll', time_diff_ms, metrics_tags)
Temporal.logger.debug("Polling Worklow task queue", { namespace: namespace, task_queue: task_queue })
Temporal.logger.debug("Polling workflow task queue", { namespace: namespace, task_queue: task_queue })

task = poll_for_task
last_poll_time = Time.now
Expand Down
9 changes: 5 additions & 4 deletions spec/unit/lib/temporal/activity/task_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
end
let(:metadata) { Temporal::Metadata.generate_activity_metadata(task, namespace) }
let(:workflow_name) { task.workflow_type.name }
let(:activity_name) { 'TestActivity' }
let(:connection) { instance_double('Temporal::Connection::GRPC') }
let(:middleware_chain) { Temporal::Middleware::Chain.new }
Expand Down Expand Up @@ -125,15 +126,15 @@

expect(Temporal.metrics)
.to have_received(:timing)
.with('activity_task.queue_time', an_instance_of(Integer), activity: activity_name, namespace: namespace)
.with('activity_task.queue_time', an_instance_of(Integer), activity: activity_name, namespace: namespace, workflow: workflow_name)
end

it 'sends latency metric' do
subject.process

expect(Temporal.metrics)
.to have_received(:timing)
.with('activity_task.latency', an_instance_of(Integer), activity: activity_name, namespace: namespace)
.with('activity_task.latency', an_instance_of(Integer), activity: activity_name, namespace: namespace, workflow: workflow_name)
end

context 'with async activity' do
Expand Down Expand Up @@ -203,15 +204,15 @@

expect(Temporal.metrics)
.to have_received(:timing)
.with('activity_task.queue_time', an_instance_of(Integer), activity: activity_name, namespace: namespace)
.with('activity_task.queue_time', an_instance_of(Integer), activity: activity_name, namespace: namespace, workflow: workflow_name)
end

it 'sends latency metric' do
subject.process

expect(Temporal.metrics)
.to have_received(:timing)
.with('activity_task.latency', an_instance_of(Integer), activity: activity_name, namespace: namespace)
.with('activity_task.latency', an_instance_of(Integer), activity: activity_name, namespace: namespace, workflow: workflow_name)
end

context 'with ScriptError exception' do
Expand Down

0 comments on commit 101ec13

Please sign in to comment.