Skip to content

Commit

Permalink
Improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
godu committed Mar 27, 2018
1 parent ec593db commit f516fc3
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/bin/helper/dump.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {escape as qsEscape} from 'querystring';
import {join} from 'path';
import {writeFile} from 'fs';
import {promisify} from 'util';
import {Observable} from 'rxjs';
import mkdirp from 'mkdirp';
import {parseRangeResponse} from '../../etcd/command';
import promisify from './promisify';

const mkdirpP = promisify(mkdirp);
const writeFileP = promisify(writeFile);
Expand Down
13 changes: 0 additions & 13 deletions src/bin/helper/promisify.js

This file was deleted.

2 changes: 1 addition & 1 deletion src/bin/helper/restore.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {unescape as qsUnescape} from 'querystring';
import {join} from 'path';
import {promisify} from 'util';
import {readFile, readdir} from 'fs';
import mkdirp from 'mkdirp';
import {Observable} from 'rxjs';
import promisify from './promisify';

const mkdirpP = promisify(mkdirp);
const readFileP = promisify(readFile);
Expand Down
2 changes: 1 addition & 1 deletion src/bin/helper/test/dump.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {tmpdir} from 'os';
import {join} from 'path';
import {promisify} from 'util';
import {readFile, readdir} from 'fs';
import test from 'ava';
import {Etcd3} from 'etcd3';
import dump from '../dump';
import promisify from '../promisify';

const readFileP = promisify(readFile);
const readdirP = promisify(readdir);
Expand Down
2 changes: 1 addition & 1 deletion src/bin/helper/test/save.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {tmpdir} from 'os';
import {join} from 'path';
import {readFile} from 'fs';
import {promisify} from 'util';
import test from 'ava';
import {Etcd3} from 'etcd3';
import save from '../save';
import promisify from '../promisify';

const readFileP = promisify(readFile);

Expand Down
7 changes: 5 additions & 2 deletions src/etcd/fetch.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import {Observable} from 'rxjs';
import {pipe, get, size} from 'lodash/fp';
import createDebug from 'debug';
import {createFetchCommand} from './command';

const debug = createDebug('squirrel:etcd');
const debug = createDebug('squirrel:etcd:fetch');
const error = createDebug('squirrel:etcd:fetch:error');

const createFetch$ = client => {
debug(`fetch`);
return Observable.defer(() => Observable.fromPromise(client.getAll().exec()))
.do(records => debug(`Fetch ${pipe(get('kvs'), size)(records)} records from ETCD`))
.map(createFetchCommand)
.catch(err => error('Fail to fetch records from ETCD', err))
.retry(Infinity);
};

Expand Down
4 changes: 2 additions & 2 deletions src/etcd/test/watch.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ test('should create watcher observable', async t => {
watch_id: '1',
events: [
{
type: 'Put',
type: 'PUT',
kv: {
key: Buffer.from('foo'),
mod_revision: '1',
Expand All @@ -37,7 +37,7 @@ test('should create watcher observable', async t => {
watch_id: '1',
events: [
{
type: 'Put',
type: 'DELETE',
kv: {
key: Buffer.from('foo'),
create_revision: '0',
Expand Down
25 changes: 21 additions & 4 deletions src/etcd/watch.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import {Observable} from 'rxjs';
import createDebug from 'debug';
import createFetch$ from './fetch';
import {createWatchCommand} from './command';

const debug = createDebug('squirrel:etcd:watch');

const createWatcher$ = client => {
return Observable.create(observer => {
const watcher$ = client
Expand All @@ -18,11 +21,25 @@ const createWatcher$ = client => {
})
.mergeMap(p => Observable.fromPromise(p))
.mergeMap(watcher => {
const connected$ = Observable.fromEvent(watcher, 'connected');
const resync$ = connected$.concatMap(() => createFetch$(client));
const connected$ = Observable.fromEvent(watcher, 'connected').do(() =>
debug(`Watcher is reconnected`)
);
const disconnected$ = Observable.fromEvent(watcher, 'disconnected')
.do(() => debug(`Watcher is disconnected`))
.ignoreElements();
const connecting$ = Observable.fromEvent(watcher, 'connecting')
.do(() => debug(`Watcher is connecting`))
.ignoreElements();
const resync$ = Observable.merge(connected$, disconnected$, connecting$).concatMap(() =>
createFetch$(client)
);

const put$ = Observable.fromEvent(watcher, 'put');
const delete$ = Observable.fromEvent(watcher, 'delete');
const put$ = Observable.fromEvent(watcher, 'put').do(record =>
debug(`%o was updated to v%i`, record.key.toString('utf8'), record.mod_revision)
);
const delete$ = Observable.fromEvent(watcher, 'delete').do(record =>
debug(`%o was removed`, record.key.toString('utf8'))
);
const mutation$ = Observable.merge(put$, delete$).map(createWatchCommand);

return Observable.merge(resync$, mutation$);
Expand Down

0 comments on commit f516fc3

Please sign in to comment.