Skip to content

Commit

Permalink
partially revert #537
Browse files Browse the repository at this point in the history
The original fix for #525 breaks sequences where some observables emit
item faster than other ones. By completing too soon, the remaining
observables cannot catchup later. A very simple case is in #578 where
the two observables to zip emit their items sequentially.

We can fix both issues by completing whenever an observable completes
and there is no queued item. Otherwise we let the remaining observables
a chance to emit new items before completion.
  • Loading branch information
MainRo committed Jan 22, 2022
1 parent a4e84d8 commit 0a4d40a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
11 changes: 10 additions & 1 deletion rx/core/observable/zip.py
Expand Up @@ -34,6 +34,7 @@ def subscribe(observer: typing.Observer,
n = len(sources)
queues: List[List] = [[] for _ in range(n)]
lock = RLock()
is_completed = [False] * n

@synchronized(lock)
def next(i):
Expand All @@ -46,6 +47,14 @@ def next(i):
return

observer.on_next(res)
elif all([x for j, x in enumerate(is_completed) if j != i]) \
and all([len(x) == 0 for j, x in enumerate(queues) if j != i]):
observer.on_completed()

def completed(i):
is_completed[i] = True
if all(is_completed) or all([len(q) == 0 for q in queues]):
observer.on_completed()

subscriptions = [None] * n

Expand All @@ -58,7 +67,7 @@ def on_next(x):
queues[i].append(x)
next(i)

sad.disposable = source.subscribe_(on_next, observer.on_error, observer.on_completed, scheduler)
sad.disposable = source.subscribe_(on_next, observer.on_error, lambda: completed(i), scheduler)
subscriptions[i] = sad

for idx in range(n):
Expand Down
34 changes: 32 additions & 2 deletions tests/test_observable/test_zip.py
Expand Up @@ -95,7 +95,7 @@ def create():
ops.map(sum))

results = scheduler.start(create)
assert results.messages == [on_completed(220)]
assert results.messages == []

def test_zip_non_empty_never(self):
scheduler = TestScheduler()
Expand All @@ -109,7 +109,7 @@ def create():
ops.map(sum))

results = scheduler.start(create)
assert results.messages == [on_completed(220)]
assert results.messages == []

def test_zip_non_empty_non_empty(self):
scheduler = TestScheduler()
Expand All @@ -126,6 +126,36 @@ def create():
results = scheduler.start(create)
assert results.messages == [on_next(220, 2 + 3), on_completed(230)]

def test_zip_non_empty_non_empty_sequential(self):
scheduler = TestScheduler()
msgs1 = [on_next(210, 1), on_next(215, 2), on_completed(230)]
msgs2 = [on_next(240, 1), on_next(245, 3), on_completed(250)]
e1 = scheduler.create_cold_observable(msgs1)
e2 = scheduler.create_cold_observable(msgs2)

def create():
return e1.pipe(
ops.zip(e2),
ops.map(sum))

results = scheduler.start(create)
assert results.messages == [on_next(200+240, 1 + 1), on_next(200+245, 2 + 3), on_completed(200+250)]

def test_zip_non_empty_partial_sequential(self):
scheduler = TestScheduler()
msgs1 = [on_next(210, 1), on_next(215, 2), on_completed(230)]
msgs2 = [on_next(240, 1), on_completed(250)]
e1 = scheduler.create_cold_observable(msgs1)
e2 = scheduler.create_cold_observable(msgs2)

def create():
return e1.pipe(
ops.zip(e2),
ops.map(sum))

results = scheduler.start(create)
assert results.messages == [on_next(200+240, 1 + 1), on_completed(200+250)]

def test_zip_empty_error(self):
ex = 'ex'
scheduler = TestScheduler()
Expand Down

0 comments on commit 0a4d40a

Please sign in to comment.