Skip to content

Commit

Permalink
fix: Accept raw databytes if using alternate protobuf encoding (#1508)
Browse files Browse the repository at this point in the history
  • Loading branch information
adityapk00 committed Oct 12, 2023
1 parent f7e755b commit ba86d37
Show file tree
Hide file tree
Showing 26 changed files with 6,609 additions and 25 deletions.
8 changes: 8 additions & 0 deletions .changeset/rude-needles-look.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@farcaster/hub-nodejs": patch
"@farcaster/hub-web": patch
"@farcaster/core": patch
"@farcaster/hubble": patch
---

feat: Allow signing raw message data bytes to support Rust, Golang, etc.
6 changes: 4 additions & 2 deletions apps/hubble/src/hubble.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import { HttpAPIServer } from "./rpc/httpServer.js";
import { SingleBar } from "cli-progress";
import { exportToProtobuf } from "@libp2p/peer-id-factory";
import OnChainEventStore from "./storage/stores/onChainEventStore.js";
import { ensureMessageData } from "./storage/db/message.js";

export type HubSubmitSource = "gossip" | "rpc" | "eth-provider" | "l2-provider" | "sync" | "fname-registry";

Expand Down Expand Up @@ -1113,9 +1114,9 @@ export class Hub implements HubInterface {
/* RPC Handler API */
/* -------------------------------------------------------------------------- */

async submitMessage(message: Message, source?: HubSubmitSource): HubAsyncResult<number> {
async submitMessage(submittedMessage: Message, source?: HubSubmitSource): HubAsyncResult<number> {
// message is a reserved key in some logging systems, so we use submittedMessage instead
const logMessage = log.child({ submittedMessage: messageToLog(message), source });
const logMessage = log.child({ submittedMessage: messageToLog(submittedMessage), source });

if (this.syncEngine.syncTrieQSize > MAX_MESSAGE_QUEUE_SIZE) {
log.warn({ syncTrieQSize: this.syncEngine.syncTrieQSize }, "SubmitMessage rejected: Sync trie queue is full");
Expand All @@ -1124,6 +1125,7 @@ export class Hub implements HubInterface {

const start = Date.now();

const message = ensureMessageData(submittedMessage);
const mergeResult = await this.engine.mergeMessage(message);

mergeResult.match(
Expand Down
3 changes: 2 additions & 1 deletion apps/hubble/src/network/sync/merkleTrie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from "../../storage/db/types.js";
import { logger } from "../../utils/logger.js";
import { getStatusdInitialization } from "../../utils/statsd.js";
import { messageDecode } from "../../storage/db/message.js";

/**
* Represents a node in the trie, and its immediate children
Expand Down Expand Up @@ -234,7 +235,7 @@ class MerkleTrie {
const postfix = (key as Buffer).readUint8(1 + FID_BYTES);
if (postfix < UserMessagePostfixMax) {
const message = Result.fromThrowable(
() => Message.decode(new Uint8Array(value as Buffer)),
() => messageDecode(new Uint8Array(value as Buffer)),
(e) => e as HubError,
)();
if (message.isOk() && message.value.hash.length === HASH_LENGTH) {
Expand Down
57 changes: 49 additions & 8 deletions apps/hubble/src/storage/db/message.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { bytesIncrement, CastId, HubError, HubResult, Message, MessageType } from "@farcaster/hub-nodejs";
import { err, ok, ResultAsync } from "neverthrow";
import { bytesIncrement, CastId, HubError, HubResult, Message, MessageData, MessageType } from "@farcaster/hub-nodejs";
import { err, ok, Result, ResultAsync } from "neverthrow";
import RocksDB, { Iterator, Transaction } from "./rocksdb.js";
import { FID_BYTES, RootPrefix, TRUE_VALUE, UserMessagePostfix, UserMessagePostfixMax, UserPostfix } from "./types.js";
import { MessagesPage, PAGE_SIZE_MAX, PageOptions } from "../stores/types.js";
Expand Down Expand Up @@ -114,7 +114,7 @@ export const getMessage = async <T extends Message>(
tsHash: Uint8Array,
): Promise<T> => {
const buffer = await db.get(makeMessagePrimaryKey(fid, set, tsHash));
return Message.decode(new Uint8Array(buffer)) as T;
return messageDecode(new Uint8Array(buffer)) as T;
};

export const deleteMessage = (db: RocksDB, message: Message): Promise<void> => {
Expand All @@ -124,7 +124,7 @@ export const deleteMessage = (db: RocksDB, message: Message): Promise<void> => {

export const getManyMessages = async <T extends Message>(db: RocksDB, primaryKeys: Buffer[]): Promise<T[]> => {
const buffers = await db.getMany(primaryKeys);
return buffers.map((buffer) => Message.decode(new Uint8Array(buffer)) as T);
return buffers.map((buffer) => messageDecode(new Uint8Array(buffer)) as T);
};

export const getManyMessagesByFid = async <T extends Message>(
Expand All @@ -151,7 +151,7 @@ export const getAllMessagesByFid = async (db: RocksDB, fid: number): Promise<Mes

const messages: Message[] = [];
await db.forEachIterator((_key, buffer) => {
messages.push(Message.decode(new Uint8Array(buffer as Buffer)));
messages.push(messageDecode(new Uint8Array(buffer as Buffer)));
}, iteratorOptions);

return messages;
Expand Down Expand Up @@ -200,7 +200,7 @@ export const getMessagesPageByPrefix = async <T extends Message>(

const getNextIteratorRecord = async (iterator: Iterator): Promise<[Buffer, Message]> => {
const [key, value] = await iterator.next();
return [key as Buffer, Message.decode(new Uint8Array(value as Buffer))];
return [key as Buffer, messageDecode(new Uint8Array(value as Buffer))];
};

let iteratorFinished = false;
Expand Down Expand Up @@ -287,9 +287,10 @@ export const putMessageTransaction = (txn: Transaction, message: Message): Trans
throw tsHash.error; // TODO: use result pattern
}
const primaryKey = makeMessagePrimaryKey(message.data.fid, typeToSetPostfix(message.data.type), tsHash.value);
const messageBuffer = Buffer.from(Message.encode(message).finish());
const bySignerKey = makeMessageBySignerKey(message.data.fid, message.signer, message.data.type, tsHash.value);
return txn.put(primaryKey, messageBuffer).put(bySignerKey, TRUE_VALUE);

const messageBuffer = messageEncode(message);
return txn.put(primaryKey, Buffer.from(messageBuffer)).put(bySignerKey, TRUE_VALUE);
};

export const deleteMessageTransaction = (txn: Transaction, message: Message): Transaction => {
Expand All @@ -304,3 +305,43 @@ export const deleteMessageTransaction = (txn: Transaction, message: Message): Tr
const bySignerKey = makeMessageBySignerKey(message.data.fid, message.signer, message.data.type, tsHash.value);
return txn.del(bySignerKey).del(primaryKey);
};

/**
 * Serializes a message into the bytes that are written to the DB.
 *
 * When `dataBytes` is present and non-empty, the `data` field is left out of the encoded
 * output to save space; `messageDecode` rebuilds `data` from `dataBytes` when the message
 * is read back. Otherwise the message is encoded as-is.
 *
 * The input message is never mutated.
 *
 * @param message - the message to serialize
 * @returns the protobuf-encoded message bytes
 */
export const messageEncode = (message: Message): Uint8Array => {
  if (message.dataBytes && message.dataBytes.length > 0) {
    // A shallow copy is enough to drop `data` without touching the caller's object;
    // this avoids a full encode -> decode round trip just to obtain a clone.
    return Message.encode({ ...message, data: undefined }).finish();
  }

  return Message.encode(message).finish();
};

/**
 * Parses encoded message bytes into a `Message`.
 *
 * If the decoded message carries a non-empty `dataBytes`, those bytes are decoded as
 * `MessageData` and assigned to `data`, so callers can work with the structured field.
 * A failure to decode `dataBytes` propagates to the caller.
 *
 * @param messageBytes - protobuf-encoded message bytes
 * @returns the decoded message, with `data` populated from `dataBytes` when available
 */
export const messageDecode = (messageBytes: Uint8Array): Message => {
  const decoded = Message.decode(messageBytes);
  const rawData = decoded.dataBytes;

  // Nothing to expand: keep whatever `data` the encoded message already had.
  if (!rawData || rawData.length === 0) {
    return decoded;
  }

  decoded.data = MessageData.decode(rawData);
  return decoded;
};

/**
 * Makes sure `message.data` is populated, preferring the contents of `dataBytes`.
 *
 * When `dataBytes` is non-empty it is decoded as `MessageData` and written to
 * `message.data`, overwriting any existing value. If decoding throws, the error is
 * swallowed and the message is returned unchanged.
 *
 * Note: the message is modified in place and the same object is returned.
 *
 * @param message - the submitted message
 * @returns the same message object
 */
export const ensureMessageData = (message: Message): Message => {
  const rawData = message.dataBytes;
  if (!rawData || rawData.length === 0) {
    return message;
  }

  let decodedData: MessageData;
  try {
    decodedData = MessageData.decode(rawData);
  } catch {
    // Undecodable dataBytes: leave `data` as it was.
    return message;
  }

  message.data = decodedData;
  return message;
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import RocksDB from "../rocksdb.js";
import { makeTsHash, putMessageTransaction } from "../message.js";
import OnChainEventStore from "../../stores/onChainEventStore.js";

const db = jestRocksDB("clearEvents.migration.test");
const db = jestRocksDB("uniqueverifications.migration.test");

describe("uniqueVerifications migration", () => {
const putVerificationMessage = async (
Expand Down
235 changes: 235 additions & 0 deletions apps/hubble/src/storage/engine/messageDataBytes.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
import {
CastAddMessage,
Factories,
FarcasterNetwork,
Message,
MessageData,
OnChainEvent,
bytesCompare,
bytesDecrement,
} from "@farcaster/hub-nodejs";
import { ensureMessageData, messageDecode, messageEncode } from "../db/message.js";
import { jestRocksDB } from "../db/jestUtils.js";
import Engine from "./index.js";
import { blake3Truncate160 } from "../../utils/crypto.js";

// Shared fixtures for this test file.

// RocksDB instance backing the engine under test.
const db = jestRocksDB("protobufs.messageDataBytes.test");
const network = FarcasterNetwork.TESTNET;
const engine = new Engine(db, network);
const fid = Factories.Fid.build();
// Signer used to sign the cast messages merged into the engine.
const signer = Factories.Ed25519Signer.build();
// Its key is used as the `to` address of the IdRegistry on-chain event.
const custodySigner = Factories.Eip712Signer.build();

// Assigned in the top-level `beforeAll`; note that the nested "With engine" suite
// declares its own `castAdd` that shadows this one.
let castAdd: CastAddMessage;

/**
 * Returns an independent copy of `message` by encoding it and decoding the result,
 * so tests can modify the copy without affecting the original.
 */
const cloneMessage = (message: Message): Message => {
  const encoded = Message.encode(message).finish();
  return Message.decode(encoded);
};

// Tests for storing and merging messages whose payload is supplied as raw `dataBytes`
// instead of (or in addition to) the structured `data` field.
describe("messageDataBytes", () => {
  beforeAll(async () => {
    castAdd = await Factories.CastAddMessage.create({
      data: { fid, network, castAddBody: { text: "This is a cast" } },
    });
  });

  // A message without dataBytes must round-trip through messageEncode/messageDecode unchanged.
  test("encode decode test", async () => {
    const encoded = messageEncode(castAdd);
    const decoded = messageDecode(encoded);

    expect(Message.toJSON(decoded)).toEqual(Message.toJSON(castAdd));
  });

  // messageDecode must rebuild `data` from `dataBytes` when only the latter was encoded.
  test("message data bytes to message.data", async () => {
    const castAddClone = cloneMessage(castAdd);
    castAddClone.data = undefined;
    castAddClone.dataBytes = MessageData.encode(castAdd.data).finish();

    const decoded = messageDecode(Message.encode(castAddClone).finish());
    // biome-ignore lint/style/noNonNullAssertion: <explanation>
    expect(MessageData.toJSON(decoded.data!)).toEqual(MessageData.toJSON(castAdd.data));
  });

  // ensureMessageData must fill in `data` on an in-memory message that only has `dataBytes`.
  test("ensure message.data", async () => {
    const castAddClone = cloneMessage(castAdd);
    castAddClone.data = undefined;
    castAddClone.dataBytes = MessageData.encode(castAdd.data).finish();

    const ensured = ensureMessageData(castAddClone);
    // biome-ignore lint/style/noNonNullAssertion: <explanation>
    expect(MessageData.toJSON(ensured.data!)).toEqual(MessageData.toJSON(castAdd.data));
  });

  describe("With engine", () => {
    // Shadows the outer `castAdd`: this one is created with the `signer` fixture so that
    // it can be merged into the engine.
    let castAdd: CastAddMessage;
    let custodyEvent: OnChainEvent;
    let signerEvent: OnChainEvent;
    let storageEvent: OnChainEvent;

    beforeAll(async () => {
      const signerKey = (await signer.getSignerKey())._unsafeUnwrap();
      const custodySignerKey = (await custodySigner.getSignerKey())._unsafeUnwrap();
      custodyEvent = Factories.IdRegistryOnChainEvent.build({ fid }, { transient: { to: custodySignerKey } });
      signerEvent = Factories.SignerOnChainEvent.build({ fid }, { transient: { signer: signerKey } });
      storageEvent = Factories.StorageRentOnChainEvent.build({ fid });

      castAdd = await Factories.CastAddMessage.create({ data: { fid, network } }, { transient: { signer } });
    });

    // Merge the custody, signer and storage events for the fid before every test.
    beforeEach(async () => {
      await engine.mergeOnChainEvent(custodyEvent);
      await engine.mergeOnChainEvent(signerEvent);
      await engine.mergeOnChainEvent(storageEvent);
    });

    test("merges with dataBytes", async () => {
      const castAddClone = cloneMessage(castAdd);
      castAddClone.data = undefined;
      castAddClone.dataBytes = MessageData.encode(castAdd.data).finish();

      // Try and merge
      const result = await engine.mergeMessage(ensureMessageData(castAddClone));
      expect(result.isOk()).toBeTruthy();

      const fetched = await engine.getCast(fid, castAdd.hash);

      expect(fetched.isOk()).toBeTruthy();
      expect(MessageData.toJSON(fetched._unsafeUnwrap().data)).toEqual(MessageData.toJSON(castAdd.data));
    });

    test("fails without ensureData", async () => {
      const castAddClone = cloneMessage(castAdd);
      castAddClone.data = undefined;
      castAddClone.dataBytes = MessageData.encode(castAdd.data).finish();

      // Try and merge without calling ensureMessageData. This will fail
      const result = await engine.mergeMessage(castAddClone);
      expect(result.isErr()).toBeTruthy();
      expect(result._unsafeUnwrapErr().message).toContain("message data is missing");
    });

    // Storing a message via dataBytes (with `data` dropped by messageEncode) must not take
    // more space than the default encoding that stores `data`.
    test("data sizes should match", async () => {
      const castAddClone = cloneMessage(castAdd);
      castAddClone.dataBytes = MessageData.encode(castAdd.data).finish();

      const encodedBytesNoData = messageEncode(castAddClone);
      const encodedBytesDefault = Message.encode(castAdd).finish();

      expect(encodedBytesNoData.length).toEqual(encodedBytesDefault.length);
    });

    test("fails if hash doesn't match", async () => {
      const castAddClone = cloneMessage(castAdd);
      castAddClone.hash = new Uint8Array([0, 0, 0, 0]);
      castAddClone.dataBytes = MessageData.encode(castAdd.data).finish();

      const result = await engine.mergeMessage(castAddClone);
      expect(result.isErr()).toBeTruthy();
      expect(result._unsafeUnwrapErr().message).toContain("invalid hash");

      // Change the hash
      castAddClone.hash = bytesDecrement(castAdd.hash)._unsafeUnwrap();
      const result2 = await engine.mergeMessage(castAddClone);
      expect(result2.isErr()).toBeTruthy();
      expect(result2._unsafeUnwrapErr().message).toContain("invalid hash");

      // Change the data bytes
      castAddClone.hash = castAdd.hash;
      castAddClone.dataBytes = new Uint8Array([0, 0, 0, 0]);
      const result3 = await engine.mergeMessage(castAddClone);
      expect(result3.isErr()).toBeTruthy();
      expect(result3._unsafeUnwrapErr().message).toContain("invalid hash");
    });

    test("fails if dataBytes is > 1024 bytes", async () => {
      const castAddClone = cloneMessage(castAdd);
      castAddClone.dataBytes = new Uint8Array(1025);

      const result = await engine.mergeMessage(castAddClone);
      expect(result.isErr()).toBeTruthy();
      expect(result._unsafeUnwrapErr().message).toContain("dataBytes > 1024 bytes");
    });

    /**
     * Re-encodes the `fid` field of `messageData` with a different (longer) varint
     * encoding, simulating what the Rust code would do.
     *
     * WARNING: This manipulates the encoded bytes by hand. Only do this in tests.
     *
     * @param messageData - the message data to encode
     * @returns the encoded message data with the alternative varint encoding for `fid`
     * @throws Error if the key byte of the `fid` field cannot be found
     */
    const reencodeFidWithDifferentVarInt = (messageData: MessageData): Buffer => {
      // Step 1: Encode the original message
      const bytes = MessageData.encode(messageData).finish();

      // Step 2: Find the varint bytes for the 'fid' field.
      // This takes the first byte equal to the key, so it relies on no earlier byte in the
      // encoding having the value 16.
      const fidKey = 16; // (2 << 3) | 0
      const keyIndex = bytes.indexOf(fidKey);
      if (keyIndex === -1) {
        // Fail loudly instead of handing the caller an empty buffer.
        throw new Error("Field not found");
      }
      const index = keyIndex + 1; // where the varint bytes start

      // Skip over the existing varint: every byte except the last has its MSB set.
      let varintEndIndex = index;
      while ((bytes[varintEndIndex] as number) > 127) {
        varintEndIndex++;
      }
      varintEndIndex++; // Include the last byte, whose MSB should be 0

      // Step 3: Build the replacement varint from the fid of the message being encoded.
      const newVarintBytes: number[] = [];
      let value = messageData.fid;
      while (value >= 0x80) {
        newVarintBytes.push((value & 0x7f) | 0x80);
        value >>>= 7;
      }
      // Set the continuation bit on the most significant group and terminate with an
      // extra zero group: same value, different bytes.
      newVarintBytes.push(value | 0x80, 0x00);

      // Create the new bytes array with the alternative varint encoding for fid
      return Buffer.concat([
        bytes.subarray(0, index),
        Buffer.from(newVarintBytes),
        bytes.subarray(varintEndIndex),
      ]);
    };

    test("varint encoding", async () => {
      const changedDataBytes = reencodeFidWithDifferentVarInt(castAdd.data);
      expect(bytesCompare(changedDataBytes, MessageData.encode(castAdd.data).finish()) !== 0).toBeTruthy();

      const castAddClone = cloneMessage(castAdd);
      castAddClone.data = undefined;
      castAddClone.dataBytes = changedDataBytes;
      castAddClone.hash = blake3Truncate160(changedDataBytes);

      // Try and merge. This has a different varint encoding for the fid field
      // like the one that would be produced by the Rust code.
      // But we've not updated the signature, so this will fail
      const result = await engine.mergeMessage(ensureMessageData(castAddClone));
      expect(result.isErr()).toBeTruthy();
      expect(result._unsafeUnwrapErr().message).toContain("invalid signature");

      // Update the signature, and then merge
      castAddClone.signature = (await signer.signMessageHash(castAddClone.hash))._unsafeUnwrap();
      const result2 = await engine.mergeMessage(ensureMessageData(castAddClone));
      expect(result2.isOk()).toBeTruthy();

      const fetched = await engine.getCast(fid, castAddClone.hash);
      expect(fetched.isOk()).toBeTruthy();
      expect(MessageData.toJSON(fetched._unsafeUnwrap().data)).toEqual(MessageData.toJSON(castAdd.data));
    });
  });
});

0 comments on commit ba86d37

Please sign in to comment.