-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Processing.py
416 lines (364 loc) · 16.9 KB
/
Processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
import multiprocessing
import queue
import os
import platform
import subprocess
from coalib.collecting.Collectors import collect_files
from coalib.collecting import Dependencies
from coalib.misc.StringConverter import StringConverter
from coalib.output.printers import LOG_LEVEL
from coalib.processes.BearRunning import run
from coalib.processes.CONTROL_ELEMENT import CONTROL_ELEMENT
from coalib.results.Result import Result
from coalib.results.RESULT_SEVERITY import RESULT_SEVERITY
from coalib.results.SourceRange import SourceRange
from coalib.settings.Setting import path_list
from coalib.processes.LogPrinterThread import LogPrinterThread
def get_cpu_count():
    """
    Determines the number of CPUs available on this machine.

    :return: The CPU count, or 2 as a sensible parallelism default when the
             platform does not implement ``cpu_count``.
    """
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:  # pragma: no cover
        # cpu_count is not implemented for some CPU architectures/OSes.
        return 2
def fill_queue(queue_fill, any_list):
    """
    Puts every element of an iterable into a queue, in order.

    :param queue_fill: The queue to be filled.
    :param any_list:   Iterable containing the elements.
    """
    put = queue_fill.put
    for element in any_list:
        put(element)
def get_running_processes(processes):
    """
    Counts how many of the given processes are still alive.

    :param processes: Iterable of objects exposing an ``is_alive()`` method
                      (e.g. multiprocessing.Process or threading.Thread).
    :return:          The number of processes for which ``is_alive()`` is true.
    """
    return len([process for process in processes if process.is_alive()])
def create_process_group(command_array, **kwargs):
    """
    Spawns a subprocess inside its own process group, so that signals can be
    sent to the whole group at once (e.g. to kill the child and anything it
    spawned).

    :param command_array: The command to execute, as a list of program name
                          and arguments.
    :param kwargs:        Additional keyword arguments forwarded to
                          ``subprocess.Popen``.
    :return:              The created ``Popen`` object.
    """
    if platform.system() == "Windows":  # pragma: no cover
        # Windows has no POSIX process groups; the creation flag is the
        # closest equivalent.
        return subprocess.Popen(
            command_array,
            creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
            **kwargs)

    # On POSIX, make the child a session leader so it owns its own group.
    return subprocess.Popen(command_array,
                            preexec_fn=os.setsid,
                            **kwargs)
def print_result(results,
                 file_dict,
                 retval,
                 print_results,
                 section,
                 log_printer,
                 file_diff_dict,
                 ignore_ranges):
    """
    Takes the results produced by each bear and gives them to the print_results
    method to present to the user.

    :param results:        A list of results.
    :param file_dict:      A dictionary containing the name of files and its
                           contents.
    :param retval:         It is True if any results were yielded ever before.
                           If it is True this function will return True no
                           matter what happens. Else it depends on if this
                           invocation yields results.
    :param print_results:  A function that prints all given results appropriate
                           to the output medium.
    :param section:        The section the results belong to; its
                           'min_severity' setting filters the results.
    :param log_printer:    The log printer handed through to print_results.
    :param file_diff_dict: A dictionary that contains filenames as keys and
                           diff objects as values.
    :param ignore_ranges:  A list of SourceRanges. Results that affect code in
                           any of those ranges will be ignored.
    :return:               True if this invocation or any earlier one yielded
                           results, else False.
    """
    min_severity_str = str(section.get('min_severity', 'INFO')).upper()
    # BUGFIX: fall back to the severity *value* RESULT_SEVERITY.INFO, not the
    # string 'INFO' — otherwise an unrecognized min_severity setting makes
    # `result.severity >= min_severity` compare int with str and raise
    # TypeError on Python 3.
    min_severity = RESULT_SEVERITY.str_dict.get(min_severity_str,
                                                RESULT_SEVERITY.INFO)
    # Keep only genuine Result objects that are severe enough and not inside
    # an ignore range.
    results = [result
               for result in results
               if (type(result) is Result and
                   result.severity >= min_severity and
                   not result.to_ignore(ignore_ranges))]

    print_results(log_printer, section, results, file_dict, file_diff_dict)
    return retval or len(results) > 0
