-
Notifications
You must be signed in to change notification settings - Fork 44
/
threads.py
91 lines (68 loc) · 3.01 KB
/
threads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import logging
import sys
import concurrent.futures as cf
from parsl.executors.base import ParslExecutor
from parsl.dataflow.error import ConfigurationError
from parsl.utils import RepresentationMixin
logger = logging.getLogger(__name__)
class ThreadPoolExecutor(ParslExecutor, RepresentationMixin):
"""A thread-based executor.
Parameters
----------
max_threads : int
Number of threads. Default is 2.
thread_name_prefix : string
Thread name prefix (only supported in python v3.6+).
storage_access : list of :class:`~parsl.data_provider.scheme.Scheme`
Specifications for accessing data this executor remotely. Multiple `Scheme`s are not yet supported.
managed : bool
If True, parsl will control dynamic scaling of this executor, and be responsible. Otherwise,
this is managed by the user.
"""
def __init__(self, label='threads', max_threads=2, thread_name_prefix='', storage_access=None, working_dir=None, managed=True):
self.label = label
self._scaling_enabled = False
self.max_threads = max_threads
self.thread_name_prefix = thread_name_prefix
self.storage_access = storage_access if storage_access is not None else []
if len(self.storage_access) > 1:
raise ConfigurationError('Multiple storage access schemes are not yet supported')
self.working_dir = working_dir
self.managed = managed
def start(self):
if sys.version_info > (3, 6):
self.executor = cf.ThreadPoolExecutor(max_workers=self.max_threads,
thread_name_prefix=self.thread_name_prefix)
else:
self.executor = cf.ThreadPoolExecutor(max_workers=self.max_threads)
@property
def scaling_enabled(self):
return self._scaling_enabled
def submit(self, *args, **kwargs):
"""Submits work to the thread pool.
This method is simply pass through and behaves like a submit call as described
here `Python docs: <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor>`_
"""
return self.executor.submit(*args, **kwargs)
def scale_out(self, workers=1):
"""Scales out the number of active workers by 1.
This method is notImplemented for threads and will raise the error if called.
Raises:
NotImplemented exception
"""
raise NotImplementedError
def scale_in(self, blocks):
"""Scale in the number of active blocks by specified amount.
This method is not implemented for threads and will raise the error if called.
Raises:
NotImplemented exception
"""
raise NotImplementedError
def shutdown(self, block=False):
"""Shutdown the ThreadPool.
Kwargs:
- block (Bool): To block for confirmations or not
"""
x = self.executor.shutdown(wait=block)
logger.debug("Done with executor shutdown")
return x