Skip to content

Commit

Permalink
Add cores option to decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
tjdasso committed Oct 7, 2019
1 parent 21c8e06 commit 190a6b4
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
12 changes: 8 additions & 4 deletions parsl/app/app.py
Expand Up @@ -21,7 +21,7 @@ class AppBase(metaclass=ABCMeta):
"""

def __init__(self, func, data_flow_kernel=None, walltime=60, executors='all', cache=False):
def __init__(self, func, data_flow_kernel=None, walltime=60, executors='all', cache=False, cores=1):
"""Construct the App object.
Args:
Expand All @@ -44,6 +44,7 @@ def __init__(self, func, data_flow_kernel=None, walltime=60, executors='all', ca
self.data_flow_kernel = data_flow_kernel
self.status = 'created'
self.executors = executors
self.cores = cores
self.cache = cache
if not (isinstance(executors, list) or isinstance(executors, str)):
logger.error("App {} specifies invalid executor option, expects string or list".format(
Expand Down Expand Up @@ -77,7 +78,7 @@ def __call__(self, *args, **kwargs):
pass


def App(apptype, data_flow_kernel=None, walltime=60, cache=False, executors='all'):
def App(apptype, data_flow_kernel=None, walltime=60, cache=False, cores=1, executors='all'):
"""The App decorator function.
Args:
Expand Down Expand Up @@ -114,11 +115,12 @@ def wrapper(f):
data_flow_kernel=data_flow_kernel,
walltime=walltime,
cache=cache,
cores=cores,
executors=executors)
return wrapper


def python_app(function=None, data_flow_kernel=None, walltime=60, cache=False, executors='all'):
def python_app(function=None, data_flow_kernel=None, walltime=60, cache=False, cores=1, executors='all'):
"""Decorator function for making python apps.
Parameters
Expand Down Expand Up @@ -146,14 +148,15 @@ def wrapper(f):
data_flow_kernel=data_flow_kernel,
walltime=walltime,
cache=cache,
cores=cores,
executors=executors)
return wrapper(func)
if function is not None:
return decorator(function)
return decorator


def bash_app(function=None, data_flow_kernel=None, walltime=60, cache=False, executors='all'):
def bash_app(function=None, data_flow_kernel=None, walltime=60, cache=False, cores=1, executors='all'):
"""Decorator function for making bash apps.
Parameters
Expand Down Expand Up @@ -181,6 +184,7 @@ def wrapper(f):
data_flow_kernel=data_flow_kernel,
walltime=walltime,
cache=cache,
cores=cores,
executors=executors)
return wrapper(func)
if function is not None:
Expand Down
5 changes: 3 additions & 2 deletions parsl/app/bash.py
Expand Up @@ -119,8 +119,8 @@ def open_std_fd(fdname):

class BashApp(AppBase):

def __init__(self, func, data_flow_kernel=None, walltime=60, cache=False, executors='all'):
super().__init__(func, data_flow_kernel=data_flow_kernel, walltime=60, executors=executors, cache=cache)
def __init__(self, func, data_flow_kernel=None, walltime=60, cache=False, cores=1, executors='all'):
super().__init__(func, data_flow_kernel=data_flow_kernel, walltime=60, executors=executors, cores=cores, cache=cache)
self.kwargs = {}

# We duplicate the extraction of parameter defaults
Expand All @@ -147,6 +147,7 @@ def __call__(self, *args, **kwargs):
"""
# Update kwargs in the app definition with ones passed in at calltime
self.kwargs.update(kwargs)
self.kwargs["cores"] = self.cores

if self.data_flow_kernel is None:
dfk = DataFlowKernelLoader.dfk()
Expand Down
5 changes: 4 additions & 1 deletion parsl/app/python.py
Expand Up @@ -35,12 +35,13 @@ def inject_exception(thread):
class PythonApp(AppBase):
"""Extends AppBase to cover the Python App."""

def __init__(self, func, data_flow_kernel=None, walltime=60, cache=False, executors='all'):
def __init__(self, func, data_flow_kernel=None, walltime=60, cache=False, cores=1, executors='all'):
super().__init__(
wrap_error(func),
data_flow_kernel=data_flow_kernel,
walltime=walltime,
executors=executors,
cores=cores,
cache=cache
)

Expand All @@ -62,6 +63,8 @@ def __call__(self, *args, **kwargs):
else:
dfk = self.data_flow_kernel

kwargs["cores"] = self.cores

walltime = self.kwargs.get('walltime')
if walltime is not None:
self.func = timeout(self.func, walltime)
Expand Down

0 comments on commit 190a6b4

Please sign in to comment.