Skip to content

Commit

Permalink
Wrap all tasks as Task inside execute()
Browse files Browse the repository at this point in the history
Re #21
  • Loading branch information
bitprophet committed Oct 6, 2011
1 parent ce4ccf4 commit 32a8e0d
Showing 1 changed file with 6 additions and 11 deletions.
17 changes: 6 additions & 11 deletions fabric/tasks.py
Expand Up @@ -69,14 +69,6 @@ def __getattr__(self, k):
return getattr(self.wrapped, k)


def _run_task(task, args, kwargs):
# First, try class-based tasks
if hasattr(task, 'run') and callable(task.run):
return task.run(*args, **kwargs)
# Fallback to callable behavior
return task(*args, **kwargs)


def _get_pool_size(task, hosts):
# Default parallel pool size (calculate per-task in case variables
# change)
Expand Down Expand Up @@ -158,6 +150,9 @@ def execute(task, *args, **kwargs):
else:
dunder_name = getattr(task, '__name__', None)
my_env['command'] = getattr(task, 'name', dunder_name)
# Normalize to Task instance
if not hasattr(task, 'run'):
task = WrappedCallableTask(task)
# Filter out hosts/roles kwargs
new_kwargs = {}
hosts = []
Expand Down Expand Up @@ -216,7 +211,7 @@ def execute(task, *args, **kwargs):
def inner(*args, **kwargs):
key = normalize_to_string(state.env.host_string)
state.connections.pop(key, "")
_run_task(task, args, kwargs)
task.run(*args, **kwargs)
# Stuff into Process wrapper
p = multiprocessing.Process(target=inner, args=args,
kwargs=new_kwargs)
Expand All @@ -226,7 +221,7 @@ def inner(*args, **kwargs):
jobs.append(p)
# Handle serial execution
else:
_run_task(task, args, new_kwargs)
task.run(*args, **new_kwargs)

# If running in parallel, block until job queue is emptied
if jobs:
Expand All @@ -235,4 +230,4 @@ def inner(*args, **kwargs):
# Or just run once for local-only
else:
with settings(**my_env):
_run_task(task, args, new_kwargs)
task.run(*args, **new_kwargs)

0 comments on commit 32a8e0d

Please sign in to comment.