Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "indexer: update reloadIndexer() logic with context and update generateMockMessages()",
"packageName": "@apibara/indexer",
"email": "jadejajaipal5@gmail.com",
"dependentChangeType": "patch"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "plugin-drizzle: add record reorgs option and forward original error",
"packageName": "@apibara/plugin-drizzle",
"email": "jadejajaipal5@gmail.com",
"dependentChangeType": "patch"
}
7 changes: 7 additions & 0 deletions change/apibara-219f9106-650b-4821-9005-6046fe67c0ae.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "cli: add reloadIndexer() and async support for indexer constructors",
"packageName": "apibara",
"email": "jadejajaipal5@gmail.com",
"dependentChangeType": "patch"
}
12 changes: 9 additions & 3 deletions examples/cli-drizzle/indexers/2-starknet.indexer.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import { starknetUsdcTransfers } from "@/lib/schema";
import { drizzleStorage, useDrizzleStorage } from "@apibara/plugin-drizzle";
import { drizzle } from "@apibara/plugin-drizzle";
import { StarknetStream } from "@apibara/starknet";
import { defineIndexer } from "apibara/indexer";

import { starknetUsdcTransfers } from "@/lib/schema";
import { useLogger } from "apibara/plugins";
import type { ApibaraRuntimeConfig } from "apibara/types";
import { hash } from "starknet";

