Skip to content

Commit

Permalink
propagate error events up to App
Browse files Browse the repository at this point in the history
  • Loading branch information
feross committed Feb 24, 2014
1 parent c825473 commit 7ef94bb
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 108 deletions.
4 changes: 4 additions & 0 deletions lib/torrent-manager.js
Expand Up @@ -102,6 +102,10 @@ TorrentManager.prototype.add = function (uri) {
// TODO: Add the torrent to the public DHT so peers know to find up
})

torrent.on('error', function (err) {
self.emit('error', err)
})

self.dht.setInfoHash(torrent.infoHash)
self.dht.findPeers(MAX_PEERS) // TODO: should the DHT be concerned with max peers?
}
Expand Down
220 changes: 112 additions & 108 deletions lib/torrent.js
Expand Up @@ -19,9 +19,11 @@ function Torrent (uri, opts) {
var info = parseMagnetUri(uri)
if (!info.infoHash)
throw new Error('invalid torrent uri')

self.infoHash = info.infoHash
self.title = info.title
self.metadata = null
self.file = null

self.swarm = new Swarm(self.infoHash, opts.peerId, { dht: true })

Expand All @@ -32,116 +34,10 @@ function Torrent (uri, opts) {
}

self.swarm.on('error', function (err) {
console.error(err.message)
self.emit('error', err)
})

self.swarm.on('wire', function (wire) {
// Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire
wire.setKeepAlive(true)

// If peer supports DHT, send PORT message to report what port our DHT node
// is listening on
if (wire.peerExtensions.dht) {
console.log('peer supports DHT')
// TODO: DHT doesn't support listening yet
// wire.port(dht.port)
}

// When peer sends PORT, add them to the routing table
wire.on('port', function (port) {
console.log('received PORT: ', port)
// TODO: DHT doesn't have a routing table yet
// dht.add(wire.remoteAddress, port)
})

// Timeout for wire requests to this peer
wire.setTimeout(WIRE_TIMEOUT)

// Support extended messages:
// - ut_metadata (metadata fetching, trackerless torrents)
if (wire.peerExtensions.extended) {
console.log('Wire ' + wire.remoteAddress + ' supports extended messages', wire.peerExtensions)
wire.extended(0, {
m: {
ut_metadata: 1
}
// TODO - this should be set once we have metadata
// metadata_size: xx
})
}

wire.on('extended', function (ext, buf) {
var dict
console.log('Received extended message ' + ext + ' from ' + wire.remoteAddress)

if (ext === 0) { // handshake

try {
console.log('decoding ' + buf.toString())
dict = bncode.decode(buf.toString())
console.log('got extended handshake: ' + JSON.stringify(dict))
} catch (e) {
console.error('Error decoding extended message: ' + e.message)
}

if (dict.m.ut_metadata && dict.metadata_size) {
var metadataSize = dict.metadata_size
var numPieces = Math.ceil(metadataSize / METADATA_BLOCK_SIZE)
console.log('metadata size: ' + metadataSize)
console.log(numPieces + ' pieces')

wire.metadata = new Buffer(metadataSize)

// request all pieces
for (var piece = 0; piece < numPieces; piece++) {
wire.extended(dict.m.ut_metadata, {
msg_type: 0,
piece: piece
})
}
}

} else if (ext === 1) { // ut_metadata

// 0 - request
// 1 - data
// 2 - reject

var str
var dataIndex
var data
try {
str = buf.toString()
console.log('decoding ' + str)
dataIndex = str.indexOf('ee') + 2
var msg = str.substring(0, dataIndex)
console.log('using ' + msg)
dict = bncode.decode(msg)
data = buf.slice(dataIndex)
console.log('got metadata: ' + JSON.stringify(dict))
console.log('got metadata data: ' + data.length + ' bytes')
} catch (e) {
console.error('Error decoding extended message: ' + e.message)
}

// {'msg_type': 1, 'piece': 0, 'total_size': 3425}
if (dict.msg_type === 1) { // data
console.log('total_size: ' + dict.total_size)
data.copy(wire.metadata, dict.piece * METADATA_BLOCK_SIZE)

console.log('METADATA')
console.log(wire.metadata.toString())
self.metadata = {
'announce-list': [],
info: bncode.decode(wire.metadata),
// info_hash:
}
console.log(self.metadata)
self.emit('metadata', this.metadata)
}
}
})
})
self.swarm.on('wire', self._onWire.bind(self))
}

