Skip to content

Commit

Permalink
More type fixes for optional arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed May 11, 2019
1 parent a689033 commit 1cae62f
Show file tree
Hide file tree
Showing 13 changed files with 60 additions and 41 deletions.
12 changes: 8 additions & 4 deletions rx/core/observable/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ def action(scheduler, state):
return Observable(subscribe)


def observable_timer_timespan_and_period(duetime: typing.RelativeTime, period: typing.RelativeTime,
scheduler: Optional[typing.Scheduler] = None) -> Observable:
def observable_timer_timespan_and_period(duetime: typing.RelativeTime,
period: typing.RelativeTime,
scheduler: Optional[typing.Scheduler] = None
) -> Observable:
if duetime == period:
def subscribe(observer, scheduler_=None):
_scheduler = scheduler or scheduler_ or timeout_scheduler
Expand All @@ -74,8 +76,10 @@ def action(count):
return observable_timer_duetime_and_period(duetime, period, scheduler)


def _timer(duetime: typing.AbsoluteOrRelativeTime, period: Optional[typing.RelativeTime] = None,
scheduler: Optional[typing.Scheduler] = None) -> Observable:
def _timer(duetime: typing.AbsoluteOrRelativeTime,
period: Optional[typing.RelativeTime] = None,
scheduler: Optional[typing.Scheduler] = None
) -> Observable:
if isinstance(duetime, datetime):
if period is None:
return observable_timer_date(duetime, scheduler)
Expand Down
2 changes: 1 addition & 1 deletion rx/core/observer/autodetachobserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def __init__(self,
on_next: Optional[typing.OnNext] = None,
on_error: Optional[typing.OnError] = None,
on_completed: Optional[typing.OnCompleted] = None
) -> Any:
) -> None:
self._on_next = on_next or noop
self._on_error = on_error or default_error
self._on_completed = on_completed or noop
Expand Down
8 changes: 5 additions & 3 deletions rx/core/observer/observeonobserver.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from typing import Any

from .scheduledobserver import ScheduledObserver


class ObserveOnObserver(ScheduledObserver):

def _on_next_core(self, value):
def _on_next_core(self, value: Any) -> None:
super()._on_next_core(value)
self.ensure_active()

def _on_error_core(self, error):
def _on_error_core(self, error: Exception) -> None:
super()._on_error_core(error)
self.ensure_active()

def _on_completed_core(self):
def _on_completed_core(self) -> None:
super()._on_completed_core()
self.ensure_active()
2 changes: 1 addition & 1 deletion rx/core/observer/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def __init__(self,
on_next: Optional[typing.OnNext] = None,
on_error: Optional[typing.OnError] = None,
on_completed: Optional[typing.OnCompleted] = None
) -> Any:
) -> None:
self.is_stopped = False
if on_next is not None:
self._on_next_core = on_next
Expand Down
4 changes: 2 additions & 2 deletions rx/core/operators/count.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Callable
from typing import Callable, Optional
from rx.core import Observable, pipe
from rx.core.typing import Predicate

from rx import operators as ops


def _count(predicate: Predicate = None) -> Callable[[Observable], Observable]:
def _count(predicate: Optional[Predicate] = None) -> Callable[[Observable], Observable]:

if predicate:
filtering = ops.filter(predicate)
Expand Down
6 changes: 3 additions & 3 deletions rx/core/operators/delay.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable
from typing import Callable, Optional
from datetime import datetime, timedelta

from rx import operators as ops
Expand All @@ -15,7 +15,7 @@ def __init__(self, value, timestamp):


def observable_delay_timespan(source: Observable, duetime: typing.RelativeTime,
scheduler: typing.Scheduler = None) -> Observable:
scheduler: Optional[typing.Scheduler] = None) -> Observable:

def subscribe(observer, scheduler_=None):
nonlocal duetime
Expand Down Expand Up @@ -99,7 +99,7 @@ def action(scheduler, state):
return Observable(subscribe)


def _delay(duetime: typing.RelativeTime, scheduler: typing.Scheduler = None) -> Callable[[Observable], Observable]:
def _delay(duetime: typing.RelativeTime, scheduler: Optional[typing.Scheduler] = None) -> Callable[[Observable], Observable]:
def delay(source: Observable) -> Observable:
"""Time shifts the observable sequence.
Expand Down
8 changes: 4 additions & 4 deletions rx/core/operators/delaysubscription.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from datetime import datetime
from typing import Union, Callable
from typing import Callable, Optional

import rx
from rx import operators as ops
from rx.core import Observable, typing


def _delay_subscription(duetime: typing.AbsoluteOrRelativeTime, scheduler: typing.Scheduler = None
) -> Callable[[Observable], Observable]:
def _delay_subscription(duetime: typing.AbsoluteOrRelativeTime,
scheduler: Optional[typing.Scheduler] = None
) -> Callable[[Observable], Observable]:
def delay_subscription(source: Observable) -> Observable:
"""Time shifts the observable sequence by delaying the subscription.
Expand Down
11 changes: 7 additions & 4 deletions rx/core/operators/do.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from typing import Callable
from typing import Callable, Optional

