Skip to content

Commit

Permalink
Add forkJoin
Browse files Browse the repository at this point in the history
Fixes #533
  • Loading branch information
hoc081098 committed Sep 23, 2020
1 parent ea7c5a4 commit 7b74d1c
Show file tree
Hide file tree
Showing 6 changed files with 353 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/operators.rst
Expand Up @@ -69,6 +69,7 @@ Operator Desc
:func:`start_with <rx.operators.start_with>` Emit a specified sequence of items before beginning to emit the items from the source Observable.
:func:`switch_latest <rx.operators.switch_latest>` Convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables.
:func:`zip <rx.operators.zip>` Combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function.
:func:`fork_join <rx.operators.fork_join>` Wait for Observables to complete and then combine last values they emitted into a tuple.
====================================================== ================================================

Error Handling
Expand Down
29 changes: 29 additions & 0 deletions rx/__init__.py
Expand Up @@ -318,6 +318,35 @@ def for_in(values: Iterable[Any], mapper: typing.Mapper) -> Observable:
return concat_with_iterable(map(mapper, values))


def fork_join(*sources: Observable) -> Observable:
"""Wait for observables to complete and then combine last values
they emitted into a tuple. Whenever any of that observables completes
without emitting any value, result sequence will complete at that moment as well.
.. marble::
:alt: fork_join
---a-----b--c---d-|
--1---2------3-4---|
-a--------- b---|
[ fork_join() ]
--------------------d4b|
Examples:
>>> obs = rx.fork_join(obs1, obs2, obs3)
Args:
sources: Sequence of observables.
Returns:
An observable sequence containing the result of combining last element from
each source in given sequence.
"""

from .core.observable.forkjoin import _fork_join
return _fork_join(*sources)


def from_callable(supplier: Callable[[], Any],
scheduler: Optional[typing.Scheduler] = None
) -> Observable:
Expand Down
69 changes: 69 additions & 0 deletions rx/core/observable/forkjoin.py
@@ -0,0 +1,69 @@
from typing import Optional

from rx.core import Observable, typing
from rx.disposable import CompositeDisposable, SingleAssignmentDisposable


def _fork_join(*sources: Observable) -> Observable:
"""Wait for observables to complete and then combine last values
they emitted into a tuple. Whenever any of that observables completes
without emitting any value, result sequence will complete at that moment as well.
Examples:
>>> obs = rx.fork_join(obs1, obs2, obs3)
Returns:
An observable sequence containing the result of combining last element from
each source in given sequence.
"""

parent = sources[0]

def subscribe(observer: typing.Observer,
scheduler: Optional[typing.Scheduler] = None
) -> CompositeDisposable:
n = len(sources)
values = [None] * n
is_done = [False] * n
has_value = [False] * n

def done(i: int):
is_done[i] = True

if not has_value[i]:
observer.on_completed()
return

if all(is_done):
if all(has_value):
observer.on_next(tuple(values))
observer.on_completed()
else:
observer.on_completed()

subscriptions = [None] * n

def _subscribe(i: int):
subscriptions[i] = SingleAssignmentDisposable()

def on_next(value):
with parent.lock:
values[i] = value
has_value[i] = True

def on_completed():
with parent.lock:
done(i)

subscriptions[i].disposable = sources[i].subscribe_(
on_next,
observer.on_error,
on_completed,
scheduler
)

for i in range(n):
_subscribe(i)
return CompositeDisposable(subscriptions)

return Observable(subscribe)
22 changes: 22 additions & 0 deletions rx/core/operators/forkjoin.py
@@ -0,0 +1,22 @@
from typing import Callable

import rx
from rx import Observable


def _fork_join(*args: Observable) -> Callable[[Observable], Observable]:
def fork_join(source: Observable) -> Observable:
"""Wait for observables to complete and then combine last values
they emitted into a tuple. Whenever any of that observables completes
without emitting any value, result sequence will complete at that moment as well.
Examples:
>>> obs = fork_join(source)
Returns:
An observable sequence containing the result of combining last element from
each source in given sequence.
"""
return rx.fork_join(source, *args)

