Skip to content

Commit

Permalink
Merge 1cae62f into 93f94ca
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed May 11, 2019
2 parents 93f94ca + 1cae62f commit 6f3372b
Show file tree
Hide file tree
Showing 68 changed files with 242 additions and 193 deletions.
25 changes: 14 additions & 11 deletions rx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

from .core import Observable, abc, typing, pipe

from . import disposable


def amb(*sources: Observable) -> Observable:
"""Propagates the observable sequence that reacts first.
Expand Down Expand Up @@ -466,7 +464,7 @@ def if_then(condition: Callable[[], bool], then_source: Observable,
return _if_then(condition, then_source, else_source)


def interval(period, scheduler: typing.Scheduler = None) -> Observable:
def interval(period, scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Returns an observable sequence that produces a value after each
period.
Expand Down Expand Up @@ -541,7 +539,11 @@ def on_error_resume_next(*sources: Observable) -> Observable:
return _on_error_resume_next(*sources)


def range(start: int, stop: int = None, step: int = None, scheduler: typing.Scheduler = None) -> Observable:
def range(start: int,
stop: Optional[int] = None,
step: Optional[int] = None,
scheduler: Optional[typing.Scheduler] = None
) -> Observable:
"""Generates an observable sequence of integral numbers within a
specified range, using the specified scheduler to send out observer
messages.
Expand All @@ -564,7 +566,7 @@ def range(start: int, stop: int = None, step: int = None, scheduler: typing.Sche
return _range(start, stop, step, scheduler)


def return_value(value: Any, scheduler: typing.Scheduler = None) -> Observable:
def return_value(value: Any, scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Returns an observable sequence that contains a single element,
using the specified scheduler to send out observer messages.
There is an alias called 'just'.
Expand All @@ -587,7 +589,7 @@ def return_value(value: Any, scheduler: typing.Scheduler = None) -> Observable:
just = return_value


def repeat_value(value: Any = None, repeat_count: int = None) -> Observable:
def repeat_value(value: Any = None, repeat_count: Optional[int] = None) -> Observable:
"""Generates an observable sequence that repeats the given element
the specified number of times.
Expand Down Expand Up @@ -650,7 +652,7 @@ def start_async(function_async) -> Observable:
return _start_async(function_async)


def throw(exception: Exception, scheduler: typing.Scheduler = None) -> Observable:
def throw(exception: Exception, scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Returns an observable sequence that terminates with an exception,
using the specified scheduler to send out the single OnError
message.
Expand All @@ -670,8 +672,8 @@ def throw(exception: Exception, scheduler: typing.Scheduler = None) -> Observabl
return _throw(exception, scheduler)


def timer(duetime: typing.AbsoluteOrRelativeTime, period: typing.RelativeTime = None,
scheduler: typing.Scheduler = None) -> Observable:
def timer(duetime: typing.AbsoluteOrRelativeTime, period: Optional[typing.RelativeTime] = None,
scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Returns an observable sequence that produces a value after
duetime has elapsed and then after each period.
Expand Down Expand Up @@ -722,8 +724,9 @@ def to_async(func: Callable, scheduler=None) -> Callable:
return _to_async(func, scheduler)


def using(resource_factory: Callable[[], typing.Disposable], observable_factory: Callable[[typing.Disposable], Observable]
) -> Observable:
def using(resource_factory: Callable[[], typing.Disposable],
observable_factory: Callable[[typing.Disposable], Observable]
) -> Observable:
"""Constructs an observable sequence that depends on a resource
object, whose lifetime is tied to the resulting observable
sequence's lifetime.
Expand Down
3 changes: 2 additions & 1 deletion rx/concurrency/threadpoolscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def start(self) -> None:
self.future = self.executor.submit(self.target)

def cancel(self) -> None:
self.future.cancel()
if self.future:
self.future.cancel()

def __init__(self, max_workers: Optional[int] = None) -> None:
self.executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=max_workers)
Expand Down
7 changes: 4 additions & 3 deletions rx/core/observable/empty.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Any
from typing import Any, Optional

from rx.core import typing
from rx.core import Observable
from rx.concurrency import immediate_scheduler


def _empty(scheduler: typing.Scheduler = None) -> Observable:
def subscribe(observer: typing.Observer, scheduler_: typing.Scheduler = None) -> typing.Disposable:
def _empty(scheduler: Optional[typing.Scheduler] = None) -> Observable:
def subscribe(observer: typing.Observer, scheduler_: Optional[typing.Scheduler] = None) -> typing.Disposable:
_scheduler = scheduler or scheduler_ or immediate_scheduler

def action(_: typing.Scheduler, __: Any) -> None:
observer.on_completed()

Expand Down
6 changes: 3 additions & 3 deletions rx/core/observable/fromcallback.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from typing import Callable
from typing import Callable, Optional

from rx.disposable import Disposable
from rx.core import typing
from rx.core import Observable
from rx.core.typing import Mapper


def _from_callback(func: Callable, mapper: Mapper = None) -> Callable[[], Observable]:
def _from_callback(func: Callable, mapper: Optional[Mapper] = None) -> Callable[[], Observable]:
"""Converts a callback function to an observable sequence.
Args:
Expand All @@ -24,7 +24,7 @@ def _from_callback(func: Callable, mapper: Mapper = None) -> Callable[[], Observ
def function(*args):
arguments = list(args)

def subscribe(observer: typing.Observer, scheduler: typing.Scheduler = None) -> typing.Disposable:
def subscribe(observer: typing.Observer, scheduler: Optional[typing.Scheduler] = None) -> typing.Disposable:
def handler(*args):
results = list(args)
if mapper:
Expand Down
3 changes: 2 additions & 1 deletion rx/core/observable/fromfuture.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from asyncio.futures import Future
from typing import Optional

from rx.disposable import Disposable
from rx.core import typing
Expand All @@ -18,7 +19,7 @@ def _from_future(future: Future) -> Observable:
and failure.
"""

def subscribe(observer: typing.Observer, scheduler: typing.Scheduler = None) -> typing.Disposable:
def subscribe(observer: typing.Observer, scheduler: Optional[typing.Scheduler] = None) -> typing.Disposable:
def done(future):
try:
value = future.result()
Expand Down
6 changes: 3 additions & 3 deletions rx/core/observable/fromiterable.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Iterable, Any
from typing import Iterable, Any, Optional

from rx.core import Observable, typing
from rx.concurrency import current_thread_scheduler
from rx.disposable import CompositeDisposable, Disposable


def from_iterable(iterable: Iterable, scheduler: typing.Scheduler = None) -> Observable:
def from_iterable(iterable: Iterable, scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Converts an iterable to an observable sequence.
Example:
Expand All @@ -20,7 +20,7 @@ def from_iterable(iterable: Iterable, scheduler: typing.Scheduler = None) -> Obs
given iterable sequence.
"""

def subscribe(observer: typing.Observer, scheduler_: typing.Scheduler = None) -> typing.Disposable:
def subscribe(observer: typing.Observer, scheduler_: Optional[typing.Scheduler] = None) -> typing.Disposable:
_scheduler = scheduler or scheduler_ or current_thread_scheduler
iterator = iter(iterable)
disposed = False
Expand Down
1 change: 1 addition & 0 deletions rx/core/observable/groupedobservable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from .observable import Observable


class GroupedObservable(Observable):
def __init__(self, key, underlying_observable, merged_disposable=None):
super().__init__()
Expand Down
4 changes: 2 additions & 2 deletions rx/core/observable/ifthen.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable
from typing import Callable, Optional

import rx
from rx.core import abc
Expand All @@ -7,7 +7,7 @@


def _if_then(condition: Callable[[], bool], then_source: Observable,
else_source: Observable = None) -> Observable:
else_source: Optional[Observable] = None) -> Observable:
"""Determines whether an observable collection contains values.
Example:
Expand Down
4 changes: 3 additions & 1 deletion rx/core/observable/interval.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Optional

from rx import timer
from rx.core import Observable, typing


def _interval(period, scheduler: typing.Scheduler = None) -> Observable:
def _interval(period, scheduler: Optional[typing.Scheduler] = None) -> Observable:
return timer(period, period, scheduler)
10 changes: 5 additions & 5 deletions rx/core/observable/marbles.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Dict, Tuple
from typing import List, Dict, Tuple, Optional
import re
import threading
from datetime import datetime, timedelta
Expand Down Expand Up @@ -32,7 +32,7 @@


def hot(string: str, timespan: RelativeTime = 0.1, duetime: AbsoluteOrRelativeTime = 0.0,
lookup: Dict = None, error: Exception = None, scheduler: Scheduler = None) -> Observable:
lookup: Dict = None, error: Optional[Exception] = None, scheduler: Optional[Scheduler] = None) -> Observable:

_scheduler = scheduler or new_thread_scheduler

Expand Down Expand Up @@ -92,7 +92,7 @@ def action(scheduler, state=None):


def from_marbles(string: str, timespan: RelativeTime = 0.1, lookup: Dict = None,
error: Exception = None, scheduler: Scheduler = None) -> Observable:
error: Optional[Exception] = None, scheduler: Optional[Scheduler] = None) -> Observable:

