-
-
Notifications
You must be signed in to change notification settings - Fork 4.6k
/
thread.py
53 lines (38 loc) · 1.5 KB
/
thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# -*- coding: utf-8 -*-
"""Thread execution pool."""
from __future__ import absolute_import, unicode_literals
import sys
from concurrent.futures import ThreadPoolExecutor, wait
from .base import BasePool, apply_target
# Public API of this module: only the pool class is exported by
# ``from ... import *``.
__all__ = ('TaskPool',)
class ApplyResult(object):
    """Thin result handle around a ``concurrent.futures`` future.

    Attributes:
        f: the wrapped future.
        get: direct alias of the future's ``result`` method, so it takes
            the same arguments (e.g. ``timeout``) and raises the same
            exceptions.
    """

    def __init__(self, future):
        self.f = future
        # Bind once so ``get`` is a plain attribute lookup, not a wrapper.
        self.get = future.result

    def wait(self, timeout=None):
        """Block until the future finishes or ``timeout`` seconds elapse.

        Returns nothing; call ``get`` to retrieve the outcome.
        """
        wait([self.f], timeout=timeout)
class TaskPool(BasePool):
    """Task pool that runs tasks on a ``ThreadPoolExecutor``."""

    body_can_be_buffer = True
    signal_safe = False

    def __init__(self, *args, **kwargs):
        """Initialise the base pool, then create the backing executor."""
        super(TaskPool, self).__init__(*args, **kwargs)

        # Starting with Python 3.5 the executor derives its own default
        # worker count from the number of CPUs; on 3.0-3.4 we have to pick
        # a fallback ourselves when no limit was configured.
        lacks_default_workers = (3, 0) <= sys.version_info < (3, 5)
        if lacks_default_workers and self.limit is None:
            self.limit = 5

        self.executor = ThreadPoolExecutor(max_workers=self.limit)

    def on_stop(self):
        """Shut the executor down, then run the base class stop hook."""
        self.executor.shutdown(wait=True)
        super(TaskPool, self).on_stop()

    def on_apply(self, target, args=None, kwargs=None, callback=None,
                 accept_callback=None, **_):
        """Submit ``target`` to the executor through ``apply_target``.

        Any extra keyword options are accepted and ignored.

        Returns:
            ApplyResult: handle wrapping the future of the submitted call.
        """
        future = self.executor.submit(
            apply_target, target, args, kwargs, callback, accept_callback,
        )
        return ApplyResult(future)

    def _get_info(self):
        """Return a dict with the configured limit and live thread count."""
        info = {'max-concurrency': self.limit}
        # TODO use a public api to retrieve the current number of threads
        # in the executor when available. (Currently not available).
        info['threads'] = len(self.executor._threads)
        return info