-
Notifications
You must be signed in to change notification settings - Fork 11
indexer: reconnect indexer on internal stream errors #121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
📝 Walkthrough📝 WalkthroughWalkthroughThis pull request introduces several JSON configuration files for the Changes
Possibly related PRs
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (5)
packages/indexer/src/indexer.ts (5)
154-158: Add documentation comments forReconnectOptionsinterfaceConsider adding JSDoc comments to the
ReconnectOptionsinterface properties (maxRetries,retryDelay,maxWait) to improve code readability and provide clarity on their usage and default values.
160-209: Enhance error handling by retrying on additional statusesCurrently,
runWithReconnectretries only onClientErrorwithStatus.INTERNAL. To improve resilience against transient network issues, consider adding retries for other error statuses likeUNAVAILABLEorUNKNOWN.Apply this diff to include additional retryable statuses:
if (error instanceof ClientError) { - if (error.code === Status.INTERNAL) { + if ( + error.code === Status.INTERNAL || + error.code === Status.UNAVAILABLE || + error.code === Status.UNKNOWN + ) { if (retryCount < maxRetries) {
196-199: Review retry delay calculation for exponential backoffThe current retry delay calculation uses a linear backoff strategy with jitter:
const delay = Math.random() * (retryDelay * 0.2) + retryDelay; await new Promise((resolve) => setTimeout(resolve, Math.min(retryCount * delay, maxWait)), );Consider implementing exponential backoff to manage retries more effectively and avoid overwhelming the server during high-load situations.
Here’s how you might adjust the calculation:
const delay = Math.random() * (retryDelay * 0.2) + retryDelay; -await new Promise((resolve) => - setTimeout(resolve, Math.min(retryCount * delay, maxWait)), +const exponentialDelay = Math.min( + Math.pow(2, retryCount) * delay, + maxWait, +); +await new Promise((resolve) => setTimeout(resolve, exponentialDelay)); );
262-276: ConfirmonConnectcallback is invoked at the correct timeThe
onConnectcallback is called after receiving the first message from the stream. If the intended behavior is to triggeronConnectimmediately after establishing the connection (before any messages are received), consider moving the callback invocation to just after the stream is created.Apply this diff to adjust the
onConnectinvocation:let onConnectCalled = false; while (true) { const { value: message, done } = await stream.next(); if (done) { break; } + if (!onConnectCalled) { + onConnectCalled = true; + if (runOptions.onConnect) { + await runOptions.onConnect(); + } + } await indexer.hooks.callHook("message", { message }); - if (!onConnectCalled) { - onConnectCalled = true; - if (runOptions.onConnect) { - await runOptions.onConnect(); - } - }
185-207: Enhance error logging before rethrowing exceptionsIn the catch block of
runWithReconnect, consider logging unexpected errors to aid in debugging. This ensures that unhandled exceptions are not silently ignored.Apply this diff to log unexpected errors:
// Only reconnect on internal/server errors. // All other errors should be rethrown. retryCount++; if (error instanceof ClientError) { // Existing error handling... } +consola.error("Unhandled error in runWithReconnect:", error); throw error;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (9)
change/@apibara-indexer-875c9447-9ce4-466c-8149-26432d4c5d47.json(1 hunks)change/@apibara-protocol-94de228f-0ad4-4db9-bf0b-ceb0d5939b04.json(1 hunks)change/apibara-110721a2-c849-415f-8c45-de26c4daa99c.json(1 hunks)packages/cli/src/runtime/dev.ts(2 hunks)packages/cli/src/runtime/internal/app.ts(2 hunks)packages/cli/src/runtime/start.ts(2 hunks)packages/indexer/src/indexer.ts(3 hunks)packages/indexer/src/plugins/persistence.ts(1 hunks)packages/protocol/src/client.ts(1 hunks)
✅ Files skipped from review due to trivial changes (2)
- change/apibara-110721a2-c849-415f-8c45-de26c4daa99c.json
- change/@apibara-indexer-875c9447-9ce4-466c-8149-26432d4c5d47.json
🔇 Additional comments (8)
change/@apibara-protocol-94de228f-0ad4-4db9-bf0b-ceb0d5939b04.json (1)
1-7: JSON Configuration is Correctly Formatted
The JSON file is well-structured and includes all necessary fields for the prerelease update.
packages/cli/src/runtime/start.ts (1)
1-1: Correctly Importing runWithReconnect
The import statement properly includes runWithReconnect from @apibara/indexer.
packages/cli/src/runtime/dev.ts (2)
1-1: Proper Import of runWithReconnect
The runWithReconnect function is correctly imported from @apibara/indexer.
46-46: Implementing runWithReconnect for Robust Indexer Execution
Using runWithReconnect enhances the robustness of the indexer by adding reconnection logic. Verify that this change integrates seamlessly with the existing code and that any potential side effects are handled.
To ensure that all indexer executions utilize the updated runWithReconnect function, you may execute the following script:
This script will help identify any instances of the old run( function that may still exist in the codebase.
✅ Verification successful
Let me gather more information about the runWithReconnect function to understand its implementation and usage.
Let me check the implementation details of the regular run function to compare it with runWithReconnect.
Let me try a different approach to find the run function implementation.
Let me try one final search to find the run function with a more lenient pattern.
Based on the gathered information, I can now provide the final response:
runWithReconnect Implementation is Appropriate and Well-integrated
The change to use runWithReconnect is correct and beneficial. The function adds automatic reconnection logic with configurable retry parameters (maxRetries, retryDelay, maxWait) while maintaining the core functionality of the original run function. The implementation is properly integrated in both dev and start runtime modes.
- The original
runfunction is still used in test files and examples, which is appropriate as they don't require reconnection handling - The CLI commands (dev.ts and start.ts) consistently use
runWithReconnectfor production scenarios where connection resilience is important
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Ensure all indexer instances use `runWithReconnect`
rg 'run\(' -g '!**/node_modules/**' -t js -t ts | grep -v 'runWithReconnect'
Length of output: 3442
Script:
#!/bin/bash
# Find the implementation and usage of runWithReconnect
rg 'runWithReconnect' -A 5 -B 5 -g '!**/node_modules/**' -t ts
Length of output: 2771
Script:
#!/bin/bash
# Find the implementation of the run function
ast-grep --pattern 'export async function run($$$) {
$$$
}'
Length of output: 63
Script:
#!/bin/bash
# Find the implementation of the run function using ripgrep
rg 'export.*function run\(' -A 10 -g '!**/node_modules/**' -t ts
Length of output: 66
Script:
#!/bin/bash
# Find the implementation of the run function in indexer.ts
rg 'function run.*\(' -B 2 -A 15 packages/indexer/src/indexer.ts
Length of output: 1066
packages/cli/src/runtime/internal/app.ts (2)
3-3: Import statement added correctly
The inMemoryPersistence import is properly added from @apibara/indexer/plugins/persistence.
59-63: Plugins ordering ensures proper overrides
By adding inMemoryPersistence() at the beginning of the definition.plugins array and logger({ logger: reporter }) at the end, the code allows user-defined persistence plugins to override the in-memory persistence while ensuring the built-in logger overrides any user-defined loggers.
packages/protocol/src/client.ts (1)
24-24: Export ClientError and Status as values
Changing from export type to export for ClientError and Status correctly exports them as values. This is necessary if they are used at runtime, such as for error handling or checking status codes.
packages/indexer/src/indexer.ts (1)
271-278:
Move run:after hook outside the message loop
The run:after hook is currently called inside the message processing loop, which means it's invoked after each message. If it's intended to signal the end of the entire indexing run, it should be moved outside the loop to execute only once after the stream ends.
Apply this diff to move the hook call:
await indexer.hooks.callHook("run:after");
}
}
+await indexer.hooks.callHook("run:after");
Likely invalid or redundant comment.
| ); | ||
|
|
||
| await run(client, indexerInstance); | ||
| await runWithReconnect(client, indexerInstance); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Based on the findings, I can now generate the final response:
run Function Still in Active Use - Migration Required
The codebase shows multiple instances where the run function is still actively used, particularly in core functionality:
packages/indexer/src/indexer.ts: Both functions coexist, withrunbeing the base implementation- Multiple examples and test files still rely on the original
runfunction - Core indexer components in
packages/indexer/src/vcr/userun
The migration to runWithReconnect is incomplete and requires systematic updates across the codebase to maintain consistent error handling behavior.
🔗 Analysis chain
Enhancing Indexer Resilience with runWithReconnect
Replacing run with runWithReconnect improves the indexer's ability to handle internal stream errors through automatic reconnection. Ensure that this change aligns with the intended behavior and that all required error handling is appropriately managed.
To confirm that runWithReconnect is used consistently and that there are no lingering instances of the old run function, you can run the following script:
This will help identify any occurrences of run( that might need to be updated.
Let me gather more context about the runWithReconnect implementation and its relationship with run.
Let me try a different pattern to locate these functions.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find any usages of `run(` not replaced by `runWithReconnect(`
rg 'run\(' -g '!**/node_modules/**' -t js -t ts | grep -v 'runWithReconnect'
Length of output: 3442
Script:
#!/bin/bash
# Find the implementation of runWithReconnect and run
ast-grep --pattern 'async function runWithReconnect($_) { $$$ }'
ast-grep --pattern 'async function run($_) { $$$ }'
Length of output: 120
Script:
#!/bin/bash
# Search for runWithReconnect and run function definitions
rg -A 10 "export (async )?function (run|runWithReconnect)" --type ts
Length of output: 1521
| if (lastCursor) { | ||
| // request.startingCursor = lastCursor; | ||
| // TODO: IF YOU SEE THIS DONT MERGE THE PRRRRRRRR | ||
| request.startingCursor = { | ||
| orderKey: lastCursor.orderKey - 1_000n, | ||
| uniqueKey: lastCursor.uniqueKey, | ||
| }; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Remove debugging code and TODO comment before merging
The code contains a TODO comment indicating the PR should not be merged:
// TODO: IF YOU SEE THIS DONT MERGE THE PRRRRRRRRAdditionally, the request.startingCursor is being set to a modified value for testing purposes:
request.startingCursor = {
orderKey: lastCursor.orderKey - 1_000n,
uniqueKey: lastCursor.uniqueKey,
};Please remove the temporary debugging code and restore the correct assignment:
- // request.startingCursor = lastCursor;
- // TODO: IF YOU SEE THIS DONT MERGE THE PRRRRRRRR
- request.startingCursor = {
- orderKey: lastCursor.orderKey - 1_000n,
- uniqueKey: lastCursor.uniqueKey,
- };
+ request.startingCursor = lastCursor;991c30e to
9ceaf78
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (5)
packages/indexer/src/plugins/persistence.ts (2)
6-34: Consider adding memory management and error handling.The in-memory persistence implementation could benefit from some improvements:
Memory Management:
- Add a cleanup mechanism to prevent memory leaks in long-running processes
- Consider adding size limits for the stored cursors/filters
Error Handling:
- Add error handling for invalid cursor/filter states
- Consider adding validation for the filter structure
Here's a suggested implementation with these improvements:
export function inMemoryPersistence<TFilter, TBlock, TTxnParams>() { return defineIndexerPlugin<TFilter, TBlock, TTxnParams>((indexer) => { let lastCursor: Cursor | undefined; let lastFilter: TFilter | undefined; + let isValid = true; + + // Add cleanup method + const cleanup = () => { + lastCursor = undefined; + lastFilter = undefined; + isValid = false; + }; indexer.hooks.hook("connect:before", ({ request }) => { + if (!isValid) { + throw new Error("Persistence plugin is no longer valid"); + } + if (lastCursor) { + // Validate cursor structure + if (!('orderKey' in lastCursor && 'uniqueKey' in lastCursor)) { + cleanup(); + throw new Error("Invalid cursor structure"); + } request.startingCursor = lastCursor; } if (lastFilter) { + // Basic filter validation (adjust based on your needs) + if (typeof lastFilter !== 'object' || lastFilter === null) { + cleanup(); + throw new Error("Invalid filter structure"); + } request.filter[1] = lastFilter; } });
21-25: Consider adding reconnection-specific handling.While the implementation supports reconnection by persisting state, consider adding specific handling for reconnection scenarios:
- Add reconnection attempt tracking
- Implement backoff strategy for repeated failures
- Add logging for reconnection events
Here's a suggested implementation:
+ let reconnectAttempts = 0; + const MAX_RECONNECT_ATTEMPTS = 3; + indexer.hooks.hook("transaction:commit", ({ endCursor }) => { + // Reset reconnection counter on successful commit + reconnectAttempts = 0; if (endCursor) { lastCursor = endCursor; } }); + + indexer.hooks.hook("connect:error", () => { + reconnectAttempts++; + if (reconnectAttempts > MAX_RECONNECT_ATTEMPTS) { + cleanup(); + throw new Error("Max reconnection attempts reached"); + } + });packages/indexer/src/indexer.ts (3)
160-209: LGTM: Robust reconnection implementation with exponential backoffThe implementation correctly handles internal server errors with proper retry logic, including:
- Exponential backoff with jitter to prevent thundering herd
- Configurable retry limits
- Proper error filtering
Consider adding logging for the current retry attempt number to aid in debugging.
if (retryCount < maxRetries) { consola.error( - "Internal server error, reconnecting...", + `Internal server error, reconnecting (attempt ${retryCount}/${maxRetries})...`, error.message, );
160-209: Consider exposing retry events through hooksThe reconnection logic could benefit from exposing retry events through hooks to allow users to implement custom monitoring or handling of reconnection attempts.
export interface IndexerHooks<TFilter, TBlock> { + "reconnect:before": ({ attempt, error }: { attempt: number, error: Error }) => void; + "reconnect:after": ({ attempt }: { attempt: number }) => void; // ... existing hooks } // In runWithReconnect: if (error instanceof ClientError) { if (error.code === Status.INTERNAL) { if (retryCount < maxRetries) { + await indexer.hooks.callHook("reconnect:before", { + attempt: retryCount, + error + }); consola.error( "Internal server error, reconnecting...", error.message, ); // ... retry logic ... + await indexer.hooks.callHook("reconnect:after", { + attempt: retryCount + }); continue; } } }
262-277: LGTM: Connection callback implementation with proper execution guaranteesThe implementation ensures the connection callback is called exactly once after the first successful message. Consider adding error handling for the callback execution.
if (!onConnectCalled) { onConnectCalled = true; if (runOptions.onConnect) { - await runOptions.onConnect(); + try { + await runOptions.onConnect(); + } catch (error) { + consola.error('Error in onConnect callback:', error); + throw error; + } } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (9)
change/@apibara-indexer-875c9447-9ce4-466c-8149-26432d4c5d47.json(1 hunks)change/@apibara-protocol-94de228f-0ad4-4db9-bf0b-ceb0d5939b04.json(1 hunks)change/apibara-110721a2-c849-415f-8c45-de26c4daa99c.json(1 hunks)packages/cli/src/runtime/dev.ts(2 hunks)packages/cli/src/runtime/internal/app.ts(2 hunks)packages/cli/src/runtime/start.ts(2 hunks)packages/indexer/src/indexer.ts(3 hunks)packages/indexer/src/plugins/persistence.ts(1 hunks)packages/protocol/src/client.ts(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
- packages/cli/src/runtime/dev.ts
- change/@apibara-protocol-94de228f-0ad4-4db9-bf0b-ceb0d5939b04.json
- change/apibara-110721a2-c849-415f-8c45-de26c4daa99c.json
- packages/cli/src/runtime/internal/app.ts
- packages/cli/src/runtime/start.ts
- packages/protocol/src/client.ts
- change/@apibara-indexer-875c9447-9ce4-466c-8149-26432d4c5d47.json
🔇 Additional comments (3)
packages/indexer/src/plugins/persistence.ts (1)
11-32: LGTM: Well-integrated with existing architecture.
The implementation follows the established plugin patterns and integrates well with the existing architecture:
- Consistent hook usage matching the SQLite implementation
- Proper state management following similar patterns
- Clean integration with the plugin system
packages/indexer/src/indexer.ts (2)
1-14: LGTM: Well-structured interfaces for reconnection handling
The new interfaces and imports provide a clean API for configuring reconnection behavior and connection callbacks.
Also applies to: 154-158, 211-213
177-208: Verify error propagation in reconnection logic
Let's verify that errors are properly propagated and that the reconnection logic doesn't mask critical errors.
✅ Verification successful
Based on the shell script results, I can now generate the final response:
Error handling implementation is correct and consistent
The error handling implementation follows best practices and maintains consistency:
- Only retries on
Status.INTERNALerrors, all other errors are properly propagated - Retry configuration (
maxRetries,retryDelay,maxWait) is well-defined and localized - No conflicting error handling patterns found in the codebase
- Clear separation between retryable and non-retryable errors
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Search for error handling patterns in the codebase to ensure consistency
# Check for other instances of ClientError handling
rg -A 5 "ClientError" --type ts
# Check for other retry implementations that might need similar logic
rg -A 5 "maxRetries|retryDelay|maxWait" --type ts
# Check for other error propagation patterns
ast-grep --pattern 'try {
$$$
} catch (error) {
throw error;
}'
Length of output: 3059
jaipaljadeja
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks
No description provided.