From 9ceaf78dec2cfd5cb2227f12ad3546cd60ba546f Mon Sep 17 00:00:00 2001 From: Francesco Ceccon Date: Sat, 30 Nov 2024 16:55:06 +0100 Subject: [PATCH] indexer: reconnect indexer on internal stream errors --- ...-875c9447-9ce4-466c-8149-26432d4c5d47.json | 7 ++ ...-94de228f-0ad4-4db9-bf0b-ceb0d5939b04.json | 7 ++ ...-110721a2-c849-415f-8c45-de26c4daa99c.json | 7 ++ packages/cli/src/runtime/dev.ts | 4 +- packages/cli/src/runtime/internal/app.ts | 5 + packages/cli/src/runtime/start.ts | 4 +- packages/indexer/src/indexer.ts | 97 ++++++++++++++++--- packages/indexer/src/plugins/persistence.ts | 30 ++++++ packages/protocol/src/client.ts | 2 +- 9 files changed, 146 insertions(+), 17 deletions(-) create mode 100644 change/@apibara-indexer-875c9447-9ce4-466c-8149-26432d4c5d47.json create mode 100644 change/@apibara-protocol-94de228f-0ad4-4db9-bf0b-ceb0d5939b04.json create mode 100644 change/apibara-110721a2-c849-415f-8c45-de26c4daa99c.json diff --git a/change/@apibara-indexer-875c9447-9ce4-466c-8149-26432d4c5d47.json b/change/@apibara-indexer-875c9447-9ce4-466c-8149-26432d4c5d47.json new file mode 100644 index 00000000..44040444 --- /dev/null +++ b/change/@apibara-indexer-875c9447-9ce4-466c-8149-26432d4c5d47.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "indexer: reconnect indexer on internal stream errors", + "packageName": "@apibara/indexer", + "email": "francesco@ceccon.me", + "dependentChangeType": "patch" +} diff --git a/change/@apibara-protocol-94de228f-0ad4-4db9-bf0b-ceb0d5939b04.json b/change/@apibara-protocol-94de228f-0ad4-4db9-bf0b-ceb0d5939b04.json new file mode 100644 index 00000000..b4211e36 --- /dev/null +++ b/change/@apibara-protocol-94de228f-0ad4-4db9-bf0b-ceb0d5939b04.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "indexer: reconnect indexer on internal stream errors", + "packageName": "@apibara/protocol", + "email": "francesco@ceccon.me", + "dependentChangeType": "patch" +} diff --git a/change/apibara-110721a2-c849-415f-8c45-de26c4daa99c.json b/change/apibara-110721a2-c849-415f-8c45-de26c4daa99c.json new file mode 100644 index 00000000..aa731b09 --- /dev/null +++ b/change/apibara-110721a2-c849-415f-8c45-de26c4daa99c.json @@ -0,0 +1,7 @@ +{ + "type": "prerelease", + "comment": "indexer: reconnect indexer on internal stream errors", + "packageName": "apibara", + "email": "francesco@ceccon.me", + "dependentChangeType": "patch" +} diff --git a/packages/cli/src/runtime/dev.ts b/packages/cli/src/runtime/dev.ts index 01a937dd..3abc740f 100644 --- a/packages/cli/src/runtime/dev.ts +++ b/packages/cli/src/runtime/dev.ts @@ -1,4 +1,4 @@ -import { run } from "@apibara/indexer"; +import { runWithReconnect } from "@apibara/indexer"; import { createClient } from "@apibara/protocol"; import { defineCommand, runMain } from "citty"; import { availableIndexers, createIndexer } from "./internal/app"; @@ -43,7 +43,7 @@ const startCommand = defineCommand({ indexerInstance.options.streamUrl, ); - await run(client, indexerInstance); + await runWithReconnect(client, indexerInstance); }), ); }, diff --git a/packages/cli/src/runtime/internal/app.ts b/packages/cli/src/runtime/internal/app.ts index 64894804..d2f57599 100644 --- a/packages/cli/src/runtime/internal/app.ts +++ b/packages/cli/src/runtime/internal/app.ts @@ -1,5 +1,6 @@ import { createIndexer as _createIndexer } from "@apibara/indexer"; import { type ConsolaReporter, logger } from "@apibara/indexer/plugins/logger"; +import { inMemoryPersistence } from "@apibara/indexer/plugins/persistence"; import { config } from "#apibara-internal-virtual/config"; import { indexers } from "#apibara-internal-virtual/indexers"; @@ -55,7 +56,11 @@ export function createIndexer(indexerName: string, preset?: string) { }); } + // Put the in-memory persistence plugin first so that it can be overridden by any user-defined + // persistence plugin. + // Put the logger last since we want to override any user-defined logger. definition.plugins = [ + inMemoryPersistence(), ...(definition.plugins ?? []), logger({ logger: reporter }), ]; diff --git a/packages/cli/src/runtime/start.ts b/packages/cli/src/runtime/start.ts index c446fed3..b5b9b8c6 100644 --- a/packages/cli/src/runtime/start.ts +++ b/packages/cli/src/runtime/start.ts @@ -1,4 +1,4 @@ -import { run } from "@apibara/indexer"; +import { runWithReconnect } from "@apibara/indexer"; import { createClient } from "@apibara/protocol"; import { defineCommand, runMain } from "citty"; import { createIndexer } from "./internal/app"; @@ -29,7 +29,7 @@ const startCommand = defineCommand({ indexerInstance.options.streamUrl, ); - await run(client, indexerInstance); + await runWithReconnect(client, indexerInstance); }, }); diff --git a/packages/indexer/src/indexer.ts b/packages/indexer/src/indexer.ts index 3026a6be..9c3efba0 100644 --- a/packages/indexer/src/indexer.ts +++ b/packages/indexer/src/indexer.ts @@ -1,15 +1,17 @@ -import type { - Client, - Cursor, - DataFinality, - Finalize, - Heartbeat, - Invalidate, - StreamConfig, - StreamDataOptions, - StreamDataRequest, - StreamDataResponse, - SystemMessage, +import { + type Client, + ClientError, + type Cursor, + type DataFinality, + type Finalize, + type Heartbeat, + type Invalidate, + Status, + type StreamConfig, + type StreamDataOptions, + type StreamDataRequest, + type StreamDataResponse, + type SystemMessage, } from "@apibara/protocol"; import consola from "consola"; import { @@ -149,9 +151,71 @@ export function createIndexer({ return indexer; } +export interface ReconnectOptions { + maxRetries?: number; + retryDelay?: number; + maxWait?: number; +} + +export async function runWithReconnect( + client: Client, + indexer: Indexer, + options: ReconnectOptions = {}, +) { + let retryCount = 0; + + const maxRetries = options.maxRetries ?? 10; + const retryDelay = options.retryDelay ?? 1_000; + const maxWait = options.maxWait ?? 30_000; + + const runOptions: RunOptions = { + onConnect() { + retryCount = 0; + }, + }; + + while (true) { + try { + await run(client, indexer, runOptions); + return; + } catch (error) { + // Only reconnect on internal/server errors. + // All other errors should be rethrown. + + retryCount++; + + if (error instanceof ClientError) { + if (error.code === Status.INTERNAL) { + if (retryCount < maxRetries) { + consola.error( + "Internal server error, reconnecting...", + error.message, + ); + + // Add jitter to the retry delay to avoid all clients retrying at the same time. + const delay = Math.random() * (retryDelay * 0.2) + retryDelay; + await new Promise((resolve) => + setTimeout(resolve, Math.min(retryCount * delay, maxWait)), + ); + + continue; + } + } + } + + throw error; + } + } +} + +export interface RunOptions { + onConnect?: () => void | Promise; +} + export async function run( client: Client, indexer: Indexer, + runOptions: RunOptions = {}, ) { await indexerAsyncContext.callAsync({}, async () => { const context = useIndexerContext(); @@ -195,6 +259,8 @@ export async function run( await indexer.hooks.callHook("connect:after"); + let onConnectCalled = false; + while (true) { const { value: message, done } = await stream.next(); @@ -202,6 +268,13 @@ export async function run( break; } + if (!onConnectCalled) { + onConnectCalled = true; + if (runOptions.onConnect) { + await runOptions.onConnect(); + } + } + await indexer.hooks.callHook("message", { message }); switch (message._tag) { diff --git a/packages/indexer/src/plugins/persistence.ts b/packages/indexer/src/plugins/persistence.ts index 99d912a2..72068d12 100644 --- a/packages/indexer/src/plugins/persistence.ts +++ b/packages/indexer/src/plugins/persistence.ts @@ -3,6 +3,36 @@ import type { Database as SqliteDatabase, Statement } from "better-sqlite3"; import { deserialize, serialize } from "../vcr"; import { defineIndexerPlugin } from "./config"; +export function inMemoryPersistence() { + return defineIndexerPlugin((indexer) => { + let lastCursor: Cursor | undefined; + let lastFilter: TFilter | undefined; + + indexer.hooks.hook("connect:before", ({ request }) => { + if (lastCursor) { + request.startingCursor = lastCursor; + } + + if (lastFilter) { + request.filter[1] = lastFilter; + } + }); + + indexer.hooks.hook("transaction:commit", ({ endCursor }) => { + if (endCursor) { + lastCursor = endCursor; + } + }); + + indexer.hooks.hook("connect:factory", ({ request, endCursor }) => { + if (request.filter[1]) { + lastCursor = endCursor; + lastFilter = request.filter[1]; + } + }); + }); +} + export function sqlitePersistence({ database, }: { database: SqliteDatabase }) { diff --git a/packages/protocol/src/client.ts b/packages/protocol/src/client.ts index 877019bb..0947ed24 100644 --- a/packages/protocol/src/client.ts +++ b/packages/protocol/src/client.ts @@ -21,7 +21,7 @@ import { } from "./status"; import { type StreamDataRequest, StreamDataResponse } from "./stream"; -export type { ClientError, Status } from "nice-grpc"; +export { ClientError, Status } from "nice-grpc"; /** Client call options. */ export interface ClientCallOptions {