def get_file_dict(filename_list, log_printer):
    """
    Reads all files into a dictionary.

    Files that cannot be read (non-UTF-8 content or any other error) are
    skipped with a warning instead of aborting.

    :param filename_list: List of names of paths to files to get contents of.
    :param log_printer:   The logger which logs errors.
    :return:              Dictionary mapping each readable filename to the
                          list of its lines.
    """
    file_dict = {}
    for filename in filename_list:
        try:
            with open(filename, "r", encoding="utf-8") as file_handle:
                file_dict[filename] = file_handle.readlines()
        except UnicodeDecodeError:
            log_printer.warn("Failed to read file '{}'. It seems to contain "
                             "non-unicode characters. Leaving it "
                             "out.".format(filename))
        except Exception as exception:  # pragma: no cover
            log_printer.log_exception("Failed to read file '{}' because of "
                                      "an unknown error. Leaving it "
                                      "out.".format(filename),
                                      exception,
                                      log_level=LOG_LEVEL.WARNING)

    return file_dict
def instantiate_bears(section,
                      local_bear_list,
                      global_bear_list,
                      file_dict,
                      message_queue):
    """
    Instantiates each bear with the arguments it needs.

    Both bear lists are modified in place: every bear class is replaced by
    an instance of it.

    :param section:          The section the bears belong to.
    :param local_bear_list:  List of local bears to instantiate.
    :param global_bear_list: List of global bears to instantiate.
    :param file_dict:        Dictionary containing filenames and their
                             contents.
    :param message_queue:    Queue responsible to maintain the messages
                             delivered by the bears.
    """
    for index, bear_class in enumerate(local_bear_list):
        local_bear_list[index] = bear_class(section,
                                            message_queue,
                                            timeout=0.1)
    for index, bear_class in enumerate(global_bear_list):
        global_bear_list[index] = bear_class(file_dict,
                                             section,
                                             message_queue,
                                             timeout=0.1)
def instantiate_processes(section,
                          local_bear_list,
                          global_bear_list,
                          job_count,
                          log_printer):
    """
    Instantiate the number of processes that will run bears which will be
    responsible for running bears in a multiprocessing environment.

    :param section:          The section the bears belong to.
    :param local_bear_list:  List of local bears belonging to the section.
    :param global_bear_list: List of global bears belonging to the section.
    :param job_count:        Max number of processes to create.
    :param log_printer:      The log printer to warn to.
    :return:                 A tuple containing a list of processes,
                             and the arguments passed to each process which are
                             the same for each object.
    """
    # Collect the files this section operates on (honouring 'ignore'),
    # then read all contents once so every process shares the same view.
    filename_list = collect_files(path_list(section.get('files', "")),
                                  path_list(section.get('ignore', "")))
    file_dict = get_file_dict(filename_list, log_printer)
    # Manager-backed dicts so worker processes can write results that this
    # process can read; plain Queues distribute the work items.
    manager = multiprocessing.Manager()
    global_bear_queue = multiprocessing.Queue()
    filename_queue = multiprocessing.Queue()
    local_result_dict = manager.dict()
    global_result_dict = manager.dict()
    message_queue = multiprocessing.Queue()
    control_queue = multiprocessing.Queue()
    # Every process receives the identical argument dictionary; the queues
    # coordinate which process handles which file/bear.
    bear_runner_args = {"file_name_queue": filename_queue,
                        "local_bear_list": local_bear_list,
                        "global_bear_list": global_bear_list,
                        "global_bear_queue": global_bear_queue,
                        "file_dict": file_dict,
                        "local_result_dict": local_result_dict,
                        "global_result_dict": global_result_dict,
                        "message_queue": message_queue,
                        "control_queue": control_queue,
                        "timeout": 0.1}
    # Replace bear classes with instances (in place) before handing the
    # lists to the worker processes.
    instantiate_bears(section,
                      local_bear_list,
                      global_bear_list,
                      file_dict,
                      message_queue)
    # Pre-fill the work queues: one entry per file for local bears, one
    # index per global bear.
    fill_queue(filename_queue, file_dict.keys())
    fill_queue(global_bear_queue, range(len(global_bear_list)))
    return ([multiprocessing.Process(target=run, kwargs=bear_runner_args)
             for i in range(job_count)],
            bear_runner_args)
