diff --git a/examples/asyncio/await.py b/examples/asyncio/await.py index 7b54acedb..2f3175c6b 100644 --- a/examples/asyncio/await.py +++ b/examples/asyncio/await.py @@ -1,8 +1,9 @@ import asyncio -from rx import Observable +import rx -stream = Observable.just("Hello, world!") +stream = rx.just("Hello, world!") + async def hello_world(): n = await stream diff --git a/examples/asyncio/toasyncgenerator.py b/examples/asyncio/toasyncgenerator.py index 1325ec3ea..4895b8a60 100644 --- a/examples/asyncio/toasyncgenerator.py +++ b/examples/asyncio/toasyncgenerator.py @@ -1,59 +1,61 @@ -import rx -asyncio = rx.config['asyncio'] +import asyncio +from asyncio import Future -from rx.concurrency import AsyncIOScheduler +import rx +from rx import operators as ops +from rx.concurrency.mainloopscheduler import AsyncIOScheduler from rx.core import Observable -from rx.internal import extensionmethod - -@extensionmethod(ObservableBase) -def to_async_generator(self, future_ctor=None, sentinel=None): - future_ctor = future_ctor or rx.config.get("Future") - if not future_ctor: - raise Exception('Future type not provided nor in rx.config.Future') +def to_async_generator(sentinel=None): loop = asyncio.get_event_loop() - future = [future_ctor()] + future = Future() notifications = [] - def feeder(): - if not len(notifications) or future[0].done(): - return + def _to_async_generator(source: Observable): + + def feeder(): + nonlocal future + + if not notifications or future.done(): + return - notification = notifications.pop(0) + notification = notifications.pop(0) - if notification.kind == "E": - future[0].set_exception(notification.exception) - elif notification.kind == "C": - future[0].set_exception(StopIteration(sentinel)) - else: - future[0].set_result(notification.value) + if notification.kind == "E": + future.set_exception(notification.exception) + elif notification.kind == "C": + future.set_result(sentinel) + else: + future.set_result(notification.value) - def on_next(value): - """Takes on_next values and appends them to the notification queue""" + def on_next(value): + """Takes on_next values and appends them to the notification queue""" - notifications.append(value) - loop.call_soon(feeder) + notifications.append(value) + loop.call_soon(feeder) - self.materialize().subscribe(on_next) + source.pipe(ops.materialize()).subscribe(on_next) - @asyncio.coroutine - def gen(): - """Generator producing futures""" + @asyncio.coroutine + def gen(): + """Generator producing futures""" + nonlocal future - loop.call_soon(feeder) - future[0] = future_ctor() + loop.call_soon(feeder) + future = Future() - return future[0] - return gen + return future + return gen + return _to_async_generator @asyncio.coroutine def go(): scheduler = AsyncIOScheduler() - xs = Observable.from_([x for x in range(10)], scheduler=scheduler) - gen = xs.to_async_generator() + xs = rx.from_([x for x in range(10)], scheduler=scheduler) + gen = xs.pipe(to_async_generator()) # Wish we could write something like: # ys = (x for x in yield from gen()) diff --git a/examples/asyncio/toasynciterator.py b/examples/asyncio/toasynciterator.py index 5467531c4..43a76ae1c 100644 --- a/examples/asyncio/toasynciterator.py +++ b/examples/asyncio/toasynciterator.py @@ -1,55 +1,58 @@ -import rx - -from rx.concurrency import AsyncIOScheduler -from rx.core import Observable -from rx.internal import extensionmethod +import asyncio +from asyncio import Future -asyncio = rx.config['asyncio'] -future_ctor = rx.config.get("Future") or asyncio.Future +import rx +from rx import operators as ops +from rx import Observable +from rx.concurrency.mainloopscheduler import AsyncIOScheduler -@extensionmethod(ObservableBase) -async def __aiter__(self): - source = self +def to_async_iterable(): + def _to_async_iterable(source: Observable): + class AIterable: + def __aiter__(self): - class AIterator: - def __init__(self): - self.notifications = [] - self.future = future_ctor() + class AIterator: + def __init__(self): + self.notifications = [] + self.future = Future() - source.materialize().subscribe(self.on_next) + source.pipe(ops.materialize()).subscribe(self.on_next) - def feeder(self): - if not self.notifications or self.future.done(): - return + def feeder(self): + if not self.notifications or self.future.done(): + return - notification = self.notifications.pop(0) - dispatch = { - 'N': lambda: self.future.set_result(notification.value), - 'E': lambda: self.future.set_exception(notification.exception), - 'C': lambda: self.future.set_exception(StopAsyncIteration) - } + notification = self.notifications.pop(0) + dispatch = { + 'N': lambda: self.future.set_result(notification.value), + 'E': lambda: self.future.set_exception(notification.exception), + 'C': lambda: self.future.set_exception(StopAsyncIteration) + } - dispatch[notification.kind]() + dispatch[notification.kind]() - def on_next(self, notification): - self.notifications.append(notification) - self.feeder() + def on_next(self, notification): + self.notifications.append(notification) + self.feeder() - async def __anext__(self): - self.feeder() + async def __anext__(self): + self.feeder() - value = await self.future - self.future = future_ctor() - return value + value = await self.future + self.future = Future() + return value - return AIterator() + return AIterator() + return AIterable() + return _to_async_iterable async def go(): scheduler = AsyncIOScheduler() - async for x in Observable.range(0, 10, scheduler=scheduler): + ai = rx.range(0, 10, scheduler=scheduler).pipe(to_async_iterable()) + async for x in ai: print("got %s" % x) @@ -57,5 +60,6 @@ def main(): loop = asyncio.get_event_loop() loop.run_until_complete(go()) + if __name__ == '__main__': main() diff --git a/examples/errors/failing.py b/examples/errors/failing.py index 43cdc3dc9..9e011e8a2 100644 --- a/examples/errors/failing.py +++ b/examples/errors/failing.py @@ -7,8 +7,10 @@ looping forever. """ +import time + import rx -from rx.testing import stringify # noqa +from rx import operators as ops def failing(x): @@ -19,10 +21,16 @@ def failing(x): def main(): - xs = rx.Observable.from_string("1-2-3-4-5-6-7-9-|").publish() - xs.map(failing).retry().subscribe(print) + xs = rx.from_marbles("1-2-3-4-5-6-7-9-|").pipe(ops.publish()) + xs.pipe( + ops.map(failing), + ops.retry() + ).subscribe(print) xs.connect() # Must connect. Cannot use ref_count() with publish() + time.sleep(5) + + if __name__ == '__main__': main()