disp = CompositeDisposable()
messages = parse(string, timespan=timespan, lookup=lookup, error=error, raise_stopped=True)
Expand All @@ -117,7 +117,7 @@ def subscribe(observer, scheduler_):


def parse(string: str, timespan: RelativeTime = 1.0, time_shift: RelativeTime = 0.0, lookup: Dict = None,
error: Exception = None, raise_stopped: bool = False) -> List[Tuple[RelativeTime, notification.Notification]]:
error: Optional[Exception] = None, raise_stopped: bool = False) -> List[Tuple[RelativeTime, notification.Notification]]:
"""Convert a marble diagram string to a list of messages.
Each character in the string will advance time by timespan
Expand Down Expand Up @@ -206,6 +206,7 @@ def map_element(time, element):
return (time, notification.OnNext(value))

is_stopped = False

def check_stopped(element):
nonlocal is_stopped
if raise_stopped:
Expand All @@ -215,7 +216,6 @@ def check_stopped(element):
if element in ('#', '|'):
is_stopped = True


iframe = 0
messages = []

Expand Down
4 changes: 3 additions & 1 deletion rx/core/observable/never.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from rx.disposable import Disposable
from rx.core import Observable, typing

Expand All @@ -10,7 +12,7 @@ def _never() -> Observable:
An observable sequence whose observers will never get called.
"""