def get_ignore_scope(line, keyword):
    """
    Retrieves the bears that are to be ignored defined in the given line.

    :param line:    The line containing the ignore declaration.
    :param keyword: The keyword that was found. Everything after the rightmost
                    occurrence of it will be considered for the scope.
    :return:        A list of lower cased bearnames or an empty list (-> "all")
    """
    scope = line[line.rfind(keyword) + len(keyword):]
    if scope.startswith("all"):
        return []

    return list(StringConverter(scope, list_delimiters=', '))
def yield_ignore_ranges(file_dict):
    """
    Yields tuples of affected bears and a SourceRange that shall be ignored for
    those.

    Recognized (case-insensitive) markers inside file contents:
    - "start ignoring <bears|all>" ... "stop ignoring": a block range.
    - "ignore <bears|all>": ignores that line and the following one.

    :param file_dict: The file dictionary (filename -> list of lines).
    """
    for filename, file in file_dict.items():
        start = None
        bears = []
        for line_number, line in enumerate(file, start=1):
            line = line.lower()
            if "start ignoring " in line:
                start = line_number
                bears = get_ignore_scope(line, "start ignoring ")
            elif "stop ignoring" in line:
                if start:
                    yield (bears,
                           SourceRange.from_values(filename,
                                                   start,
                                                   end_line=line_number))
                    # BUGFIX: reset the open range so a stray second
                    # "stop ignoring" does not re-yield the same range.
                    start = None
                    bears = []
            elif "ignore " in line:
                yield (get_ignore_scope(line, "ignore "),
                       SourceRange.from_values(filename,
                                               line_number,
                                               end_line=line_number+1))
def process_queues(processes,
                   control_queue,
                   local_result_dict,
                   global_result_dict,
                   file_dict,
                   print_results,
                   section,
                   log_printer):
    """
    Iterate the control queue and send the results received to the
    print_result method so that they can be presented to the user.

    Local results are printed as they arrive; global results are buffered
    until all local bears are finished, then flushed, and further global
    results are printed as they arrive.

    :param processes:          List of processes which can be used to run
                               Bears.
    :param control_queue:      Containing control elements that indicate
                               whether there is a result available and which
                               bear it belongs to.
    :param local_result_dict:  Dictionary containing results respective to
                               local bears. It is modified by the processes
                               i.e. results are added to it by multiple
                               processes.
    :param global_result_dict: Dictionary containing results respective to
                               global bears. It is modified by the processes
                               i.e. results are added to it by multiple
                               processes.
    :param file_dict:          Dictionary containing file contents with
                               filename as keys.
    :param print_results:      Prints all given results appropriate to the
                               output medium.
    :param section:            The section that is executed; forwarded to
                               print_result for severity filtering.
    :param log_printer:        The log printer forwarded to print_result.
    :return:                   Return True if all bears execute successfully
                               and Results were delivered to the user.
                               Else False.
    """
    file_diff_dict = {}
    running_processes = get_running_processes(processes)
    retval = False
    # Number of processes working on local bears
    local_processes = len(processes)
    # Global results arriving early are buffered until local bears finish.
    global_result_buffer = []
    ignore_ranges = list(yield_ignore_ranges(file_dict))
    # One process is the logger thread
    while local_processes > 1 and running_processes > 1:
        try:
            control_elem, index = control_queue.get(timeout=0.1)
            if control_elem == CONTROL_ELEMENT.LOCAL_FINISHED:
                local_processes -= 1
            elif control_elem == CONTROL_ELEMENT.LOCAL:
                assert local_processes != 0
                retval = print_result(local_result_dict[index],
                                      file_dict,
                                      retval,
                                      print_results,
                                      section,
                                      log_printer,
                                      file_diff_dict,
                                      ignore_ranges)
            elif control_elem == CONTROL_ELEMENT.GLOBAL:
                global_result_buffer.append(index)
        except queue.Empty:
            # Timed out waiting for a control element; re-check whether the
            # worker processes are still alive.
            running_processes = get_running_processes(processes)
    # Flush global result buffer
    for elem in global_result_buffer:
        retval = print_result(global_result_dict[elem],
                              file_dict,
                              retval,
                              print_results,
                              section,
                              log_printer,
                              file_diff_dict,
                              ignore_ranges)
    running_processes = get_running_processes(processes)
    # One process is the logger thread
    while running_processes > 1:
        try:
            control_elem, index = control_queue.get(timeout=0.1)
            if control_elem == CONTROL_ELEMENT.GLOBAL:
                retval = print_result(global_result_dict[index],
                                      file_dict,
                                      retval,
                                      print_results,
                                      section,
                                      log_printer,
                                      file_diff_dict,
                                      ignore_ranges)
            else:
                assert control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED
                running_processes = get_running_processes(processes)
        except queue.Empty:
            running_processes = get_running_processes(processes)
    return retval