return fork_join
27 changes: 27 additions & 0 deletions rx/operators/__init__.py
Expand Up @@ -1178,6 +1178,33 @@ def flat_map_latest(mapper: Mapper) -> Callable[[Observable], Observable]:
return _flat_map_latest(mapper)


def fork_join(*others: Observable) -> Callable[[Observable], Observable]:
"""Wait for observables to complete and then combine last values
they emitted into a tuple. Whenever any of that observables completes
without emitting any value, result sequence will complete at that moment as well.
.. marble::
:alt: fork_join
---a-----b--c---d-|
--1---2------3-4---|
-a--------- b---|
[ fork_join() ]
--------------------d4b|
Examples:
>>> res = fork_join(obs1)
>>> res = fork_join(obs1, obs2, obs3)
Returns:
An operator function that takes an observable source and
return an observable sequence containing the result
of combining last element from each source in given sequence.
"""
from rx.core.operators.forkjoin import _fork_join
return _fork_join(*others)


def group_by(key_mapper: Mapper,
element_mapper: Optional[Mapper] = None,
subject_mapper: Optional[Callable[[], Subject]] = None,
Expand Down
205 changes: 205 additions & 0 deletions tests/test_observable/test_forkjoin.py
@@ -0,0 +1,205 @@
import unittest

import rx
from rx import operators as ops
from rx.testing import ReactiveTest, TestScheduler

on_next = ReactiveTest.on_next
on_completed = ReactiveTest.on_completed
on_error = ReactiveTest.on_error
subscribe = ReactiveTest.subscribe
subscribed = ReactiveTest.subscribed
disposed = ReactiveTest.disposed
created = ReactiveTest.created


class RxException(Exception):
pass


class TestForkJoin(unittest.TestCase):
def test_fork_join_never_never(self):
scheduler = TestScheduler()
e1 = rx.never()
e2 = rx.never()

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == []

def test_fork_join_never_empty(self):
scheduler = TestScheduler()
e1 = rx.never()
e2 = rx.empty()

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_completed(200)]

def test_fork_join_never_non_empty(self):
scheduler = TestScheduler()
e1 = rx.never()
e2 = scheduler.create_hot_observable([
on_next(150, 1),
on_next(230, 2),
on_completed(300)
])

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == []

def test_fork_join_empty_empty(self):
scheduler = TestScheduler()
e1 = rx.empty()
e2 = rx.empty()

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_completed(200)]

def test_fork_join_empty_non_empty(self):
scheduler = TestScheduler()
e1 = rx.empty()
e2 = scheduler.create_hot_observable([
on_next(150, 1),
on_next(230, 2),
on_completed(300)
])

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_completed(200)]

def test_fork_join_non_empty_non_empty_right_last(self):
scheduler = TestScheduler()

msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(230)]
msgs2 = [on_next(150, 1), on_next(220, 3), on_completed(240)]

e1 = scheduler.create_hot_observable(msgs1)
e2 = scheduler.create_hot_observable(msgs2)

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_next(240, (2, 3)), on_completed(240)]

def test_fork_join_non_empty_non_empty_left_last(self):
scheduler = TestScheduler()

msgs1 = [on_next(150, 1), on_next(215, 2), on_completed(300)]
msgs2 = [on_next(150, 1), on_next(220, 3), on_completed(240)]

e1 = scheduler.create_hot_observable(msgs1)
e2 = scheduler.create_hot_observable(msgs2)

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_next(300, (2, 3)), on_completed(300)]

def test_fork_join_empty_error(self):
ex = RxException()

scheduler = TestScheduler()
e1 = rx.empty()
e2 = scheduler.create_hot_observable([
on_next(150, 1),
on_next(230, 2),
on_error(300, ex)
])

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_completed(200)]

def test_fork_join_never_error(self):
ex = RxException()

scheduler = TestScheduler()
e1 = rx.never()
e2 = scheduler.create_hot_observable([
on_next(150, 1),
on_next(230, 2),
on_error(300, ex)
])

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_error(300, ex)]

def test_fork_join_non_empty_error_left_last(self):
ex = RxException()

