Skip to content

Commit

Permalink
feat(catch): add catch operator, related to #141, closes #130
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Jul 30, 2015
1 parent e5cf568 commit 94b4c01
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 0 deletions.
109 changes: 109 additions & 0 deletions spec/operators/catch-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.catch()', function () {
it('should pass the error as the first argument', function (done) {
Observable.throw('bad')
.catch(function (err) {
expect(err).toBe('bad');
return Observable.empty();
})
.subscribe(function () { },
function (err) {
expect('this was called').not.toBeTruthy();
},
done);
});

it('should catch the error and allow the return of a new observable to use', function (done) {
var expected = [1, 2, 'foo'];
Observable.of(1, 2, 3)
.map(function (n) {
if (n === 3) {
throw 'bad';
}
return n;
})
.catch(function (err) {
return Observable.of('foo');
})
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, function (err) {
expect('this was called').not.toBeTruthy();
}, function () {
done();
});
});

it('should catch and allow the observable to be repeated with the third (caught) argument', function (done) {
var expected = [1, 2, 1, 2, 1, 2];
var retries = 0;
Observable.of(1, 2, 3)
.map(function (n) {
if (n === 3) {
throw 'bad';
}
return n;
})
.catch(function (err, caught) {
if (retries++ == 2) {
throw 'done';
}
return caught;
})
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, function (err) {
expect(err).toBe('done');
done();
}, function () {
expect('this was called').not.toBeTruthy();
})
});

it('should complete if you return Observable.empty()', function (done) {
var expected = [1, 2];
Observable.of(1, 2, 3)
.map(function (n) {
if (n === 3) {
throw 'bad';
}
return n;
})
.catch(function (err) {
return Observable.empty();
})
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, function (err) {
expect('this was called').not.toBeTruthy();
}, function () {
done();
});
});


it('should error if you return Observable.throw()', function (done) {
var expected = [1, 2];
Observable.of(1, 2, 3)
.map(function (n) {
if (n === 3) {
throw 'bad';
}
return n;
})
.catch(function (err) {
return Observable.throw('haha');
})
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, function (err) {
expect(err).toBe('haha');
done();
}, function () {
expect('this was called').not.toBeTruthy();
});
});
});
2 changes: 2 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,6 @@ export default class Observable<T> {

publish: () => ConnectableObservable<T>;
multicast: (subjectFactory: () => Subject<T>) => ConnectableObservable<T>;

catch: (selector: (err: any, source: Observable<T>, caught: Observable<any>) => Observable<any>) => Observable<T>;
}
4 changes: 4 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ import partition from './operators/partition';

observableProto.partition = partition;

import _catch from './operators/catch';

observableProto.catch = _catch;

export default {
Subject,
Scheduler,
Expand Down
49 changes: 49 additions & 0 deletions src/operators/catch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Subscriber from '../Subscriber';
import Observable from '../Observable';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

export default function _catch<T>(selector: (err:any, caught:Observable<any>) => Observable<any>) {
var catchOperator = new CatchOperator(selector);
var caught = this.lift(catchOperator);
catchOperator.caught = caught;
return caught;
}

export class CatchOperator<T, R> extends Operator<T, R> {
selector: (err:any, caught:Observable<any>) => Observable<any>;
caught: Observable<any>;
source: Observable<T>;

constructor(selector: (err:any, caught:Observable<any>) => Observable<any>) {
super();
this.selector = selector;
}

call(observer: Observer<T>): Observer<T> {
return new CatchSubscriber(observer, this.selector, this.caught);
}
}

export class CatchSubscriber<T> extends Subscriber<T> {
selector: (err:any, caught:Observable<any>) => Observable<any>;
caught: Observable<any>;

constructor(destination: Observer<T>, selector: (err:any, caught:Observable<any>) => Observable<any>, caught: Observable<any>) {
super(destination);
this.selector = selector;
this.caught = caught;
}

_error(err) {
const result = tryCatch(this.selector)(err, this.caught);
if (result === errorObject) {
this.destination.error(errorObject.e);
} else {
this.add(result.subscribe(this.destination));
}
}
}

0 comments on commit 94b4c01

Please sign in to comment.