Skip to content

Commit

Permalink
fix: enable asyncValidation so we don't forward invalid messages on g…
Browse files Browse the repository at this point in the history
…ossip (#1520)
  • Loading branch information
sanjayprabhu committed Oct 14, 2023
1 parent 5cff3ff commit 89ce7d2
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 27 deletions.
5 changes: 5 additions & 0 deletions .changeset/khaki-shoes-jump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: enable asyncValidation so we don't forward invalid messages on gossip
13 changes: 8 additions & 5 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
ClientOptions,
} from "@farcaster/hub-nodejs";
import { PeerId } from "@libp2p/interface-peer-id";
import { peerIdFromBytes } from "@libp2p/peer-id";
import { peerIdFromBytes, peerIdFromString } from "@libp2p/peer-id";
import { publicAddressesFirst } from "@libp2p/utils/address-sort";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import { Result, ResultAsync, err, ok } from "neverthrow";
Expand Down Expand Up @@ -892,7 +892,7 @@ export class Hub implements HubInterface {
/* Private Methods */
/* -------------------------------------------------------------------------- */

private async handleGossipMessage(gossipMessage: GossipMessage, source: PeerId): HubAsyncResult<void> {
private async handleGossipMessage(gossipMessage: GossipMessage, source: PeerId, msgId: string): HubAsyncResult<void> {
const peerIdResult = Result.fromThrowable(
() => peerIdFromBytes(gossipMessage.peerId ?? new Uint8Array([])),
(error) => new HubError("bad_request.parse_failure", error as Error),
Expand All @@ -916,13 +916,16 @@ export class Hub implements HubInterface {

// Merge the message
const result = await this.submitMessage(message, "gossip");
if (result.isErr()) {
if (result.isOk()) {
this.gossipNode.reportValid(msgId, peerIdFromString(source.toString()).toBytes());
} else {
log.info(
{
errCode: result.error.errCode,
peerId: source.toString(),
origin: peerIdResult.value,
hash: bytesToHexString(message.hash).unwrapOr(""),
msgId,
},
"Received bad gossip message from peer",
);
Expand Down Expand Up @@ -1096,10 +1099,10 @@ export class Hub implements HubInterface {
await this.gossipNode.subscribe(this.gossipNode.primaryTopic());
await this.gossipNode.subscribe(this.gossipNode.contactInfoTopic());

this.gossipNode.on("message", async (_topic, message, source) => {
this.gossipNode.on("message", async (_topic, message, source, msgId) => {
await message.match(
async (gossipMessage: GossipMessage) => {
await this.handleGossipMessage(gossipMessage, source);
await this.handleGossipMessage(gossipMessage, source, msgId);
},
async (error: HubError) => {
log.error(error, "failed to decode message");
Expand Down
18 changes: 15 additions & 3 deletions apps/hubble/src/network/p2p/gossipNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { TypedEmitter } from "tiny-typed-emitter";
import { logger } from "../../utils/logger.js";
import { PeriodicPeerCheckScheduler } from "./periodicPeerCheck.js";
import { GOSSIP_PROTOCOL_VERSION } from "./protocol.js";
import { AddrInfo } from "@chainsafe/libp2p-gossipsub/types";
import { AddrInfo, MsgIdStr } from "@chainsafe/libp2p-gossipsub/types";
import { PeerScoreThresholds } from "@chainsafe/libp2p-gossipsub/score";
import { statsd } from "../../utils/statsd.js";
import { createFromProtobuf, exportToProtobuf } from "@libp2p/peer-id-factory";
Expand All @@ -35,7 +35,7 @@ const workerLog = logger.child({ component: "GossipNodeWorker" });
/** Events emitted by a Farcaster Gossip Node */
interface NodeEvents {
/** Triggers on receipt of a new message and includes the topic and message contents */
message: (topic: string, message: HubResult<GossipMessage>, source: PeerId) => void;
message: (topic: string, message: HubResult<GossipMessage>, source: PeerId, msgId: string) => void;
/** Triggers when a peer connects and includes the libp2p Connection object*/
peerConnect: (connection: Connection) => void;
/** Triggers when a peer disconnects and includes the libp2p Connection object */
Expand Down Expand Up @@ -84,6 +84,7 @@ export interface LibP2PNodeInterface {
subscribe: (topic: string) => Promise<void>;
gossipMessage: (message: Uint8Array) => Promise<SuccessOrError & { peerIds: Uint8Array[] }>;
gossipContactInfo: (contactInfo: Uint8Array) => Promise<SuccessOrError & { peerIds: Uint8Array[] }>;
reportValid: (messageId: string, propagationSource: Uint8Array) => Promise<void>;
}

// Extract the method names (as strings) from the LibP2PNodeInterface
Expand Down Expand Up @@ -385,7 +386,14 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
} else {
data = Buffer.from(Object.values(detail.msg.data as unknown as Record<string, number>));
}
this.emit("message", detail.msg.topic, GossipNode.decodeMessage(data), detail.propagationSource);
const messageId = detail.msg;
this.emit(
"message",
detail.msg.topic,
GossipNode.decodeMessage(data),
detail.propagationSource,
detail.msgId,
);
} catch (e) {
logger.error({ e, data: detail.msg.data }, "Failed to decode message");
}
Expand Down Expand Up @@ -463,6 +471,10 @@ export class GossipNode extends TypedEmitter<NodeEvents> {
this.callMethod("updateDeniedPeerIds", peerIds);
}

reportValid(messageId: string, propagationSource: Uint8Array) {
this.callMethod("reportValid", messageId, propagationSource);
}

/* -------------------------------------------------------------------------- */
/* Private Methods */
/* -------------------------------------------------------------------------- */
Expand Down
25 changes: 19 additions & 6 deletions apps/hubble/src/network/p2p/gossipNodeWorker.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { parentPort, workerData } from "worker_threads";
import { peerIdFromBytes } from "@libp2p/peer-id";
import * as MultiAddr from "@multiformats/multiaddr";
import { Message as GossipSubMessage, PublishResult } from "@libp2p/interface-pubsub";
import { Message as GossipSubMessage, PublishResult, TopicValidatorResult } from "@libp2p/interface-pubsub";
import {
GossipNode,
LibP2PNodeInterface,
LibP2PNodeMessage,
LibP2PNodeMethodGenericMessage,
NodeOptions,
GossipNode,
LibP2PNodeMethodReturnType,
LibP2PNodeInterface,
NodeOptions,
} from "./gossipNode.js";
import {
ContactInfoContent,
Expand All @@ -21,8 +21,8 @@ import {
Message,
} from "@farcaster/hub-nodejs";
import { addressInfoFromParts, checkNodeAddrs, ipMultiAddrStrFromAddressInfo } from "../../utils/p2p.js";
import { Libp2p, createLibp2p } from "libp2p";
import { Result, ResultAsync, err, ok } from "neverthrow";
import { createLibp2p, Libp2p } from "libp2p";
import { err, ok, Result, ResultAsync } from "neverthrow";
import { GossipSub, gossipsub } from "@chainsafe/libp2p-gossipsub";
import { ConnectionFilter } from "./connectionFilter.js";
import { tcp } from "@libp2p/tcp";
Expand Down Expand Up @@ -109,6 +109,7 @@ export class LibP2PNode {
const gossip = gossipsub({
emitSelf: false,
allowPublishToZeroPeers: true,
asyncValidation: true, // Do not forward messages until we've merged it (prevents forwarding known bad messages)
globalSignaturePolicy: "StrictSign",
msgIdFn: this.getMessageId.bind(this),
directPeers: options.directPeers || [],
Expand Down Expand Up @@ -372,6 +373,10 @@ export class LibP2PNode {
}
}

async reportValid(messageId: string, propagationSource: PeerId) {
this.gossip?.reportMessageValidationResult(messageId, propagationSource, TopicValidatorResult.Accept);
}

registerEventListeners() {
// When serializing data, we need to handle some data types specially.
// 1, BigInts are not supported by JSON.stringify, so we convert them to strings
Expand Down Expand Up @@ -572,5 +577,13 @@ parentPort?.on("message", async (msg: LibP2PNodeMethodGenericMessage) => {
});
break;
}
case "reportValid": {
const specificMsg = msg as LibP2PNodeMessage<"reportValid">;
const [msgId, source] = specificMsg.args;
const sourceId = peerIdFromBytes(source);
await libp2pNode.reportValid(msgId, sourceId);
parentPort?.postMessage({ methodCallId, result: makeResult<"reportValid">(undefined) });
break;
}
}
});
66 changes: 53 additions & 13 deletions apps/hubble/src/test/e2e/gossipNetwork.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import { GossipNode } from "../../network/p2p/gossipNode.js";
import { sleep } from "../../utils/crypto.js";
import { Factories, GossipMessage, MessageData } from "@farcaster/hub-nodejs";
import {
Factories,
GossipMessage,
isCastAddMessage,
isReactionAddMessage,
Message,
MessageData,
MessageType,
} from "@farcaster/hub-nodejs";
import { peerIdFromString } from "@libp2p/peer-id";

const NUM_NODES = 10;
const PROPAGATION_DELAY = 3 * 1000; // between 2 and 3 full heartbeat ticks
Expand Down Expand Up @@ -49,47 +58,78 @@ describe("gossip network tests", () => {
await sleep(PROPAGATION_DELAY);

await Promise.all(nodes.map(async (n) => expect((await n.allPeerIds()).length).toBeGreaterThanOrEqual(1)));

const randomNode = nodes[Math.floor(Math.random() * nodes.length)] as GossipNode;
// Add listeners that receive new GossipMessages and push them to the MessageStore
nodes.forEach((n) => {
{
n.addListener("message", async (topic, message) => {
n.addListener("message", async (topic, message, source, msgId) => {
expect(message.isOk()).toBeTruthy();

const peerId = n.peerId()?.toString() ?? "";
const existingTopics = messageStore.get(peerId) || new Map();
const existingMessages = existingTopics.get(topic) || [];

existingMessages.push(message._unsafeUnwrap());
const gossipMessage = message._unsafeUnwrap();
existingMessages.push(gossipMessage);
existingTopics.set(topic, existingMessages);
messageStore.set(peerId, existingTopics);

// we'll treat reaction add messages as invalid and everything else as valid
if (gossipMessage.message && !isReactionAddMessage(gossipMessage.message)) {
n.reportValid(msgId, peerIdFromString(source.toString()).toBytes());
}
});
n.registerDebugListeners();
}
});

// Create a message and send it to a random node
const message = await Factories.Message.create();
const randomNode = nodes[Math.floor(Math.random() * nodes.length)] as GossipNode;
const publishResult = await randomNode.gossipMessage(message);
expect(publishResult.isOk()).toBeTruthy();
expect(publishResult._unsafeUnwrap().recipients.length).toBeGreaterThan(0);
const validMessage = await Factories.CastAddMessage.create();
const invalidMessage = await Factories.ReactionAddMessage.create();
const validPublishResult = await randomNode.gossipMessage(validMessage);
expect(validPublishResult.isOk()).toBeTruthy();
expect(validPublishResult._unsafeUnwrap().recipients.length).toBeGreaterThan(0);
const invalidPublishResult = await randomNode.gossipMessage(invalidMessage);
expect(invalidPublishResult.isOk()).toBeTruthy();
expect(invalidPublishResult._unsafeUnwrap().recipients.length).toBeGreaterThan(0);

// Sleep 5 heartbeat ticks
await sleep(PROPAGATION_DELAY);

// Assert that every node except the sender has pushed the message into its MessageStore
const nonSenderNodes = nodes.filter((n) => n.peerId()?.toString() !== randomNode.peerId()?.toString());

let numReactionAddMessages = 0;
let numCastAddMessages = 0;

nonSenderNodes.map((n) => {
const topics = messageStore.get(n.peerId()?.toString() ?? "");
expect(topics).toBeDefined();
expect(topics?.has(primaryTopic)).toBeTruthy();
const topicMessages = topics?.get(primaryTopic) ?? [];
expect(topicMessages.length).toBe(1);
expect((topicMessages[0] as GossipMessage).message).toEqual(message);
let castAddMessage: Message | undefined;
let reactionAddMessage: Message | undefined;
expect(topicMessages.length).toBeGreaterThan(0);
for (const msg of topicMessages) {
if (msg.message && isReactionAddMessage(msg.message)) {
reactionAddMessage = msg.message;
numReactionAddMessages++;
} else {
castAddMessage = msg.message;
numCastAddMessages += 1;
}
}
// Cast add message must always be present, but it's ok for the reaction add message to be missing sometimes
expect(castAddMessage).toBeDefined();
});

expect(numCastAddMessages).toBe(NUM_NODES - 1);
// Reaction messages are not forwarded to all nodes because they are considered in valid. They stop after the first node.
// This is a test that asyncValidation is working as expected.
expect(numReactionAddMessages).toBe(1);

messageStore.clear();

// Make sure a message with data_bytes is also received
const messageWithDataBytes = await Factories.Message.create({ data: { castAddBody: { text: "data" } } });
messageWithDataBytes.dataBytes = MessageData.encode(messageWithDataBytes.data as MessageData).finish();
Expand All @@ -108,8 +148,8 @@ describe("gossip network tests", () => {
expect(topics).toBeDefined();
expect(topics?.has(primaryTopic)).toBeTruthy();
const topicMessages = topics?.get(primaryTopic) ?? [];
expect(topicMessages.length).toBe(2);
expect((topicMessages[1] as GossipMessage).message).toEqual(messageWithDataBytes);
expect(topicMessages.length).toBe(1);
expect((topicMessages[0] as GossipMessage).message).toEqual(messageWithDataBytes);
});
},
TEST_TIMEOUT_LONG,
Expand Down

0 comments on commit 89ce7d2

Please sign in to comment.