Skip to content

Commit

Permalink
feat(p2p): replaced old ssh relay to new relay mechanism (improved p2…
Browse files Browse the repository at this point in the history
…p network stablity over NAT).
  • Loading branch information
DEgITx committed Mar 29, 2019
1 parent 5a2f20a commit ae15f59
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 130 deletions.
Binary file removed imports/darwin/plink
Binary file not shown.
Binary file removed imports/linux/ia32/plink
Binary file not shown.
Binary file removed imports/linux/x64/plink
Binary file not shown.
Binary file removed imports/win/ia32/plink.exe
Binary file not shown.
Binary file removed imports/win/x64/plink.exe
Binary file not shown.
236 changes: 195 additions & 41 deletions src/background/p2p.js
@@ -1,4 +1,3 @@
const ssh = require('./ssh')
const shuffle = require('./shuffle')
const config = require('./config');
const net = require('net')
Expand All @@ -14,6 +13,14 @@ const {promisify} = require('util');
const mkdirp = promisify(require('mkdirp'))
const deleteFolderRecursive = require('./deleteFolderRecursive')
const compareVersions = require('compare-versions');
const portCheck = require('./portCheck')

const findGoodPort = async (port, host) => {
while (!(await portCheck(port, host))) {
port++
}
return port
}

class p2p {
constructor(send = () => {})
Expand Down Expand Up @@ -44,6 +51,13 @@ class p2p {
this.tcpServer = net.createServer();
this.tcpServer.maxConnections = config.p2pConnections * 2;

this.relay = {server: false, client: false}
this.selfAddress = null;
this.relayServers = {};
this.relayServersLimit = 8;
// <-> server commination for relays
this.relaySocket = null;

// define some help info
Object.defineProperty(this.info, 'maxPeersConnections', {
enumerable: true,
Expand Down Expand Up @@ -101,15 +115,26 @@ class p2p {
return;
}

for(const peer of this.clients) {
if(peer.peerId === data.peerId) {
// already connected from different interface
logT('p2p', 'server peer', data.peerId, 'already connected from different address', '( check:', peer.files === data.files && peer.torrents === data.torrents, ')');
return;
}
}

// protocol ok
clearTimeout(socketObject.protocolTimeout)
const { _socket: socket } = socketObject
socketObject.rats = true
socketObject.peerId = data.peerId
socketObject.relay = data.relay

callback({
protocol: 'rats',
version: this.version,
peerId: this.peerId,
relay: this.relay,
info: this.info,
peers: this.addresses(this.recommendedPeersList())
})
Expand Down Expand Up @@ -202,6 +227,72 @@ class p2p {
readable = null
});
})

this.on('relay', async (nil, callback, remote) => {
if(this.relay.server && remote.relay) {
// update status, because client ask for relay
remote.relay.client = true
if(!this.relayServers[remote.peerId] && Object.keys(this.relayServers).length < this.relayServersLimit) {
let relayPort;
const server = net.createServer();
this.relayServers[remote.peerId] = server;
let relay;
const peers = {}
const establishConnectionTimeout = setTimeout(() => {
logTE('relay', `not recived relay income connection, timeout`);
server.close();
delete this.relayServers[remote.peerId]
}, 8000)
server.on('connection', (peer) => {
logT('relay', `new relay connection`);
peer = new JsonSocket(peer);
peer._id = Math.random().toString(36).substring(2, 15)
peers[peer._id] = peer
peer.on('message', (data) => {
if (!relay && data && remote.peerId == data.peerId) {
relay = peer
logT('relay', `reply root pear fouded, current openned relays ${Object.keys(this.relayServers).length}`);
if (this.selfAddress) {
logT('relay', `exchange ${remote.peerId} relay to other peers`);
this.emit('peer', {port: relayPort, address: this.selfAddress})
}
clearTimeout(establishConnectionTimeout);
return;
}
if (relay) {
if(peer === relay && data.id && peers[data.id]) {
//logT('relay', `server message to pear ${data.id}`);
peers[data.id].sendMessage(data.data)
} else {
//logT('relay', `server message to relay ${peer._id}`);
relay.sendMessage({id: peer._id, data});
}
}
});
peer.on('close', () => {
if(peer == relay) {
logT('relay', `relay client disconnected`);
relay = null
server.close();
delete this.relayServers[remote.peerId]
} else if(relay) {
logT('relay', `relay peer disconnected`);
relay.sendMessage({id: peer._id, close: true});
}
if(peer._id && peers[peer._id])
delete peers[peer._id]
});
peer.on('error', (err) => {
logTE('relay', `relay server peer error ${err}`);
})
});
relayPort = await findGoodPort(Math.floor(Math.random() * 50000) + 10000, '0.0.0.0')
server.listen(relayPort, '0.0.0.0');
logT('relay', `establish new relay server on port`, relayPort);
callback({port: relayPort})
}
}
})
}

