From 911dcb077fbcd7ddede2b12627571835df718ea6 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 14 Mar 2025 03:03:57 +0530 Subject: [PATCH 1/8] indexer: do not call factory for pending blocks --- packages/indexer/src/indexer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/indexer/src/indexer.ts b/packages/indexer/src/indexer.ts index ff271343..1879d3ee 100644 --- a/packages/indexer/src/indexer.ts +++ b/packages/indexer/src/indexer.ts @@ -292,7 +292,7 @@ export async function run( let block: TBlock | null; // when factory mode - if (isFactoryMode) { + if (isFactoryMode && finality !== "pending") { assert(indexer.options.factory !== undefined); const [factoryBlock, mainBlock] = blocks; From d94fc854012bab35e2f04f1e68b886e517ba8ee6 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 14 Mar 2025 03:35:52 +0530 Subject: [PATCH 2/8] plugin-drizzle: handle pending blocks --- packages/plugin-drizzle/src/index.ts | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/packages/plugin-drizzle/src/index.ts b/packages/plugin-drizzle/src/index.ts index 04d06e19..b1219cba 100644 --- a/packages/plugin-drizzle/src/index.ts +++ b/packages/plugin-drizzle/src/index.ts @@ -128,6 +128,7 @@ export function drizzleStorage< let tableNames: string[] = []; let indexerId = ""; const alwaysReindex = process.env["APIBARA_ALWAYS_REINDEX"] === "true"; + let prevFinality: DataFinality | undefined; try { tableNames = Object.values((schema as TSchema) ?? db._.schema ?? 
{}).map( @@ -299,7 +300,8 @@ export function drizzleStorage< indexer.hooks.hook("handler:middleware", async ({ use }) => { use(async (context, next) => { try { - const { endCursor, finality } = context as { + const { endCursor, finality, cursor } = context as { + cursor: Cursor; endCursor: Cursor; finality: DataFinality; }; @@ -315,6 +317,15 @@ export function drizzleStorage< TSchema >; + if (prevFinality === "pending") { + // invalidate if previous block's finality was "pending" + await invalidate(tx, cursor, idColumn, indexerId); + + if (enablePersistence) { + await invalidateState({ tx, cursor, indexerId }); + } + } + if (finality !== "finalized") { await registerTriggers( tx, @@ -328,13 +339,15 @@ export function drizzleStorage< await next(); delete context[DRIZZLE_PROPERTY]; - if (enablePersistence) { + if (enablePersistence && finality !== "pending") { await persistState({ tx, endCursor, indexerId, }); } + + prevFinality = finality; }); if (finality !== "finalized") { From acced055dc8d0d2ffd435a3015664bdb8f6cde6c Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 14 Mar 2025 03:36:06 +0530 Subject: [PATCH 3/8] plugin-drizzle: add tests for pending blocks --- packages/plugin-drizzle/tests/helper.ts | 4 +- packages/plugin-drizzle/tests/storage.test.ts | 410 ++++++++++++++++++ 2 files changed, 412 insertions(+), 2 deletions(-) diff --git a/packages/plugin-drizzle/tests/helper.ts b/packages/plugin-drizzle/tests/helper.ts index d6ef9326..ffaf2120 100644 --- a/packages/plugin-drizzle/tests/helper.ts +++ b/packages/plugin-drizzle/tests/helper.ts @@ -6,7 +6,7 @@ import { DrizzleStorageError } from "../src/utils"; export const testTable = pgTable("test", { id: serial("id").primaryKey(), blockNumber: integer("block_number").notNull(), - key: text("key"), + key: text("key").unique(), count: integer("count"), data: text("data"), createdAt: timestamp("created_at"), @@ -41,7 +41,7 @@ export async function migratePgliteDb(db: PgLiteDb) { CREATE TABLE IF NOT EXISTS test 
( id SERIAL PRIMARY KEY, block_number INTEGER NOT NULL, - key TEXT, + key TEXT UNIQUE, count INTEGER, data TEXT, created_at TIMESTAMP diff --git a/packages/plugin-drizzle/tests/storage.test.ts b/packages/plugin-drizzle/tests/storage.test.ts index 2b8ab56f..6091ea1f 100644 --- a/packages/plugin-drizzle/tests/storage.test.ts +++ b/packages/plugin-drizzle/tests/storage.test.ts @@ -1263,4 +1263,414 @@ describe("Drizzle test", () => { ] `); }); + + it("should handle pending data correctly", async () => { + const db = await getPgliteDb(); + + // This test simulates the below scenario: + // 1. We have a pending block with transactions A, B, C + // 2. The final accepted block only contains A, C, D, E (B is missing) + // 3. We want to ensure that transaction B is not in the final table + const indexer = getMockIndexer({ + override: { + plugins: [drizzleStorage({ db, persistState: true })], + async transform({ endCursor, block: { data }, finality }) { + const { db: tx } = useDrizzleStorage(db); + + // Parse the data to get transactions + // In our mock, data will be a string like "PENDING_A_B_C" or "ACCEPTED_A_C_D_E", as emitted by the mock stream below + if (data) { + // Extract transaction IDs from the data string + const parts = data.split("_"); + const blockType = parts[0]; // "PENDING" or "ACCEPTED" + const txIds = parts.slice(1); // ["A", "B", "C"] or ["A", "C", "D", "E"] + + // For each transaction in the block + for (const txId of txIds) { + // Insert new transaction + await tx.insert(testTable).values({ + blockNumber: Number(endCursor?.orderKey), + key: txId, + count: txIds.length, + data: `${blockType}_${txId}`, + }); + } + } + }, + }, + }); + + // Create a custom mock client that simulates: + // 1. A pending block with transactions A, B, C + // 2. 
Then the same block number becomes accepted with transactions A, C, D, E (B is missing) + const mockClient = new MockClient( + (request, options) => { + return [ + // First a pending block with transactions A, B, C + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "PENDING_A_B_C" }], + production: "backfill", + }, + }, + // Then the same block becomes accepted with transactions A, C, D, E (B is missing) + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "accepted", + data: [{ data: "ACCEPTED_A_C_D_E" }], + production: "backfill", + }, + }, + ]; + }, + ); + + await run(mockClient, indexer); + + // Query the database to see what transactions were stored + const result = await db.select().from(testTable); + + // Sort by key for consistent test results + const sortedResult = result.sort((a, b) => + (a.key || "").localeCompare(b.key || ""), + ); + + // We expect to see only transactions from the accepted block (A, C, D, E) + // with the correct count (4) for each + expect( + sortedResult.map(({ id, createdAt, ...r }) => r), + ).toMatchInlineSnapshot(` + [ + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_A", + "key": "A", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_C", + "key": "C", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_D", + "key": "D", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_E", + "key": "E", + }, + ] + `); + + // Verify that transaction B is not in the final table + const hasTxB = result.some((row) => row.key === "B"); + expect(hasTxB).toBe(false); + + // Verify that all transactions from the accepted block have count = 4 + const acceptedTxs = result.filter((row) => + row.data?.startsWith("ACCEPTED_"), + ); + expect(acceptedTxs.length).toBe(4); // A, C, D, E + expect(acceptedTxs.every((row) => row.count === 
4)).toBe(true); + }); + + it("should handle multiple pending blocks with updates", async () => { + const db = await getPgliteDb(); + + // This test simulates a more complex scenario: + // 1. First a pending block with transactions A, B + // 2. The same pending block updated with transactions A, B, C + // 3. The same pending block updated again with transactions A, C, D (B removed) + // 4. Finally the block becomes accepted with transactions A, C, D, E + // We want to ensure that only the final accepted transactions are in the table + + const indexer = getMockIndexer({ + override: { + plugins: [drizzleStorage({ db, persistState: true })], + async transform({ endCursor, block: { data }, finality }) { + const { db: tx } = useDrizzleStorage(db); + if (data) { + // Extract transaction IDs from the data string + const parts = data.split("_"); + const blockType = parts[0]; // "PENDING" or "ACCEPTED" + const txIds = parts.slice(1); // Transaction IDs + + // For each transaction in the block + for (const txId of txIds) { + // Insert new transaction + await tx.insert(testTable).values({ + blockNumber: Number(endCursor?.orderKey), + key: txId, + count: txIds.length, + data: `${blockType}_${txId}`, + }); + } + } + }, + }, + }); + + // Create a custom mock client that simulates the scenario + const mockClient = new MockClient( + (request, options) => { + return [ + // First pending block with transactions A, B + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "PENDING_A_B" }], + production: "backfill", + }, + }, + // Same pending block updated with transactions A, B, C + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "PENDING_A_B_C" }], + production: "backfill", + }, + }, + // Same pending block updated again with transactions A, C, D (B removed) + { + _tag: "data", + data: { + cursor: { orderKey: 
5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "PENDING_A_C_D" }], + production: "backfill", + }, + }, + // Finally the block becomes accepted with transactions A, C, D, E + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "accepted", + data: [{ data: "ACCEPTED_A_C_D_E" }], + production: "backfill", + }, + }, + ]; + }, + ); + + await run(mockClient, indexer); + + // Query the database to see what transactions were stored + const result = await db.select().from(testTable); + + // Sort by key for consistent test results + const sortedResult = result.sort((a, b) => + (a.key || "").localeCompare(b.key || ""), + ); + + // We expect to see only transactions from the accepted block (A, C, D, E) + // with the correct count (4) for each + expect( + sortedResult.map(({ id, createdAt, ...r }) => r), + ).toMatchInlineSnapshot(` + [ + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_A", + "key": "A", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_C", + "key": "C", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_D", + "key": "D", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_E", + "key": "E", + }, + ] + `); + + // Verify that transaction B is not in the final table + const hasTxB = result.some((row) => row.key === "B"); + expect(hasTxB).toBe(false); + + // Verify that all transactions from the accepted block have count = 4 + const acceptedTxs = result.filter((row) => + row.data?.startsWith("ACCEPTED_"), + ); + expect(acceptedTxs.length).toBe(4); // A, C, D, E + expect(acceptedTxs.every((row) => row.count === 4)).toBe(true); + }); + + it("should not persist state for pending blocks", async () => { + const db = await getPgliteDb(); + // This test verifies that state is not persisted for pending blocks + // and that the checkpoint only advances for accepted blocks + + const indexer = getMockIndexer({ + 
override: { + plugins: [drizzleStorage({ db, persistState: true })], + async transform({ endCursor, block: { data }, finality }) { + const { db: tx } = useDrizzleStorage(db); + + // Insert a record for each block + await tx.insert(testTable).values({ + blockNumber: Number(endCursor?.orderKey), + key: `block_${Number(endCursor?.orderKey)}`, + count: 1, + data: `${finality}_${data}`, + }); + }, + }, + }); + + // Create a mock client that alternates between pending and accepted blocks + const mockClient = new MockClient( + (request, options) => { + return [ + // Block 1: Pending + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "block1" }], + production: "backfill", + }, + }, + // Block 1: Accepted + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "accepted", + data: [{ data: "block1" }], + production: "backfill", + }, + }, + // Block 2: Pending + { + _tag: "data", + data: { + cursor: { orderKey: 5000001n }, + endCursor: { orderKey: 5000002n }, + finality: "pending", + data: [{ data: "block2" }], + production: "backfill", + }, + }, + // Block 2: Accepted + { + _tag: "data", + data: { + cursor: { orderKey: 5000001n }, + endCursor: { orderKey: 5000002n }, + finality: "accepted", + data: [{ data: "block2" }], + production: "backfill", + }, + }, + // Block 3: Pending + { + _tag: "data", + data: { + cursor: { orderKey: 5000002n }, + endCursor: { orderKey: 5000003n }, + finality: "pending", + data: [{ data: "block3" }], + production: "backfill", + }, + }, + ]; + }, + ); + + await run(mockClient, indexer); + + // Check the database records + const result = await db.select().from(testTable); + + // data is invalidated before each transform if the prev block was pending + expect(result).toMatchInlineSnapshot(` + [ + { + "blockNumber": 5000001, + "count": 1, + "createdAt": null, + "data": "accepted_block1", + "id": 2, + "key": 
"block_5000001", + }, + { + "blockNumber": 5000002, + "count": 1, + "createdAt": null, + "data": "accepted_block2", + "id": 4, + "key": "block_5000002", + }, + { + "blockNumber": 5000003, + "count": 1, + "createdAt": null, + "data": "pending_block3", + "id": 5, + "key": "block_5000003", + }, + ] + `); + + // Check the checkpoints table + const checkpointsResult = await db.select().from(checkpoints); + + // should only have non-pending blocks in the checkpoint + expect(checkpointsResult).toMatchInlineSnapshot(` + [ + { + "id": "indexer_testing_default", + "orderKey": 5000002, + "uniqueKey": null, + }, + ] + `); + + // The checkpoint should be at block 2, which is the last accepted block + expect(checkpointsResult[0].orderKey).toBe(5000002); + + // Check the rollback table - it should only have entries for blocks that were not invalidated + const rollbackResult = await db.select().from(reorgRollbackTable); + + // Verify that we have the correct number of operations in the rollback table + expect(rollbackResult.length).toBe(3); // 3, not 5: entries from the superseded pending blocks are invalidated + }); }); From 14ba5b10613d219477098ce7630433380713514a Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 14 Mar 2025 03:36:29 +0530 Subject: [PATCH 4/8] plugin-mongo: handle pending blocks --- packages/plugin-mongo/src/index.ts | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/packages/plugin-mongo/src/index.ts b/packages/plugin-mongo/src/index.ts index 0e8c8504..a1bf7b0c 100644 --- a/packages/plugin-mongo/src/index.ts +++ b/packages/plugin-mongo/src/index.ts @@ -4,6 +4,7 @@ import type { DbOptions, MongoClient } from "mongodb"; import { generateIndexerId } from "@apibara/indexer/internal"; import { useInternalContext } from "@apibara/indexer/internal/plugins"; +import type { Cursor, DataFinality } from "@apibara/protocol"; import { cleanupStorage, finalize, invalidate } from "./mongo"; import { finalizeState, @@ -62,6 +63,7 @@ export function mongoStorage({ return 
defineIndexerPlugin((indexer) => { let indexerId = ""; const alwaysReindex = process.env["APIBARA_ALWAYS_REINDEX"] === "true"; + let prevFinality: DataFinality | undefined; indexer.hooks.hook("run:before", async () => { const { indexerName } = useInternalContext(); @@ -189,7 +191,11 @@ export function mongoStorage({ indexer.hooks.hook("handler:middleware", async ({ use }) => { use(async (context, next) => { - const { endCursor } = context; + const { endCursor, finality, cursor } = context as { + cursor: Cursor; + endCursor: Cursor; + finality: DataFinality; + }; if (!endCursor) { throw new MongoStorageError("end cursor is undefined"); @@ -199,10 +205,20 @@ export function mongoStorage({ const db = client.db(dbName, dbOptions); context[MONGO_PROPERTY] = new MongoStorage(db, session, endCursor); + if (prevFinality === "pending") { + // invalidate if previous block's finality was "pending" + await invalidate(db, session, cursor, collections); + + if (enablePersistence) { + await invalidateState({ db, session, cursor, indexerId }); + } + } + await next(); + delete context[MONGO_PROPERTY]; - if (enablePersistence) { + if (enablePersistence && finality !== "pending") { await persistState({ db, endCursor, @@ -210,6 +226,8 @@ export function mongoStorage({ indexerId, }); } + + prevFinality = finality; }); }); }); From 093dd3d21b4341f27681c50e9a43ef796628f5d0 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 14 Mar 2025 03:36:43 +0530 Subject: [PATCH 5/8] plugin-mongo: add tests for pending blocks --- packages/plugin-mongo/tests/storage.test.ts | 436 ++++++++++++++++++++ 1 file changed, 436 insertions(+) diff --git a/packages/plugin-mongo/tests/storage.test.ts b/packages/plugin-mongo/tests/storage.test.ts index 7ef79230..67e137b6 100644 --- a/packages/plugin-mongo/tests/storage.test.ts +++ b/packages/plugin-mongo/tests/storage.test.ts @@ -1018,6 +1018,442 @@ describe("MongoDB Test", () => { ] `); }); + + it("should handle pending data correctly", async () => { + const 
{ db, client, dbName } = getRandomDatabase(); + + // This test simulates the below scenario: + // 1. We have a pending block with transactions A, B, C + // 2. The final accepted block only contains A, C, D, E (B is missing) + // 3. We want to ensure that transaction B is not in the final collection + const indexer = getMockIndexer({ + override: { + plugins: [mongoStorage({ client, dbName, collections: ["test"] })], + async transform({ endCursor, block: { data }, finality }) { + const db = useMongoStorage(); + const collection = db.collection("test"); + + // Parse the data to get transactions + // In our mock, data will be a string like "PENDING_A_B_C" or "ACCEPTED_A_C_D_E", as emitted by the mock stream below + if (data) { + // Extract transaction IDs from the data string + const parts = data.split("_"); + const blockType = parts[0]; // "PENDING" or "ACCEPTED" + const txIds = parts.slice(1); // ["A", "B", "C"] or ["A", "C", "D", "E"] + + // For each transaction in the block + for (const txId of txIds) { + // Insert new transaction + await collection.insertOne({ + blockNumber: Number(endCursor?.orderKey), + key: txId, + count: txIds.length, + data: `${blockType}_${txId}`, + }); + } + } + }, + }, + }); + + // Create a custom mock client that simulates: + // 1. A pending block with transactions A, B, C + // 2. 
Then the same block number becomes accepted with transactions A, C, D, E (B is missing) + const mockClient = new MockClient( + (request, options) => { + return [ + // First a pending block with transactions A, B, C + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "PENDING_A_B_C" }], + production: "backfill", + }, + }, + // Then the same block becomes accepted with transactions A, C, D, E (B is missing) + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "accepted", + data: [{ data: "ACCEPTED_A_C_D_E" }], + production: "backfill", + }, + }, + ]; + }, + ); + + await run(mockClient, indexer); + + // Query the database to see what transactions were stored + const result = await db.collection("test").find().toArray(); + + // Sort by key for consistent test results + const sortedResult = result.sort((a, b) => + (a.key || "").localeCompare(b.key || ""), + ); + + // We expect to see only transactions from the accepted block (A, C, D, E) + // with the correct count (4) for each + expect( + sortedResult.map(({ _id, _cursor, ...r }) => r), + ).toMatchInlineSnapshot(` + [ + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_A", + "key": "A", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_C", + "key": "C", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_D", + "key": "D", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_E", + "key": "E", + }, + ] + `); + + // Verify that transaction B is not in the final table + const hasTxB = result.some((row) => row.key === "B"); + expect(hasTxB).toBe(false); + + // Verify that all transactions from the accepted block have count = 4 + const acceptedTxs = result.filter((row) => + row.data?.startsWith("ACCEPTED_"), + ); + expect(acceptedTxs.length).toBe(4); // A, C, D, E + expect(acceptedTxs.every((row) => row.count === 
4)).toBe(true); + }); + + it("should handle multiple pending blocks with updates", async () => { + const { db, client, dbName } = getRandomDatabase(); + + // This test simulates a more complex scenario: + // 1. First a pending block with transactions A, B + // 2. The same pending block updated with transactions A, B, C + // 3. The same pending block updated again with transactions A, C, D (B removed) + // 4. Finally the block becomes accepted with transactions A, C, D, E + // We want to ensure that only the final accepted transactions are in the table + + const indexer = getMockIndexer({ + override: { + plugins: [mongoStorage({ client, dbName, collections: ["test"] })], + async transform({ endCursor, block: { data }, finality }) { + const db = useMongoStorage(); + const collection = db.collection("test"); + + if (data) { + // Extract transaction IDs from the data string + const parts = data.split("_"); + const blockType = parts[0]; // "PENDING" or "ACCEPTED" + const txIds = parts.slice(1); // Transaction IDs + + // For each transaction in the block + for (const txId of txIds) { + // Insert new transaction + await collection.insertOne({ + blockNumber: Number(endCursor?.orderKey), + key: txId, + count: txIds.length, + data: `${blockType}_${txId}`, + }); + } + } + }, + }, + }); + + // Create a custom mock client that simulates the scenario + const mockClient = new MockClient( + (request, options) => { + return [ + // First pending block with transactions A, B + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "PENDING_A_B" }], + production: "backfill", + }, + }, + // Same pending block updated with transactions A, B, C + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "PENDING_A_B_C" }], + production: "backfill", + }, + }, + // Same pending block updated again with transactions A, C, D (B 
removed) + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "PENDING_A_C_D" }], + production: "backfill", + }, + }, + // Finally the block becomes accepted with transactions A, C, D, E + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "accepted", + data: [{ data: "ACCEPTED_A_C_D_E" }], + production: "backfill", + }, + }, + ]; + }, + ); + + await run(mockClient, indexer); + + // Query the database to see what transactions were stored + const result = await db.collection("test").find().toArray(); + + // Sort by key for consistent test results + const sortedResult = result.sort((a, b) => + (a.key || "").localeCompare(b.key || ""), + ); + + // We expect to see only transactions from the accepted block (A, C, D, E) + // with the correct count (4) for each + expect( + sortedResult.map(({ _id, _cursor, ...r }) => r), + ).toMatchInlineSnapshot(` + [ + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_A", + "key": "A", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_C", + "key": "C", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_D", + "key": "D", + }, + { + "blockNumber": 5000001, + "count": 4, + "data": "ACCEPTED_E", + "key": "E", + }, + ] + `); + + // Verify that transaction B is not in the final table + const hasTxB = result.some((row) => row.key === "B"); + expect(hasTxB).toBe(false); + + // Verify that all transactions from the accepted block have count = 4 + const acceptedTxs = result.filter((row) => + row.data?.startsWith("ACCEPTED_"), + ); + expect(acceptedTxs.length).toBe(4); // A, C, D, E + expect(acceptedTxs.every((row) => row.count === 4)).toBe(true); + }); + + it("should not persist state for pending blocks", async () => { + const { db, client, dbName } = getRandomDatabase(); + // This test verifies that state is not persisted for pending blocks + // and 
that the checkpoint only advances for accepted blocks + + const indexer = getMockIndexer({ + override: { + plugins: [ + mongoStorage({ + client, + dbName, + collections: ["test"], + persistState: true, + }), + ], + async transform({ endCursor, block: { data }, finality }) { + const db = useMongoStorage(); + const collection = db.collection("test"); + + // Insert a record for each block + await collection.insertOne({ + blockNumber: Number(endCursor?.orderKey), + key: `block_${Number(endCursor?.orderKey)}`, + count: 1, + data: `${finality}_${data}`, + }); + }, + }, + }); + + // Create a mock client that alternates between pending and accepted blocks + const mockClient = new MockClient( + (request, options) => { + return [ + // Block 1: Pending + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "block1" }], + production: "backfill", + }, + }, + // Block 1: Accepted + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "accepted", + data: [{ data: "block1" }], + production: "backfill", + }, + }, + // Block 2: Pending + { + _tag: "data", + data: { + cursor: { orderKey: 5000001n }, + endCursor: { orderKey: 5000002n }, + finality: "pending", + data: [{ data: "block2" }], + production: "backfill", + }, + }, + // Block 2: Accepted + { + _tag: "data", + data: { + cursor: { orderKey: 5000001n }, + endCursor: { orderKey: 5000002n }, + finality: "accepted", + data: [{ data: "block2" }], + production: "backfill", + }, + }, + // Block 3: Pending + { + _tag: "data", + data: { + cursor: { orderKey: 5000002n }, + endCursor: { orderKey: 5000003n }, + finality: "pending", + data: [{ data: "block3" }], + production: "backfill", + }, + }, + ]; + }, + ); + + await run(mockClient, indexer); + + // Check the database records + const result = await db.collection("test").find().toArray(); + + // data is invalidated before each transform if the 
prev block was pending + expect(result.map(({ _id, ...r }) => r)).toMatchInlineSnapshot(` + [ + { + "_cursor": { + "from": 5000001, + "to": null, + }, + "blockNumber": 5000001, + "count": 1, + "data": "accepted_block1", + "key": "block_5000001", + }, + { + "_cursor": { + "from": 5000002, + "to": null, + }, + "blockNumber": 5000002, + "count": 1, + "data": "accepted_block2", + "key": "block_5000002", + }, + { + "_cursor": { + "from": 5000003, + "to": null, + }, + "blockNumber": 5000003, + "count": 1, + "data": "pending_block3", + "key": "block_5000003", + }, + ] + `); + + // Check the checkpoints collection + const checkpointsResult = await db + .collection(checkpointCollectionName) + .find() + .toArray(); + + // should only have non-pending blocks in the checkpoint + expect(checkpointsResult.map(({ _id, ...r }) => r)).toMatchInlineSnapshot(` + [ + { + "id": "indexer_testing_default", + "orderKey": 5000002, + "uniqueKey": null, + }, + ] + `); + + // The checkpoint should be at block 2, which is the last accepted block + expect(checkpointsResult[0].orderKey).toBe(5000002); + + // Verify that the pending blocks are stored correctly + const pendingBlocks = result.filter((row) => + row.data?.startsWith("pending_"), + ); + expect(pendingBlocks.length).toBe(1); // Only the last pending block should be present + + // Verify that all accepted blocks are present + const acceptedBlocks = result.filter((row) => + row.data?.startsWith("accepted_"), + ); + expect(acceptedBlocks.length).toBe(2); // Both accepted blocks should be present + }); }); function getRandomDatabase() { From 2d89be874a7424a4d72fce98ae7e797ca165dcd7 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 14 Mar 2025 03:37:02 +0530 Subject: [PATCH 6/8] plugin-sqlite: handle pending blocks --- packages/plugin-sqlite/src/index.ts | 33 +++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/packages/plugin-sqlite/src/index.ts b/packages/plugin-sqlite/src/index.ts index 
dcb1a083..9b216d91 100644 --- a/packages/plugin-sqlite/src/index.ts +++ b/packages/plugin-sqlite/src/index.ts @@ -1,6 +1,7 @@ import { useIndexerContext } from "@apibara/indexer"; import { defineIndexerPlugin } from "@apibara/indexer/plugins"; import { isCursor } from "@apibara/protocol"; +import type { Cursor, DataFinality } from "@apibara/protocol"; import type { Database as SqliteDatabase } from "better-sqlite3"; import { generateIndexerId } from "@apibara/indexer/internal"; @@ -76,6 +77,7 @@ export function sqliteStorage({ }: SqliteStorageOptions) { return defineIndexerPlugin((indexer) => { let indexerId = ""; + let prevFinality: DataFinality | undefined; indexer.hooks.hook("run:before", async () => { const { indexerName: indexerFileName, availableIndexers } = @@ -206,22 +208,39 @@ export function sqliteStorage({ indexer.hooks.hook("handler:middleware", ({ use }) => { use(async (ctx, next) => { - if (!ctx.finality) { + const { endCursor, finality, cursor } = ctx as { + cursor: Cursor; + endCursor: Cursor; + finality: DataFinality; + }; + + if (!finality) { throw new SqliteStorageError("finality is undefined"); } - if (!ctx.endCursor || !isCursor(ctx.endCursor)) { + if (!endCursor || !isCursor(endCursor)) { throw new SqliteStorageError( "endCursor is undefined or not a cursor", ); } await withTransaction(database, async (db) => { + if (prevFinality === "pending") { + // invalidate if previous block's finality was "pending" + if (enablePersistState) { + invalidateState({ db, cursor, indexerId }); + } + + if (enableKeyValueStore) { + invalidateKV(db, cursor); + } + } + if (enableKeyValueStore) { ctx[KV_PROPERTY] = new KeyValueStore( db, - ctx.endCursor, - ctx.finality, + endCursor, + finality, serializeFn, deserializeFn, ); @@ -229,13 +248,15 @@ export function sqliteStorage({ await next(); - if (enablePersistState) { - persistState({ db, endCursor: ctx.endCursor, indexerId }); + if (enablePersistState && finality !== "pending") { + persistState({ db, endCursor, 
indexerId }); } if (enableKeyValueStore) { delete ctx[KV_PROPERTY]; } + + prevFinality = finality; }); }); }); From d342e71c0732a23dd10010577bb99910f37777a3 Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Fri, 14 Mar 2025 03:39:09 +0530 Subject: [PATCH 7/8] plugin-sqlite: add tests for pending blocks --- ...-c30e94ee-538a-48f6-b0b1-66d2783ae188.json | 7 ++ ...-5e0243be-f427-4e3b-b06e-f3e7bec2af3b.json | 7 ++ ...-5e9a70b3-021c-462d-9865-f886dc09b8ca.json | 7 ++ ...-e6c5f565-8996-4a19-857e-54031f098166.json | 7 ++ packages/plugin-sqlite/src/kv.ts | 2 +- packages/plugin-sqlite/src/persistence.ts | 13 +++ packages/plugin-sqlite/tests/kv.test.ts | 106 ++++++++++++++++++ .../plugin-sqlite/tests/persistence.test.ts | 67 +++++++++++ 8 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 change/@apibara-indexer-c30e94ee-538a-48f6-b0b1-66d2783ae188.json create mode 100644 change/@apibara-plugin-drizzle-5e0243be-f427-4e3b-b06e-f3e7bec2af3b.json create mode 100644 change/@apibara-plugin-mongo-5e9a70b3-021c-462d-9865-f886dc09b8ca.json create mode 100644 change/@apibara-plugin-sqlite-e6c5f565-8996-4a19-857e-54031f098166.json diff --git a/change/@apibara-indexer-c30e94ee-538a-48f6-b0b1-66d2783ae188.json b/change/@apibara-indexer-c30e94ee-538a-48f6-b0b1-66d2783ae188.json new file mode 100644 index 00000000..597a2704 --- /dev/null +++ b/change/@apibara-indexer-c30e94ee-538a-48f6-b0b1-66d2783ae188.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "indexer: do not call factory for pending blocks", + "packageName": "@apibara/indexer", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/change/@apibara-plugin-drizzle-5e0243be-f427-4e3b-b06e-f3e7bec2af3b.json b/change/@apibara-plugin-drizzle-5e0243be-f427-4e3b-b06e-f3e7bec2af3b.json new file mode 100644 index 00000000..e89b29b9 --- /dev/null +++ b/change/@apibara-plugin-drizzle-5e0243be-f427-4e3b-b06e-f3e7bec2af3b.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": 
"plugin-drizzle: handle pending blocks", + "packageName": "@apibara/plugin-drizzle", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/change/@apibara-plugin-mongo-5e9a70b3-021c-462d-9865-f886dc09b8ca.json b/change/@apibara-plugin-mongo-5e9a70b3-021c-462d-9865-f886dc09b8ca.json new file mode 100644 index 00000000..672d23c4 --- /dev/null +++ b/change/@apibara-plugin-mongo-5e9a70b3-021c-462d-9865-f886dc09b8ca.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "plugin-mongo: handle pending blocks", + "packageName": "@apibara/plugin-mongo", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/change/@apibara-plugin-sqlite-e6c5f565-8996-4a19-857e-54031f098166.json b/change/@apibara-plugin-sqlite-e6c5f565-8996-4a19-857e-54031f098166.json new file mode 100644 index 00000000..5cb98992 --- /dev/null +++ b/change/@apibara-plugin-sqlite-e6c5f565-8996-4a19-857e-54031f098166.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "plugin-sqlite: handle pending blocks", + "packageName": "@apibara/plugin-sqlite", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/packages/plugin-sqlite/src/kv.ts b/packages/plugin-sqlite/src/kv.ts index 830a7c82..482ee8e2 100644 --- a/packages/plugin-sqlite/src/kv.ts +++ b/packages/plugin-sqlite/src/kv.ts @@ -72,7 +72,7 @@ export function invalidateKV(db: Database, cursor: Cursor) { ); } -type KeyValueRow = { +export type KeyValueRow = { from_block: number; to_block: number; k: string; diff --git a/packages/plugin-sqlite/src/persistence.ts b/packages/plugin-sqlite/src/persistence.ts index 37e3736b..38868998 100644 --- a/packages/plugin-sqlite/src/persistence.ts +++ b/packages/plugin-sqlite/src/persistence.ts @@ -100,6 +100,19 @@ export function invalidateState(props: { ); } +export type CheckpointRow = { + id: string; + order_key: number; + unique_key: string | null; +}; + +export type FilterRow = { + id: string; + filter: 
string; + from_block: number; + to_block: number | null; +}; + const statements = { createCheckpointsTable: ` CREATE TABLE IF NOT EXISTS checkpoints ( diff --git a/packages/plugin-sqlite/tests/kv.test.ts b/packages/plugin-sqlite/tests/kv.test.ts index 5aaa34d5..cef3f22a 100644 --- a/packages/plugin-sqlite/tests/kv.test.ts +++ b/packages/plugin-sqlite/tests/kv.test.ts @@ -13,6 +13,8 @@ import { describe, expect, it } from "vitest"; import type { Finalize, Invalidate } from "@apibara/protocol"; import { sqliteStorage, useSqliteKeyValueStore } from "../src"; +import type { KeyValueRow } from "../src/kv"; +import type { CheckpointRow } from "../src/persistence"; describe("SQLite key-value store", () => { it("should be able to store and retrieve values", async () => { @@ -472,4 +474,108 @@ describe("SQLite key-value store", () => { ] `); }); + + it("should handle pending and accepted blocks in kv store", async () => { + const db = new Database(":memory:"); + + const client = new MockClient((request, options) => { + return [ + // Block 1: Pending + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "block1" }], + production: "backfill", + }, + }, + // Block 1: Accepted + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "accepted", + data: [{ data: "block1" }], + production: "backfill", + }, + }, + // Block 2: Pending (no accepted version) + { + _tag: "data", + data: { + cursor: { orderKey: 5000001n }, + endCursor: { orderKey: 5000002n }, + finality: "pending", + data: [{ data: "block2" }], + production: "backfill", + }, + }, + ]; + }); + + const indexer = getMockIndexer({ + override: { + plugins: [ + sqliteStorage({ + database: db, + keyValueStore: true, + persistState: true, + }), + ], + async transform({ block: { data }, endCursor, finality }) { + const kv = useSqliteKeyValueStore(); + + // Store different values based on 
finality + if (finality === "pending") { + kv.put(`block-${endCursor?.orderKey}`, `pending-${data}`); + } else { + kv.put(`block-${endCursor?.orderKey}`, `accepted-${data}`); + } + }, + }, + }); + + await run(client, indexer); + + // Query the KV store + const rows = db + .prepare("SELECT * FROM kvs ORDER BY k") + .all() as KeyValueRow[]; + + // We expect values to reflect the latest state + expect(rows).toMatchInlineSnapshot(` + [ + { + "from_block": 5000001, + "k": "block-5000001", + "to_block": null, + "v": ""accepted-block1"", + }, + { + "from_block": 5000002, + "k": "block-5000002", + "to_block": null, + "v": ""pending-block2"", + }, + ] + `); + + // The value for block-5000001 should reflect accepted state + const block1 = rows.find((row) => row.k === "block-5000001"); + expect(block1?.v).toBe('"accepted-block1"'); + + // The value for block-5000002 should reflect pending state + const block2 = rows.find((row) => row.k === "block-5000002"); + expect(block2?.v).toBe('"pending-block2"'); + + // Checkpoint should only include the accepted block + const checkpointsResult = db + .prepare("SELECT * FROM checkpoints") + .all() as CheckpointRow[]; + + expect(checkpointsResult[0].order_key).toBe(5000001); + }); }); diff --git a/packages/plugin-sqlite/tests/persistence.test.ts b/packages/plugin-sqlite/tests/persistence.test.ts index 5c3dd0c7..a33c50a1 100644 --- a/packages/plugin-sqlite/tests/persistence.test.ts +++ b/packages/plugin-sqlite/tests/persistence.test.ts @@ -13,6 +13,7 @@ import { describe, expect, it } from "vitest"; import type { Finalize, Invalidate } from "@apibara/protocol"; import { sqliteStorage } from "../src"; +import type { CheckpointRow } from "../src/persistence"; describe("SQLite persistence", () => { it("should store the latest block number", async () => { @@ -625,4 +626,70 @@ describe("SQLite persistence", () => { ] `); }); + + it("should handle pending and accepted blocks correctly", async () => { + const db = new Database(":memory:"); + + 
// Create a mock client that alternates between pending and accepted blocks + const client = new MockClient((request, options) => { + return [ + // Block 1: Pending + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "pending", + data: [{ data: "block1-pending" }], + production: "backfill", + }, + }, + // Block 1: Accepted + { + _tag: "data", + data: { + cursor: { orderKey: 5000000n }, + endCursor: { orderKey: 5000001n }, + finality: "accepted", + data: [{ data: "block1-accepted" }], + production: "backfill", + }, + }, + // Block 2: Pending + { + _tag: "data", + data: { + cursor: { orderKey: 5000001n }, + endCursor: { orderKey: 5000002n }, + finality: "pending", + data: [{ data: "block2-pending" }], + production: "backfill", + }, + }, + ]; + }); + + const indexer = getMockIndexer({ + override: { + plugins: [sqliteStorage({ database: db, persistState: true })], + }, + }); + + await run(client, indexer); + + // Check the checkpoints table - should only contain the accepted block + const checkpointsResult = db + .prepare("SELECT * FROM checkpoints") + .all() as CheckpointRow[]; + + expect(checkpointsResult).toMatchInlineSnapshot(` + [ + { + "id": "indexer_testing_default", + "order_key": 5000001, + "unique_key": null, + }, + ] + `); + }); }); From 0b6ccd0f9c56f2beb2f77eb0e209427022cd6d6d Mon Sep 17 00:00:00 2001 From: jaipaljadeja Date: Mon, 17 Mar 2025 11:07:30 +0530 Subject: [PATCH 8/8] plugins: remove unnecessary persistence invalidation for pending blocks --- packages/plugin-drizzle/src/index.ts | 4 ---- packages/plugin-mongo/src/index.ts | 4 ---- packages/plugin-sqlite/src/index.ts | 4 ---- 3 files changed, 12 deletions(-) diff --git a/packages/plugin-drizzle/src/index.ts b/packages/plugin-drizzle/src/index.ts index b1219cba..954e5b03 100644 --- a/packages/plugin-drizzle/src/index.ts +++ b/packages/plugin-drizzle/src/index.ts @@ -320,10 +320,6 @@ export function drizzleStorage< if (prevFinality === 
"pending") { // invalidate if previous block's finality was "pending" await invalidate(tx, cursor, idColumn, indexerId); - - if (enablePersistence) { - await invalidateState({ tx, cursor, indexerId }); - } } if (finality !== "finalized") { diff --git a/packages/plugin-mongo/src/index.ts b/packages/plugin-mongo/src/index.ts index a1bf7b0c..1f68098d 100644 --- a/packages/plugin-mongo/src/index.ts +++ b/packages/plugin-mongo/src/index.ts @@ -208,10 +208,6 @@ export function mongoStorage({ if (prevFinality === "pending") { // invalidate if previous block's finality was "pending" await invalidate(db, session, cursor, collections); - - if (enablePersistence) { - await invalidateState({ db, session, cursor, indexerId }); - } } await next(); diff --git a/packages/plugin-sqlite/src/index.ts b/packages/plugin-sqlite/src/index.ts index 9b216d91..b6e7707d 100644 --- a/packages/plugin-sqlite/src/index.ts +++ b/packages/plugin-sqlite/src/index.ts @@ -227,10 +227,6 @@ export function sqliteStorage({ await withTransaction(database, async (db) => { if (prevFinality === "pending") { // invalidate if previous block's finality was "pending" - if (enablePersistState) { - invalidateState({ db, cursor, indexerId }); - } - if (enableKeyValueStore) { invalidateKV(db, cursor); }