Skip to content

Commit

Permalink
Implements more of the Slave API.
Browse files Browse the repository at this point in the history
This will allow rxgraph to query the node. Still need to do more work with bus
stats and bus info after multiple subscriptions and publications is enabled.

This should help with Issue #20.
  • Loading branch information
baalexander committed Apr 28, 2012
1 parent f0bfac2 commit 02ebedd
Showing 1 changed file with 155 additions and 91 deletions.
246 changes: 155 additions & 91 deletions lib/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,106 @@ function Topic(options) {
}
Topic.prototype.__proto__ = EventEmitter2.prototype;

Topic.prototype.publish = function(message) {
var that = this;

if (this.protocol) {
this.protocol.publish();
}
else {
this.on('publisher_ready', function(protocol) {
protocol.publish(message);
});

this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
, messageType : that.messageType.messageType
};
master.registerPublisher(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
else {
that.emit('registered_publisher');
}
});
});
}
};

Topic.prototype.unpublish = function(message) {
var that = this;

if (this.protocol) {
this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
};
master.unregisterPublisher(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
else {
that.emit('unregistered_publisher');
}
});
});
}
};

Topic.prototype.subscribe = function(callback) {
var that = this;

this.on('message', callback);

if (!this.protocol) {
this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
, messageType : that.messageType.messageType
};
master.registerSubscriber(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
else {
that.emit('registered_subscriber');
}
});
});
}
};

Topic.prototype.unsubscribe = function(callback) {
var that = this;

if (this.protocol) {
this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
};
master.unregisterSubscriber(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
else {
that.protocol = null;
that.emit('unregistered_subscriber');
}
});
});
}
};

Topic.prototype.getUri = function(callback) {
if (this.uri) {
callback(this.uri);
Expand All @@ -37,6 +137,10 @@ Topic.prototype.getUri = function(callback) {
}
};


// Slave API
// ---------

Topic.prototype.createSlaveServer = function() {
var that = this;

Expand All @@ -50,6 +154,12 @@ Topic.prototype.createSlaveServer = function() {
that.uri = uri;
server.on('requestTopic', that.requestTopic.bind(that));
server.on('publisherUpdate', that.publisherUpdate.bind(that));
server.on('getBusStats', that.getBusStats.bind(that));
server.on('getBusInfo', that.getBusInfo.bind(that));
server.on('getMasterUri', that.getMasterUri.bind(that));
server.on('getPid', that.getPid.bind(that));
server.on('getSubscriptions', that.getSubscriptions.bind(that));
server.on('getPublications', that.getPublications.bind(that));

that.emit('connection', uri);
});
Expand Down Expand Up @@ -81,7 +191,7 @@ Topic.prototype.requestTopic = function(error, params, callback) {
, port = parseInt(uriFields.port)
, protocolParams = ['TCPROS', hostname, port]
;
callback(null, [statusCode, statusMessage, protocolParams]);
callback(null, [statusCode, '', protocolParams]);
});
this.protocol.createPublisher();
};
Expand Down Expand Up @@ -127,104 +237,58 @@ Topic.prototype.publisherUpdate = function(error, params, callback) {
});
};

Topic.prototype.publish = function(message) {
var that = this;

if (this.protocol) {
this.protocol.publish();
}
else {
this.on('publisher_ready', function(protocol) {
protocol.publish(message);
});

this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
, messageType : that.messageType.messageType
};
master.registerPublisher(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
else {
that.emit('registered_publisher');
}
});
});
}
Topic.prototype.getBusStats = function(error, params, callback) {
var code = 1
, statusMessage = ''
, busStats = []
, params = [code, statusMessage, busStats]
;
callback(null, params);
};

Topic.prototype.unpublish = function(message) {
var that = this;

if (this.protocol) {
this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
};
master.unregisterPublisher(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
else {
that.emit('unregistered_publisher');
}
});
});
}
Topic.prototype.getBusInfo = function(error, params, callback) {
var code = 1
, statusMessage = ''
, busInfo = []
, params = [code, statusMessage, busInfo]
;
callback(null, params);
};

Topic.prototype.subscribe = function(callback) {
var that = this;

this.on('message', callback);
Topic.prototype.getMasterUri = function(error, params, callback) {
var code = 1
, statusMessage = ''
, masterUri = environment.getMasterUri()
, params = [code, statusMessage, masterUri]
;
callback(null, params);
};

if (!this.protocol) {
this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
, messageType : that.messageType.messageType
};
master.registerSubscriber(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
else {
that.emit('registered_subscriber');
}
});
});
}
Topic.prototype.getPid = function(error, params, callback) {
var code = 1
, statusMessage = 'Retrieved node PID'
, pid = process.pid
, params = [code, statusMessage, params]
;
callback(null, params);
};

Topic.prototype.unsubscribe = function(callback) {
var that = this;
Topic.prototype.getSubscriptions = function(error, params, callback) {
var code = 1
, statusMessage = ''
, subscriptions = []
, params = [code, statusMessage, subscriptions]
;
callback(null, params);
};

if (this.protocol) {
this.getUri(function(uri) {
var masterParams = {
callerId : that.node
, callerUri : uri
, topic : that.topic
};
master.unregisterSubscriber(masterParams, function(error) {
if (error) {
that.emit('error', error);
}
else {
that.protocol = null;
that.emit('unregistered_subscriber');
}
});
});
}
Topic.prototype.getPublications = function(error, params, callback) {
var code = 1
, statusMessage = ''
, publications = []
, params = [code, statusMessage, publications]
;
callback(null, params);
};

module.exports = Topic;
Expand Down

0 comments on commit 02ebedd

Please sign in to comment.