Skip to content

Commit

Permalink
fix the receiving for multiple clients
Browse files Browse the repository at this point in the history
  • Loading branch information
maxired committed Apr 30, 2012
1 parent dd2e9de commit af26a76
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 63 deletions.
4 changes: 2 additions & 2 deletions lib/master.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var xmlrpc = require('xmlrpc')
var master = exports

master.registerPublisher = function(publisher, callback) {
var masterUri = environment.getRosMasterUri()
var masterUri = environment.getRosMasterUri()
, callerUri = publisher.callerUri
, callerId = getGraphResourceName(publisher.callerId)
, topicName = getGraphResourceName(publisher.topic)
Expand All @@ -29,10 +29,10 @@ master.registerSubscriber = function(subscriber, callback) {
, params = [callerId, topicName, messageType, callerUri]
, client = xmlrpc.createClient(masterUri)
;

client.methodCall('registerSubscriber', params, function(error, value) {
parseResponse(error, value, callback);
});

}

function getGraphResourceName(name) {
Expand Down
1 change: 0 additions & 1 deletion lib/tcpros.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ TCPROS.prototype.createSubscriber = function(port, host, subscriber) {
};

TCPROS.prototype.publish = function(message) {
console.log("Publishc called fpor mesage : " , message);
var that=this;
var publish = function(message) {
var messageBuffer = serializeMessage(message);
Expand Down
106 changes: 59 additions & 47 deletions lib/topic.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
var url = require('url')
, EventEmitter2 = require('eventemitter2').EventEmitter2
, portscanner = require('portscanner')
, xmlrpc = require('xmlrpc')
, async = require('async')
, environment = require('./environment')
, master = require('./master')
, TCPROS = require('./tcpros')
;
, EventEmitter2 = require('eventemitter2').EventEmitter2
, portscanner = require('portscanner')
, xmlrpc = require('xmlrpc')
, async = require('async')
, environment = require('./environment')
, master = require('./master')
, TCPROS = require('./tcpros')
;

function Topic(options) {
if ((this instanceof Topic) === false) {
Expand Down Expand Up @@ -44,9 +44,9 @@ Topic.prototype.createSlaveServer = function() {
var hostname = environment.getHostname();
portscanner.findAPortNotInUse(9000, null, hostname, function(error, port) {
var uriFields = { protocol: 'http', hostname: hostname, port: port }
, uri = url.format(uriFields)
, server = xmlrpc.createServer(uri)
;
, uri = url.format(uriFields)
, server = xmlrpc.createServer(uri)
;

that.uri = uri;
server.on('requestTopic', that.requestTopic.bind(that));
Expand All @@ -58,11 +58,14 @@ Topic.prototype.createSlaveServer = function() {
if(this.mode=="all"||this.mode=="publish"){
this._registerPublisher();
}
if(this.mode=="all"||this.mode=="subscribe"){
this._registerSubscriber();
}
};

Topic.prototype._registerPublisher=function(){
var that=this;

this.getUri(function(uri) {
var masterParams = {
callerId : that.node
Expand All @@ -78,8 +81,28 @@ Topic.prototype._registerPublisher=function(){
});
};

Topic.prototype._registerSubscriber=function(){
var that=this;
this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
, messageType : that.messageType.messageType
};
master.registerSubscriber(masterParams, function(error, uris) {
if (error) {
that.emit('error', error);
}else{
that._publisherConnect(uris);
}
});
});

}


Topic.prototype.requestTopic = function(error, params, callback) {
console.log("topic requested");
var that = this
, callerId = params[0]
, topic = params[1]
Expand All @@ -92,7 +115,6 @@ Topic.prototype.requestTopic = function(error, params, callback) {


var protocolListening=function(uri){
console.log("Uri ", uri);
var statusCode = 1
, statusMessage = 'ready on ' + uri
, uriFields = url.parse(uri)
Expand Down Expand Up @@ -123,17 +145,9 @@ Topic.prototype.requestTopic = function(error, params, callback) {

};

Topic.prototype.publisherUpdate = function(error, params, callback) {
var that = this
, callerId = params[0]
, topic = params[1]
, publishers = params[2]
;

if (topic.length > 0 && topic.charAt(0) === '/') {
topic = topic.substr(1, topic.length - 1);
}

Topic.prototype._publisherConnect = function(publishers){
var that=this;
publishers.forEach(function(publisherUri) {
var client = xmlrpc.createClient(publisherUri)
, protocols = [['TCPROS']]
Expand All @@ -145,30 +159,43 @@ Topic.prototype.publisherUpdate = function(error, params, callback) {
, protocol = hostParams[0]
, host = hostParams[1]
, port = hostParams[2]
;

if (protocol === 'TCPROS') {
that.protocol = new TCPROS({
, newProtocol;
if (!that.protocols['TCPROS']) {
newProtocol = new TCPROS({
node : that.node
, topic : that.topic
, messageType : that.messageType
});

that.protocol.on('message', function(message) {
newProtocol.on('message', function(message) {
that.emit('message', message);
});

that.protocol.createSubscriber(port, host);
that.protocols['TCPROS'] = newProtocol;
}
that.protocols['TCPROS'].createSubscriber(port, host);
});
});

}

Topic.prototype.publisherUpdate = function(error, params, callback) {
var that = this
, callerId = params[0]
, topic = params[1]
, publishers = params[2]
;

if (topic.length > 0 && topic.charAt(0) === '/') {
topic = topic.substr(1, topic.length - 1);
}
that._publisherConnect(publishers);
callback();
};

Topic.prototype.publish = function(message) {
var that = this;
console.log("publish on topic");
for( i in this.protocols){
console.log("looping for each");
this.protocols[i].protocol.publish(message);
}
}
Expand All @@ -178,21 +205,6 @@ Topic.prototype.subscribe = function(callback) {

this.on('message', callback);

if (!this.protocol) {
this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
, messageType : that.messageType.messageType
};
master.registerSubscriber(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
});
});
}
};

module.exports = Topic;
Expand Down
23 changes: 11 additions & 12 deletions receiving.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
var exec = require('child_process').exec
, ros = require('./lib/ros')


ros.types([
'std_msgs/String'
], function(String) {
var node = ros.node('receiver');
node.topics([
{ topic: 'hello_world', messageType: String }
], function(subscribeExample) {
subscribeExample.subscribe(function(message) {
console.log("New data receigned:" , message);
});
'std_msgs/String'
], function(String) {
var topic = ros.topic({ topic: 'hello_world',
messageType: String ,
node:'receiver',
mode: 'subscribe'});

topic.subscribe(function(message) {
console.log("New data receigned:" , message);
});

});
});
});

2 changes: 1 addition & 1 deletion sendingTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ ros.types([
'std_msgs/String'
], function(String) {
var topic = new ros.topic(
{ topic: 'hello_world', messageType: String ,node:'talker'}
{ topic: 'hello_world', messageType: String ,node:'talker', mode:'publish'}
);

function pub(publishExample) {
Expand Down

0 comments on commit af26a76

Please sign in to comment.