// USDC Transfers on Starknet
export default function (runtimeConfig: ApibaraRuntimeConfig) {
export default async function (runtimeConfig: ApibaraRuntimeConfig) {
const {
starknet: { startingBlock },
} = runtimeConfig;

console.log("[2-starknet] Starknet indexer waiting.....");

// Simulating a API call that takes 500 ms
await new Promise((resolve) => setTimeout(resolve, 500));

console.log("[2-starknet] Starknet indexer done waiting.....");

// connectionString defaults to process.env["POSTGRES_CONNECTION_STRING"](postgresql) ?? "memory://" (in memory pglite)
const database = drizzle({
schema: {
Expand Down
51 changes: 51 additions & 0 deletions examples/cli-drizzle/indexers/4-async.indexer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { StarknetStream } from "@apibara/starknet";
import { defineIndexer } from "apibara/indexer";
import { useLogger } from "apibara/plugins";
import type { ApibaraRuntimeConfig } from "apibara/types";
import { hash } from "starknet";

export default async function (runtimeConfig: ApibaraRuntimeConfig) {
const {
starknet: { startingBlock },
} = runtimeConfig;

// Simulating a API call that takes 500 ms
await new Promise((resolve) => setTimeout(resolve, 500));

return defineIndexer(StarknetStream)({
streamUrl: "https://mainnet.starknet.a5a.ch",
finality: "accepted",
startingBlock: BigInt(startingBlock),
filter: {
events: [
{
address:
"0x053c91253bc9682c04929ca02ed00b3e423f6710d2ee7e0d5ebb06f3ecf368a8" as `0x${string}`,
keys: [hash.getSelectorFromName("Transfer") as `0x${string}`],
},
],
},
hooks: {
"run:before": ({ abortSignal }) => {
// const logger = useLogger();
// logger.info("=== FILE WATCHER SET UP ===");
// watch("./tmp/test", { signal: abortSignal, }, (eventType, filename) => {
// logger.info("=== FILE CHANGED ===");
// reloadIndexer();
// });
},
},
async transform({ endCursor, finality }) {
const logger = useLogger();

logger.info(
"Transforming block | orderKey: ",
endCursor?.orderKey,
" | finality: ",
finality,
);

await new Promise((resolve) => setTimeout(resolve, 3000));
},
});
}
2 changes: 1 addition & 1 deletion examples/cli-drizzle/test/starknet.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const vcr = createVcr();

describe("Starknet USDC Transfers indexer", () => {
it("should work", async () => {
const indexer = createIndexer({
const indexer = await createIndexer({
evm: { startingBlock: 10_000_000 },
starknet: { startingBlock: 800_000 },
});
Expand Down
1 change: 1 addition & 0 deletions packages/cli/src/rolldown/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const runtimeDependencies = [
// https://socket.io/docs/v4/server-installation/#additional-packages
"utf-8-validate",
"bufferutil",
"fsevents",
// was giving unresolved import warnings from `node-fetch` library.
"encoding",
];
Expand Down
73 changes: 45 additions & 28 deletions packages/cli/src/runtime/dev.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,53 @@
import { runWithReconnect } from "@apibara/indexer";
import { ReloadIndexerRequest, runWithReconnect } from "@apibara/indexer";
import { createAuthenticatedClient } from "@apibara/protocol";
import { getRuntimeDataFromEnv } from "apibara/common";
import { defineCommand, runMain } from "citty";
import type { ConsolaInstance } from "consola";
import { blueBright } from "picocolors";
import { availableIndexers, createIndexer } from "./internal/app";

async function startIndexer(indexer: string) {
let _logger: ConsolaInstance | undefined;
while (true) {
try {
const { processedRuntimeConfig, preset } = getRuntimeDataFromEnv();

const { indexer: indexerInstance, logger } =
(await createIndexer({
indexerName: indexer,
processedRuntimeConfig,
preset,
})) ?? {};

_logger = logger;

if (!indexerInstance) {
return;
}

const client = createAuthenticatedClient(
indexerInstance.streamConfig,
indexerInstance.options.streamUrl,
indexerInstance.options.clientOptions,
);

if (logger) {
logger.info(`Indexer ${blueBright(indexer)} started`);
}

await runWithReconnect(client, indexerInstance);

return;
} catch (error) {
if (error instanceof ReloadIndexerRequest) {
_logger?.info(`Indexer ${blueBright(indexer)} reloaded`);
continue;
}
throw error;
}
}
}

const startCommand = defineCommand({
meta: {
name: "start",
Expand All @@ -19,8 +62,6 @@ const startCommand = defineCommand({
async run({ args }) {
const { indexers: indexersArgs } = args;

const { processedRuntimeConfig, preset } = getRuntimeDataFromEnv();

let selectedIndexers = availableIndexers;
if (indexersArgs) {
selectedIndexers = indexersArgs.split(",");
Expand All @@ -34,31 +75,7 @@ const startCommand = defineCommand({
}
}

await Promise.all(
selectedIndexers.map(async (indexer) => {
const { indexer: indexerInstance, logger } =
createIndexer({
indexerName: indexer,
processedRuntimeConfig,
preset,
}) ?? {};
if (!indexerInstance) {
return;
}

const client = createAuthenticatedClient(
indexerInstance.streamConfig,
indexerInstance.options.streamUrl,
indexerInstance.options.clientOptions,
);

if (logger) {
logger.info(`Indexer ${blueBright(indexer)} started`);
}

await runWithReconnect(client, indexerInstance);
}),
);
await Promise.all(selectedIndexers.map((indexer) => startIndexer(indexer)));
},
});

Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/runtime/internal/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { createLogger } from "./logger";

export const availableIndexers = indexers.map((i) => i.name);

export function createIndexer({
export async function createIndexer({
indexerName,
processedRuntimeConfig,
preset,
Expand Down Expand Up @@ -48,7 +48,7 @@ export function createIndexer({

const definition =
typeof indexerModule === "function"
? indexerModule(processedRuntimeConfig)
? await indexerModule(processedRuntimeConfig)
: indexerModule;

let reporter: ConsolaReporter = createLogger({
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/runtime/project-info.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ const startCommand = defineCommand({
});
for (const indexer of availableIndexers) {
const { indexer: indexerInstance } =
createIndexer({
(await createIndexer({
indexerName: indexer,
processedRuntimeConfig,
preset,
}) ?? {};
})) ?? {};
if (!indexerInstance) {
continue;
}
Expand Down
104 changes: 61 additions & 43 deletions packages/cli/src/runtime/start.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { runWithReconnect } from "@apibara/indexer";
import { ReloadIndexerRequest, runWithReconnect } from "@apibara/indexer";
import { createAuthenticatedClient } from "@apibara/protocol";
import {
checkForUnknownArgs,
getProcessedRuntimeConfig,
getRuntimeDataFromEnv,
} from "apibara/common";
import { defineCommand, runMain } from "citty";
import consola from "consola";
import consola, { type ConsolaInstance } from "consola";
import { blueBright } from "picocolors";
import { register } from "#apibara-internal-virtual/instrumentation";
// used when running with node .apibara/build/start.mjs as these values are made static during build time (except userEnvRuntimeConfig)
Expand Down Expand Up @@ -44,54 +44,72 @@ const startCommand = defineCommand({
const { indexer, preset: argPreset, standalone } = args;
await checkForUnknownArgs(args, cmd);

let preset: string | undefined;
let processedRuntimeConfig: Record<string, unknown> | undefined;
let _logger: ConsolaInstance | undefined;

if (standalone) {
// when user does node .apibara/build/start.mjs
preset = argPreset ?? originalPreset;
processedRuntimeConfig = getProcessedRuntimeConfig({
preset,
presets,
runtimeConfig,
userEnvRuntimeConfig,
});
} else {
// When user does apibara start
const envResult = getRuntimeDataFromEnv();
preset = envResult.preset;
processedRuntimeConfig = envResult.processedRuntimeConfig;
}
while (true) {
try {
let preset: string | undefined;
let processedRuntimeConfig: Record<string, unknown> | undefined;

const { indexer: indexerInstance, logger } =
createIndexer({
indexerName: indexer,
processedRuntimeConfig,
preset,
}) ?? {};
if (standalone) {
// when user does node .apibara/build/start.mjs
preset = argPreset ?? originalPreset;
processedRuntimeConfig = getProcessedRuntimeConfig({
preset,
presets,
runtimeConfig,
userEnvRuntimeConfig,
});
} else {
// When user does apibara start
const envResult = getRuntimeDataFromEnv();
preset = envResult.preset;
processedRuntimeConfig = envResult.processedRuntimeConfig;
}

if (!indexerInstance) {
consola.error(`Specified indexer "${indexer}" but it was not defined`);
process.exit(1);
}
const { indexer: indexerInstance, logger } =
(await createIndexer({
indexerName: indexer,
processedRuntimeConfig,
preset,
})) ?? {};

const client = createAuthenticatedClient(
indexerInstance.streamConfig,
indexerInstance.options.streamUrl,
indexerInstance.options.clientOptions,
);
_logger = logger;

if (register) {
consola.start("Registering from instrumentation");
await register();
consola.success("Registered from instrumentation");
}
if (!indexerInstance) {
consola.error(
`Specified indexer "${indexer}" but it was not defined`,
);
process.exit(1);
}

if (logger) {
logger.info(`Indexer ${blueBright(indexer)} started`);
}
const client = createAuthenticatedClient(
indexerInstance.streamConfig,
indexerInstance.options.streamUrl,
indexerInstance.options.clientOptions,
);

await runWithReconnect(client, indexerInstance);
if (register) {
consola.start("Registering from instrumentation");
await register();
consola.success("Registered from instrumentation");
}

if (logger) {
logger.info(`Indexer ${blueBright(indexer)} started`);
}

await runWithReconnect(client, indexerInstance);

return;
} catch (error) {
if (error instanceof ReloadIndexerRequest) {
_logger?.info(`Indexer ${blueBright(indexer)} reloaded`);
continue;
}
throw error;
}
}
},
});

Expand Down
2 changes: 1 addition & 1 deletion packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,4 @@
"peerDependencies": {
"vitest": "^1.6.0"
}
}
}
1 change: 1 addition & 0 deletions packages/indexer/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./indexer";
export { useIndexerContext } from "./context";
export { reloadIndexer, ReloadIndexerRequest } from "./utils";
Loading