Skip to content

Commit

Permalink
Merge 8600ec2 into 15754e0
Browse files Browse the repository at this point in the history
  • Loading branch information
matiboy committed Apr 13, 2023
2 parents 15754e0 + 8600ec2 commit 584b98d
Show file tree
Hide file tree
Showing 17 changed files with 491 additions and 400 deletions.
2 changes: 1 addition & 1 deletion reactivex/observable/observable.py
Expand Up @@ -236,7 +236,7 @@ def pipe(self, *operators: Callable[[Any], Any]) -> Any:

return pipe_(self, *operators)

def run(self) -> Any:
def run(self) -> _T_out:
"""Run source synchronously.
Subscribes to the observable source. Then blocks and waits for the
Expand Down
44 changes: 36 additions & 8 deletions reactivex/operators/__init__.py
Expand Up @@ -63,9 +63,14 @@ def all(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[bool]
.. marble::
:alt: all
--1--2--3--4--5-|
[ all(i: i<10) ]
----------------true-|
--1--2--3--4--5--6----|
[ all(i: i<8) ]
------------------true|
--1--2--3--4--5--6----|
[ all(i: i<4) ]
------false|
Example:
>>> op = all(lambda value: value.length > 3)
Expand All @@ -78,6 +83,13 @@ def all(predicate: Predicate[_T]) -> Callable[[Observable[_T]], Observable[bool]
returns an observable sequence containing a single element
determining whether all elements in the source sequence pass
the test in the specified predicate.
If a predicate returns false, the result sequence emits false
and completes immediately, regardless of the state of the
source sequence.
If all items pass the predicate test, the emission of true
will only happen as the source completes.
"""
from ._all import all_

