From 8b589f03ac6ffa21f167bf2221025f2aab25bea3 Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Sun, 3 Jan 2021 15:48:20 +0100 Subject: [PATCH 1/8] devp2p -> DPT: improved peer related typings --- packages/devp2p/src/dpt/dpt.ts | 25 +++++++++++++------- packages/devp2p/src/dpt/kbucket.ts | 38 ++++++++++++++++-------------- packages/devp2p/src/dpt/message.ts | 10 ++------ packages/devp2p/src/dpt/server.ts | 3 +-- 4 files changed, 39 insertions(+), 37 deletions(-) diff --git a/packages/devp2p/src/dpt/dpt.ts b/packages/devp2p/src/dpt/dpt.ts index 101f0c6c73..9d3de7357a 100644 --- a/packages/devp2p/src/dpt/dpt.ts +++ b/packages/devp2p/src/dpt/dpt.ts @@ -10,6 +10,13 @@ import { Server as DPTServer } from './server' const debug = createDebugLogger('devp2p:dpt') +export interface PeerInfo { + id?: Uint8Array | Buffer + address?: string + udpPort?: number | null + tcpPort?: number | null +} + export class DPT extends EventEmitter { privateKey: Buffer banlist: BanList @@ -28,8 +35,8 @@ export class DPT extends EventEmitter { this.banlist = new BanList() this._kbucket = new KBucket(this._id) - this._kbucket.on('added', (peer) => this.emit('peer:added', peer)) - this._kbucket.on('removed', (peer) => this.emit('peer:removed', peer)) + this._kbucket.on('added', (peer: PeerInfo) => this.emit('peer:added', peer)) + this._kbucket.on('removed', (peer: PeerInfo) => this.emit('peer:removed', peer)) this._kbucket.on('ping', this._onKBucketPing) this._server = new DPTServer(this, this.privateKey, { @@ -55,7 +62,7 @@ export class DPT extends EventEmitter { this._server.destroy(...args) } - _onKBucketPing(oldPeers: any[], newPeer: any): void { + _onKBucketPing(oldPeers: PeerInfo[], newPeer: PeerInfo): void { if (this.banlist.has(newPeer)) return let count = 0 @@ -81,7 +88,7 @@ export class DPT extends EventEmitter { for (const peer of peers) this.addPeer(peer).catch(() => {}) } - async bootstrap(peer: any): Promise { + async bootstrap(peer: PeerInfo): Promise { debug(`bootstrap with peer ${peer.address}:${peer.udpPort}`) peer = await this.addPeer(peer) @@ -89,7 +96,7 @@ export class DPT extends EventEmitter { this._server.findneighbours(peer, this._id) } - async addPeer(obj: any): Promise { + async addPeer(obj: PeerInfo): Promise { if (this.banlist.has(obj)) throw new Error('Peer is banned') debug(`attempt adding peer ${obj.address}:${obj.udpPort}`) @@ -109,15 +116,15 @@ export class DPT extends EventEmitter { } } - getPeer(obj: any): any { + getPeer(obj: string | Buffer | PeerInfo) { return this._kbucket.get(obj) } - getPeers(): any[] { + getPeers() { return this._kbucket.getAll() } - getClosestPeers(id: string): any { + getClosestPeers(id: string) { return this._kbucket.closest(id) } @@ -125,7 +132,7 @@ export class DPT extends EventEmitter { this._kbucket.remove(obj) } - banPeer(obj: any, maxAge?: number) { + banPeer(obj: string | Buffer | PeerInfo, maxAge?: number) { this.banlist.add(obj, maxAge) this._kbucket.remove(obj) } diff --git a/packages/devp2p/src/dpt/kbucket.ts b/packages/devp2p/src/dpt/kbucket.ts index 2e5d621f64..3208513520 100644 --- a/packages/devp2p/src/dpt/kbucket.ts +++ b/packages/devp2p/src/dpt/kbucket.ts @@ -1,56 +1,58 @@ import { EventEmitter } from 'events' import _KBucket = require('k-bucket') +import { PeerInfo } from './dpt' const KBUCKET_SIZE = 16 const KBUCKET_CONCURRENCY = 3 -export interface KObj { - id?: string - port?: string - address?: string +export interface CustomContact extends PeerInfo { + id: Uint8Array | Buffer + vectorClock: number } export class KBucket extends EventEmitter { - _peers: Map = new Map() + _peers: Map = new Map() _kbucket: _KBucket constructor(id: string | Buffer) { super() - this._kbucket = new _KBucket({ + this._kbucket = new _KBucket({ localNodeId: typeof id === 'string' ? Buffer.from(id) : id, numberOfNodesPerKBucket: KBUCKET_SIZE, numberOfNodesToPing: KBUCKET_CONCURRENCY, }) - this._kbucket.on('added', (peer: any) => { + this._kbucket.on('added', (peer: PeerInfo) => { KBucket.getKeys(peer).forEach((key) => this._peers.set(key, peer)) this.emit('added', peer) }) - this._kbucket.on('removed', (peer: any) => { + this._kbucket.on('removed', (peer: PeerInfo) => { KBucket.getKeys(peer).forEach((key) => this._peers.delete(key)) this.emit('removed', peer) }) - this._kbucket.on('ping', (...args: any[]) => this.emit('ping', ...args)) + this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo) => { + this.emit('ping', { oldPeers, newPeer }) + }) } - static getKeys(obj: Buffer | string | KObj): string[] { + static getKeys(obj: Buffer | string | PeerInfo): string[] { if (Buffer.isBuffer(obj)) return [obj.toString('hex')] if (typeof obj === 'string') return [obj] const keys = [] if (Buffer.isBuffer(obj.id)) keys.push(obj.id.toString('hex')) - if (obj.address && obj.port) keys.push(`${obj.address}:${obj.port}`) + //if (obj.address && obj.port) keys.push(`${obj.address}:${obj.port}`) return keys } - add(peer: any) { + add(peer: PeerInfo) { const isExists = KBucket.getKeys(peer).some((key) => this._peers.has(key)) - if (!isExists) this._kbucket.add(peer) + if (!isExists) this._kbucket.add(peer as CustomContact) } - get(obj: Buffer | string | KObj) { + get(obj: Buffer | string | PeerInfo) { for (const key of KBucket.getKeys(obj)) { const peer = this._peers.get(key) if (peer !== undefined) return peer @@ -59,16 +61,16 @@ export class KBucket extends EventEmitter { return null } - getAll(): Array { + getAll(): Array { return this._kbucket.toArray() } - closest(id: string): any { + closest(id: string): PeerInfo[] { return this._kbucket.closest(Buffer.from(id), KBUCKET_SIZE) } - remove(obj: Buffer | string | KObj) { + remove(obj: Buffer | string | PeerInfo) { const peer = this.get(obj) - if (peer !== null) this._kbucket.remove(peer.id) + if (peer !== null) this._kbucket.remove((peer as CustomContact).id) } } diff --git a/packages/devp2p/src/dpt/message.ts b/packages/devp2p/src/dpt/message.ts index 240a1d2056..b3ff95e649 100644 --- a/packages/devp2p/src/dpt/message.ts +++ b/packages/devp2p/src/dpt/message.ts @@ -3,6 +3,7 @@ import ip from 'ip' import * as rlp from 'rlp' import secp256k1 from 'secp256k1' import { keccak256, int2buffer, buffer2int, assertEq, unstrictDecode } from '../util' +import { PeerInfo } from './dpt' const debug = createDebugLogger('devp2p:dpt:server') @@ -10,13 +11,6 @@ function getTimestamp() { return (Date.now() / 1000) | 0 } -export interface PeerInfo { - id?: Buffer - address?: string - udpPort?: number | null - tcpPort?: number | null -} - const timestamp = { encode: function (value = getTimestamp() + 60) { const buffer = Buffer.allocUnsafe(4) @@ -133,7 +127,7 @@ type OutNeighborMsg = { [0]: Buffer[][]; [1]: Buffer } const neighbours = { encode: function (obj: InNeighborMsg): OutNeighborMsg { return [ - obj.peers.map((peer: PeerInfo) => endpoint.encode(peer).concat(peer.id!)), + obj.peers.map((peer: PeerInfo) => endpoint.encode(peer).concat(peer.id! as Buffer)), timestamp.encode(obj.timestamp), ] }, diff --git a/packages/devp2p/src/dpt/server.ts b/packages/devp2p/src/dpt/server.ts index 6a486c6300..e56d08fb55 100644 --- a/packages/devp2p/src/dpt/server.ts +++ b/packages/devp2p/src/dpt/server.ts @@ -5,9 +5,8 @@ import { debug as createDebugLogger } from 'debug' import LRUCache = require('lru-cache') import { encode, decode } from './message' import { keccak256, pk2id, createDeferred, formatLogId } from '../util' -import { DPT } from './dpt' +import { DPT, PeerInfo } from './dpt' import { Socket as DgramSocket, RemoteInfo } from 'dgram' -import { PeerInfo } from './message' const debug = createDebugLogger('devp2p:dpt:server') const verbose = createDebugLogger('verbose').enabled From 1e6d1c9a6f6db7e53e7f97bfd135719b110ddaf4 Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Sun, 3 Jan 2021 15:50:38 +0100 Subject: [PATCH 2/8] client: more generous protocol timeout for better support of slower internet connections --- packages/client/lib/net/server/rlpxserver.ts | 2 +- packages/client/lib/service/ethereumservice.ts | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/client/lib/net/server/rlpxserver.ts b/packages/client/lib/net/server/rlpxserver.ts index 064a3f424a..01207c5748 100644 --- a/packages/client/lib/net/server/rlpxserver.ts +++ b/packages/client/lib/net/server/rlpxserver.ts @@ -122,7 +122,7 @@ export class RlpxServer extends Server { async bootstrap(): Promise { const promises = this.bootnodes.map((node) => { const bootnode = { - address: node.ip, + address: node.ip!, udpPort: node.port, tcpPort: node.port, } diff --git a/packages/client/lib/service/ethereumservice.ts b/packages/client/lib/service/ethereumservice.ts index ee529eecd0..1e18ff98f8 100644 --- a/packages/client/lib/service/ethereumservice.ts +++ b/packages/client/lib/service/ethereumservice.ts @@ -11,11 +11,11 @@ export interface EthereumServiceOptions extends ServiceOptions { /* Blockchain database */ db?: LevelUp - /* Protocol timeout in ms (default: 8000) */ - timeout?: number - - /* Sync retry interval in ms (default: 1000) */ + /* Sync retry interval in ms (default: 8000) */ interval?: number + + /* Protocol timeout in ms (default: 2000) */ + timeout?: number } /** @@ -39,7 +39,7 @@ export class EthereumService extends Service { this.flow = new FlowControl() this.chain = options.chain ?? new Chain(options) this.interval = options.interval ?? 8000 - this.timeout = options.timeout ?? 1000 + this.timeout = options.timeout ?? 2000 } /** From 8cf09f56ae6d2d6c445068198c52ce830da3e898 Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Sun, 3 Jan 2021 16:37:33 +0100 Subject: [PATCH 3/8] devp2p -> DPT: new DPTOptions interface, DPT and Server option descriptions --- packages/devp2p/src/dpt/dpt.ts | 34 +++++++++++++++++++++++++++++-- packages/devp2p/src/dpt/server.ts | 21 +++++++++++++++++-- 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/packages/devp2p/src/dpt/dpt.ts b/packages/devp2p/src/dpt/dpt.ts index 9d3de7357a..e613d8cc7d 100644 --- a/packages/devp2p/src/dpt/dpt.ts +++ b/packages/devp2p/src/dpt/dpt.ts @@ -17,6 +17,36 @@ export interface PeerInfo { tcpPort?: number | null } +export interface DPTOptions { + /** + * Timeout for peer requests + * + * Default: 10s + */ + timeout?: number + + /** + * Network info to send a long a request + * + * Default: 0.0.0.0, no UDP or TCP port provided + */ + endpoint?: PeerInfo + + /** + * Function for socket creation + * + * Default: dgram-created socket + */ + createSocket?: Function + + /** + * Interval for peer table refresh + * + * Default: 60s + */ + refreshInterval?: number +} + export class DPT extends EventEmitter { privateKey: Buffer banlist: BanList @@ -26,7 +56,7 @@ export class DPT extends EventEmitter { private _server: DPTServer private _refreshIntervalId: NodeJS.Timeout - constructor(privateKey: Buffer, options: any) { + constructor(privateKey: Buffer, options: DPTOptions) { super() this.privateKey = Buffer.from(privateKey) @@ -40,9 +70,9 @@ export class DPT extends EventEmitter { this._kbucket.on('ping', this._onKBucketPing) this._server = new DPTServer(this, this.privateKey, { - createSocket: options.createSocket, timeout: options.timeout, endpoint: options.endpoint, + createSocket: options.createSocket, }) this._server.once('listening', () => this.emit('listening')) this._server.once('close', () => this.emit('close')) diff --git a/packages/devp2p/src/dpt/server.ts b/packages/devp2p/src/dpt/server.ts index e56d08fb55..9f8a6458e4 100644 --- a/packages/devp2p/src/dpt/server.ts +++ b/packages/devp2p/src/dpt/server.ts @@ -13,9 +13,26 @@ const verbose = createDebugLogger('verbose').enabled const VERSION = 0x04 -export interface DptServerOptions { +export interface DPTServerOptions { + /** + * Timeout for peer requests + * + * Default: 10s + */ timeout?: number + + /** + * Network info to send a long a request + * + * Default: 0.0.0.0, no UDP or TCP port provided + */ endpoint?: PeerInfo + + /** + * Function for socket creation + * + * Default: dgram-created socket + */ createSocket?: Function } @@ -29,7 +46,7 @@ export class Server extends EventEmitter { _requestsCache: LRUCache> _socket: DgramSocket | null - constructor(dpt: DPT, privateKey: Buffer, options: DptServerOptions) { + constructor(dpt: DPT, privateKey: Buffer, options: DPTServerOptions) { super() this._dpt = dpt From 73be137b2b83a6951f53d33019c34952db1972cc Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Sun, 3 Jan 2021 16:39:25 +0100 Subject: [PATCH 4/8] devp2p -> DPT: limit KBucket constructor id parameter to Buffer --- packages/devp2p/src/dpt/kbucket.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/devp2p/src/dpt/kbucket.ts b/packages/devp2p/src/dpt/kbucket.ts index 3208513520..07eb8cb435 100644 --- a/packages/devp2p/src/dpt/kbucket.ts +++ b/packages/devp2p/src/dpt/kbucket.ts @@ -13,11 +13,11 @@ export interface CustomContact extends PeerInfo { export class KBucket extends EventEmitter { _peers: Map = new Map() _kbucket: _KBucket - constructor(id: string | Buffer) { + constructor(localNodeId: Buffer) { super() this._kbucket = new _KBucket({ - localNodeId: typeof id === 'string' ? Buffer.from(id) : id, + localNodeId, numberOfNodesPerKBucket: KBUCKET_SIZE, numberOfNodesToPing: KBUCKET_CONCURRENCY, }) From b3b0e7866c6dc9ec45b1bbc2d0577c214b967a45 Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Sun, 3 Jan 2021 16:56:18 +0100 Subject: [PATCH 5/8] devp2p, client -> DPT: fixed client RLPXServer tests --- packages/client/test/net/server/rlpxserver.spec.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/client/test/net/server/rlpxserver.spec.ts b/packages/client/test/net/server/rlpxserver.spec.ts index 9d99527723..b849670df1 100644 --- a/packages/client/test/net/server/rlpxserver.spec.ts +++ b/packages/client/test/net/server/rlpxserver.spec.ts @@ -75,10 +75,10 @@ tape('[RlpxServer]', async (t) => { server.dpt = td.object() server.rlpx = td.object() td.when( - server.dpt!.bootstrap({ address: '10.0.0.1', udpPort: '1234', tcpPort: '1234' }) + server.dpt!.bootstrap({ address: '10.0.0.1', udpPort: 1234, tcpPort: 1234 }) ).thenResolve() td.when( - server.dpt!.bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' }) + (server.dpt! as any).bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' }) ).thenReject(new Error('err0')) server.on('error', (err: Error) => t.equals(err.message, 'err0', 'got error')) await server.start() @@ -109,10 +109,10 @@ tape('[RlpxServer]', async (t) => { destroy: td.func(), }) td.when( - server.dpt?.bootstrap({ address: '10.0.0.1', udpPort: '1234', tcpPort: '1234' }) + server.dpt!.bootstrap({ address: '10.0.0.1', udpPort: 1234, tcpPort: 1234 }) ).thenResolve(undefined) td.when( - server.dpt?.bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' }) + (server.dpt! as any).bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' }) ).thenReject(new Error('err0')) server.on('error', (err) => t.equals(err.message, 'err0', 'got error')) await server.start() From 9fe77d695f748a2aadbda49c5c9f5063d94ecfe3 Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Sun, 3 Jan 2021 16:59:09 +0100 Subject: [PATCH 6/8] devp2p -> DPT: added types for DPT ban-list --- packages/devp2p/src/dpt/ban-list.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/devp2p/src/dpt/ban-list.ts b/packages/devp2p/src/dpt/ban-list.ts index 4d1c296f0a..189017065a 100644 --- a/packages/devp2p/src/dpt/ban-list.ts +++ b/packages/devp2p/src/dpt/ban-list.ts @@ -2,24 +2,25 @@ import LRUCache from 'lru-cache' import { debug as createDebugLogger } from 'debug' import { KBucket } from './kbucket' import { formatLogId } from '../util' +import { PeerInfo } from './dpt' const debug = createDebugLogger('devp2p:dpt:ban-list') const verbose = createDebugLogger('verbose').enabled export class BanList { - private lru: LRUCache + private lru: LRUCache constructor() { this.lru = new LRUCache({ max: 30000 }) // 10k should be enough (each peer obj can has 3 keys) } - add(obj: any, maxAge?: number) { + add(obj: string | Buffer | PeerInfo, maxAge?: number) { for (const key of KBucket.getKeys(obj)) { this.lru.set(key, true, maxAge) debug(`Added peer ${formatLogId(key, verbose)}, size: ${this.lru.length}`) } } - has(obj: any): boolean { + has(obj: string | Buffer | PeerInfo): boolean { return KBucket.getKeys(obj).some((key: string) => Boolean(this.lru.get(key))) } } From 2b2c0c263dac4ac986a17a47810b3d8dd8164160 Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Sun, 3 Jan 2021 17:14:45 +0100 Subject: [PATCH 7/8] devp2p -> DPT: fixed undefined array access in ETH._getStatusString() on malformed ETH/64 status msgs (triggered in client runs) --- packages/devp2p/src/dpt/ban-list.ts | 4 ++-- packages/devp2p/src/eth/index.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/devp2p/src/dpt/ban-list.ts b/packages/devp2p/src/dpt/ban-list.ts index 189017065a..5d4c3a8c00 100644 --- a/packages/devp2p/src/dpt/ban-list.ts +++ b/packages/devp2p/src/dpt/ban-list.ts @@ -13,14 +13,14 @@ export class BanList { this.lru = new LRUCache({ max: 30000 }) // 10k should be enough (each peer obj can has 3 keys) } - add(obj: string | Buffer | PeerInfo, maxAge?: number) { + add(obj: string | Buffer | PeerInfo, maxAge?: number) { for (const key of KBucket.getKeys(obj)) { this.lru.set(key, true, maxAge) debug(`Added peer ${formatLogId(key, verbose)}, size: ${this.lru.length}`) } } - has(obj: string | Buffer | PeerInfo): boolean { + has(obj: string | Buffer | PeerInfo): boolean { return KBucket.getKeys(obj).some((key: string) => Boolean(this.lru.get(key))) } } diff --git a/packages/devp2p/src/eth/index.ts b/packages/devp2p/src/eth/index.ts index a6f57bc4d7..77bbfbe2ea 100644 --- a/packages/devp2p/src/eth/index.ts +++ b/packages/devp2p/src/eth/index.ts @@ -182,8 +182,8 @@ export class ETH extends EventEmitter { verbose )}` if (this._version >= 64) { - sStr += `, ForkHash: 0x${(status[5][0] as Buffer).toString('hex')}` - sStr += `, ForkNext: ${buffer2int(status[5][1] as Buffer)}` + sStr += `, ForkHash: 0x${status[5] ? '0x' + (status[5][0] as Buffer).toString('hex') : '-'}` + sStr += `, ForkNext: ${status[5] ? buffer2int(status[5][1] as Buffer) : '-'}` } sStr += `]` return sStr From 9cc2ec74eac11d8bd3c7bc8ae2d33c0e5e57744a Mon Sep 17 00:00:00 2001 From: holgerd77 Date: Sun, 3 Jan 2021 18:30:14 +0100 Subject: [PATCH 8/8] devp2p, client -> DPT: fixed DPT.bootstrap() error propagation handling in client (fixes wrongly propagated timeout error on bootnodes) --- packages/client/lib/net/server/rlpxserver.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/client/lib/net/server/rlpxserver.ts b/packages/client/lib/net/server/rlpxserver.ts index 01207c5748..102c14ca40 100644 --- a/packages/client/lib/net/server/rlpxserver.ts +++ b/packages/client/lib/net/server/rlpxserver.ts @@ -126,13 +126,13 @@ export class RlpxServer extends Server { udpPort: node.port, tcpPort: node.port, } - try { - return this.dpt!.bootstrap(bootnode) - } catch (e) { - this.error(e) - } + return this.dpt!.bootstrap(bootnode) }) - await Promise.all(promises) + try { + await Promise.all(promises) + } catch (e) { + this.error(e) + } } /**