Skip to content

Commit

Permalink
Merge pull request #25 from hyperswarm/v1-progress
Browse files Browse the repository at this point in the history
v1 tests
  • Loading branch information
davidmarkclements committed Jun 11, 2019
2 parents b87256f + ed858f1 commit 2e45130
Show file tree
Hide file tree
Showing 21 changed files with 2,151 additions and 257 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
node_modules
sandbox.js
sandbox
.nyc_output
coverage
*.0x
10 changes: 5 additions & 5 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
'use strict'
const { inspect } = require('util')
const swarm = require('./')
const hyperswarm = require('./')
const crypto = require('crypto')
const net = swarm()
const swarm = hyperswarm()

if (!process.argv[2]) { throw Error('node example.js <topic-key>') }

const key = crypto.createHash('sha256')
.update(process.argv[2])
.digest()

net.connectivity((err, capabilities) => {
swarm.connectivity((err, capabilities) => {
console.log('network capabilities', capabilities, err || '')
})

net.join(key, {
swarm.join(key, {
announce: true,
lookup: true
})

net.on('connection', function (socket, info) {
swarm.on('connection', function (socket, info) {
const {
priority,
status,
Expand Down
11 changes: 8 additions & 3 deletions lib/peer-info.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ const BANNED_OR_ACTIVE = BANNED | ACTIVE
const ACTIVE_OR_TRIED = ACTIVE | TRIED

class PeerInfo {
constructor (peer) {
constructor (peer = null) {
this.priority = (peer && peer.local) ? 3 : 2
this.status = RECONNECT | FIREWALLED
this.retries = 0
this.peer = peer || null
this.peer = peer
this.client = peer !== null
this.stream = null

Expand Down Expand Up @@ -74,7 +74,12 @@ class PeerInfo {
requeue () {
if (this.status & BANNED) return -1
if (!(this.status & RECONNECT)) return -1
if (this.retries >= 3) return -1
if (this.retries >= 3) {
// if we don't increment retries past 3
// retries will never be > 3 in update method
this.retries++
return -1
}
return this.retries++
}
}
Expand Down
3 changes: 2 additions & 1 deletion lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class PeerQueue extends EventEmitter {
this._queue.add(info)
readable = true
}

if (empty && readable) this.emit('readable')
}

Expand Down Expand Up @@ -102,3 +101,5 @@ class PeerQueue extends EventEmitter {
function toID (peer) {
return peer.host + ':' + peer.port
}

module.exports.PeerQueue = PeerQueue
10 changes: 7 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@
"main": "swarm.js",
"dependencies": {
"@hyperswarm/discovery": "^1.1.0",
"@hyperswarm/guts": "1.0.0",
"shuffled-priority-queue": "^2.1.0",
"tap": "^13.1.2",
"utp-native": "^2.1.3"
},
"devDependencies": {
"@hyperswarm/dht": "0.0.1",
"events.once": "^2.0.2",
"nonsynchronous": "^1.0.0",
"standard": "^12.0.1",
"tape": "^4.9.1"
"tap": "^14.1.11"
},
"scripts": {
"test": "tape test/*.js && standard"
"test": "tap -j 1 -R classic test/*.test.js && standard --fix",
"cov": "tap -j 1 -R classic --coverage-report=html test/*.test.js"
},
"repository": {
"type": "git",
Expand Down
107 changes: 88 additions & 19 deletions swarm.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,33 @@ const peerQueue = require('./lib/queue')
const { EventEmitter } = require('events')
const guts = require('@hyperswarm/guts')

const MAX_PEERS_DEFAULT = 3
const MAX_SERVER_SOCKETS = Infinity
const MAX_CLIENT_SOCKETS = Infinity
const MAX_PEERS = 24

const ERR_DESTROYED = 'swarm has been destroyed'
const ERR_MISSING_KEY = 'key is required and must be a buffer'
const ERR_JOIN_OPTS = 'join options must enable lookup, announce or both, but not neither'

const kDrain = Symbol('hyperswarm.drain')
const kIncrPeerCount = Symbol('hyperswarm.incrPeerCount')
const kDecrPeerCount = Symbol('hyperswarm.decrPeerCount')
const kQueue = Symbol('hyperswarm.queue')

module.exports = opts => new Swarm(opts)

class Swarm extends EventEmitter {
constructor (opts = {}) {
super()
const {
maxPeers = MAX_PEERS_DEFAULT,
maxServerSockets = MAX_SERVER_SOCKETS,
maxClientSockets = MAX_CLIENT_SOCKETS,
maxPeers = MAX_PEERS,
bootstrap,
ephemeral
} = opts
const queue = peerQueue()

const network = guts({
bootstrap,
ephemeral,
Expand All @@ -26,57 +39,106 @@ class Swarm extends EventEmitter {
const info = peerInfo(null)
info.connected(socket, isTCP)
this.emit('connection', socket, info)
this.serverSockets += 1
this[kIncrPeerCount]()
socket.once('close', () => {
this.serverSockets -= 1
this.emit('disconnection', socket, info)
this[kDecrPeerCount]()
})
},
close: () => this.emit('close')
})
queue.on('readable', this._drain(queue))

network.tcp.maxConnections = maxServerSockets
network.utp.maxConnections = maxServerSockets

queue.on('readable', this[kDrain](queue))

this.destroyed = false
this.clientSockets = 0
this.serverSockets = 0
this.peers = 0

this.maxPeers = maxPeers
this.emphemeral = ephemeral !== false
this.maxServerSockets = maxServerSockets
this.maxClientSockets = maxClientSockets

this.open = this.peers < this.maxPeers
this.ephemeral = ephemeral !== false

this.network = network
this.queue = queue
this[kQueue] = queue
}
_drain (queue) {
[kDrain] (queue) {
const onConnect = (info) => (err, socket, isTCP) => {
if (err) {
this.peers -= 1
this.clientSockets -= 1
this[kDecrPeerCount]()
queue.requeue(info)
drain()
return
}
info.connected(socket, isTCP)
this.emit('connection', socket, info)
socket.on('close', () => {
this.peers -= 1
this.clientSockets -= 1
this.emit('disconnection', socket, info)
this[kDecrPeerCount]()
info.disconnected()
queue.requeue(info)
setImmediate(drain)
})
drain()
}
const drain = () => {
if (this.peers >= this.maxPeers) return
if (this.open === false) return
if (this.clientSockets >= this.maxClientSockets) return
const info = queue.shift()
if (!info) return
this.peers += 1
this.network.connect(info.peer, onConnect(info))
if (!info) return
this.clientSockets += 1
this[kIncrPeerCount]()
this.connect(info.peer, onConnect(info))
}
return drain
}
[kIncrPeerCount] () {
this.peers += 1
this.open = this.peers < this.maxPeers
if (this.open === false) {
this.network.tcp.maxConnections = -1
this.network.utp.maxConnections = -1
}
}
[kDecrPeerCount] () {
this.peers -= 1
if (this.open) return
this.open = this.peers < this.maxPeers
// note: defensive conditional, to the best of knowledge
// and after some investigation, else branch should never happen
/* istanbul ignore else */
if (this.open === true) {
this.network.tcp.maxConnections = this.maxServerSockets
this.network.utp.maxConnections = this.maxServerSockets
}
}
address () {
if (this.destroyed) throw Error(ERR_DESTROYED)
return this.network.address()
}
listen (port, cb) {
if (this.destroyed) throw Error(ERR_DESTROYED)
this.network.bind(port, cb)
}
join (key, opts = {}) {
if (this.destroyed) throw Error(ERR_DESTROYED)
const { network } = this

if (Buffer.isBuffer(key) === false) throw Error(ERR_MISSING_KEY)

const { announce = false, lookup = true } = opts

if (!announce && !lookup) return

if (!announce && !lookup) throw Error(ERR_JOIN_OPTS)
network.bind((err) => {
if (err) {
this.emit('error', err)
Expand All @@ -88,18 +150,22 @@ class Swarm extends EventEmitter {
: network.lookup(key)

topic.on('update', () => this.emit('update'))
topic.on('peer', (peer) => {
this.emit('peer', peer)
this.queue.add(peer)
})
if (lookup) {
topic.on('peer', (peer) => {
this.emit('peer', peer)
this[kQueue].add(peer)
})
}
})
}
leave (key) {
if (Buffer.isBuffer(key) === false) throw Error(ERR_MISSING_KEY)
if (this.destroyed) return
const { network } = this
const domain = network.discovery._domain(key)
const topics = network.discovery._domains.get(domain)
if (!topics) return

for (const topic of topics) {
if (Buffer.compare(key, topic.key) === 0) {
topic.destroy()
Expand All @@ -108,9 +174,11 @@ class Swarm extends EventEmitter {
}
}
connect (peer, cb) {
if (this.destroyed) throw Error(ERR_DESTROYED)
this.network.connect(peer, cb)
}
connectivity (cb) {
if (this.destroyed) throw Error(ERR_DESTROYED)
this.network.bind((err) => {
if (err) {
cb(err, {
Expand Down Expand Up @@ -138,7 +206,8 @@ class Swarm extends EventEmitter {
})
}
destroy (cb) {
this.queue.destroy()
this.destroyed = true
this[kQueue].destroy()
this.network.close(cb)
}
}
Expand Down
44 changes: 0 additions & 44 deletions test/bulk-timer.js

This file was deleted.

Loading

0 comments on commit 2e45130

Please sign in to comment.