Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 63 additions & 98 deletions lib/jet/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ var invalidRequest = jetUtils.invalidRequest;
var responseTimeout = jetUtils.responseTimeout;
var methodNotFound = jetUtils.methodNotFound;
var parseError = jetUtils.parseError;
var checked = jetUtils.checked;
var optional = jetUtils.optional;


var createDaemon = function (options) {
Expand Down Expand Up @@ -79,45 +81,6 @@ var createDaemon = function (options) {
}
};

var checked = function (tab, key, typename) {
var p = tab[key];
if (isDefined(p)) {
if (typename) {
if (typeof (p) === typename) {
return p;
} else {
throw invalidParams({
wrongType: key,
got: tab
});
}
} else {
return p;
}
} else {
throw invalidParams({
missingParam: key,
got: tab
});
}
};

var optional = function (tab, key, typename) {
var p = tab[key];
if (isDefined(p)) {
if (typename) {
if (typeof (p) === typename) {
return p;
}
} else {
throw invalidParams({
wrongType: key,
got: tab
});
}
}
};

// dispatches the 'change' jet call.
// updates the internal cache (element table)
// and publishes a change event.
Expand All @@ -140,6 +103,23 @@ var createDaemon = function (options) {
}
};

var appendFetcherToElements = function (fetcher, fetchPeerId) {
var mayHaveInterest;
for (var path in elements) {
if (elements.hasOwnProperty(path)) {
mayHaveInterest = fetcher(
path,
path.toLowerCase(),
'add',
elements[path].value
);
if (mayHaveInterest) {
elements[path].fetchers[fetchPeerId] = fetcher;
}
}
}
};

