Skip to content

Commit

Permalink
Merge pull request coinbase#73 from stripe-private-oss-forks/nagl-RUN…
Browse files Browse the repository at this point in the history
…_WOFLO-310

Expose scheduled_time and current_attempt_scheduled_time on activity metadata
  • Loading branch information
nagl-stripe authored and GitHub Enterprise committed Mar 11, 2022
2 parents 7170411 + 33208a7 commit 4c03bf7
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 7 deletions.
5 changes: 4 additions & 1 deletion lib/temporal/metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ def generate_activity_metadata(task, namespace)
workflow_id: task.workflow_execution.workflow_id,
workflow_name: task.workflow_type.name,
headers: from_payload_map(task.header&.fields || {}),
heartbeat_details: from_details_payloads(task.heartbeat_details)
heartbeat_details: from_details_payloads(task.heartbeat_details),
# temporal doesn't render sub-second times, so we ignore the nanos field
scheduled_time: task.scheduled_time.seconds,
current_attempt_scheduled_time: task.current_attempt_scheduled_time.seconds
)
end

Expand Down
10 changes: 7 additions & 3 deletions lib/temporal/metadata/activity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
module Temporal
module Metadata
class Activity < Base
attr_reader :namespace, :id, :name, :task_token, :attempt, :workflow_run_id, :workflow_id, :workflow_name, :headers, :heartbeat_details
attr_reader :namespace, :id, :name, :task_token, :attempt, :workflow_run_id, :workflow_id, :workflow_name, :headers, :heartbeat_details, :scheduled_time, :current_attempt_scheduled_time

def initialize(namespace:, id:, name:, task_token:, attempt:, workflow_run_id:, workflow_id:, workflow_name:, headers: {}, heartbeat_details:)
def initialize(namespace:, id:, name:, task_token:, attempt:, workflow_run_id:, workflow_id:, workflow_name:, headers: {}, heartbeat_details:, scheduled_time:, current_attempt_scheduled_time:)
@namespace = namespace
@id = id
@name = name
Expand All @@ -16,6 +16,8 @@ def initialize(namespace:, id:, name:, task_token:, attempt:, workflow_run_id:,
@workflow_name = workflow_name
@headers = headers
@heartbeat_details = heartbeat_details
@scheduled_time = scheduled_time
@current_attempt_scheduled_time = current_attempt_scheduled_time

freeze
end
Expand All @@ -32,7 +34,9 @@ def to_h
'run_id' => workflow_run_id,
'activity_id' => id,
'activity_name' => name,
'attempt' => attempt
'attempt' => attempt,
'scheduled_time' => scheduled_time,
'current_attempt_scheduled_time' => current_attempt_scheduled_time,
}
end
end
Expand Down
8 changes: 6 additions & 2 deletions lib/temporal/testing/local_workflow_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ def execute_activity(activity_class, *input, **args)
workflow_id: workflow_id,
workflow_name: self.metadata.name,
headers: execution_options.headers,
heartbeat_details: nil
heartbeat_details: nil,
scheduled_time: Time.now.to_i,
current_attempt_scheduled_time: Time.now.to_i,
)
context = LocalActivityContext.new(metadata)

Expand Down Expand Up @@ -110,7 +112,9 @@ def execute_local_activity(activity_class, *input, **args)
workflow_id: workflow_id,
workflow_name: self.metadata.name,
headers: execution_options.headers,
heartbeat_details: nil
heartbeat_details: nil,
scheduled_time: Time.now.to_i,
current_attempt_scheduled_time: Time.now.to_i,
)
context = LocalActivityContext.new(metadata)

Expand Down
2 changes: 2 additions & 0 deletions spec/fabricators/activity_metadata_fabricator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@
workflow_name 'TestWorkflow'
headers { {} }
heartbeat_details nil
scheduled_time 0
current_attempt_scheduled_time 0
end
2 changes: 2 additions & 0 deletions spec/fabricators/grpc/activity_task_fabricator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
workflow_execution { Fabricate(:api_workflow_execution) }
current_attempt_scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } }
started_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } }
scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } }
current_attempt_scheduled_time { Google::Protobuf::Timestamp.new.tap { |t| t.from_time(Time.now) } }
header do |attrs|
fields = (attrs[:headers] || {}).each_with_object({}) do |(field, value), h|
h[field] = Temporal.configuration.converter.to_payload(value)
Expand Down
6 changes: 5 additions & 1 deletion spec/unit/lib/temporal/metadata/activity_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
expect(subject.workflow_name).to eq(args.workflow_name)
expect(subject.headers).to eq(args.headers)
expect(subject.heartbeat_details).to eq(args.heartbeat_details)
expect(subject.scheduled_time).to eq(args.scheduled_time)
expect(subject.current_attempt_scheduled_time).to eq(args.current_attempt_scheduled_time)
end

it { is_expected.to be_frozen }
Expand All @@ -36,7 +38,9 @@
'namespace' => subject.namespace,
'workflow_id' => subject.workflow_id,
'workflow_name' => subject.workflow_name,
'run_id' => subject.workflow_run_id
'run_id' => subject.workflow_run_id,
'scheduled_time' => subject.scheduled_time,
'current_attempt_scheduled_time' => subject.current_attempt_scheduled_time,
})
end
end
Expand Down
2 changes: 2 additions & 0 deletions spec/unit/lib/temporal/testing/temporal_override_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ def execute
expect(metadata.namespace).to eq('default-namespace')
expect(metadata.workflow_id).to eq(workflow_id)
expect(metadata.workflow_run_id).to eq(run_id)
expect(metadata.scheduled_time).to be > 0
expect(metadata.current_attempt_scheduled_time).to be > 0
end

describe 'execution control' do
Expand Down

0 comments on commit 4c03bf7

Please sign in to comment.