Skip to content

Commit

Permalink
feat(reducers): combineReducers catches errors when reducing values
Browse files Browse the repository at this point in the history
  When an error is caught, it is emitted to an optional error subject,
  while the stream itself does not emit. This means that any subscribers
  will still have the previously emitted value.
  • Loading branch information
anton164 committed Mar 3, 2020
1 parent b5c529b commit 60b6e12
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 15 deletions.
15 changes: 11 additions & 4 deletions src/reducer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { reducer, combineReducers, actionCreator } from 'rxbeach';
import test from 'ava';
import { marbles } from 'rxjs-marbles/ava';
import sinon from 'sinon';
import { Subject } from 'rxjs';

const throwErrorFn = (): number => {
throw 'error';
throw errors.e;
};
const incrementOne = actionCreator('[increment] one');
const decrement = actionCreator('[increment] decrement');
Expand Down Expand Up @@ -36,6 +37,9 @@ const numbers = {
'5': 5,
'6': 6,
};
const errors = {
e: 'error',
};

test('reducer should store reducer function', t => {
incrementOneHandler.resetHistory();
Expand Down Expand Up @@ -92,13 +96,16 @@ test(
);

test(
'combineReducers should not silence errors',
'combineReducers catches errors and emits them to error subject',
marbles(m => {
const action$ = m.hot(' 1d1', actions);
const expected$ = m.hot('2# ', numbers);
const expected$ = m.hot('2-3', numbers);
const errorMarbles = ' -e-';
const error$ = new Subject<any>();

m.expect(error$).toBeObservable(errorMarbles, errors);
m.expect(
action$.pipe(combineReducers(1, [handleOne, handleDecrement]))
action$.pipe(combineReducers(1, [handleOne, handleDecrement], error$))
).toBeObservable(expected$);
})
);
Expand Down
49 changes: 38 additions & 11 deletions src/reducer.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { OperatorFunction, pipe, Observable } from 'rxjs';
import { scan, map } from 'rxjs/operators';
import { OperatorFunction, pipe, Observable, Subject } from 'rxjs';
import { scan, map, filter } from 'rxjs/operators';
import {
VoidPayload,
UnknownAction,
UnknownActionCreator,
UnknownActionCreatorWithPayload,
defaultErrorSubject,
} from 'rxbeach/internal';
import { ofType, merge } from 'rxbeach/operators';

Expand Down Expand Up @@ -192,13 +193,20 @@ const ACTION_ORIGIN = Symbol('Action origin');
* `merge`, which is called with all the actions first and then the source
* streams in the order their reducers are defined in the `reducers` argument.
*
* If a reducer throws an error, it will be nexted on the error subject. If the
* error subject is not explicitly set, it will default to
* `defaultErrorSubject`, which will rethrow the errors globally, as uncaught
* exceptions. The stream will not complete or emit any value upon an error.
*
*
* @param seed The initial input to the first reducer call
* @param reducers The reducer entries that should be combined
* @see rxjs.merge
*/
export const combineReducers = <State>(
seed: State,
reducers: RegisteredReducer<State, any>[]
reducers: RegisteredReducer<State, any>[],
errorSubject: Subject<any> = defaultErrorSubject
): OperatorFunction<UnknownAction, State> => {
const actionReducers = reducers.filter(isActionReducer);
const streamReducers = reducers.filter(isStreamReducer);
Expand All @@ -222,14 +230,33 @@ export const combineReducers = <State>(
ofType(...actionReducers.flatMap(reducerFn => reducerFn.trigger.actions)),
map((action): Packet => ({ origin: ACTION_ORIGIN, value: action })),
merge(...source$s),
scan((state, packet) => {
if (packet.origin === ACTION_ORIGIN) {
const reducerFn = reducersByActionType.get(packet.value.type)!;
return reducerFn(state, packet.value.payload);
}
scan(
({ state }, packet) => {
try {
if (packet.origin === ACTION_ORIGIN) {
const reducerFn = reducersByActionType.get(packet.value.type)!;
return {
caughtError: false,
state: reducerFn(state, packet.value.payload),
};
}

const reducerFn = streamReducers[packet.origin];
return reducerFn(state, packet.value);
}, seed)
const reducerFn = streamReducers[packet.origin];
return {
caughtError: false,
state: reducerFn(state, packet.value),
};
} catch (e) {
errorSubject.next(e);
return {
caughtError: true,
state,
};
}
},
{ state: seed, caughtError: false }
),
filter(({ caughtError }) => caughtError === false),
map(({ state }) => state)
);
};

0 comments on commit 60b6e12

Please sign in to comment.