forked from horovod/horovod
-
Notifications
You must be signed in to change notification settings - Fork 0
/
threads.py
95 lines (83 loc) · 3.2 KB
/
threads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# Copyright 2019 Uber Technologies, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import threading
from six.moves import queue
def execute_function_multithreaded(fn,
args_list,
block_until_all_done=True,
max_concurrent_executions=1000):
"""
Executes fn in multiple threads each with one set of the args in the
args_list.
:param fn: function to be executed
:type fn:
:param args_list:
:type args_list: list(list)
:param block_until_all_done: if is True, function will block until all the
threads are done and will return the results of each thread's execution.
:type block_until_all_done: bool
:param max_concurrent_executions:
:type max_concurrent_executions: int
:return:
If block_until_all_done is False, returns None. If block_until_all_done is
True, function returns the dict of results.
{
index: execution result of fn with args_list[index]
}
:rtype: dict
"""
result_queue = queue.Queue()
worker_queue = queue.Queue()
for i, arg in enumerate(args_list):
arg.append(i)
worker_queue.put(arg)
def fn_execute():
while True:
try:
arg = worker_queue.get(block=False)
except queue.Empty:
return
exec_index = arg[-1]
res = fn(*arg[:-1])
result_queue.put((exec_index, res))
threads = []
number_of_threads = min(max_concurrent_executions, len(args_list))
for _ in range(number_of_threads):
thread = threading.Thread(target=fn_execute)
if not block_until_all_done:
thread.daemon = True
thread.start()
threads.append(thread)
# Returns the results only if block_until_all_done is set.
results = None
if block_until_all_done:
# Because join() cannot be interrupted by signal, a single join()
# needs to be separated into join()s with timeout in a while loop.
have_alive_child = True
while have_alive_child:
have_alive_child = False
for t in threads:
t.join(0.1)
if t.is_alive():
have_alive_child = True
results = {}
while not result_queue.empty():
item = result_queue.get()
results[item[0]] = item[1]
if len(results) != len(args_list):
raise RuntimeError(
'Some threads for func {func} did not complete '
'successfully.'.format(func=fn.__name__))
return results