Skip to content

Commit

Permalink
Allow setting queue for each job
Browse files Browse the repository at this point in the history
  • Loading branch information
aeris committed Oct 20, 2017
1 parent 9c4c87e commit c9f9d92
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
4 changes: 2 additions & 2 deletions lib/gush/client.rb
Expand Up @@ -18,7 +18,6 @@ def create_workflow(name)
rescue NameError
raise WorkflowNotFound.new("Workflow with given name doesn't exist")
end
flow
end

def start_workflow(workflow, job_names = [])
Expand Down Expand Up @@ -156,8 +155,9 @@ def workflow_report(message)
def enqueue_job(workflow_id, job)
job.enqueue!
persist_job(workflow_id, job)
queue = job.queue || configuration.namespace

Gush::Worker.set(queue: configuration.namespace).perform_later(*[workflow_id, job.name])
Gush::Worker.set(queue: queue).perform_later(*[workflow_id, job.name])
end

private
Expand Down
13 changes: 12 additions & 1 deletion lib/gush/job.rb
@@ -1,7 +1,7 @@
module Gush
class Job
attr_accessor :workflow_id, :incoming, :outgoing, :params,
:finished_at, :failed_at, :started_at, :enqueued_at, :payloads, :klass
:finished_at, :failed_at, :started_at, :enqueued_at, :payloads, :klass, :queue
attr_reader :name, :output_payload, :params

def initialize(workflow, opts = {})
Expand All @@ -10,10 +10,17 @@ def initialize(workflow, opts = {})
assign_variables(options)
end

def payload(clazz)
payload = payloads.detect { |f| f[:class] == clazz.name }
raise "Unable to find payload for #{clazz}, available: #{payloads.collect { |f| f[:class]}}" unless payload
payload[:output]
end

def as_json
{
name: name,
klass: self.class.to_s,
queue: queue,
incoming: incoming,
outgoing: outgoing,
finished_at: finished_at,
Expand Down Expand Up @@ -98,6 +105,9 @@ def has_no_dependencies?
end

private
def logger
Rails.logger
end

def current_timestamp
Time.now.to_i
Expand All @@ -114,6 +124,7 @@ def assign_variables(opts)
@params = opts[:params] || {}
@klass = opts[:klass]
@output_payload = opts[:output_payload]
@queue = opts[:queue]
end
end
end
3 changes: 2 additions & 1 deletion lib/gush/workflow.rb
Expand Up @@ -104,7 +104,8 @@ def stopped?
def run(klass, opts = {})
node = klass.new(self, {
name: client.next_free_job_id(id,klass.to_s),
params: opts.fetch(:params, {})
params: opts.fetch(:params, {}),
queue: opts[:queue]
})

jobs << node
Expand Down

0 comments on commit c9f9d92

Please sign in to comment.