Skip to content
This repository has been archived by the owner on Apr 14, 2024. It is now read-only.

Commit

Permalink
Removed threaded operation entirely to help travis tests to work.
Browse files Browse the repository at this point in the history
It's better that way, as it's time to see that async doesn't do what
it was meant to do, at least not in multi-threaded mode.
  • Loading branch information
Byron committed Nov 12, 2014
1 parent 9770bb3 commit 665f4f2
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 124 deletions.
11 changes: 2 additions & 9 deletions async/channel.py
Expand Up @@ -4,23 +4,16 @@
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Contains a queue based channel implementation"""
try:
from queue import (
Empty,
Full
)
from queue import Empty
except ImportError:
from Queue import (
Empty,
Full
)
from Queue import Empty

from .util import (
AsyncQueue,
SyncQueue,
ReadOnly
)

from time import time
import threading
import sys

Expand Down
27 changes: 4 additions & 23 deletions async/pool.py
Expand Up @@ -3,31 +3,12 @@
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Implementation of a thread-pool working with channels"""
from .thread import (
WorkerThread,
StopProcessing,
)
from .thread import WorkerThread
from threading import Lock

from .util import (
AsyncQueue,
DummyLock
)

try:
from queue import (
Queue,
Empty
)
except ImportError:
from Queue import (
Queue,
Empty
)

from .util import AsyncQueue
from .graph import Graph
from .channel import (
mkchannel,
ChannelWriter,
Channel,
SerialChannel,
Expand Down Expand Up @@ -357,8 +338,8 @@ def set_size(self, size=0):
assert size > -1, "Size cannot be negative"

# Enforce sync operation in py3 - it doesn't work. More information in-code at async.test.lib.py:9
if sys.version_info.major > 2:
log.debug("py3 compatibility issue: async doesn't work reliably in async mode - enforcing synchronous operation")
if True:
log.debug("async: Actual asynchronous operation was disabled as it is slower that way")
size = 0
# end

Expand Down
10 changes: 1 addition & 9 deletions async/test/test_example.py
Expand Up @@ -3,10 +3,7 @@
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Module containing examples from the documentaiton"""
from .lib import (
TestBase,
py2
)
from .lib import TestBase

from async.pool import ThreadPool
from async.task import (
Expand All @@ -24,11 +21,6 @@ def test_usage(self):
# default size is 0, synchronous mode
assert p.size() == 0

# now tasks would be processed asynchronously
p.set_size(1)
if py2:
assert p.size() == 1

# A task performing processing on items from an iterator
t = IteratorThreadTask(iter(list(range(10))), "power", lambda i: i*i)
reader = p.add_task(t)
Expand Down
73 changes: 1 addition & 72 deletions async/test/test_pool.py
Expand Up @@ -22,7 +22,6 @@
from async.thread import terminate_threads
from async.util import cpu_count

import threading
import time
import sys

Expand Down Expand Up @@ -381,38 +380,10 @@ def _assert_async_dependent_tasks(self, pool):

@terminate_threads
def test_base(self):
max_wait_attempts = 3
sleep_time = 0.1
for mc in range(max_wait_attempts):
# wait for threads to die
if len(threading.enumerate()) != 1:
time.sleep(sleep_time)
# END for each attempt
assert len(threading.enumerate()) == 1, "Waited %f s for threads to die, its still alive" % (max_wait_attempts, sleep_time)

p = ThreadPool()

# default pools have no workers
assert p.size() == 0

# increase and decrease the size
num_threads = len(threading.enumerate())
for i in range(self.max_threads):
p.set_size(i)
if py2:
assert p.size() == i
assert len(threading.enumerate()) == num_threads + i

for i in range(self.max_threads, -1, -1):
p.set_size(i)
if py2:
assert p.size() == i

# default pools have no workers - and threading was removed entirely ...
assert p.size() == 0
# threads should be killed already, but we let them a tiny amount of time
# just to be sure
time.sleep(0.05)
assert len(threading.enumerate()) == num_threads

# SINGLE TASK SERIAL SYNC MODE
##############################
Expand Down Expand Up @@ -454,45 +425,3 @@ def test_base(self):
# DEPENDENT TASKS SYNC MODE
###########################
self._assert_async_dependent_tasks(p)


# SINGLE TASK THREADED ASYNC MODE ( 1 thread )
##############################################
# step one gear up - just one thread for now.
p.set_size(1)
if py2:
assert p.size() == 1
assert len(threading.enumerate()) == num_threads + 1
# deleting the pool stops its threads - just to be sure ;)
# Its not synchronized, hence we wait a moment
del(p)
time.sleep(0.05)
if py2:
assert len(threading.enumerate()) == num_threads

p = ThreadPool(1)
if py2:
assert len(threading.enumerate()) == num_threads + 1

# here we go
self._assert_single_task(p, True)



# SINGLE TASK ASYNC MODE ( 2 threads )
######################################
# two threads to compete for a single task
p.set_size(2)
self._assert_single_task(p, True)

# real stress test- should be native on every dual-core cpu with 2 hardware
# threads per core
p.set_size(4)
self._assert_single_task(p, True)


# DEPENDENT TASK ASYNC MODE
###########################
self._assert_async_dependent_tasks(p)

sys.stderr.write("Done with everything\n")
5 changes: 0 additions & 5 deletions async/test/test_thread.py
Expand Up @@ -10,11 +10,6 @@
terminate_threads
)

try:
from queue import Queue
except ImportError:
from Queue import Queue

import sys
import time

Expand Down
11 changes: 5 additions & 6 deletions async/thread.py
Expand Up @@ -177,14 +177,13 @@ def run(self):

try:
try:
rval = None
if inspect.ismethod(routine):
if routine.__self__ is None:
rval = routine(self, arg)
routine(self, arg)
else:
rval = routine(arg)
routine(arg)
elif inspect.isroutine(routine):
rval = routine(arg)
routine(arg)
else:
# ignore unknown items
log.warn("%s: task %s was not understood - terminating", self.getName(), str(tasktuple))
Expand All @@ -199,8 +198,8 @@ def run(self):
except StopProcessing:
break
except Exception as e:
log.error("%s: Task %s raised unhandled exception: %s - this really shouldn't happen !",
(self.getName(), str(tasktuple), str(e)))
log.error("%s: Task raised unhandled exception: %s - this really shouldn't happen !",
(self.getName(), str(e)))
continue # just continue
# END routine exception handling

Expand Down
5 changes: 5 additions & 0 deletions doc/source/changes.rst
@@ -1,6 +1,11 @@
#########
Changelog
#########
*****
0.6.2
*****
* Added python 3.X compatibility. As it doesn't work deterministically in this version, it shouldn't be used in production !

*****
0.6.1
*****
Expand Down

0 comments on commit 665f4f2

Please sign in to comment.