Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Devp2p DPT Type Improvements #1029

Merged
merged 8 commits into from
Jan 5, 2021
14 changes: 7 additions & 7 deletions packages/client/lib/net/server/rlpxserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,17 @@ export class RlpxServer extends Server {
async bootstrap(): Promise<void> {
const promises = this.bootnodes.map((node) => {
const bootnode = {
address: node.ip,
address: node.ip!,
udpPort: node.port,
tcpPort: node.port,
}
try {
return this.dpt!.bootstrap(bootnode)
} catch (e) {
this.error(e)
}
return this.dpt!.bootstrap(bootnode)
})
await Promise.all(promises)
try {
await Promise.all(promises)
} catch (e) {
this.error(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

codecov is warning that this line isn't covered, it's not so important, but could add a quick test forcing a fast timeout to ensure the error is caught and propagated properly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will leave this for now but generally that's a good idea - also left a comment on the codecov/patch discussion in the chat. Will likely be good if we get to some work mode where we take coverage more serious again (I at least started in #1028 to add some substantial tests and had a look at codecov/patch, took me quite some time but I think I finally had some mental breakthrough in my testdouble understanding 😄 ).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've directly taken the occasion and made the codecov/project check mandatory again, together with the other library's test-* checks (e.g. test-trie), think we still had some gap here due to uncertainties in the CI setup for some time (these "run everything or run selected" discussions). And we can of course later always adopt again if some CI setup change comes up.

}
}

/**
Expand Down
10 changes: 5 additions & 5 deletions packages/client/lib/service/ethereumservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ export interface EthereumServiceOptions extends ServiceOptions {
/* Blockchain database */
db?: LevelUp

/* Protocol timeout in ms (default: 8000) */
timeout?: number

/* Sync retry interval in ms (default: 1000) */
/* Sync retry interval in ms (default: 8000) */
interval?: number

/* Protocol timeout in ms (default: 2000) */
timeout?: number
}

/**
Expand All @@ -39,7 +39,7 @@ export class EthereumService extends Service {
this.flow = new FlowControl()
this.chain = options.chain ?? new Chain(options)
this.interval = options.interval ?? 8000
this.timeout = options.timeout ?? 1000
this.timeout = options.timeout ?? 2000
}

/**
Expand Down
8 changes: 4 additions & 4 deletions packages/client/test/net/server/rlpxserver.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ tape('[RlpxServer]', async (t) => {
server.dpt = td.object()
server.rlpx = td.object()
td.when(
server.dpt!.bootstrap({ address: '10.0.0.1', udpPort: '1234', tcpPort: '1234' })
server.dpt!.bootstrap({ address: '10.0.0.1', udpPort: 1234, tcpPort: 1234 })
).thenResolve()
td.when(
server.dpt!.bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' })
(server.dpt! as any).bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' })
).thenReject(new Error('err0'))
server.on('error', (err: Error) => t.equals(err.message, 'err0', 'got error'))
await server.start()
Expand Down Expand Up @@ -109,10 +109,10 @@ tape('[RlpxServer]', async (t) => {
destroy: td.func(),
})
td.when(
server.dpt?.bootstrap({ address: '10.0.0.1', udpPort: '1234', tcpPort: '1234' })
server.dpt!.bootstrap({ address: '10.0.0.1', udpPort: 1234, tcpPort: 1234 })
).thenResolve(undefined)
td.when(
server.dpt?.bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' })
(server.dpt! as any).bootstrap({ address: '10.0.0.2', udpPort: '1234', tcpPort: '1234' })
).thenReject(new Error('err0'))
server.on('error', (err) => t.equals(err.message, 'err0', 'got error'))
await server.start()
Expand Down
7 changes: 4 additions & 3 deletions packages/devp2p/src/dpt/ban-list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,25 @@ import LRUCache from 'lru-cache'
import { debug as createDebugLogger } from 'debug'
import { KBucket } from './kbucket'
import { formatLogId } from '../util'
import { PeerInfo } from './dpt'

const debug = createDebugLogger('devp2p:dpt:ban-list')
const verbose = createDebugLogger('verbose').enabled

export class BanList {
private lru: LRUCache<any, boolean>
private lru: LRUCache<string, boolean>
constructor() {
this.lru = new LRUCache({ max: 30000 }) // 10k should be enough (each peer obj can has 3 keys)
}

add(obj: any, maxAge?: number) {
add(obj: string | Buffer | PeerInfo, maxAge?: number) {
for (const key of KBucket.getKeys(obj)) {
this.lru.set(key, true, maxAge)
debug(`Added peer ${formatLogId(key, verbose)}, size: ${this.lru.length}`)
}
}

has(obj: any): boolean {
has(obj: string | Buffer | PeerInfo): boolean {
return KBucket.getKeys(obj).some((key: string) => Boolean(this.lru.get(key)))
}
}
59 changes: 48 additions & 11 deletions packages/devp2p/src/dpt/dpt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,43 @@ import { Server as DPTServer } from './server'

const debug = createDebugLogger('devp2p:dpt')

export interface PeerInfo {
id?: Uint8Array | Buffer
address?: string
udpPort?: number | null
tcpPort?: number | null
}

export interface DPTOptions {
/**
* Timeout for peer requests
*
* Default: 10s
*/
timeout?: number

/**
* Network info to send a long a request
*
* Default: 0.0.0.0, no UDP or TCP port provided
*/
endpoint?: PeerInfo

/**
* Function for socket creation
*
* Default: dgram-created socket
*/
createSocket?: Function

/**
* Interval for peer table refresh
*
* Default: 60s
*/
refreshInterval?: number
}