scheduler = TestScheduler()

msgs1 = [on_next(150, 1), on_next(250, 2), on_completed(330)]
msgs2 = [on_next(150, 1), on_next(230, 2), on_error(300, ex)]
e1 = scheduler.create_hot_observable(msgs1)
e2 = scheduler.create_hot_observable(msgs2)

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_error(300, ex)]

def test_fork_join_non_empty_error_right_last(self):
ex = RxException()

scheduler = TestScheduler()

msgs1 = [on_next(150, 1), on_next(250, 2), on_completed(300)]
msgs2 = [on_next(150, 1), on_next(230, 2), on_error(330, ex)]
e1 = scheduler.create_hot_observable(msgs1)
e2 = scheduler.create_hot_observable(msgs2)

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_error(330, ex)]

def test_fork_join_error_error_left_last(self):
ex = RxException()

scheduler = TestScheduler()

msgs1 = [on_next(150, 1), on_next(250, 2), on_error(340, ex)]
msgs2 = [on_next(150, 1), on_next(230, 2), on_error(330, ex)]
e1 = scheduler.create_hot_observable(msgs1)
e2 = scheduler.create_hot_observable(msgs2)

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_error(330, ex)]

def test_fork_join_error_error_right_last(self):
ex = RxException()

scheduler = TestScheduler()

msgs1 = [on_next(150, 1), on_next(250, 2), on_error(340, ex)]
msgs2 = [on_next(150, 1), on_next(230, 2), on_error(370, ex)]
e1 = scheduler.create_hot_observable(msgs1)
e2 = scheduler.create_hot_observable(msgs2)

results = scheduler.start(lambda: rx.fork_join(e1, e2))
assert results.messages == [on_error(340, ex)]

def test_fork_join_many(self):
scheduler = TestScheduler()

msgs1 = [on_next(150, 1), on_next(210, 2), on_next(230, 3), on_next(300, 9), on_completed(500)]
msgs2 = [on_next(150, 1), on_next(205, 3), on_next(220, 7), on_next(400, 3), on_completed(900)]
msgs3 = [on_next(150, 1), on_next(250, 2), on_next(300, 3), on_next(400, 9), on_next(500, 2), on_completed(850)]
msgs4 = [on_next(150, 1), on_next(400, 2), on_next(550, 10), on_next(560, 11), on_next(600, 3),
on_completed(605)]
msgs5 = [on_next(150, 1), on_next(201, 3), on_next(550, 10), on_next(560, 11), on_next(600, 3),
on_next(900, 99), on_completed(905)]

xs = [scheduler.create_hot_observable(x) for x in [msgs1, msgs2, msgs3, msgs4, msgs5]]
results = scheduler.start(lambda: rx.fork_join(*xs))
assert results.messages == [on_next(905, (9, 3, 2, 3, 99)), on_completed(905)]

def test_fork_join_many_ops(self):
scheduler = TestScheduler()

msgs1 = [on_next(150, 1), on_next(210, 2), on_next(230, 3), on_next(300, 9), on_completed(500)]
msgs2 = [on_next(150, 1), on_next(205, 3), on_next(220, 7), on_next(400, 3), on_completed(900)]
msgs3 = [on_next(150, 1), on_next(250, 2), on_next(300, 3), on_next(400, 9), on_next(500, 2), on_completed(850)]
msgs4 = [on_next(150, 1), on_next(400, 2), on_next(550, 10), on_next(560, 11), on_next(600, 3),
on_completed(605)]
msgs5 = [on_next(150, 1), on_next(201, 3), on_next(550, 10), on_next(560, 11), on_next(600, 3),
on_next(900, 99), on_completed(905)]

xs = [scheduler.create_hot_observable(x) for x in [msgs2, msgs3, msgs4, msgs5]]

def create():
return scheduler.create_hot_observable(msgs1).pipe(ops.fork_join(*xs))

results = scheduler.start(create)
assert results.messages == [on_next(905, (9, 3, 2, 3, 99)), on_completed(905)]

0 comments on commit 7b74d1c

Please sign in to comment.