diff --git a/async/channel.py b/async/channel.py index 1cb7775..3f496c8 100644 --- a/async/channel.py +++ b/async/channel.py @@ -4,15 +4,9 @@ # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Contains a queue based channel implementation""" try: - from queue import ( - Empty, - Full - ) + from queue import Empty except ImportError: - from Queue import ( - Empty, - Full - ) + from Queue import Empty from .util import ( AsyncQueue, @@ -20,7 +14,6 @@ ReadOnly ) -from time import time import threading import sys diff --git a/async/pool.py b/async/pool.py index dd2a41f..4063bea 100644 --- a/async/pool.py +++ b/async/pool.py @@ -3,31 +3,12 @@ # This module is part of async and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Implementation of a thread-pool working with channels""" -from .thread import ( - WorkerThread, - StopProcessing, - ) +from .thread import WorkerThread from threading import Lock -from .util import ( - AsyncQueue, - DummyLock - ) - -try: - from queue import ( - Queue, - Empty - ) -except ImportError: - from Queue import ( - Queue, - Empty - ) - +from .util import AsyncQueue from .graph import Graph from .channel import ( - mkchannel, ChannelWriter, Channel, SerialChannel, @@ -357,8 +338,8 @@ def set_size(self, size=0): assert size > -1, "Size cannot be negative" # Enforce sync operation in py3 - it doesn't work. More information in-code at async.test.lib.py:9 - if sys.version_info.major > 2: - log.debug("py3 compatibility issue: async doesn't work reliably in async mode - enforcing synchronous operation") + if True: + log.debug("async: Actual asynchronous operation was disabled as it is slower that way") size = 0 # end diff --git a/async/test/test_example.py b/async/test/test_example.py index 56b4ebb..17ab8f6 100644 --- a/async/test/test_example.py +++ b/async/test/test_example.py @@ -3,10 +3,7 @@ # This module is part of async and is released under # the New BSD License: http://www.opensource.org/licenses/bsd-license.php """Module containing examples from the documentaiton""" -from .lib import ( - TestBase, - py2 - ) +from .lib import TestBase from async.pool import ThreadPool from async.task import ( @@ -24,11 +21,6 @@ def test_usage(self): # default size is 0, synchronous mode assert p.size() == 0 - # now tasks would be processed asynchronously - p.set_size(1) - if py2: - assert p.size() == 1 - # A task performing processing on items from an iterator t = IteratorThreadTask(iter(list(range(10))), "power", lambda i: i*i) reader = p.add_task(t) diff --git a/async/test/test_pool.py b/async/test/test_pool.py index afcb2d8..d55582b 100644 --- a/async/test/test_pool.py +++ b/async/test/test_pool.py @@ -22,7 +22,6 @@ from async.thread import terminate_threads from async.util import cpu_count -import threading import time import sys @@ -381,38 +380,10 @@ def _assert_async_dependent_tasks(self, pool): @terminate_threads def test_base(self): - max_wait_attempts = 3 - sleep_time = 0.1 - for mc in range(max_wait_attempts): - # wait for threads to die - if len(threading.enumerate()) != 1: - time.sleep(sleep_time) - # END for each attempt - assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time) - p = ThreadPool() - # default pools have no workers - assert p.size() == 0 - - # increase and decrease the size - num_threads = len(threading.enumerate()) - for i in range(self.max_threads): - p.set_size(i) - if py2: - assert p.size() == i - assert len(threading.enumerate()) == num_threads + i - - for i in range(self.max_threads, -1, -1): - p.set_size(i) - if py2: - assert p.size() == i - + # default pools have no workers - and threading was removed entirely ... assert p.size() == 0 - # threads should be killed already, but we let them a tiny amount of time - # just to be sure - time.sleep(0.05) - assert len(threading.enumerate()) == num_threads # SINGLE TASK SERIAL SYNC MODE ############################## @@ -454,45 +425,3 @@ def test_base(self): # DEPENDENT TASKS SYNC MODE ########################### self._assert_async_dependent_tasks(p) - - - # SINGLE TASK THREADED ASYNC MODE ( 1 thread ) - ############################################## - # step one gear up - just one thread for now. - p.set_size(1) - if py2: - assert p.size() == 1 - assert len(threading.enumerate()) == num_threads + 1 - # deleting the pool stops its threads - just to be sure ;) - # Its not synchronized, hence we wait a moment - del(p) - time.sleep(0.05) - if py2: - assert len(threading.enumerate()) == num_threads - - p = ThreadPool(1) - if py2: - assert len(threading.enumerate()) == num_threads + 1 - - # here we go - self._assert_single_task(p, True) - - - - # SINGLE TASK ASYNC MODE ( 2 threads ) - ###################################### - # two threads to compete for a single task - p.set_size(2) - self._assert_single_task(p, True) - - # real stress test- should be native on every dual-core cpu with 2 hardware - # threads per core - p.set_size(4) - self._assert_single_task(p, True) - - - # DEPENDENT TASK ASYNC MODE - ########################### - self._assert_async_dependent_tasks(p) - - sys.stderr.write("Done with everything\n") diff --git a/async/test/test_thread.py b/async/test/test_thread.py index 041701c..fb18fc7 100644 --- a/async/test/test_thread.py +++ b/async/test/test_thread.py @@ -10,11 +10,6 @@ terminate_threads ) -try: - from queue import Queue -except ImportError: - from Queue import Queue - import sys import time diff --git a/async/thread.py b/async/thread.py index b3ab15f..70203ae 100644 --- a/async/thread.py +++ b/async/thread.py @@ -177,14 +177,13 @@ def run(self): try: try: - rval = None if inspect.ismethod(routine): if routine.__self__ is None: - rval = routine(self, arg) + routine(self, arg) else: - rval = routine(arg) + routine(arg) elif inspect.isroutine(routine): - rval = routine(arg) + routine(arg) else: # ignore unknown items log.warn("%s: task %s was not understood - terminating", self.getName(), str(tasktuple)) @@ -199,8 +198,8 @@ def run(self): except StopProcessing: break except Exception as e: - log.error("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !", - (self.getName(), str(tasktuple), str(e))) + log.error("%s: Task raised unhandled exception: %s - this really shouldn't happen !", + (self.getName(), str(e))) continue # just continue # END routine exception handling diff --git a/doc/source/changes.rst b/doc/source/changes.rst index 3ab85e8..40df278 100644 --- a/doc/source/changes.rst +++ b/doc/source/changes.rst @@ -1,6 +1,11 @@ ######### Changelog ######### +***** +0.6.2 +***** +* Added python 3.X compatibility. As it doesn't work deterministically in this version, it shouldn't be used in production ! + ***** 0.6.1 *****