Skip to content

Commit

Permalink
Zip: completes as soon as any source completed
Browse files Browse the repository at this point in the history
Fixes #525
  • Loading branch information
hoc081098 committed Sep 23, 2020
1 parent 93e8a17 commit ea7c5a4
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 39 deletions.
9 changes: 4 additions & 5 deletions rx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from .core import Observable, pipe, typing
from .internal.utils import alias


# Please make sure the version here remains the same as in project.cfg
__version__ = '3.1.1'

Expand Down Expand Up @@ -561,8 +560,8 @@ def generate(initial_state: Any,


def hot(string: str,
timespan: typing.RelativeTime=0.1,
duetime:typing.AbsoluteOrRelativeTime = 0.0,
timespan: typing.RelativeTime = 0.1,
duetime: typing.AbsoluteOrRelativeTime = 0.0,
scheduler: Optional[typing.Scheduler] = None,
lookup: Optional[Mapping] = None,
error: Optional[Exception] = None
Expand Down Expand Up @@ -1092,8 +1091,8 @@ def zip(*args: Observable) -> Observable:
:alt: zip
--1--2---3-----4---|
-a----b----c-d-----|
[ zip() ]
-a----b----c-d------|
[ zip() ]
--1,a-2,b--3,c-4,d-|
Example:
Expand Down
17 changes: 5 additions & 12 deletions rx/core/observable/zip.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from threading import RLock
from typing import Optional, List

from rx import from_future
from rx.core import Observable, typing
from rx.disposable import CompositeDisposable, SingleAssignmentDisposable
from rx.internal.utils import is_future
from rx.internal.concurrency import synchronized
from threading import RLock
from rx.internal.utils import is_future


# pylint: disable=redefined-builtin
Expand All @@ -29,11 +29,11 @@ def _zip(*args: Observable) -> Observable:

sources = list(args)

def subscribe(observer: typing.Observer, scheduler: Optional[typing.Scheduler] = None):
def subscribe(observer: typing.Observer,
scheduler: Optional[typing.Scheduler] = None) -> CompositeDisposable:
n = len(sources)
queues: List[List] = [[] for _ in range(n)]
lock = RLock()
is_done = [False] * n

@synchronized(lock)
def next(i):
Expand All @@ -46,13 +46,6 @@ def next(i):
return

observer.on_next(res)
elif all([x for j, x in enumerate(is_done) if j != i]):
observer.on_completed()

def done(i):
is_done[i] = True
if all(is_done):
observer.on_completed()

subscriptions = [None] * n

Expand All @@ -65,7 +58,7 @@ def on_next(x):
queues[i].append(x)
next(i)

sad.disposable = source.subscribe_(on_next, observer.on_error, lambda: done(i), scheduler)
sad.disposable = source.subscribe_(on_next, observer.on_error, observer.on_completed, scheduler)
subscriptions[i] = sad

for idx in range(n):
Expand Down
4 changes: 2 additions & 2 deletions rx/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3492,8 +3492,8 @@ def zip(*args: Observable) -> Callable[[Observable], Observable]:
:alt: zip
--1--2---3-----4---|
-a----b----c-d-----|
[ zip() ]
-a----b----c-d------|
[ zip() ]
--1,a-2,b--3,c-4,d-|
Expand Down
45 changes: 25 additions & 20 deletions tests/test_observable/test_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
created = ReactiveTest.created



class TestZip(unittest.TestCase):

def test_zip_never_never(self):
Expand All @@ -37,7 +36,7 @@ def create():
return o1.pipe(ops.zip(o2))

results = scheduler.start(create)
assert results.messages == []
assert results.messages == [on_completed(210)]

def test_zip_empty_empty(self):
scheduler = TestScheduler()
Expand Down Expand Up @@ -67,7 +66,7 @@ def create():
ops.map(sum))

results = scheduler.start(create)
assert results.messages == [on_completed(215)]
assert results.messages == [on_completed(210)]

def test_zip_non_empty_empty(self):
scheduler = TestScheduler()
Expand All @@ -82,7 +81,7 @@ def create():
ops.map(sum))

results = scheduler.start(create)
assert results.messages == [on_completed(215)]
assert results.messages == [on_completed(210)]

