
Specify Resources in App Decorator #1358

Closed · wants to merge 4 commits

Conversation

@tjdasso (Contributor) commented Oct 14, 2019

Refer to #1324 for the issue. This PR allows users to optionally specify resource requirements within the Parsl app decorator, which lets Work Queue run each task with the specified resource usage.

This can be done at the function level, such as:

@python_app(cores=4, mem=100, disk=100)
def example():
    ...

which specifies that each call to example() requires 4 cores, 100 MB of memory, and 100 MB of disk. Not all resources need to be specified: any combination of cores, memory, and disk can be given, and any that are omitted fall back to Work Queue's default behavior (where one task consumes an entire worker).

Additionally, users can specify resource requirements per task by passing them as a keyword argument to the function. Specifically, the keyword argument parsl_resource_specification maps to a dictionary of resource requirements. For example:

@python_app
def example(parsl_resource_specification={'cores': 2, 'mem': 100, 'disk': 100}):
    ...

This supports the use case where a user wants different resource requirements for different invocations of the same function, for example when one call has a much larger input and therefore needs more computational resources. If resources are specified both in the decorator and as an argument to the function via the parsl_resource_specification keyword, the argument values take precedence, so the values in the decorator act as "default" resource specifications for the app.
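
To illustrate that precedence, here is a minimal sketch (the app body and the small_input/big_input arguments are hypothetical): the decorator values act as defaults, and a per-call parsl_resource_specification overrides them.

@python_app(cores=4, mem=100, disk=100)
def example(data, parsl_resource_specification=None):
    ...

# Uses the decorator defaults: 4 cores, 100 MB memory, 100 MB disk
example(small_input)

# Overrides the decorator defaults for this one call
example(big_input, parsl_resource_specification={'cores': 8, 'mem': 400, 'disk': 400})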

@tjdasso (Contributor, Author) commented Oct 14, 2019

It looks like this PR is failing in CI because the other executors aren't expecting the additional kwarg that this PR creates (parsl_resource_specification), and so they fail when the kwargs are passed to the function during execution. Is there any way to add the kwarg only if we know which executor is being used (i.e., if we know the executor is the Work Queue executor, which has built-in handling for this additional argument)?

@annawoodard (Collaborator):

@tjdasso: Yikes, I was not thinking carefully-- I'm sorry for giving you bad advice! Since inserting into kwargs was a hack anyway, I think the best way forward is to change the submit signature in the executor base class and allow each executor implementation to decide what to do with it. All the existing executors ignore it except wqex. This way, if someone implements a new executor, they'll know they have access to the resource specification if they want to use it. I haven't tested it with wqex, but here's a sketch of how this might look:

diff --git a/parsl/app/bash.py b/parsl/app/bash.py
index 9ab467c..5c1cb95 100644
--- a/parsl/app/bash.py
+++ b/parsl/app/bash.py
@@ -147,12 +147,6 @@ class BashApp(AppBase):
         """
         # Update kwargs in the app definition with ones passed in at calltime
         self.kwargs.update(kwargs)
-        if 'parsl_resource_specification' not in self.kwargs:
-            self.kwargs['parsl_resource_specification'] = {
-                'cores': self.cores,
-                'disk': self.disk,
-                'mem': self.mem
-            }

         if self.data_flow_kernel is None:
             dfk = DataFlowKernelLoader.dfk()
@@ -164,6 +158,9 @@ class BashApp(AppBase):
                              executors=self.executors,
                              fn_hash=self.func_hash,
                              cache=self.cache,
+                             cores=self.cores,
+                             disk=self.disk,
+                             mem=self.mem,
                              **self.kwargs)

         return app_fut
diff --git a/parsl/app/python.py b/parsl/app/python.py
index 5671d2d..e86cbb3 100644
--- a/parsl/app/python.py
+++ b/parsl/app/python.py
@@ -65,13 +65,6 @@ class PythonApp(AppBase):
         else:
             dfk = self.data_flow_kernel

-        if 'parsl_resource_specification' not in kwargs:
-            kwargs['parsl_resource_specification'] = {
-                'cores': self.cores,
-                'mem': self.mem,
-                'disk': self.disk
-            }
-
         walltime = self.kwargs.get('walltime')
         if walltime is not None:
             self.func = timeout(self.func, walltime)
@@ -79,6 +72,9 @@ class PythonApp(AppBase):
                              executors=self.executors,
                              fn_hash=self.func_hash,
                              cache=self.cache,
+                             cores=self.cores,
+                             disk=self.disk,
+                             mem=self.mem,
                              **kwargs)

         return app_fut
diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index 302225b..4ae3cfa 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -439,7 +439,7 @@ class DataFlowKernel(object):
                                                          self.monitoring.resource_monitoring_interval)

         with self.submitter_lock:
-            exec_fu = executor.submit(executable, *args, **kwargs)
+            exec_fu = executor.submit(executable, self.tasks[task_id]['resource_specification'], *args, **kwargs)
         self.tasks[task_id]['status'] = States.launched
         if self.monitoring is not None:
             task_log_info = self._create_task_log_info(task_id, 'lazy')
@@ -600,7 +600,7 @@ class DataFlowKernel(object):

         return new_args, kwargs, dep_failures

-    def submit(self, func, *args, executors='all', fn_hash=None, cache=False, **kwargs):
+    def submit(self, func, *args, executors='all', fn_hash=None, cache=False, cores=None, mem=None, disk=None, **kwargs):
         """Add task to the dataflow system.

         If the app task has the executors attributes not set (default=='all')
@@ -660,6 +660,12 @@ class DataFlowKernel(object):
                                 kw)
                     )

+        resource_specification = {
+            'cores': cores,
+            'mem': mem,
+            'disk': disk
+        }
+
         task_def = {'depends': None,
                     'executor': executor,
                     'func_name': func.__name__,
@@ -674,7 +680,8 @@ class DataFlowKernel(object):
                     'status': States.unsched,
                     'id': task_id,
                     'time_submitted': None,
-                    'time_returned': None}
+                    'time_returned': None,
+                    'resource_specification': resource_specification}

         app_fu = AppFuture(task_def)

diff --git a/parsl/executors/base.py b/parsl/executors/base.py
index 294a2f7..543dbba 100644
--- a/parsl/executors/base.py
+++ b/parsl/executors/base.py
@@ -37,7 +37,7 @@ class ParslExecutor(metaclass=ABCMeta):
         pass

     @abstractmethod
-    def submit(self, *args, **kwargs):
+    def submit(self, func, resource_specification, *args, **kwargs):
         """Submit.

         We haven't yet decided on what the args to this can be,
diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index 21f4918..11649fe 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -504,7 +504,7 @@ class HighThroughputExecutor(ParslExecutor, RepresentationMixin):
                 logger.debug("[HOLD_BLOCK]: Sending hold to manager: {}".format(manager['manager']))
                 self.hold_worker(manager['manager'])

-    def submit(self, func, *args, **kwargs):
+    def submit(self, func, resource_specification, *args, **kwargs):
         """Submits work to the the outgoing_q.

         The outgoing_q is an external process listens on this
diff --git a/parsl/executors/ipp.py b/parsl/executors/ipp.py
index 3faa412..307721f 100644
--- a/parsl/executors/ipp.py
+++ b/parsl/executors/ipp.py
@@ -212,7 +212,7 @@ sleep infinity
     def scaling_enabled(self):
         return self._scaling_enabled

-    def submit(self, *args, **kwargs):
+    def submit(self, func, resource_specification, *args, **kwargs):
         """Submits work to the thread pool.

         This method is simply pass through and behaves like a submit call as described
@@ -221,7 +221,7 @@ sleep infinity
         Returns:
               Future
         """
-        return self.lb_view.apply_async(*args, **kwargs)
+        return self.lb_view.apply_async(func, *args, **kwargs)

     def scale_out(self, blocks=1):
         """Scales out the number of active workers by 1.
diff --git a/parsl/executors/low_latency/executor.py b/parsl/executors/low_latency/executor.py
index d24737b..3a9b092 100644
--- a/parsl/executors/low_latency/executor.py
+++ b/parsl/executors/low_latency/executor.py
@@ -189,7 +189,7 @@ class LowLatencyExecutor(ParslExecutor, RepresentationMixin):

         logger.info("[MTHREAD] queue management worker finished")

-    def submit(self, func, *args, **kwargs):
+    def submit(self, func, resource_specification, *args, **kwargs):
         """ TODO: docstring """
         self._task_counter += 1
         task_id = self._task_counter
diff --git a/parsl/executors/threads.py b/parsl/executors/threads.py
index c981b0b..df8663b 100644
--- a/parsl/executors/threads.py
+++ b/parsl/executors/threads.py
@@ -54,14 +54,14 @@ class ThreadPoolExecutor(ParslExecutor, RepresentationMixin):
     def scaling_enabled(self):
         return self._scaling_enabled

-    def submit(self, *args, **kwargs):
+    def submit(self, func, resource_specification, *args, **kwargs):
         """Submits work to the thread pool.

         This method is simply pass through and behaves like a submit call as described
         here `Python docs: <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor>`_

         """
-        return self.executor.submit(*args, **kwargs)
+        return self.executor.submit(func, *args, **kwargs)

     def scale_out(self, workers=1):
         """Scales out the number of active workers by 1.
diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py
index 1ac492c..c14f819 100644
--- a/parsl/executors/workqueue/executor.py
+++ b/parsl/executors/workqueue/executor.py
@@ -547,7 +547,7 @@ class WorkQueueExecutor(ParslExecutor):
             index += 1
         return new_name

-    def submit(self, func, *args, **kwargs):
+    def submit(self, func, resource_specification, *args, **kwargs):
         """Submit.

         We haven't yet decided on what the args to this can be,
@@ -560,12 +560,6 @@ class WorkQueueExecutor(ParslExecutor):
         input_files = []
         output_files = []
         std_files = []
-        resources = {}
-
-        # Receive resource specifications from the kwargs, then remove it
-        if 'parsl_resource_specification' in kwargs:
-            resources = kwargs["parsl_resource_specification"]
-            kwargs.pop("parsl_resource_specification")

         # Add input files from the "inputs" keyword argument
         func_inputs = kwargs.get("inputs", [])
@@ -649,7 +643,7 @@ class WorkQueueExecutor(ParslExecutor):
         # Create message to put into the message queue
         logger.debug("Placing task {} on message queue".format(task_id))
         msg = {"task_id": task_id,
-               "resources": resources,
+               "resources": resource_specification,
                "data_loc": function_data_file,
                "result_loc": function_result_file,
                "input_files": input_files,

@annawoodard (Collaborator):

Unfortunately, specifying resources per task is now a bit awkward. Here is an example of how to do it. In my view, we can decide later if we want to add 'magic' keyword args (analogous to stderr and stdout currently) and not hold this PR up.

per task

import parsl
from parsl.app.app import python_app

parsl.set_stream_logger()
parsl.load()

def foo(name):
    print('Hello, {}!'.format(name))

python_app(foo, cores=4)('small per-task world')
python_app(foo, cores=8)('big per-task world')

per app

import parsl
from parsl.app.app import python_app

parsl.set_stream_logger()
parsl.load()

@python_app(cores=2)
def bar(name):
    print('Hello, {}!'.format(name))

bar('per-app world')

@benclifford (Collaborator):

@annawoodard if resource_specification is an object rather than a dict, it can be documented and type-checked (in the same way as Configs). This means it can't become a quagmire of random options so easily, which might be a feature or might be a downside, depending on whether you intend each executor implementation to be able to have its own arbitrary options not specified by parsl core, or whether you want parsl core to specify what the options mean even if they aren't implemented everywhere. (I think the latter, because there's a hard-coded dict initializer in the DFK in your patch.)
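
As a rough sketch of that idea (the class name and fields are hypothetical, not something this PR implements), a documented, type-hinted specification might look like:

from typing import Optional

class ResourceSpecification:
    """Typed, documented per-task resource requirements (all values optional)."""

    def __init__(self,
                 cores: Optional[int] = None,
                 memory: Optional[int] = None,
                 disk: Optional[int] = None) -> None:
        self.cores = cores
        self.memory = memory
        self.disk = disk

mypy or typeguard could then check each field, and all executors would share one definition of what the options mean.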

@benclifford (Collaborator):

Maybe for usability parsl should emit a warning when it ignores fields in resource_specification.
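
For example, a hypothetical sketch (not code from this PR) of how an executor that does not support resource hints might warn, using the submit signature proposed above:

import logging

logger = logging.getLogger(__name__)

def submit(self, func, resource_specification, *args, **kwargs):
    """Submit work, warning when per-task resource hints cannot be honored."""
    if resource_specification:
        # This executor ignores per-task resources, so say so instead of dropping them silently
        logger.warning("Ignoring resource specification %s for %s",
                       resource_specification, getattr(func, '__name__', func))
    return self.executor.submit(func, *args, **kwargs)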

@dgasmith (Contributor) commented Oct 15, 2019

One note is that memory might be better than mem from a user's point of view. A simple parser for strings like 100MB or 100GiB is usually advisable as well. Best to double-check that the cores definition complies with the language used in the partial-node PR.
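
A sketch of such a parser, assuming a hypothetical helper parse_memory that normalizes everything to megabytes:

import re

_UNITS_TO_MB = {'KB': 1e-3, 'MB': 1.0, 'GB': 1e3, 'TB': 1e6,
                'KIB': 1024 / 1e6, 'MIB': 1024 ** 2 / 1e6, 'GIB': 1024 ** 3 / 1e6}

def parse_memory(value):
    """Accept a number of MB or a string like '100MB' or '2GiB'; return MB as a float."""
    if isinstance(value, (int, float)):
        return float(value)
    match = re.fullmatch(r'([\d.]+)\s*([A-Za-z]+)', value.strip())
    if match is None or match.group(2).upper() not in _UNITS_TO_MB:
        raise ValueError('Cannot parse memory specification: {!r}'.format(value))
    number, unit = match.groups()
    return float(number) * _UNITS_TO_MB[unit.upper()]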

@annawoodard Any harm in having thousands of Python apps to comply with the new definition?

@benclifford I cannot say enough good things about Pydantic for automatically type-checked and serializable objects.
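
A minimal Pydantic sketch of that suggestion (the model name and fields are hypothetical, not part of this PR):

from typing import Optional

from pydantic import BaseModel

class ResourceSpecification(BaseModel):
    cores: Optional[int] = None
    memory: Optional[int] = None   # MB
    disk: Optional[int] = None     # MB

spec = ResourceSpecification(cores=4, memory=100)
# Bad types raise a validation error, e.g. ResourceSpecification(cores='four')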

@benclifford (Collaborator):

@dgasmith we're already using mypy for some static checking and typeguard for some dynamic checking using type hints. Issue #873 is a slow ongoing project that would welcome contributions.

@annawoodard (Collaborator) commented Oct 15, 2019

if resource_specification is an object rather than a dict, it can be documented and type-checked (in the same way as Configs).

That's a great point, @benclifford. Do you have a preference between adding each resource spec as a keyword arg to executor.submit (which could also be documented and type-checked):

def submit(self, func, memory, disk, cores, *args, **kwargs)

and creating a new class? (I think I favor a new class.)

Maybe for usability parsl should emit a warning when it ignores fields in resource_specification.

I don't follow-- we wouldn't know it was being ignored until it gets to the executor level. Are you saying each executor should emit a warning if it ignores the resource specification? (I agree!)

One note is that memory might be better than mem from a user's point of view.

@dgasmith Good point, agreed.

Any harm in having thousands of Python apps to comply with the new definition?

That's a good point. Creating more apps certainly adds overhead, but I would have to revisit the app machinery to understand how large the effect would be. At the end of the day we'd need to benchmark to really know (and even then the effect would depend on the apps themselves-- I think most of the cost would be in serializing the function, which is done once per app). On the user side, you likely don't have thousands of distinct combinations of resource requirements, so you could keep a dictionary of apps keyed on resource requirements as a 'poor person's cache'. In any case, I'm not opposed to introducing more magic keywords-- I just think it's something we should think carefully about, since it can cause confusion (for example, if someone doesn't know about them and they collide with their own keyword arguments)-- and for that reason I don't want it to hold this PR up.
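
A sketch of that 'poor person's cache', following the earlier per-task example (the wrapper name foo_app is illustrative):

import parsl
from parsl.app.app import python_app

parsl.load()

def foo(name):
    print('Hello, {}!'.format(name))

_apps_by_cores = {}

def foo_app(cores):
    # Reuse one wrapped app per distinct resource requirement
    if cores not in _apps_by_cores:
        _apps_by_cores[cores] = python_app(foo, cores=cores)
    return _apps_by_cores[cores]

foo_app(4)('small world')
foo_app(4)('another small world')   # reuses the cores=4 app
foo_app(8)('big world')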

@dthain (Contributor) commented Jan 15, 2020

Adding @tjuedema to this thread.

@dthain (Contributor) commented May 4, 2020

@ZhuozhaoLi is going to move towards a JSON resource spec here.

@dthain (Contributor) commented May 11, 2020

superseded by #1675

@yadudoc closed this on May 11, 2020