Skip to content

Commit

Permalink
Improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
godu committed Mar 27, 2018
1 parent ec593db commit 8805fca
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 8 deletions.
24 changes: 24 additions & 0 deletions src/bin/helper/test/promisify.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import test from 'ava';
import promisify from '../promisify';

test('should return a fullfilled promise', t => {
const callbackFunction = function(cb) {
cb();
};

return t.notThrows(promisify(callbackFunction)());
});

test('should return a rejected promise', async t => {
const callbackFunction = function(cb) {
cb(new Error());
};

await t.throws(promisify(callbackFunction)());

const throwFunction = function(cb) {
throw new Error();
};

return t.throws(promisify(throwFunction)());
});
7 changes: 5 additions & 2 deletions src/etcd/fetch.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import {Observable} from 'rxjs';
import {pipe, get, size} from 'lodash/fp';
import createDebug from 'debug';
import {createFetchCommand} from './command';

const debug = createDebug('squirrel:etcd');
const debug = createDebug('squirrel:etcd:fetch');
const error = createDebug('squirrel:etcd:fetch:error');

const createFetch$ = client => {
debug(`fetch`);
return Observable.defer(() => Observable.fromPromise(client.getAll().exec()))
.do(records => debug(`Fetch ${pipe(get('kvs'), size)(records)} records from ETCD`))
.map(createFetchCommand)
.catch(err => error('Fail to fetch records from ETCD', err))
.retry(Infinity);
};

Expand Down
4 changes: 2 additions & 2 deletions src/etcd/test/watch.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ test('should create watcher observable', async t => {
watch_id: '1',
events: [
{
type: 'Put',
type: 'PUT',
kv: {
key: Buffer.from('foo'),
mod_revision: '1',
Expand All @@ -37,7 +37,7 @@ test('should create watcher observable', async t => {
watch_id: '1',
events: [
{
type: 'Put',
type: 'DELETE',
kv: {
key: Buffer.from('foo'),
create_revision: '0',
Expand Down
25 changes: 21 additions & 4 deletions src/etcd/watch.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import {Observable} from 'rxjs';
import createDebug from 'debug';
import createFetch$ from './fetch';
import {createWatchCommand} from './command';

const debug = createDebug('squirrel:etcd:watch');

const createWatcher$ = client => {
return Observable.create(observer => {
const watcher$ = client
Expand All @@ -18,11 +21,25 @@ const createWatcher$ = client => {
})
.mergeMap(p => Observable.fromPromise(p))
.mergeMap(watcher => {
const connected$ = Observable.fromEvent(watcher, 'connected');
const resync$ = connected$.concatMap(() => createFetch$(client));
const connected$ = Observable.fromEvent(watcher, 'connected').do(() =>
debug(`Watcher is reconnected`)
);
const disconnected$ = Observable.fromEvent(watcher, 'disconnected')
.do(() => debug(`Watcher is disconnected`))
.ignoreElements();
const connecting$ = Observable.fromEvent(watcher, 'connecting')
.do(() => debug(`Watcher is connecting`))
.ignoreElements();
const resync$ = Observable.merge(connected$, disconnected$, connecting$).concatMap(() =>
createFetch$(client)
);

const put$ = Observable.fromEvent(watcher, 'put');
const delete$ = Observable.fromEvent(watcher, 'delete');
const put$ = Observable.fromEvent(watcher, 'put').do(record =>
debug(`%o was updated to v%i`, record.key.toString('utf8'), record.mod_revision)
);
const delete$ = Observable.fromEvent(watcher, 'delete').do(record =>
debug(`%o was removed`, record.key.toString('utf8'))
);
const mutation$ = Observable.merge(put$, delete$).map(createWatchCommand);

return Observable.merge(resync$, mutation$);
Expand Down

0 comments on commit 8805fca

Please sign in to comment.