diff --git a/change/@apibara-plugin-drizzle-fb22b1bc-f1ab-4299-a476-2ac25460956d.json b/change/@apibara-plugin-drizzle-fb22b1bc-f1ab-4299-a476-2ac25460956d.json new file mode 100644 index 00000000..264986d1 --- /dev/null +++ b/change/@apibara-plugin-drizzle-fb22b1bc-f1ab-4299-a476-2ac25460956d.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "plugin-drizzle: add idColumn map support with validation and fix cleanup errors", + "packageName": "@apibara/plugin-drizzle", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/change/@apibara-plugin-mongo-c9018c01-2b4f-4a5a-a7a7-e13eb567d66f.json b/change/@apibara-plugin-mongo-c9018c01-2b4f-4a5a-a7a7-e13eb567d66f.json new file mode 100644 index 00000000..65735232 --- /dev/null +++ b/change/@apibara-plugin-mongo-c9018c01-2b4f-4a5a-a7a7-e13eb567d66f.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "plugin-mongo: move cleanup part after initialization", + "packageName": "@apibara/plugin-mongo", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/change/@apibara-plugin-sqlite-22633833-27a0-4ae1-be3b-824b3514f5e2.json b/change/@apibara-plugin-sqlite-22633833-27a0-4ae1-be3b-824b3514f5e2.json new file mode 100644 index 00000000..7b17b993 --- /dev/null +++ b/change/@apibara-plugin-sqlite-22633833-27a0-4ae1-be3b-824b3514f5e2.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "add cleanup logic and add indexerId relation to KV Store", + "packageName": "@apibara/plugin-sqlite", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/change/apibara-34f7f808-dcf6-45ec-bbe4-5e74b0193361.json b/change/apibara-34f7f808-dcf6-45ec-bbe4-5e74b0193361.json new file mode 100644 index 00000000..28426837 --- /dev/null +++ b/change/apibara-34f7f808-dcf6-45ec-bbe4-5e74b0193361.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "cli: fix dev reload, change flag names, args validation, bump drizzle-orm version in template", + "packageName": "apibara", + "email": "jadejajaipal5@gmail.com", + "dependentChangeType": "patch" +} diff --git a/examples/cli-drizzle/indexers/2-starknet.indexer.ts b/examples/cli-drizzle/indexers/2-starknet.indexer.ts index 77bef1c3..6d2e2280 100644 --- a/examples/cli-drizzle/indexers/2-starknet.indexer.ts +++ b/examples/cli-drizzle/indexers/2-starknet.indexer.ts @@ -28,7 +28,9 @@ export default function (runtimeConfig: ApibaraRuntimeConfig) { plugins: [ drizzleStorage({ db: database, - idColumn: "_id", + idColumn: { + "*": "_id", + }, persistState: true, indexerName: "starknet-usdc-transfers", migrate: { diff --git a/examples/cli-drizzle/package.json b/examples/cli-drizzle/package.json index d2f3de33..96154e8e 100644 --- a/examples/cli-drizzle/package.json +++ b/examples/cli-drizzle/package.json @@ -31,7 +31,7 @@ "@apibara/starknet": "workspace:*", "@electric-sql/pglite": "^0.2.17", "apibara": "workspace:*", - "drizzle-orm": "^0.37.0", + "drizzle-orm": "^0.40.1", "pg": "^8.13.1", "starknet": "^6.11.0", "viem": "^2.21.53" diff --git a/packages/cli/src/cli/commands/add.ts b/packages/cli/src/cli/commands/add.ts index ce6f41f7..b0fb44bc 100644 --- a/packages/cli/src/cli/commands/add.ts +++ b/packages/cli/src/cli/commands/add.ts @@ -1,5 +1,6 @@ import { addIndexer } from "apibara/create"; import { defineCommand } from "citty"; +import { checkForUnknownArgs } from "../common"; export default defineCommand({ meta: { @@ -35,7 +36,9 @@ export default defineCommand({ "Root directory - apibara project root where apibara.config is located | default: current working directory", }, }, - async run({ args }) { + async run({ args, cmd }) { + await checkForUnknownArgs(args, cmd); + const { indexerId, chain, network, storage, dnaUrl, dir } = args; await addIndexer({ diff --git a/packages/cli/src/cli/commands/build.ts b/packages/cli/src/cli/commands/build.ts index ad22437a..143f38cd 100644 --- a/packages/cli/src/cli/commands/build.ts +++ b/packages/cli/src/cli/commands/build.ts @@ -2,7 +2,7 @@ import { build, createApibara, prepare, writeTypes } from "apibara/core"; import { runtimeDir } from "apibara/runtime/meta"; import { defineCommand } from "citty"; import { join, resolve } from "pathe"; -import { commonArgs } from "../common"; +import { checkForUnknownArgs, commonArgs } from "../common"; export default defineCommand({ meta: { @@ -12,8 +12,11 @@ export default defineCommand({ args: { ...commonArgs, }, - async run({ args }) { + async run({ args, cmd }) { + await checkForUnknownArgs(args, cmd); + const rootDir = resolve((args.dir || args._dir || ".") as string); + const apibara = await createApibara({ rootDir, }); diff --git a/packages/cli/src/cli/commands/dev.ts b/packages/cli/src/cli/commands/dev.ts index 5bb4a275..65bcdc7e 100644 --- a/packages/cli/src/cli/commands/dev.ts +++ b/packages/cli/src/cli/commands/dev.ts @@ -5,7 +5,7 @@ import type { Apibara } from "apibara/types"; import { defineCommand } from "citty"; import { colors } from "consola/utils"; import { join, resolve } from "pathe"; -import { commonArgs } from "../common"; +import { checkForUnknownArgs, commonArgs } from "../common"; // Hot module reloading key regex // for only runtimeConfig.* keys @@ -26,17 +26,19 @@ export default defineCommand({ type: "string", description: "Preset to use", }, - alwaysReindex: { + "always-reindex": { type: "boolean", default: false, description: - "Reindex the indexers from the starting block on every restart (default: false)", + "Reindex the indexers from the starting block on every restart | default: `false`", }, }, - async run({ args }) { + async run({ args, data, cmd, rawArgs }) { + await checkForUnknownArgs(args, cmd); + const rootDir = resolve((args.dir || args._dir || ".") as string); - if (args.alwaysReindex) { + if (args["always-reindex"]) { process.env.APIBARA_ALWAYS_REINDEX = "true"; } @@ -125,7 +127,6 @@ export default defineCommand({ code !== null ? ` with code ${colors.red(code)}` : "" }`, ); - process.exit(code ?? 0); }); }); }; diff --git a/packages/cli/src/cli/commands/init.ts b/packages/cli/src/cli/commands/init.ts index 4b27383e..79c26a64 100644 --- a/packages/cli/src/cli/commands/init.ts +++ b/packages/cli/src/cli/commands/init.ts @@ -1,5 +1,6 @@ import { initializeProject } from "apibara/create"; import { defineCommand } from "citty"; +import { checkForUnknownArgs } from "../common"; export default defineCommand({ meta: { @@ -14,23 +15,27 @@ export default defineCommand({ }, language: { type: "string", - description: "Language to use: typescript, ts or javascript, js", + description: + "Language to use: typescript, ts or javascript, js | default: `ts`", default: "ts", alias: "l", }, - noIndexer: { + "create-indexer": { type: "boolean", - description: "Do not create an indexer after initialization", - default: false, + name: "create-indexer", + default: true, + description: "TODO", }, }, - async run({ args }) { - const { dir: targetDir, noIndexer, language } = args; + async run({ args, cmd }) { + await checkForUnknownArgs(args, cmd); + + const { dir: targetDir, "create-indexer": createIndexer, language } = args; await initializeProject({ argTargetDir: targetDir, argLanguage: language, - argNoCreateIndexer: noIndexer, + argNoCreateIndexer: !createIndexer, }); }, }); diff --git a/packages/cli/src/cli/commands/prepare.ts b/packages/cli/src/cli/commands/prepare.ts index a129b618..fa6161bc 100644 --- a/packages/cli/src/cli/commands/prepare.ts +++ b/packages/cli/src/cli/commands/prepare.ts @@ -2,7 +2,7 @@ import { createApibara, writeTypes } from "apibara/core"; import {} from "apibara/types"; import { defineCommand } from "citty"; import { resolve } from "pathe"; -import { commonArgs } from "../common"; +import { checkForUnknownArgs, commonArgs } from "../common"; export default defineCommand({ meta: { @@ -12,7 +12,9 @@ export default defineCommand({ args: { ...commonArgs, }, - async run({ args }) { + async run({ args, cmd }) { + await checkForUnknownArgs(args, cmd); + const rootDir = resolve((args.dir || ".") as string); const apibara = await createApibara({ rootDir }); await writeTypes(apibara); diff --git a/packages/cli/src/cli/commands/start.ts b/packages/cli/src/cli/commands/start.ts index d45357db..50b04a85 100644 --- a/packages/cli/src/cli/commands/start.ts +++ b/packages/cli/src/cli/commands/start.ts @@ -4,7 +4,7 @@ import { defineCommand } from "citty"; import { colors } from "consola/utils"; import fse from "fs-extra"; import { resolve } from "pathe"; -import { commonArgs } from "../common"; +import { checkForUnknownArgs, commonArgs } from "../common"; export default defineCommand({ meta: { @@ -23,10 +23,12 @@ export default defineCommand({ description: "The preset to use", }, }, - async run({ args }) { + async run({ args, cmd }) { const { indexer, preset } = args; const rootDir = resolve((args.dir || args._dir || ".") as string); + await checkForUnknownArgs(args, cmd); + const apibara = await createApibara({ rootDir, preset, @@ -67,7 +69,6 @@ export default defineCommand({ code !== null ? ` with code ${colors.red(code)}` : "" }`, ); - process.exit(code ?? 0); }); }, }); diff --git a/packages/cli/src/cli/common.ts b/packages/cli/src/cli/common.ts index d0793cf7..b3a4d8ab 100644 --- a/packages/cli/src/cli/common.ts +++ b/packages/cli/src/cli/common.ts @@ -1,4 +1,10 @@ -import type { ArgsDef } from "citty"; +import { + type ArgsDef, + type CommandDef, + type ParsedArgs, + renderUsage, +} from "citty"; +import consola from "consola"; export const commonArgs = { dir: { @@ -6,3 +12,29 @@ export const commonArgs = { description: "project root directory", }, }; + +export const checkForUnknownArgs = async ( + args: ParsedArgs, + cmd: CommandDef, +) => { + // Create a list of defined args including both the main arg names and their aliases + const definedArgs: string[] = []; + if (cmd.args) { + for (const [argName, argDef] of Object.entries(cmd.args)) { + definedArgs.push(argName); + // Add alias to definedArgs if it exists + if (argDef.alias) { + definedArgs.push(argDef.alias); + } + } + } + + const providedArgs = Object.keys(args).filter((arg) => arg !== "_"); + const wrongArgs = providedArgs.filter((arg) => !definedArgs.includes(arg)); + + if (wrongArgs.length > 0) { + consola.error(`Unknown arguments: ${wrongArgs.join(", ")}`); + consola.info(await renderUsage(cmd)); + process.exit(1); + } +}; diff --git a/packages/cli/src/create/constants.ts b/packages/cli/src/create/constants.ts index b42071f3..1fbd16ba 100644 --- a/packages/cli/src/create/constants.ts +++ b/packages/cli/src/create/constants.ts @@ -79,7 +79,7 @@ export const packageVersions = { "@apibara/plugin-sqlite": "next", // Postgres Dependencies "@electric-sql/pglite": "^0.2.17", - "drizzle-orm": "^0.37.0", + "drizzle-orm": "^0.40.1", pg: "^8.13.1", "@types/pg": "^8.11.10", "drizzle-kit": "^0.29.0", diff --git a/packages/plugin-drizzle/src/index.ts b/packages/plugin-drizzle/src/index.ts index 954e5b03..429ef3fe 100644 --- a/packages/plugin-drizzle/src/index.ts +++ b/packages/plugin-drizzle/src/index.ts @@ -32,10 +32,18 @@ import { registerTriggers, removeTriggers, } from "./storage"; -import { DrizzleStorageError, sleep, withTransaction } from "./utils"; +import { + DrizzleStorageError, + type IdColumnMap, + getIdColumnForTable, + sleep, + withTransaction, +} from "./utils"; export * from "./helper"; +export type { IdColumnMap }; + const MAX_RETRIES = 5; export type DrizzleStorage< @@ -89,9 +97,32 @@ export interface DrizzleStorageOptions< */ schema?: Record; /** - * The column to use as the id. Defaults to 'id'. + * The column to use as the primary identifier for each table. + * + * This identifier is used for tracking changes during reorgs and rollbacks. + * + * Can be specified in two ways: + * + * 1. As a single string that applies to all tables: + * ```ts + * idColumn: "_id" // Uses "_id" column for all tables + * ``` + * + * 2. As an object mapping table names to their ID columns: + * ```ts + * idColumn: { + * transfers: "transaction_hash", // Use "transaction_hash" for transfers table + * blocks: "block_number", // Use "block_number" for blocks table + * "*": "_id" // Use "_id" for all other tables | defaults to "id" + * } + * ``` + * + * The special "*" key acts as a fallback for any tables not explicitly mapped. + * + * @default "id" + * @type {string | Partial} */ - idColumn?: string; + idColumn?: string | Partial; /** * The options for the database migration. When provided, the database will automatically run migrations before the indexer runs. */ @@ -120,8 +151,8 @@ export function drizzleStorage< db, persistState: enablePersistence = true, indexerName: identifier = "default", - schema, - idColumn = "id", + schema: _schema, + idColumn, migrate: migrateOptions, }: DrizzleStorageOptions) { return defineIndexerPlugin((indexer) => { @@ -129,17 +160,37 @@ export function drizzleStorage< let indexerId = ""; const alwaysReindex = process.env["APIBARA_ALWAYS_REINDEX"] === "true"; let prevFinality: DataFinality | undefined; + const schema: TSchema = (_schema as TSchema) ?? db._.schema ?? {}; + const idColumnMap: IdColumnMap = { + "*": typeof idColumn === "string" ? idColumn : "id", + ...(typeof idColumn === "object" ? idColumn : {}), + }; try { - tableNames = Object.values((schema as TSchema) ?? db._.schema ?? {}).map( - (table) => table.dbName, - ); + tableNames = Object.values(schema).map((table) => table.dbName); } catch (error) { throw new DrizzleStorageError("Failed to get table names from schema", { cause: error, }); } + // Check if specified idColumn exists in all the tables in schema + for (const table of Object.values(schema)) { + const columns = table.columns; + const tableIdColumn = getIdColumnForTable(table.dbName, idColumnMap); + + const columnExists = Object.values(columns).some( + (column) => column.name === tableIdColumn, + ); + + if (!columnExists) { + throw new DrizzleStorageError( + `Column \`"${tableIdColumn}"\` does not exist in table \`"${table.dbName}"\`. ` + + "Make sure the table has the specified column or provide a valid `idColumn` mapping to `drizzleStorage`.", + ); + } + } + indexer.hooks.hook("run:before", async () => { const internalContext = useInternalContext(); const context = useIndexerContext(); @@ -153,25 +204,11 @@ export function drizzleStorage< indexerId = generateIndexerId(indexerFileName, identifier); - if (alwaysReindex) { - logger.warn( - `Reindexing: Deleting all data from tables - ${tableNames.join(", ")}`, - ); - await withTransaction(db, async (tx) => { - await cleanupStorage(tx, tableNames, indexerId); - - if (enablePersistence) { - await resetPersistence({ tx, indexerId }); - } - - logger.success("Tables have been cleaned up for reindexing"); - }); - } - let retries = 0; // incase the migrations are already applied, we don't want to run them again let migrationsApplied = false; + let cleanupApplied = false; while (retries <= MAX_RETRIES) { try { @@ -186,6 +223,22 @@ export function drizzleStorage< if (enablePersistence) { await initializePersistentState(tx); } + + if (alwaysReindex && !cleanupApplied) { + logger.warn( + `Reindexing: Deleting all data from tables - ${tableNames.join(", ")}`, + ); + + await cleanupStorage(tx, tableNames, indexerId); + + if (enablePersistence) { + await resetPersistence({ tx, indexerId }); + } + + cleanupApplied = true; + + logger.success("Tables have been cleaned up for reindexing"); + } }); break; } catch (error) { @@ -239,7 +292,8 @@ export function drizzleStorage< } await withTransaction(db, async (tx) => { - await invalidate(tx, cursor, idColumn, indexerId); + // Use the appropriate idColumn for each table when calling invalidate + await invalidate(tx, cursor, idColumnMap, indexerId); if (enablePersistence) { await invalidateState({ tx, cursor, indexerId }); @@ -289,7 +343,8 @@ export function drizzleStorage< } await withTransaction(db, async (tx) => { - await invalidate(tx, cursor, idColumn, indexerId); + // Use the appropriate idColumn for each table when calling invalidate + await invalidate(tx, cursor, idColumnMap, indexerId); if (enablePersistence) { await invalidateState({ tx, cursor, indexerId }); @@ -319,7 +374,7 @@ export function drizzleStorage< if (prevFinality === "pending") { // invalidate if previous block's finality was "pending" - await invalidate(tx, cursor, idColumn, indexerId); + await invalidate(tx, cursor, idColumnMap, indexerId); } if (finality !== "finalized") { @@ -327,7 +382,7 @@ export function drizzleStorage< tx, tableNames, endCursor, - idColumn, + idColumnMap, indexerId, ); } diff --git a/packages/plugin-drizzle/src/storage.ts b/packages/plugin-drizzle/src/storage.ts index cf0ab4c6..90ab143b 100644 --- a/packages/plugin-drizzle/src/storage.ts +++ b/packages/plugin-drizzle/src/storage.ts @@ -16,7 +16,11 @@ import { text, } from "drizzle-orm/pg-core"; import { SCHEMA_NAME } from "./constants"; -import { DrizzleStorageError } from "./utils"; +import { + DrizzleStorageError, + type IdColumnMap, + getIdColumnForTable, +} from "./utils"; const ROLLBACK_TABLE_NAME = "reorg_rollback"; @@ -125,11 +129,14 @@ export async function registerTriggers< tx: PgTransaction, tables: string[], endCursor: Cursor, - idColumn: string, + idColumnMap: IdColumnMap, indexerId: string, ) { try { for (const table of tables) { + // Determine the column ID for this specific table + const tableIdColumn = getIdColumnForTable(table, idColumnMap); + await tx.execute( sql.raw( `DROP TRIGGER IF EXISTS ${getReorgTriggerName(table, indexerId)} ON ${table};`, @@ -140,7 +147,7 @@ export async function registerTriggers< CREATE CONSTRAINT TRIGGER ${getReorgTriggerName(table, indexerId)} AFTER INSERT OR UPDATE OR DELETE ON ${table} DEFERRABLE INITIALLY DEFERRED - FOR EACH ROW EXECUTE FUNCTION ${SCHEMA_NAME}.reorg_checkpoint('${idColumn}', ${`${Number(endCursor.orderKey)}`}, '${indexerId}'); + FOR EACH ROW EXECUTE FUNCTION ${SCHEMA_NAME}.reorg_checkpoint('${tableIdColumn}', ${Number(endCursor.orderKey)}, '${indexerId}'); `), ); } @@ -184,7 +191,7 @@ export async function invalidate< >( tx: PgTransaction, cursor: Cursor, - idColumn: string, + idColumnMap: IdColumnMap, indexerId: string, ) { // Get and delete operations after cursor in one query, ordered by newest first @@ -208,6 +215,9 @@ export async function invalidate< // Process each operation in reverse order for (const op of result) { + // Determine the column ID for this specific table + const tableIdColumn = getIdColumnForTable(op.table_name, idColumnMap); + switch (op.op) { case "I": try { @@ -218,7 +228,7 @@ export async function invalidate< await tx.execute( sql.raw(` DELETE FROM ${op.table_name} - WHERE ${idColumn} = '${op.row_id}' + WHERE ${tableIdColumn} = '${op.row_id}' `), ); } catch (error) { @@ -271,7 +281,9 @@ export async function invalidate< ? JSON.parse(op.row_value) : op.row_value; - const nonIdKeys = Object.keys(rowValue).filter((k) => k !== idColumn); + const nonIdKeys = Object.keys(rowValue).filter( + (k) => k !== tableIdColumn, + ); const fields = nonIdKeys.map((c) => `${c} = prev.${c}`).join(", "); @@ -281,7 +293,7 @@ export async function invalidate< FROM ( SELECT * FROM json_populate_record(null::${op.table_name}, '${JSON.stringify(op.row_value)}'::json) ) as prev - WHERE ${op.table_name}.${idColumn} = '${op.row_id}' + WHERE ${op.table_name}.${tableIdColumn} = '${op.row_id}' `); await tx.execute(query); diff --git a/packages/plugin-drizzle/src/utils.ts b/packages/plugin-drizzle/src/utils.ts index 5e6da210..70ebf5fd 100644 --- a/packages/plugin-drizzle/src/utils.ts +++ b/packages/plugin-drizzle/src/utils.ts @@ -48,3 +48,22 @@ export function serialize(obj: T): string { export function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } + +export interface IdColumnMap extends Record { + /** + * Wildcard mapping for all tables. + */ + "*": string; +} + +export const getIdColumnForTable = ( + tableName: string, + idColumn: IdColumnMap, +): string => { + // If there's a specific mapping for this table, use it + if (idColumn[tableName]) { + return idColumn[tableName]; + } + // Default fallback + return idColumn["*"]; +}; diff --git a/packages/plugin-mongo/src/index.ts b/packages/plugin-mongo/src/index.ts index 1f68098d..64f03ee3 100644 --- a/packages/plugin-mongo/src/index.ts +++ b/packages/plugin-mongo/src/index.ts @@ -70,13 +70,16 @@ export function mongoStorage({ indexerId = generateIndexerId(indexerName, identifier); const logger = useLogger(); - if (alwaysReindex) { - logger.warn( - `Reindexing: Deleting all data from collections - ${collections.join(", ")}`, - ); + await withTransaction(client, async (session) => { + const db = client.db(dbName, dbOptions); + if (enablePersistence) { + await initializePersistentState(db, session); + } - await withTransaction(client, async (session) => { - const db = client.db(dbName, dbOptions); + if (alwaysReindex) { + logger.warn( + `Reindexing: Deleting all data from collections - ${collections.join(", ")}`, + ); await cleanupStorage(db, session, collections); @@ -85,13 +88,6 @@ export function mongoStorage({ } logger.success("All data has been cleaned up for reindexing"); - }); - } - - await withTransaction(client, async (session) => { - const db = client.db(dbName, dbOptions); - if (enablePersistence) { - await initializePersistentState(db, session); } }); }); diff --git a/packages/plugin-sqlite/src/index.ts b/packages/plugin-sqlite/src/index.ts index 3304ae5b..d49affa7 100644 --- a/packages/plugin-sqlite/src/index.ts +++ b/packages/plugin-sqlite/src/index.ts @@ -1,5 +1,5 @@ import { useIndexerContext } from "@apibara/indexer"; -import { defineIndexerPlugin } from "@apibara/indexer/plugins"; +import { defineIndexerPlugin, useLogger } from "@apibara/indexer/plugins"; import type { Cursor, DataFinality } from "@apibara/protocol"; import type { Database as SqliteDatabase } from "better-sqlite3"; @@ -7,6 +7,7 @@ import { generateIndexerId } from "@apibara/indexer/internal"; import { useInternalContext } from "@apibara/indexer/internal/plugins"; import { KeyValueStore, + cleanupKV, finalizeKV, initializeKeyValueStore, invalidateKV, @@ -17,6 +18,7 @@ import { initializePersistentState, invalidateState, persistState, + resetPersistence, } from "./persistence"; import { type DeserializeFn, @@ -77,15 +79,20 @@ export function sqliteStorage({ return defineIndexerPlugin((indexer) => { let indexerId = ""; let prevFinality: DataFinality | undefined; + const alwaysReindex = process.env["APIBARA_ALWAYS_REINDEX"] === "true"; indexer.hooks.hook("run:before", async () => { const { indexerName: indexerFileName, availableIndexers } = useInternalContext(); + const logger = useLogger(); + indexerId = generateIndexerId(indexerFileName, identifier); let retries = 0; + let cleanupApplied = false; + while (retries <= MAX_RETRIES) { try { await withTransaction(database, async (db) => { @@ -96,6 +103,22 @@ export function sqliteStorage({ if (enableKeyValueStore) { initializeKeyValueStore(db); } + + if (alwaysReindex && !cleanupApplied) { + if (enableKeyValueStore) { + logger.warn("Reindexing: Cleaning up key-value store"); + cleanupKV(db, indexerId); + } + + if (enablePersistState) { + logger.warn("Reindexing: Resetting persistence state"); + resetPersistence({ db, indexerId }); + } + + cleanupApplied = true; + + logger.success("All data has been cleaned up for reindexing"); + } }); break; } catch (error) { @@ -145,7 +168,7 @@ export function sqliteStorage({ } if (enableKeyValueStore) { - invalidateKV(db, cursor); + invalidateKV(db, cursor, indexerId); } }); }); @@ -182,7 +205,7 @@ export function sqliteStorage({ } if (enableKeyValueStore) { - finalizeKV(db, cursor); + finalizeKV(db, cursor, indexerId); } }); }); @@ -200,7 +223,7 @@ export function sqliteStorage({ } if (enableKeyValueStore) { - invalidateKV(db, cursor); + invalidateKV(db, cursor, indexerId); } }); }); @@ -227,7 +250,7 @@ export function sqliteStorage({ if (prevFinality === "pending") { // invalidate if previous block's finality was "pending" if (enableKeyValueStore) { - invalidateKV(db, cursor); + invalidateKV(db, cursor, indexerId); } } @@ -238,6 +261,7 @@ export function sqliteStorage({ finality, serializeFn, deserializeFn, + indexerId, ); } diff --git a/packages/plugin-sqlite/src/kv.ts b/packages/plugin-sqlite/src/kv.ts index 482ee8e2..aaee8a20 100644 --- a/packages/plugin-sqlite/src/kv.ts +++ b/packages/plugin-sqlite/src/kv.ts @@ -19,64 +19,80 @@ export class KeyValueStore { private readonly finality: DataFinality, private readonly serialize: SerializeFn, private readonly deserialize: DeserializeFn, + private readonly indexerId: string, ) { assertInTransaction(db); } get(key: string): T | undefined { - const row = this.db.prepare(statements.get).get(key); + const row = this.db + .prepare<[string, string], KeyValueRow>(statements.get) + .get(key, this.indexerId); return row ? this.deserialize(row.v) : undefined; } put(key: string, value: T) { this.db - .prepare<[number, string], KeyValueRow>(statements.updateToBlock) - .run(Number(this.endCursor.orderKey), key); + .prepare<[number, string, string], KeyValueRow>(statements.updateToBlock) + .run(Number(this.endCursor.orderKey), key, this.indexerId); this.db - .prepare<[number, string, string], KeyValueRow>(statements.insertIntoKvs) + .prepare<[number, string, string, string], KeyValueRow>( + statements.insertIntoKvs, + ) .run( Number(this.endCursor.orderKey), key, this.serialize(value as Record), + this.indexerId, ); } del(key: string) { this.db - .prepare<[number, string], KeyValueRow>(statements.del) - .run(Number(this.endCursor.orderKey), key); + .prepare<[number, string, string], KeyValueRow>(statements.del) + .run(Number(this.endCursor.orderKey), key, this.indexerId); } } -export function finalizeKV(db: Database, cursor: Cursor) { +export function finalizeKV(db: Database, cursor: Cursor, indexerId: string) { assertInTransaction(db); - db.prepare<[number], KeyValueRow>(statements.finalize).run( + db.prepare<[number, string], KeyValueRow>(statements.finalize).run( Number(cursor.orderKey), + indexerId, ); } -export function invalidateKV(db: Database, cursor: Cursor) { +export function invalidateKV(db: Database, cursor: Cursor, indexerId: string) { assertInTransaction(db); // Delete entries that started after the invalidation cursor - db.prepare<[number], KeyValueRow>(statements.invalidateDelete).run( + db.prepare<[number, string], KeyValueRow>(statements.invalidateDelete).run( Number(cursor.orderKey), + indexerId, ); // Update entries that were supposed to end after the invalidation cursor - db.prepare<[number], KeyValueRow>(statements.invalidateUpdate).run( + db.prepare<[number, string], KeyValueRow>(statements.invalidateUpdate).run( Number(cursor.orderKey), + indexerId, ); } +export function cleanupKV(db: Database, indexerId: string) { + assertInTransaction(db); + + db.prepare<[string], KeyValueRow>(statements.cleanup).run(indexerId); +} + export type KeyValueRow = { from_block: number; to_block: number; k: string; v: string; + id: string; }; const statements = { @@ -86,31 +102,35 @@ const statements = { to_block INTEGER, k TEXT NOT NULL, v BLOB NOT NULL, - PRIMARY KEY (from_block, k) + id TEXT NOT NULL, + PRIMARY KEY (from_block, k, id) );`, get: ` SELECT v FROM kvs - WHERE k = ? AND to_block IS NULL`, + WHERE k = ? AND id = ? AND to_block IS NULL`, updateToBlock: ` UPDATE kvs SET to_block = ? - WHERE k = ? AND to_block IS NULL`, + WHERE k = ? AND id = ? AND to_block IS NULL`, insertIntoKvs: ` - INSERT INTO kvs (from_block, to_block, k, v) - VALUES (?, NULL, ?, ?)`, + INSERT INTO kvs (from_block, to_block, k, v, id) + VALUES (?, NULL, ?, ?, ?)`, del: ` UPDATE kvs SET to_block = ? - WHERE k = ? AND to_block IS NULL`, + WHERE k = ? AND id = ? AND to_block IS NULL`, finalize: ` DELETE FROM kvs - WHERE to_block <= ?`, + WHERE to_block <= ? AND id = ?`, invalidateDelete: ` DELETE FROM kvs - WHERE from_block > ?`, + WHERE from_block > ? AND id = ?`, invalidateUpdate: ` UPDATE kvs SET to_block = NULL - WHERE to_block > ?`, -}; + WHERE to_block > ? AND id = ?`, + cleanup: ` + DELETE FROM kvs + WHERE id = ?`, +} as const; diff --git a/packages/plugin-sqlite/src/persistence.ts b/packages/plugin-sqlite/src/persistence.ts index 38868998..8df45056 100644 --- a/packages/plugin-sqlite/src/persistence.ts +++ b/packages/plugin-sqlite/src/persistence.ts @@ -100,6 +100,16 @@ export function invalidateState(props: { ); } +export function resetPersistence(props: { + db: Database; + indexerId: string; +}) { + const { db, indexerId } = props; + assertInTransaction(db); + db.prepare<[string]>(statements.resetCheckpoint).run(indexerId); + db.prepare<[string]>(statements.resetFilter).run(indexerId); +} + export type CheckpointRow = { id: string; order_key: number; @@ -168,4 +178,10 @@ const statements = { UPDATE filters SET to_block = NULL WHERE id = ? AND to_block > ?`, + resetCheckpoint: ` + DELETE FROM checkpoints + WHERE id = ?`, + resetFilter: ` + DELETE FROM filters + WHERE id = ?`, }; diff --git a/packages/plugin-sqlite/tests/kv.test.ts b/packages/plugin-sqlite/tests/kv.test.ts index cef3f22a..ae285325 100644 --- a/packages/plugin-sqlite/tests/kv.test.ts +++ b/packages/plugin-sqlite/tests/kv.test.ts @@ -52,36 +52,42 @@ describe("SQLite key-value store", () => { [ { "from_block": 5000000, + "id": "indexer_testing_default", "k": "data-5000000", "to_block": 5000001, "v": ""5000000"", }, { "from_block": 5000000, + "id": "indexer_testing_default", "k": "latest", "to_block": 5000001, "v": "5000000", }, { "from_block": 5000001, + "id": "indexer_testing_default", "k": "data-5000001", "to_block": 5000002, "v": ""5000001"", }, { "from_block": 5000001, + "id": "indexer_testing_default", "k": "latest", "to_block": 5000002, "v": "5000001", }, { "from_block": 5000002, + "id": "indexer_testing_default", "k": "data-5000002", "to_block": null, "v": ""5000002"", }, { "from_block": 5000002, + "id": "indexer_testing_default", "k": "latest", "to_block": null, "v": "5000002", @@ -267,18 +273,21 @@ describe("SQLite key-value store", () => { [ { "from_block": 103, + "id": "indexer_testing_default", "k": "data-103B", "to_block": null, "v": ""103B"", }, { "from_block": 104, + "id": "indexer_testing_default", "k": "data-104B", "to_block": null, "v": ""104B"", }, { "from_block": 105, + "id": "indexer_testing_default", "k": "data-105B", "to_block": null, "v": ""105B"", @@ -449,24 +458,28 @@ describe("SQLite key-value store", () => { [ { "from_block": 103, + "id": "indexer_testing_default", "k": "data-103", "to_block": null, "v": ""103B"", }, { "from_block": 104, + "id": "indexer_testing_default", "k": "data-104", "to_block": null, "v": ""104B"", }, { "from_block": 106, + "id": "indexer_testing_default", "k": "data-106", "to_block": null, "v": ""106BC"", }, { "from_block": 107, + "id": "indexer_testing_default", "k": "data-107", "to_block": null, "v": ""107BC"", @@ -550,12 +563,14 @@ describe("SQLite key-value store", () => { [ { "from_block": 5000001, + "id": "indexer_testing_default", "k": "block-5000001", "to_block": null, "v": ""accepted-block1"", }, { "from_block": 5000002, + "id": "indexer_testing_default", "k": "block-5000002", "to_block": null, "v": ""pending-block2"", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1ea7c435..bf5b55ab 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -70,8 +70,8 @@ importers: specifier: workspace:* version: link:../../packages/cli drizzle-orm: - specifier: ^0.37.0 - version: 0.37.0(@electric-sql/pglite@0.2.17)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.1)(postgres@3.4.4) + specifier: ^0.40.1 + version: 0.40.1(@electric-sql/pglite@0.2.17)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.1)(postgres@3.4.4) pg: specifier: ^8.13.1 version: 8.13.1 @@ -2840,98 +2840,6 @@ packages: resolution: {integrity: sha512-OvHL8RVyYiPR3LLRE3SHdcON8xGXl+qMfR9uTTnFWBPIqVk/3NWYZPb7nfpM1Bhix3H+BsxqPyyagG7YZ+Z63A==} hasBin: true - drizzle-orm@0.37.0: - resolution: {integrity: sha512-AsCNACQ/T2CyZUkrBRUqFT2ibHJ9ZHz3+lzYJFFn3hnj7ylIeItMz5kacRG89uSE74nXYShqehr6u+6ks4JR1A==} - peerDependencies: - '@aws-sdk/client-rds-data': '>=3' - '@cloudflare/workers-types': '>=4' - '@electric-sql/pglite': '>=0.2.0' - '@libsql/client': '>=0.10.0' - '@libsql/client-wasm': '>=0.10.0' - '@neondatabase/serverless': '>=0.10.0' - '@op-engineering/op-sqlite': '>=2' - '@opentelemetry/api': ^1.4.1 - '@planetscale/database': '>=1' - '@prisma/client': '*' - '@tidbcloud/serverless': '*' - '@types/better-sqlite3': '*' - '@types/pg': '*' - '@types/react': '>=18' - '@types/sql.js': '*' - '@vercel/postgres': '>=0.8.0' - '@xata.io/client': '*' - better-sqlite3: '>=7' - bun-types: '*' - expo-sqlite: '>=14.0.0' - knex: '*' - kysely: '*' - mysql2: '>=2' - pg: '>=8' - postgres: '>=3' - prisma: '*' - react: '>=18' - sql.js: '>=1' - sqlite3: '>=5' - peerDependenciesMeta: - '@aws-sdk/client-rds-data': - optional: true - '@cloudflare/workers-types': - optional: true - '@electric-sql/pglite': - optional: true - '@libsql/client': - optional: true - '@libsql/client-wasm': - optional: true - '@neondatabase/serverless': - optional: true - '@op-engineering/op-sqlite': - optional: true - '@opentelemetry/api': - optional: true - '@planetscale/database': - optional: true - '@prisma/client': - optional: true - '@tidbcloud/serverless': - optional: true - '@types/better-sqlite3': - optional: true - '@types/pg': - optional: true - '@types/react': - optional: true - '@types/sql.js': - optional: true - '@vercel/postgres': - optional: true - '@xata.io/client': - optional: true - better-sqlite3: - optional: true - bun-types: - optional: true - expo-sqlite: - optional: true - knex: - optional: true - kysely: - optional: true - mysql2: - optional: true - pg: - optional: true - postgres: - optional: true - prisma: - optional: true - react: - optional: true - sql.js: - optional: true - sqlite3: - optional: true - drizzle-orm@0.40.1: resolution: {integrity: sha512-aPNhtiJiPfm3qxz1czrnIDkfvkSdKGXYeZkpG55NPTVI186LmK2fBLMi4dsHpPHlJrZeQ92D322YFPHADBALew==} peerDependencies: @@ -6850,16 +6758,6 @@ snapshots: transitivePeerDependencies: - supports-color - drizzle-orm@0.37.0(@electric-sql/pglite@0.2.17)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.1)(postgres@3.4.4): - optionalDependencies: - '@electric-sql/pglite': 0.2.17 - '@opentelemetry/api': 1.9.0 - '@types/better-sqlite3': 7.6.11 - '@types/pg': 8.11.10 - better-sqlite3: 11.5.0 - pg: 8.13.1 - postgres: 3.4.4 - drizzle-orm@0.40.1(@electric-sql/pglite@0.2.17)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.11)(@types/pg@8.11.10)(better-sqlite3@11.5.0)(pg@8.13.1)(postgres@3.4.4): optionalDependencies: '@electric-sql/pglite': 0.2.17