def test_zip_never_non_empty(self):
scheduler = TestScheduler()
Expand All @@ -96,7 +95,7 @@ def create():
ops.map(sum))

results = scheduler.start(create)
assert results.messages == []
assert results.messages == [on_completed(220)]

def test_zip_non_empty_never(self):
scheduler = TestScheduler()
Expand All @@ -110,7 +109,7 @@ def create():
ops.map(sum))

results = scheduler.start(create)
assert results.messages == []
assert results.messages == [on_completed(220)]

def test_zip_non_empty_non_empty(self):
scheduler = TestScheduler()
Expand All @@ -125,7 +124,7 @@ def create():
ops.map(sum))

results = scheduler.start(create)
assert results.messages == [on_next(220, 2 + 3), on_completed(240)]
assert results.messages == [on_next(220, 2 + 3), on_completed(230)]

def test_zip_empty_error(self):
ex = 'ex'
Expand Down Expand Up @@ -246,13 +245,15 @@ def msgs1_factory():
for i in range(5):
results.append(on_next(205 + i * 5, i))
return results

msgs1 = msgs1_factory()

def msgs2_factory():
results = []
for i in range(10):
results.append(on_next(205 + i * 8, i))
return results

msgs2 = msgs2_factory()

length = min(len(msgs1), len(msgs2))
Expand All @@ -265,13 +266,13 @@ def create():
ops.map(sum))

results = scheduler.start(create).messages
assert(length == len(results))
assert (length == len(results))
for i in range(length):
_sum = msgs1[i].value.value + msgs2[i].value.value
time = max(msgs1[i].time, msgs2[i].time)
assert(results[i].value.kind == 'N'
and results[i].time == time
and results[i].value.value == _sum)
assert (results[i].value.kind == 'N'
and results[i].time == time
and results[i].value.value == _sum)

def test_zip_some_data_asymmetric2(self):
scheduler = TestScheduler()
Expand All @@ -282,13 +283,15 @@ def msgs1_factory():
results.append(on_next(205 + i * 5, i))

return results

msgs1 = msgs1_factory()

def msgs2_factory():
results = []
for i in range(5):
results.append(on_next(205 + i * 8, i))
return results

msgs2 = msgs2_factory()

length = min(len(msgs1), len(msgs2))
Expand All @@ -301,13 +304,13 @@ def create():
ops.map(sum))

results = scheduler.start(create).messages
assert(length == len(results))
assert (length == len(results))
for i in range(length):
_sum = msgs1[i].value.value + msgs2[i].value.value
time = max(msgs1[i].time, msgs2[i].time)
assert(results[i].value.kind == 'N'
and results[i].time == time
and results[i].value.value == _sum)
assert (results[i].value.kind == 'N'
and results[i].time == time
and results[i].value.value == _sum)

def test_zip_some_data_symmetric(self):
scheduler = TestScheduler()
Expand All @@ -317,13 +320,15 @@ def msgs1_factory():
for i in range(10):
results.append(on_next(205 + i * 5, i))
return results

msgs1 = msgs1_factory()

def msgs2_factory():
results = []
for i in range(10):
results.append(on_next(205 + i * 8, i))
return results

msgs2 = msgs2_factory()

length = min(len(msgs1), len(msgs2))
Expand All @@ -336,13 +341,13 @@ def create():
ops.map(sum))

results = scheduler.start(create).messages
assert(length == len(results))
assert (length == len(results))
for i in range(length):
_sum = msgs1[i].value.value + msgs2[i].value.value
time = max(msgs1[i].time, msgs2[i].time)
assert(results[i].value.kind == 'N'
and results[i].time == time
and results[i].value.value == _sum)
assert (results[i].value.kind == 'N'
and results[i].time == time
and results[i].value.value == _sum)

def test_zip_with_iterable_never_empty(self):
scheduler = TestScheduler()
Expand Down Expand Up @@ -423,7 +428,7 @@ def create():
def test_zip_with_iterable_non_empty_non_empty(self):
scheduler = TestScheduler()
n1 = scheduler.create_hot_observable(
on_next(150, 1), on_next(215, 2), on_completed(230))
on_next(150, 1), on_next(215, 2), on_completed(230))
n2 = [3]

def create():
Expand Down

0 comments on commit ea7c5a4

Please sign in to comment.