Skip to content
This repository has been archived by the owner on Oct 20, 2020. It is now read-only.

Commit

Permalink
feat(events): add common events for ipfs daemon process
Browse files Browse the repository at this point in the history
  • Loading branch information
kenshyx committed Jul 8, 2016
1 parent a7dd0fa commit a2f508d
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 152 deletions.
10 changes: 9 additions & 1 deletion src/IpfsBin.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"use strict";
const Promise = require('bluebird');
const fs_1 = require('fs');
const Wrapper = require('bin-wrapper');
const path = require('path');
const version = '0.4.1';
Expand All @@ -17,15 +18,22 @@ class IpfsBin {
.dest(target)
.use(process.platform === 'win32' ? 'ipfs.exe' : 'ipfs');
}
getPath() {
return this.wrapper.path();
}
check() {
return new Promise((resolve, reject) => {
this.wrapper.run(['version'], (err) => {
if (err) {
return reject(err);
}
return resolve('ipfs-bin: executable is ok');
return resolve(this.getPath());
});
});
}
deleteBin() {
const unlinkAsync = Promise.promisify(fs_1.unlink);
return unlinkAsync(this.getPath());
}
}
exports.IpfsBin = IpfsBin;
21 changes: 19 additions & 2 deletions src/IpfsBin.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// <reference path="../typings/main.d.ts"/>

import * as Promise from 'bluebird';

import { unlink } from 'fs';
import Wrapper = require('bin-wrapper');
import path = require('path');

Expand All @@ -27,6 +27,14 @@ export class IpfsBin {
.use(process.platform === 'win32' ? 'ipfs.exe' : 'ipfs');
}

/**
* Get exec path for IPFS
* @returns {string}
*/
getPath() {
return this.wrapper.path();
}

