Skip to content

Commit

Permalink
BearRunner: Do result ordering in SectionExecutor
Browse files Browse the repository at this point in the history
This removes the barrier from BearRunner and puts the responsibility of
ordering the results right to the SectionExecutor. This has the
following advantages:

 * Before nondeterministic wrong ordering could occur because the
   multiprocessing.Queue is not FIFO across processes.
 * Local bears can now run in parallel with global bears.

Fixes #525

Co-Author: Mischa Krueger (Makman2)
  • Loading branch information
sils committed May 16, 2015
1 parent bf052e2 commit 73d06c6
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 66 deletions.
25 changes: 8 additions & 17 deletions coalib/processes/BearRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def __init__(self,
global_result_dict,
message_queue,
control_queue,
barrier,
TIMEOUT=0):
"""
This is the object that actually runs on the processes
Expand Down Expand Up @@ -66,14 +65,11 @@ def __init__(self,
(for global results) or a file name to
indicate the result will be put to the
queue. If this BearRunner finished all its
tasks it will put
(CONTROL_ELEMENT.FINISHED, None) to the
queue.
:param barrier: a thing that has a wait() method. This will
be invoked after running the local bears and
may serve as a barrier to avoid getting
global results before local ones are
processed.
local bears it will put
(CONTROL_ELEMENT.LOCAL_FINISHED, None) to
the queue, if it finished all global ones,
(CONTROL_ELEMENT.GLOBAL_FINISHED, None) will
be put there.
:param TIMEOUT: in seconds for all queue actions
"""
if not isinstance(local_bear_list, list):
Expand Down Expand Up @@ -105,10 +101,6 @@ def __init__(self,
if not hasattr(control_queue, "put"):
raise TypeError("control_queue should be a queue like thing "
"(writing possible via 'put')")
if not hasattr(barrier, "wait"):
raise TypeError("barrier should be a barrier like thing ('wait' "
"method should be available)")

multiprocessing.Process.__init__(self)

self.filename_queue = file_name_queue
Expand All @@ -122,7 +114,6 @@ def __init__(self,
self.global_result_dict = global_result_dict
self.message_queue = message_queue
self.control_queue = control_queue
self.barrier = barrier

self.TIMEOUT = TIMEOUT

Expand All @@ -141,10 +132,10 @@ def debug(self, *args, delimiter=' ', end=''):

def run(self):
self.run_local_bears()
self.barrier.wait()
self.run_global_bears()
self.control_queue.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))

self.control_queue.put((CONTROL_ELEMENT.FINISHED, None))
self.run_global_bears()
self.control_queue.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))

def run_local_bears(self):
try:
Expand Down
2 changes: 1 addition & 1 deletion coalib/processes/CONTROL_ELEMENT.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from coalib.misc.Enum import enum

CONTROL_ELEMENT = enum("LOCAL", "GLOBAL", "FINISHED")
CONTROL_ELEMENT = enum("LOCAL", "GLOBAL", "LOCAL_FINISHED", "GLOBAL_FINISHED")
40 changes: 30 additions & 10 deletions coalib/processes/SectionExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from coalib.output.printers import LOG_LEVEL
from coalib.processes.BearRunner import BearRunner
from coalib.processes.CONTROL_ELEMENT import CONTROL_ELEMENT
from coalib.processes.Barrier import Barrier
from coalib.settings.Section import Section
from coalib.settings.Setting import path_list
from coalib.output.Interactor import Interactor
Expand Down Expand Up @@ -126,20 +125,44 @@ def _process_queues(self,
file_dict):
running_processes = self._get_running_processes(processes)
retval = False
# Number of processes working on local bears
local_processes = running_processes
global_result_buffer = []

# One process is the logger thread
while local_processes > 1:
control_elem, index = control_queue.get(timeout=0.1)

if control_elem == CONTROL_ELEMENT.LOCAL_FINISHED:
local_processes -= 1
elif control_elem == CONTROL_ELEMENT.LOCAL:
assert local_processes != 0
self.interactor.print_results(local_result_dict[index],
file_dict)
retval = retval or len(local_result_dict[index]) > 0
elif control_elem == CONTROL_ELEMENT.GLOBAL:
global_result_buffer.append(index)

# Flush global result buffer
for elem in global_result_buffer:
self.interactor.print_results(global_result_dict[elem],
file_dict)
retval = retval or len(global_result_dict[elem]) > 0

running_processes = self._get_running_processes(processes)
# One process is the logger thread
while running_processes > 1:
try:
control_elem, index = control_queue.get(timeout=0.1)
if control_elem == CONTROL_ELEMENT.LOCAL:
self.interactor.print_results(local_result_dict[index],
file_dict)
retval = retval or len(local_result_dict[index]) > 0
elif control_elem == CONTROL_ELEMENT.GLOBAL:

if control_elem == CONTROL_ELEMENT.GLOBAL:
self.interactor.print_results(global_result_dict[index],
file_dict)
retval = retval or len(global_result_dict[index]) > 0
elif control_elem == CONTROL_ELEMENT.FINISHED:
else:
assert control_elem == CONTROL_ELEMENT.GLOBAL_FINISHED
running_processes = self._get_running_processes(processes)

except queue.Empty:
running_processes = self._get_running_processes(processes)

Expand Down Expand Up @@ -171,8 +194,6 @@ def _instantiate_processes(self, job_count):
message_queue = multiprocessing.Queue()
control_queue = multiprocessing.Queue()

barrier = Barrier(parties=job_count)

