From a2f508d16b06a0f365c432924874faa1dfd5c490 Mon Sep 17 00:00:00 2001 From: Marius Darila Date: Fri, 8 Jul 2016 18:46:57 +0300 Subject: [PATCH] feat(events): add common events for ipfs daemon process --- src/IpfsBin.js | 10 +- src/IpfsBin.ts | 21 +++- src/IpfsConnector.js | 147 ++++++++++++++++------------ src/IpfsConnector.ts | 224 +++++++++++++++++++++++++++---------------- src/constants.js | 24 +++++ src/constants.ts | 25 +++++ tests/index.js | 17 +++- 7 files changed, 316 insertions(+), 152 deletions(-) create mode 100644 src/constants.js create mode 100644 src/constants.ts diff --git a/src/IpfsBin.js b/src/IpfsBin.js index 4b6552d..2b49a91 100644 --- a/src/IpfsBin.js +++ b/src/IpfsBin.js @@ -1,5 +1,6 @@ "use strict"; const Promise = require('bluebird'); +const fs_1 = require('fs'); const Wrapper = require('bin-wrapper'); const path = require('path'); const version = '0.4.1'; @@ -17,15 +18,22 @@ class IpfsBin { .dest(target) .use(process.platform === 'win32' ? 'ipfs.exe' : 'ipfs'); } + getPath() { + return this.wrapper.path(); + } check() { return new Promise((resolve, reject) => { this.wrapper.run(['version'], (err) => { if (err) { return reject(err); } - return resolve('ipfs-bin: executable is ok'); + return resolve(this.getPath()); }); }); } + deleteBin() { + const unlinkAsync = Promise.promisify(fs_1.unlink); + return unlinkAsync(this.getPath()); + } } exports.IpfsBin = IpfsBin; diff --git a/src/IpfsBin.ts b/src/IpfsBin.ts index 0cb9bc3..67d35bc 100644 --- a/src/IpfsBin.ts +++ b/src/IpfsBin.ts @@ -1,7 +1,7 @@ /// import * as Promise from 'bluebird'; - +import { unlink } from 'fs'; import Wrapper = require('bin-wrapper'); import path = require('path'); @@ -27,6 +27,14 @@ export class IpfsBin { .use(process.platform === 'win32' ? 'ipfs.exe' : 'ipfs'); } + /** + * Get exec path for IPFS + * @returns {string} + */ + getPath() { + return this.wrapper.path(); + } + /** * Start download and check the ipfs executable * @returns {Bluebird} @@ -38,8 +46,17 @@ export class IpfsBin { return reject(err); } - return resolve('ipfs-bin: executable is ok'); + return resolve(this.getPath()); }); }); } + + /** + * + * @returns {Bluebird} + */ + deleteBin() { + const unlinkAsync = Promise.promisify(unlink); + return unlinkAsync(this.getPath()); + } } diff --git a/src/IpfsConnector.js b/src/IpfsConnector.js index 38ab46d..302360e 100644 --- a/src/IpfsConnector.js +++ b/src/IpfsConnector.js @@ -1,30 +1,53 @@ "use strict"; -const os_1 = require('os'); -const Promise = require('bluebird'); const IpfsBin_1 = require('./IpfsBin'); const IpfsApiHelper_1 = require('./IpfsApiHelper'); const ipfsApi = require('ipfs-api'); +const events_1 = require('events'); +const constants_1 = require('./constants'); const childProcess = require('child_process'); -const path = require('path'); const symbolEnforcer = Symbol(); const symbol = Symbol(); -class IpfsConnector { +class IpfsConnector extends events_1.EventEmitter { constructor(enforcer) { + super(); this.downloadManager = new IpfsBin_1.IpfsBin(); + this.options = constants_1.options; + this._callbacks = new Map(); this.logger = console; - this.options = { - retry: true, - apiAddress: '/ip4/127.0.0.1/tcp/5001', - args: ['daemon'], - executable: '', - extra: { - env: Object.assign({}, process.env, { IPFS_PATH: path.join(os_1.homedir(), '.ipfs') }), - detached: true - } - }; if (enforcer !== symbolEnforcer) { throw new Error('Use .getInstance() instead of constructing a new object'); } + this._callbacks.set('process.stderr.on', (data) => { + if (data.toString().includes('daemon is running')) { + return this.emit(constants_1.events.SERVICE_STARTED); + } + return this.emit(constants_1.events.SERVICE_FAILED, data); + }); + this._callbacks.set('process.stdout.on', (data) => { + if (data.includes('Daemon is ready')) { + return this.emit(constants_1.events.SERVICE_STARTED); + } + }); + this._callbacks.set('ipfs.init', (err, stdout, stderr) => { + if (err) { + if (stderr.toString().includes('file already exists')) { + return this.emit(constants_1.events.IPFS_INIT); + } + this.logger.error(stderr); + return this.emit(constants_1.events.IPFS_INIT, stderr.toString()); + } + return this.emit(constants_1.events.IPFS_INIT); + }); + this._callbacks.set('events.IPFS_INIT', (err) => { + if (!err) { + this.start(); + } + }); + this._callbacks.set('events.SERVICE_FAILED', (message) => { + if (message.includes('ipfs init')) { + setTimeout(() => this._init(), 500); + } + }); } static getInstance() { if (!this[symbol]) { @@ -52,68 +75,70 @@ class IpfsConnector { this.options.extra.env.IPFS_PATH = target; } checkExecutable() { + const timeOut = setTimeout(() => { + this.emit(constants_1.events.DOWNLOAD_STARTED); + }, 600); return this.downloadManager.check().then(data => { - this.logger.info(data); + this.logger.info(`executing from ${data}`); return true; }).catch(err => { this.logger.error(err); + this.emit(constants_1.events.BINARY_CORRUPTED, err); return false; - }); + }).finally(() => clearTimeout(timeOut)); } start() { - return this._start().then(data => { - return data; - }).catch(err => { - if (this.options.retry) { - return this._init().then(() => this.start()); + return this.checkExecutable().then((binOk) => { + if (!binOk) { + return this.emit(constants_1.events.SERVICE_FAILED); } - return err; + this.process = childProcess.spawn(this.downloadManager.wrapper.path(), this.options.args, this.options.extra); + this.once(constants_1.events.SERVICE_STARTED, () => { + this._flushStartingEvents(); + }); + this._pipeStd(); + this._attachStartingEvents(); + }); + } + _attachStartingEvents() { + this.process.stderr.on('data', this._callbacks.get('process.stderr.on')); + this.process.stdout.on('data', this._callbacks.get('process.stdout.on')); + this.on(constants_1.events.IPFS_INIT, this._callbacks.get('events.IPFS_INIT')); + this.on(constants_1.events.SERVICE_FAILED, this._callbacks.get('events.SERVICE_FAILED')); + } + _flushStartingEvents() { + this.process.stderr.removeListener('data', this._callbacks.get('process.stderr.on')); + this.process.stdout.removeListener('data', this._callbacks.get('process.stdout.on')); + this.removeListener(constants_1.events.IPFS_INIT, this._callbacks.get('events.IPFS_INIT')); + this.removeListener(constants_1.events.SERVICE_FAILED, this._callbacks.get('events.SERVICE_FAILED')); + } + _pipeStd() { + const logError = (data) => this.logger.error(data.toString()); + const logInfo = (data) => this.logger.info(data.toString()); + this.process.stderr.on('data', logError); + this.process.stdout.on('data', logInfo); + this.once(constants_1.events.SERVICE_STOPPED, () => { + this.process.stderr.removeListener('data', logError); + this.process.stdout.removeListener('data', logInfo); }); } stop(signal = 'SIGINT') { - this.process.kill(signal); - this.process = null; + this.emit(constants_1.events.SERVICE_STOPPING); this._api = null; this.options.retry = true; - } - _init() { - return new Promise((resolve, reject) => { - let init = childProcess.exec(this.downloadManager.wrapper.path() + ' init', { env: this.options.extra.env }, (err, stdout, stderr) => { - if (err) { - if (stderr.toString().includes('file already exists')) { - return resolve('already init'); - } - this.logger.error(stderr); - return reject(stderr); - } - return resolve('init finished'); - }); - this.options.retry = false; + if (this.process) { + this.process.once('exit', () => this.emit(constants_1.events.SERVICE_STOPPED)); + this.process.kill(signal); this.process = null; - }); + return this; + } + this.emit(constants_1.events.SERVICE_STOPPED); + return this; } - _start() { - return new Promise((resolve, reject) => { - this.checkExecutable().then(() => { - this.process = childProcess.spawn(this.downloadManager.wrapper.path(), this.options.args, this.options.extra); - this.process.stderr.on('data', (data) => { - if (data.toString().includes('daemon is running')) { - return resolve('already running'); - } - this.logger.error(`ipfs:_start:stderr: ${data}`); - return reject(new Error('could not start ipfs')); - }); - this.process.stdout.on('data', (data) => { - this.logger.info(`ipfs:_start:stdout: ${data}`); - if (data.includes('Daemon is ready')) { - resolve('all systems up'); - } - }); - }).catch(err => { - this.logger.error(`ipfs:_start:err: ${err}`); - reject(err); - }); - }); + _init() { + let init = childProcess.exec(this.downloadManager.wrapper.path() + ' init', { env: this.options.extra.env }, this._callbacks.get('ipfs.init')); + this.options.retry = false; + this.process = null; } } exports.IpfsConnector = IpfsConnector; diff --git a/src/IpfsConnector.ts b/src/IpfsConnector.ts index a11896a..642604d 100644 --- a/src/IpfsConnector.ts +++ b/src/IpfsConnector.ts @@ -1,10 +1,11 @@ /// -import {homedir} from 'os'; import * as Promise from 'bluebird'; -import {IpfsBin} from './IpfsBin'; -import {IpfsApiHelper} from './IpfsApiHelper'; +import { IpfsBin } from './IpfsBin'; +import { IpfsApiHelper } from './IpfsApiHelper'; import * as ipfsApi from 'ipfs-api'; +import { EventEmitter } from 'events'; +import { events, options } from './constants'; import childProcess = require('child_process'); import path = require('path'); @@ -12,29 +13,67 @@ import path = require('path'); const symbolEnforcer = Symbol(); const symbol = Symbol(); -export class IpfsConnector { +export class IpfsConnector extends EventEmitter { private process: childProcess.ChildProcess; private downloadManager = new IpfsBin(); + public options = options; + private _callbacks = new Map(); private logger: any = console; private _api: IpfsApiHelper; - public options = { - retry: true, - apiAddress: '/ip4/127.0.0.1/tcp/5001', - args: ['daemon'], - executable: '', - extra: { - env: Object.assign({}, process.env, {IPFS_PATH: path.join(homedir(), '.ipfs')}), - detached: true - } - }; /** * @param enforcer */ constructor(enforcer: Symbol) { + super(); if (enforcer !== symbolEnforcer) { throw new Error('Use .getInstance() instead of constructing a new object'); } + this._callbacks.set('process.stderr.on', (data: string) => { + if (data.toString().includes('daemon is running')) { + /** + * @event IpfsConnector#SERVICE_STARTED + */ + return this.emit(events.SERVICE_STARTED); + } + /** + * @event IpfsConnector#SERVICE_FAILED + */ + return this.emit(events.SERVICE_FAILED, data); + }); + this._callbacks.set('process.stdout.on', (data: string) => { + if (data.includes('Daemon is ready')) { + /** + * @event IpfsConnector#SERVICE_STARTED + */ + return this.emit(events.SERVICE_STARTED); + } + }); + this._callbacks.set('ipfs.init', (err: Error, stdout: string, stderr: string) => { + if (err) { + if (stderr.toString().includes('file already exists')) { + /** + * @event IpfsConnector#IPFS_INIT + */ + return this.emit(events.IPFS_INIT); + } + this.logger.error(stderr); + // init exited with errors + return this.emit(events.IPFS_INIT, stderr.toString()); + } + // everything works fine + return this.emit(events.IPFS_INIT); + }); + this._callbacks.set('events.IPFS_INIT', (err: string) => { + if (!err) { + this.start(); + } + }); + this._callbacks.set('events.SERVICE_FAILED', (message: string) => { + if (message.includes('ipfs init')) { + setTimeout(() => this._init(), 500); + } + }); } /** @@ -99,103 +138,118 @@ export class IpfsConnector { * @returns {Bluebird} */ public checkExecutable(): Promise<{}> { + const timeOut = setTimeout(() => { + /** + * @event IpfsConnector#DOWNLOAD_STARTED + */ + this.emit(events.DOWNLOAD_STARTED); + }, 600); return this.downloadManager.check().then(data => { - this.logger.info(data); + this.logger.info(`executing from ${data}`); return true; }).catch(err => { this.logger.error(err); + this.emit(events.BINARY_CORRUPTED, err); return false; - }); + }).finally(() => clearTimeout(timeOut)); } /** * Start ipfs daemon process - * @returns {Bluebird} + * @returns {Bluebird} */ - public start(): Promise<{}> { - return this._start().then(data => { - return data; - }).catch(err => { - if (this.options.retry) { - return this._init().then( - () => this.start() + public start() { + + return this.checkExecutable().then( + (binOk) => { + if (!binOk) { + /** + * @event IpfsConnector#SERVICE_FAILED + */ + return this.emit(events.SERVICE_FAILED); + } + this.process = childProcess.spawn( + this.downloadManager.wrapper.path(), + this.options.args, + this.options.extra ); + this.once(events.SERVICE_STARTED, () => { + this._flushStartingEvents(); + }); + this._pipeStd(); + this._attachStartingEvents(); } - return err; - }); + ); } /** - * Stop ipfs daemon - * @param signal + * Filter daemon startup log + * @private */ - public stop(signal = 'SIGINT'): void { - this.process.kill(signal); - this.process = null; - this._api = null; - this.options.retry = true; + private _attachStartingEvents() { + this.process.stderr.on('data', this._callbacks.get('process.stderr.on')); + this.process.stdout.on('data', this._callbacks.get('process.stdout.on')); + this.on(events.IPFS_INIT, this._callbacks.get('events.IPFS_INIT')); + this.on(events.SERVICE_FAILED, this._callbacks.get('events.SERVICE_FAILED')); } /** - * Runs `ipfs init` - * @returns {Bluebird} + * Remove startup filters * @private */ - private _init(): Promise<{}> { - return new Promise((resolve, reject) => { - let init = childProcess.exec( - this.downloadManager.wrapper.path() + ' init', - { env: this.options.extra.env }, - (err: Error, stdout: string, stderr: string) => { - if (err) { - if (stderr.toString().includes('file already exists')) { - return resolve('already init'); - } - this.logger.error(stderr); - return reject(stderr); - } - return resolve('init finished'); - } - ); - this.options.retry = false; - this.process = null; - }); + private _flushStartingEvents() { + this.process.stderr.removeListener('data', this._callbacks.get('process.stderr.on')); + this.process.stdout.removeListener('data', this._callbacks.get('process.stdout.on')); + this.removeListener(events.IPFS_INIT, this._callbacks.get('events.IPFS_INIT')); + this.removeListener(events.SERVICE_FAILED, this._callbacks.get('events.SERVICE_FAILED')); } /** - * Spawn child process for ipfs daemon - * @returns {Bluebird} + * Log output from ipfs daemon * @private */ - private _start(): Promise<{}> { - return new Promise((resolve, reject) => { - this.checkExecutable().then( - () => { - this.process = childProcess.spawn( - this.downloadManager.wrapper.path(), - this.options.args, - this.options.extra - ); - // @TODO: emit events instead of resolve promise - this.process.stderr.on('data', (data: string) => { - if (data.toString().includes('daemon is running')) { - return resolve('already running'); - } - this.logger.error(`ipfs:_start:stderr: ${data}`); - return reject(new Error('could not start ipfs')); - }); - - this.process.stdout.on('data', (data: string) => { - this.logger.info(`ipfs:_start:stdout: ${data}`); - if (data.includes('Daemon is ready')) { - resolve('all systems up'); - } - }); - } - ).catch(err => { - this.logger.error(`ipfs:_start:err: ${err}`); - reject(err); - }); + private _pipeStd() { + const logError = (data: Buffer) => this.logger.error(data.toString()); + const logInfo = (data: Buffer) => this.logger.info(data.toString()); + + this.process.stderr.on('data', logError); + this.process.stdout.on('data', logInfo); + this.once(events.SERVICE_STOPPED, () => { + this.process.stderr.removeListener('data', logError); + this.process.stdout.removeListener('data', logInfo); }); } + + /** + * Stop ipfs daemon + * @param signal + * @returns {IpfsConnector} + */ + public stop(signal = 'SIGINT') { + this.emit(events.SERVICE_STOPPING); + this._api = null; + this.options.retry = true; + if (this.process) { + this.process.once('exit', () => this.emit(events.SERVICE_STOPPED)); + this.process.kill(signal); + this.process = null; + return this; + } + this.emit(events.SERVICE_STOPPED); + return this; + } + + /** + * Runs `ipfs init` + * @private + */ + private _init() { + let init = childProcess.exec( + this.downloadManager.wrapper.path() + ' init', + { env: this.options.extra.env }, + this._callbacks.get('ipfs.init') + ); + this.options.retry = false; + this.process = null; + } } diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..ef283e0 --- /dev/null +++ b/src/constants.js @@ -0,0 +1,24 @@ +"use strict"; +const os_1 = require('os'); +const path_1 = require('path'); +exports.events = { + DOWNLOAD_STARTED: 'DOWNLOAD_STARTED', + BINARY_CORRUPTED: 'BINARY_CORRUPTED', + SERVICE_STARTING: 'SERVICE_STARTING', + SERVICE_STARTED: 'SERVICE_STARTED', + SERVICE_STOPPING: 'SERVICE_STOPPING', + SERVICE_STOPPED: 'SERVICE_STOPPED', + SERVICE_FAILED: 'SERVICE_FAILED', + IPFS_INIT: 'IPFS_INIT', + ERROR: 'ERROR' +}; +exports.options = { + retry: true, + apiAddress: '/ip4/127.0.0.1/tcp/5001', + args: ['daemon'], + executable: '', + extra: { + env: Object.assign({}, process.env, { IPFS_PATH: path_1.join(os_1.homedir(), '.ipfs') }), + detached: true + } +}; diff --git a/src/constants.ts b/src/constants.ts new file mode 100644 index 0000000..b6b89c3 --- /dev/null +++ b/src/constants.ts @@ -0,0 +1,25 @@ +import { homedir } from 'os'; +import { join as pathJoin } from 'path'; + +export const events = { + DOWNLOAD_STARTED: 'DOWNLOAD_STARTED', + BINARY_CORRUPTED: 'BINARY_CORRUPTED', + SERVICE_STARTING: 'SERVICE_STARTING', + SERVICE_STARTED: 'SERVICE_STARTED', + SERVICE_STOPPING: 'SERVICE_STOPPING', + SERVICE_STOPPED: 'SERVICE_STOPPED', + SERVICE_FAILED: 'SERVICE_FAILED', + IPFS_INIT: 'IPFS_INIT', + ERROR: 'ERROR' +}; + +export const options = { + retry: true, + apiAddress: '/ip4/127.0.0.1/tcp/5001', + args: ['daemon'], + executable: '', + extra: { + env: Object.assign({}, process.env, { IPFS_PATH: pathJoin(homedir(), '.ipfs') }), + detached: true + } +}; \ No newline at end of file diff --git a/tests/index.js b/tests/index.js index 929cb4b..f404974 100644 --- a/tests/index.js +++ b/tests/index.js @@ -5,6 +5,7 @@ const chai = require("chai"); const chaiAsPromised = require("chai-as-promised"); const rimraf = require("rimraf"); const winston = require('winston'); +const constants = require('../src/constants'); chai.use(chaiAsPromised); const expect = chai.expect; @@ -32,7 +33,7 @@ describe('IpfsConnector', function () { before(function (done) { instance.setBinPath(binTarget); - instance.checkExecutable().then(function () { + rimraf(binTarget, function () { done(); }); }); @@ -48,9 +49,19 @@ describe('IpfsConnector', function () { expect(instance.logger).to.equal(logger); }); - it('starts ipfs daemon', function () { + it('emits downloading binaries', function(done){ + instance.once(constants.events.DOWNLOAD_STARTED, function(){ + done(); + }); + instance.checkExecutable(); + }); + + it('starts ipfs daemon', function (done) { instance.setLogger(console); - return expect(instance.start()).to.be.fulfilled; + instance.on(constants.events.SERVICE_STARTED, function(){ + done(); + }); + instance.start(); }); describe('.add()', function () {