Skip to content

Commit

Permalink
Merge pull request #345 from MainRo/bugfix/tofuture
Browse files Browse the repository at this point in the history
Fixed to_future completion on empty Observables
  • Loading branch information
MainRo committed Apr 7, 2019
2 parents 6ddd8bd + f9e82bb commit 9f000f7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
17 changes: 14 additions & 3 deletions rx/core/operators/tofuture.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from asyncio import Future

from rx.core import Observable
from rx.internal.exceptions import SequenceContainsNoElementsError


def _to_future(future_ctor: Callable[[], Future] = None) -> Callable[[Observable], Future]:
Expand All @@ -11,6 +12,10 @@ def _to_future(future_ctor: Callable[[], Future] = None) -> Callable[[Observable
def to_future(source: Observable) -> Future:
"""Converts an existing observable sequence to a Future.
If the observable emits a single item, then this item is set as the
result of the future. If the observable emits a sequence of items, then
the last emitted item is set as the result of the future.
Example:
future = rx.return_value(42).pipe(ops.to_future(asyncio.Future))
Expand All @@ -21,17 +26,23 @@ def to_future(source: Observable) -> Future:
A future with the last value from the observable sequence.
"""

has_value = []
has_value = False
last_value = None

def on_next(value):
has_value.append(value)
nonlocal last_value
nonlocal has_value
last_value = value
has_value = True

def on_error(err):
future.set_exception(err)

def on_completed():
if has_value:
future.set_result(has_value.pop())
future.set_result(last_value)
else:
future.set_exception(SequenceContainsNoElementsError())

source.subscribe_(on_next, on_error, on_completed)

Expand Down
24 changes: 24 additions & 0 deletions tests/test_observable/test_tofuture.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio

import rx
from rx.internal.exceptions import SequenceContainsNoElementsError
from rx.testing import ReactiveTest

on_next = ReactiveTest.on_next
Expand All @@ -27,6 +28,18 @@ async def go():
loop.run_until_complete(go())
assert result == 42

def test_await_success_on_sequence(self):
loop = asyncio.get_event_loop()
result = None

async def go():
nonlocal result
source = rx.from_([40, 41, 42])
result = await source

loop.run_until_complete(go())
assert result == 42

def test_await_error(self):
loop = asyncio.get_event_loop()
error = Exception("error")
Expand All @@ -42,3 +55,14 @@ async def go():

loop.run_until_complete(go())
assert result == error

def test_await_empty_observable(self):
loop = asyncio.get_event_loop()
result = None

async def go():
nonlocal result
source = rx.empty()
result = await source

self.assertRaises(SequenceContainsNoElementsError, loop.run_until_complete, go())

0 comments on commit 9f000f7

Please sign in to comment.