Skip to content

Commit

Permalink
Fixed flat_map with mapper returning an asyncio Future (#460)
Browse files Browse the repository at this point in the history
As of python 3.7 Asyncio Futures implement an __iter__ method,
probably for backward compatibility. As a consequence, asyncio
futures are also iterable. So we must check if the mapper result
is a Future before checking if it is Iterable.

Fixes #457
  • Loading branch information
MainRo committed Oct 5, 2019
1 parent 5732604 commit ce12560
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
7 changes: 4 additions & 3 deletions rx/core/operators/flatmap.py
Expand Up @@ -10,11 +10,12 @@
def _flat_map_internal(source, mapper=None, mapper_indexed=None):
def projection(x, i):
mapper_result = mapper(x) if mapper else mapper_indexed(x, i)
if isinstance(mapper_result, collections.abc.Iterable):
if is_future(mapper_result):
result = from_future(mapper_result)
elif isinstance(mapper_result, collections.abc.Iterable):
result = from_(mapper_result)
else:
result = from_future(mapper_result) if is_future(
mapper_result) else mapper_result
result = mapper_result
return result

return source.pipe(
Expand Down
33 changes: 33 additions & 0 deletions tests/test_observable/test_flatmap_async.py
@@ -0,0 +1,33 @@
import unittest
import asyncio
from rx import operators as ops
from rx.subject import Subject

from rx.scheduler.eventloop import AsyncIOScheduler


class TestFlatMapAsync(unittest.TestCase):

def test_flat_map_async(self):
actual_next = None
loop = asyncio.get_event_loop()
scheduler = AsyncIOScheduler(loop=loop)

def mapper(i):
async def _mapper(i):
return i + 1

return asyncio.ensure_future(_mapper(i))

def on_next(i):
nonlocal actual_next
actual_next = i

async def test_flat_map():
x = Subject()
x.pipe(ops.flat_map(mapper)).subscribe(on_next, scheduler=scheduler)
x.on_next(10)
await asyncio.sleep(0.1)

loop.run_until_complete(test_flat_map())
assert actual_next == 11

0 comments on commit ce12560

Please sign in to comment.