Skip to content
Browse files

handle reconnects better

  • Loading branch information...
1 parent 5311629 commit f898182ab81d5e73535f79ae3e79e74f2b980050 @Raynos Raynos committed
Showing with 173 additions and 51 deletions.
  1. +3 −3 README.md
  2. +83 −19 browser.js
  3. +29 −10 examples/lazynode/server.js
  4. +3 −0 examples/simple/static/index.js
  5. +1 −1 index.js
  6. +19 −8 lib/handleClient.js
  7. +10 −2 lib/handleServer.js
  8. +19 −2 lib/redirectServerToClient.js
  9. +6 −6 package.json
View
6 README.md
@@ -1,4 +1,4 @@
-# browser-stream-server
+# stream-server
A stream server in the browser
@@ -118,11 +118,11 @@ Optionally pass in a prefix which defaults to `"/browser-server"`. If you pass i
The returned proxy object emits `server-created` and `server-destroyed` events when a server stream connects and claims to own the SERVER_NAME
``` js
-proxy.on("server-created", function (serverName) {
+proxy.on("server.created", function (serverName) {
createSomeSpecialResources(serverName)
})
-proxy.on("server-destroyed", function (serverName) {
+proxy.on("server.destroyed", function (serverName) {
destroySpecialResources(serverName)
})
```
View
102 browser.js
@@ -22,9 +22,9 @@ function createServer(mdm, options, callback) {
function connect(name) {
// Return a proxy stream
- var writable = PauseStream().pause()
+ var readable = through(catchFirstMessage)
// catch first message from server that stream is open
- , readable = through(catchFirstMessage)
+ , writable = PauseStream().pause()
, proxy = duplex(writable, readable)
, tryAgain = 4
, connected = false
@@ -35,22 +35,30 @@ function createServer(mdm, options, callback) {
return proxy
function openStream() {
- // connect to a client directly
- var stream = mdm.createStream(prefix + "/client/" + name +
- "/" + rack())
+ // connect to a server directly
+ var id = rack()
+ , stream = mdm.createStream(prefix + "/client/" + name +
+ "/" + id)
+
+ console.log("opening", id)
writable.pipe(stream).pipe(readable)
+ stream.once("ended", function () {
+ console.log("client-stream did not end")
+ stream.end()
+ })
+
// handle server does not exist yet and try to reconnect 4 times
stream.once("error", handleError)
function handleError(err) {
- var parts = err.message.split(":")
- , code = parts[0]
- , message = parts[1]
+ var code = err.code
+ , message = err.message
- if (code === "404" && message === " server does not exist") {
+ if (code === "404" && message === "server does not exist") {
tryAgain--
+ console.log('server does not exist', tryAgain)
if (tryAgain === 0) {
return proxy.emit("error", err)
}
@@ -66,17 +74,13 @@ function createServer(mdm, options, callback) {
function catchFirstMessage(data) {
// the open message is from the proxy server
- if (connected === false && data === "open") {
+ if (connected === false && data === "[stream-server]:open") {
connected = true
writable.resume()
} else {
this.emit("data", data)
}
}
-
- function isOpen() {
- writable.resume()
- }
}
function listen(name) {
@@ -88,12 +92,72 @@ function createServer(mdm, options, callback) {
return stream
function openServerConnection(clientName) {
- // for each client message that comes up open the client stream
- // and return it into the callback
- var clientStream = mdm.createStream(prefix + "/server/" + name +
- "/client/" + clientName)
+ console.log("request to open", clientName)
+
+ // for each client message that comes up create a proxy stream
+ // and return it
+ var writable = PauseStream().pause()
+ , readable = through(catchFirstMessage)
+ , proxy = duplex(writable, readable)
+ , tryAgain = 4
+ , connected = false
+
+ openStream()
+
+ callback(proxy)
+
+ function openStream() {
+ // connect to client directly (through relay)
+ var id = rack()
+ , stream = mdm.createStream(prefix + "/server/" + name +
+ "/client/" + clientName + "/" + id)
+
+ console.log("opening", id)
- callback(clientStream)
+ // Boot server says the relay server disconnected.
+ // Clean up open streams
+ stream.once("ended", function () {
+ console.log("ending server", id)
+ stream.purge && stream.purge()
+ stream.end()
+ })
+
+ writable.pipe(stream).pipe(readable)
+
+ // handle client does not exist yet and try to reconnect
+ stream.once("error", handleError)
+
+ function handleError(err) {
+ var code = err.code
+ , message = err.message
+
+ if (code === "500" && message === "cannot open client-" +
+ "server connection"
+ ) {
+ tryAgain--
+ console.log('client does not exist', tryAgain, id)
+ if (tryAgain === 0) {
+ return proxy.emit("error", err)
+ }
+
+ stream.end()
+ stream.destroy()
+ setTimeout(openStream, 500)
+ } else {
+ proxy.emit("error", err)
+ }
+ }
+ }
+
+ function catchFirstMessage(data) {
+ // the open message is from the proxy server
+ if (connected === false && data === "[stream-server]:open") {
+ connected = true
+ writable.resume()
+ } else {
+ this.emit("data", data)
+ }
+ }
}
}
}
View
39 examples/lazynode/server.js
@@ -1,15 +1,34 @@
var browserifyServer = require("browserify-server")
- , http = require("http")
- , path = require("path")
, boot = require("boot")
- , StreamServer = require("../..")
- , StreamRouter = require("stream-router")
+ , StreamServerProxy = require("../..")
-var handler = browserifyServer(path.join(__dirname, "static"))
- , server = http.createServer(handler).listen(8080)
- , streamRouter = StreamRouter()
- , sock = boot(streamRouter)
+var server = browserifyServer.listen(__dirname, 8080)
+ , proxy = StreamServerProxy()
-streamRouter.addRoute("/browser-server/*", StreamServer("/browser-server"))
+boot.install(server, logger(proxy))
-sock.install(server, "/boot")
+function logger(f) {
+ return function (stream) {
+ console.log("[BOOT-STREAM-RECEIVED]", {
+ meta: stream.meta
+ , id: stream.id
+ })
+ stream.on("data", function (data) {
+ console.log("[BOOT-STREAM-DATA]", {
+ meta: stream.meta
+ , data: data
+ , id: stream.id
+ })
+ })
+ var _write = stream.write
+ stream.write = function (data) {
+ console.log("[BOOT-STREAM-WRITE]", {
+ meta: stream.meta
+ , data: data
+ , id: stream.id
+ })
+ _write.apply(this, arguments)
+ }
+ f.apply(this, arguments)
+ }
+}
View
3 examples/simple/static/index.js
@@ -14,6 +14,9 @@ function listen() {
console.log("[SERVER]", data)
})
stream.write("from server")
+ stream.on("end", function () {
+ console.log("[SERVER] end")
+ })
}).listen(SERVER_NAME)
}
View
2 index.js
@@ -17,7 +17,7 @@ function StreamServerProxy(prefix) {
prefix = prefix || "/stream-server"
- proxy.addRoute(prefix + "/server/:serverName/client/:clientName"
+ proxy.addRoute(prefix + "/server/:serverName/client/:clientName/*"
, partial(redirectServerToClient, stores))
proxy.addRoute(prefix + "/server/:serverName"
, partial(handleServer, stores, proxy))
View
27 lib/handleClient.js
@@ -11,7 +11,12 @@ function handleClient(stores, stream, params, counter) {
counter = counter || 0
if (!server) {
- stream.error("404: server does not exist: " + serverName)
+ stream.error({
+ code: "404"
+ , library: "[stream-server]"
+ , message: "server does not exist"
+ , details: serverName
+ })
return stream.end()
}
@@ -19,21 +24,27 @@ function handleClient(stores, stream, params, counter) {
, rack = server.rack
, serverStream = server.stream
, clientName = rack()
- , bufferWritable = through()
- , bufferReadable = PauseStream().pause()
- , buffer = duplex(bufferWritable, bufferReadable)
+ , buffer = BufferStream(stream)
store.set(clientName, buffer)
- serverStream.write(clientName)
+ console.log("handling client", clientName)
- // When a client requests a server connection buffer that stream
- // until the server responds
- bufferWritable.pipe(stream).pipe(bufferReadable)
+ serverStream.write(clientName)
stream.once("end", onend)
function onend() {
store.delete(clientName)
}
+}
+
+function BufferStream(stream) {
+ var writable = through()
+ , readable = PauseStream().pause()
+ , buffer = duplex(writable, readable)
+
+ writable.pipe(stream).pipe(readable)
+
+ return buffer
}
View
12 lib/handleServer.js
@@ -9,9 +9,16 @@ function handleServer(stores, proxy, stream, params) {
if (store) {
// someone already registered this server. This is invalid state
+ stream.error({
+ code: "500"
+ , library: "[stream-server]"
+ , message: "server already exists: " + serverName
+ , trace: console.trace()
+ })
return stream.end()
}
+ // create a store for all the clients
store = StreamStore()
stores[serverName] = {
@@ -20,7 +27,8 @@ function handleServer(stores, proxy, stream, params) {
, stream: stream
}
- proxy.emit("server-created", serverName)
+ // emit that a server was created back up to the server
+ proxy.emit("server.created", serverName)
stream.once("end", onend)
@@ -29,7 +37,7 @@ function handleServer(stores, proxy, stream, params) {
// Close all the open client streams that are trying to connect
// to this server
store.iterate(ensureStreamIsEnded)
- proxy.emit("server-destroyed", serverName)
+ proxy.emit("server.destroyed", serverName)
}
function ensureStreamIsEnded(stream) {
View
21 lib/redirectServerToClient.js
@@ -7,6 +7,13 @@ function redirectServerToClient(stores, stream, params) {
if (!server) {
// trying to connect to a stream when a server is not registered
+ stream.error({
+ code: "500"
+ , library: "[stream-server]"
+ , message: "cannot open client-server connection, server does" +
+ "not exist"
+ , details: serverName
+ })
return stream.end()
}
@@ -15,12 +22,22 @@ function redirectServerToClient(stores, stream, params) {
if (!hasClientStream) {
// The client the server is trying to connect to doesn't exist
+ stream.error({
+ code: "500"
+ , library: "[stream-server]"
+ , message: "cannot open client-server connection"
+ , details: clientName
+ })
return stream.end()
}
var clientStream = store.get(clientName)
- clientStream.write("open")
-
+ // tell the client it is open so it can empty its buffer directly to
+ // the server
+ clientStream.write("[stream-server]:open")
+ // Tell the server that this connection is open aswell
+ stream.write("[stream-server]:open")
+ // connect client and server
stream.pipe(clientStream).pipe(stream)
clientStream.resume()
View
12 package.json
@@ -1,12 +1,12 @@
{
- "name": "browser-stream-server",
+ "name": "stream-server",
"version": "0.4.0",
"description": "A stream server in the browser",
"keywords": [],
"author": "Raynos <raynos2@gmail.com>",
- "repository": "git://github.com/Colingo/browser-stream-server.git",
+ "repository": "git://github.com/Colingo/stream-server.git",
"main": "index",
- "homepage": "https://github.com/Colingo/browser-stream-server",
+ "homepage": "https://github.com/Colingo/stream-server",
"browserify": "browser.js",
"contributors": [
{
@@ -14,7 +14,7 @@
}
],
"bugs": {
- "url": "https://github.com/Colingo/browser-stream-server/issues",
+ "url": "https://github.com/Colingo/stream-server/issues",
"email": "raynos2@gmail.com"
},
"dependencies": {
@@ -28,14 +28,14 @@
"xtend": "~1.0.3"
},
"devDependencies": {
- "boot": "~0.8.0",
+ "boot": "~0.8.1",
"browserify-server": "~0.2.1",
"lazynode": "~0.1.0"
},
"licenses": [
{
"type": "MIT",
- "url": "http://github.com/Colingo/browser-stream-server/raw/master/LICENSE"
+ "url": "http://github.com/Colingo/stream-server/raw/master/LICENSE"
}
],
"scripts": {}

0 comments on commit f898182

Please sign in to comment.
Something went wrong with that request. Please try again.