Skip to content

Commit

Permalink
fix: migrate to streamx
Browse files Browse the repository at this point in the history
  • Loading branch information
ThaUnknown committed Dec 5, 2022
1 parent f1a492d commit ee748de
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 44 deletions.
62 changes: 25 additions & 37 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
const debug = require('debug')('simple-peer')
const getBrowserRTC = require('get-browser-rtc')
const randombytes = require('randombytes')
const stream = require('readable-stream')
const { Duplex } = require('streamx')
const queueMicrotask = require('queue-microtask') // TODO: remove when Node 10 is not supported
const errCode = require('err-code')
const { Buffer } = require('buffer')
Expand All @@ -25,14 +25,16 @@ function warn (message) {
* Duplex stream.
* @param {Object} opts
*/
class Peer extends stream.Duplex {
class Peer extends Duplex {
constructor (opts) {
opts = Object.assign({
allowHalfOpen: false
}, opts)

super(opts)

this.__objectMode = !!opts.objectMode // streamx is objectMode by default, so implement readable's fuctionality

this._id = randombytes(4).toString('hex').slice(0, 7)
this._debug('new peer %o', opts)

Expand All @@ -52,8 +54,8 @@ class Peer extends stream.Duplex {
this.allowHalfTrickle = opts.allowHalfTrickle !== undefined ? opts.allowHalfTrickle : false
this.iceCompleteTimeout = opts.iceCompleteTimeout || ICECOMPLETE_TIMEOUT

this.destroyed = false
this.destroying = false
this._destroying = false

this._connected = false

this.remoteAddress = undefined
Expand Down Expand Up @@ -180,7 +182,7 @@ class Peer extends stream.Duplex {
}

signal (data) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot signal after peer is destroyed'), 'ERR_DESTROYED')
if (typeof data === 'string') {
try {
Expand Down Expand Up @@ -244,7 +246,7 @@ class Peer extends stream.Duplex {
* @param {ArrayBufferView|ArrayBuffer|Buffer|string|Blob} chunk
*/
send (chunk) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot send after peer is destroyed'), 'ERR_DESTROYED')
this._channel.send(chunk)
}
Expand All @@ -255,7 +257,7 @@ class Peer extends stream.Duplex {
* @param {Object} init
*/
addTransceiver (kind, init) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot addTransceiver after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addTransceiver()')

Expand All @@ -279,7 +281,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
addStream (stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot addStream after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addStream()')

Expand All @@ -294,7 +296,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
addTrack (track, stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot addTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('addTrack()')

Expand All @@ -319,7 +321,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
replaceTrack (oldTrack, newTrack, stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot replaceTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('replaceTrack()')

Expand All @@ -343,7 +345,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
removeTrack (track, stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot removeTrack after peer is destroyed'), 'ERR_DESTROYED')
this._debug('removeSender()')

Expand All @@ -370,7 +372,7 @@ class Peer extends stream.Duplex {
* @param {MediaStream} stream
*/
removeStream (stream) {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot removeStream after peer is destroyed'), 'ERR_DESTROYED')
this._debug('removeSenders()')

Expand All @@ -396,7 +398,7 @@ class Peer extends stream.Duplex {
}

negotiate () {
if (this.destroying) return
if (this._destroying) return
if (this.destroyed) throw errCode(new Error('cannot negotiate after peer is destroyed'), 'ERR_DESTROYED')

if (this.initiator) {
Expand Down Expand Up @@ -424,29 +426,19 @@ class Peer extends stream.Duplex {
this._isNegotiating = true
}

// TODO: Delete this method once readable-stream is updated to contain a default
// implementation of destroy() that automatically calls _destroy()
// See: https://github.com/nodejs/readable-stream/issues/283
destroy (err) {
this._destroy(err, () => {})
_final (cb) {
if (!this._readableState.ended) this.push(null)
cb()
}

_destroy (err, cb) {
if (this.destroyed || this.destroying) return
this.destroying = true
_destroy (cb) {
if (this.destroyed || this._destroying) return
this._destroying = true

this._debug('destroying (error: %s)', err && (err.message || err))
if (!this._writableState.ended) this.end()

queueMicrotask(() => { // allow events concurrent with the call to _destroy() to fire (see #692)
this.destroyed = true
this.destroying = false

this._debug('destroy (error: %s)', err && (err.message || err))

this.readable = this.writable = false

if (!this._readableState.ended) this.push(null)
if (!this._writableState.finished) this.end()
this._destroying = false

this._connected = false
this._pcReady = false
Expand Down Expand Up @@ -493,8 +485,6 @@ class Peer extends stream.Duplex {
this._pc = null
this._channel = null

if (err) this.emit('error', err)
this.emit('close')
cb()
})
}
Expand Down Expand Up @@ -548,9 +538,7 @@ class Peer extends stream.Duplex {
}, CHANNEL_CLOSING_TIMEOUT)
}

_read () {}

_write (chunk, encoding, cb) {
_write (chunk, cb) {
if (this.destroyed) return cb(errCode(new Error('cannot write after peer is destroyed'), 'ERR_DATA_CHANNEL'))

if (this._connected) {
Expand Down Expand Up @@ -972,7 +960,7 @@ class Peer extends stream.Duplex {
_onChannelMessage (event) {
if (this.destroyed) return
let data = event.data
if (data instanceof ArrayBuffer) data = Buffer.from(data)
if (data instanceof ArrayBuffer || this.__objectMode === false) data = Buffer.from(data)
this.push(data)
}

Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"get-browser-rtc": "^1.1.0",
"queue-microtask": "^1.2.3",
"randombytes": "^2.1.0",
"readable-stream": "^3.6.0"
"streamx": "^2.12.4"
},
"devDependencies": {
"airtap": "^4.0.3",
Expand Down Expand Up @@ -66,7 +66,7 @@
"test": "standard && npm run test-browser",
"test-browser": "airtap --coverage --concurrency 1 -- test/*.js",
"test-browser-local": "airtap --coverage --preset local -- test/*.js",
"test-node": "WRTC=wrtc tape test/*.js",
"test-node": "set WRTC=wrtc && tape test/*.js",
"coverage": "nyc report --reporter=text-lcov | coveralls"
},
"funding": [
Expand Down
2 changes: 1 addition & 1 deletion test/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ exports.getConfig = thunky(function (cb) {
})

// For testing on node, we must provide a WebRTC implementation
if (process.env.WRTC === 'wrtc') {
if (process.env.WRTC?.startsWith('wrtc')) {
exports.wrtc = require('wrtc')
}

Expand Down
8 changes: 4 additions & 4 deletions test/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ test('duplex stream: send data before "connect" event', function (t) {
})
peer1.on('finish', function () {
t.pass('got peer1 "finish"')
t.ok(peer1._writableState.finished)
t.ok(peer1._writableState.ended)
})
peer1.on('end', function () {
t.pass('got peer1 "end"')
Expand All @@ -40,7 +40,7 @@ test('duplex stream: send data before "connect" event', function (t) {
})
peer2.on('finish', function () {
t.pass('got peer2 "finish"')
t.ok(peer2._writableState.finished)
t.ok(peer2._writableState.ended)
})
peer2.on('end', function () {
t.pass('got peer2 "end"')
Expand All @@ -67,7 +67,7 @@ test('duplex stream: send data one-way', function (t) {
})
peer1.on('finish', function () {
t.pass('got peer1 "finish"')
t.ok(peer1._writableState.finished)
t.ok(peer1._writableState.ended)
})
peer1.on('end', function () {
t.pass('got peer1 "end"')
Expand All @@ -79,7 +79,7 @@ test('duplex stream: send data one-way', function (t) {
})
peer2.on('finish', function () {
t.pass('got peer2 "finish"')
t.ok(peer2._writableState.finished)
t.ok(peer2._writableState.ended)
})
peer2.on('end', function () {
t.pass('got peer2 "end"')
Expand Down

0 comments on commit ee748de

Please sign in to comment.