// dispatches the 'fetch' jet call.
// creates a fetch operation and optionally a sorter.
// all elements are inputed as "fake" add events. The
Expand All @@ -148,54 +128,37 @@ var createDaemon = function (options) {
var fetch = function (peer, message) {
var params = checked(message, 'params', 'object');
var fetchId = checked(params, 'id');
var queueNotification;
var mayHaveInterest;
var fetchPeerId;
var notify = function (nparams) {
assert(queueNotification);
queueNotification(nparams);
};
var initializing = true;
var sorter = jetSorter.create(params, notify);
if (isDefined(sorter)) {
notify = function (nparams) {
sorter.sorter(nparams, initializing);
};
}
var fetcher = jetFetcher.create(params, notify);
peer.fetchers[fetchId] = fetcher;

if (isDefined(message.id) && !isDefined(sorter)) {
peer.sendMessage({
id: message.id,
result: true
});
}

queueNotification = function (nparams) {
var queueNotification = function (nparams) {
peer.sendMessage({
method: fetchId,
params: nparams
});
};

fetchPeerId = peer.id + fetchId;
var fetcher;
var sorter;
var initializing = true;

for (var path in elements) {
if (elements.hasOwnProperty(path)) {
mayHaveInterest = fetcher(
path,
path.toLowerCase(),
'add',
elements[path].value
);
if (mayHaveInterest) {
elements[path].fetchers[fetchPeerId] = fetcher;
}
if (isDefined(params.sort)) {
sorter = jetSorter.create(params, queueNotification);
fetcher = jetFetcher.create(params, function (nparams) {
sorter.sorter(nparams, initializing);
});
} else {
fetcher = jetFetcher.create(params, queueNotification);
if (isDefined(message.id)) {
peer.sendMessage({
id: message.id,
result: true
});
}
}

peer.fetchers[fetchId] = fetcher;
appendFetcherToElements(fetcher, peer.id + fetchId);
initializing = false;

if (isDefined(sorter) && sorter.flush) {
if (isDefined(message.id)) {
peer.sendMessage({
Expand Down Expand Up @@ -232,6 +195,26 @@ var createDaemon = function (options) {
// same message.id.
var rcount = 0;

var addRoute = function (message, peer, element) {
var timeout = optional(message.params, 'timeout', 'number') || element.timeout || 5;
/* jslint bitwise: true */
rcount = (rcount + 1) % 2 ^ 31;
var id = message.id.toString() + peer.id + rcount;
assert.equal(routes[id], null);
routes[id] = {
receiver: peer,
id: message.id,
timer: setTimeout(function () {
delete routes[id];
peer.sendMessage({
id: message.id,
error: responseTimeout(message.params)
});
}, timeout * 1000)
};
return id;
};

// routes / forwards a peer request or notification ("call","set") to the peer
// of the corresponding element specified by "params.path".
// creates an entry in the "route" table if it is a request and sets up a timer
Expand All @@ -242,28 +225,11 @@ var createDaemon = function (options) {
var path = checked(params, 'path', 'string');
var element = elements[path];
var req = {};
var id;
var timeout;
if (element) {
if (isDefined(message.id)) {
timeout = optional(params, 'timeout', 'number') || element.timeout || 5;
/* jslint bitwise: true */
rcount = (rcount + 1) % 2 ^ 31;
id = message.id.toString() + peer.id + rcount;
assert.equal(routes[id], null);
routes[id] = {
receiver: peer,
id: message.id,
timer: setTimeout(function () {
delete routes[id];
peer.sendMessage({
id: message.id,
error: responseTimeout(params)
});
}, timeout * 1000)
};
req.id = addRoute(message, peer, element);

}
req.id = id;
req.method = path;

if (params.value !== undefined) {
Expand All @@ -276,13 +242,12 @@ var createDaemon = function (options) {
}
element.peer.sendMessage(req);
} else {
var error = invalidParams({
pathNotExists: path
});
if (isDefined(message.id)) {
peer.sendMessage({
id: message.id,
error: error
error: invalidParams({
pathNotExists: path
})
});
}
}
Expand Down
82 changes: 30 additions & 52 deletions lib/jet/daemon/fetcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,34 @@ var isDefined = jetUtils.isDefined;
exports.create = function (options, notify) {
var pathMatcher = jetPathMatcher.create(options);
var valueMatcher = jetValueMatcher.create(options);
var added;
var added = {};

var matchValue = function (path, event, value) {
var isAdded = added[path];
if (event === 'remove' || !valueMatcher(value)) {
if (isAdded) {
delete added[path];
notify({
path: path,
event: 'remove',
value: value
});
}
return true;
}
if (!isAdded) {
event = 'add';
added[path] = true;
} else {
event = 'change';
}
notify({
path: path,
event: event,
value: value
});
return true;
};

if (isDefined(pathMatcher) && !isDefined(valueMatcher)) {
return function (path, lowerPath, event, value) {
Expand All @@ -24,64 +51,15 @@ exports.create = function (options, notify) {
return true;
};
} else if (!isDefined(pathMatcher) && isDefined(valueMatcher)) {
added = {};
return function (path, lowerPath, event, value) {
var isAdded = added[path];
if (event === 'remove' || !valueMatcher(value)) {
if (isAdded) {
delete added[path];
notify({
path: path,
event: 'remove',
value: value
});
}
return true;
}
if (!isAdded) {
event = 'add';
added[path] = true;
} else {
event = 'change';
}
notify({
path: path,
event: event,
value: value
});
return true;
return matchValue(path, event, value);
};
} else if (isDefined(pathMatcher) && isDefined(valueMatcher)) {
added = {};
return function (path, lowerPath, event, value) {
var isAdded;
if (!pathMatcher(path, lowerPath)) {
return false;
}
isAdded = added[path];
if (event === 'remove' || !valueMatcher(value)) {
if (isAdded) {
delete added[path];
notify({
path: path,
event: 'remove',
value: value
});
}
return true;
}
if (!isAdded) {
event = 'add';
added[path] = true;
} else {
event = 'change';
}
notify({
path: path,
event: event,
value: value
});
return true;
return matchValue(path, event, value);
};
} else {
return function (path, lowerPath, event, value) {
Expand Down
25 changes: 11 additions & 14 deletions lib/jet/daemon/path_matcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ exports.create = function (options) {
predicates.push(gen(option));
}
});

var applyPredicates = function (path) {
for (var i = 0; i < predicates.length; ++i) {
if (!predicates[i](path)) {
return false;
}
}
return true;
};
if (ci) {
if (predicates.length === 1) {
pred = predicates[0];
Expand All @@ -136,13 +145,7 @@ exports.create = function (options) {
};
} else {
return function (path, lowerPath) {
var i;
for (i = 0; i < predicates.length; i = i + 1) {
if (!predicates[i](lowerPath)) {
return false;
}
}
return true;
return applyPredicates(lowerPath);
};
}
} else {
Expand All @@ -153,13 +156,7 @@ exports.create = function (options) {
};
} else {
return function (path) {
var i;
for (i = 0; i < predicates.length; i = i + 1) {
if (!predicates[i](path)) {
return false;
}
}
return true;
return applyPredicates(path);
};
}
}
Expand Down
Loading