Write ThreadedFeeder, ForkedFeeder and example/feeder.py

commit 2588678774b74ac895ca99e2412fec0844a1afc7 (1 parent: 69daea2)
@aht authored
1  .gitignore
@@ -1,3 +1,4 @@
*.pyc
build
dist
+*.swp
3  MANIFEST
@@ -1,4 +1,5 @@
README
-LICENSE
setup.py
stream.py
+example/feeder.py
+example/pi.py
5 MANIFEST.in
@@ -0,0 +1,5 @@
+README
+LICENSE
+setup.py
+stream.py
+include example/*.py
32 example/feeder.py
@@ -0,0 +1,32 @@
+import time
+import operator
+from stream import ThreadedFeeder, ForkedFeeder, map, reduce
+
+"""
+Demonstrate the use of a feeder to minimize time spent by the whole pipeline
+waiting for a blocking producer.
+"""
+
+def blocking_producer():
+ n = 0
+ while 1:
+ time.sleep(0.05)
+ n += 1
+ if n < 100:
+ yield 42
+ else:
+ raise StopIteration
+
+if __name__ == '__main__':
+ f = lambda x: x**x**3
+ import sys
+ try:
+ if sys.argv[1] == '-s':
+ ## use a single thread
+ blocking_producer() >> map(f) >> reduce(operator.add)
+ elif sys.argv[1] == '-t':
+ ## use a feeder in a separate thread
+ ThreadedFeeder(blocking_producer) >> map(f) >> reduce(operator.add)
+ except IndexError:
+ ## use a feeder in a child process
+ ForkedFeeder(blocking_producer) >> map(f) >> reduce(operator.add)
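
For reference, the example selects its mode on the command line: -s runs the blocking producer and the pipeline in a single thread, -t routes the producer through a ThreadedFeeder, and any other invocation (falling into the IndexError branch) runs the producer through a ForkedFeeder in a child process.
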
105 stream.py
@@ -34,6 +34,13 @@
Values are computed only when an accumulator forces some or all evaluation
(not when the stream is set up).
+When a producer is doing blocking I/O, it is possible to use a ThreadedFeeder
+or ForkedFeeder to improve performance. The feeder will start a thread or
+process to run the producer and feed generated items to a cache, minimizing
+the time that the whole pipeline has to wait when the producer is blocking in
+system calls.
+
+
Examples
========
@@ -56,7 +63,7 @@
import re
s = open('file') \
>> filter(re.compile(regex).search) \
- >> map(splitter(' |:|\.')) \
+ >> map(re.compile(' |:|\.').split) \
>> map(itemgetter(3)) \
>> map(methodcaller('lstrip', '0')) \
>> list
@@ -106,7 +113,7 @@
probe >> item[:10]
"""
-__version__ = '0.6.1'
+__version__ = '0.7'
__author__ = 'Anh Hai Trinh'
__email__ = 'moc.liamg@hnirt.iah.hna:otliam'[::-1]
@@ -616,6 +623,100 @@ def __call__(self, inpipe):
#_____________________________________________________________________
#
+# Buffered stream using a threaded or forked feeder, parallel map
+#_____________________________________________________________________
+
+
+import threading
+import multiprocessing as mp
+from Queue import Queue
+
+
+class ThreadedFeeder(collections.Iterator):
+ __slots__ = 'thread', 'queue'
+
+ def __init__(self, generator, size=0):
+ """Create a feeder that start the given generator in a separate thread and
+ put items into queue with a specified size (default to infinity).
+
+ The feeder will act as an eagerly evaluating proxy of the generator.
+
+ This should improve performance when the generator often blocks in
+ system calls. Note that the GIL might make threading performance worse in
+ multi-processor machines.
+ """
+ self.queue = Queue(size)
+ def feeder():
+ i = generator()
+ while 1:
+ try:
+ self.queue.put(next(i))
+ except StopIteration:
+ self.queue.put(StopIteration)
+ break
+ self.thread = threading.Thread(target=feeder)
+ self.thread.start()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ item = self.queue.get()
+ if item is StopIteration:
+ raise item
+ else:
+ return item
+
+ def __len__(self):
+ return len([i for i in self])
+
+ def __repr__(self):
+ return '<ThreadedFeeder at %s>' % hex(id(self))
+
+
+class ForkedFeeder(collections.Iterator):
+ __slots__ = 'process', 'receiver'
+
+ def __init__(self, generator):
+ """Create a feeder that start the given generator in a child process which
+ sends results back to the parent.
+
+ The feeder will act as an eagerly evaluating proxy of the generator.
+
+ This should improve performance when the generator often blocks in
+ system calls. Note that serialization could be costly.
+ """
+ self.receiver, sender = mp.Pipe(duplex=False)
+ def feeder():
+ i = generator()
+ while 1:
+ try:
+ sender.send(next(i))
+ except StopIteration:
+ sender.send(StopIteration)
+ break
+ self.process = mp.Process(target=feeder)
+ self.process.start()
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ item = self.receiver.recv()
+ if item is not StopIteration:
+ return item
+ else:
+ raise item
+
+ def __len__(self):
+ return len([i for i in self])
+
+ def __repr__(self):
+ return '<ForkedFeeder at %s>' % hex(id(self))
+
+
+#_____________________________________________________________________
+#
# Useful utilities
#_____________________________________________________________________
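
For context, here is a minimal sketch, not part of this commit, of how the new feeders compose with the existing pipeline operators. It is modeled on example/feeder.py above; the slow_source generator and the queue size of 16 are illustrative assumptions.

import time
import operator
from stream import ThreadedFeeder, ForkedFeeder, map, reduce

def slow_source():
    # Stand-in for a producer that blocks on I/O.
    for n in range(100):
        time.sleep(0.05)
        yield n

# Run the producer in a background thread; size=16 bounds the queue so the
# feeder does not run arbitrarily far ahead of the consuming pipeline.
ThreadedFeeder(slow_source, 16) >> map(lambda x: x * x) >> reduce(operator.add)

# The same pipeline with the producer in a child process; each item is
# pickled and sent over a pipe, so serialization cost matters here.
ForkedFeeder(slow_source) >> map(lambda x: x * x) >> reduce(operator.add)
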