# aht/stream.py

PSorter/QSorter: no longer start a new thread

1 parent 0292c26 commit 9a735b9efed6c68b130152a74d85f1760f5612f2 committed Jan 15, 2010
Showing with 9 additions and 23 deletions.
1. +2 −2 example/pi.py
2. +7 −17 stream.py
3. +0 −4 test/sorter.py
4 example/pi.py
 @@ -4,7 +4,7 @@ import operator from decimal import Decimal, getcontext -from stream import Stream, Filter, seq, gseq, apply, map, fold, zip, item, drop +from stream import Stream, Processor, seq, gseq, apply, map, fold, zip, item, drop """ Compute digits of pi using the Gregory series, and its accelerated variants. @@ -31,7 +31,7 @@ def Gregory(type=float): series1 = Gregory() -@Filter +@Processor def Aitken(s): """Accelerate the convergence of the a series using Aitken's delta-squared process (SCIP calls it Euler).
24 stream.py
 @@ -355,15 +355,15 @@ def try_next_idx(): # Process streams with functions and higher-order ones -class Filter(Stream): +class Processor(Stream): """A decorator to turn an iterator-processing function into - a Stream filter. + a Stream processor object. """ def __init__(self, function): """function: an iterator-processing function, one that takes an iterator and return an iterator """ - super(Filter, self).__init__() + super(Processor, self).__init__() self.function = function def __call__(self, iterator): @@ -1155,16 +1155,11 @@ class PSorter(Stream): def __init__(self): self.inpipes = [] - def start(self): - sorter = ThreadedFeeder(heapq.merge, *__builtin__.map(_iterrecv, self.inpipes)) - self.sorter_thread = sorter.thread - self.iterator = iter(sorter) + def __iter__(self): + return heapq.merge(*__builtin__.map(_iterrecv, self.inpipes)) def __pipe__(self, inpipe): self.inpipes.append(inpipe.outpipe) - - def join(self): - self.sorter_thread.join() def __repr__(self): return '' % hex(id(self)) @@ -1177,17 +1172,12 @@ class QSorter(Stream): def __init__(self): self.inqueues = [] - def start(self): - sorter = ThreadedFeeder(heapq.merge, *__builtin__.map(_iterqueue, self.inqueues)) - self.sorter_thread = sorter.thread - self.iterator = iter(sorter) + def __iter__(self): + return heapq.merge(*__builtin__.map(_iterqueue, self.inqueues)) def __pipe__(self, inpipe): self.inqueues.append(inpipe.outqueue) - def join(self): - self.sorter_thread.join() - def __repr__(self): return '' % hex(id(self))
4 test/sorter.py
 @@ -11,17 +11,13 @@ def test_PSorter(): sorter = PSorter() ForkedFeeder(lambda: iter(xrange(10))) >> sorter ForkedFeeder(lambda: iter(xrange(0, 20, 2))) >> sorter - sorter.start() assert sorter >> list == [0, 0, 1, 2, 2, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 12, 14, 16, 18] - sorter.join() def test_QSorter(): sorter = QSorter() ThreadedFeeder(lambda: iter(xrange(10))) >> sorter ThreadedFeeder(lambda: iter(xrange(0, 20, 2))) >> sorter - sorter.start() assert sorter >> list == [0, 0, 1, 2, 2, 3, 4, 4, 5, 6, 6, 7, 8, 8, 9, 10, 12, 14, 16, 18] - sorter.join() if __name__ == '__main__':