Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait parameter to jobs #99

Merged
merged 2 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,23 @@ class NotifyWorkflow < Gush::Workflow
end
```

### Dynamic waitable time for jobs

There might be a case you want to configure a job to be executed after a time. Based on above example, we want to configure `AdminNotificationJob` to be executed after 5 seconds.

```ruby

class NotifyWorkflow < Gush::Workflow
def configure(user_ids)
notification_jobs = user_ids.map do |user_id|
run NotificationJob, params: {user_id: user_id}, queue: 'user'
end

run AdminNotificationJob, after: notification_jobs, queue: 'admin', wait: 5.seconds
end
end
```

## Command line interface (CLI)

### Checking status
Expand Down
9 changes: 7 additions & 2 deletions lib/gush/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,13 @@ def enqueue_job(workflow_id, job)
job.enqueue!
persist_job(workflow_id, job)
queue = job.queue || configuration.namespace

Gush::Worker.set(queue: queue).perform_later(*[workflow_id, job.name])
wait = job.wait

if wait.present?
Gush::Worker.set(queue: queue, wait: wait).perform_later(*[workflow_id, job.name])
else
Gush::Worker.set(queue: queue).perform_later(*[workflow_id, job.name])
end
end

private
Expand Down
4 changes: 3 additions & 1 deletion lib/gush/job.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module Gush
class Job
attr_accessor :workflow_id, :incoming, :outgoing, :params,
:finished_at, :failed_at, :started_at, :enqueued_at, :payloads, :klass, :queue
:finished_at, :failed_at, :started_at, :enqueued_at, :payloads,
:klass, :queue, :wait
attr_reader :id, :klass, :output_payload, :params

def initialize(opts = {})
Expand Down Expand Up @@ -126,6 +127,7 @@ def assign_variables(opts)
@output_payload = opts[:output_payload]
@workflow_id = opts[:workflow_id]
@queue = opts[:queue]
@wait = opts[:wait]
end
end
end
3 changes: 2 additions & 1 deletion lib/gush/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ def run(klass, opts = {})
workflow_id: id,
id: client.next_free_job_id(id, klass.to_s),
params: opts.fetch(:params, {}),
queue: opts[:queue]
queue: opts[:queue],
wait: opts[:wait]
})

jobs << node
Expand Down
12 changes: 12 additions & 0 deletions spec/gush/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@
end

describe "#start_workflow" do
context "when there is wait parameter configured" do
let(:freeze_time) { Time.utc(2023, 01, 21, 14, 36, 0) }

it "schedules job execution" do
travel_to freeze_time do
workflow = WaitableTestWorkflow.create
client.start_workflow(workflow)
expect(Gush::Worker).to have_a_job_enqueued_at(workflow.id, job_with_id("Prepare"), 5.minutes)
end
end
end

it "enqueues next jobs from the workflow" do
workflow = TestWorkflow.create
expect {
Expand Down
7 changes: 7 additions & 0 deletions spec/gush/workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ def configure(*args)
expect(flow.jobs.first.params).to eq ({ something: 1 })
end

it "allows passing wait param to the job" do
flow = Gush::Workflow.new
flow.run(Gush::Job, wait: 5.seconds)
flow.save
expect(flow.jobs.first.wait).to eq (5.seconds)
end

context "when graph is empty" do
it "adds new job with the given class as a node" do
flow = Gush::Workflow.new
Expand Down
21 changes: 21 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require 'active_support/testing/time_helpers'
require 'gush'
require 'json'
require 'pry'
Expand Down Expand Up @@ -34,6 +35,11 @@ def configure(param)
end
end

class WaitableTestWorkflow < Gush::Workflow
def configure
run Prepare, wait: 5.minutes
end
end

REDIS_URL = ENV["REDIS_URL"] || "redis://localhost:6379/12"

Expand Down Expand Up @@ -86,7 +92,22 @@ def job_with_id(job_name)
end
end

RSpec::Matchers.define :have_a_job_enqueued_at do |flow, job, at|
expected_execution_timestamp = (Time.current.utc + at).to_i

match do |actual|
expected = hash_including(args: include(flow, job), at: expected_execution_timestamp)

expect(ActiveJob::Base.queue_adapter.enqueued_jobs).to match_array(expected)
end

failure_message do |actual|
"expected to have enqueued job #{job} to be executed at #{Time.current.utc + at}, but instead has: #{Time.at(enqueued_jobs.first[:at]).to_datetime.utc}"
end
end

RSpec.configure do |config|
config.include ActiveSupport::Testing::TimeHelpers
config.include ActiveJob::TestHelper
config.include GushHelpers

Expand Down