diff --git a/lib/interval.js b/lib/interval.js index 9de411e..b3be1fd 100644 --- a/lib/interval.js +++ b/lib/interval.js @@ -40,7 +40,29 @@ onInterval = function(interval, load) { if (interval < 1000) { return Observable["throw"](new Error("Interval has to be at least 1s: " + interval + "ms")); } else { - return load().concat(Observable.interval(interval).flatMap(load)); + return Observable.create(function(observer) { + var dispose, loadSubscription, prepareNext, runLoad, timeoutHandle; + loadSubscription = timeoutHandle = null; + dispose = function() { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + if (loadSubscription) { + loadSubscription.dispose(); + } + return loadSubscription = timeoutHandle = null; + }; + prepareNext = function() { + dispose(); + return timeoutHandle = setTimeout(runLoad, interval); + }; + runLoad = function() { + dispose(); + return loadSubscription = load().subscribe(observer.onNext.bind(observer), observer.onError.bind(observer), prepareNext); + }; + runLoad(); + return dispose; + }); } } else { return load(); diff --git a/src/interval.coffee b/src/interval.coffee index 689f392..b82245e 100644 --- a/src/interval.coffee +++ b/src/interval.coffee @@ -39,9 +39,29 @@ onInterval = (interval, load) -> if interval < 1000 Observable.throw new Error "Interval has to be at least 1s: #{interval}ms" else - load().concat( - Observable.interval(interval).flatMap(load) - ) + Observable.create (observer) -> + loadSubscription = timeoutHandle = null + + dispose = -> + clearTimeout(timeoutHandle) if timeoutHandle + loadSubscription.dispose() if loadSubscription + loadSubscription = timeoutHandle = null + + prepareNext = -> + dispose() + timeoutHandle = setTimeout runLoad, interval + + runLoad = -> + dispose() + loadSubscription = load().subscribe( + observer.onNext.bind(observer), + observer.onError.bind(observer), + prepareNext + ) + + runLoad() + + dispose else load()