from rx.core import Observable, typing
from rx.core.typing import Observer, Disposable
from rx.disposable import CompositeDisposable


def _do_action(on_next: typing.OnNext = None, on_error: typing.OnError = None, on_completed: typing.OnCompleted = None
def _do_action(on_next: Optional[typing.OnNext] = None,
on_error: Optional[typing.OnError] = None,
on_completed: Optional[typing.OnCompleted] = None
) -> Callable[[Observable], Observable]:
def do_action(source: Observable) -> Observable:
"""Invokes an action for each element in the observable
Expand Down Expand Up @@ -140,7 +142,8 @@ def do_on_dispose(source: Observable, on_dispose):
"""

class OnDispose(Disposable):
def dispose(self):

def dispose(self) -> None:
on_dispose()

def subscribe(observer, scheduler=None):
Expand Down Expand Up @@ -231,7 +234,7 @@ class OnDispose(Disposable):
def __init__(self, was_invoked):
self.was_invoked = was_invoked

def dispose(self):
def dispose(self) -> None:
if not self.was_invoked[0]:
finally_action()
self.was_invoked[0] = True
Expand Down
8 changes: 4 additions & 4 deletions rx/core/operators/filter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Callable
from typing import Callable, Optional

from rx.core import Observable
from rx.core.typing import Predicate, PredicateIndexed, Scheduler, Observer, Disposable
Expand All @@ -22,7 +22,7 @@ def filter(source: Observable) -> Observable:
A filtered observable sequence.
"""

def subscribe(observer: Observer, scheduler: Scheduler) -> Disposable:
def subscribe(observer: Observer, scheduler: Optional[Scheduler]) -> Disposable:
def on_next(value):
try:
should_run = predicate(value)
Expand All @@ -38,7 +38,7 @@ def on_next(value):
return filter


def _filter_indexed(predicate_indexed: PredicateIndexed = None) -> Callable[[Observable], Observable]:
def _filter_indexed(predicate_indexed: Optional[PredicateIndexed] = None) -> Callable[[Observable], Observable]:
def filter_indexed(source: Observable) -> Observable:
"""Partially applied indexed filter operator.
Expand All @@ -55,7 +55,7 @@ def filter_indexed(source: Observable) -> Observable:
A filtered observable sequence.
"""

def subscribe(observer: Observer, scheduler: Scheduler):
def subscribe(observer: Observer, scheduler: Optional[Scheduler]):
count = 0

def on_next(value):
Expand Down
6 changes: 4 additions & 2 deletions rx/core/operators/firstordefault.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Callable
from typing import Any, Callable, Optional

from rx import operators as ops
from rx.core import Observable, pipe
Expand All @@ -25,7 +25,9 @@ def on_completed():
return first_or_default_async


def _first_or_default(predicate: Predicate = None, default_value: Any = None) -> Callable[[Observable], Observable]:
def _first_or_default(predicate: Optional[Predicate] = None,
default_value: Any = None
) -> Callable[[Observable], Observable]:
"""Returns the first element of an observable sequence that
satisfies the condition in the predicate, or a default value if no
such element exists.
Expand Down
6 changes: 3 additions & 3 deletions rx/core/operators/flatmap.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import collections
from typing import Callable
from typing import Callable, Optional

from rx import from_, from_future, operators as ops
from rx.core import Observable
Expand All @@ -23,7 +23,7 @@ def projection(x, i):
)


def _flat_map(mapper: Mapper = None) -> Callable[[Observable], Observable]:
def _flat_map(mapper: Optional[Mapper] = None) -> Callable[[Observable], Observable]:
def flat_map(source: Observable) -> Observable:
"""One of the Following:
Projects each element of an observable sequence to an observable
Expand Down Expand Up @@ -52,7 +52,7 @@ def flat_map(source: Observable) -> Observable:
return flat_map


def _flat_map_indexed(mapper_indexed: MapperIndexed = None) -> Callable[[Observable], Observable]:
def _flat_map_indexed(mapper_indexed: Optional[MapperIndexed] = None) -> Callable[[Observable], Observable]:
def flat_map_indexed(source: Observable) -> Observable:
"""One of the Following:
Projects each element of an observable sequence to an observable
Expand Down
2 changes: 1 addition & 1 deletion rx/disposable/serialdisposable.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self) -> None:
def get_disposable(self) -> Optional[Disposable]:
return self.current

