Skip to content

Commit

Permalink
Merge fe265ef into 15754e0
Browse files Browse the repository at this point in the history
  • Loading branch information
matiboy committed Feb 7, 2023
2 parents 15754e0 + fe265ef commit 580744c
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 149 deletions.
2 changes: 1 addition & 1 deletion reactivex/observable/observable.py
Expand Up @@ -236,7 +236,7 @@ def pipe(self, *operators: Callable[[Any], Any]) -> Any:

return pipe_(self, *operators)

def run(self) -> Any:
def run(self) -> _T_out:
"""Run source synchronously.
Subscribes to the observable source. Then blocks and waits for the
Expand Down
18 changes: 15 additions & 3 deletions reactivex/operators/__init__.py
Expand Up @@ -63,9 +63,14 @@ def all(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[bool]
.. marble::
:alt: all
--1--2--3--4--5-|
[ all(i: i<10) ]
----------------true-|
--1--2--3--4--5--6----|
[ all(i: i<8) ]
------------------true|
--1--2--3--4--5--6----|
[ all(i: i<4) ]
------false|
Example:
>>> op = all(lambda value: value.length > 3)
Expand All @@ -78,6 +83,13 @@ def all(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[bool]
returns an observable sequence containing a single element
determining whether all elements in the source sequence pass
the test in the specified predicate.
If a predicate returns false, the result sequence emits false
and completes immediately, regardless of the state of the
source sequence.
If all items pass the predicate test, the emission of true
will only happen as the source completes.
"""
from ._all import all_

Expand Down
67 changes: 32 additions & 35 deletions reactivex/operators/_average.py
@@ -1,7 +1,8 @@
from dataclasses import dataclass
from typing import Any, Callable, Optional, TypeVar, cast
from typing import Any, Optional, TypeVar, cast

from reactivex import Observable, operators, typing
from reactivex.curry import curry_flip

_T = TypeVar("_T")

Expand All @@ -12,51 +13,47 @@ class AverageValue:
count: int


@curry_flip(1)
def average_(
source: Observable[Any],
key_mapper: Optional[typing.Mapper[_T, float]] = None,
) -> Callable[[Observable[_T]], Observable[float]]:
def average(source: Observable[Any]) -> Observable[float]:
"""Partially applied average operator.
) -> Observable[float]:
"""Partially applied average operator.
Computes the average of an observable sequence of values that
are in the sequence or obtained by invoking a transform
function on each element of the input sequence if present.
Computes the average of an observable sequence of values that
are in the sequence or obtained by invoking a transform
function on each element of the input sequence if present.
Examples:
>>> res = average(source)
Examples:
>>> res = average(source)
Args:
source: Source observable to average.
Args:
source: Source observable to average.
Returns:
An observable sequence containing a single element with the
average of the sequence of values.
"""
Returns:
An observable sequence containing a single element with the
average of the sequence of values.
"""

key_mapper_: typing.Mapper[_T, float] = key_mapper or (
lambda x: float(cast(Any, x))
)
key_mapper_: typing.Mapper[_T, float] = key_mapper or (
lambda x: float(cast(Any, x))
)

def accumulator(prev: AverageValue, cur: float) -> AverageValue:
return AverageValue(sum=prev.sum + cur, count=prev.count + 1)
def accumulator(prev: AverageValue, cur: float) -> AverageValue:
return AverageValue(sum=prev.sum + cur, count=prev.count + 1)

def mapper(s: AverageValue) -> float:
if s.count == 0:
raise Exception("The input sequence was empty")
def mapper(s: AverageValue) -> float:
return s.sum / float(s.count)

return s.sum / float(s.count)
seed = AverageValue(sum=0, count=0)

seed = AverageValue(sum=0, count=0)

ret = source.pipe(
operators.map(key_mapper_),
operators.scan(accumulator, seed),
operators.last(),
operators.map(mapper),
)
return ret

return average
ret = source.pipe(
operators.map(key_mapper_),
operators.scan(accumulator, seed),
operators.last(),
operators.map(mapper),
)
return ret


__all__ = ["average_"]
61 changes: 30 additions & 31 deletions reactivex/operators/_take.py
@@ -1,53 +1,52 @@
from typing import Callable, Optional, TypeVar
from typing import Optional, TypeVar, cast

from reactivex import Observable, abc, empty
from reactivex.curry import curry_flip
from reactivex.internal import ArgumentOutOfRangeException

_T = TypeVar("_T")


def take_(count: int) -> Callable[[Observable[_T]], Observable[_T]]:
@curry_flip(1)
def take_(source: Observable[_T], count: int) -> Observable[_T]:
if count < 0:
raise ArgumentOutOfRangeException()

def take(source: Observable[_T]) -> Observable[_T]:
"""Returns a specified number of contiguous elements from the start of
an observable sequence.
"""Returns a specified number of contiguous elements from the start of
an observable sequence.
>>> take(source)
>>> take(source)
Keyword arguments:
count -- The number of elements to return.
Keyword arguments:
count -- The number of elements to return.
Returns an observable sequence that contains the specified number of
elements from the start of the input sequence.
"""
Returns an observable sequence that contains the specified number of
elements from the start of the input sequence.
"""

