Skip to content
This repository has been archived by the owner on Apr 20, 2018. It is now read-only.

pausableBuffered flushes its buffered messages on source's onCompleted and onError #349

Closed
metrofun opened this issue Nov 2, 2014 · 5 comments

Comments

@metrofun
Copy link

metrofun commented Nov 2, 2014

The current implementation of pausableBuffered, when paused, still flushes its buffered messages if the source sequence completes or errors.

The desired behavior is to flush the buffered messages only when unpaused by the pauser.

@metrofun metrofun changed the title pausableBuffered flush its buffer on source's onCompleted and onError pausableBuffered flushes its buffered messages on source's onCompleted and onError Nov 2, 2014
@metrofun
Copy link
Author

metrofun commented Nov 9, 2014

Current workaround that I'm using is:

source
    // Don't complete the source till `pauser` fired `true` at least once
    .merge(pauser.filter(Boolean).take(1).ignoreElements())
    .pausableBuffered(pauser)

@mattpodwysocki
Copy link
Member

@metrofun working on a more long term fix...

@mattpodwysocki
Copy link
Member

  /**
   * Combines the latest value of `source` with the latest value of `subject`
   * (the pause/resume signal) and emits `resultSelector(sourceValue, subjectValue)`
   * every time either input produces a value, once both have produced at least one.
   *
   * A completion or error coming from `source` is only passed on while the latest
   * subject value is truthy. If it arrives while that value is falsy (or before the
   * subject has produced anything) it is held back and delivered right after the
   * next truthy subject value has been processed, so a subscriber sees the resume
   * before the terminal notification.
   *
   * Completion of `subject` is handled as a final resume: `true` is pushed as its
   * value and the combined sequence then completes.
   *
   * @param {Observable} source The sequence supplying the data values.
   * @param {Observable} subject The sequence supplying the pause (falsy) / resume (truthy) signal.
   * @param {Function} resultSelector Called with (latest source value, latest subject value); its return value is emitted.
   * @returns {Observable} An AnonymousObservable producing the selector results.
   */
  function combineLatestSource(source, subject, resultSelector) {
    return new AnonymousObservable(function (observer) {
      var n = 2,
        hasValue = [false, false],
        hasValueAll = false,
        isDone = false,
        hasError = false,
        error,
        values = new Array(n);

      // Delivers a pending error or completion, but only while the latest
      // subject value is truthy (i.e. the sequence is not paused).
      function flushTerminal() {
        if (!values[1]) {
          return;
        }
        if (hasError) {
          observer.onError(error);
        } else if (isDone) {
          observer.onCompleted();
        }
      }

      function next(x, i) {
        var res;
        values[i] = x;
        hasValue[i] = true;
        if (hasValueAll || (hasValueAll = hasValue.every(identity))) {
          try {
            res = resultSelector.apply(null, values);
          } catch (ex) {
            observer.onError(ex);
            return;
          }
          observer.onNext(res);
        }
        // Must run after the emission and regardless of which branch was taken:
        // a terminal notification that arrived while paused is released by the
        // very `next` call that carries the resume signal.
        flushTerminal();
      }

      return new CompositeDisposable(
        source.subscribe(
          function (x) {
            next(x, 0);
          },
          function (e) {
            // Separate flag so that an `undefined` error value is still recognised.
            hasError = true;
            error = e;
            flushTerminal();
          },
          function () {
            isDone = true;
            flushTerminal();
          }),
        subject.subscribe(
          function (x) {
            next(x, 1);
          },
          // Errors of the pause signal itself are never held back.
          observer.onError.bind(observer),
          function () {
            isDone = true;
            next(true, 1);
          })
        );
    });
  }

  /**
   * Observable that forwards the values of `source` while the latest pauser value
   * is truthy and queues them while it is falsy. When the pauser changes to a
   * truthy value the queue is delivered in arrival order.
   *
   * Besides an optional external pauser sequence, every instance owns a
   * `controller` subject that is driven through `pause()` and `resume()`.
   */
  var PausableBufferedObservable = (function (__super__) {

    inherits(PausableBufferedObservable, __super__);

    // Delivers every queued value to the observer, oldest first, leaving the queue empty.
    function drainQueue(q, observer) {
      while (q.length > 0) {
        observer.onNext(q.shift());
      }
    }

    function subscribe(observer) {
      var q = [], previousShouldFire;

      var subscription =
        combineLatestSource(
          this.source,
          // The sequence starts out paused until the pauser says otherwise.
          this.pauser.distinctUntilChanged().startWith(false),
          function (data, shouldFire) {
            return { data: data, shouldFire: shouldFire };
          })
          .subscribe(
            function (results) {
              // A result whose shouldFire differs from the previous one is taken to
              // come from the pauser rather than from new data. The comparison is
              // strict: with a loose `!=`, pauser values such as `true` and `1`
              // would count as unchanged and the latest data value would be
              // forwarded or queued a second time.
              if (previousShouldFire !== undefined && results.shouldFire !== previousShouldFire) {
                previousShouldFire = results.shouldFire;
                // change in shouldFire
                if (results.shouldFire) {
                  drainQueue(q, observer);
                }
              } else {
                previousShouldFire = results.shouldFire;
                // new data
                if (results.shouldFire) {
                  observer.onNext(results.data);
                } else {
                  q.push(results.data);
                }
              }
            },
            function (err) {
              // Empty buffer before sending error
              drainQueue(q, observer);
              observer.onError(err);
            },
            function () {
              // Empty buffer before sending completion
              drainQueue(q, observer);
              observer.onCompleted();
            }
          );
      return subscription;
    }

    /**
     * @param {Observable} source The sequence whose values are forwarded or queued.
     * @param {Observable} [pauser] Optional pause/resume sequence; it is merged with
     *   the internal controller when it exposes a `subscribe` member.
     */
    function PausableBufferedObservable(source, pauser) {
      this.source = source;
      this.controller = new Subject();

      if (pauser && pauser.subscribe) {
        this.pauser = this.controller.merge(pauser);
      } else {
        this.pauser = this.controller;
      }

      __super__.call(this, subscribe);
    }

    /** Pauses the sequence by pushing `false` through the controller. */
    PausableBufferedObservable.prototype.pause = function () {
      this.controller.onNext(false);
    };

    /** Resumes the sequence by pushing `true` through the controller. */
    PausableBufferedObservable.prototype.resume = function () {
      this.controller.onNext(true);
    };

    return PausableBufferedObservable;

  }(Observable));

  /**
   * Creates a pausable, buffering view of this observable sequence: the given
   * sequence of true/false values decides whether values are passed on or held
   * back, and the values held back while paused are yielded afterwards.
   * @example
   * var pauser = new Rx.Subject();
   * var source = Rx.Observable.interval(100).pausableBuffered(pauser);
   * @param {Observable} subject The observable sequence used to pause the underlying sequence.
   * @returns {Observable} A PausableBufferedObservable wrapping this sequence and the given pauser.
   */
  observableProto.pausableBuffered = function (subject) {
    var source = this;
    return new PausableBufferedObservable(source, subject);
  };

mattpodwysocki added a commit that referenced this issue Nov 12, 2014
mattpodwysocki added a commit that referenced this issue Nov 12, 2014
@mattpodwysocki
Copy link
Member

@metrofun fixed the onCompleted issue, now working on the onError issue

@mattpodwysocki
Copy link
Member

@metrofun done!

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants