Skip to content
This repository has been archived by the owner on May 2, 2022. It is now read-only.

Commit

Permalink
Merge branch 'master-20180221-01-progressReporter'
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiSakuma committed Feb 22, 2018
2 parents 751f090 + 5edbaba commit 171bd8f
Show file tree
Hide file tree
Showing 13 changed files with 314 additions and 364 deletions.
16 changes: 8 additions & 8 deletions alphatwirl/concurrently/CommunicationChannel0.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Tai Sakuma <tai.sakuma@gmail.com>
from ..progressbar import NullProgressMonitor

from alphatwirl import progressbar

##__________________________________________________________________||
class CommunicationChannel0(object):
Expand All @@ -17,7 +18,7 @@ class CommunicationChannel0(object):
"""

def __init__(self, progressMonitor=None):
self.progressMonitor = NullProgressMonitor() if progressMonitor is None else progressMonitor
self.progressMonitor = progressbar.NullProgressMonitor() if progressMonitor is None else progressMonitor
self.results = [ ]

def __repr__(self):
Expand All @@ -26,13 +27,11 @@ def __repr__(self):
)

def begin(self):
self.progressReporter = self.progressMonitor.createReporter()
reporter = self.progressMonitor.createReporter()
progressbar._progress_reporter = reporter

def put(self, task, *args, **kwargs):
try:
result = task(progressReporter=self.progressReporter, *args, **kwargs)
except TypeError:
result = task(*args, **kwargs)
result = task(*args, **kwargs)
self.results.append(result)

def put_multiple(self, task_args_kwargs_list):
Expand All @@ -52,6 +51,7 @@ def receive(self):

def terminate(self): pass

def end(self): pass
def end(self):
progressbar._progress_reporter = None

##__________________________________________________________________||
12 changes: 4 additions & 8 deletions alphatwirl/concurrently/Worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Tai Sakuma <tai.sakuma@gmail.com>
import multiprocessing

from alphatwirl import progressbar

##__________________________________________________________________||
class Worker(multiprocessing.Process):
def __init__(self, task_queue, result_queue, lock, progressReporter):
Expand All @@ -11,21 +13,15 @@ def __init__(self, task_queue, result_queue, lock, progressReporter):
self.progressReporter = progressReporter

def run(self):
progressbar._progress_reporter = self.progressReporter
while True:
message = self.task_queue.get()
if message is None:
self.task_queue.task_done()
break
task_idx, package = message
result = self._run_task(package)
result = package.task(*package.args, **package.kwargs)
self.task_queue.task_done()
self.result_queue.put((task_idx, result))

def _run_task(self, package):
try:
result = package.task(progressReporter = self.progressReporter, *package.args, **package.kwargs)
except TypeError:
result = package.task(*package.args, **package.kwargs)
return result

##__________________________________________________________________||
27 changes: 21 additions & 6 deletions alphatwirl/loop/CollectorComposite.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
# Tai Sakuma <tai.sakuma@gmail.com>
import logging

from ..progressbar import ProgressReport

import alphatwirl

class DeprecatedOption(object): pass
DEPRECATEDOPTION = DeprecatedOption()

logger = logging.getLogger(__name__)

##__________________________________________________________________||
class CollectorComposite(object):

Expand All @@ -13,18 +22,25 @@ class CollectorComposite(object):
"""

def __init__(self, progressReporter = None):
def __init__(self, progressReporter=DEPRECATEDOPTION):

if progressReporter is not DEPRECATEDOPTION:
text = '{}: the option "{}" is deprecated.'.format(
self.__class__.__name__,
'progressReporter'
)
logger.warning(text)

self.components = [ ]
self.progressReporter = progressReporter

def __repr__(self):
name_value_pairs = (
('components', self.components),
('progressReporter', self.progressReporter),
)
return '{}({})'.format(
self.__class__.__name__,
', '.join(['{} = {!r}'.format(n, v) for n, v in name_value_pairs]),
', '.join(['{}={!r}'.format(n, v) for n, v in name_value_pairs]),
)

def add(self, collector):
Expand All @@ -48,9 +64,8 @@ def collect(self, dataset_readers_list):

ret = [ ]
for i, collector in enumerate(self.components):
if self.progressReporter is not None:
report = ProgressReport(name = 'collecting results', done = i + 1, total = len(self.components))
self.progressReporter.report(report)
report = ProgressReport(name = 'collecting results', done = i + 1, total = len(self.components))
alphatwirl.progressbar.report_progress(report)
ret.append(collector.collect([(dataset, tuple(r.readers[i] for r in readerComposites))
for dataset, readerComposites in dataset_readers_list]))
return ret
Expand Down
23 changes: 14 additions & 9 deletions alphatwirl/loop/EventLoop.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Tai Sakuma <tai.sakuma@gmail.com>
from .EventLoopProgressReportWriter import EventLoopProgressReportWriter
import uuid

