Skip to content

Commit

Permalink
Started to propagate jobName parameter everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
rpechayr committed Mar 7, 2014
1 parent 9a59ed5 commit a528b7f
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 64 deletions.
2 changes: 1 addition & 1 deletion examples/enqueueer.coffee
Expand Up @@ -11,7 +11,7 @@ enqueuer = flowqueues.createClient(redis, "../tests/samples/config.yml")

for i in [1..3000]
job = {arg1: "arg1", arg2: "arg22"}
enqueuer.enqueueTo(job, "low")
enqueuer.enqueueTo('basic_flow', job, "low")

#Wait for a few seconds and close redis connection. This could be better done by using async.parallel for example
#but this is just an example
Expand Down
21 changes: 11 additions & 10 deletions src/client.coffee
Expand Up @@ -12,28 +12,29 @@ class Client
constructor: (@config) ->
@dataSource = @config.dataSource

enqueueForTask:(taskName, job, queue, cbs = null) ->
enqueueForTask:(jobName, taskName, job, queue, cbs = null) ->
encodedJob = helpers.encode(job)
@dataSource.rpush Queue.pendingQueueNameForTaskName(taskName, queue), encodedJob , (err, _) =>
@dataSource.rpush Queue.pendingQueueNameForTaskName(jobName, taskName, queue), encodedJob , (err, _) =>
if cbs?
cbs(err)

enqueue:(job, cbs = null) ->
enqueue:(jobName, jobData, cbs = null) ->
queue = "main"
if @config.queues.length > 0
queue = @config.queues[0]
@enqueueTo job, queue, cbs
@enqueueTo jobName, jobData, queue, cbs

enqueueTo: (job, queue, cbs = null) ->
taskDesc = @config.taskDescriptions[@config.firstTaskName]
@enqueueForTask(taskDesc.name, job, queue, cbs)
enqueueTo: (jobName, jobData, queue, cbs = null) ->
jobDesc = @config.jobDescriptions[jobName] #TODO: handle not found
taskDesc = jobDesc.taskDescriptions[jobDesc.firstTaskName]
@enqueueForTask(jobName, taskDesc.name, jobData, queue, cbs)

pendingTasksCount: (taskName, queue, cbs) ->
@dataSource.llen Queue.pendingQueueNameForTaskName(taskName, queue), (err, res) =>
pendingTasksCount: (jobName, taskName, queue, cbs) ->
@dataSource.llen Queue.pendingQueueNameForTaskName(jobName, taskName, queue), (err, res) =>
cbs(res)

workingTasksCount: (taskName, cbs) ->
@dataSource.llen Queue.workingSetNameForTaskName(taskName), (err, res) =>
@dataSource.llen Queue.workingSetNameForTaskName(jobName, taskName), (err, res) =>
cbs(res)


Expand Down
18 changes: 11 additions & 7 deletions src/config_loader.coffee
Expand Up @@ -5,26 +5,30 @@ Released under the MIT License
###

TaskDescription = require("./task_description").TaskDescription
JobDescription = require("./job_description").JobDescription

yaml = require('js-yaml')
fs = require('fs')

class ConfigLoader

constructor: (@config) ->
#

load:(file) ->
workflows = yaml.safeLoad(fs.readFileSync(file, 'utf8')).workflows
conf = yaml.safeLoad(fs.readFileSync(file, 'utf8')).flowqueues_config
@config.overridenJobDir = conf.jobs_dir if conf.jobs_dir
for workflow in workflows #TODO: should this be called jobDesc or Workflow ?
do (workflow) ->

@config.setFirstTaskName(conf.first_task)
for task in conf.tasks
for workflow in conf.workflows #TODO: should this be called jobDesc or Workflow ?
do (workflow) =>
jobDesc = new JobDescription workflow.name
@config.addJobDescription jobDesc
jobDesc.setFirstTaskName(workflow.first_task)
for task in workflow.tasks
do (task) =>
concurrency = task.concurrency || 1
name = task.name #TODO: handle error if name empty
next = task.next || {}
taskDesc = new TaskDescription(name, next, concurrency)
@config.addTaskDescription(taskDesc)
jobDesc.addTaskDescription(taskDesc)

exports.ConfigLoader = ConfigLoader
10 changes: 5 additions & 5 deletions src/queue.coffee
Expand Up @@ -15,17 +15,17 @@ class Queue
@baseKeyName: ->
return "flowqueues"

@baseQueueNameForTask:(taskName, ignoreHost = false) ->
@baseQueueNameForTask:(jobName, taskName, ignoreHost = false) ->
interFix = "#{@hostname()}:"
if ignoreHost == true
interFix = ""
return "#{@baseKeyName()}:#{interFix}#{taskName}"
return "#{@baseKeyName()}:#{interFix}#{jobName}:#{taskName}"

@pendingQueueNameForTaskName: (taskName, queue) ->
@pendingQueueNameForTaskName: (jobName, taskName, queue) ->
ignoreHostName = (taskName == @firstTaskName)
return "#{@baseQueueNameForTask(taskName, ignoreHostName)}:#{queue}:pending"
return "#{@baseQueueNameForTask(jobName, taskName, ignoreHostName)}:#{queue}:pending"