def subscribe(observer: typing.Observer, scheduler: typing.Scheduler = None) -> typing.Disposable:
def subscribe(observer: typing.Observer, scheduler: Optional[typing.Scheduler] = None) -> typing.Disposable:
return Disposable()

return Observable(subscribe)
25 changes: 11 additions & 14 deletions rx/core/observable/observable.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Observable(typing.Observable):
Represents a push-style collection and contains all operators as
methods to allow classic Rx chaining of operators."""

def __init__(self, subscribe: Callable[[typing.Observer, Optional[typing.Scheduler]], typing.Disposable] = None) -> None:
def __init__(self, subscribe: Optional[Callable[[typing.Observer, Optional[typing.Scheduler]], typing.Disposable]] = None) -> None:
"""Creates an observable sequence object from the specified
subscription function.
Expand Down Expand Up @@ -105,12 +105,12 @@ def __iadd__(self, other):
return concat(self, other)

def subscribe(self, # pylint: disable=too-many-arguments,arguments-differ
observer: Union[typing.Observer, typing.OnNext] = None,
on_error: typing.OnError = None,
on_completed: typing.OnCompleted = None,
on_next: typing.OnNext = None,
observer: Optional[Union[typing.Observer, typing.OnNext]] = None,
on_error: Optional[typing.OnError] = None,
on_completed: Optional[typing.OnCompleted] = None,
on_next: Optional[typing.OnNext] = None,
*,
scheduler: typing.Scheduler = None,
scheduler: Optional[typing.Scheduler] = None,
) -> typing.Disposable:
"""Subscribe an observer to the observable sequence.
Expand Down Expand Up @@ -145,13 +145,12 @@ def subscribe(self, # pylint: disable=too-many-arguments,arguments-differ

return self.subscribe_(on_next, on_error, on_completed, scheduler)


def subscribe_(self,
on_next: typing.OnNext = None,
on_error: typing.OnError = None,
on_completed: typing.OnCompleted = None,
scheduler: typing.Scheduler = None
) -> typing.Disposable:
on_next: Optional[typing.OnNext] = None,
on_error: Optional[typing.OnError] = None,
on_completed: Optional[typing.OnCompleted] = None,
scheduler: Optional[typing.Scheduler] = None
) -> typing.Disposable:
"""Subscribe callbacks to the observable sequence.
Examples:
Expand Down Expand Up @@ -208,7 +207,6 @@ def set_disposable(_: abc.Scheduler = None, __: Any = None):
# Hide the identity of the auto detach observer
return Disposable(auto_detach_observer.dispose)


