diff --git a/index.js b/index.js index 0ee8051a..6e43217f 100644 --- a/index.js +++ b/index.js @@ -2,7 +2,7 @@ const debug = require('debug')('simple-peer') const getBrowserRTC = require('get-browser-rtc') const randombytes = require('randombytes') -const stream = require('readable-stream') +const { Duplex } = require('streamx') const queueMicrotask = require('queue-microtask') // TODO: remove when Node 10 is not supported const errCode = require('err-code') const { Buffer } = require('buffer') @@ -25,7 +25,7 @@ function warn (message) { * Duplex stream. * @param {Object} opts */ -class Peer extends stream.Duplex { +class Peer extends Duplex { constructor (opts) { opts = Object.assign({ allowHalfOpen: false @@ -33,6 +33,8 @@ class Peer extends stream.Duplex { super(opts) + this.__objectMode = !!opts.objectMode // streamx is objectMode by default, so implement readable's fuctionality + this._id = randombytes(4).toString('hex').slice(0, 7) this._debug('new peer %o', opts) @@ -52,8 +54,8 @@ class Peer extends stream.Duplex { this.allowHalfTrickle = opts.allowHalfTrickle !== undefined ? opts.allowHalfTrickle : false this.iceCompleteTimeout = opts.iceCompleteTimeout || ICECOMPLETE_TIMEOUT - this.destroyed = false - this.destroying = false + this._destroying = false + this._connected = false this.remoteAddress = undefined @@ -180,7 +182,7 @@ class Peer extends stream.Duplex { } signal (data) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot signal after peer is destroyed'), 'ERR_DESTROYED') if (typeof data === 'string') { try { @@ -244,7 +246,7 @@ class Peer extends stream.Duplex { * @param {ArrayBufferView|ArrayBuffer|Buffer|string|Blob} chunk */ send (chunk) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot send after peer is destroyed'), 'ERR_DESTROYED') this._channel.send(chunk) } @@ -255,7 +257,7 @@ class Peer extends stream.Duplex { * @param {Object} init */ addTransceiver (kind, init) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot addTransceiver after peer is destroyed'), 'ERR_DESTROYED') this._debug('addTransceiver()') @@ -279,7 +281,7 @@ class Peer extends stream.Duplex { * @param {MediaStream} stream */ addStream (stream) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot addStream after peer is destroyed'), 'ERR_DESTROYED') this._debug('addStream()') @@ -294,7 +296,7 @@ class Peer extends stream.Duplex { * @param {MediaStream} stream */ addTrack (track, stream) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot addTrack after peer is destroyed'), 'ERR_DESTROYED') this._debug('addTrack()') @@ -319,7 +321,7 @@ class Peer extends stream.Duplex { * @param {MediaStream} stream */ replaceTrack (oldTrack, newTrack, stream) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot replaceTrack after peer is destroyed'), 'ERR_DESTROYED') this._debug('replaceTrack()') @@ -343,7 +345,7 @@ class Peer extends stream.Duplex { * @param {MediaStream} stream */ removeTrack (track, stream) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot removeTrack after peer is destroyed'), 'ERR_DESTROYED') this._debug('removeSender()') @@ -370,7 +372,7 @@ class Peer extends stream.Duplex { * @param {MediaStream} stream */ removeStream (stream) { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot removeStream after peer is destroyed'), 'ERR_DESTROYED') this._debug('removeSenders()') @@ -396,7 +398,7 @@ class Peer extends stream.Duplex { } negotiate () { - if (this.destroying) return + if (this._destroying) return if (this.destroyed) throw errCode(new Error('cannot negotiate after peer is destroyed'), 'ERR_DESTROYED') if (this.initiator) { @@ -424,29 +426,19 @@ class Peer extends stream.Duplex { this._isNegotiating = true } - // TODO: Delete this method once readable-stream is updated to contain a default - // implementation of destroy() that automatically calls _destroy() - // See: https://github.com/nodejs/readable-stream/issues/283 - destroy (err) { - this._destroy(err, () => {}) + _final (cb) { + if (!this._readableState.ended) this.push(null) + cb() } - _destroy (err, cb) { - if (this.destroyed || this.destroying) return - this.destroying = true + _destroy (cb) { + if (this.destroyed || this._destroying) return + this._destroying = true - this._debug('destroying (error: %s)', err && (err.message || err)) + if (!this._writableState.ended) this.end() queueMicrotask(() => { // allow events concurrent with the call to _destroy() to fire (see #692) - this.destroyed = true - this.destroying = false - - this._debug('destroy (error: %s)', err && (err.message || err)) - - this.readable = this.writable = false - - if (!this._readableState.ended) this.push(null) - if (!this._writableState.finished) this.end() + this._destroying = false this._connected = false this._pcReady = false @@ -493,8 +485,6 @@ class Peer extends stream.Duplex { this._pc = null this._channel = null - if (err) this.emit('error', err) - this.emit('close') cb() }) } @@ -548,9 +538,7 @@ class Peer extends stream.Duplex { }, CHANNEL_CLOSING_TIMEOUT) } - _read () {} - - _write (chunk, encoding, cb) { + _write (chunk, cb) { if (this.destroyed) return cb(errCode(new Error('cannot write after peer is destroyed'), 'ERR_DATA_CHANNEL')) if (this._connected) { @@ -972,7 +960,7 @@ class Peer extends stream.Duplex { _onChannelMessage (event) { if (this.destroyed) return let data = event.data - if (data instanceof ArrayBuffer) data = Buffer.from(data) + if (data instanceof ArrayBuffer || this.__objectMode === false) data = Buffer.from(data) this.push(data) } diff --git a/package.json b/package.json index aa4635cb..dc34b68c 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,7 @@ "get-browser-rtc": "^1.1.0", "queue-microtask": "^1.2.3", "randombytes": "^2.1.0", - "readable-stream": "^3.6.0" + "streamx": "^2.12.4" }, "devDependencies": { "airtap": "^4.0.3", @@ -66,7 +66,7 @@ "test": "standard && npm run test-browser", "test-browser": "airtap --coverage --concurrency 1 -- test/*.js", "test-browser-local": "airtap --coverage --preset local -- test/*.js", - "test-node": "WRTC=wrtc tape test/*.js", + "test-node": "set WRTC=wrtc && tape test/*.js", "coverage": "nyc report --reporter=text-lcov | coveralls" }, "funding": [ diff --git a/test/common.js b/test/common.js index 204c3024..e0440446 100644 --- a/test/common.js +++ b/test/common.js @@ -20,7 +20,7 @@ exports.getConfig = thunky(function (cb) { }) // For testing on node, we must provide a WebRTC implementation -if (process.env.WRTC === 'wrtc') { +if (process.env.WRTC?.startsWith('wrtc')) { exports.wrtc = require('wrtc') } diff --git a/test/stream.js b/test/stream.js index 55993987..36600f27 100644 --- a/test/stream.js +++ b/test/stream.js @@ -28,7 +28,7 @@ test('duplex stream: send data before "connect" event', function (t) { }) peer1.on('finish', function () { t.pass('got peer1 "finish"') - t.ok(peer1._writableState.finished) + t.ok(peer1._writableState.ended) }) peer1.on('end', function () { t.pass('got peer1 "end"') @@ -40,7 +40,7 @@ test('duplex stream: send data before "connect" event', function (t) { }) peer2.on('finish', function () { t.pass('got peer2 "finish"') - t.ok(peer2._writableState.finished) + t.ok(peer2._writableState.ended) }) peer2.on('end', function () { t.pass('got peer2 "end"') @@ -67,7 +67,7 @@ test('duplex stream: send data one-way', function (t) { }) peer1.on('finish', function () { t.pass('got peer1 "finish"') - t.ok(peer1._writableState.finished) + t.ok(peer1._writableState.ended) }) peer1.on('end', function () { t.pass('got peer1 "end"') @@ -79,7 +79,7 @@ test('duplex stream: send data one-way', function (t) { }) peer2.on('finish', function () { t.pass('got peer2 "finish"') - t.ok(peer2._writableState.finished) + t.ok(peer2._writableState.ended) }) peer2.on('end', function () { t.pass('got peer2 "end"')