Skip to content

Commit

Permalink
Merge 0a34ddc into 06fc120
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Oct 14, 2015
2 parents 06fc120 + 0a34ddc commit 91b9be9
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 12 deletions.
112 changes: 110 additions & 2 deletions spec/operators/skipUntil-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,118 @@ var Observable = Rx.Observable;

describe('Observable.prototype.skipUntil()', function () {
it('should skip values until another observable notifies', function () {
var source = hot('--a--b--c--d--e--|');
var e1 = hot('--a--b--c--d--e--|');
var skip = hot('-------------x--|');
var expected = ('--------------e--|');

expectObservable(source.skipUntil(skip)).toBe(expected);
expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should skip value and raises error until another observable raises error', function () {
var e1 = hot('--a--b--c--d--e--|');
var skip = hot('-------------#');
var expected = '-------------#';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should skip all element when another observable does not emit and completes early', function () {
var e1 = hot('--a--b--c--d--e--|');
var skip = hot('------------|');
var expected = '-----------------|';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should skip all element when another observable is empty', function () {
var e1 = hot('--a--b--c--d--e--|');
var skip = Observable.empty();
var expected = '-----------------|';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should not complete if source observable does not complete', function () {
var e1 = hot('-');
var skip = hot('-------------x--|');
var expected = '-';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should not complete if source observable never completes', function () {
var e1 = Observable.never();
var skip = hot('-------------x--|');
var expected = '-';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should raise error if source does not completes when another observable raises error', function () {
var e1 = hot('-');
var skip = hot('-------------#');
var expected = '-------------#';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should raise error if source never completes when another observable raises error', function () {
var e1 = Observable.never();
var skip = hot('-------------#');
var expected = '-------------#';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should skip all element and does not complete when another observable never completes', function () {
var e1 = hot('--a--b--c--d--e--|');
var skip = Observable.never();
var expected = '-';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should skip all element and does not complete when another observable does not completes', function () {
var e1 = hot('--a--b--c--d--e--|');
var skip = hot('-');
var expected = '-';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should skip all element and does not complete when another observable completes after source', function () {
var e1 = hot('--a--b--c--d--e--|');
var skip = hot('------------------------|');
var expected = '-';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should not completes if source does not completes when another observable does not emit', function () {
var e1 = hot('-');
var skip = hot('--------------|');
var expected = '-';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should not completes if source and another observable both does not complete', function () {
var e1 = hot('-');
var skip = hot('-');
var expected = '-';

expectObservable(e1.skipUntil(skip)).toBe(expected);
});

it('should skip all element when another observable unsubscribed early before emit', function () {
var e1 = hot('--a--b--c--d--e--|');
var skip = hot('-------------x--|');
var expected = '-';

e1.subscribe(function () {
skip.unsubscribe();
});

expectObservable(e1.skipUntil(skip)).toBe(expected);
});
});
37 changes: 27 additions & 10 deletions src/operators/skipUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,52 @@ class SkipUntilOperator<T, R> implements Operator<T, R> {
constructor(private notifier: Observable<any>) {
}

call(subscriber: Subscriber<R>): Subscriber<T> {
call(subscriber: Subscriber<T>): Subscriber<T> {
return new SkipUntilSubscriber(subscriber, this.notifier);
}
}

class SkipUntilSubscriber<T> extends Subscriber<T> {
private notificationSubscriber: NotificationSubscriber<any> = new NotificationSubscriber();
private notificationSubscriber: NotificationSubscriber<any> = null;

constructor(destination: Subscriber<T>, private notifier: Observable<any>) {
super(destination);
this.notificationSubscriber = new NotificationSubscriber(this);
this.add(this.notifier.subscribe(this.notificationSubscriber));
}

_next(x) {
if (this.notificationSubscriber.hasNotified) {
this.destination.next(x);
_next(value: T) {
if (this.notificationSubscriber.hasValue) {
this.destination.next(value);
}
}

_complete() {
if (this.notificationSubscriber.hasCompleted) {
this.destination.complete();
}
this.notificationSubscriber.unsubscribe();
}
}

class NotificationSubscriber<T> extends Subscriber<T> {
hasNotified: boolean = false;
hasValue: boolean = false;
hasCompleted: boolean = false;

constructor() {
constructor(private parent: SkipUntilSubscriber<any>) {
super(null);
}

_next() {
this.hasNotified = true;
this.unsubscribe();
_next(unused: T) {
this.hasValue = true;
}

_error(err) {
this.parent.error(err);
this.hasValue = true;
}

_complete() {
this.hasCompleted = true;
}
}

0 comments on commit 91b9be9

Please sign in to comment.