def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observable':
"""Compose multiple operators left to right.
Expand All @@ -227,7 +225,6 @@ def pipe(self, *operators: Callable[['Observable'], 'Observable']) -> 'Observabl
from ..pipe import pipe
return pipe(*operators)(self)


def run(self) -> Any:
"""Run source synchronously.
Expand Down
8 changes: 7 additions & 1 deletion rx/core/observable/range.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from typing import Optional

from rx.core import typing
from rx.core import Observable
from rx.concurrency import current_thread_scheduler
from rx.disposable import MultipleAssignmentDisposable


def _range(start: int, stop: int = None, step: int = None, scheduler: typing.Scheduler = None) -> Observable:
def _range(start: int,
stop: Optional[int] = None,
step: Optional[int] = None,
scheduler: Optional[typing.Scheduler] = None
) -> Observable:
"""Generates an observable sequence of integral numbers within a
specified range, using the specified scheduler to send out observer
messages.
Expand Down
4 changes: 2 additions & 2 deletions rx/core/observable/repeat.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Any
from typing import Any, Optional

import rx
from rx import operators as ops
from rx.core import Observable


def _repeat_value(value: Any = None, repeat_count: int = None) -> Observable:
def _repeat_value(value: Any = None, repeat_count: Optional[int] = None) -> Observable:
"""Generates an observable sequence that repeats the given element
the specified number of times.
Expand Down
8 changes: 4 additions & 4 deletions rx/core/observable/returnvalue.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from typing import Any, Callable
from typing import Any, Callable, Optional

from rx.core import typing
from rx.core import Observable
from rx.concurrency import current_thread_scheduler
from rx.core.abc.scheduler import Scheduler


def _return_value(value: Any, scheduler: typing.Scheduler = None) -> Observable:
def _return_value(value: Any, scheduler: Optional[typing.Scheduler] = None) -> Observable:
"""Returns an observable sequence that contains a single element,
using the specified scheduler to send out observer messages.
There is an alias called 'just'.
Expand All @@ -23,7 +23,7 @@ def _return_value(value: Any, scheduler: typing.Scheduler = None) -> Observable:
element.
"""

def subscribe(observer: typing.Observer, scheduler_: typing.Scheduler = None) -> typing.Disposable:
def subscribe(observer: typing.Observer, scheduler_: Optional[typing.Scheduler] = None) -> typing.Disposable:
_scheduler = scheduler or scheduler_ or current_thread_scheduler

def action(scheduler: typing.Scheduler, state: Any = None):
Expand All @@ -34,7 +34,7 @@ def action(scheduler: typing.Scheduler, state: Any = None):
return Observable(subscribe)


def _from_callable(supplier: Callable, scheduler: typing.Scheduler = None) -> Observable:
def _from_callable(supplier: Callable, scheduler: Optional[typing.Scheduler] = None) -> Observable:
def subscribe(observer: typing.Observer, scheduler_: typing.Scheduler = None):
_scheduler = scheduler or scheduler_ or current_thread_scheduler

Expand Down

0 comments on commit 6f3372b

Please sign in to comment.