Skip to content

Commit

Permalink
Should fix #35
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrattli committed Apr 4, 2015
1 parent 3d96c6d commit 72eecb5
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions rx/linq/observable/ziparray.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def zip_array(cls, *args):
Keyword arguments:
args -- Observable sources.
Returns an observable {Observable} sequence containing lists of elements
at corresponding indexes.
Returns an observable {Observable} sequence containing lists of
elements at corresponding indexes.
"""

sources = list(args)
Expand Down Expand Up @@ -71,17 +71,18 @@ def on_next(x):
queues[i].append(x)
next(i)

subscriptions[i].disposable = sources[i].subscribe(on_next, observer.on_error, lambda: done(i))
subscription = sources[i].subscribe(on_next, observer.on_error, lambda: done(i))
subscriptions[i].disposable = subscription
for idx in range(n):
func(idx)

composite_disposable = CompositeDisposable(subscriptions)

def action():
for _ in queues:
queues[n] = []
def dispose():
for idx, _ in enumerate(queues):
queues[idx] = []

composite_disposable.add(Disposable.create(action))
composite_disposable.add(Disposable.create(dispose))

return composite_disposable
return AnonymousObservable(subscribe)

0 comments on commit 72eecb5

Please sign in to comment.