Skip to content

Commit

Permalink
Merge pull request #127 from ChainSafe/cayman/metrics
Browse files Browse the repository at this point in the history
Add metrics
  • Loading branch information
wemeetagain committed Jul 27, 2021
2 parents 2e2f623 + 2b936cc commit da78f53
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 7 deletions.
7 changes: 6 additions & 1 deletion src/libp2p/discv5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Multiaddr } from "multiaddr";
import { randomBytes } from "libp2p-crypto";
import { AbortController } from "@chainsafe/abort-controller";

import { Discv5, ENRInput } from "../service";
import { Discv5, ENRInput, IDiscv5Metrics } from "../service";
import { createNodeId, ENR } from "../enr";
import { IDiscv5Config } from "../config";
import { toBuffer } from "../util";
Expand All @@ -28,6 +28,10 @@ export interface IDiscv5DiscoveryInputOptions extends Partial<IDiscv5Config> {
* Amount of time in milliseconds to wait between lookups
*/
searchInterval: number;
/**
* Optional metrics
*/
metrics?: IDiscv5Metrics;
/**
* Enable/disable discv5
* Note: this option is handled within libp2p, not within discv5
Expand Down Expand Up @@ -57,6 +61,7 @@ export class Discv5Discovery extends EventEmitter {
peerId: options.peerId,
multiaddr: new Multiaddr(options.bindAddr),
config: options,
metrics: options.metrics,
});
this.searchInterval = options.searchInterval;
this.started = false;
Expand Down
34 changes: 28 additions & 6 deletions src/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {
createTalkResponseMessage,
RequestId,
} from "../message";
import { Discv5EventEmitter, ENRInput, IActiveRequest, INodesResponse } from "./types";
import { Discv5EventEmitter, ENRInput, IActiveRequest, IDiscv5Metrics, INodesResponse } from "./types";
import { AddrVotes } from "./addrVotes";
import { TimeoutMap } from "../util";
import { IDiscv5Config, defaultConfig } from "../config";
Expand All @@ -64,6 +64,7 @@ export interface IDiscv5CreateOptions {
peerId: PeerId;
multiaddr: Multiaddr;
config?: Partial<IDiscv5Config>;
metrics?: IDiscv5Metrics;
}

/**
Expand Down Expand Up @@ -122,11 +123,13 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
*/
private addrVotes: AddrVotes;

private metrics?: IDiscv5Metrics;

/**
* Default constructor.
* @param sessionService the service managing sessions underneath.
*/
constructor(config: IDiscv5Config, sessionService: SessionService) {
constructor(config: IDiscv5Config, sessionService: SessionService, metrics?: IDiscv5Metrics) {
super();
this.config = config;
this.sessionService = sessionService;
Expand All @@ -139,6 +142,14 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
this.connectedPeers = new Map();
this.nextLookupId = 1;
this.addrVotes = new AddrVotes();
if (metrics) {
this.metrics = metrics;
// eslint-disable-next-line @typescript-eslint/no-this-alias
const discv5 = this;
metrics.kadTableSize.collect = () => metrics.kadTableSize.set(discv5.kbuckets.size);
metrics.connectedPeerCount.collect = () => metrics.connectedPeerCount.set(discv5.connectedPeers.size);
metrics.activeSessionCount.collect = () => metrics.activeSessionCount.set(discv5.sessionService.sessionsSize());
}
}

/**
Expand All @@ -148,12 +159,12 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
* @param peerId the PeerId with the keypair that identifies the enr
* @param multiaddr The multiaddr which contains the the network interface and port to which the UDP server binds
*/
public static create({ enr, peerId, multiaddr, config = {} }: IDiscv5CreateOptions): Discv5 {
public static create({ enr, peerId, multiaddr, config = {}, metrics }: IDiscv5CreateOptions): Discv5 {
const fullConfig = { ...defaultConfig, ...config };
const decodedEnr = typeof enr === "string" ? ENR.decodeTxt(enr) : enr;
const udpTransport = new UDPTransportService(multiaddr, decodedEnr.nodeId);
const sessionService = new SessionService(fullConfig, decodedEnr, createKeypairFromPeerId(peerId), udpTransport);
return new Discv5(fullConfig, sessionService);
return new Discv5(fullConfig, sessionService, metrics);
}

/**
Expand Down Expand Up @@ -311,8 +322,13 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
const enr = this.getKadValue(srcId);
const addr = await enr?.getFullMultiaddr("udp");
if (enr && addr) {
await this.sessionService.sendResponse(addr, srcId, msg);
log(`Sent TALKRESP message to node ${enr.id}`);
log(`Sending TALKRESP message to node ${enr.id}`);
try {
this.sessionService.sendResponse(addr, srcId, msg);
this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.TALKRESP] });
} catch (e) {
log("Failed to send a TALKRESP response. Error: %s", e.message);
}
} else {
if (!addr && enr) {
log(`No ip + udp port found for node ${srcId}`);
Expand Down Expand Up @@ -348,6 +364,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
const message = createFindNodeMessage([0]);
this.sessionService.sendRequestUnknownEnr(src, nodeId, message);
this.activeRequests.set(message.id, { request: message, dstId: nodeId });
this.metrics?.sentMessageCount.inc({ type: MessageType[message.type] });
} catch (e) {
log("Requesting ENR failed. Error: %s", e.message);
}
Expand Down Expand Up @@ -388,6 +405,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
try {
this.sessionService.sendRequest(dstEnr, req);
this.activeRequests.set(req.id, { request: req, dstId: nodeId, lookupId });
this.metrics?.sentMessageCount.inc({ type: MessageType[req.type] });
return true;
} catch (e) {
log("Sending request to node: %s failed: error: %s", nodeId, e.message);
Expand Down Expand Up @@ -512,6 +530,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
};

private onMessage = (srcId: NodeId, src: Multiaddr, message: Message): void => {
this.metrics?.rcvdMessageCount.inc({ type: MessageType[message.type] });
switch (message.type) {
case MessageType.PING:
return this.onPing(srcId, src, message as IPingMessage);
Expand Down Expand Up @@ -550,6 +569,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
srcId,
createPongMessage(message.id, this.enr.seq, srcOpts.host, srcOpts.port)
);
this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.PONG] });
} catch (e) {
log("Failed to send Pong. Error %s", e.message);
}
Expand Down Expand Up @@ -608,6 +628,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
log("Sending empty NODES response to %s", srcId);
try {
this.sessionService.sendResponse(src, srcId, createNodesMessage(id, 0, nodes));
this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.NODES] });
} catch (e) {
log("Failed to send a NODES response. Error: %s", e.message);
}
Expand All @@ -626,6 +647,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
const _nodes = nodes.slice(i, i + nodesPerPacket);
try {
this.sessionService.sendResponse(src, srcId, createNodesMessage(id, total, _nodes));
this.metrics?.sentMessageCount.inc({ type: MessageType[MessageType.NODES] });
} catch (e) {
log("Failed to send a NODES response. Error: %s", e.message);
}
Expand Down
25 changes: 25 additions & 0 deletions src/service/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,28 @@ export interface IActiveRequest {
}

export type ENRInput = ENR | string;

// Metrics

type Labels<T extends string> = Partial<Record<T, string | number>>;
interface IGauge<T extends string = string> {
inc(value?: number): void;
inc(labels: Labels<T>, value?: number): void;
set(value: number): void;
set(labels: Labels<T>, value: number): void;
collect(): void;
}

export interface IDiscv5Metrics {
/** Total size of the kad table */
kadTableSize: IGauge;
/** Total number of active sessions */
activeSessionCount: IGauge;
/** Total number of connected peers */
connectedPeerCount: IGauge;

/** Total number messages sent by message type */
sentMessageCount: IGauge<"type">;
/** Total number messages received by message type */
rcvdMessageCount: IGauge<"type">;
}
4 changes: 4 additions & 0 deletions src/session/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ export class SessionService extends (EventEmitter as { new (): StrictEventEmitte
this.sessions.clear();
}

public sessionsSize(): number {
return this.sessions.size;
}

public updateEnr(enr: ENR): void {
const session = this.sessions.get(enr.nodeId);
if (session) {
Expand Down

0 comments on commit da78f53

Please sign in to comment.