Skip to content

Commit

Permalink
Add disposability to actions
Browse files Browse the repository at this point in the history
  • Loading branch information
Berkeley Martinez authored and Berkeley Martinez committed Dec 17, 2015
1 parent 2f90eca commit 8495558
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 31 deletions.
23 changes: 14 additions & 9 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Actions

### Actions(spec : object) : ActionsFactory
### Actions(spec : Object) => ActionsFactory
[Ⓢ]()

A factory that produces stampit factory functions. Takes in optional object
Expand All @@ -16,32 +16,37 @@ stampit [docs](https://github.com/stampit-org/stampit).
Any other properties are used to create observables
methods of these factories instances. These are taken as the specifications of the actions instance.

> spec signature : { methodName: mappingFunction | null }
`spec: { methodName: (map(val: any) => val: Observable|Any)|Null }`

For every key on spec,
there will be a corresponding method with that name. If the keys value on spec
is a function, it is used as an initial mapping function. If value is null, the
indentity function, `((x) => x)`, is used.
is a function, it is used as an initial map function. If value is null, the
identity function, `((x) => x)`, is used. Keys with `_` are ignored.

A mapping function has the following signature

`(value : any) : any|Observable<any>`
`map(value : Any) => Observable<any>|Any`

If a mapping function returns an observable, that observable is subscribed to
and those values it returns are passed to that actions observables.

### ActionsFactory(instanceProperties) : actions
### ActionsFactory(instanceProperties: Object) => actions: Object

A factory function ([a stampit stamp](https://github.com/stampit-org/stampit#stampit-api)) that returns an actions instance with methods defined during factory creation above.

### actions.someObservableMethod : observable
### actions.dispose: function

Any method defined during factory creation will be an observable method availble
Will dispose all the observable actions attached to the actions object and remove their observers.

### actions.someObservableMethod: Observable

Any method defined during factory creation will be an observable method available
on the instance object. This method has all the methods available to an [RxJS Observable](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md#observable-instance-methods) instance as well as...

### actions.someObservableMethod.displayName : string

A displayName taken from the key in `spec`
A displayName for the action used during debugging and registering. To add a
displayName property use the `refs` of the stampit spec

## Store

Expand Down
101 changes: 81 additions & 20 deletions src/Actions.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { Observable, Subject, Disposable, helpers } from 'rx';
import stampit from 'stampit';
import debugFactory from 'debug';
import {
Observable,
Subject,
Disposable,
CompositeDisposable,
helpers
} from 'rx';

import waitFor from './waitFor';

const { checkDisposed } = Disposable;
const assign = Object.assign;
const debug = debugFactory('thundercats:actions');
const currentStampSpec = [
Expand Down Expand Up @@ -40,14 +47,27 @@ export function getActionDef(ctx) {


export function create(shouldBind, { name, map }) {
let observers = [];
let actionStart = new Subject();
let maybeBound = shouldBind ?
const observers = [];
const actionDisposable = new CompositeDisposable();
const actionStart = new Subject();
const maybeBound = shouldBind ?
map.bind(this) :
map;

function action(value) {
Observable.just(value)
// throw if disposed observable is retried
checkDisposed(action);
if (action.isStopped) {
debug('%s called after being stopped', name);
return value;
}

// NOTE: if an error is thrown in the mapping function
// this will cause the stream to collapse
// and the action will no longer be observable
// nor will the observers listen as they have been stopped by
// the error
const mapDisposable = Observable.just(value)
.map(maybeBound)
.flatMap(value => {
if (Observable.isObservable(value)) {
Expand All @@ -61,26 +81,45 @@ export function create(shouldBind, { name, map }) {
.doOnNext(
value => observers.forEach(observer => observer.onNext(value))
)
// notify action observers of error
.doOnError(
value => observers.forEach(observer => observer.onError(value))
)
// prevent error from calling on error
.catch(e => Observable.just(e))
.subscribe(
() => debug('action % onNext', name)
() => debug('%s onNext', name),
err => {
// observables returned by the mapping function must use
// a catch to prevent the action from collapsing the stream
action.error = err;
action.isStopped = true;
action.hasError = true;

// notify action observers of error
observers.forEach(observer => observer.onError(err));
// observers will no longer listen after pushing error
// as the stream has collapsed
// so we remove them
observers.length = 0;
}
);

actionDisposable.add(mapDisposable);
return value;
}

action.isDisposed = false;
action.isStopped = false;
action.displayName = name;
action.observers = observers;
assign(action, Observable.prototype);

action.hasObservers = function hasObservers() {
return observers.length > 0 ||
actionStart.hasObservers();
// in next major version this should throw if already disposed
// in order to better follow RxJS conventions
//
// checkDisposed(action);

return !!(
observers.length > 0 ||
actionStart.observers &&
actionStart.observers.length > 0
);
};

action.waitFor = function() {
Expand All @@ -89,22 +128,37 @@ export function create(shouldBind, { name, map }) {
};

action._subscribe = function subscribeToAction(observer) {
// in next major version this should check if action
// has been stopped or disposed and act accordingly
observers.push(observer);
return new Disposable(() => {
observers.splice(observers.indexOf(observer), 1);
});
};

const subscription = new Disposable(() => {
observers.length = 0;
action.isDisposed = true;
actionStart.dispose();
actionDisposable.dispose();
});

action.dispose = () => subscription.dispose();

Observable.call(action);

debug('action %s created', action.displayName);
return action;
debug('%s created', action.displayName);
return {
action,
subscription
};
}

export function createMany(shouldBind, instance) {
export function createMany(shouldBind, instance, compositeDisposable) {
return this
.map(create.bind(instance, shouldBind))
.reduce((ctx, action) => {
.reduce((ctx, { action, subscription }) => {
compositeDisposable.add(subscription);
ctx[action.displayName] = action;
return ctx;
}, {});
Expand All @@ -121,8 +175,15 @@ export default function Actions(obj = {}) {

return stampit()
.init(({ instance }) => {
const actionMethods = getActionDef(obj)::createMany(shouldBind, instance);
return assign(instance, actionMethods);
const actionsDisposable = new CompositeDisposable();
const actionMethods = getActionDef(obj)
::createMany(shouldBind, instance, actionsDisposable);

return assign(
instance,
actionMethods,
{ dispose() { actionsDisposable.dispose(); } }
);
})
.refs(refs)
.props(props)
Expand Down
114 changes: 112 additions & 2 deletions test/action.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable no-unused-expressions */
import Rx from 'rx';
import Rx, { Observable } from 'rx';
import stampit from 'stampit';
import chai, { expect } from 'chai';
import chai, { assert, expect } from 'chai';
import sinon from 'sinon';
import sinonChai from 'sinon-chai';
import chaiAsPromised from 'chai-as-promised';
Expand Down Expand Up @@ -193,6 +193,15 @@ describe('Actions', function() {
catActions.meow.should.equal('goesthecat');
initSpy.should.have.been.calledOnce;
});

it('should work without it', () => {
const catActions = Actions()();

assert(
typeof catActions === 'object',
'using undefined spec does not work'
);
});
});

describe('waitFor', function() {
Expand Down Expand Up @@ -267,6 +276,107 @@ describe('Actions', function() {
});

describe('disposal', function() {
let catActions;

beforeEach(function() {
const CatActions = Actions({
doThis(val) {
return val;
},
mapObs(obs) {
return obs;
}
});
catActions = CatActions();
});

it('should have a dispose method', () => {
expect(catActions.dispose).to.be.a('function');
});

it('should dispose action observables', () => {
catActions.mapObs.isDisposed.should.be.false;
catActions.doThis.isDisposed.should.be.false;

catActions.mapObs.subscribe(() => {});
catActions.doThis.subscribe(() => {});

catActions.mapObs.hasObservers().should.be.equal(true);
catActions.doThis.hasObservers().should.be.equal(true);

// should dispose all observers
catActions.dispose();

catActions.doThis.isDisposed.should.be.true;
catActions.mapObs.isDisposed.should.be.true;

// these should be removed in next major version
// as hasObservers should throw when action is disposed
catActions.doThis.hasObservers().should.be.equal(false);
catActions.mapObs.hasObservers().should.be.equal(false);

expect(() => {
catActions.doThis({});
}).to.throw(/object has been disposed/i);

expect(() => {
catActions.mapObs(Observable.from([1, 2, 3]));
}).to.throw(/object has been disposed/i);
});

it('individual action should dispose', () => {
assert(
typeof catActions.doThis.dispose === 'function',
'individual action does not have a dispose method'
);

catActions.doThis.subscribe(() => {});
assert(
catActions.doThis.hasObservers(),
'action does not any have observers after a subscribe'
);

catActions.doThis.dispose();
assert(
catActions.doThis.isDisposed,
'action is not disposed'
);

assert(
!catActions.mapObs.isDisposed,
'unrelated action is disposed!'
);

catActions.dispose();
assert(
catActions.mapObs.isDisposed,
'actions is not disposed'
);
});
});

describe('error', function() {
it('should stop action', () => {
const err = new Error('Catastrophy');
const CatActions = Actions({
doThis(val) {
return val;
}
});
const catActions = CatActions();
catActions.doThis.subscribe(
() => { throw new Error('should never be called'); },
(_err) => expect(_err).to.equal(err)
);
catActions.doThis.hasObservers().should.be.true;
catActions.doThis(Observable.throw(err));

catActions.doThis(Observable.just('foo'));
catActions.doThis.hasObservers().should.be.false;
});
});

describe('subscription', function() {

let spy;
let catActions;
Expand Down

0 comments on commit 8495558

Please sign in to comment.