Skip to content
This repository has been archived by the owner on May 2, 2022. It is now read-only.

Commit

Permalink
Merge branch 'v0.9.x-20170218-01-remove_copy_from' into v0.9.x
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiSakuma committed Feb 19, 2017
2 parents 6d454ab + 7726385 commit 98a1a9e
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 263 deletions.
46 changes: 0 additions & 46 deletions AlphaTwirl/Loop/Associator.py

This file was deleted.

19 changes: 15 additions & 4 deletions AlphaTwirl/Loop/EventLoopRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,29 @@

##__________________________________________________________________||
class EventLoopRunner(object):
"""This class runs instances of `EventLoop`.
"""This class runs instances of `EventLoop` and keeps the results. It
will return the results when `end()` is called.
"""
def __init__(self, progressMonitor = None):
if progressMonitor is None: progressMonitor = NullProgressMonitor()
self.progressReporter = progressMonitor.createReporter()
self.results = [ ]

def begin(self): pass
def __repr__(self):
return '{}(progressReporter = {!r}, results = {!r})'.format(
self.__class__.__name__,
self.progressReporter,
self.results
)

def begin(self):
self.results = [ ]

def run(self, eventLoop):
eventLoop(self.progressReporter)
self.results.append(eventLoop(self.progressReporter))

def end(self): pass
def end(self):
return self.results

##__________________________________________________________________||
33 changes: 27 additions & 6 deletions AlphaTwirl/Loop/EventReader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Tai Sakuma <tai.sakuma@cern.ch>
import copy

from .EventLoop import EventLoop
from .Associator import Associator

##__________________________________________________________________||
class EventReader(object):
Expand All @@ -10,20 +11,25 @@ class EventReader(object):
split_into_build_events(), which splits the data set into chunks,
creates the function build_events() for each chunk, and returns a
list of the functions. Then, for each build_events(), This class
creates a reader associated with the collector, creates an event
loop, and send it to the event loop runner.
creates a copy of the reader, creates an event loop, and send it
to the event loop runner.
At the end, this class receives results from the event loop runner
and have the collector collect them.
"""
def __init__(self, eventLoopRunner, reader, collector,
split_into_build_events):

self.eventLoopRunner = eventLoopRunner
self.associator = Associator(reader, collector)
self.reader = reader
self.collector = collector
self.split_into_build_events = split_into_build_events

self.EventLoop = EventLoop

self.dataset_names = [ ]

