Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Make stream.py importable from jython/python2.5

  • Loading branch information...
commit 3521ee09d541b504662689aeb5bd2f8bc3e93e39 1 parent 48d184d
@aht authored
Showing with 32 additions and 17 deletions.
  1. +32 −17 stream.py
View
49 stream.py
@@ -70,16 +70,13 @@
<http://blog.onideas.ws/tag/project:stream.py>.
"""
-__version__ = '0.8'
-
+from __future__ import with_statement
import __builtin__
import copy
import collections
import heapq
import itertools
-import multiprocessing
-import multiprocessing.queues
import operator
import Queue
import re
@@ -90,13 +87,34 @@
from operator import itemgetter, attrgetter
+zip = itertools.izip
+
+try:
+ import multiprocessing
+ import multiprocessing.queues
+ _nCPU = multiprocessing.cpu_count()
+except ImportError:
+ _nCPU = 1
+
+try:
+ Iterable = collections.Iterable
+except AttributeError:
+ Iterable = object
+
+try:
+ next
+except NameError:
+ def next(iterator):
+ return iterator.next()
+
try:
from operator import methodcaller
except ImportError:
- def methodcaller(method, *args, **kwargs):
- return lambda o: o.method(*args, **kwargs)
+ def methodcaller(methodname, *args, **kwargs):
+ return lambda o: getattr(o, methodname)(*args, **kwargs)
-zip = itertools.izip
+
+__version__ = '0.8'
#_____________________________________________________________________
@@ -107,7 +125,7 @@ class BrokenPipe(Exception):
pass
-class Stream(collections.Iterable):
+class Stream(Iterable):
"""A stream is both a lazy list and an iterator-processing function.
The lazy list is represented by the attribute 'iterator'.
@@ -649,7 +667,7 @@ def _iterrecv(pipe):
# Threaded/forked feeder
-class ThreadedFeeder(collections.Iterable):
+class ThreadedFeeder(Iterable):
def __init__(self, generator, *args, **kwargs):
"""Create a feeder that start the given generator with
*args and **kwargs in a separate thread. The feeder will
@@ -682,7 +700,7 @@ def __repr__(self):
return '<ThreadedFeeder at %s>' % hex(id(self))
-class ForkedFeeder(collections.Iterable):
+class ForkedFeeder(Iterable):
def __init__(self, generator, *args, **kwargs):
"""Create a feeder that start the given generator with
*args and **kwargs in a child process. The feeder will
@@ -720,9 +738,6 @@ def __repr__(self):
# Asynchronous stream processing using a pool of threads or processes
-_nCPU = multiprocessing.cpu_count()
-
-
class ThreadPool(Stream):
"""Work on the input stream asynchronously using a pool of threads.
@@ -756,7 +771,7 @@ def work():
next(dupinput)
except StopIteration:
break
- except Exception as e:
+ except Exception, e:
self.failqueue.put((next(dupinput), e))
self.worker_threads = []
for _ in range(poolsize):
@@ -828,7 +843,7 @@ def work():
next(dupinput)
except StopIteration:
break
- except Exception as e:
+ except Exception, e:
self.failqueue.put((next(dupinput), e))
self.worker_processes = []
for _ in range(self.poolsize):
@@ -871,7 +886,7 @@ class Executor(object):
The constructor takes a pool class and arguments to its constructor::
- >>> executor = Executor(ProcessPool, map(lambda x: x*x))
+ >>> executor = Executor(ThreadPool, map(lambda x: x*x))
Job ids are returned when items are submitted::
@@ -1264,6 +1279,6 @@ def reduce(function, initval=None):
if __name__ == "__main__":
import doctest
- if doctest.testmod().failed:
+ if doctest.testmod()[0]:
import sys
sys.exit(1)
Please sign in to comment.
Something went wrong with that request. Please try again.