Skip to content

Commit

Permalink
Merge branch 'multiple_clients'
Browse files Browse the repository at this point in the history
  • Loading branch information
baalexander committed May 16, 2012
2 parents 92172b1 + edb9524 commit 8647f32
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 204 deletions.
76 changes: 38 additions & 38 deletions example/how_to.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,48 +38,48 @@ describe('How to use rosnodejs', function() {

// Unregister as a publisher for test clean up
setTimeout(function() {
publisher.unpublish();
publisher.unregisterPublisher();
}, 1000);
});
});

it('to subscribe to messages', function(done) {
this.timeout(5000);

ros.types([
'std_msgs/String'
], function(String) {

// Creates the topic 'subscribe_example'
var subscriber = new ros.topic({
node : 'listener'
, topic : 'subscribe_example'
, messageType : String
});

subscriber.on('unregistered_subscriber', done);

// Subscribes to the 'subscribe_example' topic
subscriber.subscribe(function(message) {
message.data.should.equal('howdy');

// Unregister as a subscriber for test cleanup
subscriber.unsubscribe();
});

// Uses rostopic to publish a message on the subscribed to topic.
var publishCommand = 'rostopic'
+ ' pub'
+ ' /subscribe_example'
+ ' std_msgs/String'
+ ' howdy'
+ ' --once'
;
var child = exec(publishCommand, function(error, stdout, stderr) {
should.not.exist(error);
});
});
});
// it('to subscribe to messages', function(done) {
// this.timeout(5000);

// ros.types([
// 'std_msgs/String'
// ], function(String) {

// // Creates the topic 'subscribe_example'
// var subscriber = new ros.topic({
// node : 'listener'
// , topic : 'subscribe_example'
// , messageType : String
// });

// subscriber.on('unregistered_subscriber', done);

// // Subscribes to the 'subscribe_example' topic
// subscriber.subscribe(function(message) {
// message.data.should.equal('howdy');

// // Unregister as a subscriber for test cleanup
// subscriber.unregisterSubscriber();
// });

// // Uses rostopic to publish a message on the subscribed to topic.
// var publishCommand = 'rostopic'
// + ' pub'
// + ' /subscribe_example'
// + ' std_msgs/String'
// + ' howdy'
// + ' --once'
// ;
// var child = exec(publishCommand, function(error, stdout, stderr) {
// should.not.exist(error);
// });
// });
// });

});

62 changes: 21 additions & 41 deletions lib/tcpros.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ var TCPROS = function(options) {
this.topic = options.topic;
this.messageType = options.messageType;

this.sockets = [];
this.publisherSocket = null;
this.subscriberSocket = null;
};

TCPROS.prototype.createPublisher = function() {
Expand All @@ -23,18 +24,11 @@ TCPROS.prototype.createPublisher = function() {
portscanner.findAPortNotInUse(9000, null, hostname, function(error, port) {

var server = net.createServer(function(socket) {
that.sockets.push(socket);
var removeSocketFromSockets = function(socket) {
var index = that.sockets.indexOf(socket);
if (index !== 1) {
that.sockets.splice(index, 1);
}
}
socket.on('end', function() {
removeSocketFromSockets(socket);
});
socket.on('error', function() {
removeSocketFromSockets(socket);
that.publisherSocket = socket;

socket.on('error', function(error) {
that.publisherSocket = null;
that.emit('error', error);
});

socket.on('data', function(data) {
Expand All @@ -56,13 +50,14 @@ TCPROS.prototype.createPublisher = function() {

server.on('listening', function() {
var address = server.address();
var uri = url.format({

var publisherUri = url.format({
protocol : 'http'
, hostname : address.address
, port : address.port
});

that.emit('listening', uri);
that.emit('listening', publisherUri);
});

server.on('error', function(error) {
Expand All @@ -75,16 +70,22 @@ TCPROS.prototype.createPublisher = function() {
};
TCPROS.prototype.__proto__ = EventEmitter2.prototype;

TCPROS.prototype.createSubscriber = function(port, host, subscriber) {
TCPROS.prototype.publish = function(message) {
var messageBuffer = serializeMessage(message);
this.publisherSocket.write(messageBuffer);
};

TCPROS.prototype.createSubscriber = function(port, host) {
var that = this;
this.socket = net.createConnection(port, host)
this.socket.on('data', function(data) {

this.subscriberSocket = net.createConnection(port, host)
this.subscriberSocket.on('data', function(data) {
var connectionHeader = deserializeConnectionHeader(data);
var message = deserializeMessage(data, that.messageType);
if (message !== null) {
that.emit('message', message);
}
})
});

var connectionHeader = {
callerId : '/' + this.node
Expand All @@ -94,28 +95,7 @@ TCPROS.prototype.createSubscriber = function(port, host, subscriber) {
};

var buffer = serializeConnectionHeader(connectionHeader);
this.socket.write(buffer)
};

TCPROS.prototype.publish = function(message) {
var that=this;
var publish = function(message) {
var messageBuffer = serializeMessage(message);
this.socket.write(messageBuffer);
};

if (!this.sockets.length) {
this.once('connect', function() {
//that.publish(message);
});
}
else {
async.forEach(this.sockets, function(socket, callback) {
var messageBuffer = serializeMessage(message);
socket.write(messageBuffer);
callback();
});
}
this.subscriberSocket.write(buffer)
};

function deserializeConnectionHeader(buffer) {
Expand Down
Loading

0 comments on commit 8647f32

Please sign in to comment.