bear_runner_args = {"file_name_queue": filename_queue,
"local_bear_list": self.local_bear_list,
"global_bear_list": self.global_bear_list,
Expand All @@ -182,7 +203,6 @@ def _instantiate_processes(self, job_count):
"global_result_dict": global_result_dict,
"message_queue": message_queue,
"control_queue": control_queue,
"barrier": barrier,
"TIMEOUT": 0.1}

self._instantiate_bears(file_dict,
Expand Down
51 changes: 13 additions & 38 deletions coalib/tests/processes/BearRunnerTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from coalib.bears.LocalBear import LocalBear
from coalib.bears.GlobalBear import GlobalBear
from coalib.processes.BearRunner import BearRunner, LogMessage, LOG_LEVEL
from coalib.processes.Barrier import Barrier
from coalib.settings.Section import Section


Expand Down Expand Up @@ -110,7 +109,6 @@ def test_initialization(self):
global_result_dict = manager.dict()
message_queue = queue.Queue()
control_queue = queue.Queue()
barrier = Barrier(parties=1)
self.assertRaises(TypeError,
BearRunner,
0,
Expand All @@ -121,8 +119,7 @@ def test_initialization(self):
local_result_dict,
global_result_dict,
message_queue,
control_queue,
barrier)
control_queue)
self.assertRaises(TypeError,
BearRunner,
file_name_queue,
Expand All @@ -133,8 +130,7 @@ def test_initialization(self):
local_result_dict,
global_result_dict,
message_queue,
control_queue,
barrier)
control_queue)
self.assertRaises(TypeError,
BearRunner,
file_name_queue,
Expand All @@ -145,8 +141,7 @@ def test_initialization(self):
local_result_dict,
global_result_dict,
message_queue,
control_queue,
barrier)
control_queue)
self.assertRaises(TypeError,
BearRunner,
file_name_queue,
Expand All @@ -157,8 +152,7 @@ def test_initialization(self):
local_result_dict,
global_result_dict,
message_queue,
control_queue,
barrier)
control_queue)
self.assertRaises(TypeError,
BearRunner,
file_name_queue,
Expand All @@ -169,8 +163,7 @@ def test_initialization(self):
local_result_dict,
global_result_dict,
message_queue,
control_queue,
barrier)
control_queue)
self.assertRaises(TypeError,
BearRunner,
file_name_queue,
Expand All @@ -181,8 +174,7 @@ def test_initialization(self):
0,
global_result_dict,
message_queue,
control_queue,
barrier)
control_queue)
self.assertRaises(TypeError,
BearRunner,
file_name_queue,
Expand All @@ -193,8 +185,7 @@ def test_initialization(self):
local_result_dict,
0,
message_queue,
control_queue,
barrier)
control_queue)
self.assertRaises(TypeError,
BearRunner,
file_name_queue,
Expand All @@ -205,8 +196,7 @@ def test_initialization(self):
local_result_dict,
global_result_dict,
0,
control_queue,
barrier)
control_queue)
self.assertRaises(TypeError,
BearRunner,
file_name_queue,
Expand All @@ -217,19 +207,6 @@ def test_initialization(self):
local_result_dict,
global_result_dict,
message_queue,
0,
barrier)
self.assertRaises(TypeError,
BearRunner,
file_name_queue,
local_bear_list,
[],
global_bear_queue,
file_dict,
local_result_dict,
global_result_dict,
message_queue,
control_queue,
0)


Expand All @@ -247,7 +224,6 @@ def setUp(self):
self.global_result_dict = manager.dict()
self.message_queue = queue.Queue()
self.control_queue = queue.Queue()
self.barrier = Barrier(parties=1)
self.uut = BearRunner(self.file_name_queue,
self.local_bear_list,
self.global_bear_list,
Expand All @@ -256,8 +232,7 @@ def setUp(self):
self.local_result_dict,
self.global_result_dict,
self.message_queue,
self.control_queue,
self.barrier)
self.control_queue)

def test_inheritance(self):
self.assertIsInstance(self.uut, multiprocessing.Process)
Expand Down Expand Up @@ -348,7 +323,6 @@ def setUp(self):
self.global_result_dict = manager.dict()
self.message_queue = queue.Queue()
self.control_queue = queue.Queue()
self.barrier = Barrier(parties=1)
self.uut = BearRunner(self.file_name_queue,
self.local_bear_list,
self.global_bear_list,
Expand All @@ -357,8 +331,7 @@ def setUp(self):
self.local_result_dict,
self.global_result_dict,
self.message_queue,
self.control_queue,
self.barrier)
self.control_queue)

self.file1 = "file1"
self.file2 = "arbitrary"
Expand Down Expand Up @@ -415,13 +388,15 @@ def test_run(self):
severity=RESULT_SEVERITY.INFO)]


control_elem, index = self.control_queue.get()
self.assertEqual(control_elem, CONTROL_ELEMENT.LOCAL_FINISHED)
control_elem, index = self.control_queue.get()
self.assertEqual(control_elem, CONTROL_ELEMENT.GLOBAL)
real = self.global_result_dict[index]
self.assertEqual(sorted(global_results_expected), sorted(real))

control_elem, none = self.control_queue.get(timeout=0)
self.assertEqual(control_elem, CONTROL_ELEMENT.FINISHED)
self.assertEqual(control_elem, CONTROL_ELEMENT.GLOBAL_FINISHED)
self.assertEqual(none, None)

# The invalid bear gets a None in that dict for dependency resolution
Expand Down
Loading

0 comments on commit 73d06c6

Please sign in to comment.