Navigation Menu

Skip to content

Commit

Permalink
Extract definition of serf tag names to a separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 21, 2015
1 parent 1e09bf1 commit 943c8e2
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 8 deletions.
5 changes: 3 additions & 2 deletions lib/serf/agent.js
Expand Up @@ -24,7 +24,8 @@ var EventEmitter = require('events').EventEmitter,
util = require('util');

var ConsoleLogger = require('../console-logger').ConsoleLogger,
Downloader = require('./downloader');
Downloader = require('./downloader'),
Tag = require('./tag');

var NODE_NAME_PATTERN = /^([^:]+):(\d+)\/(.+)$/;
var EVENT_LOG_PATTERN = /\[INFO\] serf: ([^:]+): ([^\s]+)(?: ([^\s]+))?/;
Expand Down Expand Up @@ -94,7 +95,7 @@ Agent.prototype.tryStart = function() {
'-bind', this._hostName + ':' + BIND_PORT,
'-rpc-addr', this._hostName + ':' + RPC_PORT,
'-log-level', 'INFO',
'-tag', 'type=protocol-adapter'
'-tag', Tag.nodeType + '=protocol-adapter'
];
this._otherMembers.forEach(function(address) {
if (address.indexOf(':') < 0)
Expand Down
14 changes: 8 additions & 6 deletions lib/serf/client.js
Expand Up @@ -26,6 +26,8 @@
var SerfRPC = require('serf-rpc'),
Q = require('q');

var Tag = require('./tag');

var ENGINE_NODE_NAME_PATTERN = /^([^:]+):(\d+)\/(.+)$/;
var DEFAULT_RPC_PORT = 7373;

Expand Down Expand Up @@ -78,13 +80,13 @@ Client.prototype = {
return this.getAllMembers().then((function(members) {
var clusterIds = {};
var liveEngineNodes = members.filter(function(member) {
if (member.Tags.type != 'engine' ||
member.Tags.role != 'service-provider')
if (member.Tags[Tag.nodeType] != 'engine' ||
member.Tags[Tag.nodeRole] != 'service-provider')
return false;

var name = member.Name;
if (members.some(function(member) {
return member.Tags["buffered-for-" + name] == "true";
return member.Tags[Tag.haveUnprocessedMessagesTagFor(name)] == "true";
}))
return false;

Expand All @@ -94,11 +96,11 @@ Client.prototype = {

if (member.HostName == this._rpcHost ||
member.Addr == this._rpcHost)
clusterIds.sameHost = member.Tags.cluster_id;
clusterIds.sameHost = member.Tags[Tag.clusterId];
if ((member.HostName &&
priorityHostsMatcher.test(member.HostName)) ||
priorityHostsMatcher.test(String(member.Addr)))
clusterIds.priority = member.Tags.cluster_id;
clusterIds.priority = member.Tags[Tag.clusterId];

return matched &&
matched[2] == this._droongaEnginePort &&
Expand All @@ -108,7 +110,7 @@ Client.prototype = {
var clusterId = clusterIds.priority || clusterIds.sameHost;
if (clusterId)
liveEngineNodes = liveEngineNodes.filter(function(member) {
return member.Tags.cluster_id == clusterId;
return member.Tags[Tag.clusterId] == clusterId;
});
return {
clusterIds: clusterIds,
Expand Down
21 changes: 21 additions & 0 deletions lib/serf/tag.js
@@ -0,0 +1,21 @@
var HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX = 'buffered-for-';

module.exports = {
nodeType: 'type',
nodeRole: 'role',
internalNodeName: 'internal-name',
clusterId: 'cluster_id',

acceptMessagesNewerThan: 'accept-newer-than',
lastProcessedMessageTimestamp: 'last-timestamp',

haveUnprocessedMessagesTagFor: function(nodeName) {
return HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX + nodeName;
},
isHaveUnprocessedMessagesTag: function(tag) {
return tag.indexOf(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX == 0);
},
extractNodeNameFromHaveUnprocessedMessagesTag: function(tag) {
return tag.replace(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX, '');
}
};

0 comments on commit 943c8e2

Please sign in to comment.