def set_disposable(self, value) -> None:
def set_disposable(self, value: typing.Disposable) -> None:
"""If the SerialDisposable has already been disposed, assignment
to this property causes immediate disposal of the given
disposable object. Assigning this property disposes the previous
Expand Down
26 changes: 17 additions & 9 deletions rx/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ def contains(value: Any, comparer=None) -> Callable[[Observable], Observable]:
return _contains(value, comparer)


def count(predicate=None) -> Callable[[Observable], Observable]:
def count(predicate: Optional[typing.Predicate] = None) -> Callable[[Observable], Observable]:
"""Returns an observable sequence containing a value that
represents how many elements in the specified observable sequence
satisfy a condition if provided, else the count of items.
Expand Down Expand Up @@ -370,7 +370,9 @@ def count(predicate=None) -> Callable[[Observable], Observable]:
return _count(predicate)


def debounce(duetime: typing.RelativeTime, scheduler: typing.Scheduler = None) -> Callable[[Observable], Observable]:
def debounce(duetime: typing.RelativeTime,
scheduler: Optional[typing.Scheduler] = None
) -> Callable[[Observable], Observable]:
"""Ignores values from an observable sequence which are followed by
another value before duetime.
Expand Down Expand Up @@ -429,7 +431,7 @@ def default_if_empty(default_value: Any = None) -> Callable[[Observable], Observ
return _default_if_empty(default_value)


def delay_subscription(duetime: typing.AbsoluteOrRelativeTime, scheduler: typing.Scheduler = None
def delay_subscription(duetime: typing.AbsoluteOrRelativeTime, scheduler: Optional[typing.Scheduler] = None
) -> Callable[[Observable], Observable]:
"""Time shifts the observable sequence by delaying the
subscription.
Expand Down Expand Up @@ -502,7 +504,9 @@ def dematerialize() -> Callable[[Observable], Observable]:
return _dematerialize()


def delay(duetime: typing.RelativeTime, scheduler: typing.Scheduler = None) -> Callable[[Observable], Observable]:
def delay(duetime: typing.RelativeTime,
scheduler: Optional[typing.Scheduler] = None
) -> Callable[[Observable], Observable]:
"""The delay operator.
.. marble::
Expand Down Expand Up @@ -629,7 +633,9 @@ def do(observer: typing.Observer) -> Callable[[Observable], Observable]:
return do_(observer)


def do_action(on_next: typing.OnNext = None, on_error: typing.OnError = None, on_completed: typing.OnCompleted = None
def do_action(on_next: Optional[typing.OnNext] = None,
on_error: Optional[typing.OnError] = None,
on_completed: Optional[typing.OnCompleted] = None
) -> Callable[[Observable], Observable]:
"""Invokes an action for each element in the observable sequence
and invokes an action on graceful or exceptional termination of the
Expand Down Expand Up @@ -812,7 +818,7 @@ def filter(predicate: Predicate) -> Callable[[Observable], Observable]:
return _filter(predicate)


def filter_indexed(predicate_indexed: PredicateIndexed = None) -> Callable[[Observable], Observable]:
def filter_indexed(predicate_indexed: Optional[PredicateIndexed] = None) -> Callable[[Observable], Observable]:
"""Filters the elements of an observable sequence based on a
predicate by incorporating the element's index.
Expand Down Expand Up @@ -952,7 +958,9 @@ def first(predicate=None) -> Callable[[Observable], Observable]:
return _first(predicate)


def first_or_default(predicate: Predicate = None, default_value: Any = None) -> Callable[[Observable], Observable]:
def first_or_default(predicate: Optional[Predicate] = None,
default_value: Any = None
) -> Callable[[Observable], Observable]:
"""Returns the first element of an observable sequence that
satisfies the condition in the predicate, or a default value if no
such element exists.
Expand Down Expand Up @@ -986,7 +994,7 @@ def first_or_default(predicate: Predicate = None, default_value: Any = None) ->
return _first_or_default(predicate, default_value)


def flat_map(mapper: Mapper = None) -> Callable[[Observable], Observable]:
def flat_map(mapper: Optional[Mapper] = None) -> Callable[[Observable], Observable]:
"""The flat_map operator.
.. marble::
Expand Down Expand Up @@ -1028,7 +1036,7 @@ def flat_map(mapper: Mapper = None) -> Callable[[Observable], Observable]:
return _flat_map(mapper)


def flat_map_indexed(mapper_indexed: MapperIndexed = None) -> Callable[[Observable], Observable]:
def flat_map_indexed(mapper_indexed: Optional[MapperIndexed] = None) -> Callable[[Observable], Observable]:
"""The `flat_map_indexed` operator.
One of the Following:
Expand Down

0 comments on commit 1cae62f

Please sign in to comment.