Object.defineProperty(Torrent.prototype, 'progress', {
Expand All @@ -159,6 +55,114 @@ Torrent.prototype.addPeer = function (addr) {
self.swarm.add(addr)
}

Torrent.prototype._onWire = function (wire) {
// Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire
wire.setKeepAlive(true)

// If peer supports DHT, send PORT message to report what port our DHT node
// is listening on
if (wire.peerExtensions.dht) {
console.log('peer supports DHT')
// TODO: DHT doesn't support listening yet
// wire.port(dht.port)
}

// When peer sends PORT, add them to the routing table
wire.on('port', function (port) {
console.log('received PORT: ', port)
// TODO: DHT doesn't have a routing table yet
// dht.add(wire.remoteAddress, port)
})

// Timeout for wire requests to this peer
wire.setTimeout(WIRE_TIMEOUT)

// Support extended messages:
// - ut_metadata (metadata fetching, trackerless torrents)
if (wire.peerExtensions.extended) {
console.log('Wire ' + wire.remoteAddress + ' supports extended messages', wire.peerExtensions)
wire.extended(0, {
m: {
ut_metadata: 1
}
// TODO - this should be set once we have metadata
// metadata_size: xx
})
}

wire.on('extended', function (ext, buf) {
var dict
console.log('Received extended message ' + ext + ' from ' + wire.remoteAddress)

if (ext === 0) { // handshake

try {
console.log('decoding ' + buf.toString())
dict = bncode.decode(buf.toString())
console.log('got extended handshake: ' + JSON.stringify(dict))
} catch (e) {
console.error('Error decoding extended message: ' + e.message)
}

if (dict.m.ut_metadata && dict.metadata_size) {
var metadataSize = dict.metadata_size
var numPieces = Math.ceil(metadataSize / METADATA_BLOCK_SIZE)
console.log('metadata size: ' + metadataSize)
console.log(numPieces + ' pieces')

wire.metadata = new Buffer(metadataSize)

// request all pieces
for (var piece = 0; piece < numPieces; piece++) {
wire.extended(dict.m.ut_metadata, {
msg_type: 0,
piece: piece
})
}
}

} else if (ext === 1) { // ut_metadata

// 0 - request
// 1 - data
// 2 - reject

var str
var dataIndex
var data
try {
str = buf.toString()
console.log('decoding ' + str)
dataIndex = str.indexOf('ee') + 2
var msg = str.substring(0, dataIndex)
console.log('using ' + msg)
dict = bncode.decode(msg)
data = buf.slice(dataIndex)
console.log('got metadata: ' + JSON.stringify(dict))
console.log('got metadata data: ' + data.length + ' bytes')
} catch (e) {
console.error('Error decoding extended message: ' + e.message)
}

// {'msg_type': 1, 'piece': 0, 'total_size': 3425}
if (dict.msg_type === 1) { // data
console.log('total_size: ' + dict.total_size)
data.copy(wire.metadata, dict.piece * METADATA_BLOCK_SIZE)

console.log('METADATA')
console.log(wire.metadata.toString())
self.metadata = {
'announce-list': [],
info: bncode.decode(wire.metadata),
// info_hash:
}
console.log(self.metadata)
self.emit('metadata', this.metadata)
}
}
})
})

//
// HELPER METHODS
//
Expand Down

0 comments on commit 7ef94bb

Please sign in to comment.