export class DPT extends EventEmitter {
privateKey: Buffer
banlist: BanList
Expand All @@ -19,7 +56,7 @@ export class DPT extends EventEmitter {
private _server: DPTServer
private _refreshIntervalId: NodeJS.Timeout

constructor(privateKey: Buffer, options: any) {
constructor(privateKey: Buffer, options: DPTOptions) {
super()

this.privateKey = Buffer.from(privateKey)
Expand All @@ -28,14 +65,14 @@ export class DPT extends EventEmitter {
this.banlist = new BanList()

this._kbucket = new KBucket(this._id)
this._kbucket.on('added', (peer) => this.emit('peer:added', peer))
this._kbucket.on('removed', (peer) => this.emit('peer:removed', peer))
this._kbucket.on('added', (peer: PeerInfo) => this.emit('peer:added', peer))
this._kbucket.on('removed', (peer: PeerInfo) => this.emit('peer:removed', peer))
this._kbucket.on('ping', this._onKBucketPing)

this._server = new DPTServer(this, this.privateKey, {
createSocket: options.createSocket,
timeout: options.timeout,
endpoint: options.endpoint,
createSocket: options.createSocket,
})
this._server.once('listening', () => this.emit('listening'))
this._server.once('close', () => this.emit('close'))
Expand All @@ -55,7 +92,7 @@ export class DPT extends EventEmitter {
this._server.destroy(...args)
}

_onKBucketPing(oldPeers: any[], newPeer: any): void {
_onKBucketPing(oldPeers: PeerInfo[], newPeer: PeerInfo): void {
if (this.banlist.has(newPeer)) return

let count = 0
Expand All @@ -81,15 +118,15 @@ export class DPT extends EventEmitter {
for (const peer of peers) this.addPeer(peer).catch(() => {})
}

async bootstrap(peer: any): Promise<void> {
async bootstrap(peer: PeerInfo): Promise<void> {
debug(`bootstrap with peer ${peer.address}:${peer.udpPort}`)

peer = await this.addPeer(peer)
if (!this._id) return
this._server.findneighbours(peer, this._id)
}

async addPeer(obj: any): Promise<any> {
async addPeer(obj: PeerInfo): Promise<any> {
if (this.banlist.has(obj)) throw new Error('Peer is banned')
debug(`attempt adding peer ${obj.address}:${obj.udpPort}`)

Expand All @@ -109,23 +146,23 @@ export class DPT extends EventEmitter {
}
}

getPeer(obj: any): any {
getPeer(obj: string | Buffer | PeerInfo) {
return this._kbucket.get(obj)
}

getPeers(): any[] {
getPeers() {
return this._kbucket.getAll()
}

getClosestPeers(id: string): any {
getClosestPeers(id: string) {
return this._kbucket.closest(id)
}

removePeer(obj: any) {
this._kbucket.remove(obj)
}

banPeer(obj: any, maxAge?: number) {
banPeer(obj: string | Buffer | PeerInfo, maxAge?: number) {
this.banlist.add(obj, maxAge)
this._kbucket.remove(obj)
}
Expand Down
42 changes: 22 additions & 20 deletions packages/devp2p/src/dpt/kbucket.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,58 @@
import { EventEmitter } from 'events'
import _KBucket = require('k-bucket')
import { PeerInfo } from './dpt'

const KBUCKET_SIZE = 16
const KBUCKET_CONCURRENCY = 3

export interface KObj {
id?: string
port?: string
address?: string
export interface CustomContact extends PeerInfo {
id: Uint8Array | Buffer
vectorClock: number
}

export class KBucket extends EventEmitter {
_peers: Map<string, any> = new Map()
_peers: Map<string, PeerInfo> = new Map()
_kbucket: _KBucket
constructor(id: string | Buffer) {
constructor(localNodeId: Buffer) {
super()

this._kbucket = new _KBucket({
localNodeId: typeof id === 'string' ? Buffer.from(id) : id,
this._kbucket = new _KBucket<CustomContact>({
localNodeId,
numberOfNodesPerKBucket: KBUCKET_SIZE,
numberOfNodesToPing: KBUCKET_CONCURRENCY,
})

this._kbucket.on('added', (peer: any) => {
this._kbucket.on('added', (peer: PeerInfo) => {
KBucket.getKeys(peer).forEach((key) => this._peers.set(key, peer))
this.emit('added', peer)
})

this._kbucket.on('removed', (peer: any) => {
this._kbucket.on('removed', (peer: PeerInfo) => {
KBucket.getKeys(peer).forEach((key) => this._peers.delete(key))
this.emit('removed', peer)
})

this._kbucket.on('ping', (...args: any[]) => this.emit('ping', ...args))
this._kbucket.on('ping', (oldPeers: PeerInfo[], newPeer: PeerInfo) => {
this.emit('ping', { oldPeers, newPeer })
})
}

static getKeys(obj: Buffer | string | KObj): string[] {
static getKeys(obj: Buffer | string | PeerInfo): string[] {
if (Buffer.isBuffer(obj)) return [obj.toString('hex')]
if (typeof obj === 'string') return [obj]

const keys = []
if (Buffer.isBuffer(obj.id)) keys.push(obj.id.toString('hex'))
if (obj.address && obj.port) keys.push(`${obj.address}:${obj.port}`)
//if (obj.address && obj.port) keys.push(`${obj.address}:${obj.port}`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this remain commented out?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there is just no property port on peer objects - just updPort and tcpPort, so the line in its current form has no effect and I couldn't decide on selecting for one of the other port properties, in fact I am not understanding well enough what this line is doing TBH. So I left to indicate that there was something which might have had some intention. 😄 Could have left a comment though.

Will also leave to not revoke the approval but rather merge here now.

return keys
}

add(peer: any) {
add(peer: PeerInfo) {
const isExists = KBucket.getKeys(peer).some((key) => this._peers.has(key))
if (!isExists) this._kbucket.add(peer)
if (!isExists) this._kbucket.add(peer as CustomContact)
}

get(obj: Buffer | string | KObj) {
get(obj: Buffer | string | PeerInfo) {
for (const key of KBucket.getKeys(obj)) {
const peer = this._peers.get(key)
if (peer !== undefined) return peer
Expand All @@ -59,16 +61,16 @@ export class KBucket extends EventEmitter {
return null
}

getAll(): Array<any> {
getAll(): Array<PeerInfo> {
return this._kbucket.toArray()
}

closest(id: string): any {
closest(id: string): PeerInfo[] {
return this._kbucket.closest(Buffer.from(id), KBUCKET_SIZE)
}

remove(obj: Buffer | string | KObj) {
remove(obj: Buffer | string | PeerInfo) {
const peer = this.get(obj)
if (peer !== null) this._kbucket.remove(peer.id)
if (peer !== null) this._kbucket.remove((peer as CustomContact).id)
}
}
10 changes: 2 additions & 8 deletions packages/devp2p/src/dpt/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,14 @@ import ip from 'ip'
import * as rlp from 'rlp'
import secp256k1 from 'secp256k1'
import { keccak256, int2buffer, buffer2int, assertEq, unstrictDecode } from '../util'
import { PeerInfo } from './dpt'

const debug = createDebugLogger('devp2p:dpt:server')

function getTimestamp() {
return (Date.now() / 1000) | 0
}

export interface PeerInfo {
id?: Buffer
address?: string
udpPort?: number | null
tcpPort?: number | null
}

const timestamp = {
encode: function (value = getTimestamp() + 60) {
const buffer = Buffer.allocUnsafe(4)
Expand Down Expand Up @@ -133,7 +127,7 @@ type OutNeighborMsg = { [0]: Buffer[][]; [1]: Buffer }
const neighbours = {
encode: function (obj: InNeighborMsg): OutNeighborMsg {
return [
obj.peers.map((peer: PeerInfo) => endpoint.encode(peer).concat(peer.id!)),
obj.peers.map((peer: PeerInfo) => endpoint.encode(peer).concat(peer.id! as Buffer)),
timestamp.encode(obj.timestamp),
]
},
Expand Down
Loading