import alphatwirl

from .EventLoopProgressReportWriter import EventLoopProgressReportWriter

##__________________________________________________________________||
class EventLoop(object):
"""An event loop
Expand All @@ -15,26 +18,28 @@ def __init__(self, build_events, reader):
self.taskid = uuid.uuid4()

def __repr__(self):
return '{}(build_events = {!r}, reader = {!r}, progressReportWriter = {!r})'.format(
return '{}(build_events={!r}, reader={!r}, progressReportWriter={!r})'.format(
self.__class__.__name__,
self.build_events,
self.reader,
self.progressReportWriter
)

def __call__(self, progressReporter = None):
def __call__(self):
events = self.build_events()
self._reportProgress(progressReporter, events)
self._reportProgress(events)
self.reader.begin(events)
for event in events:
self._reportProgress(progressReporter, event)
self._reportProgress(event)
self.reader.event(event)
self.reader.end()
return self.reader

def _reportProgress(self, progressReporter, event):
if progressReporter is None: return
report = self.progressReportWriter.write(self.taskid, event.config, event)
progressReporter.report(report)
def _reportProgress(self, event):
try:
report = self.progressReportWriter.write(self.taskid, event.config, event)
alphatwirl.progressbar.report_progress(report)
except:
pass

##__________________________________________________________________||
7 changes: 6 additions & 1 deletion alphatwirl/progressbar/BProgressMonitor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Tai Sakuma <tai.sakuma@gmail.com>
import multiprocessing

from .ProgressReporter import ProgressReporter
from .ProgressReportPickup import ProgressReportPickup

import multiprocessing
import alphatwirl

##__________________________________________________________________||
class BProgressMonitor(object):
Expand Down Expand Up @@ -73,8 +75,11 @@ def __repr__(self):
def begin(self):
self.pickup = ProgressReportPickup(self.queue, self.presentation)
self.pickup.start()
reporter = self.createReporter()
alphatwirl.progressbar._progress_reporter = reporter

def end(self):
alphatwirl.progressbar._progress_reporter = None
self.queue.put(None)
self.pickup.join()

Expand Down
2 changes: 1 addition & 1 deletion alphatwirl/progressbar/ProgressReporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, queue):
self._read_time()

def __repr__(self):
return '{}(queue = {!r}, interval = {!r}'.format(
return '{}(queue={!r}, interval={!r})'.format(
self.__class__.__name__,
self.queue,
self.interval
Expand Down
10 changes: 10 additions & 0 deletions alphatwirl/progressbar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,13 @@
from .ProgressReportPickup import ProgressReportPickup
from .ProgressReport import ProgressReport
from .ProgressReporter import ProgressReporter

##__________________________________________________________________||
_progress_reporter = None

def report_progress(report):
if _progress_reporter is None:
return
_progress_reporter.report(report)

##__________________________________________________________________||
55 changes: 33 additions & 22 deletions tests/misc/progressbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,53 @@
import time, random
import uuid

import argparse

import alphatwirl

##__________________________________________________________________||
parser = argparse.ArgumentParser()
parser.add_argument('--parallel-mode', default='multiprocessing', choices=['multiprocessing', 'subprocess', 'htcondor'], help='mode for concurrency')
parser.add_argument('-p', '--process', default=16, type=int, help='number of processes to run in parallel')
parser.add_argument('-q', '--quiet', default=False, action='store_true', help='quiet mode')
args = parser.parse_args()

##__________________________________________________________________||
from alphatwirl.progressbar import ProgressReport

class Task(object):
def __init__(self, name):
self.name = name
def __call__(self, progressReporter=None):
n = random.randint(5, 1000000)
def __call__(self):
## n = random.randint(5, 1000000)
n = random.randint(5, 100000)
taskid = uuid.uuid4()
time.sleep(random.randint(0, 3))
for i in range(n):
time.sleep(0.0001)
report = alphatwirl.progressbar.ProgressReport(name=self.name, done=i + 1, total=n, taskid=taskid)
progressReporter.report(report)
report = ProgressReport(name=self.name, done=(i + 1), total=n, taskid=taskid)
alphatwirl.progressbar.report_progress(report)
return None

##__________________________________________________________________||
progressBar = alphatwirl.progressbar.ProgressBar() if sys.stdout.isatty() else alphatwirl.progressbar.ProgressPrint()
parallel = alphatwirl.parallel.build_parallel(
parallel_mode=args.parallel_mode,
quiet=args.quiet,
processes=args.process
)

##__________________________________________________________________||
progressMonitor = alphatwirl.progressbar.BProgressMonitor(presentation=progressBar)
dropbox = alphatwirl.concurrently.MultiprocessingDropbox(nprocesses=10, progressMonitor=progressMonitor)
channel = alphatwirl.concurrently.CommunicationChannel(dropbox)
progressMonitor.begin()
channel.begin()
channel.put(Task("loop"))
channel.put(Task("another loop"))
channel.put(Task("more loop"))
channel.put(Task("loop loop loop"))
channel.put(Task("l"))
channel.put(Task("loop6"))
channel.put(Task("loop7"))
channel.put(Task("loop8"))
channel.put(Task("loop6"))
channel.receive()
channel.end()
progressMonitor.end()
parallel.begin()
parallel.communicationChannel.put(Task("loop"))
parallel.communicationChannel.put(Task("another loop"))
parallel.communicationChannel.put(Task("more loop"))
parallel.communicationChannel.put(Task("loop loop loop"))
parallel.communicationChannel.put(Task("l"))
parallel.communicationChannel.put(Task("loop6"))
parallel.communicationChannel.put(Task("loop7"))
parallel.communicationChannel.put(Task("loop8"))
parallel.communicationChannel.put(Task("loop6"))
parallel.communicationChannel.receive()
parallel.end()

##__________________________________________________________________||
39 changes: 20 additions & 19 deletions tests/unit/concurrently/test_CommunicationChannel0.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,21 @@
import mock

from alphatwirl.concurrently import CommunicationChannel0
from alphatwirl import progressbar

##__________________________________________________________________||
class MockProgressReporter(object):
pass

@pytest.fixture()
def mock_progressmonitor():
ret = mock.MagicMock()
ret.createReporter.return_value = MockProgressReporter()
return ret

@pytest.fixture()
def obj():
return CommunicationChannel0()
def obj(mock_progressmonitor):
return CommunicationChannel0(progressMonitor=mock_progressmonitor)

##__________________________________________________________________||
def test_repr(obj):
Expand All @@ -20,7 +30,9 @@ def test_repr(obj):
##__________________________________________________________________||
def test_begin_end(obj):
obj.begin()
assert isinstance(progressbar._progress_reporter, MockProgressReporter)
obj.end()
assert progressbar._progress_reporter is None

##__________________________________________________________________||
def test_begin_begin_end(obj):
Expand All @@ -41,18 +53,7 @@ def test_put_receive(obj):
task1 = mock.Mock(name='task1')
task1.return_value = result1
obj.put(task1, 123, 'ABC', A=34)
assert [mock.call(123, 'ABC', A=34, progressReporter=None)] == task1.call_args_list
assert [result1] == obj.receive()
obj.end()

##__________________________________________________________________||
def test_put_receive_typeerror(obj):
obj.begin()
result1 = mock.Mock(name='result1')
task1 = mock.Mock(name='task1')
task1.side_effect = [TypeError, result1]
obj.put(task1, 123, 'ABC', A=34)
assert [mock.call(123, 'ABC', A=34, progressReporter=None), mock.call(123, 'ABC', A=34)] == task1.call_args_list
assert [mock.call(123, 'ABC', A=34)] == task1.call_args_list
assert [result1] == obj.receive()
obj.end()

Expand Down Expand Up @@ -159,7 +160,7 @@ def test_put_multiple(obj):

result3 = mock.Mock(name='result3')
task3 = mock.Mock(name='task3')
task3.side_effect = [TypeError, result3]
task3.return_value = result3

result4 = mock.Mock(name='result4')
task4 = mock.Mock(name='task4')
Expand All @@ -172,10 +173,10 @@ def test_put_multiple(obj):
dict(task=task4, args=(222, 'def')),
])

assert [mock.call(progressReporter=None)] == task1.call_args_list
assert [mock.call(123, 'ABC', A=34, progressReporter=None)] == task2.call_args_list
assert [mock.call(B=123, progressReporter=None), mock.call(B=123)] == task3.call_args_list
assert [mock.call(222, 'def', progressReporter=None)] == task4.call_args_list
assert [mock.call()] == task1.call_args_list
assert [mock.call(123, 'ABC', A=34)] == task2.call_args_list
assert [mock.call(B=123)] == task3.call_args_list
assert [mock.call(222, 'def')] == task4.call_args_list
assert [result1, result2, result3, result4] == obj.receive()

obj.end()
Expand Down

0 comments on commit 171bd8f

Please sign in to comment.