Skip to content

Commit

Permalink
chore: bump libp2p to v0.45.0 (#2209)
Browse files Browse the repository at this point in the history
## Why is this change needed?

Upgrades libp2p one minor version up ([Release
Notes](https://github.com/libp2p/js-libp2p/releases/tag/v0.45.0) |
[Migration
Guide](https://github.com/libp2p/js-libp2p/blob/main/doc/migrations/v0.44-v0.45.md))

By extension, upgrades GossipSub two major versions up due to libp2p
interface compatibility breakage (does not break protocol
compatibility).

### Breaking Changes in Events + Emitters

libp2p altered the events to be more precisely aligned for types to
event names, and so our use of `peer:connect` and `peer:disconnect`
becomes `connection:open` and `connection:close`. Additionally the peer
store's address book methods have been coalesced into its parent type
with more fluent interfaces. These too have been upgraded.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [x] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR updates dependencies, refactors error handling in peer loading,
adds test cases for multiaddr formats, and enhances GossipNode event
handling.

### Detailed summary
- Updated `@libp2p/interface-mocks` to `^12.0.0`
- Refactored error handling in `store.ts`
- Added test cases for multiaddr formats in `gossipNode.test.ts`
- Enhanced event handling in `gossipNode.ts` and `gossipNodeWorker.ts`

> The following files were skipped due to too many changes:
`apps/hubble/src/network/p2p/gossipNodeWorker.ts`, `yarn.lock`,
`node_modules/@libp2p/peer-store/yarn.lock`,
`node_modules/@libp2p/peer-store/dist/index.min.js`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
CassOnMars committed Jul 26, 2024
1 parent 2fa29ad commit 095cca9
Show file tree
Hide file tree
Showing 10 changed files with 534 additions and 11,875 deletions.
4 changes: 3 additions & 1 deletion Dockerfile.hubble
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ ENV RUSTFLAGS="-C target-feature=-crt-static"
COPY --chown=node:node --from=prune /home/node/app/out/json/ .
COPY --chown=node:node --from=prune /home/node/app/out/yarn.lock ./yarn.lock

COPY --chown=node:node patches patches
# Restore commented line if we patch-package again:
# COPY --chown=node:node patches patches

RUN yarn install --frozen-lockfile --network-timeout 1800000
RUN yarn postinstall

Expand Down
23 changes: 14 additions & 9 deletions apps/hubble/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"test:ci": "yarn build:all && yarn test:rust --release && ENVIRONMENT=test NODE_OPTIONS=\"--experimental-vm-modules --max-old-space-size=4096\" jest --ci --forceExit --coverage -w 2"
},
"devDependencies": {
"@libp2p/interface-mocks": "^9.0.0",
"@libp2p/interface-mocks": "^12.0.0",
"@types/async-lock": "^1.4.0",
"@types/chance": "^1.1.3",
"@types/cli-progress": "^3.11.0",
Expand Down Expand Up @@ -71,22 +71,24 @@
"@aws-sdk/client-s3": "^3.400.0",
"@aws-sdk/client-sts": "^3.398.0",
"@aws-sdk/lib-storage": "^3.504.0",
"@chainsafe/libp2p-gossipsub": "6.2.0",
"@chainsafe/libp2p-noise": "^11.0.0 ",
"@chainsafe/libp2p-gossipsub": "8.0.1",
"@chainsafe/libp2p-noise": "^12.0.0 ",
"@datastructures-js/priority-queue": "^6.3.1",
"@faker-js/faker": "~7.6.0",
"@farcaster/hub-nodejs": "^0.11.21",
"@fastify/cors": "^8.4.0",
"@figma/hot-shots": "^9.0.0-figma.1",
"@grpc/grpc-js": "~1.11.1",
"@libp2p/interface-connection": "^4.0.0",
"@libp2p/interface-connection-gater": "^2.0.1",
"@libp2p/interface-connection": "^5.0.0",
"@libp2p/interface-connection-gater": "^3.0.0",
"@libp2p/interface-peer-id": "^2.0.1",
"@libp2p/mplex": "^7.1.6",
"@libp2p/interface-peer-store": "^2.0.0",
"@libp2p/interface-registrar": "2.0.11",
"@libp2p/mplex": "^8.0.0",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/tcp": "^6.0.0",
"@libp2p/tcp": "^7.0.1",
"@libp2p/utils": "^3.0.2",
"@multiformats/multiaddr": "^11.0.0",
"@multiformats/multiaddr": "^12.0.0",
"@noble/curves": "^1.0.0",
"abitype": "^0.8.3",
"async-lock": "^1.4.0",
Expand All @@ -95,7 +97,7 @@
"cli-progress": "^3.12.0",
"commander": "~10.0.0",
"fastify": "^4.22.0",
"libp2p": "0.44.0",
"libp2p": "0.45.0",
"neverthrow": "~6.0.0",
"node-cron": "~3.0.2",
"patch-package": "^8.0.0",
Expand All @@ -108,5 +110,8 @@
"tar": "^6.2.1",
"tiny-typed-emitter": "~2.1.0",
"viem": "^1.12.2"
},
"resolutions": {
"@libp2p/interface-registrar": "2.0.11"
}
}
4 changes: 2 additions & 2 deletions apps/hubble/src/network/p2p/gossipNode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ describe("GossipNode", () => {
});

test("start fails if multiaddr format is invalid", async () => {
// an IPv6 being supplied as an IPv4
const options = { ipMultiAddr: "/ip4/2600:1700:6cf0:990:2052:a166:fb35:830a" };
// an IPv6 being supplied as an onion3
const options = { ipMultiAddr: "/onion3/2600:1700:6cf0:990:2052:a166:fb35:830a" };
expect((await node.start([], options))._unsafeUnwrapErr().errCode).toEqual("unavailable");
const error = (await node.start([], options))._unsafeUnwrapErr();

Expand Down
26 changes: 16 additions & 10 deletions apps/hubble/src/network/p2p/gossipNode.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PublishResult } from "@libp2p/interface-pubsub";
import { Message as GossipSubMessage, PublishResult } from "@libp2p/interface-pubsub";
import { Worker } from "worker_threads";
import { PeerInfo } from "@libp2p/interface-peer-info";
import {
Expand All @@ -23,14 +23,15 @@ import { logger, messageTypeToName } from "../../utils/logger.js";
import { PeriodicPeerCheckScheduler } from "./periodicPeerCheck.js";
import { GOSSIP_PROTOCOL_VERSION } from "./protocol.js";
import { AddrInfo } from "@chainsafe/libp2p-gossipsub/types";
import { PeerScoreThresholds } from "@chainsafe/libp2p-gossipsub/score";
import { PeerScoreParams, PeerScoreThresholds } from "@chainsafe/libp2p-gossipsub/score";
import { statsd } from "../../utils/statsd.js";
import { ClientOptions } from "@figma/hot-shots";
import { createFromProtobuf, exportToProtobuf } from "@libp2p/peer-id-factory";
import EventEmitter from "events";
import RocksDB from "../../storage/db/rocksdb.js";
import { RootPrefix } from "../../storage/db/types.js";
import { sleep } from "../../utils/crypto.js";
import { GossipsubMessage } from "@chainsafe/libp2p-gossipsub";

/** The maximum number of pending merge messages before we drop new incoming gossip or sync messages. */
export const MAX_SYNCTRIE_QUEUE_SIZE = 100_000;
Expand Down Expand Up @@ -85,6 +86,8 @@ export interface NodeOptions {
p2pConnectTimeoutMs?: number | undefined;
/** StatsD parameters */
statsdParams?: ClientOptions | undefined;
/** Override score params. Useful for tests */
scoreParams?: Partial<PeerScoreParams> | undefined;
}

export type GossipMessageResult = {
Expand Down Expand Up @@ -523,7 +526,7 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
}

async registerListeners() {
this._nodeEvents?.addListener("peer:connect", (detail) => {
this._nodeEvents?.addListener("connection:open", (detail: Connection) => {
// console.log("Peer Connected", JSON.stringify(detail, null, 2));
log.info(
{
Expand All @@ -540,15 +543,15 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
// if we restart
this.putPeerAddrToDB(detail.remotePeer.toString(), detail.remoteAddr.toString());
});
this._nodeEvents?.addListener("peer:disconnect", (detail) => {
this._nodeEvents?.addListener("connection:close", (detail: Connection) => {
log.info({ peer: detail.remotePeer }, "P2P Connection disconnected");
this.emit("peerDisconnect", detail);
this.updateStatsdPeerGauges();
});
this._nodeEvents?.addListener("peer:discovery", (detail) => {
log.info({ peer: detail }, "Discovered peer");
});
this._nodeEvents?.addListener("gossipsub:message", (detail) => {
this._nodeEvents?.addListener("gossipsub:message", (detail: GossipsubMessage) => {
log.debug({
identity: this.identity,
gossipMessageId: detail.msgId,
Expand All @@ -562,8 +565,11 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
if (this.gossipTopics().includes(detail.msg.topic)) {
try {
let data: Buffer;
if (detail.msg.data.type === "Buffer") {
data = Buffer.from(detail.msg.data.data);
// some kind of serialization quirk?
// biome-ignore lint/suspicious/noExplicitAny: legacy code, avoid using ignore for new code
if ((detail.msg.data as any).type === "Buffer") {
// biome-ignore lint/suspicious/noExplicitAny: legacy code, avoid using ignore for new code
data = Buffer.from((detail.msg.data as any).data);
} else {
data = Buffer.from(Object.values(detail.msg.data as unknown as Record<string, number>));
}
Expand Down Expand Up @@ -594,13 +600,13 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
this._nodeEvents?.addListener("peer:discovery", (detail) => {
log.info({ identity: this.identity }, `Found peer: ${detail.multiaddrs} }`);
});
this._nodeEvents?.addListener("peer:connect", (detail) => {
this._nodeEvents?.addListener("connection:open", (detail: Connection) => {
log.info({ identity: this.identity }, `Connection established to: ${detail.remotePeer.toString()}`);
});
this._nodeEvents?.addListener("peer:disconnect", (detail) => {
this._nodeEvents?.addListener("connection:close", (detail: Connection) => {
log.info({ identity: this.identity }, `Disconnected from: ${detail.remotePeer.toString()} `);
});
this._nodeEvents?.addListener("message", (detail) => {
this._nodeEvents?.addListener("message", (detail: GossipSubMessage) => {
log.info(
// biome-ignore lint/suspicious/noExplicitAny: legacy code, avoid using ignore for new code
{ identity: this.identity, from: (detail as any)["from"] },
Expand Down
79 changes: 54 additions & 25 deletions apps/hubble/src/network/p2p/gossipNodeWorker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { parentPort, workerData } from "worker_threads";
import { peerIdFromBytes } from "@libp2p/peer-id";
import { autoNATService } from "libp2p/autonat";
import { identifyService } from "libp2p/identify";
import { pingService } from "libp2p/ping";
import { fetchService } from "libp2p/fetch";
import * as MultiAddr from "@multiformats/multiaddr";
import { Message as GossipSubMessage, PublishResult, TopicValidatorResult } from "@libp2p/interface-pubsub";
import { Message as GossipSubMessage, PublishResult, TopicValidatorResult, PubSub } from "@libp2p/interface-pubsub";
import {
GossipNode,
LibP2PNodeInterface,
Expand All @@ -27,7 +31,7 @@ import {
import { addressInfoFromParts, checkNodeAddrs, ipMultiAddrStrFromAddressInfo } from "../../utils/p2p.js";
import { createLibp2p, Libp2p } from "libp2p";
import { err, ok, Result, ResultAsync } from "neverthrow";
import { GossipSub, gossipsub } from "@chainsafe/libp2p-gossipsub";
import { GossipSub, gossipsub, GossipsubEvents } from "@chainsafe/libp2p-gossipsub";
import { ConnectionFilter } from "./connectionFilter.js";
import { tcp } from "@libp2p/tcp";
import { mplex } from "@libp2p/mplex";
Expand All @@ -40,6 +44,7 @@ import { initializeStatsd, statsd } from "../../utils/statsd.js";
import v8 from "v8";
import { MessageBundle } from "@farcaster/hub-nodejs";
import { BundleCreator } from "./bundleCreator.js";
import { Peer } from "@libp2p/interface-peer-store";

const MultiaddrLocalHost = "/ip4/127.0.0.1";
const APPLICATION_SCORE_CAP_DEFAULT = 10;
Expand Down Expand Up @@ -84,7 +89,7 @@ export class LibP2PNode {

/** Returns the GossipSub instance used by the Node */
get gossip() {
const pubsub = this._node?.pubsub;
const pubsub = this._node?.services["pubsub"];
return pubsub ? (pubsub as GossipSub) : undefined;
}

Expand Down Expand Up @@ -161,6 +166,7 @@ export class LibP2PNode {
seenTTL: GOSSIP_SEEN_TTL, // Bump up the default to handle large flood of messages. 2 mins was not sufficient to prevent a loop
scoreThresholds: { ...options.scoreThresholds },
scoreParams: {
...options.scoreParams,
appSpecificScore: (peerId) => {
const score = this._peerScores?.get(peerId) ?? 0;
if (options.allowlistedImmunePeers?.includes(peerId)) {
Expand Down Expand Up @@ -215,7 +221,14 @@ export class LibP2PNode {
],
streamMuxers: [mplex()],
connectionEncryption: [noise()],
pubsub: gossip,
services: {
identify: identifyService({}),
ping: pingService({}),
fetch: fetchService({}),
autoNAT: autoNATService({}),
pubsub: gossip,
},
start: false,
}),
(e) => {
log.error({ identity: this.identity, error: e }, "failed to create libp2p node");
Expand All @@ -228,10 +241,10 @@ export class LibP2PNode {

if (result.isErr()) {
return err(result.error);
} else {
this._node = result.value;
return ok(true);
}

this._node = result.value;
return ok(true);
}

async start() {
Expand Down Expand Up @@ -290,7 +303,7 @@ export class LibP2PNode {
* @param message - The message to generate an ID for
* @returns The message ID as an Uint8Array
*/
getMessageId(message: GossipSubMessage): Uint8Array {
getMessageId(message: GossipSubMessage): Uint8Array | Promise<Uint8Array> {
if (message.topic.includes(GossipNode.primaryTopicForNetwork(this._network))) {
// check if message is a Farcaster Protocol Message
const protocolMessage = LibP2PNode.decodeMessage(message.data);
Expand Down Expand Up @@ -351,12 +364,14 @@ export class LibP2PNode {
}

async addPeerToAddressBook(peerId: PeerId, multiaddr: MultiAddr.Multiaddr) {
const addressBook = this._node?.peerStore.addressBook;
if (!addressBook) {
log.error({}, "address book missing for gossipNode");
const store = this._node?.peerStore;
if (!store) {
log.error({}, "peer store missing for gossipNode");
} else {
const addResult = await ResultAsync.fromPromise(
addressBook.add(peerId, [multiaddr]),
store.merge(peerId, {
multiaddrs: [multiaddr],
}),
(error) => new HubError("unavailable", error as Error),
);
if (addResult.isErr()) {
Expand All @@ -383,11 +398,11 @@ export class LibP2PNode {
}
}

const addressBook = this._node?.peerStore.addressBook;
if (!addressBook) {
log.error({}, "address book missing for gossipNode");
const store = this._node?.peerStore;
if (!store) {
log.error({}, "peer store missing for gossipNode");
} else {
await addressBook.delete(peerId);
await store.delete(peerId);
}
}

Expand All @@ -403,15 +418,29 @@ export class LibP2PNode {
}

async getPeerAddresses(peerId: PeerId): Promise<MultiAddr.Multiaddr[]> {
const existingConnections = this._node?.getConnections(peerId);
for (const conn of existingConnections ?? []) {
const knownAddrs = await this._node?.peerStore.addressBook.get(peerId);
if (knownAddrs && !knownAddrs.find((addr) => addr.multiaddr.equals(conn.remoteAddr))) {
await this._node?.peerStore.addressBook.add(peerId, [conn.remoteAddr]);
if (!this._node) {
return [];
}

const existingConnections = this._node.getConnections(peerId);
const peer = await ResultAsync.fromPromise(this._node.peerStore.get(peerId), () => undefined);

if (peer.isOk()) {
const missing = existingConnections
.map((conn) => conn.remoteAddr)
.filter((addr) => !peer.value.addresses.find((a) => a.multiaddr.equals(addr)));
if (peer.value && missing.length !== 0) {
await this._node.peerStore.merge(peerId, {
multiaddrs: missing,
});
}
} else {
await this._node.peerStore.save(peerId, {
multiaddrs: existingConnections.map((conn) => conn.remoteAddr),
});
}

const addresses = (await this._node?.peerStore.get(peerId))?.addresses.map((addr) => addr.multiaddr);
const addresses = (await this._node.peerStore.get(peerId)).addresses.map((addr) => addr.multiaddr);
return addresses ?? [];
}

Expand Down Expand Up @@ -526,7 +555,7 @@ export class LibP2PNode {
const eventHandler = (eventName: string) => {
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
return (event: any) => {
// console.log("Worker: Reboardcasting ", eventName, event.detail);
// console.log("Worker: Rebroadcasting ", eventName, event.detail);
// console.log(" with ", JSON.stringify(event.detail, bigIntSerializer, 2));
parentPort?.postMessage({
event: {
Expand All @@ -537,8 +566,8 @@ export class LibP2PNode {
};
};

this._node?.addEventListener("peer:connect", eventHandler("peer:connect"));
this._node?.addEventListener("peer:disconnect", eventHandler("peer:disconnect"));
this._node?.addEventListener("connection:open", eventHandler("connection:open"));
this._node?.addEventListener("connection:close", eventHandler("connection:close"));
this._node?.addEventListener("peer:discovery", eventHandler("peer:discovery"));

this.gossip?.addEventListener("gossipsub:message", eventHandler("gossipsub:message"));
Expand Down
2 changes: 1 addition & 1 deletion apps/hubble/src/network/p2p/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export const msgIdFnStrictSign = (message: GossipSubMessage): Uint8Array => {
};

/* This has been imported from the libp2p-gossipsub implementation as it's not public there */
export function msgIdFnStrictNoSign(msg: GossipSubMessage): Uint8Array {
export function msgIdFnStrictNoSign(msg: GossipSubMessage): Uint8Array | Promise<Uint8Array> {
// Hashes the raw message data
return noSignMsgId(msg.data);
}
11 changes: 10 additions & 1 deletion apps/hubble/src/test/e2e/gossipNetwork.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,16 @@ describe("gossip network tests", () => {

beforeEach(async () => {
messageStore.clear();
await Promise.all(nodes.map((node) => node.start([])));
await Promise.all(
nodes.map((node) =>
node.start([], {
scoreParams: {
IPColocationFactorWeight: 0,
behaviourPenaltyWeight: 0,
},
}),
),
);
}, TEST_TIMEOUT_LONG);

afterEach(async () => {
Expand Down
Loading

0 comments on commit 095cca9

Please sign in to comment.