Skip to content

Commit

Permalink
chore: Calculate earliestTimestamp and numMessages on demand instead …
Browse files Browse the repository at this point in the history
…of at startup (#1506)
  • Loading branch information
adityapk00 committed Oct 12, 2023
1 parent b518b97 commit 86bed6f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 51 deletions.
5 changes: 5 additions & 0 deletions .changeset/old-students-jam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/hubble": patch
---

fix: Make message counts on-demand to speed startup
16 changes: 8 additions & 8 deletions apps/hubble/src/storage/stores/storageCache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ describe("processEvent", () => {

await cache.syncFromDb();
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(0));
cache.processEvent(event);
await cache.processEvent(event);
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
});

Expand All @@ -160,7 +160,7 @@ describe("processEvent", () => {

await cache.syncFromDb();
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(0));
cache.processEvent(event);
await cache.processEvent(event);
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
});

Expand All @@ -178,7 +178,7 @@ describe("processEvent", () => {
await putMessage(db, cast);
await cache.syncFromDb();
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
cache.processEvent(event);
await cache.processEvent(event);
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
});

Expand All @@ -190,7 +190,7 @@ describe("processEvent", () => {
await putMessage(db, message);
await cache.syncFromDb();
await expect(cache.getMessageCount(fid, UserPostfix.ReactionMessage)).resolves.toEqual(ok(1));
cache.processEvent(event);
await cache.processEvent(event);
await expect(cache.getMessageCount(fid, UserPostfix.ReactionMessage)).resolves.toEqual(ok(0));
});

Expand All @@ -202,7 +202,7 @@ describe("processEvent", () => {
await putMessage(db, message);
await cache.syncFromDb();
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(1));
cache.processEvent(event);
await cache.processEvent(event);
await expect(cache.getMessageCount(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(0));
});

