diff --git a/src/IpfsBin.js b/src/IpfsBin.js
index 4b6552d..2b49a91 100644
--- a/src/IpfsBin.js
+++ b/src/IpfsBin.js
@@ -1,5 +1,6 @@
"use strict";
const Promise = require('bluebird');
+const fs_1 = require('fs');
const Wrapper = require('bin-wrapper');
const path = require('path');
const version = '0.4.1';
@@ -17,15 +18,22 @@ class IpfsBin {
.dest(target)
.use(process.platform === 'win32' ? 'ipfs.exe' : 'ipfs');
}
+ getPath() {
+ return this.wrapper.path();
+ }
check() {
return new Promise((resolve, reject) => {
this.wrapper.run(['version'], (err) => {
if (err) {
return reject(err);
}
- return resolve('ipfs-bin: executable is ok');
+ return resolve(this.getPath());
});
});
}
+ deleteBin() {
+ const unlinkAsync = Promise.promisify(fs_1.unlink);
+ return unlinkAsync(this.getPath());
+ }
}
exports.IpfsBin = IpfsBin;
diff --git a/src/IpfsBin.ts b/src/IpfsBin.ts
index 0cb9bc3..67d35bc 100644
--- a/src/IpfsBin.ts
+++ b/src/IpfsBin.ts
@@ -1,7 +1,7 @@
///
import * as Promise from 'bluebird';
-
+import { unlink } from 'fs';
import Wrapper = require('bin-wrapper');
import path = require('path');
@@ -27,6 +27,14 @@ export class IpfsBin {
.use(process.platform === 'win32' ? 'ipfs.exe' : 'ipfs');
}
+ /**
+ * Get exec path for IPFS
+ * @returns {string}
+ */
+ getPath() {
+ return this.wrapper.path();
+ }
+
/**
* Start download and check the ipfs executable
* @returns {Bluebird}
@@ -38,8 +46,17 @@ export class IpfsBin {
return reject(err);
}
- return resolve('ipfs-bin: executable is ok');
+ return resolve(this.getPath());
});
});
}
+
+ /**
+ *
+ * @returns {Bluebird}
+ */
+ deleteBin() {
+ const unlinkAsync = Promise.promisify(unlink);
+ return unlinkAsync(this.getPath());
+ }
}
diff --git a/src/IpfsConnector.js b/src/IpfsConnector.js
index 38ab46d..302360e 100644
--- a/src/IpfsConnector.js
+++ b/src/IpfsConnector.js
@@ -1,30 +1,53 @@
"use strict";
-const os_1 = require('os');
-const Promise = require('bluebird');
const IpfsBin_1 = require('./IpfsBin');
const IpfsApiHelper_1 = require('./IpfsApiHelper');
const ipfsApi = require('ipfs-api');
+const events_1 = require('events');
+const constants_1 = require('./constants');
const childProcess = require('child_process');
-const path = require('path');
const symbolEnforcer = Symbol();
const symbol = Symbol();
-class IpfsConnector {
+class IpfsConnector extends events_1.EventEmitter {
constructor(enforcer) {
+ super();
this.downloadManager = new IpfsBin_1.IpfsBin();
+ this.options = constants_1.options;
+ this._callbacks = new Map();
this.logger = console;
- this.options = {
- retry: true,
- apiAddress: '/ip4/127.0.0.1/tcp/5001',
- args: ['daemon'],
- executable: '',
- extra: {
- env: Object.assign({}, process.env, { IPFS_PATH: path.join(os_1.homedir(), '.ipfs') }),
- detached: true
- }
- };
if (enforcer !== symbolEnforcer) {
throw new Error('Use .getInstance() instead of constructing a new object');
}
+ this._callbacks.set('process.stderr.on', (data) => {
+ if (data.toString().includes('daemon is running')) {
+ return this.emit(constants_1.events.SERVICE_STARTED);
+ }
+ return this.emit(constants_1.events.SERVICE_FAILED, data);
+ });
+ this._callbacks.set('process.stdout.on', (data) => {
+ if (data.includes('Daemon is ready')) {
+ return this.emit(constants_1.events.SERVICE_STARTED);
+ }
+ });
+ this._callbacks.set('ipfs.init', (err, stdout, stderr) => {
+ if (err) {
+ if (stderr.toString().includes('file already exists')) {
+ return this.emit(constants_1.events.IPFS_INIT);
+ }
+ this.logger.error(stderr);
+ return this.emit(constants_1.events.IPFS_INIT, stderr.toString());
+ }
+ return this.emit(constants_1.events.IPFS_INIT);
+ });
+ this._callbacks.set('events.IPFS_INIT', (err) => {
+ if (!err) {
+ this.start();
+ }
+ });
+ this._callbacks.set('events.SERVICE_FAILED', (message) => {
+ if (message.includes('ipfs init')) {
+ setTimeout(() => this._init(), 500);
+ }
+ });
}
static getInstance() {
if (!this[symbol]) {
@@ -52,68 +75,70 @@ class IpfsConnector {
this.options.extra.env.IPFS_PATH = target;
}
checkExecutable() {
+ const timeOut = setTimeout(() => {
+ this.emit(constants_1.events.DOWNLOAD_STARTED);
+ }, 600);
return this.downloadManager.check().then(data => {
- this.logger.info(data);
+ this.logger.info(`executing from ${data}`);
return true;
}).catch(err => {
this.logger.error(err);
+ this.emit(constants_1.events.BINARY_CORRUPTED, err);
return false;
- });
+ }).finally(() => clearTimeout(timeOut));
}
start() {
- return this._start().then(data => {
- return data;
- }).catch(err => {
- if (this.options.retry) {
- return this._init().then(() => this.start());
+ return this.checkExecutable().then((binOk) => {
+ if (!binOk) {
+ return this.emit(constants_1.events.SERVICE_FAILED);
}
- return err;
+ this.process = childProcess.spawn(this.downloadManager.wrapper.path(), this.options.args, this.options.extra);
+ this.once(constants_1.events.SERVICE_STARTED, () => {
+ this._flushStartingEvents();
+ });
+ this._pipeStd();
+ this._attachStartingEvents();
+ });
+ }
+ _attachStartingEvents() {
+ this.process.stderr.on('data', this._callbacks.get('process.stderr.on'));
+ this.process.stdout.on('data', this._callbacks.get('process.stdout.on'));
+ this.on(constants_1.events.IPFS_INIT, this._callbacks.get('events.IPFS_INIT'));
+ this.on(constants_1.events.SERVICE_FAILED, this._callbacks.get('events.SERVICE_FAILED'));
+ }
+ _flushStartingEvents() {
+ this.process.stderr.removeListener('data', this._callbacks.get('process.stderr.on'));
+ this.process.stdout.removeListener('data', this._callbacks.get('process.stdout.on'));
+ this.removeListener(constants_1.events.IPFS_INIT, this._callbacks.get('events.IPFS_INIT'));
+ this.removeListener(constants_1.events.SERVICE_FAILED, this._callbacks.get('events.SERVICE_FAILED'));
+ }
+ _pipeStd() {
+ const logError = (data) => this.logger.error(data.toString());
+ const logInfo = (data) => this.logger.info(data.toString());
+ this.process.stderr.on('data', logError);
+ this.process.stdout.on('data', logInfo);
+ this.once(constants_1.events.SERVICE_STOPPED, () => {
+ this.process.stderr.removeListener('data', logError);
+ this.process.stdout.removeListener('data', logInfo);
});
}
stop(signal = 'SIGINT') {
- this.process.kill(signal);
- this.process = null;
+ this.emit(constants_1.events.SERVICE_STOPPING);
this._api = null;
this.options.retry = true;
- }
- _init() {
- return new Promise((resolve, reject) => {
- let init = childProcess.exec(this.downloadManager.wrapper.path() + ' init', { env: this.options.extra.env }, (err, stdout, stderr) => {
- if (err) {
- if (stderr.toString().includes('file already exists')) {
- return resolve('already init');
- }
- this.logger.error(stderr);
- return reject(stderr);
- }
- return resolve('init finished');
- });
- this.options.retry = false;
+ if (this.process) {
+ this.process.once('exit', () => this.emit(constants_1.events.SERVICE_STOPPED));
+ this.process.kill(signal);
this.process = null;
- });
+ return this;
+ }
+ this.emit(constants_1.events.SERVICE_STOPPED);
+ return this;
}
- _start() {
- return new Promise((resolve, reject) => {
- this.checkExecutable().then(() => {
- this.process = childProcess.spawn(this.downloadManager.wrapper.path(), this.options.args, this.options.extra);
- this.process.stderr.on('data', (data) => {
- if (data.toString().includes('daemon is running')) {
- return resolve('already running');
- }
- this.logger.error(`ipfs:_start:stderr: ${data}`);
- return reject(new Error('could not start ipfs'));
- });
- this.process.stdout.on('data', (data) => {
- this.logger.info(`ipfs:_start:stdout: ${data}`);
- if (data.includes('Daemon is ready')) {
- resolve('all systems up');
- }
- });
- }).catch(err => {
- this.logger.error(`ipfs:_start:err: ${err}`);
- reject(err);
- });
- });
+ _init() {
+ let init = childProcess.exec(this.downloadManager.wrapper.path() + ' init', { env: this.options.extra.env }, this._callbacks.get('ipfs.init'));
+ this.options.retry = false;
+ this.process = null;
}
}
exports.IpfsConnector = IpfsConnector;
diff --git a/src/IpfsConnector.ts b/src/IpfsConnector.ts
index a11896a..642604d 100644
--- a/src/IpfsConnector.ts
+++ b/src/IpfsConnector.ts
@@ -1,10 +1,11 @@
///
-import {homedir} from 'os';
import * as Promise from 'bluebird';
-import {IpfsBin} from './IpfsBin';
-import {IpfsApiHelper} from './IpfsApiHelper';
+import { IpfsBin } from './IpfsBin';
+import { IpfsApiHelper } from './IpfsApiHelper';
import * as ipfsApi from 'ipfs-api';
+import { EventEmitter } from 'events';
+import { events, options } from './constants';
import childProcess = require('child_process');
import path = require('path');
@@ -12,29 +13,67 @@ import path = require('path');
const symbolEnforcer = Symbol();
const symbol = Symbol();
-export class IpfsConnector {
+export class IpfsConnector extends EventEmitter {
private process: childProcess.ChildProcess;
private downloadManager = new IpfsBin();
+ public options = options;
+ private _callbacks = new Map();
private logger: any = console;
private _api: IpfsApiHelper;
- public options = {
- retry: true,
- apiAddress: '/ip4/127.0.0.1/tcp/5001',
- args: ['daemon'],
- executable: '',
- extra: {
- env: Object.assign({}, process.env, {IPFS_PATH: path.join(homedir(), '.ipfs')}),
- detached: true
- }
- };
/**
* @param enforcer
*/
constructor(enforcer: Symbol) {
+ super();
if (enforcer !== symbolEnforcer) {
throw new Error('Use .getInstance() instead of constructing a new object');
}
+ this._callbacks.set('process.stderr.on', (data: string) => {
+ if (data.toString().includes('daemon is running')) {
+ /**
+ * @event IpfsConnector#SERVICE_STARTED
+ */
+ return this.emit(events.SERVICE_STARTED);
+ }
+ /**
+ * @event IpfsConnector#SERVICE_FAILED
+ */
+ return this.emit(events.SERVICE_FAILED, data);
+ });
+ this._callbacks.set('process.stdout.on', (data: string) => {
+ if (data.includes('Daemon is ready')) {
+ /**
+ * @event IpfsConnector#SERVICE_STARTED
+ */
+ return this.emit(events.SERVICE_STARTED);
+ }
+ });
+ this._callbacks.set('ipfs.init', (err: Error, stdout: string, stderr: string) => {
+ if (err) {
+ if (stderr.toString().includes('file already exists')) {
+ /**
+ * @event IpfsConnector#IPFS_INIT
+ */
+ return this.emit(events.IPFS_INIT);
+ }
+ this.logger.error(stderr);
+ // init exited with errors
+ return this.emit(events.IPFS_INIT, stderr.toString());
+ }
+ // everything works fine
+ return this.emit(events.IPFS_INIT);
+ });
+ this._callbacks.set('events.IPFS_INIT', (err: string) => {
+ if (!err) {
+ this.start();
+ }
+ });
+ this._callbacks.set('events.SERVICE_FAILED', (message: string) => {
+ if (message.includes('ipfs init')) {
+ setTimeout(() => this._init(), 500);
+ }
+ });
}
/**
@@ -99,103 +138,118 @@ export class IpfsConnector {
* @returns {Bluebird}
*/
public checkExecutable(): Promise<{}> {
+ const timeOut = setTimeout(() => {
+ /**
+ * @event IpfsConnector#DOWNLOAD_STARTED
+ */
+ this.emit(events.DOWNLOAD_STARTED);
+ }, 600);
return this.downloadManager.check().then(data => {
- this.logger.info(data);
+ this.logger.info(`executing from ${data}`);
return true;
}).catch(err => {
this.logger.error(err);
+ this.emit(events.BINARY_CORRUPTED, err);
return false;
- });
+ }).finally(() => clearTimeout(timeOut));
}
/**
* Start ipfs daemon process
- * @returns {Bluebird}
+ * @returns {Bluebird}
*/
- public start(): Promise<{}> {
- return this._start().then(data => {
- return data;
- }).catch(err => {
- if (this.options.retry) {
- return this._init().then(
- () => this.start()
+ public start() {
+
+ return this.checkExecutable().then(
+ (binOk) => {
+ if (!binOk) {
+ /**
+ * @event IpfsConnector#SERVICE_FAILED
+ */
+ return this.emit(events.SERVICE_FAILED);
+ }
+ this.process = childProcess.spawn(
+ this.downloadManager.wrapper.path(),
+ this.options.args,
+ this.options.extra
);
+ this.once(events.SERVICE_STARTED, () => {
+ this._flushStartingEvents();
+ });
+ this._pipeStd();
+ this._attachStartingEvents();
}
- return err;
- });
+ );
}
/**
- * Stop ipfs daemon
- * @param signal
+ * Filter daemon startup log
+ * @private
*/
- public stop(signal = 'SIGINT'): void {
- this.process.kill(signal);
- this.process = null;
- this._api = null;
- this.options.retry = true;
+ private _attachStartingEvents() {
+ this.process.stderr.on('data', this._callbacks.get('process.stderr.on'));
+ this.process.stdout.on('data', this._callbacks.get('process.stdout.on'));
+ this.on(events.IPFS_INIT, this._callbacks.get('events.IPFS_INIT'));
+ this.on(events.SERVICE_FAILED, this._callbacks.get('events.SERVICE_FAILED'));
}
/**
- * Runs `ipfs init`
- * @returns {Bluebird}
+ * Remove startup filters
* @private
*/
- private _init(): Promise<{}> {
- return new Promise((resolve, reject) => {
- let init = childProcess.exec(
- this.downloadManager.wrapper.path() + ' init',
- { env: this.options.extra.env },
- (err: Error, stdout: string, stderr: string) => {
- if (err) {
- if (stderr.toString().includes('file already exists')) {
- return resolve('already init');
- }
- this.logger.error(stderr);
- return reject(stderr);
- }
- return resolve('init finished');
- }
- );
- this.options.retry = false;
- this.process = null;
- });
+ private _flushStartingEvents() {
+ this.process.stderr.removeListener('data', this._callbacks.get('process.stderr.on'));
+ this.process.stdout.removeListener('data', this._callbacks.get('process.stdout.on'));
+ this.removeListener(events.IPFS_INIT, this._callbacks.get('events.IPFS_INIT'));
+ this.removeListener(events.SERVICE_FAILED, this._callbacks.get('events.SERVICE_FAILED'));
}
/**
- * Spawn child process for ipfs daemon
- * @returns {Bluebird}
+ * Log output from ipfs daemon
* @private
*/
- private _start(): Promise<{}> {
- return new Promise((resolve, reject) => {
- this.checkExecutable().then(
- () => {
- this.process = childProcess.spawn(
- this.downloadManager.wrapper.path(),
- this.options.args,
- this.options.extra
- );
- // @TODO: emit events instead of resolve promise
- this.process.stderr.on('data', (data: string) => {
- if (data.toString().includes('daemon is running')) {
- return resolve('already running');
- }
- this.logger.error(`ipfs:_start:stderr: ${data}`);
- return reject(new Error('could not start ipfs'));
- });
-
- this.process.stdout.on('data', (data: string) => {
- this.logger.info(`ipfs:_start:stdout: ${data}`);
- if (data.includes('Daemon is ready')) {
- resolve('all systems up');
- }
- });
- }
- ).catch(err => {
- this.logger.error(`ipfs:_start:err: ${err}`);
- reject(err);
- });
+ private _pipeStd() {
+ const logError = (data: Buffer) => this.logger.error(data.toString());
+ const logInfo = (data: Buffer) => this.logger.info(data.toString());
+
+ this.process.stderr.on('data', logError);
+ this.process.stdout.on('data', logInfo);
+ this.once(events.SERVICE_STOPPED, () => {
+ this.process.stderr.removeListener('data', logError);
+ this.process.stdout.removeListener('data', logInfo);
});
}
+
+ /**
+ * Stop ipfs daemon
+ * @param signal
+ * @returns {IpfsConnector}
+ */
+ public stop(signal = 'SIGINT') {
+ this.emit(events.SERVICE_STOPPING);
+ this._api = null;
+ this.options.retry = true;
+ if (this.process) {
+ this.process.once('exit', () => this.emit(events.SERVICE_STOPPED));
+ this.process.kill(signal);
+ this.process = null;
+ return this;
+ }
+ this.emit(events.SERVICE_STOPPED);
+ return this;
+ }
+
+ /**
+ * Runs `ipfs init`
+ * @private
+ */
+ private _init() {
+ let init = childProcess.exec(
+ this.downloadManager.wrapper.path() + ' init',
+ { env: this.options.extra.env },
+ this._callbacks.get('ipfs.init')
+ );
+ this.options.retry = false;
+ this.process = null;
+ }
}
diff --git a/src/constants.js b/src/constants.js
new file mode 100644
index 0000000..ef283e0
--- /dev/null
+++ b/src/constants.js
@@ -0,0 +1,24 @@
+"use strict";
+const os_1 = require('os');
+const path_1 = require('path');
+exports.events = {
+ DOWNLOAD_STARTED: 'DOWNLOAD_STARTED',
+ BINARY_CORRUPTED: 'BINARY_CORRUPTED',
+ SERVICE_STARTING: 'SERVICE_STARTING',
+ SERVICE_STARTED: 'SERVICE_STARTED',
+ SERVICE_STOPPING: 'SERVICE_STOPPING',
+ SERVICE_STOPPED: 'SERVICE_STOPPED',
+ SERVICE_FAILED: 'SERVICE_FAILED',
+ IPFS_INIT: 'IPFS_INIT',
+ ERROR: 'ERROR'
+};
+exports.options = {
+ retry: true,
+ apiAddress: '/ip4/127.0.0.1/tcp/5001',
+ args: ['daemon'],
+ executable: '',
+ extra: {
+ env: Object.assign({}, process.env, { IPFS_PATH: path_1.join(os_1.homedir(), '.ipfs') }),
+ detached: true
+ }
+};
diff --git a/src/constants.ts b/src/constants.ts
new file mode 100644
index 0000000..b6b89c3
--- /dev/null
+++ b/src/constants.ts
@@ -0,0 +1,25 @@
+import { homedir } from 'os';
+import { join as pathJoin } from 'path';
+
+export const events = {
+ DOWNLOAD_STARTED: 'DOWNLOAD_STARTED',
+ BINARY_CORRUPTED: 'BINARY_CORRUPTED',
+ SERVICE_STARTING: 'SERVICE_STARTING',
+ SERVICE_STARTED: 'SERVICE_STARTED',
+ SERVICE_STOPPING: 'SERVICE_STOPPING',
+ SERVICE_STOPPED: 'SERVICE_STOPPED',
+ SERVICE_FAILED: 'SERVICE_FAILED',
+ IPFS_INIT: 'IPFS_INIT',
+ ERROR: 'ERROR'
+};
+
+export const options = {
+ retry: true,
+ apiAddress: '/ip4/127.0.0.1/tcp/5001',
+ args: ['daemon'],
+ executable: '',
+ extra: {
+ env: Object.assign({}, process.env, { IPFS_PATH: pathJoin(homedir(), '.ipfs') }),
+ detached: true
+ }
+};
\ No newline at end of file
diff --git a/tests/index.js b/tests/index.js
index 929cb4b..f404974 100644
--- a/tests/index.js
+++ b/tests/index.js
@@ -5,6 +5,7 @@ const chai = require("chai");
const chaiAsPromised = require("chai-as-promised");
const rimraf = require("rimraf");
const winston = require('winston');
+const constants = require('../src/constants');
chai.use(chaiAsPromised);
const expect = chai.expect;
@@ -32,7 +33,7 @@ describe('IpfsConnector', function () {
before(function (done) {
instance.setBinPath(binTarget);
- instance.checkExecutable().then(function () {
+ rimraf(binTarget, function () {
done();
});
});
@@ -48,9 +49,19 @@ describe('IpfsConnector', function () {
expect(instance.logger).to.equal(logger);
});
- it('starts ipfs daemon', function () {
+ it('emits downloading binaries', function(done){
+ instance.once(constants.events.DOWNLOAD_STARTED, function(){
+ done();
+ });
+ instance.checkExecutable();
+ });
+
+ it('starts ipfs daemon', function (done) {
instance.setLogger(console);
- return expect(instance.start()).to.be.fulfilled;
+ instance.on(constants.events.SERVICE_STARTED, function(){
+ done();
+ });
+ instance.start();
});
describe('.add()', function () {