Remove sink in favour of middleware #128
📝 Walkthrough

This pull request introduces a comprehensive refactoring of the Apibara indexer architecture, replacing the sink mechanism with a middleware-based approach. The changes span multiple packages.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant Indexer
    participant Middleware
    participant Storage
    Client->>Indexer: Send request
    Indexer->>Middleware: Apply middleware
    Middleware->>Storage: Persist data
    Storage-->>Middleware: Confirmation
    Middleware-->>Indexer: Process complete
    Indexer-->>Client: Response
```
Actionable comments posted: 13
🔭 Outside diff range comments (3)
packages/plugin-mongo/src/storage.ts (1)
Line range hint 55-91: Soft-delete approach

Inserting new docs with `_cursor.from` and updating old docs with `_cursor.to = orderKey` effectively transforms updates into an append-only model. This is a valid approach, but verify that your DB indexes efficiently handle queries on `_cursor.to = null`. Add an index on `_cursor.to` to maintain performance at scale:

```js
// Example code snippet in your database initialization logic:
db.collection('<collectionName>').createIndex({ "_cursor.to": 1 });
```

packages/indexer/src/indexer.test.ts (2)
Line range hint 35-208: Add test cases for concurrent message processing

The current tests validate sequential message processing, but middleware might be used in concurrent scenarios. Consider adding:

```typescript
it("should handle concurrent message processing", async () => {
  const output: unknown[] = [];
  const indexer = getMockIndexer({
    override: {
      plugins: [mockSink({ output })],
      transform: async (...args) => {
        // Simulate concurrent processing with random delays
        await new Promise((resolve) =>
          setTimeout(resolve, Math.random() * 100),
        );
        return transform(...args);
      },
    },
  });
  await run(client, indexer);
  // Verify output order matches cursor order
  expect(output.map((o) => o.cursor)).toBeSorted();
});
```
Line range hint 209-413: Add test cases for middleware error propagation

The tests should verify that errors from middleware components are properly propagated and handled. Consider adding:

```typescript
it("should propagate middleware errors", async () => {
  const error = new Error("Middleware error");
  const indexer = getMockIndexer({
    override: {
      plugins: [
        () => ({
          beforeMessage: async () => {
            throw error;
          },
        }),
      ],
      transform,
    },
  });
  await expect(run(client, indexer)).rejects.toThrow(error);
});
```
🧹 Nitpick comments (40)
pnpm-workspace.yaml (1)
4-4: Add newline at end of file

Following YAML best practices, add a newline character at the end of the file.

```diff
 packages:
   - "packages/*"
   - "examples/**"
-  - "!examples/cli"
+  - "!examples/cli"
+
```

🧰 Tools
🪛 yamllint (1.35.1)

[error] 4-4: no new line character at the end of file (new-line-at-end-of-file)
packages/plugin-drizzle/src/persistence.ts (2)
Line range hint 16-37: Potential naming convention improvement for 'checkpoints' and 'filters' tables.

While the table names are meaningful, consider prefixing or suffixing them (e.g., "drizzle_checkpoints", "drizzle_filters") if your project commonly uses global naming conventions in shared databases. This helps avoid collisions and improves clarity.

Line range hint 71-192: Validate types and handle unexpected or malformed cursor/filter data.

Methods like _getCheckpoint, _putCheckpoint, _getFilter, and _putFilter rely on valid cursor/filter values. Consider adding checks or error handling (e.g., DrizzleStorageError) to guard against corrupted or partial data in the database.
packages/plugin-sqlite/package.json (1)
20-27: Consider adding clean and type generation scriptsWhile the current scripts are good, consider adding:
- A
cleanscript to remove the dist directory before builds- A
build:typesscript to explicitly generate type declarations"scripts": { + "clean": "rm -rf dist", + "build:types": "tsc --emitDeclarationOnly", "build": "unbuild", "typecheck": "tsc --noEmit", "lint": "biome check .", "lint:fix": "pnpm lint --write", "test": "vitest", "test:ci": "vitest run" }packages/indexer/src/plugins/index.ts (1)
1-7: LGTM! Explicit exports improve code maintainability.

The change from wildcard exports to explicit named exports provides better visibility into the public API and aligns with the transition to the middleware pattern.
Consider documenting these exported entities in a README or API documentation to help users understand the new middleware-based architecture.
packages/indexer/src/plugins/persistence.ts (2)
5-7: Good use of documentation comments.
The description clearly conveys the in-memory persistence behavior and intent. Consider adding an example usage snippet to help new users quickly integrate the plugin.
30-36: Middleware approach for storing cursor is concise and flexible.
The approach ensures that the latest cursor is captured after each block is processed. However, if multiple workers or parallel executions are introduced later, the single `lastCursor` could cause race conditions or inconsistent states. Keep concurrency in mind if the indexer evolves toward parallel block processing.

packages/plugin-sqlite/src/kv.ts (3)
15-24: Constructor ensures a transaction is in progress.
Requiring open transactions ensures atomic writes. Just be mindful that if this store is used at a high frequency, you may need to commit and reopen transactions to avoid locking the database for extended periods.
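The commit-and-reopen idea can be sketched with a small batcher that closes the current transaction after a fixed number of writes. This is a hypothetical helper, not part of the plugin; the injected `begin`/`commit` callbacks stand in for the real `db.exec("BEGIN")`/`db.exec("COMMIT")` calls.

```typescript
// Hypothetical sketch: commit and reopen the transaction every `batchSize`
// writes so a single long-lived transaction does not hold the database lock.
type TxOps = { begin: () => void; commit: () => void };

export class TransactionBatcher {
  private pending = 0;
  private open = false;

  constructor(
    private ops: TxOps,
    private batchSize: number,
  ) {}

  // Run one write inside the currently open transaction, opening one if needed.
  write(fn: () => void) {
    if (!this.open) {
      this.ops.begin();
      this.open = true;
    }
    fn();
    this.pending += 1;
    if (this.pending >= this.batchSize) {
      this.flush();
    }
  }

  // Commit whatever is pending and release the lock.
  flush() {
    if (this.open) {
      this.ops.commit();
      this.open = false;
      this.pending = 0;
    }
  }
}
```

The batch size trades durability granularity for lock hold time; callers must still `flush()` before reading from another connection.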
32-44: Multiple writes in a single transaction can lead to concurrency blocking.
Because the entire block is within one transaction, any concurrent writes could be locked. If scaling is expected, consider an approach that reduces transaction time or queues writes appropriately.
53-84: Solid schema for versioned key-value pairs.
The design allows for historical lookups and future expansions. Periodic cleanups may be necessary to remove rows whose `to_block` is no longer needed, preventing unbounded storage growth.

packages/indexer/src/compose.ts (1)
33-70: Robust middleware composition logic.
• Handles sequential invocation, ensuring each middleware calls the next.
• Appropriately throws an error if next() is called multiple times, preventing re-entrancy issues.
• The error message "Handler not found" is descriptive, but consider logging the index or context for easier debugging.

packages/indexer/src/internal/testing.ts (1)
9-30: Mock indexer approach is straightforward.
The `generateMockMessages` function now returns data with incremented order keys, which is a good practice to simulate real block increments. If rate or concurrency testing is anticipated, consider generating more varied or randomized data.

packages/plugin-mongo/src/index.ts (3)
13-23: Ensure robust error handling for missing MongoStorage
The current approach throws an error when MongoStorage is unavailable, which is good for catching misconfigurations. Consider adding guidance or fallback logic for production environments to safeguard against abrupt crashes.
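One way to fail loudly with guidance is an accessor that throws a descriptive error when the storage was never registered. This is a hand-rolled sketch with a plain `Map` standing in for the indexer context; `useMongoStorage`, `registerMongoStorage`, and the `MongoStorage` shape here are illustrative, not the plugin's actual API.

```typescript
// Stand-in type for whatever the real storage object looks like.
type MongoStorage = { db: unknown };

// Stand-in for the indexer's per-run context.
const context = new Map<string, unknown>();

export function registerMongoStorage(storage: MongoStorage) {
  context.set("mongoStorage", storage);
}

export function useMongoStorage(): MongoStorage {
  const storage = context.get("mongoStorage");
  if (storage === undefined) {
    // Point the user at the likely misconfiguration instead of crashing opaquely.
    throw new Error(
      "MongoStorage is not available. " +
        "Did you forget to add the mongo storage plugin to the indexer?",
    );
  }
  return storage as MongoStorage;
}
```

The error message carries the remediation, which is usually more useful in production logs than a bare `undefined` dereference.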
54-65: Potential fallback strategy for invalidation
When encountering an invalid cursor, you throw a MongoStorageError. As a fallback, it might help to re-request the block range or gracefully skip the invalid data to maintain the indexer’s continuity.
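The rewind fallback could look like the following sketch, which resolves an unknown invalidation cursor to the newest known cursor at or before it instead of aborting. All names and the `Cursor` shape are illustrative assumptions, not the plugin's API.

```typescript
type Cursor = { orderKey: bigint };

export function resolveInvalidation(
  requested: Cursor,
  knownCursors: Cursor[],
): Cursor {
  // Exact match: invalidate back to the requested cursor.
  const exact = knownCursors.find((c) => c.orderKey === requested.orderKey);
  if (exact) return exact;

  // Fallback: rewind to the newest known cursor strictly before the request,
  // so the indexer can re-request the missing range and continue.
  const candidates = knownCursors.filter(
    (c) => c.orderKey < requested.orderKey,
  );
  if (candidates.length === 0) {
    throw new Error(`No known cursor at or before ${requested.orderKey}`);
  }
  return candidates.reduce((a, b) => (a.orderKey > b.orderKey ? a : b));
}
```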
81-102: Persist state TODO
The TODO comment at line 98 suggests future state persistence logic. Consider using a well-known pattern (e.g., a dedicated collection) or leveraging transactions to store indexer state consistently.

Would you like help drafting an implementation or opening a follow-up issue?

packages/plugin-sqlite/src/index.ts (3)
packages/plugin-sqlite/src/index.ts (3)
26-35: KeyValueStore retrieval
The function cleanly retrieves the store from context. Good error handling. As a small improvement, consider adding logging or partial fallback for production so the entire pipeline doesn’t crash if the store is unavailable.
63-74: Transaction handling in run:before
You initialize persistent state and/or KeyValueStore before running the indexer. This is a solid approach. As a best practice, ensure that large or time-consuming migrations or setups are done in a separate process to reduce indexer startup latency.
94-128: Middleware-based transaction approach
Your code runs the entire transform logic inside SQLite transactions for each block. This is a good design, but watch out for potential performance bottlenecks on large blocks. Test for concurrency and throughput to ensure it meets production SLAs.

packages/plugin-mongo/src/storage.ts (2)
25-44: Make concurrency and session usage explicit
The new MongoStorage class gracefully wraps the `Db` and session. Consider clarifying how many concurrent operations might share this session. If you expect parallel calls, either ensure session isolation or switch to session-per-call.
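The session-per-call alternative can be sketched as a wrapper that opens a fresh session for every operation and guarantees cleanup. The `Session` type and factory are stand-ins for the real MongoDB driver's `client.startSession()`/`session.endSession()` (which are async-aware); this only illustrates the isolation pattern.

```typescript
type Session = { id: number; ended: boolean };

// Stand-in for MongoClient.startSession(): each call yields a distinct session.
export function makeSessionFactory() {
  let nextId = 0;
  return (): Session => ({ id: nextId++, ended: false });
}

// Every operation gets its own session, ended even if the callback throws,
// so no two concurrent callers can ever share session state.
export function withSession<T>(
  startSession: () => Session,
  fn: (session: Session) => T,
): T {
  const session = startSession();
  try {
    return fn(session);
  } finally {
    session.ended = true; // stand-in for session.endSession()
  }
}
```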
Line range hint 93-164: Document versioning overhead

The approach of versioning documents on every update can cause data growth. Keep an eye on storage usage. Periodic compaction or archival might be beneficial, especially once data is final.

packages/indexer/src/indexer.ts (2)
201-202: Context creation
The new code sets up an indexerAsyncContext and extracts a shared context for each message. Track memory usage if the indexer handles a high volume of data concurrently.
260-262: Assigning cursor, endCursor, and finality
Storing these values on the context is convenient. Confirm that you do not inadvertently retain references to large objects in finality or cursor, especially if they contain big logs or state.

packages/plugin-mongo/docker-compose.yaml (3)
5-6: Consider restricting port exposure

The MongoDB port is exposed to all interfaces. Consider restricting it to localhost if external access isn't required.

```diff
-      - 27017:27017
+      - 127.0.0.1:27017:27017
```
7-12: Add newline at end of file

Add a newline character at the end of the file to comply with POSIX standards.

```diff
       retries: 30
+
```

🧰 Tools
🪛 yamllint (1.35.1)

[error] 12-12: no new line character at the end of file (new-line-at-end-of-file)
3-4: Consider pinning to a specific MongoDB version

Using the major version tag (7.0) could lead to unexpected behavior if minor versions introduce changes.

```diff
-    image: mongo:7.0
+    image: mongo:7.0.4
```

packages/plugin-mongo/src/utils.ts (1)
10-19: Consider simplifying the transaction handling

The nested async/await structure could be flattened for better readability while maintaining the same functionality.

```diff
 export async function withTransaction<T>(
   client: MongoClient,
   cb: (session: ClientSession) => Promise<T>,
 ) {
-  return await client.withSession(async (session) => {
-    return await session.withTransaction(async (session) => {
-      return await cb(session);
-    });
-  });
+  const session = client.startSession();
+  try {
+    const result = await session.withTransaction(() => cb(session));
+    return result;
+  } finally {
+    await session.endSession();
+  }
 }
```

packages/indexer/src/context.ts (2)
6-6: Consider adding type safety to IndexerContext

While extending `Record<string, any>` provides flexibility for middleware, it might lead to runtime errors. Consider using a discriminated union or branded types to maintain type safety while allowing extensibility. Example approach:

```typescript
export interface BaseIndexerContext {
  readonly _type: string;
}

export interface IndexerContext
  extends BaseIndexerContext,
    Record<string, unknown> {}
```
23-24: Consider adding runtime type checking

The type assertion might fail silently if the context doesn't contain metadata properties. Consider adding runtime validation:

```typescript
export function useMessageMetadataContext(): MessageMetadataContext {
  const ctx = useIndexerContext();
  if (ctx.cursor !== undefined && !(ctx.cursor instanceof Uint8Array)) {
    throw new Error("Invalid cursor type in context");
  }
  return ctx as MessageMetadataContext;
}
```
Line range hint 10-20: Consider enhancing logger configuration

The logger setup looks good but could benefit from additional middleware-specific logging capabilities. Consider adding middleware lifecycle logging:

```typescript
return defineIndexerPlugin<TFilter, TBlock>((indexer) => {
  indexer.hooks.hook("run:before", () => {
    const ctx = useIndexerContext();
    const loggerInstance = logger
      ? consola.create({ reporters: [logger] })
      : consola.create({});
    ctx.logger = loggerInstance;
    loggerInstance.info("Initializing indexer middleware");
  });

  indexer.hooks.hook("run:after", () => {
    const ctx = useIndexerContext();
    ctx.logger?.info("Shutting down indexer middleware");
  });
});
```

packages/plugin-sqlite/src/utils.ts (1)
41-47: Consider performance impact of pretty printing

The `"\t"` parameter in JSON.stringify creates pretty-printed output, which increases storage size. Remove pretty printing unless needed for debugging:

```diff
 return JSON.stringify(
   obj,
   (_, value) => (typeof value === "bigint" ? `${value.toString()}n` : value),
-  "\t",
 );
```

packages/plugin-mongo/src/mongo.ts (3)
13-20: Consider adding indexes for _cursor fields

The MongoDB operations rely heavily on `_cursor.from` and `_cursor.to` fields. Consider adding indexes to optimize these queries. Example index creation:

```typescript
// Add to collection setup
await collection.createIndex({ "_cursor.from": 1 });
await collection.createIndex({ "_cursor.to": 1 });
```

Also applies to: 23-31
35-51: Document the finalize function's purpose

The function's role in the lifecycle of documents should be documented, especially its relationship with the invalidate function.

```diff
+/**
+ * Removes documents that have been finalized up to the given cursor.
+ * This operation is permanent and cannot be undone.
+ *
+ * @param db - MongoDB database instance
+ * @param session - Active MongoDB session for transaction
+ * @param cursor - Cursor indicating the finalization point
+ * @param collections - Array of collection names to process
+ */
 export async function finalize(
```
1-51: Consider implementing retry logic for MongoDB operations

MongoDB operations might fail due to temporary network issues or failovers. Consider implementing retry logic for resilience:

```typescript
async function withRetry<T>(
  operation: () => Promise<T>,
  maxRetries = 3,
  delay = 1000,
): Promise<T> {
  let lastError: Error;
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      if (i < maxRetries - 1) {
        await new Promise((resolve) =>
          setTimeout(resolve, delay * Math.pow(2, i)),
        );
      }
    }
  }
  throw lastError;
}
```

packages/protocol/src/common.ts (1)
67-69: Add JSDoc comment for the type guard function.

The `isCursor` type guard implementation is correct and enhances type safety. Consider adding a JSDoc comment to document its purpose and usage.

```diff
+/**
+ * Type guard to check if a value is a valid Cursor.
+ * @param value - The value to check
+ * @returns True if the value is a Cursor, false otherwise
+ */
 export function isCursor(value: unknown): value is Cursor {
   return Schema.is(Cursor)(value);
 }
```

packages/indexer/src/compose.test.ts (1)
41-75: Consider adding more test cases for robustness.

The current tests verify the happy path well. Consider adding:

- Error propagation tests
- Negative test cases (e.g., empty middleware array)
- Explicit order verification using timestamps or sequence numbers

Example test case for error propagation:

```typescript
it("propagates errors through middleware chain", async () => {
  const errorMiddleware: MiddlewareFunction<C> = async () => {
    throw new Error("Test error");
  };
  const context: C = { bag: {}, finalized: false };
  const composed = compose<C>([...middleware, errorMiddleware]);
  await expect(composed(context)).rejects.toThrow("Test error");
});
```

packages/plugin-sqlite/tests/persistence.test.ts (1)
52-61: Consider adding test coverage for concurrent transactions

While the error handling test is good, it would be valuable to add tests for concurrent transactions to ensure the middleware handles simultaneous operations correctly. Example test case:

```typescript
it("should handle concurrent transactions correctly", async () => {
  const db = new Database(":memory:");
  const client = new MockClient<MockFilter, MockBlock>(() =>
    generateMockMessages(3),
  );

  // Run multiple indexers concurrently
  const indexers = Array(3)
    .fill(null)
    .map(() =>
      getMockIndexer({
        override: { plugins: [sqliteStorage({ database: db })] },
      }),
    );

  await Promise.all(indexers.map((indexer) => run(client, indexer)));

  // Verify database consistency
  const rows = db.prepare("SELECT * FROM checkpoints").all();
  expect(rows).toHaveLength(1); // Should have only one checkpoint
});
```

packages/plugin-sqlite/tests/kv.test.ts (1)
47-87: Consider simplifying test assertions

The snapshot test is quite verbose. Consider using a more focused assertion that checks specific properties rather than the entire data structure. Example:

```typescript
const rows = db.prepare("SELECT * FROM kvs").all();
expect(rows).toEqual(
  expect.arrayContaining([
    expect.objectContaining({ k: "latest", v: "5000002" }),
  ]),
);
```

packages/plugin-sqlite/src/persistence.ts (1)
40-66: Improve type safety in state retrieval

The `getState` function could benefit from stronger type checking for the filter deserialization.

```diff
 export function getState<TFilter>(db: Database) {
   assertInTransaction(db);

   const storedCursor = db
     .prepare<string, { order_key?: number; unique_key?: string }>(
       statements.getCheckpoint,
     )
     .get(DEFAULT_INDEXER_ID);
   const storedFilter = db
     .prepare<string, { filter: string }>(statements.getFilter)
     .get(DEFAULT_INDEXER_ID);

   let cursor: Cursor | undefined;
   let filter: TFilter | undefined;

   if (storedCursor?.order_key) {
     cursor = {
       orderKey: BigInt(storedCursor.order_key),
       uniqueKey: storedCursor.unique_key as `0x${string}`,
     };
   }

   if (storedFilter) {
+    try {
       filter = deserialize(storedFilter.filter) as TFilter;
+      // Add runtime type check if possible
+      if (!filter || typeof filter !== "object") {
+        throw new Error("Invalid filter format");
+      }
+    } catch (error) {
+      throw new Error(`Failed to deserialize filter: ${error.message}`);
+    }
   }

   return { cursor, filter };
 }
```

packages/plugin-mongo/tests/storage.test.ts (1)
211-216: Use environment variables for database configuration

The MongoDB connection string is hardcoded, which could make the tests less portable across different environments.

```diff
+const MONGODB_URL =
+  process.env.MONGODB_URL || "mongodb://localhost:27017/?replicaSet=rs0";
+
 function getRandomDatabase() {
   const dbName = crypto.randomUUID().replace(/-/g, "_");
-  const client = new MongoClient("mongodb://localhost:27017/?replicaSet=rs0");
+  const client = new MongoClient(MONGODB_URL);
   const db = client.db(dbName);
   return { db, client, dbName };
 }
```

packages/indexer/src/indexer.test.ts (1)
17-34: Add type validation for transform function

The transform function's type parameters could be more strictly typed to ensure type safety.

```diff
-async function transform<TData>({
+async function transform<TData extends string | number>({
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)

- `pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`

📒 Files selected for processing (80)

- change/@apibara-indexer-19b12d18-b9e8-44db-9ece-5cb0d0784012.json (1 hunks)
- change/@apibara-plugin-drizzle-c6284fe6-3f4f-4d1d-8314-1834f73eb33f.json (1 hunks)
- change/@apibara-plugin-mongo-d53843ca-d645-4e21-b42f-a658e4276a08.json (1 hunks)
- change/@apibara-plugin-sqlite-eac88e8e-d56c-4a1e-8f66-0a9fd8f578d0.json (1 hunks)
- change/@apibara-protocol-5a2a4d68-0d05-4f90-875b-5e3e56de08ec.json (1 hunks)
- change/apibara-3b2198a6-9656-4ad6-acbd-8ae728aa1d15.json (1 hunks)
- examples/indexer/package.json (0 hunks)
- examples/indexer/src/indexer.ts (0 hunks)
- examples/indexer/src/instrumentation.ts (0 hunks)
- examples/indexer/src/main.ts (0 hunks)
- examples/starknet-indexer/biome.json (0 hunks)
- examples/starknet-indexer/package.json (0 hunks)
- examples/starknet-indexer/src/indexer.ts (0 hunks)
- examples/starknet-indexer/src/main.ts (0 hunks)
- packages/cli/src/runtime/internal/app.ts (1 hunks)
- packages/cli/src/types/config.ts (1 hunks)
- packages/indexer/build.config.ts (0 hunks)
- packages/indexer/package.json (0 hunks)
- packages/indexer/src/compose.test.ts (1 hunks)
- packages/indexer/src/compose.ts (1 hunks)
- packages/indexer/src/context.ts (1 hunks)
- packages/indexer/src/index.ts (0 hunks)
- packages/indexer/src/indexer.test.ts (5 hunks)
- packages/indexer/src/indexer.ts (7 hunks)
- packages/indexer/src/internal/testing.ts (2 hunks)
- packages/indexer/src/plugins/config.ts (1 hunks)
- packages/indexer/src/plugins/index.ts (1 hunks)
- packages/indexer/src/plugins/kv.test.ts (0 hunks)
- packages/indexer/src/plugins/kv.ts (0 hunks)
- packages/indexer/src/plugins/logger.ts (1 hunks)
- packages/indexer/src/plugins/persistence.test.ts (0 hunks)
- packages/indexer/src/plugins/persistence.ts (2 hunks)
- packages/indexer/src/sink.ts (0 hunks)
- packages/indexer/src/sinks/csv.test.ts (0 hunks)
- packages/indexer/src/sinks/csv.ts (0 hunks)
- packages/indexer/src/sinks/drizzle/Int8Range.ts (0 hunks)
- packages/indexer/src/sinks/drizzle/delete.ts (0 hunks)
- packages/indexer/src/sinks/drizzle/drizzle.test.ts (0 hunks)
- packages/indexer/src/sinks/drizzle/drizzle.ts (0 hunks)
- packages/indexer/src/sinks/drizzle/index.ts (0 hunks)
- packages/indexer/src/sinks/drizzle/insert.ts (0 hunks)
- packages/indexer/src/sinks/drizzle/select.ts (0 hunks)
- packages/indexer/src/sinks/drizzle/transaction.ts (0 hunks)
- packages/indexer/src/sinks/drizzle/update.ts (0 hunks)
- packages/indexer/src/sinks/drizzle/utils.ts (0 hunks)
- packages/indexer/src/sinks/sqlite.test.ts (0 hunks)
- packages/indexer/src/sinks/sqlite.ts (0 hunks)
- packages/indexer/src/testing/index.ts (1 hunks)
- packages/indexer/src/vcr/record.ts (1 hunks)
- packages/indexer/src/vcr/replay.ts (1 hunks)
- packages/plugin-drizzle/README.md (1 hunks)
- packages/plugin-drizzle/package.json (1 hunks)
- packages/plugin-drizzle/src/index.ts (1 hunks)
- packages/plugin-drizzle/src/persistence.test.ts (1 hunks)
- packages/plugin-drizzle/src/persistence.ts (2 hunks)
- packages/plugin-drizzle/src/utils.ts (1 hunks)
- packages/plugin-mongo/build.config.ts (1 hunks)
- packages/plugin-mongo/docker-compose.yaml (1 hunks)
- packages/plugin-mongo/package.json (1 hunks)
- packages/plugin-mongo/src/index.ts (1 hunks)
- packages/plugin-mongo/src/mongo.ts (1 hunks)
- packages/plugin-mongo/src/storage.ts (3 hunks)
- packages/plugin-mongo/src/utils.ts (1 hunks)
- packages/plugin-mongo/tests/storage.test.ts (1 hunks)
- packages/plugin-sqlite/README.md (1 hunks)
- packages/plugin-sqlite/build.config.ts (1 hunks)
- packages/plugin-sqlite/package.json (1 hunks)
- packages/plugin-sqlite/src/index.ts (1 hunks)
- packages/plugin-sqlite/src/kv.ts (1 hunks)
- packages/plugin-sqlite/src/persistence.ts (1 hunks)
- packages/plugin-sqlite/src/utils.ts (1 hunks)
- packages/plugin-sqlite/tests/kv.test.ts (1 hunks)
- packages/plugin-sqlite/tests/persistence.test.ts (1 hunks)
- packages/protocol/src/common.ts (1 hunks)
- packages/sink-mongo/docker-compose.yaml (0 hunks)
- packages/sink-mongo/src/index.ts (0 hunks)
- packages/sink-mongo/src/mongo.test.ts (0 hunks)
- packages/sink-mongo/src/mongo.ts (0 hunks)
- packages/sink-mongo/src/transaction.ts (0 hunks)
- pnpm-workspace.yaml (1 hunks)
💤 Files with no reviewable changes (34)
- examples/indexer/package.json
- packages/indexer/src/index.ts
- examples/starknet-indexer/biome.json
- packages/indexer/src/sinks/drizzle/index.ts
- examples/starknet-indexer/package.json
- examples/indexer/src/instrumentation.ts
- packages/sink-mongo/src/index.ts
- examples/indexer/src/main.ts
- packages/indexer/src/sinks/csv.test.ts
- packages/indexer/src/sink.ts
- packages/indexer/src/sinks/drizzle/delete.ts
- packages/indexer/src/plugins/kv.test.ts
- examples/starknet-indexer/src/main.ts
- packages/indexer/build.config.ts
- examples/starknet-indexer/src/indexer.ts
- examples/indexer/src/indexer.ts
- packages/indexer/src/sinks/drizzle/transaction.ts
- packages/indexer/src/sinks/drizzle/select.ts
- packages/indexer/src/sinks/sqlite.ts
- packages/indexer/src/sinks/drizzle/insert.ts
- packages/indexer/src/sinks/drizzle/update.ts
- packages/indexer/src/sinks/sqlite.test.ts
- packages/indexer/src/sinks/drizzle/drizzle.test.ts
- packages/indexer/src/sinks/drizzle/drizzle.ts
- packages/indexer/src/plugins/persistence.test.ts
- packages/indexer/src/sinks/csv.ts
- packages/indexer/src/plugins/kv.ts
- packages/sink-mongo/docker-compose.yaml
- packages/indexer/src/sinks/drizzle/utils.ts
- packages/sink-mongo/src/mongo.ts
- packages/sink-mongo/src/mongo.test.ts
- packages/indexer/src/sinks/drizzle/Int8Range.ts
- packages/indexer/package.json
- packages/sink-mongo/src/transaction.ts
✅ Files skipped from review due to trivial changes (14)
- packages/plugin-drizzle/README.md
- packages/plugin-sqlite/README.md
- change/@apibara-plugin-mongo-d53843ca-d645-4e21-b42f-a658e4276a08.json
- change/apibara-3b2198a6-9656-4ad6-acbd-8ae728aa1d15.json
- change/@apibara-indexer-19b12d18-b9e8-44db-9ece-5cb0d0784012.json
- packages/plugin-drizzle/src/persistence.test.ts
- change/@apibara-plugin-sqlite-eac88e8e-d56c-4a1e-8f66-0a9fd8f578d0.json
- change/@apibara-plugin-drizzle-c6284fe6-3f4f-4d1d-8314-1834f73eb33f.json
- change/@apibara-protocol-5a2a4d68-0d05-4f90-875b-5e3e56de08ec.json
- packages/plugin-mongo/build.config.ts
- packages/plugin-sqlite/build.config.ts
- packages/plugin-mongo/package.json
- packages/cli/src/runtime/internal/app.ts
- packages/plugin-drizzle/package.json
🧰 Additional context used
🪛 yamllint (1.35.1)
pnpm-workspace.yaml
[error] 4-4: no new line character at the end of file
(new-line-at-end-of-file)
packages/plugin-mongo/docker-compose.yaml
[error] 12-12: no new line character at the end of file
(new-line-at-end-of-file)
🔇 Additional comments (36)
packages/plugin-drizzle/src/persistence.ts (3)
1-1: Code is fully commented out.
The entire file appears to be enclosed within a block comment (/* ... */), which prevents it from actually being compiled or executed. Verify if this is intentional or if the block comment should be removed to allow the code to run.
Line range hint 39-69: Ensure hooks are idempotent and gracefully handle repeated invocations.
When an indexer restarts or partially replays, the “run:before” and “connect:before” hooks may be called multiple times. Confirm that redundant calls to create or retrieve the DrizzlePersistence instance won't lead to duplicate state or unexpected side effects.
Line range hint 71-192: Consider concurrency or transaction boundaries when writing filters.
If multiple parallel indexing operations or transactions occur, the logic updating the "fromBlock" and "toBlock" could lead to race conditions. Confirm that your transaction handling is robust enough to preserve correct filter history without overwriting.
packages/plugin-drizzle/src/utils.ts (1)
1-6: Good practice to define a custom error class.
Using a custom error name helps distinguish storage-specific issues from generic errors. Ensure it is logged or handled centrally for consistent error reporting within your codebase.
packages/plugin-drizzle/src/index.ts (1)
3-5: Placeholder function is unimplemented.
The drizzleStorage<TFilter, TBlock> function currently throws "Not implemented". Make sure to define the method's logic or remove it to avoid confusion if production code accidentally calls it.
Do you want me to create a placeholder implementation that clarifies the intent or open a GitHub issue to track its progress?
packages/plugin-sqlite/package.json (2)
10-19: LGTM! Well-structured module exports
The export configuration properly supports both ESM and CJS, with TypeScript types correctly declared. This follows modern package.json best practices.
34-40: Verify dependency compatibility with new middleware approach
Please ensure:
- Compatibility with better-sqlite3 v9.0.0
- Workspace dependencies (@apibara/indexer, @apibara/protocol) support the new middleware pattern
✅ Verification successful
Let me analyze the SQLite plugin's implementation to verify its compatibility with the middleware pattern and check the better-sqlite3 version details.
Dependencies are compatible with middleware approach
The SQLite plugin already implements the middleware pattern correctly through the handler:middleware hook, and the workspace dependencies (@apibara/indexer, @apibara/protocol) support the middleware functionality as evidenced by the codebase analysis. The better-sqlite3 v9.0.0 peer dependency is appropriate as the plugin uses basic SQLite operations that are stable across major versions.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```shell
#!/bin/bash
# Check if @apibara/indexer exports middleware-related functionality
rg -A 5 "middleware" "packages/indexer/src/"

# Check for breaking changes in better-sqlite3 v9.0.0
curl -s https://raw.githubusercontent.com/WiseLibs/better-sqlite3/master/CHANGELOG.md | rg -A 10 "9.0.0"
```

Length of output: 6445
Script:

```shell
#!/bin/bash
# Check if SQLite plugin uses middleware pattern
rg -A 5 "middleware" "packages/plugin-sqlite/"

# Check SQLite plugin's integration with indexer
ast-grep --pattern 'class $_ implements $_' packages/plugin-sqlite/src/

# Check better-sqlite3 version in package-lock.json
rg '"better-sqlite3"' package-lock.json -A 2
```

Length of output: 1044
packages/indexer/src/plugins/index.ts (1)
8-8: Verify the persistence implementation compatibility.
The introduction of inMemoryPersistence aligns with the PR's goal of replacing sinks with middleware.
Let's verify the implementation and its usage:
✅ Verification successful
Implementation verified and properly integrated
The inMemoryPersistence implementation is correctly designed as a middleware plugin that:
- Persists the last cursor and filter in memory
- Properly restores state on reconnection
- Is correctly integrated as the first plugin in the CLI's indexer creation
- Maintains compatibility with the middleware architecture by using the hooks system
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:

```shell
#!/bin/bash
# Description: Check the implementation and usage of inMemoryPersistence

# Check implementation
ast-grep --pattern 'export function inMemoryPersistence' -A 10

# Check usage patterns
rg -l "inMemoryPersistence" --type typescript
```

Length of output: 145
Script:

```shell
#!/bin/bash
# Let's try a different approach to find the implementation and usage

# Find files containing "inMemoryPersistence"
rg "inMemoryPersistence"

# Look for the implementation with a more flexible pattern
ast-grep --pattern 'export { inMemoryPersistence }'

# Find the actual implementation file
fd persistence.ts
```

Length of output: 644
Script:

```shell
#!/bin/bash
# Let's examine the implementation and its integration

# Check the implementation details
cat packages/indexer/src/plugins/persistence.ts

# Check how it's used in the CLI app
cat packages/cli/src/runtime/internal/app.ts
```

Length of output: 3181
packages/indexer/src/plugins/persistence.ts (2)
1-2: No issues noted with the import statements.
They appear to be correct and minimal, so there's nothing to change here.
Line range hint 8-29: In-memory storage may risk losing data on indexer restarts.
Storing the cursor and filter in memory is fast and easy but can lead to data loss if the indexer process restarts. Confirm that this tradeoff is acceptable for the intended usage scenario. If long-term persistence is required, consider a fallback to a durable storage mechanism.
packages/plugin-sqlite/src/kv.ts (3)
10-13: Verify transactional integrity for table creation.
The function asserts that we are currently in a transaction before creating the table, which is good. However, if table creation fails, the transaction might be partially complete. Make sure to handle or log any errors accordingly so that partial transactions do not leave the database in an inconsistent state.
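The guard can be modeled in a few lines. better-sqlite3 does expose an `inTransaction` flag on the database handle; everything else here (function names, the table) is illustrative:

```typescript
// Sketch of the transaction guard discussed above, assuming a handle with
// an `inTransaction` flag (better-sqlite3 provides one). Names are
// illustrative, not the plugin's real API.
type DbLike = { inTransaction: boolean };

function assertInTransaction(db: DbLike): void {
  if (!db.inTransaction) {
    throw new Error("kv table creation must run inside a transaction");
  }
}

function initializeKeyValueStore(db: DbLike): void {
  assertInTransaction(db);
  // CREATE TABLE IF NOT EXISTS kvs (...) would run here; if it throws, the
  // surrounding transaction can roll back so no partial state is left behind.
}

let rejected = false;
try {
  initializeKeyValueStore({ inTransaction: false });
} catch {
  rejected = true;
}
initializeKeyValueStore({ inTransaction: true }); // does not throw
```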
26-30: Potential null references.
If no row is returned, the function returns undefined. Callers might fail unexpectedly if they assume a value is always present. Make sure to either handle this upstream or provide a fallback.
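One way to handle this upstream is to make the fallback explicit at the call site. A minimal sketch, using a Map as a stand-in for the SQLite-backed store:

```typescript
// Caller-side handling for a `get` that may return undefined, as noted
// above. The Map-backed store is a stand-in for the SQLite-backed one.
const store = new Map<string, string>();

function kvGet(key: string): string | undefined {
  return store.get(key);
}

// Make the fallback an explicit decision at the call site.
function kvGetOrDefault(key: string, fallback: string): string {
  return kvGet(key) ?? fallback;
}

store.set("lastCursor", "5000000");
const hit = kvGetOrDefault("lastCursor", "0");
const miss = kvGetOrDefault("unknownKey", "0");
```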
46-50: Explicit delete approach is effective for versioning.
Setting to_block tracks key versions across block ranges. This ensures historical or version-like lookups. Confirm you’re comfortable with the growing size of the table over time, as logical deletes can accumulate.
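The versioning scheme can be modeled in pure TypeScript to make the tradeoff concrete: a logical delete closes the live version by setting its upper bound rather than removing the row, so history accumulates but every version stays queryable. The row shape below is illustrative:

```typescript
// Pure-TypeScript model of the from/to versioning described above.
type Row = {
  key: string;
  value: string;
  fromBlock: number;
  toBlock: number | null; // null means the version is still live
};

const rows: Row[] = [];

function kvDel(key: string, block: number): void {
  for (const row of rows) {
    if (row.key === key && row.toBlock === null) {
      row.toBlock = block; // logical delete: close the live version
    }
  }
}

function kvPut(key: string, value: string, block: number): void {
  kvDel(key, block); // close any previous version first
  rows.push({ key, value, fromBlock: block, toBlock: null });
}

function kvGet(key: string): string | undefined {
  return rows.find((r) => r.key === key && r.toBlock === null)?.value;
}

kvPut("owner", "alice", 100);
kvPut("owner", "bob", 105);
```

Note that two rows now exist for `owner`; only the one with `toBlock === null` is live, and the closed one remains available for historical lookups.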
packages/indexer/src/compose.ts (2)
1-23: License header is properly included.
No further action required.
27-31: Clear definitions for NextFunction and MiddlewareFunction.
These definitions are concise and help ensure consistent contracts for middleware. Good job documenting function signatures for clarity.
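A synchronous sketch of the onion-style composition these signatures enable (the real `compose` awaits each middleware): every function can run code both before and after the rest of the chain via `next()`.

```typescript
// Minimal synchronous model of middleware composition; the real
// implementation is async, but the dispatch structure is the same.
type NextFunction = () => void;
type MiddlewareFunction<C> = (context: C, next: NextFunction) => void;

function compose<C>(middleware: MiddlewareFunction<C>[]): MiddlewareFunction<C> {
  return (context, next) => {
    const dispatch = (i: number): void => {
      if (i < middleware.length) {
        middleware[i](context, () => dispatch(i + 1));
      } else {
        next();
      }
    };
    dispatch(0);
  };
}

const order: string[] = [];
const composed = compose<string[]>([
  (ctx, next) => { ctx.push("a:before"); next(); ctx.push("a:after"); },
  (ctx, next) => { ctx.push("b:before"); next(); ctx.push("b:after"); },
]);
composed(order, () => order.push("handler"));
```

Running the composed function visits the middleware in registration order on the way in and in reverse order on the way out.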
packages/indexer/src/internal/testing.ts (3)
1-8: Imports are well-organized.
The addition of isCursor ensures runtime checks for cursor validity, which helps maintain type correctness.
48-87: Implementation of a mock sink as a plugin is aligned with the new middleware system.
• Writing to output in handler:middleware is a clean approach for capturing processed data.
• We see you persist lastCursor in metadata for re-connect scenarios, which is a great pattern for replays.
• Make sure concurrency is not an issue if multiple parallel tests run with the same metadata.
90-92: Convenient utility to retrieve the output array.
By exposing useMockSink, test writers can more easily verify results from the indexer’s processing pipeline. Make sure that test frameworks reset or reinitialize the sink state across test runs to avoid leaking data.
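One way such a `use*` hook can avoid leaking state is to back it with a module-level slot that is only populated while a message is being handled and always reset afterwards. A hypothetical sketch (not the real implementation):

```typescript
// Hypothetical sketch of a `use*` hook backed by a module-level slot.
type SinkContext = { output: unknown[] };

let current: SinkContext | null = null;

function useMockSink(): SinkContext {
  if (current === null) {
    throw new Error("useMockSink called outside of message handling");
  }
  return current;
}

function withSinkContext<T>(context: SinkContext, fn: () => T): T {
  current = context;
  try {
    return fn();
  } finally {
    current = null; // always reset, even if fn throws
  }
}

const sink: SinkContext = { output: [] };
withSinkContext(sink, () => {
  useMockSink().output.push("block-5000000");
});

let outside = false;
try {
  useMockSink(); // outside message handling: throws
} catch {
  outside = true;
}
```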
packages/plugin-mongo/src/index.ts (2)
41-52: Validate transaction concurrency
This hook finalizes messages within a withTransaction block. Double-check concurrency requirements and potential race conditions if multiple messages arrive in quick succession. Ensure that MongoDB’s session concurrency aligns with your indexing throughput needs.
67-79: Revisit partial rollback logic
You’re invalidating data for blocks that might not have persisted on restart. Confirm via integration testing that you’re not discarding data that was successfully written but not updated in the local state.
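The invalidation rules themselves can be checked in isolation with a pure-TypeScript model: documents opened strictly after the cursor are dropped, and documents whose upper bound extends past it are reopened (`to` reset to null). The document shape is illustrative:

```typescript
// Pure-TypeScript model of the plugin's invalidation rules.
type Doc = { value: string; cursor: { from: number; to: number | null } };

function invalidate(docs: Doc[], orderKey: number): Doc[] {
  return docs
    // delete: lower bound strictly greater than the invalidate cursor
    .filter((doc) => doc.cursor.from <= orderKey)
    // reopen: upper bound greater than the invalidate cursor
    .map((doc) =>
      doc.cursor.to !== null && doc.cursor.to > orderKey
        ? { ...doc, cursor: { from: doc.cursor.from, to: null } }
        : doc,
    );
}

const docs: Doc[] = [
  { value: "kept", cursor: { from: 100, to: null } },
  { value: "reopened", cursor: { from: 100, to: 110 } },
  { value: "dropped", cursor: { from: 108, to: null } },
];
const result = invalidate(docs, 105);
```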
packages/plugin-sqlite/src/index.ts (2)
76-92: Selective state restoration
You restore the cursor and filter based on persisted state. Confirm that an uninitialized or corrupted database doesn’t lead to a partial restore scenario that breaks the indexer’s flow. A safe fallback or reinitialization code path might be necessary.
130-142: connect:factory usage
The code forces a transaction to be open for the connect factory logic, ensuring consistent state writes. Double-check that the re-request or reconfiguration doesn’t repeatedly open nested transactions.
packages/indexer/src/indexer.ts (4)
25-36: Middleware composition
Introducing the new UseMiddlewareFunction is a clean approach to unify your pipeline under a standard abstraction. Just ensure that each piece of middleware is well-tested and that error flows are properly handled.
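One pattern that addresses the error-flow concern is an error-boundary middleware at the top of the chain. A synchronous sketch (the real pipeline is async, so the same `try`/`catch` would wrap an awaited `next()`); all names here are illustrative:

```typescript
// Sketch of an error-boundary middleware: a failing handler is captured
// on the context instead of becoming an unhandled rejection.
type Next = () => void;
type Context = { error?: Error; finalized: boolean };

function errorBoundary(context: Context, next: Next): void {
  try {
    next();
  } catch (err) {
    // Capture the failure so the indexer can invalidate or retry instead
    // of being left in an inconsistent, half-processed state.
    context.error = err instanceof Error ? err : new Error(String(err));
  } finally {
    context.finalized = true;
  }
}

const context: Context = { finalized: false };
errorBoundary(context, () => {
  throw new Error("handler failed");
});
```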
48-50: Request handling in connect:after
The addition of the request parameter allows hooking advanced logic after establishing a connection. Confirm that hooking in at this stage is not too late for certain transformations (e.g., if the server has already started sending blocks).
58-58: handler:middleware
Replacing multiple hooks with a single “handler:middleware” is simpler. Verify there are no corner cases from removed hooks (e.g., “handler:before,” “transaction:commit”), especially if logic was previously distributed among them.
264-336: Dynamic reconfiguration with factory blocks
The logic merges new filters via the factory approach. This is sophisticated but can get complex in production if you have frequent filter updates. Test for edge cases like large or invalid filter merges.
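The merge step can be illustrated with a hypothetical filter shape (the real DNA filter type is richer): the filter produced for newly discovered contracts is concatenated onto the current one before reconnecting.

```typescript
// Hypothetical sketch of factory-style filter merging; the filter shape
// is illustrative only.
type Filter = { events: { address: string }[] };

function mergeFilter(current: Filter, extra: Filter): Filter {
  return { events: [...current.events, ...extra.events] };
}

let filter: Filter = { events: [{ address: "0xfactory" }] };

// The factory block discovered two new contracts to track:
filter = mergeFilter(filter, {
  events: [{ address: "0xpair-a" }, { address: "0xpair-b" }],
});
```

An edge case worth testing is merging a large batch of discovered contracts at once, since the merged filter is what gets re-sent on reconnection.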
packages/plugin-mongo/src/utils.ts (1)
3-8: LGTM! Well-structured error class.
The MongoStorageError class follows best practices by extending Error and setting a custom name.
packages/indexer/src/context.ts (1)
17-21: LGTM! Well-structured metadata context
The MessageMetadataContext interface is well-designed with optional fields, allowing for flexible usage in different scenarios.
packages/indexer/src/vcr/replay.ts (1)
12-12: Verify type parameter removal across codebase
The removal of TTxnParams from the Indexer type aligns with the PR objective of simplifying the architecture. However, we should ensure this change is consistently applied across all usages.
✅ Verification successful
Type parameter removal is consistent across codebase
The search results confirm that the Indexer interface and all its usages throughout the codebase consistently use only two type parameters (TFilter, TBlock). There are no instances of the interface or its usage with three type parameters, confirming that the removal of TTxnParams has been properly applied across the entire codebase.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any remaining usage of TTxnParams in indexer types
rg "Indexer<.*,.*,.*>" --type ts
Length of output: 34
Script:
#!/bin/bash
# Let's try a broader search for Indexer type usage
rg "Indexer<" --type ts -A 2
# Also search for class/interface definition
ast-grep --pattern 'interface Indexer<$_>'
ast-grep --pattern 'class Indexer<$_>'
Length of output: 2558
packages/indexer/src/vcr/record.ts (1)
16-16: LGTM! Consistent with replay.ts changes
The removal of TTxnParams maintains consistency with the changes in replay.ts while preserving the core VCR functionality.
packages/indexer/src/testing/index.ts (1)
12-14: LGTM! Type signature simplification aligns with middleware transition.
The removal of TTxnParams from the run method signature is consistent with the PR's objective of transitioning from sinks to middleware. The simplified type parameters (TFilter and TBlock) maintain the essential functionality while reducing complexity.
packages/indexer/src/compose.test.ts (2)
4-9: LGTM! Clear type definitions for test context.
The type definitions for C and MiddlewareTuple are well-structured and focused.
12-32: LGTM! Well-structured test middleware functions.
The middleware functions a, b, c, and handler provide good coverage of different middleware patterns:
- Pre-processing (function `a`)
- Post-processing (function `b`)
- Context modification (function `c`)
- Request/response handling (function `handler`)
packages/plugin-sqlite/tests/persistence.test.ts (1)
24-28: LGTM: Good use of middleware pattern
The test correctly demonstrates the new middleware approach by using the plugins array instead of a sink configuration. This aligns well with the PR's objective of transitioning from sink to middleware.
packages/plugin-sqlite/tests/kv.test.ts (1)
30-41: LGTM: Excellent hook-based implementation
The use of useSqliteKeyValueStore hook demonstrates a clean, middleware-friendly approach to accessing the key-value store. This is a good example of the transition from sink to middleware pattern.
packages/plugin-sqlite/src/persistence.ts (1)
68-113: LGTM: Well-structured SQL statements
The SQL statements are well-organized and follow good practices for SQLite table design and query optimization.
```yaml
packages:
  - "packages/*"
  - "examples/**"
  - "!examples/cli"
```
💡 Codebase verification
CLI example needs updating to use middleware instead of being excluded
The CLI examples are actively using the sink mechanism that's being replaced:
- `examples/cli/lib/schema.ts` imports sink-related code from `@apibara/indexer/sinks/drizzle`
- Both indexer examples use `drizzleSink` configuration
- The examples should be updated to use the new middleware approach rather than being excluded from the workspace
Instead of excluding the CLI examples from the workspace, consider:
- Updating the examples to demonstrate the new middleware pattern
- Using this as an opportunity to showcase the migration path for users
🔗 Analysis chain
Verify the CLI example exclusion impact
Given that this PR replaces the sink mechanism with middleware, please verify if the CLI example needs to be updated rather than excluded from the workspace.
Would you like help updating the CLI example to work with the new middleware approach instead of excluding it?
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if CLI example contains sink-related code that needs updating
rg -l "sink" examples/cli/
Length of output: 136
Script:
#!/bin/bash
# Check the actual sink usage in CLI example files
echo "=== schema.ts ==="
rg "sink" examples/cli/lib/schema.ts -B 2 -A 2
echo -e "\n=== 1-evm.indexer.ts ==="
rg "sink" examples/cli/indexers/1-evm.indexer.ts -B 2 -A 2
echo -e "\n=== 2-starknet.indexer.ts ==="
rg "sink" examples/cli/indexers/2-starknet.indexer.ts -B 2 -A 2
Length of output: 1351
🧰 Tools
🪛 yamllint (1.35.1)
[error] 4-4: no new line character at the end of file
(new-line-at-end-of-file)
```json
"files": [
  "dist",
  "src",
  "README.md"
],
```
💡 Codebase verification
README.md exists but lacks documentation
The README.md file exists but contains only placeholder content with "TODO" sections. Since this package is transitioning from sink to middleware pattern, proper documentation is essential for users to understand the new approach.
`packages/plugin-sqlite/README.md` needs to be updated with:
- Comprehensive installation instructions
- Usage examples with the new middleware pattern
- Migration guide from sink to middleware (if applicable)
- API documentation
🔗 Analysis chain
Verify README.md existence and content
The package includes README.md in the files section, but as this is a new package, we should verify its existence and ensure it properly documents the new middleware approach replacing the sink pattern.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if README.md exists and contains middleware documentation
if [ -f "packages/plugin-sqlite/README.md" ]; then
echo "README.md exists. Checking content..."
rg -i "middleware" "packages/plugin-sqlite/README.md"
else
echo "README.md is missing"
fi
Length of output: 182
Script:
#!/bin/bash
# Let's check if README.md exists and get its content
if [ -f "packages/plugin-sqlite/README.md" ]; then
echo "=== README.md content ==="
cat "packages/plugin-sqlite/README.md"
else
echo "README.md is missing"
fi
Length of output: 201
```diff
@@ -1,4 +1,4 @@
-import type { ConsolaReporter } from "@apibara/indexer/plugins/logger";
+import type { ConsolaReporter } from "@apibara/indexer/plugins";
```
💡 Codebase verification
Import path consolidation needs attention
The verification revealed inconsistencies in the import patterns:
- The new consolidated path `@apibara/indexer/plugins` is used in `packages/cli/src/types/config.ts`
- However, several files still use the old path `@apibara/indexer/plugins/logger`:
  - `examples/cli/indexers/1-evm.indexer.ts`
  - `examples/cli/indexers/2-starknet.indexer.ts`
  - `examples/cli/indexers/3-starknet-factory.indexer.ts`
The plugins/index.ts file correctly re-exports the logger functionality, so all imports should use the consolidated path.
🔗 Analysis chain
LGTM! Import path consolidation improves maintainability.
The updated import path aligns with the new export structure in @apibara/indexer/plugins.
Let's verify the consistency of this import pattern across the codebase:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for consistent import patterns of ConsolaReporter
# Find all imports of ConsolaReporter
rg "import.*ConsolaReporter.*from.*@apibara/indexer" --type typescript
# Ensure no old import paths remain
rg "import.*from.*@apibara/indexer/plugins/logger" --type typescript
Length of output: 213
Script:
#!/bin/bash
# Description: Check for consistent import patterns of ConsolaReporter using ripgrep without file type filter
# Find all imports of ConsolaReporter
rg "import.*ConsolaReporter.*from.*@apibara/indexer"
# Ensure no old import paths remain
rg "import.*from.*@apibara/indexer/plugins/logger"
# Also check for any potential .ts or .tsx files containing these imports
fd "\.(ts|tsx)$" --exec grep -l "ConsolaReporter"
Length of output: 778
Script:
#!/bin/bash
# Let's check the actual implementation and exports in the plugins directory
# Check the exports in plugins/index.ts
cat packages/indexer/src/plugins/index.ts
# Check the logger plugin implementation
cat packages/indexer/src/plugins/logger.ts
# Check how ConsolaReporter is used in the CLI
cat packages/cli/src/runtime/internal/logger.ts
Length of output: 2952
```typescript
async function registerMiddleware<TFilter, TBlock>(
  indexer: Indexer<TFilter, TBlock>,
): Promise<MiddlewareFunction<IndexerContext>> {
  const middleware: MiddlewareFunction<IndexerContext>[] = [];
  const use = (fn: MiddlewareFunction<IndexerContext>) => {
    middleware.push(fn);
  };

  await indexer.hooks.callHook("handler:middleware", { use });

  const composed = compose(middleware);

  // Return a named function to help debugging
  return async function _composedIndexerMiddleware(
    context: IndexerContext,
    next?: NextFunction,
  ) {
    await composed(context, next);
  };
}
```
Middleware setup
Composing middleware at runtime is flexible. Keep an eye on how you handle errors in these functions: unhandled rejections could leave the indexer in an inconsistent state if not carefully managed.
```typescript
export type IndexerPlugin<TFilter, TBlock> = (
  indexer: Indexer<TFilter, TBlock>,
```
💡 Codebase verification
TTxnParams type parameter still used in several files - needs updating
The removal of TTxnParams from IndexerPlugin type is inconsistent with its usage across the codebase. Found remaining references that need to be updated:
- `packages/plugin-drizzle/src/persistence.ts`: still uses `TTxnParams` in `defineIndexerPlugin`
- `packages/indexer/src/plugins/logger.ts`: uses `TTxnParams` in the logger function
- `packages/indexer/src/vcr/replay.ts` and `record.ts`: both still reference `TTxnParams`
These files need to be updated to align with the new middleware-based approach where TTxnParams has been removed from the plugin interface.
🔗 Analysis chain
LGTM! Type signature changes align with middleware transition.
The removal of TTxnParams from IndexerPlugin and defineIndexerPlugin signatures appropriately simplifies the plugin interface as part of the sink-to-middleware transition.
Let's verify the type parameter removal is consistent across the codebase:
Also applies to: 7-10
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any remaining TTxnParams references that might need updating
rg "TTxnParams" --type typescript
Length of output: 68
Script:
#!/bin/bash
# Search for any remaining TTxnParams references using ripgrep without file type filter
rg "TTxnParams"
# Also search for IndexerPlugin usage to verify type parameter consistency
rg "IndexerPlugin"
Length of output: 2324
```typescript
  cursor: Cursor,
  collections: string[],
) {
  const orderKeyValue = Number(cursor.orderKey);
```
Add error handling for cursor.orderKey conversion
The Number() conversion could produce NaN for invalid orderKey values. Consider adding validation to handle this case.
```diff
-  const orderKeyValue = Number(cursor.orderKey);
+  const orderKeyValue = Number(cursor.orderKey);
+  if (Number.isNaN(orderKeyValue)) {
+    throw new Error('Invalid cursor orderKey: expected numeric value');
+  }
```
```typescript
export async function invalidate(
  db: Db,
  session: ClientSession,
  cursor: Cursor,
  collections: string[],
) {
  const orderKeyValue = Number(cursor.orderKey);
  for (const collection of collections) {
    // Delete documents where the lower bound of _cursor is greater than the invalidate cursor
    await db.collection(collection).deleteMany(
      {
        "_cursor.from": {
          $gt: orderKeyValue,
        },
      },
      { session },
    );

    // Update documents where the upper bound of _cursor is greater than the invalidate cursor
    await db.collection(collection).updateMany(
      { "_cursor.to": { $gt: orderKeyValue } },
      {
        $set: {
          "_cursor.to": null,
        },
      },
      { session },
    );
  }
}
```
🛠️ Refactor suggestion
Add validation for collection names
The invalidate function accepts collection names without validation. Consider adding checks for invalid characters or empty strings.
```diff
+function validateCollectionName(name: string): void {
+  if (!name || /[$.]/.test(name)) {
+    throw new Error(`Invalid collection name: ${name}`);
+  }
+}
 export async function invalidate(
   db: Db,
   session: ClientSession,
   cursor: Cursor,
   collections: string[],
 ) {
+  collections.forEach(validateCollectionName);
   const orderKeyValue = Number(cursor.orderKey);
```
```typescript
export function persistState<TFilter>(
  db: Database,
  endCursor: Cursor,
  filter?: TFilter,
) {
  assertInTransaction(db);

  db.prepare(statements.putCheckpoint).run(
    DEFAULT_INDEXER_ID,
    Number(endCursor.orderKey),
    endCursor.uniqueKey,
  );

  if (filter) {
    db.prepare(statements.updateFilterToBlock).run(
      Number(endCursor.orderKey),
      DEFAULT_INDEXER_ID,
    );
    db.prepare(statements.insertFilter).run(
      DEFAULT_INDEXER_ID,
      serialize(filter as Record<string, unknown>),
      Number(endCursor.orderKey),
    );
  }
}
```
🛠️ Refactor suggestion
Add error handling for serialization failures
The persistState function should handle potential serialization errors when dealing with filters.
```diff
 export function persistState<TFilter>(
   db: Database,
   endCursor: Cursor,
   filter?: TFilter,
 ) {
   assertInTransaction(db);
+  try {
     db.prepare(statements.putCheckpoint).run(
       DEFAULT_INDEXER_ID,
       Number(endCursor.orderKey),
       endCursor.uniqueKey,
     );
     if (filter) {
       db.prepare(statements.updateFilterToBlock).run(
         Number(endCursor.orderKey),
         DEFAULT_INDEXER_ID,
       );
       db.prepare(statements.insertFilter).run(
         DEFAULT_INDEXER_ID,
         serialize(filter as Record<string, unknown>),
         Number(endCursor.orderKey),
       );
     }
+  } catch (error) {
+    throw new Error(`Failed to persist state: ${error.message}`);
+  }
 }
```
```typescript
it("should store data with a cursor", async () => {
  const { db, client, dbName } = getRandomDatabase();

  const indexer = getMockIndexer({
    override: {
      plugins: [mongoStorage({ client, dbName, collections: ["test"] })],
      async transform({ endCursor, block: { data } }) {
        const db = useMongoStorage();
        const rows = await db.collection("test").find({}).toArray();

        await db.collection("test").insertOne({
          blockNumber: Number(endCursor?.orderKey),
          count: rows.length,
          data,
        });
      },
    },
  });

  const mockClient = new MockClient<MockFilter, MockBlock>(
    (request, options) => {
      return generateMockMessages(3);
    },
  );

  await run(mockClient, indexer);

  const result = await db.collection("test").find().toArray();
  expect(result.map(({ _id, ...rest }) => rest)).toMatchInlineSnapshot(`
    [
      {
        "_cursor": {
          "from": 5000000,
          "to": null,
        },
        "blockNumber": 5000000,
        "count": 0,
        "data": "5000000",
      },
      {
        "_cursor": {
          "from": 5000001,
          "to": null,
        },
        "blockNumber": 5000001,
        "count": 1,
        "data": "5000001",
      },
      {
        "_cursor": {
          "from": 5000002,
          "to": null,
        },
        "blockNumber": 5000002,
        "count": 2,
        "data": "5000002",
      },
    ]
  `);
});
```
🛠️ Refactor suggestion
Add database cleanup and error handling tests
- The tests should clean up the test database after execution to prevent resource leaks.
- Missing error handling test cases for database operations (e.g., connection failures, write errors).
Consider adding:
- Cleanup in an `afterEach` hook:

```typescript
afterEach(async () => {
  const { client, db } = getRandomDatabase();
  await db.dropDatabase();
  await client.close();
});
```

- Error handling test cases:

```typescript
it("should handle database connection errors", async () => {
  const { client, dbName } = getRandomDatabase();
  await client.close(); // Force connection error

  const indexer = getMockIndexer({
    override: {
      plugins: [mongoStorage({ client, dbName, collections: ["test"] })],
      transform: async () => {
        // Test should fail when trying to use closed connection
      },
    },
  });

  await expect(run(mockClient, indexer)).rejects.toThrow();
});
```

Also applies to: 78-150, 152-208
```typescript
import { MongoClient } from "mongodb";
import { mongoStorage, useMongoStorage } from "../src";

describe("SQLite persistence", () => {
```
Fix incorrect test suite name
The test suite is named "SQLite persistence" but it's actually testing MongoDB persistence.
```diff
-describe("SQLite persistence", () => {
+describe("MongoDB persistence", () => {
```
jaipaljadeja
left a comment
LGTM! Thanks
This PR removes the "Sink" abstraction in favour of a more general purpose middleware pattern.
It's now technically possible to have multiple sinks in one indexer!