Skip to content

Commit

Permalink
Raise an error when using cpu_bound with an async function, fix a bug…
Browse files Browse the repository at this point in the history
… with cpu_bound functions on windows
  • Loading branch information
alex-sherman committed Sep 1, 2018
1 parent ba49099 commit 5b46a78
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -2,7 +2,7 @@

setup(
name='unsync',
version='1.0',
version='1.1',
packages=['unsync'],
url='https://github.com/alex-sherman/unsync',
license='MIT',
Expand Down
12 changes: 12 additions & 0 deletions test/test_call_ordering.py
Expand Up @@ -29,6 +29,18 @@ async def long(task):
with raises(asyncio.InvalidStateError):
self.assertEqual('faff', long(other).result())

def test_nested_unsync(self):
@unsync
async def long():
@unsync
async def other():
await asyncio.sleep(0.1)
return 'faff'
return other().result()

with raises(asyncio.InvalidStateError):
self.assertEqual('faff', long().result())

def test_nested_blocking_on_result_after_await(self):
calls = []

Expand Down
5 changes: 2 additions & 3 deletions test/test_process_executor.py
Expand Up @@ -12,10 +12,9 @@ def cpu_bound(duration):
faff += 1
return 'faff'



@pytest.mark.skip
class ProcessTests(TestCase):
def test_raw_cpu_bound(self):
cpu_bound(0.01).result()
def test_cpu_bound(self):
@unsync
async def aggregator(tasks):
Expand Down
8 changes: 7 additions & 1 deletion unsync/unsync.py
Expand Up @@ -2,6 +2,7 @@
import concurrent
import inspect
import threading
import os
from threading import Thread


Expand Down Expand Up @@ -41,11 +42,13 @@ def __call__(self, *args, **kwargs):
self._set_func(args[0])
return self
if inspect.iscoroutinefunction(self.func):
if self.cpu_bound:
raise TypeError('The CPU bound unsync function %s may not be async or a coroutine' % self.func.__name__)
future = self.func(*args, **kwargs)
else:
if self.cpu_bound:
future = unsync.process_executor.submit(
_multiprocess_target, (self.func.__module__ , self.func.__name__), *args, **kwargs)
_multiprocess_target, (self.func.__module__, self.func.__name__), *args, **kwargs)
else:
future = unsync.thread_executor.submit(self.func, *args, **kwargs)
return Unfuture(future)
Expand All @@ -54,6 +57,9 @@ def __get__(self, instance, owner):
return lambda *args, **kwargs: self(instance, *args, **kwargs)

def _multiprocess_target(func_name, *args, **kwargs):
# On Windows MP turns the main module into __mp_main__ in multiprocess targets
if os.name == 'nt' and func_name[0] == '__main__':
func_name = ('__mp_main__', func_name[1])
__import__(func_name[0])
return unsync.unsync_functions[func_name](*args, **kwargs)

Expand Down

0 comments on commit 5b46a78

Please sign in to comment.