Expand All @@ -90,9 +102,8 @@ def amb(right_source: Observable[_T]) -> Callable[[Observable[_T]], Observable[_
.. marble::
:alt: amb
---8--6--9-----------|
---8--6--9---------|
--1--2--3---5--------|
----------10-20-30---|
[ amb() ]
--1--2--3---5--------|
Expand Down Expand Up @@ -2611,7 +2622,7 @@ def scan(
Applies an accumulator function over an observable sequence and
returns each intermediate result. The optional seed value is used
as the initial accumulator value. For aggregation behavior with no
intermediate results, see `aggregate()` or `Observable()`.
intermediate results, see `reduce()` or `Observable()`.
.. marble::
:alt: scan
Expand Down Expand Up @@ -2705,12 +2716,29 @@ def single(
the condition in the optional predicate, and reports an exception
if there is not exactly one element in the observable sequence.
If the predicates does not match any item, the resulting sequence
errors once the source completes.
If the predicate matches more than one item, the resulting sequence
errors immediately, without waiting for the source to complete.
If the source never completes, the resulting sequence does not
emit anything, nor completes.
.. marble::
:alt: single
----1--2--3--4-----|
[ single(3) ]
----------3--------|
[ single(x==3) ]
-----------------3-|
----1--3--3--4-----|
[ single(x==3) ]
----------x
----1--1--1--1-----|
[ single(x==3) ]
-------------------x
Example:
>>> res = single()
Expand Down
147 changes: 73 additions & 74 deletions reactivex/operators/_amb.py
@@ -1,92 +1,91 @@
from asyncio import Future
from typing import Callable, List, Optional, TypeVar, Union
from typing import List, Optional, TypeVar, Union

from reactivex import Observable, abc, from_future
from reactivex.curry import curry_flip
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable

_T = TypeVar("_T")


@curry_flip(1)
def amb_(
right_source: Union[Observable[_T], "Future[_T]"]
) -> Callable[[Observable[_T]], Observable[_T]]:
left_source: Observable[_T], right_source: Union[Observable[_T], "Future[_T]"]
) -> Observable[_T]:

if isinstance(right_source, Future):
obs: Observable[_T] = from_future(right_source)
else:
obs = right_source

def amb(left_source: Observable[_T]) -> Observable[_T]:
def subscribe(
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
choice: List[Optional[str]] = [None]
left_choice = "L"
right_choice = "R"
left_subscription = SingleAssignmentDisposable()
right_subscription = SingleAssignmentDisposable()

def choice_left():
if not choice[0]:
choice[0] = left_choice
right_subscription.dispose()

def choice_right():
if not choice[0]:
choice[0] = right_choice
left_subscription.dispose()

def on_next_left(value: _T) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_next(value)

def on_error_left(err: Exception) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_error(err)

def on_completed_left() -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_completed()

left_d = left_source.subscribe(
on_next_left, on_error_left, on_completed_left, scheduler=scheduler
)
left_subscription.disposable = left_d

def send_right(value: _T) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_next(value)

def on_error_right(err: Exception) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_error(err)

def on_completed_right() -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_completed()

right_d = obs.subscribe(
send_right, on_error_right, on_completed_right, scheduler=scheduler
)
right_subscription.disposable = right_d
return CompositeDisposable(left_subscription, right_subscription)

return Observable(subscribe)

return amb
def subscribe(
observer: abc.ObserverBase[_T],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
choice: List[Optional[str]] = [None]
left_choice = "L"
right_choice = "R"
left_subscription = SingleAssignmentDisposable()
right_subscription = SingleAssignmentDisposable()

def choice_left():
if not choice[0]:
choice[0] = left_choice
right_subscription.dispose()

def choice_right():
if not choice[0]:
choice[0] = right_choice
left_subscription.dispose()

def on_next_left(value: _T) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_next(value)

def on_error_left(err: Exception) -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_error(err)

def on_completed_left() -> None:
with left_source.lock:
choice_left()
if choice[0] == left_choice:
observer.on_completed()

left_d = left_source.subscribe(
on_next_left, on_error_left, on_completed_left, scheduler=scheduler
)
left_subscription.disposable = left_d

def send_right(value: _T) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_next(value)

def on_error_right(err: Exception) -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_error(err)

def on_completed_right() -> None:
with left_source.lock:
choice_right()
if choice[0] == right_choice:
observer.on_completed()

right_d = obs.subscribe(
send_right, on_error_right, on_completed_right, scheduler=scheduler
)
right_subscription.disposable = right_d
return CompositeDisposable(left_subscription, right_subscription)

return Observable(subscribe)


__all__ = ["amb_"]
67 changes: 32 additions & 35 deletions reactivex/operators/_average.py
@@ -1,7 +1,8 @@
from dataclasses import dataclass
from typing import Any, Callable, Optional, TypeVar, cast
from typing import Any, Optional, TypeVar, cast

from reactivex import Observable, operators, typing
from reactivex.curry import curry_flip

_T = TypeVar("_T")

Expand All @@ -12,51 +13,47 @@ class AverageValue:
count: int


@curry_flip(1)
def average_(
source: Observable[Any],
key_mapper: Optional[typing.Mapper[_T, float]] = None,
) -> Callable[[Observable[_T]], Observable[float]]:
def average(source: Observable[Any]) -> Observable[float]:
"""Partially applied average operator.
) -> Observable[float]:
"""Partially applied average operator.
Computes the average of an observable sequence of values that
are in the sequence or obtained by invoking a transform
function on each element of the input sequence if present.
Computes the average of an observable sequence of values that
are in the sequence or obtained by invoking a transform
function on each element of the input sequence if present.
Examples:
>>> res = average(source)
Examples:
>>> res = average(source)
Args:
source: Source observable to average.
Args:
source: Source observable to average.
Returns:
An observable sequence containing a single element with the
average of the sequence of values.
"""
Returns:
An observable sequence containing a single element with the
average of the sequence of values.
"""

key_mapper_: typing.Mapper[_T, float] = key_mapper or (
lambda x: float(cast(Any, x))
)
key_mapper_: typing.Mapper[_T, float] = key_mapper or (
lambda x: float(cast(Any, x))
)

def accumulator(prev: AverageValue, cur: float) -> AverageValue:
return AverageValue(sum=prev.sum + cur, count=prev.count + 1)
def accumulator(prev: AverageValue, cur: float) -> AverageValue:
return AverageValue(sum=prev.sum + cur, count=prev.count + 1)

def mapper(s: AverageValue) -> float:
if s.count == 0:
raise Exception("The input sequence was empty")
def mapper(s: AverageValue) -> float:
return s.sum / float(s.count)

return s.sum / float(s.count)
seed = AverageValue(sum=0, count=0)

seed = AverageValue(sum=0, count=0)

ret = source.pipe(
operators.map(key_mapper_),
operators.scan(accumulator, seed),
operators.last(),
operators.map(mapper),
)
return ret

return average
ret = source.pipe(
operators.map(key_mapper_),
operators.scan(accumulator, seed),
operators.last(),
operators.map(mapper),
)
return ret


__all__ = ["average_"]

0 comments on commit 584b98d

Please sign in to comment.