Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

vfs-socket: Enable event routing

  • Loading branch information...
commit 4c8dea5db56b8e8c784aa2298db3eb50ce8f98be 1 parent e430be3
@creationix creationix authored
Showing with 82 additions and 3 deletions.
  1. +2 −1  example/test-events.js
  2. +62 −2 socket/consumer.js
  3. +18 −0 socket/worker.js
View
3  example/test-events.js
@@ -1,4 +1,4 @@
-// Test real version
+// // Test real version
// var vfs = require('vfs-local')({
// root: __dirname + "/"
// });
@@ -19,6 +19,7 @@ function test(vfs) {
console.log("MONKEY is emitted");
vfs.off("MONKEY", onMonkey, function () {
console.log("MONKEY is no longer listening");
+ vfs.emit("MONKEY", "IS NOT LISTENING");
});
});
});
View
64 socket/consumer.js
@@ -13,13 +13,17 @@ function Consumer() {
onEnd: onEnd,
onClose: onClose,
onChange: onChange,
- onReady: onReady
+ onReady: onReady,
+ onEvent: onEvent
});
var proxyStreams = {}; // Stream proxies given us by the other side
var proxyProcesses = {}; // Process proxies given us by the other side
var proxyWatchers = {}; // Watcher proxies given us by the other side
var proxyApis = {};
+ var handlers = {}; // local handlers for remote events
+ var pendingOn = {}; // queue for pending on handlers.
+ var pendingOff = {}; // queue for pending off handlers.
this.vfs = {
ping: ping, // Send a simple ping request to the worker
@@ -38,7 +42,10 @@ function Consumer() {
symlink: route("symlink"),
watch: route("watch"),
changedSince: route("changedSince"),
- extend: route("extend")
+ extend: route("extend"),
+ emit: emit,
+ on: on,
+ off: off
};
var remote = this.remoteApi;
@@ -149,9 +156,62 @@ function Consumer() {
api.emit("ready");
}
+ // For routing events from remote vfs to local listeners.
+ function onEvent(name, value) {
+ var list = handlers[name];
+ if (!list) return;
+ for (var i = 0, l = list.length; i < l; i++) {
+ list[i](value);
+ }
+ }
+
+ function on(name, handler, callback) {
+ if (handlers[name]) {
+ handlers[name].push(handler);
+ if (pendingOn[name]) {
+ callback && pendingOn[name].push(callback);
+ return;
+ }
+ return callback();
+ }
+ handlers[name] = [handler];
+ var pending = pendingOn[name] = [];
+ callback && pending.push(callback);
+ return remote.subscribe(name, function (err) {
+ for (var i = 0, l = pending.length; i < l; i++) {
+ pending[i](err);
+ }
+ delete pendingOn[name];
+ });
+ }
+
+ function off(name, handler, callback) {
+ if (pendingOff[name]) {
+ callback && pendingOff[name].push(callback);
+ return;
+ }
+ if (!handlers[name]) {
+ return callback();
+ }
+ var pending = pendingOff[name] = [];
+ callback && pending.push(callback);
+ return remote.unsubscribe(name, function (err) {
+ delete handlers[name];
+ for (var i = 0, l = pending.length; i < l; i++) {
+ pending[i](err);
+ }
+ delete pendingOff[name];
+ });
+ }
+
+ function emit() {
+ remote.emit.apply(this, arguments);
+ }
+
// Return fake endpoints in the initial return till we have the real ones.
function route(name) {
return function (path, options, callback) {
+ if (!callback) throw new Error("Forgot to pass in callback for " + name);
return remote[name].call(this, path, options, function (err, meta) {
if (err) return callback(err);
if (meta.stream) {
View
18 socket/worker.js
@@ -15,6 +15,10 @@ function Worker(vfs) {
close: close,
call: call,
ping: ping,
+ subscribe: subscribe,
+ unsubscribe: unsubscribe,
+ emit: vfs.emit,
+
// Route other calls to the local vfs instance
spawn: route("spawn"),
exec: route("exec"),
@@ -38,8 +42,22 @@ function Worker(vfs) {
var watchers = {};
var processes = {};
var apis = {};
+ var handlers = {};
var remote = this.remoteApi;
+ function subscribe(name, callback) {
+ handlers[name] = function (value) {
+ remote.onEvent(name, value);
+ }
+ vfs.on(name, handlers[name], callback);
+ }
+
+ function unsubscribe(name, callback) {
+ if (!handlers[name]) return;
+ vfs.off(name, handlers[name], callback);
+ delete handlers[name];
+ }
+
// Resume readable streams that we paused when the channel drains
this.on("drain", function () {
Object.keys(streams).forEach(function (id) {
Please sign in to comment.
Something went wrong with that request. Please try again.