Skip to content

Commit

Permalink
Merge pull request #1029 from ethereumjs/devp2p-dpt-type-improvements
Browse files Browse the repository at this point in the history
Devp2p DPT Type Improvements
  • Loading branch information
holgerd77 committed Jan 5, 2021
2 parents 9937927 + 9cc2ec7 commit 86fb51d
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 64 deletions.
14 changes: 7 additions & 7 deletions packages/client/lib/net/server/rlpxserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,17 @@ export class RlpxServer extends Server {
async bootstrap(): Promise<void> {
const promises = this.bootnodes.map((node) => {
const bootnode = {
address: node.ip,
address: node.ip!,
udpPort: node.port,
tcpPort: node.port,
}
try {
return this.dpt!.bootstrap(bootnode)
} catch (e) {
this.error(e)
}
return this.dpt!.bootstrap(bootnode)
})
await Promise.all(promises)
try {
await Promise.all(promises)
} catch (e) {
this.error(e)
}
}

/**
Expand Down
10 changes: 5 additions & 5 deletions packages/client/lib/service/ethereumservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ export interface EthereumServiceOptions extends ServiceOptions {
/* Blockchain database */
db?: LevelUp

/* Protocol timeout in ms (default: 8000) */
timeout?: number

/* Sync retry interval in ms (default: 1000) */
/* Sync retry interval in ms (default: 8000) */
interval?: number

/* Protocol timeout in ms (default: 2000) */
timeout?: number
}

/**
Expand All @@ -39,7 +39,7 @@ export class EthereumService extends Service {
this.flow = new FlowControl()
this.chain = options.chain ?? new Chain(options)
this.interval = options.interval ?? 8000
this.timeout = options.timeout ?? 1000
this.timeout = options.timeout ?? 2000
}

/**
Expand Down
8 changes: 4 additions & 4 deletions packages/client/test/net/server/rlpxserver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ tape('[RlpxServer]', async (t) => {
server.dpt = td.object()
server.rlpx = td.object()
td.when(
server.dpt!.bootstrap({ address: '10.0.0.1', udpPort: '1234', tcpPort: '1234' })
server.dpt!.bootstrap({ address: '10.0.0.1', udpPort: 1234, tcpPort: 1234 })
).thenResolve()
td.when(
server.dpt!.bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' })
(server.dpt! as any).bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' })
).thenReject(new Error('err0'))
server.on('error', (err: Error) => t.equals(err.message, 'err0', 'got error'))
await server.start()
Expand Down Expand Up @@ -109,10 +109,10 @@ tape('[RlpxServer]', async (t) => {
destroy: td.func(),
})
td.when(
server.dpt?.bootstrap({ address: '10.0.0.1', udpPort: '1234', tcpPort: '1234' })
server.dpt!.bootstrap({ address: '10.0.0.1', udpPort: 1234, tcpPort: 1234 })
).thenResolve(undefined)
td.when(
server.dpt?.bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' })
(server.dpt! as any).bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' })
).thenReject(new Error('err0'))
server.on('error', (err) => t.equals(err.message, 'err0', 'got error'))
await server.start()
Expand Down
7 changes: 4 additions & 3 deletions packages/devp2p/src/dpt/ban-list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,25 @@ import LRUCache from 'lru-cache'
import { debug as createDebugLogger } from 'debug'
import { KBucket } from './kbucket'
import { formatLogId } from '../util'
import { PeerInfo } from './dpt'

const debug = createDebugLogger('devp2p:dpt:ban-list')
const verbose = createDebugLogger('verbose').enabled

export class BanList {
private lru: LRUCache<any, boolean>
private lru: LRUCache<string, boolean>
constructor() {
this.lru = new LRUCache({ max: 30000 }) // 10k should be enough (each peer obj can has 3 keys)
}

add(obj: any, maxAge?: number) {
add(obj: string | Buffer | PeerInfo, maxAge?: number) {
for (const key of KBucket.getKeys(obj)) {
this.lru.set(key, true, maxAge)
debug(`Added peer ${formatLogId(key, verbose)}, size: ${this.lru.length}`)
}
}

has(obj: any): boolean {
has(obj: string | Buffer | PeerInfo): boolean {
return KBucket.getKeys(obj).some((key: string) => Boolean(this.lru.get(key)))
}
}
59 changes: 48 additions & 11 deletions packages/devp2p/src/dpt/dpt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,43 @@ import { Server as DPTServer } from './server'

const debug = createDebugLogger('devp2p:dpt')

export interface PeerInfo {
id?: Uint8Array | Buffer
address?: string
udpPort?: number | null
tcpPort?: number | null
}

export interface DPTOptions {
/**
* Timeout for peer requests
*
* Default: 10s
*/
timeout?: number

