From 4a5f9e8dbd3579dcab392797813059ee42ad6870 Mon Sep 17 00:00:00 2001 From: Erez Rokah Date: Thu, 10 Aug 2023 17:41:44 +0200 Subject: [PATCH] fix: Implement sync, scheduler, resource encoding (#44) Fixes https://github.com/cloudquery/plugin-sdk-javascript/issues/39 --- package-lock.json | 152 ++++++++++++++++++++++++++++++------- package.json | 4 + src/grpc/plugin.ts | 1 + src/memdb/memdb.ts | 90 ++++++++++++++++++++-- src/plugin/plugin.ts | 6 +- src/plugin/serve.ts | 4 +- src/scalar/bool.ts | 2 +- src/scalar/float64.ts | 2 +- src/scalar/int64.ts | 2 +- src/scalar/scalar.ts | 23 ++++++ src/scalar/text.ts | 2 +- src/scalar/timestamp.ts | 2 +- src/scheduler/scheduler.ts | 109 +++++++++++++++++++++++++- src/schema/column.ts | 11 ++- src/schema/meta.ts | 6 +- src/schema/resolvers.ts | 10 +++ src/schema/resource.ts | 25 +++++- src/schema/table.ts | 10 +-- 18 files changed, 400 insertions(+), 61 deletions(-) create mode 100644 src/schema/resolvers.ts diff --git a/package-lock.json b/package-lock.json index ff70190..4a1b1a3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,9 +12,13 @@ "@apache-arrow/esnext-esm": "^12.0.1", "@cloudquery/plugin-pb-javascript": "^0.0.7", "@types/luxon": "^3.3.1", + "ajv": "^8.12.0", "boolean": "^3.2.0", + "dot-prop": "^8.0.2", "luxon": "^3.4.0", "matcher": "^5.0.0", + "p-map": "^6.0.0", + "p-timeout": "^6.1.2", "winston": "^3.10.0", "yargs": "^17.7.2" }, @@ -359,6 +363,22 @@ "url": "https://opencollective.com/eslint" } }, + "node_modules/@eslint/eslintrc/node_modules/ajv": { + "version": "6.12.6", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", + "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", + "dev": true, + "dependencies": { + "fast-deep-equal": "^3.1.1", + "fast-json-stable-stringify": "^2.0.0", + "json-schema-traverse": "^0.4.1", + "uri-js": "^4.2.2" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/epoberezkin" + } + }, "node_modules/@eslint/eslintrc/node_modules/argparse": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", @@ -377,6 +397,12 @@ "js-yaml": "bin/js-yaml.js" } }, + "node_modules/@eslint/eslintrc/node_modules/json-schema-traverse": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", + "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", + "dev": true + }, "node_modules/@eslint/js": { "version": "8.46.0", "resolved": "https://registry.npmjs.org/@eslint/js/-/js-8.46.0.tgz", @@ -961,14 +987,13 @@ } }, "node_modules/ajv": { - "version": "6.12.6", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", - "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", - "dev": true, + "version": "8.12.0", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.12.0.tgz", + "integrity": "sha512-sRu1kpcO9yLtYxBKvqfTeh9KzZEwO3STyX1HT+4CaDzC6HpTGYhIhPIzj9XuKU7KYDwnaeh5hcOwjy1QuJzBPA==", "dependencies": { "fast-deep-equal": "^3.1.1", - "fast-json-stable-stringify": "^2.0.0", - "json-schema-traverse": "^0.4.1", + "json-schema-traverse": "^1.0.0", + "require-from-string": "^2.0.2", "uri-js": "^4.2.2" }, "funding": { @@ -1274,6 +1299,21 @@ "url": "https://github.com/chalk/ansi-styles?sponsor=1" } }, + "node_modules/ava/node_modules/p-map": { + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz", + "integrity": "sha512-VFqfGDHlx87K66yZrNdI4YGtD70IRyd+zSvgks6mzHPRNkoKy+9EKP4SFC77/vTTQYmRmti7dvqC+m5jBrBAcg==", + "dev": true, + "dependencies": { + "aggregate-error": "^4.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/ava/node_modules/strip-ansi": { "version": "7.1.0", "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-7.1.0.tgz", @@ -2025,6 +2065,31 @@ "node": ">=6.0.0" } }, + "node_modules/dot-prop": { + "version": "8.0.2", + "resolved": "https://registry.npmjs.org/dot-prop/-/dot-prop-8.0.2.tgz", + "integrity": "sha512-xaBe6ZT4DHPkg0k4Ytbvn5xoxgpG0jOS1dYxSOwAHPuNLjP3/OzN0gH55SrLqpx8cBfSaVt91lXYkApjb+nYdQ==", + "dependencies": { + "type-fest": "^3.8.0" + }, + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/dot-prop/node_modules/type-fest": { + "version": "3.13.1", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-3.13.1.tgz", + "integrity": "sha512-tLq3bSNx+xSpwvAJnzrK0Ep5CLNWjvFTOp71URMaAEWBfRb9nnJiBoUe0tF8bI4ZFO3omgBR6NvnbzVUT3Ly4g==", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/eastasianwidth": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/eastasianwidth/-/eastasianwidth-0.2.0.tgz", @@ -2673,6 +2738,22 @@ "url": "https://opencollective.com/eslint" } }, + "node_modules/eslint/node_modules/ajv": { + "version": "6.12.6", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", + "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", + "dev": true, + "dependencies": { + "fast-deep-equal": "^3.1.1", + "fast-json-stable-stringify": "^2.0.0", + "json-schema-traverse": "^0.4.1", + "uri-js": "^4.2.2" + }, + "funding": { + "type": "github", + "url": "https://github.com/sponsors/epoberezkin" + } + }, "node_modules/eslint/node_modules/argparse": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", @@ -2747,6 +2828,12 @@ "js-yaml": "bin/js-yaml.js" } }, + "node_modules/eslint/node_modules/json-schema-traverse": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", + "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", + "dev": true + }, "node_modules/eslint/node_modules/locate-path": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/locate-path/-/locate-path-6.0.0.tgz", @@ -2923,8 +3010,7 @@ "node_modules/fast-deep-equal": { "version": "3.1.3", "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", - "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==", - "dev": true + "integrity": "sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==" }, "node_modules/fast-diff": { "version": "1.3.0", @@ -3972,10 +4058,9 @@ "dev": true }, "node_modules/json-schema-traverse": { - "version": "0.4.1", - "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-0.4.1.tgz", - "integrity": "sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==", - "dev": true + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", + "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==" }, "node_modules/json-stable-stringify-without-jsonify": { "version": "1.0.1", @@ -4500,6 +4585,18 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-event/node_modules/p-timeout": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.1.0.tgz", + "integrity": "sha512-auFDyzzzGZZZdHz3BtET9VEz0SE/uMEAx7uWfGPucfzEwwe/xH0iVeZibQmANYE/hp9T2+UUZT5m+BKyrDp3Ew==", + "dev": true, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/p-limit": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-4.0.0.tgz", @@ -4531,27 +4628,22 @@ } }, "node_modules/p-map": { - "version": "5.5.0", - "resolved": "https://registry.npmjs.org/p-map/-/p-map-5.5.0.tgz", - "integrity": "sha512-VFqfGDHlx87K66yZrNdI4YGtD70IRyd+zSvgks6mzHPRNkoKy+9EKP4SFC77/vTTQYmRmti7dvqC+m5jBrBAcg==", - "dev": true, - "dependencies": { - "aggregate-error": "^4.0.0" - }, + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/p-map/-/p-map-6.0.0.tgz", + "integrity": "sha512-T8BatKGY+k5rU+Q/GTYgrEf2r4xRMevAN5mtXc2aPc4rS1j3s+vWTaO2Wag94neXuCAUAs8cxBL9EeB5EA6diw==", "engines": { - "node": ">=12" + "node": ">=16" }, "funding": { "url": "https://github.com/sponsors/sindresorhus" } }, "node_modules/p-timeout": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-5.1.0.tgz", - "integrity": "sha512-auFDyzzzGZZZdHz3BtET9VEz0SE/uMEAx7uWfGPucfzEwwe/xH0iVeZibQmANYE/hp9T2+UUZT5m+BKyrDp3Ew==", - "dev": true, + "version": "6.1.2", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-6.1.2.tgz", + "integrity": "sha512-UbD77BuZ9Bc9aABo74gfXhNvzC9Tx7SxtHSh1fxvx3jTLLYvmVhiQZZrJzqqU0jKbN32kb5VOKiLEQI/3bIjgQ==", "engines": { - "node": ">=12" + "node": ">=14.16" }, "funding": { "url": "https://github.com/sponsors/sindresorhus" @@ -4907,7 +4999,6 @@ "version": "2.3.0", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.0.tgz", "integrity": "sha512-rRV+zQD8tVFys26lAGR9WUuS4iUAngJScM+ZRSKtvl5tKeZ2t5bvdNFdNHBW9FWR4guGHlgmsZ1G7BSm2wTbuA==", - "dev": true, "engines": { "node": ">=6" } @@ -5139,6 +5230,14 @@ "node": ">=0.10.0" } }, + "node_modules/require-from-string": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/require-from-string/-/require-from-string-2.0.2.tgz", + "integrity": "sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/resolve": { "version": "1.22.4", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.22.4.tgz", @@ -6132,7 +6231,6 @@ "version": "4.4.1", "resolved": "https://registry.npmjs.org/uri-js/-/uri-js-4.4.1.tgz", "integrity": "sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==", - "dev": true, "dependencies": { "punycode": "^2.1.0" } diff --git a/package.json b/package.json index 3f9a1b3..7912e68 100644 --- a/package.json +++ b/package.json @@ -82,9 +82,13 @@ "@apache-arrow/esnext-esm": "^12.0.1", "@cloudquery/plugin-pb-javascript": "^0.0.7", "@types/luxon": "^3.3.1", + "ajv": "^8.12.0", "boolean": "^3.2.0", + "dot-prop": "^8.0.2", "luxon": "^3.4.0", "matcher": "^5.0.0", + "p-map": "^6.0.0", + "p-timeout": "^6.1.2", "winston": "^3.10.0", "yargs": "^17.7.2" } diff --git a/src/grpc/plugin.ts b/src/grpc/plugin.ts index 810ebef..8fc4208 100644 --- a/src/grpc/plugin.ts +++ b/src/grpc/plugin.ts @@ -6,6 +6,7 @@ import { encodeTables } from '../schema/table.js'; export class MigrateTable extends pluginV3.cloudquery.plugin.v3.Sync.MessageMigrateTable {} export class SyncRequest extends pluginV3.cloudquery.plugin.v3.Sync.Request {} +export class Insert extends pluginV3.cloudquery.plugin.v3.Sync.MessageInsert {} export class SyncResponse extends pluginV3.cloudquery.plugin.v3.Sync.Response {} export class ReadRequest extends pluginV3.cloudquery.plugin.v3.Read.Request {} export class ReadResponse extends pluginV3.cloudquery.plugin.v3.Read.Response {} diff --git a/src/memdb/memdb.ts b/src/memdb/memdb.ts index de89a41..895eb2f 100644 --- a/src/memdb/memdb.ts +++ b/src/memdb/memdb.ts @@ -1,9 +1,12 @@ -import { StructRowProxy } from '@apache-arrow/esnext-esm'; +import { StructRowProxy, Utf8 } from '@apache-arrow/esnext-esm'; import { pluginV3 } from '@cloudquery/plugin-pb-javascript'; +import { default as Ajv } from 'ajv'; import { WriteRequest, WriteStream, ReadStream, ReadRequest } from '../grpc/plugin.js'; -import { Plugin, newPlugin, SyncOptions, TableOptions, NewClientOptions } from '../plugin/plugin.js'; +import { Plugin, newPlugin, SyncOptions, TableOptions, NewClientFunction } from '../plugin/plugin.js'; import { sync } from '../scheduler/scheduler.js'; +import { createColumn } from '../schema/column.js'; +import { pathResolver } from '../schema/resolvers.js'; import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js'; export const createMemDBClient = () => { @@ -17,14 +20,60 @@ export const createMemDBClient = () => { }; }; +const spec = { + type: 'object', + properties: { + concurrency: { type: 'integer' }, + }, +}; + +type Spec = { + concurrency: number; +}; + +const ajv = new Ajv.default(); +const validate = ajv.compile(spec); + export const newMemDBPlugin = (): Plugin => { const memdbClient = createMemDBClient(); const memoryDB = memdbClient.memoryDB; const tables = memdbClient.tables; const allTables: Table[] = [ - createTable({ name: 'table1', title: 'Table 1', description: 'Table 1 description' }), - createTable({ name: 'table2', title: 'Table 2', description: 'Table 2 description' }), + createTable({ + name: 'table1', + title: 'Table 1', + description: 'Table 1 description', + resolver: (clientMeta, parent, stream) => { + stream.write({ id: 'table1-name1' }); + stream.write({ id: 'table1-name2' }); + return Promise.resolve(); + }, + columns: [ + createColumn({ + name: 'id', + type: new Utf8(), + resolver: pathResolver('id'), + }), + ], + }), + createTable({ + name: 'table2', + title: 'Table 2', + description: 'Table 2 description', + resolver: (clientMeta, parent, stream) => { + stream.write({ name: 'table2-name1' }); + stream.write({ name: 'table2-name2' }); + return Promise.resolve(); + }, + columns: [ + createColumn({ + name: 'name', + type: new Utf8(), + resolver: pathResolver('name'), + }), + ], + }), ]; const memdb: { inserts: unknown[]; [key: string]: unknown } = { @@ -86,7 +135,8 @@ export const newMemDBPlugin = (): Plugin => { }; const pluginClient = { - init: (spec: string, options: NewClientOptions) => Promise.resolve(), + plugin: null as unknown as Plugin, + spec: null as unknown as Spec, close: () => Promise.resolve(), tables: (options: TableOptions) => { const { tables, skipTables, skipDependentTables } = options; @@ -96,7 +146,19 @@ export const newMemDBPlugin = (): Plugin => { sync: async (options: SyncOptions) => { const { stream, tables, skipTables, skipDependentTables, deterministicCQId } = options; const filtered = filterTables(allTables, tables, skipTables, skipDependentTables); - return await sync(memdbClient, filtered, stream, { deterministicCQId }); + const logger = pluginClient.plugin.getLogger(); + const { + spec: { concurrency }, + } = pluginClient; + + return await sync({ + logger, + client: memdbClient, + stream, + tables: filtered, + deterministicCQId, + concurrency, + }); }, write(stream: WriteStream): Promise { return new Promise((resolve, reject) => { @@ -176,5 +238,19 @@ export const newMemDBPlugin = (): Plugin => { }, }; - return newPlugin('memdb', '0.0.1', () => Promise.resolve(pluginClient)); + const newClient: NewClientFunction = (logger, spec, options) => { + const parsedSpec = JSON.parse(spec) as Partial; + const validSchema = validate(parsedSpec); + if (!validSchema) { + const messages = validate.errors?.map((error) => error.message).join(', '); + return Promise.reject(new Error(`Invalid spec: ${messages}`)); + } + const { concurrency = 10_000 } = parsedSpec; + pluginClient.spec = { concurrency }; + return Promise.resolve(pluginClient); + }; + + const plugin = newPlugin('memdb', '0.0.1', newClient); + pluginClient.plugin = plugin; + return plugin; }; diff --git a/src/plugin/plugin.ts b/src/plugin/plugin.ts index 8f21762..fb07a34 100644 --- a/src/plugin/plugin.ts +++ b/src/plugin/plugin.ts @@ -40,14 +40,15 @@ export interface DestinationClient { } export interface Client extends SourceClient, DestinationClient { - init: (spec: string, options: NewClientOptions) => Promise; close: () => Promise; } export interface Plugin extends Client { + getLogger: () => Logger; setLogger: (logger: Logger) => void; name: () => string; version: () => string; + init: (spec: string, options: NewClientOptions) => Promise; } export const newUnimplementedSource = (): SourceClient => { @@ -76,6 +77,9 @@ export const newPlugin = (name: string, version: string, newClient: NewClientFun read: (stream: ReadStream) => { return plugin.client?.read(stream) ?? new Error('client not initialized'); }, + getLogger: () => { + return plugin.logger!; + }, setLogger: (logger: Logger) => { plugin.logger = logger; }, diff --git a/src/plugin/serve.ts b/src/plugin/serve.ts index b245257..08dad5b 100644 --- a/src/plugin/serve.ts +++ b/src/plugin/serve.ts @@ -25,9 +25,9 @@ export const createServeCommand = (plugin: Plugin) => { 'serve', 'start plugin gRPC server', () => {}, - ({ address, network, logLevel, logFormat, sentry: sentry, otelEndpoint, telemetryLevel }: ServeArguments) => { + ({ address, logLevel, logFormat }: ServeArguments) => { const logger = createLogger(logLevel, logFormat); - logger.info(JSON.stringify({ address, network, logLevel, logFormat, sentry, otelEndpoint, telemetryLevel })); + plugin.setLogger(logger); startServer(logger, address, plugin); }, ) diff --git a/src/scalar/bool.ts b/src/scalar/bool.ts index a0a01fe..66af397 100644 --- a/src/scalar/bool.ts +++ b/src/scalar/bool.ts @@ -8,7 +8,7 @@ export class Bool implements Scalar { private _valid = false; private _value = false; - public constructor(v: unknown) { + public constructor(v?: unknown) { this.value = v; return this; } diff --git a/src/scalar/float64.ts b/src/scalar/float64.ts index 3f6a35d..713bd03 100644 --- a/src/scalar/float64.ts +++ b/src/scalar/float64.ts @@ -7,7 +7,7 @@ export class Float64 implements Scalar { private _valid = false; private _value: number = 0; - public constructor(v: unknown) { + public constructor(v?: unknown) { this.value = v; return this; } diff --git a/src/scalar/int64.ts b/src/scalar/int64.ts index 828fd6c..fea0d8b 100644 --- a/src/scalar/int64.ts +++ b/src/scalar/int64.ts @@ -7,7 +7,7 @@ export class Int64 implements Scalar { private _valid = false; private _value: bigint = BigInt(0); - public constructor(v: unknown) { + public constructor(v?: unknown) { this.value = v; return this; } diff --git a/src/scalar/scalar.ts b/src/scalar/scalar.ts index 3789be6..13847c4 100644 --- a/src/scalar/scalar.ts +++ b/src/scalar/scalar.ts @@ -1,5 +1,11 @@ import { DataType } from '@apache-arrow/esnext-esm'; +import { Bool } from './bool.js'; +import { Float64 } from './float64.js'; +import { Int64 } from './int64.js'; +import { Text } from './text.js'; +import { Timestamp } from './timestamp.js'; + export interface Scalar { toString: () => string; get valid(): boolean; @@ -9,3 +15,20 @@ export interface Scalar { } export type Vector = Scalar[]; + +export const newScalar = (dataType: DataType): Scalar => { + if (DataType.isBool(dataType)) { + return new Bool(); + } + if (DataType.isInt(dataType)) { + return new Int64(); + } + if (DataType.isFloat(dataType)) { + return new Float64(); + } + if (DataType.isTimestamp(dataType)) { + return new Timestamp(); + } + + return new Text(); +}; diff --git a/src/scalar/text.ts b/src/scalar/text.ts index deb4e41..18ce130 100644 --- a/src/scalar/text.ts +++ b/src/scalar/text.ts @@ -7,7 +7,7 @@ export class Text implements Scalar { private _valid = false; private _value = ''; - public constructor(v: unknown) { + public constructor(v?: unknown) { this.value = v; return this; } diff --git a/src/scalar/timestamp.ts b/src/scalar/timestamp.ts index eb6e873..2c2687a 100644 --- a/src/scalar/timestamp.ts +++ b/src/scalar/timestamp.ts @@ -9,7 +9,7 @@ export class Timestamp implements Scalar { private _value: DateTime = DateTime.fromMillis(0); private _unit: TimeUnit = TimeUnit.NANOSECOND; - public constructor(v: unknown, unit?: TimeUnit) { + public constructor(v?: unknown, unit?: TimeUnit) { this.value = v; if (unit) { this._unit = unit; diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts index bf88476..eb5f5d2 100644 --- a/src/scheduler/scheduler.ts +++ b/src/scheduler/scheduler.ts @@ -1,17 +1,118 @@ -import { SyncStream, SyncResponse, MigrateTable } from '../grpc/plugin.js'; +import { Duplex } from 'node:stream'; + +import pMap from 'p-map'; +import pTimeout from 'p-timeout'; +import { Logger } from 'winston'; + +import { SyncStream, SyncResponse, MigrateTable, Insert } from '../grpc/plugin.js'; +import { Column } from '../schema/column.js'; import { ClientMeta } from '../schema/meta.js'; +import { Resource, encodeResource } from '../schema/resource.js'; import { Table, encodeTable } from '../schema/table.js'; +import { Nullable } from '../schema/types.js'; export type Options = { + logger: Logger; + client: ClientMeta; + tables: Table[]; + stream: SyncStream; deterministicCQId: boolean; + concurrency: number; +}; + +class TableResolverStream extends Duplex { + queue: unknown[] = []; + + constructor() { + super({ objectMode: true }); + } + + _read() { + while (this.queue.length > 0) { + this.push(this.queue.shift()); + } + if (this.writableEnded) { + // end readable stream if writable stream has ended + this.push(null); + } + } + + _write(chunk: unknown, _: string, next: (error?: Error | null) => void) { + this.queue.push(chunk); + next(); + } +} + +const resolveColumn = async (client: ClientMeta, table: Table, resource: Resource, column: Column) => { + try { + return await column.resolver(client, resource, column); + } catch (error) { + throw new Error(`error resolving column ${column.name} for table ${table.name}: ${error}`); + } }; -export const sync = async (client: ClientMeta, tables: Table[], stream: SyncStream, options: Options) => { +const resolveTable = async ( + logger: Logger, + client: ClientMeta, + table: Table, + parent: Nullable, + syncStream: SyncStream, +) => { + logger.info(`resolving table ${table.name}`); + const stream = new TableResolverStream(); + try { + await table.resolver(client, null, stream); + } catch (error) { + logger.error(`error resolving table ${table.name}: ${error}`); + return; + } finally { + stream.end(); + } + + for await (const data of stream) { + logger.info(`resolving resource for table ${table.name}`); + const resolveResourceTimeout = 10 * 60 * 1000; + const resource = new Resource(table, parent, data); + + try { + await pTimeout(table.preResourceResolver(client, resource), { milliseconds: resolveResourceTimeout }); + } catch (error) { + logger.error(`error resolving preResourceResolver for table ${table.name}: ${error}`); + continue; + } + + try { + const allColumnsPromise = pMap(table.columns, (column) => resolveColumn(client, table, resource, column), { + stopOnError: true, + }); + await pTimeout(allColumnsPromise, { milliseconds: resolveResourceTimeout }); + } catch (error) { + logger.error(`error resolving columns for table ${table.name}: ${error}`); + continue; + } + + try { + await table.postResourceResolver(client, resource); + } catch (error) { + logger.error(`error resolving postResourceResolver for table ${table.name}: ${error}`); + continue; + } + + syncStream.write(new SyncResponse({ insert: new Insert({ record: encodeResource(resource) }) })); + + await Promise.all(table.relations.map((child) => resolveTable(logger, client, child, resource, syncStream))); + } +}; + +export const sync = async ({ logger, client, tables, stream: syncStream, concurrency }: Options) => { for (const table of tables) { + logger.info(`sending migrate message for table ${table.name}`); // eslint-disable-next-line @typescript-eslint/naming-convention - stream.write(new SyncResponse({ migrate_table: new MigrateTable({ table: encodeTable(table) }) })); + syncStream.write(new SyncResponse({ migrate_table: new MigrateTable({ table: encodeTable(table) }) })); } - stream.end(); + await pMap(tables, (table) => resolveTable(logger, client, table, null, syncStream), { concurrency }); + + syncStream.end(); return await Promise.resolve(); }; diff --git a/src/schema/column.ts b/src/schema/column.ts index 8292883..064e52f 100644 --- a/src/schema/column.ts +++ b/src/schema/column.ts @@ -6,7 +6,7 @@ import * as arrow from './arrow.js'; import { ClientMeta } from './meta.js'; import { Resource } from './resource.js'; -export type ColumnResolver = (meta: ClientMeta, resource: Resource, c: Column) => void; +export type ColumnResolver = (meta: ClientMeta, resource: Resource, c: Column) => Promise; export type Column = { name: string; @@ -16,10 +16,12 @@ export type Column = { notNull: boolean; incrementalKey: boolean; unique: boolean; - resolver?: ColumnResolver; + resolver: ColumnResolver; ignoreInTests: boolean; }; +const emptyResolver = () => Promise.resolve(); + export const createColumn = ({ name = '', type = new Bool(), @@ -27,7 +29,9 @@ export const createColumn = ({ incrementalKey = false, notNull = false, primaryKey = false, + resolver = emptyResolver, unique = false, + ignoreInTests = false, }: Partial = {}): Column => ({ name, type, @@ -35,8 +39,9 @@ export const createColumn = ({ primaryKey, notNull, incrementalKey, + resolver, unique, - ignoreInTests: false, + ignoreInTests, }); export const formatColumn = (column: Column): string => { diff --git a/src/schema/meta.ts b/src/schema/meta.ts index ccf5a7b..e610073 100644 --- a/src/schema/meta.ts +++ b/src/schema/meta.ts @@ -12,13 +12,13 @@ export type ClientMeta = { export const parentCqUUIDResolver = (): ColumnResolver => { return (_: ClientMeta, r: Resource, c: Column) => { if (r.parent === null) { - return r.setColumData(c.name, null); + return Promise.resolve(r.setColumData(c.name, null)); } const parentCqID = r.parent.getColumnData(cqIDColumn.name); if (parentCqID == null) { - return r.setColumData(c.name, null); + return Promise.resolve(r.setColumData(c.name, null)); } - return r.setColumData(c.name, parentCqID); + return Promise.resolve(r.setColumData(c.name, parentCqID)); }; }; diff --git a/src/schema/resolvers.ts b/src/schema/resolvers.ts new file mode 100644 index 0000000..9b88ef2 --- /dev/null +++ b/src/schema/resolvers.ts @@ -0,0 +1,10 @@ +import { getProperty } from 'dot-prop'; + +import { ColumnResolver } from './column.js'; + +export const pathResolver = (path: string): ColumnResolver => { + return (_, resource, c) => { + resource.setColumData(c.name, getProperty(resource.getItem(), path)); + return Promise.resolve(); + }; +}; diff --git a/src/schema/resource.ts b/src/schema/resource.ts index c3627c4..704ad23 100644 --- a/src/schema/resource.ts +++ b/src/schema/resource.ts @@ -1,6 +1,8 @@ -import { Scalar, Vector } from '../scalar/scalar.js'; +import { tableToIPC, Table as ArrowTable, RecordBatch, vectorFromArray } from '@apache-arrow/esnext-esm'; -import { Table } from './table.js'; +import { Scalar, Vector, newScalar } from '../scalar/scalar.js'; + +import { Table, toArrowSchema } from './table.js'; import { Nullable } from './types.js'; export class Resource { @@ -13,8 +15,7 @@ export class Resource { this.table = table; this.parent = parent; this.item = item; - // TODO: Init from table columns - this.data = []; + this.data = table.columns.map((column) => newScalar(column.type)); } getColumnData(columnName: string): Scalar { @@ -41,3 +42,19 @@ export class Resource { this.item = item; } } + +export const encodeResource = (resource: Resource): Uint8Array => { + const { table } = resource; + const schema = toArrowSchema(table); + // TODO: Check if this can be simplified + let batch = new RecordBatch(schema, undefined); + for (let index = 0; index < table.columns.length; index++) { + const column = table.columns[index]; + const data = resource.getColumnData(column.name); + const vector = vectorFromArray([data], column.type); + batch = batch.setChildAt(index, vector); + } + const arrowTable = new ArrowTable(schema, batch); + const bytes = tableToIPC(arrowTable); + return bytes; +}; diff --git a/src/schema/table.ts b/src/schema/table.ts index 1b95bb8..182ae17 100644 --- a/src/schema/table.ts +++ b/src/schema/table.ts @@ -9,8 +9,8 @@ import { ClientMeta } from './meta.js'; import { Resource } from './resource.js'; import { Nullable } from './types.js'; -export type TableResolver = (clientMeta: ClientMeta, parent: Nullable, stream: Writable) => void; -export type RowResolver = (clientMeta: ClientMeta, resource: Resource) => void; +export type TableResolver = (clientMeta: ClientMeta, parent: Nullable, stream: Writable) => Promise; +export type RowResolver = (clientMeta: ClientMeta, resource: Resource) => Promise; export type Multiplexer = (clientMeta: ClientMeta) => ClientMeta[]; export type Transform = (table: Table) => void; @@ -38,10 +38,10 @@ export const createTable = ({ columns = [], relations = [], transform = () => {}, - resolver = () => {}, + resolver = () => Promise.resolve(), multiplexer = () => [], - postResourceResolver = () => {}, - preResourceResolver = () => {}, + postResourceResolver = () => Promise.resolve(), + preResourceResolver = () => Promise.resolve(), isIncremental = false, ignoreInTests = false, parent = null,