Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "indexer: reconnect indexer on internal stream errors",
"packageName": "@apibara/indexer",
"email": "francesco@ceccon.me",
"dependentChangeType": "patch"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "indexer: reconnect indexer on internal stream errors",
"packageName": "@apibara/protocol",
"email": "francesco@ceccon.me",
"dependentChangeType": "patch"
}
7 changes: 7 additions & 0 deletions change/apibara-110721a2-c849-415f-8c45-de26c4daa99c.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "indexer: reconnect indexer on internal stream errors",
"packageName": "apibara",
"email": "francesco@ceccon.me",
"dependentChangeType": "patch"
}
4 changes: 2 additions & 2 deletions packages/cli/src/runtime/dev.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { run } from "@apibara/indexer";
import { runWithReconnect } from "@apibara/indexer";
import { createClient } from "@apibara/protocol";
import { defineCommand, runMain } from "citty";
import { availableIndexers, createIndexer } from "./internal/app";
Expand Down Expand Up @@ -43,7 +43,7 @@ const startCommand = defineCommand({
indexerInstance.options.streamUrl,
);

await run(client, indexerInstance);
await runWithReconnect(client, indexerInstance);
}),
);
},
Expand Down
5 changes: 5 additions & 0 deletions packages/cli/src/runtime/internal/app.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createIndexer as _createIndexer } from "@apibara/indexer";
import { type ConsolaReporter, logger } from "@apibara/indexer/plugins/logger";
import { inMemoryPersistence } from "@apibara/indexer/plugins/persistence";

import { config } from "#apibara-internal-virtual/config";
import { indexers } from "#apibara-internal-virtual/indexers";
Expand Down Expand Up @@ -55,7 +56,11 @@ export function createIndexer(indexerName: string, preset?: string) {
});
}

// Put the in-memory persistence plugin first so that it can be overridden by any user-defined
// persistence plugin.
// Put the logger last since we want to override any user-defined logger.
definition.plugins = [
inMemoryPersistence(),
...(definition.plugins ?? []),
logger({ logger: reporter }),
];
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/runtime/start.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { run } from "@apibara/indexer";
import { runWithReconnect } from "@apibara/indexer";
import { createClient } from "@apibara/protocol";
import { defineCommand, runMain } from "citty";
import { createIndexer } from "./internal/app";
Expand Down Expand Up @@ -29,7 +29,7 @@ const startCommand = defineCommand({
indexerInstance.options.streamUrl,
);

await run(client, indexerInstance);
await runWithReconnect(client, indexerInstance);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the findings, I can now generate the final response:

run Function Still in Active Use - Migration Required

The codebase shows multiple instances where the run function is still actively used, particularly in core functionality:

  • packages/indexer/src/indexer.ts: Both functions coexist, with run being the base implementation
  • Multiple examples and test files still rely on the original run function
  • Core indexer components in packages/indexer/src/vcr/ use run

The migration to runWithReconnect is incomplete and requires systematic updates across the codebase to maintain consistent error handling behavior.

🔗 Analysis chain

Enhancing Indexer Resilience with runWithReconnect

Replacing run with runWithReconnect improves the indexer's ability to handle internal stream errors through automatic reconnection. Ensure that this change aligns with the intended behavior and that all required error handling is appropriately managed.

To confirm that runWithReconnect is used consistently and that there are no lingering instances of the old run function, you can run the following script:

This will help identify any occurrences of run( that might need to be updated.


Let me gather more context about the runWithReconnect implementation and its relationship with run.


Let me try a different pattern to locate these functions.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find any usages of `run(` not replaced by `runWithReconnect(`

rg 'run\(' -g '!**/node_modules/**' -t js -t ts | grep -v 'runWithReconnect'

Length of output: 3442


Script:

#!/bin/bash
# Find the implementation of runWithReconnect and run
ast-grep --pattern 'async function runWithReconnect($_) { $$$ }'
ast-grep --pattern 'async function run($_) { $$$ }'

Length of output: 120


Script:

#!/bin/bash
# Search for runWithReconnect and run function definitions
rg -A 10 "export (async )?function (run|runWithReconnect)" --type ts

Length of output: 1521

},
});

