Skip to content

Commit

Permalink
Fix files for isort
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Mar 4, 2022
1 parent 18c3319 commit 298d741
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 33 deletions.
28 changes: 16 additions & 12 deletions examples/asyncio/toasyncgenerator.py
@@ -1,19 +1,21 @@
import asyncio
from asyncio import Future
from typing import Any, Coroutine, List, TypeVar

import rx
from rx import operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler
from rx.core import Observable
import reactivex
from reactivex import Notification, Observable
from reactivex import operators as ops
from reactivex.scheduler.eventloop import AsyncIOScheduler

_T = TypeVar("_T")

def to_async_generator(sentinel=None):
loop = asyncio.get_event_loop()
future = Future()
notifications = []

def _to_async_generator(source: Observable):
def to_async_generator(sentinel: Any = None) -> Coroutine[Any, Any, Future[Any]]:
loop = asyncio.get_event_loop()
future = loop.create_future()
notifications: List[Notification[Any]] = []

def _to_async_generator(source: Observable[_T]):
def feeder():
nonlocal future

Expand All @@ -29,7 +31,7 @@ def feeder():
else:
future.set_result(notification.value)

def on_next(value):
def on_next(value: _T) -> None:
"""Takes on_next values and appends them to the notification queue"""

notifications.append(value)
Expand All @@ -45,14 +47,16 @@ async def gen():
future = Future()

return future

return gen

return _to_async_generator


async def go(loop):
scheduler = AsyncIOScheduler(loop)

xs = rx.from_([x for x in range(10)], scheduler=scheduler)
xs = reactivex.from_([x for x in range(10)], scheduler=scheduler)
gen = xs.pipe(to_async_generator())

# Wish we could write something like:
Expand All @@ -69,5 +73,5 @@ def main():
loop.run_until_complete(go(loop))


if __name__ == '__main__':
if __name__ == "__main__":
main()
23 changes: 13 additions & 10 deletions examples/asyncio/toasynciterator.py
@@ -1,17 +1,16 @@
import asyncio
from asyncio import Future

import rx
from rx import operators as ops
from rx import Observable
from rx.scheduler.eventloop import AsyncIOScheduler
import reactivex
from reactivex import Observable
from reactivex import operators as ops
from reactivex.scheduler.eventloop import AsyncIOScheduler


def to_async_iterable():
def _to_async_iterable(source: Observable):
class AIterable:
def __aiter__(self):

class AIterator:
def __init__(self):
self.notifications = []
Expand All @@ -25,9 +24,11 @@ def feeder(self):

notification = self.notifications.pop(0)
dispatch = {
'N': lambda: self.future.set_result(notification.value),
'E': lambda: self.future.set_exception(notification.exception),
'C': lambda: self.future.set_exception(StopAsyncIteration)
"N": lambda: self.future.set_result(notification.value),
"E": lambda: self.future.set_exception(
notification.exception
),
"C": lambda: self.future.set_exception(StopAsyncIteration),
}

dispatch[notification.kind]()
Expand All @@ -44,14 +45,16 @@ async def __anext__(self):
return value

return AIterator()

return AIterable()

return _to_async_iterable


async def go(loop):
scheduler = AsyncIOScheduler(loop)

ai = rx.range(0, 10, scheduler=scheduler).pipe(to_async_iterable())
ai = reactivex.range(0, 10, scheduler=scheduler).pipe(to_async_iterable())
async for x in ai:
print("got %s" % x)

Expand All @@ -61,5 +64,5 @@ def main():
loop.run_until_complete(go(loop))


if __name__ == '__main__':
if __name__ == "__main__":
main()
18 changes: 7 additions & 11 deletions notebooks/reactivex.io/startup.py
@@ -1,12 +1,16 @@
# Helpers.
# Run this cell always after kernel restarts. All other cells are autonomous.
from __future__ import print_function
import reactivex
import time

import inspect
import logging

# getting the current thread
import threading
import time
from random import randint
from reactivex.testing import marbles

import reactivex

logging.basicConfig(format="%(threadName)s:%(message)s")
log = logging.getLogger("Rx")
Expand Down Expand Up @@ -118,9 +122,6 @@ def subs(src, **kw):
return subscription


# getting the current thread
import threading

threads = []


Expand All @@ -138,10 +139,5 @@ def _cur():
return _cur()


from reactivex.scheduler import new_thread_scheduler, timeout_scheduler
from reactivex.subject import Subject
from reactivex.testing import marbles, dump


def marble_stream(s):
return O.from_marbles(s).to_blocking()

0 comments on commit 298d741

Please sign in to comment.