Permalink
Browse files

experimenting with domains

  • Loading branch information...
balupton committed Aug 16, 2013
1 parent 4548aba commit 58f30f2d4548bf03b891fd4a39aefa5bdfcdad8b
Showing with 275 additions and 98 deletions.
  1. +1 −1 package.json
  2. +94 −38 src/lib/taskgroup.coffee
  3. +180 −59 src/test/taskgroup-test.coffee
View
@@ -37,7 +37,7 @@
"coffee-script": "~1.6.2",
"joe": "~1.2.0",
"joe-reporter-console": "~1.2.1",
- "chai": "~1.5.0"
+ "chai": "~1.7.1"
},
"directories": {
"lib": "./out/lib"
View
@@ -10,72 +10,123 @@ class Task extends EventEmitter
type: 'task' # for duck typing
result: null
running: false
+ completed: false
parent: null
# Config
name: null
- fn: null
+ method: null
args: null
+ # Create a new task
+ # - new Task(name, method)
+ # - new Task(method)
constructor: (args...) ->
# Prepare
super
- # Apply
- name = fn = null
+ # Prepare configuration
+ name = method = null
+
+ # Extract the configuration from the arguments
if args.length
+ # If we have both arguments then set name and method
if args.length is 2
- [name,fn] = args
+ [name, method] = args
+
+ # If we just have one argument then just set the method
else if args.length is 1
- [fn] = args
- @setConfig({name,fn})
+ [method] = args
+
+ # Apply configuration
+ @setConfig({name, method})
# Chain
@
setConfig: (opts={}) =>
- # Configure
+ # Apply the configuration directly to our instance
for own key,value of opts
@[key] = value
# Chain
@
run: ->
- # What happens once the task is complete
- complete = (args...) =>
- # Update our status
- @running = false
- @result = args
+ # Prepare
+ domain = null
+
+ # Define our uncaught error callback to put the task into its completion state
+ # as well as emit the error event
+ uncaughtErrorCallback = (args...) =>
+ # Dispose of our domain
+ if domain?
+ domain.dispose()
+ domain = null
+
+ # Apply our completion flags if we have not yet completed
+ unless @completed
+ @completed = true
+ @running = false
+ @result = args
+
+ # Fire our uncaught error handler
+ @emit('error', err)
+
+ # Define our completion callback to put the task into its completion state
+ # as well as emit the completion event
+ # we also check for unexpected double completion
+ completionCallback = (args...) =>
+ # Dispose of our domain
+ if domain?
+ domain.dispose()
+ domain = null
+
+ # Already completed?
+ if @completed is true
+ # We have already completed and this is unexpected so emit an error event
+ err = new Error("A task's completion callback has fired when the task was already in a completed state, this is unexpected")
+ @emit('error', err)
+
+ # Complete for the first (and hopefully only) time
+ else
+ # Update our flags
+ @completed = true
+ @running = false
+ @result = args
- # Complete
- @complete()
+ # Notify our listeners of our completion
+ @emit('complete', @result...)
- # Notify our intention
+ # Reset our flags
+ @completed = false
@running = true
+ @result = null
+
+ # Reset our domain
+ if domain?
+ domain.dispose()
+ domain = null
+ #domain = require('domain').create()
+
+ # Notify our intention
@emit('run')
# Give time for the listeners to complete before continuing
+ # needed for task groups
process.nextTick =>
- # Run it
- args = (@args or []).concat([complete])
- ambi(@fn.bind(@), args...)
+ # Add our completion callback to our specified arguments to send over to the method
+ args = (@args or []).concat([completionCallback])
+
+ # Fire the method be it asynchronously or synchronously with ambi
+ # and wrap it in a domain to ensure that the task executes safely
+ #domain.on('error', completionCallback)
+ #domain.run =>
+ ambi(@method.bind(@), args...)
# Chain
@
- complete: ->
- # Determine completion
- completed = @result? and @running is false
-
- # Continue completion
- if completed
- # Notify our listeners
- @emit('complete', @result...)
-
- # Return completion
- return completed
-
# Task Group
# Events
@@ -93,7 +144,7 @@ class TaskGroup extends EventEmitter
# Config
name: null
- fn: null
+ method: null
concurrency: 1 # use 0 for unlimited
pauseOnError: true # needs testing
@@ -105,21 +156,21 @@ class TaskGroup extends EventEmitter
@remaining = []
# Apply
- name = fn = null
+ name = method = null
if args.length
if args.length is 2
- [name,fn] = args
+ [name, method] = args
else if args.length is 1
- [fn] = args
- @setConfig({name,fn})
+ [method] = args
+ @setConfig({name, method})
# Give setConfig enough chance to fire
process.nextTick =>
# Auto run if we are going the inline style and have no parent
- if @fn
+ if @method
# Add the function as our first unamed task with the extra arguments
args = [@addGroup, @addTask]
- @addTask(@fn.bind(@)).setConfig({args,includeInResults:false})
+ @addTask(@method.bind(@)).setConfig({args, includeInResults:false})
# Proceed to run if we are the topmost group
@run() if !@parent
@@ -164,11 +215,16 @@ class TaskGroup extends EventEmitter
me = @
# Bubble item events
+ # be explicit about error events due to their special nature
item.onAny (args...) ->
+ # return if @event is 'error'
me.emit("item.#{@event}", item, args...)
+ item.on 'error', (err) ->
+ me.emit("item.error", item, args...)
+ me.exit(err)
# Notify our intention
- @emit('add',item)
+ @emit('add', item)
# Add the item
@remaining.push(item)
Oops, something went wrong.

0 comments on commit 58f30f2

Please sign in to comment.