@workingSetNameForTaskName:(taskName) ->
@workingSetNameForTaskName:(jobName, taskName) ->
return "#{@baseQueueNameForTask(taskName)}:working"
exports.Queue = Queue

42 changes: 20 additions & 22 deletions src/worker.coffee
Expand Up @@ -70,55 +70,53 @@ class Worker
next()
schedulePolling()

isWorkerAvailableForTaskName:(taskName, previouslyRemaining, cbs) ->
@workingCountForTaskName taskName, (count) =>
taskDescription = @config.taskDescriptions[taskName]
isWorkerAvailableForTaskName:(jobName, taskName, previouslyRemaining, cbs) ->
Queue.workingTasksCount jobName, taskName, (count) =>
jobDescription = @config.jobDescriptions[jobName]
taskDescription = jobDescription.taskDescriptions[taskName]
status = false
if count < taskDescription.concurrency
status = true
cbs(status, taskDescription.concurrency - count)

workingCountForTaskName:(taskName, cbs) ->
@dataSource.llen Queue.workingSetNameForTaskName(taskName), (err, length) =>
cbs(length)

performTaskOnJob: (job, taskDescription, queue, next, callback) ->
@registerJobInProgress job, taskDescription.name, (err) =>

performTaskOnJob: (jobName, job, taskDescription, queue, next, callback) ->
@registerJobInProgress jobName, job, taskDescription.name, (err) =>
process.nextTick () =>
TaskPerformer.performTask @config.jobsDir(), taskDescription, job, (status) =>
@sequencer.scheduleInvocation (done) =>
@unregisterJobInProgress job, taskDescription.name, (err) =>
@unregisterJobInProgress jobName, job, taskDescription.name, (err) =>
done()
nextTaskName = taskDescription.getNextTaskNameForKey(status)
log "Done #{taskDescription.name}!" if helpers.verbose()
if !nextTaskName?
callback()
else
@client.enqueueForTask nextTaskName, job, queue, () =>
@client.enqueueForTask jobName, nextTaskName, job, queue, () =>
#TODO: try swaping the two lines. Depth First vs Breadth First execution
@processTaskForName nextTaskName
@processTaskForName jobName, nextTaskName
callback()
#poor lonely instruction. end for @registerJobInProgress
next()

registerJobInProgress:(job, taskName, cbs) ->
data = helpers.encode(cbs)
@dataSource.rpush Queue.workingSetNameForTaskName(taskName), data, (err, _) =>
registerJobInProgress:(jobName, jobData, taskName, cbs) ->
data = helpers.encode(jobData)
@dataSource.rpush Queue.workingSetNameForTaskName(jobName, taskName), data, (err, _) =>
cbs(err)

unregisterJobInProgress:(job, taskName, cbs = null) ->
data = helpers.encode(cbs)
key = Queue.workingSetNameForTaskName(taskName)
unregisterJobInProgress:(jobName, jobData, taskName, cbs = null) ->
data = helpers.encode(jobData)
key = Queue.workingSetNameForTaskName(jobName, taskName)
@dataSource.lrem key, 1, data, (err, _) =>
if cbs?
cbs(err)

reserveJobOnQueue:(taskName, queue, cbs) ->
@dataSource.lpop Queue.pendingQueueNameForTaskName(taskName, queue), (err, res) =>
reserveJobOnQueue:(jobName, taskName, queue, cbs) ->
@dataSource.lpop Queue.pendingQueueNameForTaskName(jobName, taskName, queue), (err, res) =>
job = helpers.decode(res)
cbs(job)

reserveJob: (taskName, foundJobCbs, queue) ->
reserveJob: (jobName, taskName, foundJobCbs, queue) ->
queueIndex = 0
foundJob = null
#This is an async implementation of a break in a for loop using the "async" framework
Expand All @@ -131,7 +129,7 @@ class Worker
foundJobCbs(foundJob, @config.queues[queueIndex])

block = (cbs) =>
@reserveJobOnQueue taskName, @config.queues[queueIndex], (job) =>
@reserveJobOnQueue jobName, taskName, @config.queues[queueIndex], (job) =>
if job?
foundJob = job
else
Expand Down
39 changes: 20 additions & 19 deletions tests/samples/config.yml
@@ -1,23 +1,24 @@
workflows:
flowqueues_config:
jobs_dir: "../tests/samples"
- name: basic_flow
first_task: basic_task
tasks:
- name: basic_task
concurrency: 10
next:
success: basic_task2
workflows:
- name: basic_flow
first_task: basic_task
tasks:
- name: basic_task
concurrency: 10
next:
success: basic_task2

- name: basic_task2
concurrency: 100
- name: other_flow
first_task: basic_task2
tasks:
- name: basic_task
concurrency: 10
next:
success: basic_task2
- name: basic_task2
concurrency: 100
- name: other_flow
first_task: basic_task2
tasks:
- name: basic_task
concurrency: 10
next:
success: basic_task2

- name: basic_task2
concurrency: 100
- name: basic_task2
concurrency: 100

0 comments on commit a528b7f

Please sign in to comment.