Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ jobs:
uses: arduino/setup-protoc@v1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Start MongoDB
uses: supercharge/mongodb-github-action@1.11.0
with:
mongodb-version: '7.0'
mongodb-replica-set: rs0
mongodb-port: 27017
- name: Install dependencies
run: pnpm install --strict-peer-dependencies=false
- name: Run lint
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "indexer: rename drizzle sink factory and fix update method",
"packageName": "@apibara/indexer",
"email": "jadejajaipal5@gmail.com",
"dependentChangeType": "patch"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"type": "prerelease",
"comment": "sink-mongo: add mongodb sink",
"packageName": "@apibara/sink-mongo",
"email": "jadejajaipal5@gmail.com",
"dependentChangeType": "patch"
}
2 changes: 1 addition & 1 deletion examples/cli/indexers/1-evm.indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EvmStream } from "@apibara/evm";
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence";
import { useLogger } from "@apibara/indexer/plugins/logger";
import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { drizzleSink } from "@apibara/indexer/sinks/drizzle";

import type { ApibaraRuntimeConfig } from "apibara/types";
import type {
Expand Down
2 changes: 1 addition & 1 deletion examples/cli/indexers/2-starknet.indexer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzlePersistence } from "@apibara/indexer/plugins/drizzle-persistence";
import { useLogger } from "@apibara/indexer/plugins/logger";
import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { StarknetStream } from "@apibara/starknet";

