Skip to content

Commit

Permalink
reimplement repeat() to not rely on 1st arg of concat() operator (i.e…
Browse files Browse the repository at this point in the history
…. iterator)
  • Loading branch information
Jérémie Fache committed Feb 6, 2019
1 parent 83cd7eb commit ca014b5
Showing 1 changed file with 57 additions and 5 deletions.
62 changes: 57 additions & 5 deletions rx/core/operators/repeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from rx.core import Observable
from rx.internal.utils import infinite

from rx.disposable import SerialDisposable, CompositeDisposable, SingleAssignmentDisposable, Disposable
from rx.concurrency import current_thread_scheduler

def _repeat(repeat_count=None) -> Callable[[Observable], Observable]:
if repeat_count is None:
repeat_count = sys.maxsize
Expand All @@ -26,10 +29,59 @@ def repeat(source: Observable) -> Observable:
sequence repeatedly.
"""

if repeat_count is None:
gen = infinite()
else:
gen = range(repeat_count)
# if repeat_count is None:
# gen = infinite()
# else:
# gen = range(repeat_count)
#
# return rx.defer(lambda _: rx.concat(source for _ in gen))
# return repeat

if repeat_count == 0:
return rx.empty()

count = -1

def increment_counter():
nonlocal count
if repeat_count:
count += 1

def should_stop():
if repeat_count:
return count >= repeat_count
return False

def subscribe(observer, scheduler=None):
scheduler = scheduler or current_thread_scheduler

subscription = SerialDisposable()
cancelable = SerialDisposable()
is_disposed = False

def action(_, state=None):
nonlocal is_disposed
increment_counter()

if is_disposed:
return

def on_completed():
cancelable.disposable = scheduler.schedule(action)

if should_stop():
observer.on_completed()
else:
d = SingleAssignmentDisposable()
subscription.disposable = d
d.disposable = source.subscribe_(observer.on_next, observer.on_error, on_completed, scheduler)

cancelable.disposable = scheduler.schedule(action)

def dispose():
nonlocal is_disposed
is_disposed = True

return rx.defer(lambda _: rx.concat(source for _ in gen))
return CompositeDisposable(subscription, cancelable, Disposable(dispose))
return Observable(subscribe)
return repeat

0 comments on commit ca014b5

Please sign in to comment.