Skip to content

Commit

Permalink
Merge pull request #51 from CoorpAcademy/issue-etcd-token
Browse files Browse the repository at this point in the history
Abort ETCD request on unsubscribe
  • Loading branch information
godu authored Jul 6, 2016
2 parents 8a10fcc + c6daa3a commit 50a448a
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 250 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
sudo: false
language: node_js
node_js:
- '0.12'
- '4'
- '5'
- '6'
cache:
directories:
- node_modules
after_success: npm run coveralls
deploy:
provider: npm
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,8 @@
"babel-register"
],
"all": true
},
"engines": {
"node" : ">=4"
}
}
12 changes: 5 additions & 7 deletions src/test/etcd.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import createEtcd$ from '../etcd';
import setEvent from './fixtures/set-event';
import deleteEvent from './fixtures/delete-event';
import resyncEvent from './fixtures/resync-event';
import createEtcdMock from '../util/test/helpers/etcd';

test('should composite events observable', async t => {
const getMocks = [[null, {
Expand All @@ -27,12 +28,9 @@ test('should composite events observable', async t => {
const watcher = new EventEmitter();
watcher.stop = () => {};

const watcherMocks = [setEvent, deleteEvent, resyncEvent, deleteEvent, setEvent];

const client = {
get: (cwd, options, cb) => cb(...getMocks.shift()),
watcher: () => watcher
};
const client = createEtcdMock({
get: getMocks
}, () => watcher);

const events$ = createEtcd$(client, '/');

Expand Down Expand Up @@ -61,7 +59,7 @@ test('should composite events observable', async t => {

const eventsP = events$.take(6).toArray().toPromise();

watcherMocks.forEach(
[setEvent, deleteEvent, resyncEvent, deleteEvent, setEvent].forEach(
event => watcher.emit(event.action, event)
);

Expand Down
21 changes: 9 additions & 12 deletions src/test/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import {pipe, fill, map, concat} from 'lodash/fp';
import createFetch$ from '../fetch';

import emptyRoot from './fixtures/empty-root';
import createEtcdMock from '../util/test/helpers/etcd';

test('should fetch nodes', t => {
const getMocks = [[null, emptyRoot]];

const client = {
get: (cwd, options, cb) => cb(...getMocks.shift())
};
const client = createEtcdMock({
get: [[null, emptyRoot, null]]
});

const fetch$ = createFetch$(client, '/');

Expand All @@ -27,15 +26,13 @@ test('should fetch nodes', t => {
});

test('should retry on error', t => {
const getMocks = pipe(
const client = createEtcdMock({
get: pipe(
fill(new Error),
map(err => [err]),
concat([[null, emptyRoot]])
)(Array(10));

const client = {
get: (cwd, options, cb) => cb(...getMocks.shift())
};
concat([[null, emptyRoot, null]])
)(Array(10))
});

const fetch$ = createFetch$(client, '/');

Expand Down
45 changes: 21 additions & 24 deletions src/test/resync.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@ import createResyncer$ from '../resync';

import setEvent from './fixtures/set-event';
import resyncEvent from './fixtures/resync-event';
import createEtcdMock from '../util/test/helpers/etcd';

test('should transform resync event', async t => {
const getMocks = [[null, {
action: 'get',
node: {
key: '/',
dir: true,
nodes: [setEvent.node]
}
}]];

const client = {
get: (cwd, options, cb) => cb(...getMocks.shift())
};
const client = createEtcdMock({
get: [[null, {
action: 'get',
node: {
key: '/',
dir: true,
nodes: [setEvent.node]
}
}, null]]
});

const events$ = Observable.of(resyncEvent);

Expand All @@ -38,18 +37,16 @@ test('should transform resync event', async t => {
});

test('should keep order', async t => {
const getMocks = [[null, {
action: 'get',
node: {
key: '/',
dir: true,
nodes: [setEvent.node]
}
}]];

const client = {
get: (cwd, options, cb) => cb(...getMocks.shift())
};
const client = createEtcdMock({
get: [[null, {
action: 'get',
node: {
key: '/',
dir: true,
nodes: [setEvent.node]
}
}, null]]
});

const events$ = Observable.of(setEvent, resyncEvent, setEvent);

Expand Down
21 changes: 7 additions & 14 deletions src/test/watch.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@ import createWatcher$ from '../watch';
import setEvent from './fixtures/set-event';
import deleteEvent from './fixtures/delete-event';
import resyncEvent from './fixtures/resync-event';
import createEtcdMock from '../util/test/helpers/etcd';

test('should create watcher observable', t => {
t.plan(2);

const watcher = new EventEmitter();
watcher.stop = () => {};

const client = {
watcher: cwd => {
t.deepEqual(cwd, '/');
return watcher;
}
};
const client = createEtcdMock({}, cwd => {
t.deepEqual(cwd, '/');
return watcher;
});

const watcher$ = createWatcher$(client, '/');

Expand All @@ -38,10 +36,7 @@ test('should close watcher on unsubscribe', t => {
watcher.stop = () => {
t.pass();
};

const client = {
watcher: () => watcher
};
const client = createEtcdMock({}, cwd => watcher);

const watcher$ = createWatcher$(client, '/');

Expand All @@ -54,9 +49,7 @@ test('should, emit set/delete/resync events', t => {
const watcher = new EventEmitter();
watcher.stop = () => {};

const client = {
watcher: () => watcher
};
const client = createEtcdMock({}, cwd => watcher);

const watcher$ = createWatcher$(client, '/');

Expand Down
4 changes: 2 additions & 2 deletions src/util/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const syncDirectory$ = (client, pathFS, pathETCD) => {
return get$(client, pathETCD).catch(err => {
if (err.errorCode === 100)
return mkdir$(client, pathETCD);
return err;
throw err;
}).flatMap(action => {
if (action && !get('node.dir', action))
return del$(client, pathETCD)
Expand Down Expand Up @@ -52,7 +52,7 @@ export const syncFile$ = (client, pathFS, pathETCD) => {
return get$(client, pathETCD).catch(err => {
if (err.errorCode === 100)
return Observable.of(null);
return err;
throw err;
}).flatMap(action => {
if (action && get('node.dir', action)) return rmdirRecursive$(client, pathETCD);
return Observable.of(action);
Expand Down
39 changes: 21 additions & 18 deletions src/util/etcd.js
Original file line number Diff line number Diff line change
@@ -1,42 +1,45 @@
import {Observable} from 'rxjs';
import {invokeArgs} from 'lodash/fp';
import makeDebug from 'debug';
const debug = makeDebug('squirrel:util:etcd');

export const compareAndSwap$ = (client, ...argz) =>
debug('compareAndSwap', ...argz) ||
Observable.bindNodeCallback(client.compareAndSwap.bind(client))(...argz).pluck(0);
const wrap = fnName => (client, ...argz) =>
debug(fnName, ...argz, client) ||
Observable.create(observer => {
const token = invokeArgs(fnName, [...argz, (err, value) => {
if (err) return observer.error(err);
observer.next(value);
observer.complete();
}], client);

export const del$ = (client, ...argz) =>
debug('del', ...argz) ||
Observable.bindNodeCallback(client.del.bind(client))(...argz).pluck(0);
return () => {
token.abort();
};
});

export const compareAndSwap$ = wrap('compareAndSwap');

export const del$ = wrap('del');

export const delRecursive$ = (client, key) =>
debug('delRecursive', key) ||
del$(client, key, {recursive: true});

export const get$ = (client, ...argz) =>
debug('get', ...argz) ||
Observable.bindNodeCallback(client.get.bind(client))(...argz).pluck(0);
export const get$ = wrap('get');

export const getRecursive$ = (client, key) =>
debug('getRecursive', key) ||
get$(client, key, {recursive: true});

export const mkdir$ = (client, ...argz) =>
debug('mkdir', ...argz) ||
Observable.bindNodeCallback(client.mkdir.bind(client))(...argz).pluck(0);
export const mkdir$ = wrap('mkdir');

export const rmdir$ = (client, ...argz) =>
debug('rmdir', ...argz) ||
Observable.bindNodeCallback(client.rmdir.bind(client))(...argz).pluck(0);
export const rmdir$ = wrap('rmdir');

export const rmdirRecursive$ = (client, key) =>
debug('rmdirRecursive', key) ||
rmdir$(client, key, {recursive: true});

export const set$ = (client, ...argz) =>
debug('set', ...argz) ||
Observable.bindNodeCallback(client.set.bind(client))(...argz).pluck(0);
export const set$ = wrap('set');

export const isDirectory = node =>
debug('isDirectory', node) ||
Expand Down
Loading

0 comments on commit 50a448a

Please sign in to comment.