Skip to content

Commit

Permalink
Add combine_throttle operator
Browse files Browse the repository at this point in the history
  • Loading branch information
kormang committed Feb 3, 2024
1 parent af1663d commit 901dc4e
Show file tree
Hide file tree
Showing 5 changed files with 472 additions and 0 deletions.
38 changes: 38 additions & 0 deletions reactivex/__init__.py
Expand Up @@ -232,6 +232,43 @@ def combine_latest(*__sources: Observable[Any]) -> Observable[Any]:
return combine_latest_(*__sources)


def combine_throttle(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
"""Merges the specified observable sequences into one observable
sequence by creating a result whenever all of the
observable sequences have produced an element at a corresponding
index. Faster observables, that emits events more frequently, are
throttled, so that they match speed of slower observables.
Speed of emitting items matches speed of slowest source observable.
It is somewhat similar to :func:`reactivex.zip` operator, returns tuple,
but items of faster observables are dropped, so that only latest values are
at each index.
It is also similar to :func:`reactivex.combine_latest`, but emits new item
only when all sources produce new item. Only latest items are included in
resulting tuple, others are dropped, similar to :func:`reactivex.with_latest_from`.
.. marble::
:alt: combine_throttle
--1---2-3--------4---|
-a-------b--c-d------|
[ combine_throttle() ]
--1,a----3,b-----4,d-|
Example:
>>> res = combine_throttle(obs1, obs2)
Args:
args: Observable sources to combine.
Returns:
An observable sequence containing the result of combining
elements of the sources as tuple.
"""
from .observable.combinethrottle import combine_throttle_

return combine_throttle_(*args)


def concat(*sources: Observable[_T]) -> Observable[_T]:
"""Concatenates all of the specified observable sequences.
Expand Down Expand Up @@ -1303,6 +1340,7 @@ def zip(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
"catch_with_iterable",
"create",
"combine_latest",
"combine_throttle",
"compose",
"concat",
"concat_with_iterable",
Expand Down
76 changes: 76 additions & 0 deletions reactivex/observable/combinethrottle.py
@@ -0,0 +1,76 @@
from asyncio import Future
from threading import RLock
from typing import Any, Callable, List, Optional, Tuple

from reactivex import Observable, abc, from_future
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
from reactivex.internal import synchronized


def combine_throttle_(*args: Observable[Any]) -> Observable[Tuple[Any, ...]]:
"""Merges the specified observable sequences into one observable
sequence by creating a tuple whenever all of the observable sequences
have produced an element at a corresponding index.
Example:
>>> res = combine_throttle(source)
Args:
args: Observable sources to combine_throttle.
Returns:
An observable sequence containing the result of combining
elements of the sources as a tuple.
"""

n = len(args)

sources = list(args)

def subscribe(
observer: abc.ObserverBase[Any], scheduler: Optional[abc.SchedulerBase] = None
) -> CompositeDisposable:

flags = (1 << (n - 1)) & 0 # Reserve n zero bits.
full_mask = 1 << (n - 1)
full_mask |= full_mask - 1 # Create mask with n 1 bits.
lock = RLock()

results: List[None] = [None] * n

def create_on_next(i: int) -> Callable[[Any], None]:
@synchronized(lock)
def on_next(item: Any) -> None:
nonlocal flags
results[i] = item
flags |= 1 << i
if flags == full_mask:
flags = 0
observer.on_next(tuple(results))

return on_next

subscriptions: List[abc.DisposableBase] = []

for i in range(len(sources)):
source: Observable[Any] = sources[i]
if isinstance(source, Future):
source = from_future(source)

sad = SingleAssignmentDisposable()

sad.disposable = source.subscribe(
create_on_next(i),
observer.on_error,
observer.on_completed,
scheduler=scheduler,
)

subscriptions.append(sad)

return CompositeDisposable(subscriptions)

return Observable(subscribe=subscribe)


__all__ = ["combine_throttle_"]
39 changes: 39 additions & 0 deletions reactivex/operators/__init__.py
Expand Up @@ -430,6 +430,45 @@ def combine_latest(
return combine_latest_(*others)


def combine_throttle(
*args: Observable[Any],
) -> Callable[[Observable[Any]], Observable[Any]]:
"""Merges the specified observable sequences into one observable
sequence by creating a result whenever all of the
observable sequences have produced an element at a corresponding
index. Faster observables, that emits events more frequently, are
throttled, so that they match speed of slower observables.
Speed of emitting items matches speed of slowest source observable.
It is somewhat similar to :func:`reactivex.zip` operator, returns tuple,
but items of faster observables are dropped, so that only latest values are
at each index.
It is also similar to :func:`reactivex.combine_latest`, but emits new item
only when all sources produce new item. Only latest items are included in
resulting tuple, others are dropped, similar to :func:`reactivex.with_latest_from`.
.. marble::
:alt: combine_throttle
--1---2-3--------4---|
-a-------b--c-d------|
[ combine_throttle() ]
--1,a----3,b-----4,d-|
Example:
>>> res = combine_throttle(obs1, obs2)
Args:
args: Observable sources to combine.
Returns:
An observable sequence containing the result of combining
elements of the sources as tuple.
"""
from ._combinethrottle import combine_throttle_

return combine_throttle_(*args)


def concat(*sources: Observable[_T]) -> Callable[[Observable[_T]], Observable[_T]]:
"""Concatenates all the observable sequences.
Expand Down
31 changes: 31 additions & 0 deletions reactivex/operators/_combinethrottle.py
@@ -0,0 +1,31 @@
from typing import Any, Callable, Tuple

import reactivex
from reactivex import Observable


def combine_throttle_(
*args: Observable[Any],
) -> Callable[[Observable[Any]], Observable[Tuple[Any, ...]]]:
def _combine_throttle(source: Observable[Any]) -> Observable[Tuple[Any, ...]]:
"""Merges the specified observable sequences into one observable
sequence by creating a tuple whenever all of the observable sequences
have produced an element at a corresponding index.
Example:
>>> res = combine_throttle(source)
Args:
args: Observable sources to combine_throttle.
Returns:
An observable sequence containing the result of combining
elements of the sources as a tuple.
"""

return reactivex.combine_throttle(source, *args)

return _combine_throttle


__all__ = ["combine_throttle_"]

0 comments on commit 901dc4e

Please sign in to comment.