Expand Down
97 changes: 85 additions & 12 deletions packages/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import type {
Client,
Cursor,
DataFinality,
Finalize,
Heartbeat,
Invalidate,
StreamConfig,
StreamDataOptions,
StreamDataRequest,
StreamDataResponse,
SystemMessage,
import {
type Client,
ClientError,
type Cursor,
type DataFinality,
type Finalize,
type Heartbeat,
type Invalidate,
Status,
type StreamConfig,
type StreamDataOptions,
type StreamDataRequest,
type StreamDataResponse,
type SystemMessage,
} from "@apibara/protocol";
import consola from "consola";
import {
Expand Down Expand Up @@ -149,9 +151,71 @@ export function createIndexer<TFilter, TBlock, TTxnParams>({
return indexer;
}

export interface ReconnectOptions {
maxRetries?: number;
retryDelay?: number;
maxWait?: number;
}

export async function runWithReconnect<TFilter, TBlock, TTxnParams>(
client: Client<TFilter, TBlock>,
indexer: Indexer<TFilter, TBlock, TTxnParams>,
options: ReconnectOptions = {},
) {
let retryCount = 0;

const maxRetries = options.maxRetries ?? 10;
const retryDelay = options.retryDelay ?? 1_000;
const maxWait = options.maxWait ?? 30_000;

const runOptions: RunOptions = {
onConnect() {
retryCount = 0;
},
};

while (true) {
try {
await run(client, indexer, runOptions);
return;
} catch (error) {
// Only reconnect on internal/server errors.
// All other errors should be rethrown.

retryCount++;

if (error instanceof ClientError) {
if (error.code === Status.INTERNAL) {
if (retryCount < maxRetries) {
consola.error(
"Internal server error, reconnecting...",
error.message,
);

// Add jitter to the retry delay to avoid all clients retrying at the same time.
const delay = Math.random() * (retryDelay * 0.2) + retryDelay;
await new Promise((resolve) =>
setTimeout(resolve, Math.min(retryCount * delay, maxWait)),
);

continue;
}
}
}

throw error;
}
}
}

export interface RunOptions {
onConnect?: () => void | Promise<void>;
}

export async function run<TFilter, TBlock, TTxnParams>(
client: Client<TFilter, TBlock>,
indexer: Indexer<TFilter, TBlock, TTxnParams>,
runOptions: RunOptions = {},
) {
await indexerAsyncContext.callAsync({}, async () => {
const context = useIndexerContext<TTxnParams>();
Expand Down Expand Up @@ -195,13 +259,22 @@ export async function run<TFilter, TBlock, TTxnParams>(

await indexer.hooks.callHook("connect:after");

let onConnectCalled = false;

while (true) {
const { value: message, done } = await stream.next();

if (done) {
break;
}

if (!onConnectCalled) {
onConnectCalled = true;
if (runOptions.onConnect) {
await runOptions.onConnect();
}
}

await indexer.hooks.callHook("message", { message });

switch (message._tag) {
Expand Down
30 changes: 30 additions & 0 deletions packages/indexer/src/plugins/persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,36 @@ import type { Database as SqliteDatabase, Statement } from "better-sqlite3";
import { deserialize, serialize } from "../vcr";
import { defineIndexerPlugin } from "./config";

export function inMemoryPersistence<TFilter, TBlock, TTxnParams>() {
return defineIndexerPlugin<TFilter, TBlock, TTxnParams>((indexer) => {
let lastCursor: Cursor | undefined;
let lastFilter: TFilter | undefined;

indexer.hooks.hook("connect:before", ({ request }) => {
if (lastCursor) {
request.startingCursor = lastCursor;
}

if (lastFilter) {
request.filter[1] = lastFilter;
}
});

indexer.hooks.hook("transaction:commit", ({ endCursor }) => {
if (endCursor) {
lastCursor = endCursor;
}
});

indexer.hooks.hook("connect:factory", ({ request, endCursor }) => {
if (request.filter[1]) {
lastCursor = endCursor;
lastFilter = request.filter[1];
}
});
});
}

export function sqlitePersistence<TFilter, TBlock, TTxnParams>({
database,
}: { database: SqliteDatabase }) {
Expand Down
2 changes: 1 addition & 1 deletion packages/protocol/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
} from "./status";
import { type StreamDataRequest, StreamDataResponse } from "./stream";

export type { ClientError, Status } from "nice-grpc";
export { ClientError, Status } from "nice-grpc";

/** Client call options. */
export interface ClientCallOptions {
Expand Down