Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

instance: DHT

  • Loading branch information...
commit 75c5411c92e288af463c54e232b40d500c782d1a 1 parent 4891fe1
@indutny authored
View
36 lib/vock/cli/main.js
@@ -154,6 +154,7 @@ Cli.prototype.onServerAddr = function onServerAddr(address, port) {
var self = this;
this.argv.server = this.server = { address: address, port: port };
+ this.argv.dht = nconf.get('dht') || null;
this.instance = vock.instance.create(this.argv);
this.instance.on('error', function(err) {
@@ -230,10 +231,15 @@ Cli.prototype.onServerAddr = function onServerAddr(address, port) {
port);
});
+ this.instance.on('dht:save', function (config) {
+ nconf.set('dht', config);
+ nconf.save();
+ });
+
this.initKeyboard();
if (this.cmd === 'create') {
- return this.handleCreate();
+ return this.handleConnect();
} else if (this.cmd === 'connect' && this.argv._[1]) {
return this.handleConnect();
}
@@ -284,25 +290,23 @@ Cli.prototype.initKeyboard = function initKeyboard() {
process.stdin.resume();
};
-Cli.prototype.handleCreate = function handleCreate() {
- var id = crypto.createHash('sha1')
+Cli.prototype.handleConnect = function handleConnect() {
+ var id = this.argv._[1] ||
+ crypto.createHash('sha1')
.update(crypto.randomBytes(20))
.digest('hex');
this.instance.api.connect(id, this.handleMembers.bind(this));
-
- this.logger.write('Room created!'.green);
- this.logger.write('Run this on other side:');
- this.logger.write(' vock connect ' + id);
- this.logger.write('Waiting for opponent...');
-};
-
-Cli.prototype.handleConnect = function handleConnect() {
- var self = this,
- id = this.argv._[1];
-
- this.instance.api.connect(id, this.handleMembers.bind(this));
- this.logger.write('Connecting...');
+ this.instance.advertise(id);
+
+ if (this.argv._[1]) {
+ this.logger.write('Connecting...');
+ } else {
+ this.logger.write('Room created!'.green);
+ this.logger.write('Run this on other side:');
+ this.logger.write(' vock connect ' + id);
+ this.logger.write('Waiting for opponent...');
+ }
};
Cli.prototype.handleMembers = function handleMembers(err, info) {
View
60 lib/vock/instance.js
@@ -1,6 +1,10 @@
var vock = require('../vock'),
+ crypto = require('crypto'),
util = require('util'),
+ utile = require('utile'),
dgram = require('dgram'),
+ dht = require('dht.js'),
+ Buffer = require('buffer').Buffer,
EventEmitter = require('events').EventEmitter;
var instance = exports;
@@ -34,6 +38,10 @@ function Instance(options) {
this.peerIndexes.push(i);
}
+ // DHT
+ this.dht = null;
+ this.adIds = {};
+
this.init();
};
util.inherits(Instance, EventEmitter);
@@ -82,6 +90,29 @@ Instance.prototype.init = function init() {
self.getPeer(rinfo).receive(packet);
});
+ // Initiate dht
+ this.socket.once('init', function() {
+ self.dht = dht.node.create(utile.mixin({}, self.options.dht || {}, {
+ socket: self.socket.socket
+ }));
+
+ self.dht.on('peer:new', function(infohash, peer) {
+ infohash = infohash.toString('hex');
+ var id = self.adIds[infohash];
+ if (!id) return;
+
+ self.connect(id, peer.port, peer.address);
+ });
+
+ self.emit('dht:init');
+ });
+
+ // Save dht data periodically
+ this.dhtInterval = setInterval(function() {
+ if (!self.dht) return;
+ self.emit('dht:save', self.dht.save());
+ }, 5000);
+
// Propagate audio errors to instance level
this.audio.on('error', function(err) {
self.emit('error', err);
@@ -89,6 +120,27 @@ Instance.prototype.init = function init() {
};
//
+// ### function advertise (id)
+// #### @id {String} Room id
+// Create DHT advertisement
+//
+Instance.prototype.advertise = function advertise(id) {
+ var self = this;
+
+ if (!this.dht) {
+ this.once('dht:init', function() {
+ self.advertise(id);
+ });
+ return;
+ }
+
+ var hash = crypto.createHash('sha1').update(id).digest('hex');
+ this.dht.advertise(new Buffer(hash, 'hex'), this.socket.port);
+
+ this.adIds[hash] = id;
+};
+
+//
// ### function getPeer (info)
// #### @info {Object} peer info (address, port)
// Return peer object
@@ -280,4 +332,12 @@ Instance.prototype.connect = function connect(id, port, address, callback) {
peer.connect(id, function() {
callback && callback.call(self);
});
+
+ function connectDht() {
+ self.dht.connect({ port: port, address: address });
+ }
+
+ // Inform DHT about new node
+ if (this.dht) return connectDht();
+ this.once('dht:init', connectDht);
};
View
10 lib/vock/socket.js
@@ -54,6 +54,10 @@ Socket.prototype.bind = function bind(ports, callback) {
this.tsocket.removeAllListeners('data');
this.tsocket.removeAllListeners('error');
this.tsocket.removeAllListeners('listening');
+ try {
+ this.tsocket.close();
+ } catch(e) {
+ }
}
// Bind TCP socket
@@ -70,6 +74,10 @@ Socket.prototype.bind = function bind(ports, callback) {
if (self.socket) {
self.socket.removeAllListeners('message');
self.socket.removeAllListeners('listening');
+ try {
+ self.socket.close();
+ } catch(e) {
+ }
}
self.socket = dgram.createSocket('udp4');
@@ -105,6 +113,8 @@ Socket.prototype.init = function init() {
self.bind(list.map(function(item) {
return item.private.port;
}), function(port, reuse) {
+ self.emit('init', port);
+
// Unwind accumulated callbacks
var queue = self._initQueue;
self._initialized = true;
View
4 package.json
@@ -26,7 +26,9 @@
"netroute": "~0.2.1",
"nat-upnp": "~0.2.9",
"keypress": "~0.1.0",
- "nconf": "~0.6.4"
+ "nconf": "~0.6.4",
+ "dht.js": "~0.2.0",
+ "utile": "~0.1.3"
},
"engine": {
"node": ">= 0.8.0"
Please sign in to comment.
Something went wrong with that request. Please try again.