/**
* Start download and check the ipfs executable
* @returns {Bluebird}
Expand All @@ -38,8 +46,17 @@ export class IpfsBin {
return reject(err);
}

return resolve('ipfs-bin: executable is ok');
return resolve(this.getPath());
});
});
}

/**
*
* @returns {Bluebird<T>}
*/
deleteBin() {
const unlinkAsync = Promise.promisify(unlink);
return unlinkAsync(this.getPath());
}
}
147 changes: 86 additions & 61 deletions src/IpfsConnector.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,53 @@
"use strict";
const os_1 = require('os');
const Promise = require('bluebird');
const IpfsBin_1 = require('./IpfsBin');
const IpfsApiHelper_1 = require('./IpfsApiHelper');
const ipfsApi = require('ipfs-api');
const events_1 = require('events');
const constants_1 = require('./constants');
const childProcess = require('child_process');
const path = require('path');
const symbolEnforcer = Symbol();
const symbol = Symbol();
class IpfsConnector {
class IpfsConnector extends events_1.EventEmitter {
constructor(enforcer) {
super();
this.downloadManager = new IpfsBin_1.IpfsBin();
this.options = constants_1.options;
this._callbacks = new Map();
this.logger = console;
this.options = {
retry: true,
apiAddress: '/ip4/127.0.0.1/tcp/5001',
args: ['daemon'],
executable: '',
extra: {
env: Object.assign({}, process.env, { IPFS_PATH: path.join(os_1.homedir(), '.ipfs') }),
detached: true
}
};
if (enforcer !== symbolEnforcer) {
throw new Error('Use .getInstance() instead of constructing a new object');
}
this._callbacks.set('process.stderr.on', (data) => {
if (data.toString().includes('daemon is running')) {
return this.emit(constants_1.events.SERVICE_STARTED);
}
return this.emit(constants_1.events.SERVICE_FAILED, data);
});
this._callbacks.set('process.stdout.on', (data) => {
if (data.includes('Daemon is ready')) {
return this.emit(constants_1.events.SERVICE_STARTED);
}
});
this._callbacks.set('ipfs.init', (err, stdout, stderr) => {
if (err) {
if (stderr.toString().includes('file already exists')) {
return this.emit(constants_1.events.IPFS_INIT);
}
this.logger.error(stderr);
return this.emit(constants_1.events.IPFS_INIT, stderr.toString());
}
return this.emit(constants_1.events.IPFS_INIT);
});
this._callbacks.set('events.IPFS_INIT', (err) => {
if (!err) {
this.start();
}
});
this._callbacks.set('events.SERVICE_FAILED', (message) => {
if (message.includes('ipfs init')) {
setTimeout(() => this._init(), 500);
}
});
}
static getInstance() {
if (!this[symbol]) {
Expand Down Expand Up @@ -52,68 +75,70 @@ class IpfsConnector {
this.options.extra.env.IPFS_PATH = target;
}
checkExecutable() {
const timeOut = setTimeout(() => {
this.emit(constants_1.events.DOWNLOAD_STARTED);
}, 600);
return this.downloadManager.check().then(data => {
this.logger.info(data);
this.logger.info(`executing from ${data}`);
return true;
}).catch(err => {
this.logger.error(err);
this.emit(constants_1.events.BINARY_CORRUPTED, err);
return false;
});
}).finally(() => clearTimeout(timeOut));
}
start() {
return this._start().then(data => {
return data;
}).catch(err => {
if (this.options.retry) {
return this._init().then(() => this.start());
return this.checkExecutable().then((binOk) => {
if (!binOk) {
return this.emit(constants_1.events.SERVICE_FAILED);
}
return err;
this.process = childProcess.spawn(this.downloadManager.wrapper.path(), this.options.args, this.options.extra);
this.once(constants_1.events.SERVICE_STARTED, () => {
this._flushStartingEvents();
});
this._pipeStd();
this._attachStartingEvents();
});
}
_attachStartingEvents() {
this.process.stderr.on('data', this._callbacks.get('process.stderr.on'));
this.process.stdout.on('data', this._callbacks.get('process.stdout.on'));
this.on(constants_1.events.IPFS_INIT, this._callbacks.get('events.IPFS_INIT'));
this.on(constants_1.events.SERVICE_FAILED, this._callbacks.get('events.SERVICE_FAILED'));
}
_flushStartingEvents() {
this.process.stderr.removeListener('data', this._callbacks.get('process.stderr.on'));
this.process.stdout.removeListener('data', this._callbacks.get('process.stdout.on'));
this.removeListener(constants_1.events.IPFS_INIT, this._callbacks.get('events.IPFS_INIT'));
this.removeListener(constants_1.events.SERVICE_FAILED, this._callbacks.get('events.SERVICE_FAILED'));
}
_pipeStd() {
const logError = (data) => this.logger.error(data.toString());
const logInfo = (data) => this.logger.info(data.toString());
this.process.stderr.on('data', logError);
this.process.stdout.on('data', logInfo);
this.once(constants_1.events.SERVICE_STOPPED, () => {
this.process.stderr.removeListener('data', logError);
this.process.stdout.removeListener('data', logInfo);
});
}
stop(signal = 'SIGINT') {
this.process.kill(signal);
this.process = null;
this.emit(constants_1.events.SERVICE_STOPPING);
this._api = null;
this.options.retry = true;
}
_init() {
return new Promise((resolve, reject) => {
let init = childProcess.exec(this.downloadManager.wrapper.path() + ' init', { env: this.options.extra.env }, (err, stdout, stderr) => {
if (err) {
if (stderr.toString().includes('file already exists')) {
return resolve('already init');
}
this.logger.error(stderr);
return reject(stderr);
}
return resolve('init finished');
});
this.options.retry = false;
if (this.process) {
this.process.once('exit', () => this.emit(constants_1.events.SERVICE_STOPPED));
this.process.kill(signal);
this.process = null;
});
return this;
}
this.emit(constants_1.events.SERVICE_STOPPED);
return this;
}
_start() {
return new Promise((resolve, reject) => {
this.checkExecutable().then(() => {
this.process = childProcess.spawn(this.downloadManager.wrapper.path(), this.options.args, this.options.extra);
this.process.stderr.on('data', (data) => {
if (data.toString().includes('daemon is running')) {
return resolve('already running');
}
this.logger.error(`ipfs:_start:stderr: ${data}`);
return reject(new Error('could not start ipfs'));
});
this.process.stdout.on('data', (data) => {
this.logger.info(`ipfs:_start:stdout: ${data}`);
if (data.includes('Daemon is ready')) {
resolve('all systems up');
}
});
}).catch(err => {
this.logger.error(`ipfs:_start:err: ${err}`);
reject(err);
});
});
_init() {
let init = childProcess.exec(this.downloadManager.wrapper.path() + ' init', { env: this.options.extra.env }, this._callbacks.get('ipfs.init'));
this.options.retry = false;
this.process = null;
}
}
exports.IpfsConnector = IpfsConnector;

0 comments on commit a2f508d

Please sign in to comment.