import type { ApibaraRuntimeConfig } from "apibara/types";
Expand Down
2 changes: 1 addition & 1 deletion examples/indexer/src/indexer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EvmStream } from "@apibara/evm";
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { drizzleSink } from "@apibara/indexer/sinks/drizzle";
import consola from "consola";
import { pgTable, serial, text, varchar } from "drizzle-orm/pg-core";
import { drizzle } from "drizzle-orm/postgres-js";
Expand Down
5 changes: 1 addition & 4 deletions examples/starknet-indexer/src/indexer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { defineIndexer, useSink } from "@apibara/indexer";
import {
drizzle as drizzleSink,
pgIndexerTable,
} from "@apibara/indexer/sinks/drizzle";
import { drizzleSink, pgIndexerTable } from "@apibara/indexer/sinks/drizzle";
import { StarknetStream } from "@apibara/starknet";
import consola from "consola";
import { bigint } from "drizzle-orm/pg-core";
Expand Down
1 change: 1 addition & 0 deletions packages/indexer/build.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export default defineBuildConfig({
"./src/plugins/logger.ts",
"./src/plugins/persistence.ts",
"./src/plugins/drizzle-persistence.ts",
"./src/internal/testing.ts",
],
clean: true,
outDir: "./dist",
Expand Down
10 changes: 10 additions & 0 deletions packages/indexer/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: "3.8"

services:
postgres:
image: postgres:16
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- 5432:5432
6 changes: 6 additions & 0 deletions packages/indexer/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@
"import": "./dist/plugins/drizzle-persistence.mjs",
"require": "./dist/plugins/drizzle-persistence.cjs",
"default": "./dist/plugins/drizzle-persistence.mjs"
},
"./internal": {
"types": "./dist/internal/testing.d.ts",
"import": "./dist/internal/testing.mjs",
"require": "./dist/internal/testing.cjs",
"default": "./dist/internal/testing.mjs"
}
},
"scripts": {
Expand Down
13 changes: 8 additions & 5 deletions packages/indexer/src/sinks/drizzle/drizzle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { run } from "../../indexer";
import { generateMockMessages, getMockIndexer } from "../../internal/testing";
import { useSink } from "../../sink";
import type { Int8Range } from "./Int8Range";
import { drizzle as drizzleSink } from "./drizzle";
import { drizzleSink } from "./drizzle";
import { getDrizzleCursor, pgIndexerTable } from "./utils";

const testTable = pgIndexerTable("test_table", {
Expand All @@ -35,7 +35,7 @@ describe("Drizzle Test", () => {
await db.execute(sql`DROP TABLE IF EXISTS test_table`);
// create test_table with db
await db.execute(
sql`CREATE TABLE test_table (id SERIAL PRIMARY KEY, data TEXT, _cursor INT8RANGE)`,
sql`CREATE TABLE test_table (id SERIAL, data TEXT, _cursor INT8RANGE)`,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Removing PRIMARY KEY constraint could lead to duplicated ids.
The table definition lacks a primary key, potentially allowing multiple rows with the same id. Confirm this is intentional, or reintroduce a primary key to guard against duplicate data.

);
});

Expand Down Expand Up @@ -108,11 +108,14 @@ describe("Drizzle Test", () => {

const result = await db.select().from(testTable).orderBy(asc(testTable.id));

expect(result).toHaveLength(5);
expect(result[2].data).toBe("0000000");
expect(result).toHaveLength(6);
expect(
result.find((r) => r.id === 5000002 && r._cursor?.range.upper === null)
?.data,
).toBe("0000000");
});

it("should delete data", async () => {
it("should soft delete data", async () => {
const client = new MockClient<MockFilter, MockBlock>((request, options) => {
return generateMockMessages(5);
});
Expand Down
2 changes: 1 addition & 1 deletion packages/indexer/src/sinks/drizzle/drizzle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export class DrizzleSink<
}
}

export const drizzle = <
export const drizzleSink = <
TQueryResult extends PgQueryResultHKT,
TFullSchema extends Record<string, unknown> = Record<string, never>,
TSchema extends
Expand Down
35 changes: 29 additions & 6 deletions packages/indexer/src/sinks/drizzle/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import type {
PgUpdateBase,
PgUpdateSetSource,
} from "drizzle-orm/pg-core";
import type { Int8Range } from "./Int8Range";
import { getDrizzleCursor } from "./utils";

export class DrizzleSinkUpdate<
TTable extends PgTable,
Expand All @@ -32,15 +34,36 @@ export class DrizzleSinkUpdate<
return {
...originalSet,
where: async (where: SQL | undefined) => {
await this.db
.update(this.table)
.set({
_cursor: sql`int8range(lower(_cursor), ${Number(this.endCursor?.orderKey!)}, '[)')`,
} as PgUpdateSetSource<TTable>)
// 1. Find and store old versions of matching records
const oldRecords = await this.db
.select()
.from(this.table)
.where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`)
.execute();

return originalSet.where(where);
// 2. Insert old versions with updated upperbound cursor
if (oldRecords.length > 0) {
const oldRecordsWithNewCursor = oldRecords.map((record) => ({
...record,
_cursor: getDrizzleCursor([
BigInt((record._cursor as Int8Range).range.lower!),
this.endCursor?.orderKey,
]),
}));

await this.db
.insert(this.table)
.values(oldRecordsWithNewCursor)
.execute();
}

// 3. Update matching records with new values and new 'lowerbound' cursor
return originalUpdate
.set({
...values,
_cursor: sql`int8range(${Number(this.endCursor?.orderKey!)}, NULL, '[)')`,
} as PgUpdateSetSource<TTable>)
.where(sql`${where ? sql`${where} AND ` : sql``}upper_inf(_cursor)`);
Comment on lines +60 to +66
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure safe conversion from BigInt to Number.
Using Number(this.endCursor?.orderKey!) might risk losing precision for very large BigInt values. Verify that this range is acceptable for your domain.

},
} as PgUpdateBase<TTable, TQueryResult>;
}
Expand Down
7 changes: 7 additions & 0 deletions packages/sink-mongo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# `@apibara/sink-mongo`

TODO

## Installation

TODO
11 changes: 11 additions & 0 deletions packages/sink-mongo/build.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { defineBuildConfig } from "unbuild";

export default defineBuildConfig({
entries: ["./src/index.ts"],
clean: true,
outDir: "./dist",
declaration: true,
rollup: {
emitCJS: true,
},
});
26 changes: 26 additions & 0 deletions packages/sink-mongo/docker-compose.orbstack.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Reference: https://medium.com/workleap/the-only-local-mongodb-replica-set-with-docker-compose-guide-youll-ever-need-2f0b74dd8384

version: "3.8"

services:
mongo1:
image: mongo:7.0
command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"]
ports:
- 27017:27017
extra_hosts:
- "localhost:host-gateway"
healthcheck:
test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'localhost:27017'}]}) }" | mongosh --port 27017 --quiet
interval: 5s
timeout: 30s
start_period: 0s
start_interval: 1s
retries: 30
volumes:
- "mongo1_data:/data/db"
- "mongo1_config:/data/configdb"

volumes:
mongo1_data:
mongo1_config:
26 changes: 26 additions & 0 deletions packages/sink-mongo/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Reference: https://medium.com/workleap/the-only-local-mongodb-replica-set-with-docker-compose-guide-youll-ever-need-2f0b74dd8384

version: "3.8"

services:
mongo1:
image: mongo:7.0
command: ["--replSet", "rs0", "--bind_ip_all", "--port", "27017"]
ports:
- 27017:27017
extra_hosts:
- "host.docker.internal:host-gateway"
healthcheck:
test: echo "try { rs.status() } catch (err) { rs.initiate({_id:'rs0',members:[{_id:0,host:'host.docker.internal:27017'}]}) }" | mongosh --port 27017 --quiet
interval: 5s
timeout: 30s
start_period: 0s
start_interval: 1s
retries: 30
volumes:
- "mongo1_data:/data/db"
- "mongo1_config:/data/configdb"

volumes:
mongo1_data:
mongo1_config:
41 changes: 41 additions & 0 deletions packages/sink-mongo/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"name": "@apibara/sink-mongo",
"version": "2.0.0-beta.26",
"type": "module",
"files": [
"dist",
"src",
"README.md"
],
"main": "./dist/index.mjs",
"types": "./dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.mjs",
"require": "./dist/index.cjs",
"default": "./dist/index.mjs"
}
},
"scripts": {
"build": "unbuild",
"typecheck": "tsc --noEmit",
"lint": "biome check .",
"lint:fix": "pnpm lint --write",
"test": "vitest",
"test:ci": "vitest run"
},
"devDependencies": {
"@types/node": "^20.14.0",
"mongodb": "^6.12.0",
"unbuild": "^2.0.0",
"vitest": "^1.6.0"
},
"peerDependencies": {
"mongodb": "^6.12.0"
},
"dependencies": {
"@apibara/indexer": "workspace:*",
"@apibara/protocol": "workspace:*"
}
}
Loading
Loading