/**
* Network info to send a long a request
*
* Default: 0.0.0.0, no UDP or TCP port provided
*/
endpoint?: PeerInfo

/**
* Function for socket creation
*
* Default: dgram-created socket
*/
createSocket?: Function

/**
* Interval for peer table refresh
*
* Default: 60s
*/
refreshInterval?: number
}

export class DPT extends EventEmitter {
privateKey: Buffer
banlist: BanList
Expand All @@ -19,7 +56,7 @@ export class DPT extends EventEmitter {
private _server: DPTServer
private _refreshIntervalId: NodeJS.Timeout

constructor(privateKey: Buffer, options: any) {
constructor(privateKey: Buffer, options: DPTOptions) {
super()

this.privateKey = Buffer.from(privateKey)
Expand All @@ -28,14 +65,14 @@ export class DPT extends EventEmitter {
this.banlist = new BanList()

this._kbucket = new KBucket(this._id)
this._kbucket.on('added', (peer) => this.emit('peer:added', peer))
this._kbucket.on('removed', (peer) => this.emit('peer:removed', peer))
this._kbucket.on('added', (peer: PeerInfo) => this.emit('peer:added', peer))
this._kbucket.on('removed', (peer: PeerInfo) => this.emit('peer:removed', peer))
this._kbucket.on('ping', this._onKBucketPing)

this._server = new DPTServer(this, this.privateKey, {
createSocket: options.createSocket,
timeout: options.timeout,
endpoint: options.endpoint,
createSocket: options.createSocket,
})
this._server.once('listening', () => this.emit('listening'))
this._server.once('close', () => this.emit('close'))
Expand All @@ -55,7 +92,7 @@ export class DPT extends EventEmitter {
this._server.destroy(...args)
}

_onKBucketPing(oldPeers: any[], newPeer: any): void {
_onKBucketPing(oldPeers: PeerInfo[], newPeer: PeerInfo): void {
if (this.banlist.has(newPeer)) return

let count = 0
Expand All @@ -81,15 +118,15 @@ export class DPT extends EventEmitter {
for (const peer of peers) this.addPeer(peer).catch(() => {})
}

async bootstrap(peer: any): Promise<void> {
async bootstrap(peer: PeerInfo): Promise<void> {
debug(`bootstrap with peer ${peer.address}:${peer.udpPort}`)

peer = await this.addPeer(peer)
if (!this._id) return
this._server.findneighbours(peer, this._id)
}

async addPeer(obj: any): Promise<any> {
async addPeer(obj: PeerInfo): Promise<any> {
if (this.banlist.has(obj)) throw new Error('Peer is banned')
debug(`attempt adding peer ${obj.address}:${obj.udpPort}`)

Expand All @@ -109,23 +146,23 @@ export class DPT extends EventEmitter {
}
}

getPeer(obj: any): any {
getPeer(obj: string | Buffer | PeerInfo) {
return this._kbucket.get(obj)
}

getPeers(): any[] {
getPeers() {
return this._kbucket.getAll()
}

getClosestPeers(id: string): any {
getClosestPeers(id: string) {
return this._kbucket.closest(id)
}

removePeer(obj: any) {
this._kbucket.remove(obj)
}

banPeer(obj: any, maxAge?: number) {
banPeer(obj: string | Buffer | PeerInfo, maxAge?: number) {
this.banlist.add(obj, maxAge)
this._kbucket.remove(obj)
}
Expand Down
42 changes: 22 additions & 20 deletions packages/devp2p/src/dpt/kbucket.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,58 @@
import { EventEmitter } from 'events'
import _KBucket = require('k-bucket')
import { PeerInfo } from './dpt'

const KBUCKET_SIZE = 16
const KBUCKET_CONCURRENCY = 3

export interface KObj {
id?: string
port?: string
address?: string
export interface CustomContact extends PeerInfo {
id: Uint8Array | Buffer
vectorClock: number
}

export class KBucket extends EventEmitter {
_peers: Map<string, any> = new Map()
_peers: Map<string, PeerInfo> = new Map()
_kbucket: _KBucket
constructor(id: string | Buffer) {
constructor(localNodeId: Buffer) {
super()

this._kbucket = new _KBucket({
localNodeId: typeof id === 'string' ? Buffer.from(id) : id,
this._kbucket = new _KBucket<CustomContact>({
localNodeId,
numberOfNodesPerKBucket: KBUCKET_SIZE,
numberOfNodesToPing: KBUCKET_CONCURRENCY,
})

this._kbucket.on('added', (peer: any) => {
this._kbucket.on('added', (peer: PeerInfo) => {
KBucket.getKeys(peer).forEach((key) => this._peers.set(key, peer))
this.emit('added', peer)
})

this._kbucket.on('removed', (peer: any) => {
this._kbucket.on('removed', (peer: PeerInfo) => {
KBucket.getKeys(peer).forEach((key) => this._peers.delete(key))
this.emit('removed', peer)
})

this._kbucket.on('ping', (...args: any[]) => this.emit('ping', ...args))
this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo) => {
this.emit('ping', { oldPeers, newPeer })
})
}

static getKeys(obj: Buffer | string | KObj): string[] {
static getKeys(obj: Buffer | string | PeerInfo): string[] {
if (Buffer.isBuffer(obj)) return [obj.toString('hex')]
if (typeof obj === 'string') return [obj]

const keys = []
if (Buffer.isBuffer(obj.id)) keys.push(obj.id.toString('hex'))
if (obj.address && obj.port) keys.push(`${obj.address}:${obj.port}`)
//if (obj.address && obj.port) keys.push(`${obj.address}:${obj.port}`)
return keys
}

add(peer: any) {
add(peer: PeerInfo) {
const isExists = KBucket.getKeys(peer).some((key) => this._peers.has(key))
if (!isExists) this._kbucket.add(peer)
if (!isExists) this._kbucket.add(peer as CustomContact)
}

get(obj: Buffer | string | KObj) {
get(obj: Buffer | string | PeerInfo) {
for (const key of KBucket.getKeys(obj)) {
const peer = this._peers.get(key)
if (peer !== undefined) return peer
Expand All @@ -59,16 +61,16 @@ export class KBucket extends EventEmitter {
return null
}

getAll(): Array<any> {
getAll(): Array<PeerInfo> {
return this._kbucket.toArray()
}

closest(id: string): any {
closest(id: string): PeerInfo[] {
return this._kbucket.closest(Buffer.from(id), KBUCKET_SIZE)
}

remove(obj: Buffer | string | KObj) {
remove(obj: Buffer | string | PeerInfo) {
const peer = this.get(obj)
if (peer !== null) this._kbucket.remove(peer.id)
if (peer !== null) this._kbucket.remove((peer as CustomContact).id)
}
}
10 changes: 2 additions & 8 deletions packages/devp2p/src/dpt/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,14 @@ import ip from 'ip'
import * as rlp from 'rlp'
import secp256k1 from 'secp256k1'
import { keccak256, int2buffer, buffer2int, assertEq, unstrictDecode } from '../util'
import { PeerInfo } from './dpt'

const debug = createDebugLogger('devp2p:dpt:server')

function getTimestamp() {
return (Date.now() / 1000) | 0
}

export interface PeerInfo {
id?: Buffer
address?: string
udpPort?: number | null
tcpPort?: number | null
}

const timestamp = {
encode: function (value = getTimestamp() + 60) {
const buffer = Buffer.allocUnsafe(4)
Expand Down Expand Up @@ -133,7 +127,7 @@ type OutNeighborMsg = { [0]: Buffer[][]; [1]: Buffer }
const neighbours = {
encode: function (obj: InNeighborMsg): OutNeighborMsg {
return [
obj.peers.map((peer: PeerInfo) => endpoint.encode(peer).concat(peer.id!)),
obj.peers.map((peer: PeerInfo) => endpoint.encode(peer).concat(peer.id! as Buffer)),
timestamp.encode(obj.timestamp),
]
},
Expand Down
Loading

0 comments on commit 86fb51d

Please sign in to comment.