if not count:
return empty()
if not count:
return cast(Observable[_T], empty())

def subscribe(
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
):
remaining = count
def subscribe(
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
):
remaining = count

def on_next(value: _T) -> None:
nonlocal remaining
def on_next(value: _T) -> None:
nonlocal remaining

if remaining > 0:
remaining -= 1
observer.on_next(value)
if not remaining:
observer.on_completed()
if remaining > 0:
remaining -= 1
observer.on_next(value)
if not remaining:
observer.on_completed()

return source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
)
return source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
)

return Observable(subscribe)

return take
return Observable(subscribe)


__all__ = ["take_"]
91 changes: 46 additions & 45 deletions reactivex/operators/_throttlefirst.py
@@ -1,57 +1,58 @@
from datetime import datetime
from typing import Callable, Optional, TypeVar
from typing import Optional, TypeVar

from reactivex import Observable, abc, typing
from reactivex.curry import curry_flip
from reactivex.scheduler import TimeoutScheduler

_T = TypeVar("_T")


@curry_flip(1)
def throttle_first_(
window_duration: typing.RelativeTime, scheduler: Optional[abc.SchedulerBase] = None
) -> Callable[[Observable[_T]], Observable[_T]]:
def throttle_first(source: Observable[_T]) -> Observable[_T]:
"""Returns an observable that emits only the first item emitted
by the source Observable during sequential time windows of a
specified duration.
Args:
source: Source observable to throttle.
Returns:
An Observable that performs the throttle operation.
"""

def subscribe(
observer: abc.ObserverBase[_T],
scheduler_: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
_scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()

duration = _scheduler.to_timedelta(window_duration or 0.0)
if duration <= _scheduler.to_timedelta(0):
raise ValueError("window_duration cannot be less or equal zero.")
last_on_next: Optional[datetime] = None

def on_next(x: _T) -> None:
nonlocal last_on_next
emit = False
now = _scheduler.now

with source.lock:
if not last_on_next or now - last_on_next >= duration:
last_on_next = now
emit = True
if emit:
observer.on_next(x)

return source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=_scheduler
)

return Observable(subscribe)

return throttle_first
source: Observable[_T],
window_duration: typing.RelativeTime,
scheduler: Optional[abc.SchedulerBase] = None,
) -> Observable[_T]:
"""Returns an observable that emits only the first item emitted
by the source Observable during sequential time windows of a
specified duration.
Args:
source: Source observable to throttle.
Returns:
An Observable that performs the throttle operation.
"""

def subscribe(
observer: abc.ObserverBase[_T],
scheduler_: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
_scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()

duration = _scheduler.to_timedelta(window_duration or 0.0)
if duration <= _scheduler.to_timedelta(0):
raise ValueError("window_duration cannot be less or equal zero.")
last_on_next: Optional[datetime] = None

def on_next(x: _T) -> None:
nonlocal last_on_next
emit = False
now = _scheduler.now

with source.lock:
if not last_on_next or now - last_on_next >= duration:
last_on_next = now
emit = True
if emit:
observer.on_next(x)

return source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=_scheduler
)

return Observable(subscribe)


__all__ = ["throttle_first_"]
44 changes: 23 additions & 21 deletions reactivex/operators/_timestamp.py
@@ -1,8 +1,9 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Generic, Optional, TypeVar
from typing import Generic, Optional, TypeVar

from reactivex import Observable, abc, defer, operators
from reactivex.curry import curry_flip
from reactivex.scheduler import TimeoutScheduler

_T = TypeVar("_T")
Expand All @@ -14,36 +15,37 @@ class Timestamp(Generic[_T]):
timestamp: datetime


@curry_flip(1)
def timestamp_(
source: Observable[_T],
scheduler: Optional[abc.SchedulerBase] = None,
) -> Callable[[Observable[_T]], Observable[Timestamp[_T]]]:
def timestamp(source: Observable[_T]) -> Observable[Timestamp[_T]]:
"""Records the timestamp for each value in an observable sequence.
) -> Observable[Timestamp[_T]]:
"""Records the timestamp for each value in an observable sequence.
Examples:
>>> timestamp(source)
Examples:
>>> timestamp(source)
Produces objects with attributes `value` and `timestamp`, where
value is the original value.
Produces objects with attributes `value` and `timestamp`, where
value is the original value.
Args:
source: Observable source to timestamp.
Args:
source: Observable source to timestamp.
Returns:
An observable sequence with timestamp information on values.
"""
Returns:
An observable sequence with timestamp information on values.
Each emitted item is a Timestamp object with `.value` and
`.timestamp` attributes
"""

def factory(scheduler_: Optional[abc.SchedulerBase] = None):
_scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()
def factory(scheduler_: Optional[abc.SchedulerBase] = None):
_scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()

def mapper(value: _T) -> Timestamp[_T]:
return Timestamp(value=value, timestamp=_scheduler.now)
def mapper(value: _T) -> Timestamp[_T]:
return Timestamp(value=value, timestamp=_scheduler.now)

return source.pipe(operators.map(mapper))
return source.pipe(operators.map(mapper))

return defer(factory)

return timestamp
return defer(factory)


__all__ = ["timestamp_"]

0 comments on commit 580744c

Please sign in to comment.