listen() {
Expand All @@ -210,60 +301,48 @@ class p2p {
}

checkPortAndRedirect(address, port) {
isPortReachable(port, {host: address}).then((isAvailable) => {
this.selfAddress = address;
isPortReachable(port, {host: address}).then(async (isAvailable) => {
if(this.closing)
return // responce can be very late, and ssh can start after closing of program, this will break on linux

this.p2pStatus = isAvailable ? 2 : 0
this.send('p2pStatus', this.p2pStatus)
return // responce can be very late, and can start after closing of program, this will break on linux

// all ok don't need to start any ssh tunnels
if(isAvailable)
{
logT('ssh', 'tcp p2p port is reachable - all ok')
return;
logT('relay', 'tcp p2p port is reachable - all ok')
const server = net.createServer();
const randomPort = await findGoodPort(Math.floor(Math.random() * 50000) + 10000, '0.0.0.0')
server.listen(randomPort, '0.0.0.0');
isPortReachable(randomPort, {host: address}).then(async (isAvailable) => {
if(isAvailable) {
logT('relay', 'relay server port check success - can be using as relay')
this.relay.server = true;
} else {
logT('relay', 'relay server port check failes - not using as relay')
}
server.close();
})
this.p2pStatus = 2
this.send('p2pStatus', this.p2pStatus)
}
else
{
logT('ssh', 'tcp p2p port is unreachable - try ssh tunnel')
}

if(!this.encryptor)
{
logT('ssh', 'something wrong with encryptor')
return
}

let remoteHost = '03de848286b8fbe6e775e6601c3bcfb9b71dfddcacb861b061458ce5e4020a15a649aabef88234d2af01ead4276a6de1YlqiJBlXCmoA7TpnbRuSRHNDsIBLlZ9McbovKJXHtAA='

this.ssh = ssh(config.spiderPort, this.encryptor.decrypt(remoteHost), 'relay', 'relaymytrf', (selfPeer) => {
if(!selfPeer)
{
this.p2pStatus = 0
this.send('p2pStatus', this.p2pStatus)
this.externalPeers = []
return
logT('relay', 'tcp p2p port is unreachable, using relay client')
this.relay.client = true;
// try reconnect to new relay server
let candidatePeer = this.peersList().filter(peer => peer.relay && peer.relay.server)
if(candidatePeer && candidatePeer.length > 0) {
logT('relay', 'reconnect to new relay, because no relays connection before check');
this.connectToRelay(candidatePeer[0])
}

logT('ssh', 'ssh tunnel success, redirect peers to ssh')

this.p2pStatus = 1
this.send('p2pStatus', this.p2pStatus)
this.ignore(selfPeer)
this.emit('peer', selfPeer)
this.externalPeers = [selfPeer] // add external peers and tell this on every connection
})
this.p2pStatus = 0
this.send('p2pStatus', this.p2pStatus)
}
})
}

close()
{
this.closing = true
if(this.ssh)
{
logT('ssh', 'closing ssh...')
this.ssh.kill()
}
// close server
const promise = new Promise(resolve => this.tcpServer.close(resolve))
for (const client in this.clients) {
Expand Down Expand Up @@ -329,6 +408,76 @@ class p2p {
return _.uniq(peers)
}

connectToRelay(relayPeer, tryes = 3)
{
if(this.relay.client && relayPeer.relay.server && !this.relaySocket) {
relayPeer.emit('relay', {}, ({port} = {}) => {
if(!port) {
logTE('relay', 'no port in relay request responce');
return;
}

logT('relay', 'try connecting to new relay', relayPeer.peerId)
let peers = {}
this.relaySocket = new JsonSocket(new net.Socket());
this.relaySocket.connect(port, relayPeer.address, () => {
logT('relay', 'connected to relay', relayPeer.peerId);
this.relaySocket.sendMessage({peerId: this.peerId})
this.p2pStatus = 1
this.send('p2pStatus', this.p2pStatus)
tryes = 3; // restore tryies bebause we connected
});

this.relaySocket.on('message', (data) => {
if(!data.id)
return

if(!peers[data.id]) {
if(data.close)
return

peers[data.id] = new JsonSocket(new net.Socket());
peers[data.id].on('message', (toPeer) => {
//logT('relay', 'client message to relay', data.id);
this.relaySocket.sendMessage({id: data.id, data: toPeer})
})
peers[data.id].connect(config.spiderPort, '0.0.0.0', () => {
//logT('relay', 'client message to my server', data.id);
peers[data.id].sendMessage(data.data)
});
} else {
if(data.close) {
peers[data.id].destroy();
delete peers[data.id];
logT('relay', 'peer disconnected');
return
}

//logT('relay', 'client message to my server', data.id);
peers[data.id].sendMessage(data.data)
}
});

this.relaySocket.on('close', () => {
logT('relay', 'relay client closed because server exit');
for(const id in peers) {
peers[id].destroy();
}
peers = null
this.relaySocket = null
this.p2pStatus = 0
this.send('p2pStatus', this.p2pStatus)
// try reconnect to new relay server
let candidatePeer = this.peersList().filter(peer => peer.relay && peer.relay.server && peer != relayPeer)
if(candidatePeer && candidatePeer.length > 0 && tryes > 0) {
logT('relay', 'reconnect to new relay, because old closed');
this.connectToRelay(candidatePeer[0], --tryes)
}
});
})
}
}

connect(address)
{
this.peers.push(address)
Expand Down Expand Up @@ -368,6 +517,7 @@ class p2p {
port: config.spiderPort,
version: this.version,
peerId: this.peerId,
relay: this.relay,
info: this.info,
peers: this.addresses(this.recommendedPeersList()).concat(this.externalPeers) // also add external peers
}, (data) => {
Expand Down Expand Up @@ -406,6 +556,7 @@ class p2p {
//extra info
address.version = data.version
address.peerId = data.peerId
address.relay = data.relay
address.info = data.info
this.send('peer', {
size: this.size,
Expand All @@ -419,6 +570,9 @@ class p2p {
{
data.peers.forEach(peer => this.add(peer))
}

// try connect to relay if needed
this.connectToRelay(address)
})
});

Expand Down
1 change: 0 additions & 1 deletion src/background/spider.js
Expand Up @@ -39,7 +39,6 @@ const mime = require('mime');
//server.listen(config.httpPort);
//console.log('Listening web server on', config.httpPort, 'port')


module.exports = function (send, recive, dataDirectory, version, env)
{
this.initialized = (async () =>
Expand Down

0 comments on commit ae15f59

Please sign in to comment.