Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify Resources in App Decorator #1358

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 16 additions & 4 deletions parsl/app/app.py
Expand Up @@ -21,7 +21,7 @@ class AppBase(metaclass=ABCMeta):

"""

def __init__(self, func, data_flow_kernel=None, walltime=60, executors='all', cache=False):
def __init__(self, func, data_flow_kernel=None, walltime=60, executors='all', cache=False, cores=None, disk=None, mem=None):
"""Construct the App object.

Args:
Expand All @@ -44,6 +44,9 @@ def __init__(self, func, data_flow_kernel=None, walltime=60, executors='all', ca
self.data_flow_kernel = data_flow_kernel
self.status = 'created'
self.executors = executors
self.cores = cores
self.disk = disk
self.mem = mem
self.cache = cache
if not (isinstance(executors, list) or isinstance(executors, str)):
logger.error("App {} specifies invalid executor option, expects string or list".format(
Expand Down Expand Up @@ -77,7 +80,7 @@ def __call__(self, *args, **kwargs):
pass


def App(apptype, data_flow_kernel=None, walltime=60, cache=False, executors='all'):
def App(apptype, data_flow_kernel=None, walltime=60, cache=False, cores=None, mem=None, disk=None, executors='all'):
"""The App decorator function.

Args:
Expand Down Expand Up @@ -114,11 +117,14 @@ def wrapper(f):
data_flow_kernel=data_flow_kernel,
walltime=walltime,
cache=cache,
cores=cores,
mem=mem,
disk=disk,
executors=executors)
return wrapper


def python_app(function=None, data_flow_kernel=None, walltime=60, cache=False, executors='all'):
def python_app(function=None, data_flow_kernel=None, walltime=60, cache=False, cores=None, mem=None, disk=None, executors='all'):
"""Decorator function for making python apps.

Parameters
Expand Down Expand Up @@ -146,14 +152,17 @@ def wrapper(f):
data_flow_kernel=data_flow_kernel,
walltime=walltime,
cache=cache,
cores=cores,
mem=mem,
disk=disk,
executors=executors)
return wrapper(func)
if function is not None:
return decorator(function)
return decorator


def bash_app(function=None, data_flow_kernel=None, walltime=60, cache=False, executors='all'):
def bash_app(function=None, data_flow_kernel=None, walltime=60, cache=False, cores=None, mem=None, disk=None, executors='all'):
"""Decorator function for making bash apps.

Parameters
Expand Down Expand Up @@ -181,6 +190,9 @@ def wrapper(f):
data_flow_kernel=data_flow_kernel,
walltime=walltime,
cache=cache,
cores=cores,
mem=mem,
disk=disk,
executors=executors)
return wrapper(func)
if function is not None:
Expand Down
10 changes: 8 additions & 2 deletions parsl/app/bash.py
Expand Up @@ -119,8 +119,8 @@ def open_std_fd(fdname):

class BashApp(AppBase):

def __init__(self, func, data_flow_kernel=None, walltime=60, cache=False, executors='all'):
super().__init__(func, data_flow_kernel=data_flow_kernel, walltime=60, executors=executors, cache=cache)
def __init__(self, func, data_flow_kernel=None, walltime=60, cache=False, cores=None, mem=None, disk=None, executors='all'):
super().__init__(func, data_flow_kernel=data_flow_kernel, walltime=60, executors=executors, cores=cores, mem=mem, disk=disk, cache=cache)
self.kwargs = {}

# We duplicate the extraction of parameter defaults
Expand All @@ -147,6 +147,12 @@ def __call__(self, *args, **kwargs):
"""
# Update kwargs in the app definition with ones passed in at calltime
self.kwargs.update(kwargs)
if 'parsl_resource_specification' not in self.kwargs:
self.kwargs['parsl_resource_specification'] = {
'cores': self.cores,
'disk': self.disk,
'mem': self.mem
}

if self.data_flow_kernel is None:
dfk = DataFlowKernelLoader.dfk()
Expand Down
12 changes: 11 additions & 1 deletion parsl/app/python.py
Expand Up @@ -35,12 +35,15 @@ def inject_exception(thread):
class PythonApp(AppBase):
"""Extends AppBase to cover the Python App."""

def __init__(self, func, data_flow_kernel=None, walltime=60, cache=False, executors='all'):
def __init__(self, func, data_flow_kernel=None, walltime=60, cache=False, cores=None, disk=None, mem=None, executors='all'):
super().__init__(
wrap_error(func),
data_flow_kernel=data_flow_kernel,
walltime=walltime,
executors=executors,
cores=cores,
disk=disk,
mem=mem,
cache=cache
)

Expand All @@ -62,6 +65,13 @@ def __call__(self, *args, **kwargs):
else:
dfk = self.data_flow_kernel

if 'parsl_resource_specification' not in kwargs:
kwargs['parsl_resource_specification'] = {
'cores': self.cores,
'mem': self.mem,
'disk': self.disk
}

walltime = self.kwargs.get('walltime')
if walltime is not None:
self.func = timeout(self.func, walltime)
Expand Down
19 changes: 19 additions & 0 deletions parsl/executors/workqueue/executor.py
Expand Up @@ -120,6 +120,7 @@ def WorkQueueSubmitThread(task_queue=multiprocessing.Queue(),
input_files = item["input_files"]
output_files = item["output_files"]
std_files = item["std_files"]
resource_specs = item["resources"]

full_script_name = workqueue_worker.__file__
script_name = full_script_name.split("/")[-1]
Expand Down Expand Up @@ -182,6 +183,17 @@ def WorkQueueSubmitThread(task_queue=multiprocessing.Queue(),
for item in std_files:
t.specify_file(item[0], item[1], WORK_QUEUE_OUTPUT, cache=item[2])

# Specify resource requirements for the task
if "cores" in resource_specs and resource_specs["cores"] is not None:
t.specify_cores(resource_specs["cores"])
logger.debug("Task {} using {} cores".format(t.id, resource_specs["cores"]))
if "disk" in resource_specs and resource_specs["disk"] is not None:
t.specify_disk(resource_specs["disk"])
logger.debug("Task {} using {} MB disk".format(t.id, resource_specs["disk"]))
if "mem" in resource_specs and resource_specs["mem"] is not None:
t.specify_memory(resource_specs["mem"])
logger.debug("Task {} using {} MB memory".format(t.id, resource_specs["mem"]))

# Submit the task to the WorkQueue object
logger.debug("Submitting task {} to WorkQueue".format(parsl_id))
try:
Expand Down Expand Up @@ -548,6 +560,12 @@ def submit(self, func, *args, **kwargs):
input_files = []
output_files = []
std_files = []
resources = {}

# Receive resource specifications from the kwargs, then remove it
if 'parsl_resource_specification' in kwargs:
resources = kwargs["parsl_resource_specification"]
kwargs.pop("parsl_resource_specification")

# Add input files from the "inputs" keyword argument
func_inputs = kwargs.get("inputs", [])
Expand Down Expand Up @@ -631,6 +649,7 @@ def submit(self, func, *args, **kwargs):
# Create message to put into the message queue
logger.debug("Placing task {} on message queue".format(task_id))
msg = {"task_id": task_id,
"resources": resources,
"data_loc": function_data_file,
"result_loc": function_result_file,
"input_files": input_files,
Expand Down