def __repr__(self):
return '{}(eventLoopRunner = {!r}, reader = {!r}, collector = {!r}, split_into_build_events = {!r})'.format(
self.__class__.__name__,
Expand All @@ -35,16 +41,31 @@ def __repr__(self):

def begin(self):
self.eventLoopRunner.begin()
self.dataset_names = [ ]

def read(self, dataset):
build_events_list = self.split_into_build_events(dataset)
for build_events in build_events_list:
reader = self.associator.make(dataset.name)
self.dataset_names.append(dataset.name)
reader = copy.deepcopy(self.reader)
eventLoop = self.EventLoop(build_events, reader)
self.eventLoopRunner.run(eventLoop)

def end(self):
self.eventLoopRunner.end()
returned_readers = self.eventLoopRunner.end()

if len(self.dataset_names) != len(returned_readers):
import logging
logger = logging.getLogger(__name__)
logger.warning(
'the same number of the readers were not returned: {} readers sent, {} readers returned. cannot collect results'.format(
len(self.dataset_names),
len(returned_readers)
))
return None

for d, r in zip(self.dataset_names, returned_readers):
self.collector.addReader(d, r)
return self.collector.collect()

##__________________________________________________________________||
42 changes: 14 additions & 28 deletions AlphaTwirl/Loop/MPEventLoopRunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,19 @@ class MPEventLoopRunner(object):
After giving all event loops that you need to run to this class,
you need to call the method `end()`::
runner.end()
results = runner.end()
If workers are in the background, this method will wait until
workers finish running all event loops. If the worker is in the
foreground, this method immediately returns.
foreground, this method immediately returns. This method returns
the results, the list of the values eventLoops return, sorted in
the order given with `run()`.
"""

def __init__(self, communicationChannel):
self.communicationChannel = communicationChannel
self._original_readers = [ ]
self.nruns = 0

def __repr__(self):
return '{}(communicationChannel = {!r}'.format(
Expand All @@ -76,42 +78,26 @@ def run(self, eventLoop):
"""

self._original_readers.append(eventLoop.reader)
self.communicationChannel.put(eventLoop)
self.nruns += 1

def end(self):
"""wait until all event loops end
In addition, if necessary, this method also carries out a
somewhat complex copying operation because of the duplication
of objects that occurs in multiprocessing.
If eventLoops were executed in other processes, the readers in
the main process did not read the events; therefore, they
don't have the results. The readers in other processes read
the events. They have the results. The readers in other
process are pickled and sent back to the main process.
However, these returned readers are no longer the same objects
as the original readers in the main process.
The method copies the returned readers to the original readers
if they are different objects.
"""wait until all event loops end and returns the results.
"""

returned_readers = self.communicationChannel.receive()
results = self.communicationChannel.receive()

if len(self._original_readers) != len(returned_readers):
if self.nruns != len(results):
import logging
logger = logging.getLogger(__name__)
# logger.setLevel(logging.DEBUG)
logger.warning(
'the same number of the readers were not received: {} readers put, {} readers received'.format(
len(self._original_readers),
len(returned_readers)
'too fee results received: {} results received, {} expected'.format(
len(results),
self.nruns
))

for original, returned in zip(self._original_readers, returned_readers):
if original is returned: continue
original.copy_from(returned)
return results

##__________________________________________________________________||
1 change: 0 additions & 1 deletion AlphaTwirl/Loop/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from EventReader import EventReader
from EventLoopRunner import EventLoopRunner
from MPEventLoopRunner import MPEventLoopRunner
from Associator import Associator
from Collector import Collector
from NullCollector import NullCollector
from CollectorComposite import CollectorComposite
Expand Down
8 changes: 0 additions & 8 deletions docs/AlphaTwirl.Loop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@ AlphaTwirl.Loop package
Submodules
----------

AlphaTwirl.Loop.Associator module
---------------------------------

.. automodule:: AlphaTwirl.Loop.Associator
:members:
:undoc-members:
:show-inheritance:

AlphaTwirl.Loop.Collector module
--------------------------------

Expand Down
37 changes: 0 additions & 37 deletions tests/unit/Loop/test_Associator.py

This file was deleted.

41 changes: 29 additions & 12 deletions tests/unit/Loop/test_EventLoopRunner.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,47 @@
from AlphaTwirl.Loop import EventLoopRunner
import unittest
from AlphaTwirl.Loop import EventLoopRunner

##__________________________________________________________________||
class MockResult(object): pass

##__________________________________________________________________||
class MockEventLoop(object):
def __init__(self):
self.called = False
def __init__(self, result):
self.result = result

def __call__(self, progressReporter):
self.called = True
return self.result

##__________________________________________________________________||
class TestEventLoopRunner(unittest.TestCase):

def setUp(self):
self.runner = EventLoopRunner()
self.obj = EventLoopRunner()

def test_repr(self):
repr(self.obj)

def test_begin(self):
self.runner.begin()
self.obj.begin()

def test_end(self):
self.obj.begin()
self.assertEqual([ ], self.obj.end())

def test_end_without_begin(self):
self.assertEqual([ ], self.obj.end())

def test_run(self):
loop = MockEventLoop()
self.assertFalse(loop.called)
self.runner.run(loop)
self.assertTrue(loop.called)
self.obj.begin()

def test_end(self):
self.runner.end()
result1 = MockResult()
loop1 = MockEventLoop(result1)
self.obj.run(loop1)

result2 = MockResult()
loop2 = MockEventLoop(result2)
self.obj.run(loop2)

self.assertEqual([result1, result2], self.obj.end())

##__________________________________________________________________||

0 comments on commit 98a1a9e

Please sign in to comment.