/
workers.py
173 lines (147 loc) · 4.44 KB
/
workers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# -*- coding:utf-8 -*-
# ************************** Copyrights and license ***************************
#
# This file is part of gcovr 7.2+main, a parsing and reporting tool for gcov.
# https://gcovr.com/en/stable
#
# _____________________________________________________________________________
#
# Copyright (c) 2013-2024 the gcovr authors
# Copyright (c) 2013 Sandia Corporation.
# Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
# the U.S. Government retains certain rights in this software.
#
# This software is distributed under the 3-clause BSD License.
# For more information, see the README.rst file.
#
# ****************************************************************************
from threading import Thread, Condition, RLock
from contextlib import contextmanager
from queue import Queue, Empty
class LockedDirectories(object):
    """
    Class that keeps a list of locked directories.

    A directory is "locked" while it is a member of ``dirs``. All access
    to that set goes through the condition variable ``cv``, which is also
    used to put callers to sleep until a directory they want is released.
    """

    def __init__(self):
        # Directories that are currently claimed through run_in().
        self.dirs = set()
        # Guards ``dirs`` and wakes up waiters whenever done() is called.
        self.cv = Condition()

    def run_in(self, dir_):
        """
        Start running in the directory and lock it.

        Blocks for as long as ``dir_`` is already present in ``dirs`` and
        adds it to the set once it is free.
        """
        # The ``with`` block guarantees that the condition's lock is
        # released even when waiting is interrupted by an exception.
        with self.cv:
            while dir_ in self.dirs:
                self.cv.wait()
            self.dirs.add(dir_)

    def done(self, dir_):
        """
        Finished with the directory, unlock it.

        Removes ``dir_`` from ``dirs`` and wakes up every waiting caller.

        Raises ``KeyError`` if ``dir_`` is not currently locked; the
        condition's lock is still released in that case.
        """
        with self.cv:
            self.dirs.remove(dir_)
            self.cv.notify_all()
@contextmanager
def locked_directory(dir_):
    """
    Context for doing something in a locked directory.

    On entry ``dir_`` is claimed via ``run_in`` on the registry stored in
    ``locked_directory.global_object``; on exit it is handed back via
    ``done``, also when the body raises.
    """
    # Resolve the registry once so that the release always goes to the very
    # same object that granted the lock, even if ``global_object`` is
    # rebound while the body is running.
    registry = locked_directory.global_object
    registry.run_in(dir_)
    try:
        yield
    finally:
        registry.done(dir_)


# Registry instance used by every ``locked_directory`` call.
locked_directory.global_object = LockedDirectories()
def worker(queue, context, pool):
    """
    Run work items from the queue until the sentinel
    None value is hit.

    Every queue entry is a ``(work, args, kwargs)`` triple. ``context`` is
    merged into ``kwargs`` (overriding equal keys) before ``work`` is
    called. If ``work`` raises, the exception info is handed to
    ``pool.raise_exception`` and the loop ends.
    """
    import sys

    while True:
        work, args, kwargs = queue.get(True)
        # Only ``None`` is the stop marker; a callable that merely happens
        # to be falsy must still be executed.
        if work is None:
            break
        kwargs.update(context)
        try:
            work(*args, **kwargs)
        except BaseException:
            # Deliberately catch everything: the failure is reported to the
            # pool instead of being lost together with this thread.
            pool.raise_exception(sys.exc_info())
            break
class Workers(object):
    """
    Thread pool that receives work through ``add`` and keeps
    running until ``wait`` has seen all threads finish.

    Can be used as a context manager: leaving the ``with`` block calls
    ``wait`` (after draining the queue if the block raised).
    """

    def __init__(self, number, context):
        """
        Start ``number`` threads, each with its own ``context()`` result.
        """
        assert number >= 1
        self.q = Queue()
        # Reentrant, because drain() calls add_sentinels() while holding it.
        self.lock = RLock()
        # ``sys.exc_info()`` style triples reported by failed threads.
        self.exceptions = []
        # One context per thread; all are created before any thread starts.
        self.contexts = [context() for _ in range(number)]
        self.workers = [
            Thread(target=worker, args=(self.q, ctx, self))
            for ctx in self.contexts
        ]
        for thread in self.workers:
            thread.start()

    def add(self, work, *args, **kwargs):
        """
        Queue a callable together with the arguments to be used
        when running it. Nothing is queued once a failure was recorded.
        """
        with self.lock:
            if self.exceptions:
                return
            self.q.put((work, args, kwargs))

    def add_sentinels(self):
        """
        Append one sentinel per thread to the queue so
        the threads know to stop.
        """
        with self.lock:
            for _ in range(len(self.workers)):
                self.q.put((None, [], {}))

    def drain(self):
        """
        Throw away everything that is queued, then queue fresh sentinels.
        """
        with self.lock:
            try:
                while True:
                    self.q.get_nowait()
            except Empty:
                # Queue is empty now.
                pass
            self.add_sentinels()

    def raise_exception(self, exc_info):
        """
        Called when a thread has failed: drain the queue and record the
        exception info so that ``wait`` can raise it.
        """
        with self.lock:
            self.drain()
            self.exceptions.append(exc_info)

    def size(self):
        """
        Return the size of the thread pool.
        """
        return len(self.workers)

    def wait(self):
        """
        Wait until all work is complete.

        Returns the list of contexts. If failures were recorded, prints
        every recorded traceback and raises the first recorded exception.
        """
        self.add_sentinels()
        for thread in self.workers:
            # Joining with a timeout in a loop allows interrupts
            # in Thread.join.
            while thread.is_alive():
                thread.join(timeout=1)

        if self.exceptions:
            import traceback

            for exc_type, exc_obj, exc_trace in self.exceptions:
                traceback.print_exception(exc_type, exc_obj, exc_trace)
            raise self.exceptions[0][1]
        return self.contexts

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # The ``with`` body failed: pending work is dropped before waiting.
        body_failed = exc_val is not None
        if body_failed:
            self.drain()
        self.wait()