Expand All @@ -214,7 +214,7 @@ describe("processEvent", () => {

// Earliest tsHash is undefined initially
await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(ok(undefined));
cache.processEvent(event);
await cache.processEvent(event);

// Earliest tsHash is set
await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
Expand All @@ -226,7 +226,7 @@ describe("processEvent", () => {
data: { fid, timestamp: middleMessage.data.timestamp + 10 },
});
event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message: laterMessage } });
cache.processEvent(event);
await cache.processEvent(event);
await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(middleMessage.data.timestamp, middleMessage.hash),
);
Expand All @@ -236,7 +236,7 @@ describe("processEvent", () => {
data: { fid, timestamp: middleMessage.data.timestamp - 10 },
});
event = HubEvent.create({ type: HubEventType.MERGE_MESSAGE, mergeMessageBody: { message: earlierMessage } });
cache.processEvent(event);
await cache.processEvent(event);
await expect(cache.getEarliestTsHash(fid, UserPostfix.CastMessage)).resolves.toEqual(
makeTsHash(earlierMessage.data.timestamp, earlierMessage.hash),
);
Expand Down
103 changes: 60 additions & 43 deletions apps/hubble/src/storage/stores/storageCache.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {
HubError,
HubEvent,
HubResult,
isMergeMessageHubEvent,
isMergeOnChainHubEvent,
isMergeUsernameProofHubEvent,
Expand All @@ -20,6 +19,7 @@ import { makeFidKey, makeMessagePrimaryKey, makeTsHash, typeToSetPostfix } from
import { bytesCompare, getFarcasterTime, HubAsyncResult } from "@farcaster/core";
import { forEachOnChainEvent } from "../db/onChainEvent.js";
import { addProgressBar } from "../../utils/progressBars.js";
import { sleep } from "../../utils/crypto.js";

const makeKey = (fid: number, set: UserMessagePostfix): string => {
return Buffer.concat([makeFidKey(fid), Buffer.from([set])]).toString("hex");
Expand Down Expand Up @@ -47,7 +47,6 @@ export class StorageCache {

async syncFromDb(): Promise<void> {
log.info("starting storage cache sync");
const usage = new Map<string, number>();

const start = Date.now();

Expand All @@ -63,32 +62,6 @@ export class StorageCache {

const progressBar = addProgressBar("Syncing storage cache", totalFids * 2);

let lastFid = 0;
const prefix = Buffer.from([RootPrefix.User]);
await this._db.forEachIteratorByPrefix(
prefix,
async (key) => {
const postfix = (key as Buffer).readUint8(1 + FID_BYTES);
if (postfix < UserMessagePostfixMax) {
const lookupKey = (key as Buffer).subarray(1, 1 + FID_BYTES + 1).toString("hex");
const fid = (key as Buffer).subarray(1, 1 + FID_BYTES).readUInt32BE();
const count = usage.get(lookupKey) ?? 0;
if (this._earliestTsHashes.get(lookupKey) === undefined) {
const tsHash = Uint8Array.from((key as Buffer).subarray(1 + FID_BYTES + 1));
this._earliestTsHashes.set(lookupKey, tsHash);
}
usage.set(lookupKey, count + 1);

if (lastFid !== fid) {
progressBar?.increment();
lastFid = fid;
}
}
},
{ values: false },
15 * 60 * 1000, // 15 minutes
);

const time = getFarcasterTime();
if (time.isErr()) {
log.error({ err: time.error }, "could not obtain time");
Expand All @@ -112,23 +85,67 @@ export class StorageCache {
progressBar?.update(progressBar?.getTotal());
progressBar?.stop();

this._counts = usage;
this._counts = new Map();
this._earliestTsHashes = new Map();

// Start prepopulating the cache in the background
this.prepopulateMessageCounts();

log.info({ timeTakenMs: Date.now() - start }, "storage cache synced");
}

async prepopulateMessageCounts(): Promise<void> {
let prevFid = 0;
let prevPostfix = 0;

const start = Date.now();
log.info("starting storage cache prepopulation");

const prefix = Buffer.from([RootPrefix.User]);
await this._db.forEachIteratorByPrefix(
prefix,
async (key) => {
const postfix = (key as Buffer).readUint8(1 + FID_BYTES);
if (postfix < UserMessagePostfixMax) {
const fid = (key as Buffer).subarray(1, 1 + FID_BYTES).readUInt32BE();

if (prevFid !== fid || prevPostfix !== postfix) {
await this.getMessageCount(fid, postfix);

if (prevFid !== fid) {
// Sleep to allow other threads to run between each fid
await sleep(1);
}

prevFid = fid;
prevPostfix = postfix;
}
}
},
{ values: false },
1 * 60 * 60 * 1000, // 1 hour
);
log.info({ timeTakenMs: Date.now() - start }, "storage cache prepopulation finished");
}

async getMessageCount(fid: number, set: UserMessagePostfix): HubAsyncResult<number> {
const key = makeKey(fid, set);
if (this._counts.get(key) === undefined) {
let total = 0;
await this._db.forEachIteratorByPrefix(
makeMessagePrimaryKey(fid, set),
() => {
const count = this._counts.get(key) ?? 0;
this._counts.set(key, count + 1);
total += 1;
},
{ keys: false, valueAsBuffer: true },
{ keys: false, values: false },
);

// Recheck the count in case it was set by another thread (i.e. no race conditions)
if (this._counts.get(key) === undefined) {
this._counts.set(key, total);
}
}

return ok(this._counts.get(key) ?? 0);
}

Expand Down Expand Up @@ -202,34 +219,34 @@ export class StorageCache {
}
}

processEvent(event: HubEvent): HubResult<void> {
async processEvent(event: HubEvent): HubAsyncResult<void> {
if (isMergeMessageHubEvent(event)) {
this.addMessage(event.mergeMessageBody.message);
await this.addMessage(event.mergeMessageBody.message);
for (const message of event.mergeMessageBody.deletedMessages) {
this.removeMessage(message);
await this.removeMessage(message);
}
} else if (isPruneMessageHubEvent(event)) {
this.removeMessage(event.pruneMessageBody.message);
await this.removeMessage(event.pruneMessageBody.message);
} else if (isRevokeMessageHubEvent(event)) {
this.removeMessage(event.revokeMessageBody.message);
await this.removeMessage(event.revokeMessageBody.message);
} else if (isMergeUsernameProofHubEvent(event)) {
if (event.mergeUsernameProofBody.usernameProofMessage) {
this.addMessage(event.mergeUsernameProofBody.usernameProofMessage);
await this.addMessage(event.mergeUsernameProofBody.usernameProofMessage);
} else if (event.mergeUsernameProofBody.deletedUsernameProofMessage) {
this.removeMessage(event.mergeUsernameProofBody.deletedUsernameProofMessage);
await this.removeMessage(event.mergeUsernameProofBody.deletedUsernameProofMessage);
}
} else if (isMergeOnChainHubEvent(event) && isStorageRentOnChainEvent(event.mergeOnChainEventBody.onChainEvent)) {
this.addRent(event.mergeOnChainEventBody.onChainEvent);
}
return ok(undefined);
}

private addMessage(message: Message): void {
private async addMessage(message: Message): Promise<void> {
if (message.data !== undefined) {
const set = typeToSetPostfix(message.data.type);
const fid = message.data.fid;
const key = makeKey(fid, set);
const count = this._counts.get(key) ?? 0;
const count = this._counts.get(key) ?? (await this.getMessageCount(fid, set)).unwrapOr(0);
this._counts.set(key, count + 1);

const tsHashResult = makeTsHash(message.data.timestamp, message.hash);
Expand All @@ -244,12 +261,12 @@ export class StorageCache {
}
}

private removeMessage(message: Message): void {
private async removeMessage(message: Message): Promise<void> {
if (message.data !== undefined) {
const set = typeToSetPostfix(message.data.type);
const fid = message.data.fid;
const key = makeKey(fid, set);
const count = this._counts.get(key) ?? 0;
const count = this._counts.get(key) ?? (await this.getMessageCount(fid, set)).unwrapOr(0);
if (count === 0) {
log.error(`error: ${set} store message count is already at 0 for fid ${fid}`);
} else {
Expand Down

0 comments on commit 86bed6f

Please sign in to comment.