def execute_section(section,
                    global_bear_list,
                    local_bear_list,
                    print_results,
                    log_printer):
    """
    Executes the section with the given bears.

    The execute_section method does the following things:

    1. Prepare a Process
       * Load files
       * Create queues
    2. Spawn up one or more Processes
    3. Output results from the Processes
    4. Join all processes

    :param section:          The section to execute.
    :param global_bear_list: List of global bears belonging to the section.
    :param local_bear_list:  List of local bears belonging to the section.
    :param print_results:    Prints all given results appropriate to the
                             output medium.
    :param log_printer:      The log_printer to warn to.
    :return:                 Tuple containing a bool (True if results were
                             yielded, False otherwise), a Manager.dict
                             containing all local results(filenames are key)
                             and a Manager.dict containing all global bear
                             results (bear names are key) as well as the
                             file dictionary.
    """
    # Expand the bear lists so that all bears a bear depends on run too.
    local_bear_list = Dependencies.resolve(local_bear_list)
    global_bear_list = Dependencies.resolve(global_bear_list)
    try:
        running_processes = int(section['jobs'])
    except ValueError:
        log_printer.warn("Unable to convert setting 'jobs' into a number. "
                         "Falling back to CPU count.")
        running_processes = get_cpu_count()
    except IndexError:
        # NOTE(review): a missing 'jobs' setting apparently raises IndexError
        # from the section lookup — confirm against the Section class.
        running_processes = get_cpu_count()
    processes, arg_dict = instantiate_processes(section,
                                                local_bear_list,
                                                global_bear_list,
                                                running_processes,
                                                log_printer)
    # The logger thread drains the message queue so bear log messages reach
    # the user while the worker processes are running.
    logger_thread = LogPrinterThread(arg_dict["message_queue"],
                                     log_printer)
    # Start and join the logger thread along with the processes to run bears
    processes.append(logger_thread)
    for runner in processes:
        runner.start()
    try:
        return (process_queues(processes,
                               arg_dict["control_queue"],
                               arg_dict["local_result_dict"],
                               arg_dict["global_result_dict"],
                               arg_dict["file_dict"],
                               print_results,
                               section,
                               log_printer),
                arg_dict["local_result_dict"],
                arg_dict["global_result_dict"],
                arg_dict["file_dict"])
    finally:
        # Always stop the logger thread and join every process, even if
        # result processing raised.
        logger_thread.running = False
        for runner in processes:
            runner.join()