From 9e4324860741d7eb5676b986c664ec4b317c569d Mon Sep 17 00:00:00 2001 From: godu Date: Wed, 4 May 2016 15:41:53 +0200 Subject: [PATCH] Restructures files --- MARBLE.MD | 143 ++++++++++++++++++++++++ README.md | 6 + src/{indexer.js => build-index.js} | 0 src/{combiner.js => combine.js} | 4 +- src/etcd.js | 45 +------- src/fallback.js | 2 +- src/fetch.js | 18 +++ src/index.js | 10 +- src/{parser.js => parse.js} | 0 src/{patcher.js => patch.js} | 2 +- src/resync.js | 15 +++ src/test/{indexer.js => build-index.js} | 10 +- src/test/{combiner.js => combine.js} | 4 +- src/test/etcd.js | 4 +- src/test/fallback.js | 2 +- src/test/fetch.js | 2 +- src/test/{parser.js => parse.js} | 2 +- src/test/{patcher.js => patch.js} | 2 +- src/test/resync.js | 66 +++++++++++ src/test/{watcher.js => watch.js} | 2 +- src/watch.js | 13 +++ 21 files changed, 289 insertions(+), 63 deletions(-) create mode 100644 MARBLE.MD rename src/{indexer.js => build-index.js} (100%) rename src/{combiner.js => combine.js} (85%) create mode 100644 src/fetch.js rename src/{parser.js => parse.js} (100%) rename src/{patcher.js => patch.js} (96%) create mode 100644 src/resync.js rename src/test/{indexer.js => build-index.js} (86%) rename src/test/{combiner.js => combine.js} (97%) rename src/test/{parser.js => parse.js} (97%) rename src/test/{patcher.js => patch.js} (99%) create mode 100644 src/test/resync.js rename src/test/{watcher.js => watch.js} (93%) create mode 100644 src/watch.js diff --git a/MARBLE.MD b/MARBLE.MD new file mode 100644 index 0000000..c5d1348 --- /dev/null +++ b/MARBLE.MD @@ -0,0 +1,143 @@ +# Marble + +## Fallback + +On init, if a `fallback` file is specified, squirrel load content as first state. Allows restart without an available ETCD cluster. + +- If `fallback` is specified + +``` +-A| +``` + +A is the content of fallback file wrapped in `GET` action. + +```JSON +{ + "action": "GET", + "node": { + "key": "/", + "dir": true, + "nodes": [] + } +} +``` + +- If `fallback` isn't specified + +``` +| +``` + +- If `fallback` file doesn't exists + +``` +| +``` + +## Fetcher + +On init, squirrel fetch ETCD until success. + +``` +-A| +``` + +A is the content of ETCD wrapped in `GET` action. + +```JSON +{ + "action": "GET", + "node": { + "key": "/", + "dir": true, + "nodes": [] + } +} +``` + +On error, retry until is success. + +## Watcher + +Watch modification on ETCD cluster. + +``` +-A-B-C- +``` + +A B C are actions. + +`SET` action: +```JSON +{ + "action": "set", + "node": { + "key": "/foo", + "value": "foo", + "modifiedIndex": 1438, + "createdIndex": 1438 + }, + "prevNode": { + "key": "/foo", + "value": "bar", + "modifiedIndex": 1437, + "createdIndex": 1437 + } +} +``` + +`DELETE` action: +```JSON +{ + "action": "delete", + "node": { + "key": "/foo", + "modifiedIndex": 1435, + "createdIndex": 1434 + }, + "prevNode": { + "key": "/foo", + "value": "bar", + "modifiedIndex": 1434, + "createdIndex": 1434 + } +} +``` + +`RESYNC` action (server cleared and outdated the index) +```JSON +{ + "action": "resync" +} +``` + +## ETCD + +Combine `fallback`, `fetcher` and `watcher` observable. + +``` +-A| // fallback +-B| // fetcher +--C-D-E- // watcher + +-ABC-D-E- +``` + +## Combiner + +Aggregate ETCD's events to rebuild representation of ETCD content. + +``` +-a-b-c- // ETCD's events + +------ + \ \ \ + A \ C + B + +-A---BC // ConcatAll +``` + +A & C are `SET` or `GET` events. +B is a `RESYNC` event which is tranform to `fetch` observable. \ No newline at end of file diff --git a/README.md b/README.md index 528ac1b..3317624 100644 --- a/README.md +++ b/README.md @@ -28,3 +28,9 @@ $ npm test ``` Please note that test use an actual etcd service + + + + +## [Marble](./MARBLE.md) + diff --git a/src/indexer.js b/src/build-index.js similarity index 100% rename from src/indexer.js rename to src/build-index.js diff --git a/src/combiner.js b/src/combine.js similarity index 85% rename from src/combiner.js rename to src/combine.js index d6ae54e..d7cfed7 100644 --- a/src/combiner.js +++ b/src/combine.js @@ -1,6 +1,6 @@ -import {set, del} from './patcher'; +import {set, del} from './patch'; import createDebug from 'debug'; -const debug = createDebug('squirrel:combiner'); +const debug = createDebug('squirrel:combine'); const createCombiner$ = event$ => { return event$.scan((store, action) => { diff --git a/src/etcd.js b/src/etcd.js index 0af9709..30608f3 100644 --- a/src/etcd.js +++ b/src/etcd.js @@ -1,47 +1,12 @@ -import {Observable} from 'rxjs'; -import {parseAction} from './parser'; -import createDebug from 'debug'; -const debug = createDebug('squirrel:etcd'); - -const createFetch$ = (client, cwd) => { - debug(`fetch: ${cwd}`); - const list = Observable.bindNodeCallback( - cb => client.get(cwd, {recursive: true}, (err, data) => cb(err, data)) - ); - - return Observable.of(1) - .flatMap(() => list()) - .retry(Infinity) - .map(parseAction); -}; - -const createWatcher$ = watcher => { - const set$ = Observable.fromEvent(watcher, 'set'); - const delete$ = Observable.fromEvent(watcher, 'delete'); - const reSync$ = Observable.fromEvent(watcher, 'resync'); - - return Observable.merge(set$, delete$, reSync$) - .map(parseAction); -}; +import createWatcher$ from './watch'; +import createResync$ from './resync'; const createEtcd$ = (client, watcher, cwd) => { - const watcher$ = createWatcher$(watcher); - - const events$ = watcher$.startWith({ + const events$ = createWatcher$(watcher).startWith({ action: 'resync' - }).flatMap(action => { - if (action.action === 'resync') { - debug('watcher: resync'); - return createFetch$(client, cwd); - } - return Observable.of(action); }); - return events$; + return createResync$(client, cwd, events$); }; -export { - createFetch$, - createWatcher$, - createEtcd$ -}; +export default createEtcd$; diff --git a/src/fallback.js b/src/fallback.js index b4d14a1..628911b 100644 --- a/src/fallback.js +++ b/src/fallback.js @@ -1,7 +1,7 @@ import {readFile} from 'fs'; import {isString, identity} from 'lodash/fp'; import {Observable} from 'rxjs'; -import {parseAction} from './parser'; +import {parseAction} from './parse'; import createDebug from 'debug'; const debug = createDebug('squirrel:fallback'); diff --git a/src/fetch.js b/src/fetch.js new file mode 100644 index 0000000..40fc978 --- /dev/null +++ b/src/fetch.js @@ -0,0 +1,18 @@ +import {Observable} from 'rxjs'; +import {parseAction} from './parse'; +import createDebug from 'debug'; +const debug = createDebug('squirrel:etcd'); + +const createFetch$ = (client, cwd) => { + debug(`fetch: ${cwd}`); + const list = Observable.bindNodeCallback( + cb => client.get(cwd, {recursive: true}, (err, data) => cb(err, data)) + ); + + return Observable.of(1) + .flatMap(() => list()) + .retry(Infinity) + .map(parseAction); +}; + +export default createFetch$; diff --git a/src/index.js b/src/index.js index d4dd995..a35f564 100644 --- a/src/index.js +++ b/src/index.js @@ -7,13 +7,13 @@ import Etcd from 'node-etcd'; import {Observable} from 'rxjs'; import createDebug from 'debug'; -import {createEtcd$} from './etcd'; +import createEtcd$ from './etcd'; import createFallback$ from './fallback'; -import createCombiner$ from './combiner'; +import createCombiner$ from './combine'; import createStore from './store'; import createAPI from './api'; import createSave from './save'; -import createIndexer from './indexer'; +import createIndexBuilder from './build-index'; const debug = createDebug('squirrel'); @@ -34,7 +34,7 @@ const createSquirrel = options => { const client = new Etcd(options.hosts, pick(['auth', 'ca', 'key', 'cert'], options)); const watcher = client.watcher(options.cwd, null, {recursive: true}); - const indexer = createIndexer(options.indexes); + const indexBuilder = createIndexBuilder(options.indexes); const save = options.save ? createSave(options.fallback) : identity; const events$ = Observable.concat( @@ -45,7 +45,7 @@ const createSquirrel = options => { const node$ = createCombiner$(events$); const {store, observable} = createStore( save(node$), - indexer + indexBuilder ); const api = createAPI(store); diff --git a/src/parser.js b/src/parse.js similarity index 100% rename from src/parser.js rename to src/parse.js diff --git a/src/patcher.js b/src/patch.js similarity index 96% rename from src/patcher.js rename to src/patch.js index fd0060c..55409cb 100644 --- a/src/patcher.js +++ b/src/patch.js @@ -7,7 +7,7 @@ import { startsWith } from 'lodash/fp'; import createDebug from 'debug'; -const debug = createDebug('squirrel:patcher'); +const debug = createDebug('squirrel:patch'); const get = (store, {action, node}) => { if (!startsWith(store.key, node.key)) return null; diff --git a/src/resync.js b/src/resync.js new file mode 100644 index 0000000..521d3be --- /dev/null +++ b/src/resync.js @@ -0,0 +1,15 @@ +import {Observable} from 'rxjs'; +import createFetcher$ from './fetch'; +import createDebug from 'debug'; +const debug = createDebug('squirrel:etcd'); + +const createResync$ = (client, cwd, events$) => + events$.flatMap(action => { + if (action.action === 'resync') { + debug('watcher: resync'); + return createFetcher$(client, cwd); + } + return Observable.of(action); + }); + +export default createResync$; diff --git a/src/test/indexer.js b/src/test/build-index.js similarity index 86% rename from src/test/indexer.js rename to src/test/build-index.js index 400af01..bb6b5b1 100644 --- a/src/test/indexer.js +++ b/src/test/build-index.js @@ -1,8 +1,8 @@ import test from 'ava'; -import createIndexer from '../indexer'; +import createIndexBuild from '../build-index'; test('should create indexer', t => { - createIndexer(['name']); + createIndexBuild(['name']); }); test('should create index', t => { @@ -41,7 +41,7 @@ test('should create index', t => { } }; - t.deepEqual(createIndexer(indexes)(input), output); + t.deepEqual(createIndexBuild(indexes)(input), output); }); test('should index with two same entry', t => { @@ -74,7 +74,7 @@ test('should index with two same entry', t => { } }; - t.deepEqual(createIndexer(indexes)(input), output); + t.deepEqual(createIndexBuild(indexes)(input), output); }); test('should create index with deep key', t => { @@ -121,5 +121,5 @@ test('should create index with deep key', t => { } }; - t.deepEqual(createIndexer(indexes)(input), output); + t.deepEqual(createIndexBuild(indexes)(input), output); }); diff --git a/src/test/combiner.js b/src/test/combine.js similarity index 97% rename from src/test/combiner.js rename to src/test/combine.js index 986b034..a2b7397 100644 --- a/src/test/combiner.js +++ b/src/test/combine.js @@ -1,7 +1,7 @@ -import {Observable} from 'rxjs'; import test from 'ava'; +import {Observable} from 'rxjs'; -import createCombiner$ from '../combiner'; +import createCombiner$ from '../combine'; import emptyRoot from './fixtures/empty-root'; import setEvent from './fixtures/set-event'; diff --git a/src/test/etcd.js b/src/test/etcd.js index e4e3c81..97a0fcf 100644 --- a/src/test/etcd.js +++ b/src/test/etcd.js @@ -1,13 +1,13 @@ import test from 'ava'; import createEtcd from './helpers/etcd'; -import {createEtcd$} from '../etcd'; +import createEtcd$ from '../etcd'; import setEvent from './fixtures/set-event'; import deleteEvent from './fixtures/delete-event'; import resyncEvent from './fixtures/resync-event'; -test('should resync', async t => { +test('should composite events observable', async t => { const client = createEtcd({ get: [[null, { action: 'get', diff --git a/src/test/fallback.js b/src/test/fallback.js index 7cb78ba..1bd2d2c 100644 --- a/src/test/fallback.js +++ b/src/test/fallback.js @@ -1,6 +1,6 @@ import test from 'ava'; import {join} from 'path'; -import {parseNode} from '../parser'; +import {parseNode} from '../parse'; import createFallback$ from '../fallback'; diff --git a/src/test/fetch.js b/src/test/fetch.js index d19a130..5975a47 100644 --- a/src/test/fetch.js +++ b/src/test/fetch.js @@ -2,7 +2,7 @@ import test from 'ava'; import {pipe, fill, map, concat} from 'lodash/fp'; import createEtcd from './helpers/etcd'; -import {createFetch$} from '../etcd'; +import createFetch$ from '../fetch'; import emptyRoot from './fixtures/empty-root'; diff --git a/src/test/parser.js b/src/test/parse.js similarity index 97% rename from src/test/parser.js rename to src/test/parse.js index 729ae36..034c785 100644 --- a/src/test/parser.js +++ b/src/test/parse.js @@ -1,5 +1,5 @@ import test from 'ava'; -import {parseAction, parseNode} from '../parser'; +import {parseAction, parseNode} from '../parse'; test('should parseNode', t => { const tests = [{ diff --git a/src/test/patcher.js b/src/test/patch.js similarity index 99% rename from src/test/patcher.js rename to src/test/patch.js index 3e647ed..53f9279 100644 --- a/src/test/patcher.js +++ b/src/test/patch.js @@ -1,5 +1,5 @@ import test from 'ava'; -import {get, set, del} from '../patcher'; +import {get, set, del} from '../patch'; test('should get node', t => { const input = { diff --git a/src/test/resync.js b/src/test/resync.js new file mode 100644 index 0000000..f4e15f4 --- /dev/null +++ b/src/test/resync.js @@ -0,0 +1,66 @@ +import test from 'ava'; +import {Observable} from 'rxjs'; + +import createEtcd from './helpers/etcd'; +import createResyncer$ from '../resync'; + +import setEvent from './fixtures/set-event'; +import resyncEvent from './fixtures/resync-event'; + +test('should transform resync event', async t => { + const client = createEtcd({ + get: [[null, { + action: 'get', + node: { + key: '/', + dir: true, + nodes: [setEvent.node] + } + }]] + }); + + const events$ = Observable.of(resyncEvent); + + const watcher$ = createResyncer$(client, '/', events$); + + const expected = [{ + action: 'get', + node: { + key: '/', + dir: true, + nodes: [setEvent.node] + } + }]; + + const events = await watcher$.take(1).toArray().toPromise(); + t.deepEqual(events, expected); +}); + +test('should keep order', async t => { + const client = createEtcd({ + get: [[null, { + action: 'get', + node: { + key: '/', + dir: true, + nodes: [setEvent.node] + } + }]] + }); + + const events$ = Observable.of(setEvent, resyncEvent, setEvent); + + const watcher$ = createResyncer$(client, '/', events$); + + const expected = [setEvent, { + action: 'get', + node: { + key: '/', + dir: true, + nodes: [setEvent.node] + } + }, setEvent]; + + const events = await watcher$.take(3).toArray().toPromise(); + t.deepEqual(events, expected); +}); diff --git a/src/test/watcher.js b/src/test/watch.js similarity index 93% rename from src/test/watcher.js rename to src/test/watch.js index e7a5c31..dc3704f 100644 --- a/src/test/watcher.js +++ b/src/test/watch.js @@ -1,7 +1,7 @@ import {EventEmitter} from 'events'; import test from 'ava'; -import {createWatcher$} from '../etcd'; +import createWatcher$ from '../watch'; import setEvent from './fixtures/set-event'; import deleteEvent from './fixtures/delete-event'; diff --git a/src/watch.js b/src/watch.js new file mode 100644 index 0000000..f10df32 --- /dev/null +++ b/src/watch.js @@ -0,0 +1,13 @@ +import {Observable} from 'rxjs'; +import {parseAction} from './parse'; + +const createWatcher$ = watcher => { + const set$ = Observable.fromEvent(watcher, 'set'); + const delete$ = Observable.fromEvent(watcher, 'delete'); + const reSync$ = Observable.fromEvent(watcher, 'resync'); + + return Observable.merge(set$, delete$, reSync$) + .map(parseAction); +}; + +export default createWatcher$;