Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

more robustness and error events in boot

  • Loading branch information...
commit 5df69d76e6bfaba4dae06433c7a8e5303f5e3182 1 parent 0897474
@Raynos authored
Showing with 66 additions and 38 deletions.
  1. +61 −33 browser.js
  2. +5 −5 package.json
View
94 browser.js
@@ -5,7 +5,7 @@ var shoe = require("mux-demux-shoe")
, PauseStream = require("pause-stream")
, through = require("through")
, duplex = require("duplexer")
- , Backoff = require("backoff").fibonnaci
+ , Backoff = require("backoff").FibonacciStrategy
, BACKOFF_OPTIONS = {
initialDelay: 500,
maxDelay: 10000
@@ -13,14 +13,25 @@ var shoe = require("mux-demux-shoe")
module.exports = reconnecter
-function reconnecter(uri) {
+function reconnecter(uri, options) {
+ if (!options) {
+ options = {}
+ }
+
+ if (!options.backoff) {
+ options.backoff = BACKOFF_OPTIONS
+ }
+
+ // Proxy stream that does not close
var proxyWrite = PauseStream().pause()
, proxyRead = through()
, proxy = duplex(proxyWrite, proxyRead)
+ // Hold all the MD streams we open so we can re-open them transparently
, metaStreams = []
, stream
, connected = false
- , backoff = Backoff(BACKOFF_OPTIONS)
+ // Create a backoff scheme
+ , backoff = new Backoff(options.backoff)
uri = uri || "/boot"
@@ -28,17 +39,21 @@ function reconnecter(uri) {
proxy.createWriteStream = createWriteStream
proxy.createReadStream = createReadStream
+ // Open the initial connection
createShoeStream()
return proxy
function createShoeStream() {
- stream = shoe(uri)
+ // create a new stream to the server
+ stream = shoe(uri, options)
+ // re-open proxy MDM streams for all the open ones
metaStreams.forEach(proxyMdmStream)
stream.once("connect", onconnect)
+ // connect the proxy to the stream. The proxy is not allowed to end
proxyWrite.pipe(stream).pipe(proxyRead, {
end: false
})
@@ -47,9 +62,11 @@ function reconnecter(uri) {
stream.on("end", repoll)
}
- function repoll() {
+ function repoll(wasOpen) {
+ proxy.emit("ended", wasOpen)
// wait a second otherwise it spin locks
- var delay = backoff.backoffStrategy_.next()
+ var delay = backoff.next()
+ proxy.emit("poll", delay)
setTimeout(createShoeStream, delay)
}
@@ -59,6 +76,7 @@ function reconnecter(uri) {
backoff.reset()
+ // resume all the buffered state because the connection is open
proxyWrite.resume()
metaStreams.forEach(resume)
@@ -67,70 +85,78 @@ function reconnecter(uri) {
}
function handleDisconnect() {
+ // pause existing proxy streams
proxyWrite.pause()
metaStreams.forEach(pause)
+ // boolean on ended means it's a disconnect rather then just a server
+ // closing the stream
proxy.emit("disconnect")
connected = false
- repoll()
+ // repoll server for open connection
+ repoll(true)
}
function proxyMdmStream(details) {
+ // grab the proxy streams
var proxyMdmRead = details.proxyMdmRead
, proxyMdmWrite = details.proxyMdmWrite
+ , proxy = details.proxy
, meta = details.meta
, opts = details.opts
var mdm = stream.createStream(meta, opts)
+ // proxy MDM stream can't be closed
proxyMdmWrite.pipe(mdm).pipe(proxyMdmRead, {
end: false
})
- mdm.once("end", function () {
+ mdm.on("error", reemit)
+
+ // if this stream was destroyed then don't close the proxy
+ mdm.once("end", onend)
+
+ function reemit(err) {
+ proxy.emit("error", err)
+ }
+
+ function onend() {
if (!mdm.destroyed) {
+ // it ended cleanly so end the proxy
proxyMdmRead.emit("end")
+ } else {
+ // ended means there is a temporary disconnect from the server
+ proxy.emit("ended")
}
- })
+ }
}
function createStream(meta, opts) {
+ // create proxy for MDM
var proxyMdmRead = through()
, proxyMdmWrite = PauseStream()
, proxy = duplex(proxyMdmWrite, proxyMdmRead)
+ , details = {
+ proxy: proxy
+ , proxyMdmRead: proxyMdmRead
+ , proxyMdmWrite: proxyMdmWrite
+ , meta: meta
+ , opts: opts
+ }
- var mdm = stream.createStream(meta, opts)
-
- if (!connected) {
- proxyMdmWrite.pause()
- }
-
- proxyMdmWrite.pipe(mdm).pipe(proxyMdmRead, {
- end: false
- })
+ proxyMdmStream(details)
- mdm.once("end", function () {
- if (!mdm.destroyed) {
- proxyMdmRead.emit("end")
- }
- })
+ var index = metaStreams.push(details)
- var index = metaStreams.push({
- proxy: proxy
- , proxyMdmRead: proxyMdmRead
- , proxyMdmWrite: proxyMdmWrite
- , meta: meta
- , opts: opts
- })
+ proxyMdmWrite.once("end", removeFromCache)
- proxyMdmWrite.on("end", removeFromCache)
+ return proxy
function removeFromCache() {
metaStreams.splice(index - 1, 1)
}
-
- return proxy
}
function createWriteStream(meta) {
@@ -150,8 +176,10 @@ function reconnecter(uri) {
function pause(details) {
details.proxyMdmWrite.pause()
+ details.proxy.emit("disconnect")
}
function resume(details) {
details.proxyMdmWrite.resume()
+ details.proxy.emit("connect")
}
View
10 package.json
@@ -4,7 +4,7 @@
"description": "An automatic reconnect mux-demux-shoe",
"keywords": [],
"author": "Raynos <raynos2@gmail.com>",
- "repository": "git://github.com/Raynos/boot.git",
+ "repository": "git://github.com/Raynos/boot.gitn",
"main": "index",
"homepage": "https://github.com/Raynos/boot",
"browserify": "browser.js",
@@ -18,11 +18,11 @@
"email": "raynos2@gmail.com"
},
"dependencies": {
- "mux-demux-shoe": "~0.2.0",
- "through": "~0.1.2",
+ "mux-demux-shoe": "~0.2.4",
+ "through": "~0.1.4",
"pause-stream": "0.0.3",
- "backoff": "~0.2.0",
- "duplexer": "0.0.1"
+ "backoff": "~0.3.0",
+ "duplexer": "~0.0.2"
},
"devDependencies": {
"upnode": "~0.4.1",
Please sign in to comment.
Something went wrong with that request. Please try again.