From fbb7c94341f47a7959182e643b2bce75c5226791 Mon Sep 17 00:00:00 2001 From: Erez Rokah Date: Fri, 11 Aug 2023 17:35:48 +0200 Subject: [PATCH] feat: Add CQID and Parent CQID (#53) --- package-lock.json | 16 ++++++++++ package.json | 2 ++ src/memdb/tables.ts | 58 +++++++++++++++++++++++++++++----- src/scalar/list.ts | 6 ++-- src/scalar/scalar.ts | 8 +++-- src/scalar/text.test.ts | 2 +- src/scalar/text.ts | 6 ++++ src/scheduler/cqid.test.ts | 64 +++++++++++++++++++++++++++++++++++++ src/scheduler/cqid.ts | 28 ++++++++++++++++ src/scheduler/scheduler.ts | 54 +++++++++++++++++++++++++------ src/schema/column.ts | 4 +-- src/schema/meta.test.ts | 65 +++++++++++++++++++++++++++++--------- src/schema/meta.ts | 9 +++--- src/schema/resolvers.ts | 15 +++++++++ src/schema/resource.ts | 13 ++++++-- src/schema/table.test.ts | 7 ++-- src/schema/table.ts | 39 +++++++++++++++++------ src/types/json.ts | 14 ++------ src/types/uuid.ts | 15 ++------- 19 files changed, 340 insertions(+), 85 deletions(-) create mode 100644 src/scheduler/cqid.test.ts create mode 100644 src/scheduler/cqid.ts diff --git a/package-lock.json b/package-lock.json index 4a1b1a3..036d3d8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,6 +19,7 @@ "matcher": "^5.0.0", "p-map": "^6.0.0", "p-timeout": "^6.1.2", + "uuid": "^9.0.0", "winston": "^3.10.0", "yargs": "^17.7.2" }, @@ -26,6 +27,7 @@ "@ava/typescript": "^4.1.0", "@grpc/grpc-js": "^1.9.0", "@tsconfig/node16": "^16.1.0", + "@types/uuid": "^9.0.2", "@types/yargs": "^17.0.24", "@typescript-eslint/eslint-plugin": "^6.2.1", "@typescript-eslint/parser": "^6.2.1", @@ -706,6 +708,12 @@ "resolved": "https://registry.npmjs.org/@types/triple-beam/-/triple-beam-1.3.2.tgz", "integrity": "sha512-txGIh+0eDFzKGC25zORnswy+br1Ha7hj5cMVwKIU7+s0U2AxxJru/jZSMU6OC9MJWP6+pc/hc6ZjyZShpsyY2g==" }, + "node_modules/@types/uuid": { + "version": "9.0.2", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.2.tgz", + "integrity": "sha512-kNnC1GFBLuhImSnV7w4njQkUiJi0ZXUycu1rUaouPqiKlXkh77JKgdRnTAp1x5eBwcIwbtI+3otwzuIDEuDoxQ==", + "dev": true + }, "node_modules/@types/yargs": { "version": "17.0.24", "resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.24.tgz", @@ -6240,6 +6248,14 @@ "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", "integrity": "sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==" }, + "node_modules/uuid": { + "version": "9.0.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.0.tgz", + "integrity": "sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==", + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/v8-compile-cache-lib": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", diff --git a/package.json b/package.json index 7912e68..466f79c 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ "@ava/typescript": "^4.1.0", "@grpc/grpc-js": "^1.9.0", "@tsconfig/node16": "^16.1.0", + "@types/uuid": "^9.0.2", "@types/yargs": "^17.0.24", "@typescript-eslint/eslint-plugin": "^6.2.1", "@typescript-eslint/parser": "^6.2.1", @@ -89,6 +90,7 @@ "matcher": "^5.0.0", "p-map": "^6.0.0", "p-timeout": "^6.1.2", + "uuid": "^9.0.0", "winston": "^3.10.0", "yargs": "^17.7.2" } diff --git a/src/memdb/tables.ts b/src/memdb/tables.ts index f1202f5..9ea80af 100644 --- a/src/memdb/tables.ts +++ b/src/memdb/tables.ts @@ -1,8 +1,8 @@ -import { Utf8 } from '@apache-arrow/esnext-esm'; +import { Int64 } from '@apache-arrow/esnext-esm'; import { createColumn } from '../schema/column.js'; import { addCQIDsColumns } from '../schema/meta.js'; -import { pathResolver } from '../schema/resolvers.js'; +import { pathResolver, parentColumnResolver } from '../schema/resolvers.js'; import { createTable } from '../schema/table.js'; export const createTables = () => { @@ -12,14 +12,13 @@ export const createTables = () => { title: 'Table 1', description: 'Table 1 description', resolver: (clientMeta, parent, stream) => { - stream.write({ id: 'table1-name1' }); - stream.write({ id: 'table1-name2' }); + stream.write({ id: 'id-1' }); + stream.write({ id: 'id-2' }); return Promise.resolve(); }, columns: [ createColumn({ name: 'id', - type: new Utf8(), resolver: pathResolver('id'), }), ], @@ -29,18 +28,61 @@ export const createTables = () => { title: 'Table 2', description: 'Table 2 description', resolver: (clientMeta, parent, stream) => { - stream.write({ name: 'table2-name1' }); - stream.write({ name: 'table2-name2' }); + stream.write({ name: 'name-1' }); + stream.write({ name: 'name-2' }); return Promise.resolve(); }, columns: [ createColumn({ name: 'name', - type: new Utf8(), resolver: pathResolver('name'), }), ], }), + createTable({ + name: 'table3', + title: 'Table 3', + description: 'Table 3 description', + resolver: (clientMeta, parent, stream) => { + stream.write({ name: 'name-1' }); + stream.write({ name: 'name-2' }); + return Promise.resolve(); + }, + columns: [ + createColumn({ + name: 'name', + primaryKey: true, + resolver: pathResolver('name'), + }), + ], + relations: [ + createTable({ + name: 'table3_child1', + resolver: (clientMeta, parent, stream) => { + stream.write({ name: 'name-1', id: 1 }); + stream.write({ name: 'name-2', id: 2 }); + return Promise.resolve(); + }, + columns: [ + createColumn({ + name: 'name', + resolver: pathResolver('name'), + }), + createColumn({ + name: 'id', + resolver: pathResolver('id'), + type: new Int64(), + primaryKey: true, + }), + createColumn({ + name: 'parent_name', + resolver: parentColumnResolver('name'), + primaryKey: true, + }), + ], + }), + ], + }), ]; const tableWithCQIDs = allTables.map((table) => addCQIDsColumns(table)); diff --git a/src/scalar/list.ts b/src/scalar/list.ts index f1c3f9e..d64525f 100644 --- a/src/scalar/list.ts +++ b/src/scalar/list.ts @@ -1,11 +1,11 @@ import { DataType, List as ArrowList } from '@apache-arrow/esnext-esm'; -import { Scalar } from './scalar.js'; +import { Scalar, Stringable } from './scalar.js'; import { isInvalid, NULL_VALUE } from './util.js'; -type TVector> = T[]; +type TVector> = T[]; -export class List> implements Scalar> { +export class List> implements Scalar> { private _type: new (value?: unknown) => T; private _valid = false; private _value: TVector = []; diff --git a/src/scalar/scalar.ts b/src/scalar/scalar.ts index 13847c4..bd8ad82 100644 --- a/src/scalar/scalar.ts +++ b/src/scalar/scalar.ts @@ -6,7 +6,9 @@ import { Int64 } from './int64.js'; import { Text } from './text.js'; import { Timestamp } from './timestamp.js'; -export interface Scalar { +export type Stringable = { toString: () => string }; + +export interface Scalar { toString: () => string; get valid(): boolean; get value(): T; @@ -14,9 +16,9 @@ export interface Scalar { get dataType(): DataType; } -export type Vector = Scalar[]; +export type Vector = Scalar[]; -export const newScalar = (dataType: DataType): Scalar => { +export const newScalar = (dataType: DataType): Scalar => { if (DataType.isBool(dataType)) { return new Bool(); } diff --git a/src/scalar/text.test.ts b/src/scalar/text.test.ts index dee83d9..c7ce80b 100644 --- a/src/scalar/text.test.ts +++ b/src/scalar/text.test.ts @@ -4,7 +4,7 @@ import test from 'ava'; import { Text } from './text.js'; // eslint-disable-next-line unicorn/no-null -[null, undefined].forEach((v) => { +[null, undefined, new Text()].forEach((v) => { test(`should set values to empty string when ${v} is passed`, (t) => { const s = new Text(v); t.is(s.value, ''); diff --git a/src/scalar/text.ts b/src/scalar/text.ts index 18ce130..d39a91b 100644 --- a/src/scalar/text.ts +++ b/src/scalar/text.ts @@ -42,6 +42,12 @@ export class Text implements Scalar { return; } + if (value instanceof Text) { + this._value = value.value; + this._valid = value.valid; + return; + } + if (typeof value!.toString === 'function' && value!.toString !== Object.prototype.toString) { this._value = value!.toString(); this._valid = true; diff --git a/src/scheduler/cqid.test.ts b/src/scheduler/cqid.test.ts new file mode 100644 index 0000000..ea5fa48 --- /dev/null +++ b/src/scheduler/cqid.test.ts @@ -0,0 +1,64 @@ +import { createHash } from 'node:crypto'; + +import test from 'ava'; + +import { createColumn } from '../schema/column.js'; +import { addCQIDsColumns, cqIDColumn } from '../schema/meta.js'; +import { Resource } from '../schema/resource.js'; +import { createTable } from '../schema/table.js'; + +import { setCQId } from './cqid.js'; + +test('setCQId - should set to random value if deterministicCQId is false', (t): void => { + const resource = new Resource(addCQIDsColumns(createTable({ name: 'table1' })), null, null); + + setCQId(resource, false, () => 'random'); + + t.is(resource.getColumnData(cqIDColumn.name).valid, true); + t.is(resource.getColumnData(cqIDColumn.name).value.toString(), 'random'); +}); + +test('setCQId - should set to random value if deterministicCQId is true and table does not have non _cq_id PKs', (t): void => { + const resource = new Resource(addCQIDsColumns(createTable({ name: 'table1' })), null, null); + + setCQId(resource, true, () => 'random'); + + t.is(resource.getColumnData(cqIDColumn.name).valid, true); + t.is(resource.getColumnData(cqIDColumn.name).value.toString(), 'random'); +}); + +test('setCQId - should set to fixed value if deterministicCQId is true and table has non _cq_id PKs', (t): void => { + const resource = new Resource( + addCQIDsColumns( + createTable({ + name: 'table1', + columns: [ + createColumn({ name: 'pk1', primaryKey: true, unique: true, notNull: true }), + createColumn({ name: 'pk2', primaryKey: true, unique: true, notNull: true }), + createColumn({ name: 'pk3', primaryKey: true, unique: true, notNull: true }), + createColumn({ name: 'non_pk' }), + ], + }), + ), + null, + null, + ); + + resource.setColumData('pk1', 'pk1-value'); + resource.setColumData('pk2', 'pk2-value'); + resource.setColumData('pk3', 'pk3-value'); + resource.setColumData('non_pk', 'non-pk-value'); + + const expectedSha256 = createHash('sha256'); + expectedSha256.update('pk1'); + expectedSha256.update('pk1-value'); + expectedSha256.update('pk2'); + expectedSha256.update('pk2-value'); + expectedSha256.update('pk3'); + expectedSha256.update('pk3-value'); + + setCQId(resource, true); + + t.is(resource.getColumnData(cqIDColumn.name).valid, true); + t.is(resource.getColumnData(cqIDColumn.name).value.toString(), expectedSha256.digest('hex')); +}); diff --git a/src/scheduler/cqid.ts b/src/scheduler/cqid.ts new file mode 100644 index 0000000..4e4ecdd --- /dev/null +++ b/src/scheduler/cqid.ts @@ -0,0 +1,28 @@ +import { createHash } from 'node:crypto'; + +import { v4 as uuidv4 } from 'uuid'; + +import { cqIDColumn } from '../schema/meta.js'; +import { Resource } from '../schema/resource.js'; +import { getPrimaryKeys } from '../schema/table.js'; + +export const setCQId = (resource: Resource, deterministicCQId: boolean, generator: () => string = uuidv4) => { + const randomCQId = generator(); + if (!deterministicCQId) { + resource.setCqId(randomCQId); + } + + const primaryKeys = getPrimaryKeys(resource.table); + const hasNonCqPKs = primaryKeys.some((pk) => pk !== cqIDColumn.name); + if (hasNonCqPKs) { + const sha256 = createHash('sha256'); + primaryKeys.sort(); + for (const pk of primaryKeys) { + sha256.update(pk); + sha256.update(resource.getColumnData(pk).toString()); + } + return resource.setCqId(sha256.digest('hex')); + } + + return resource.setCqId(randomCQId); +}; diff --git a/src/scheduler/scheduler.ts b/src/scheduler/scheduler.ts index 49b2e38..9811c7e 100644 --- a/src/scheduler/scheduler.ts +++ b/src/scheduler/scheduler.ts @@ -8,9 +8,11 @@ import { SyncStream, SyncResponse, MigrateTable, Insert } from '../grpc/plugin.j import { Column } from '../schema/column.js'; import { ClientMeta } from '../schema/meta.js'; import { Resource, encodeResource } from '../schema/resource.js'; -import { Table, encodeTable } from '../schema/table.js'; +import { Table, encodeTable, flattenTables } from '../schema/table.js'; import { Nullable } from '../schema/types.js'; +import { setCQId } from './cqid.js'; + export type Options = { logger: Logger; client: ClientMeta; @@ -49,6 +51,16 @@ class TableResolverStream extends Duplex { } } +const validateResource = (resource: Resource) => { + const missingPKs = resource.table.columns + .filter((column, index) => column.primaryKey && !resource.data[index].valid) + .map((column) => column.name); + + if (missingPKs.length > 0) { + throw new Error(`missing primary key(s) ${missingPKs.join(', ')}`); + } +}; + const resolveColumn = async (client: ClientMeta, table: Table, resource: Resource, column: Column) => { try { return await column.resolver(client, resource, column); @@ -63,6 +75,7 @@ const resolveTable = async ( table: Table, parent: Nullable, syncStream: SyncStream, + deterministicCQId: boolean, ) => { logger.info(`resolving table ${table.name}`); const stream = new TableResolverStream(); @@ -104,21 +117,37 @@ const resolveTable = async ( continue; } + setCQId(resource, deterministicCQId); + validateResource(resource); + syncStream.write(new SyncResponse({ insert: new Insert({ record: encodeResource(resource) }) })); - await pMap(table.relations, (child) => resolveTable(logger, client, child, resource, syncStream)); + await pMap(table.relations, (child) => + resolveTable(logger, client, child, resource, syncStream, deterministicCQId), + ); } }; -const syncDfs = async ({ logger, client, tables, stream: syncStream, concurrency }: Omit) => { +const syncDfs = async ({ + logger, + client, + tables, + stream: syncStream, + concurrency, + deterministicCQId, +}: Omit) => { const tableClients = tables.flatMap((table) => { const clients = table.multiplexer(client); return clients.map((client) => ({ table, client })); }); - await pMap(tableClients, ({ table, client }) => resolveTable(logger, client, table, null, syncStream), { - concurrency, - }); + await pMap( + tableClients, + ({ table, client }) => resolveTable(logger, client, table, null, syncStream, deterministicCQId), + { + concurrency, + }, + ); }; export const getRoundRobinTableClients = (tables: Table[], client: ClientMeta) => { @@ -143,11 +172,16 @@ const syncRoundRobin = async ({ tables, stream: syncStream, concurrency, + deterministicCQId, }: Omit) => { const tableClients = getRoundRobinTableClients(tables, client); - await pMap(tableClients, ({ table, client }) => resolveTable(logger, client, table, null, syncStream), { - concurrency, - }); + await pMap( + tableClients, + ({ table, client }) => resolveTable(logger, client, table, null, syncStream, deterministicCQId), + { + concurrency, + }, + ); }; export const sync = async ({ @@ -159,7 +193,7 @@ export const sync = async ({ strategy = Strategy.dfs, deterministicCQId, }: Options) => { - for (const table of tables) { + for (const table of flattenTables(tables)) { logger.info(`sending migrate message for table ${table.name}`); // eslint-disable-next-line @typescript-eslint/naming-convention stream.write(new SyncResponse({ migrate_table: new MigrateTable({ table: encodeTable(table) }) })); diff --git a/src/schema/column.ts b/src/schema/column.ts index 064e52f..4225c66 100644 --- a/src/schema/column.ts +++ b/src/schema/column.ts @@ -1,6 +1,6 @@ import { isDeepStrictEqual } from 'node:util'; -import { DataType, Field, Bool } from '@apache-arrow/esnext-esm'; +import { DataType, Field, Utf8 } from '@apache-arrow/esnext-esm'; import * as arrow from './arrow.js'; import { ClientMeta } from './meta.js'; @@ -24,7 +24,7 @@ const emptyResolver = () => Promise.resolve(); export const createColumn = ({ name = '', - type = new Bool(), + type = new Utf8(), description = '', incrementalKey = false, notNull = false, diff --git a/src/schema/meta.test.ts b/src/schema/meta.test.ts index 9db4cb3..cdb582f 100644 --- a/src/schema/meta.test.ts +++ b/src/schema/meta.test.ts @@ -1,14 +1,15 @@ import test from 'ava'; import { createColumn } from './column.js'; -import { addCQIDsColumns, cqIDColumn, cqParentIDColumn } from './meta.js'; +import { addCQIDsColumns, cqIDColumn, cqParentIDColumn, parentCqUUIDResolver } from './meta.js'; +import { Resource } from './resource.js'; import { createTable } from './table.js'; test('addCQIDsColumns', (t) => { const table = createTable({ name: 'table1', columns: [ - createColumn({ name: 'column1' }), + createColumn({ name: 'column1', primaryKey: true }), createColumn({ name: 'column2' }), createColumn({ name: 'column3' }), createColumn({ name: 'column4' }), @@ -29,7 +30,7 @@ test('addCQIDsColumns', (t) => { columns: [ createColumn({ name: 'column1' }), createColumn({ name: 'column2' }), - createColumn({ name: 'column3' }), + createColumn({ name: 'column3', primaryKey: true }), ], }), ], @@ -40,26 +41,60 @@ test('addCQIDsColumns', (t) => { const tableWithCQIDs = addCQIDsColumns(table); t.is(tableWithCQIDs.columns.length, 6); - t.is(tableWithCQIDs.columns[0], cqIDColumn); - t.is(tableWithCQIDs.columns[1], cqParentIDColumn); + t.deepEqual(tableWithCQIDs.columns[0], { ...cqIDColumn, primaryKey: false }); + t.deepEqual(tableWithCQIDs.columns[1], cqParentIDColumn); t.is(tableWithCQIDs.relations[0].columns.length, 4); - t.is(tableWithCQIDs.relations[0].columns[0], cqIDColumn); - t.is(tableWithCQIDs.relations[0].columns[1], cqParentIDColumn); + t.deepEqual(tableWithCQIDs.relations[0].columns[0], { ...cqIDColumn, primaryKey: true }); + t.deepEqual(tableWithCQIDs.relations[0].columns[1], cqParentIDColumn); t.is(tableWithCQIDs.relations[1].columns.length, 3); - t.is(tableWithCQIDs.relations[1].columns[0], cqIDColumn); - t.is(tableWithCQIDs.relations[1].columns[1], cqParentIDColumn); + t.deepEqual(tableWithCQIDs.relations[1].columns[0], { ...cqIDColumn, primaryKey: true }); + t.deepEqual(tableWithCQIDs.relations[1].columns[1], cqParentIDColumn); t.is(tableWithCQIDs.relations[2].columns.length, 3); - t.is(tableWithCQIDs.relations[2].columns[0], cqIDColumn); - t.is(tableWithCQIDs.relations[2].columns[1], cqParentIDColumn); + t.deepEqual(tableWithCQIDs.relations[2].columns[0], { ...cqIDColumn, primaryKey: true }); + t.deepEqual(tableWithCQIDs.relations[2].columns[1], cqParentIDColumn); t.is(tableWithCQIDs.relations[3].columns.length, 3); - t.is(tableWithCQIDs.relations[3].columns[0], cqIDColumn); - t.is(tableWithCQIDs.relations[3].columns[1], cqParentIDColumn); + t.deepEqual(tableWithCQIDs.relations[3].columns[0], { ...cqIDColumn, primaryKey: true }); + t.deepEqual(tableWithCQIDs.relations[3].columns[1], cqParentIDColumn); t.is(tableWithCQIDs.relations[3].relations[0].columns.length, 5); - t.is(tableWithCQIDs.relations[3].relations[0].columns[0], cqIDColumn); - t.is(tableWithCQIDs.relations[3].relations[0].columns[1], cqParentIDColumn); + t.deepEqual(tableWithCQIDs.relations[3].relations[0].columns[0], { ...cqIDColumn, primaryKey: false }); + t.deepEqual(tableWithCQIDs.relations[3].relations[0].columns[1], cqParentIDColumn); +}); + +test('parentCqUUIDResolver - should set to null for null parent', (t) => { + const table = addCQIDsColumns(createTable({ name: 'table1' })); + const resource = new Resource(table, null, null); + + parentCqUUIDResolver()({ id: () => '' }, resource, cqParentIDColumn); + + t.is(resource.getColumnData(cqParentIDColumn.name).valid, false); +}); + +test('parentCqUUIDResolver - should set to null for parent with _cq_id column', (t) => { + const table = addCQIDsColumns(createTable({ name: 'table1', relations: [createTable({ name: 'table1-child1' })] })); + + const parentResource = new Resource(table, null, null); + parentResource.setColumData(cqIDColumn.name, null); + const childResource = new Resource(table.relations[0], parentResource, null); + + parentCqUUIDResolver()({ id: () => '' }, childResource, cqParentIDColumn); + + t.is(childResource.getColumnData(cqParentIDColumn.name).valid, false); +}); + +test('parentCqUUIDResolver - should set to _cq_id column value when parent has it', (t) => { + const table = addCQIDsColumns(createTable({ name: 'table1', relations: [createTable({ name: 'table1-child1' })] })); + + const parentResource = new Resource(table, null, null); + parentResource.setColumData(cqIDColumn.name, 'parent-cq-id'); + const childResource = new Resource(table.relations[0], parentResource, null); + + parentCqUUIDResolver()({ id: () => '' }, childResource, cqParentIDColumn); + + t.is(childResource.getColumnData(cqParentIDColumn.name).value, 'parent-cq-id'); + t.is(childResource.getColumnData(cqParentIDColumn.name).valid, true); }); diff --git a/src/schema/meta.ts b/src/schema/meta.ts index d60962f..c1b0dec 100644 --- a/src/schema/meta.ts +++ b/src/schema/meta.ts @@ -4,7 +4,7 @@ import { UUIDType } from '../types/uuid.js'; import { Column, createColumn, ColumnResolver } from './column.js'; import { Resource } from './resource.js'; -import { Table } from './table.js'; +import { Table, getPrimaryKeys } from './table.js'; export type ClientMeta = { id: () => string; @@ -16,9 +16,6 @@ export const parentCqUUIDResolver = (): ColumnResolver => { return Promise.resolve(r.setColumData(c.name, null)); } const parentCqID = r.parent.getColumnData(cqIDColumn.name); - if (parentCqID == null) { - return Promise.resolve(r.setColumData(c.name, null)); - } return Promise.resolve(r.setColumData(c.name, parentCqID)); }; }; @@ -54,9 +51,11 @@ export const cqSourceNameColumn = createColumn({ }); export const addCQIDsColumns = (table: Table): Table => { + const hasPks = getPrimaryKeys(table).length > 0; + const cqID = hasPks ? cqIDColumn : { ...cqIDColumn, primaryKey: true }; return { ...table, - columns: [cqIDColumn, cqParentIDColumn, ...table.columns], + columns: [cqID, cqParentIDColumn, ...table.columns], relations: table.relations.map((relation) => addCQIDsColumns(relation)), }; }; diff --git a/src/schema/resolvers.ts b/src/schema/resolvers.ts index 9b88ef2..aa7e294 100644 --- a/src/schema/resolvers.ts +++ b/src/schema/resolvers.ts @@ -8,3 +8,18 @@ export const pathResolver = (path: string): ColumnResolver => { return Promise.resolve(); }; }; + +export const parentColumnResolver = (parentColumn: string): ColumnResolver => { + return (_, resource, c) => { + const parent = resource.parent; + if (!parent) { + throw new Error(`parent not found for column ${c.name}`); + } + const parentData = parent.getColumnData(parentColumn); + if (!parentData) { + throw new Error(`parent data not found for column ${c.name}`); + } + resource.setColumData(c.name, parentData); + return Promise.resolve(); + }; +}; diff --git a/src/schema/resource.ts b/src/schema/resource.ts index 704ad23..3d70f02 100644 --- a/src/schema/resource.ts +++ b/src/schema/resource.ts @@ -1,7 +1,8 @@ import { tableToIPC, Table as ArrowTable, RecordBatch, vectorFromArray } from '@apache-arrow/esnext-esm'; -import { Scalar, Vector, newScalar } from '../scalar/scalar.js'; +import { Scalar, Vector, newScalar, Stringable } from '../scalar/scalar.js'; +import { cqIDColumn } from './meta.js'; import { Table, toArrowSchema } from './table.js'; import { Nullable } from './types.js'; @@ -18,7 +19,7 @@ export class Resource { this.data = table.columns.map((column) => newScalar(column.type)); } - getColumnData(columnName: string): Scalar { + getColumnData(columnName: string): Scalar { const columnIndex = this.table.columns.findIndex((c) => c.name === columnName); if (columnIndex === undefined) { throw new Error(`Column '${columnName}' not found`); @@ -34,6 +35,14 @@ export class Resource { this.data[columnIndex].value = value; } + setCqId(value: string): void { + const columnIndex = this.table.columns.findIndex((c) => c.name === cqIDColumn.name); + if (columnIndex === -1) { + return; + } + this.data[columnIndex].value = value; + } + getItem(): unknown { return this.item; } diff --git a/src/schema/table.test.ts b/src/schema/table.test.ts index 0181468..3a530a4 100644 --- a/src/schema/table.test.ts +++ b/src/schema/table.test.ts @@ -1,6 +1,6 @@ import test from 'ava'; -import { filterTables, createTable } from './table.js'; +import { filterTables, createTable, flattenTables } from './table.js'; const tableA = createTable({ name: 'a' }); const tableC = createTable({ name: 'c' }); @@ -52,7 +52,7 @@ const testCases = [ { name: 'should skip dependent tables when skipDependentTables is specified', allTables, - tables: ['*'], + tables: ['a', 'b'], skipTables: [], skipDependentTables: true, expected: [tableA, tableB], @@ -75,7 +75,8 @@ testCases.forEach((testCase) => { t.throws(() => filterTables(allTables, tables, skipTables, skipDependentTables), { message: expectedError }); return; } - const actual = filterTables(allTables, tables, skipTables, skipDependentTables); + const filtered = filterTables(allTables, tables, skipTables, skipDependentTables); + const actual = flattenTables(filtered); t.deepEqual( actual.map(({ name }) => name), expected.map(({ name }) => name), diff --git a/src/schema/table.ts b/src/schema/table.ts index 3c63175..4cc46b9 100644 --- a/src/schema/table.ts +++ b/src/schema/table.ts @@ -95,6 +95,21 @@ export const getAllParents = (table: Table): Table[] => { return [table.parent, ...getAllParents(table.parent)]; }; +const getAllChildren = (table: Table): Table[] => { + return table.relations.flatMap((relation) => [relation, ...getAllChildren(relation)]); +}; + +const filterByNamesRecursive = (tables: Table[], names: string[]): Table[] => { + const filtered = tables + .filter((table) => names.includes(table.name)) + .map((table) => ({ ...table, relations: filterByNamesRecursive(table.relations, names) })); + return filtered; +}; + +const deduplicate = (tables: Table[]): Table[] => { + return tables.filter((table, index, array) => array.findIndex((t) => t.name === table.name) === index); +}; + export const filterTables = ( tables: Table[], include: string[], @@ -103,22 +118,22 @@ export const filterTables = ( ): Table[] => { const flattened = flattenTables(tables); - const withIncludes = flattened.filter((table) => { - return isMatch(table.name, include) || getAllParents(table).some((parent) => isMatch(parent.name, include)); - }); + const withIncludes = flattened.filter((table) => isMatch(table.name, include)); + + // Include all children of included tables if skipDependantTables is false + const withChildren = skipDependantTables + ? withIncludes + : deduplicate(withIncludes.flatMap((table) => [table, ...getAllChildren(table)])); + // If a child was included, include the parent as well - const withParents = withIncludes - .flatMap((table) => [...getAllParents(table), table]) - .filter((value, index, array) => array.indexOf(value) === index); + const withParents = deduplicate(withChildren.flatMap((table) => [...getAllParents(table), table])); const withSkipped = withParents.filter((table) => { return !isMatch(table.name, skip) && !getAllParents(table).some((parent) => isMatch(parent.name, skip)); }); - const withSkipDependant = withSkipped.filter((table) => table.parent === null || !skipDependantTables); - const skippedParents = withParents - .filter((table) => table.parent && !withSkipDependant.includes(table.parent)) + .filter((table) => table.parent && !withSkipped.includes(table.parent)) .map((table) => table.parent!.name); if (skippedParents.length > 0) { @@ -127,7 +142,11 @@ export const filterTables = ( ); } - return withSkipDependant; + const filtered = filterByNamesRecursive( + tables, + withSkipped.map((table) => table.name), + ); + return filtered; }; export const toArrowSchema = (table: Table): Schema => { diff --git a/src/types/json.ts b/src/types/json.ts index 0f48a3c..ab0cb4e 100644 --- a/src/types/json.ts +++ b/src/types/json.ts @@ -1,21 +1,13 @@ import { DataType, Type } from '@apache-arrow/esnext-esm'; -export class JSONType extends DataType { +export class JSONType extends DataType { readonly extensionName: string = 'json'; constructor() { super(); - // Assuming there's no direct way to set the storage type in the constructor, - // this is just a representation of the JSONType. } - serialize(): ArrayBuffer { - // Implement your serialization logic here. - return new TextEncoder().encode('json-serialized').buffer; - } - - static deserialize(/*storageType: Binary, serialized: ArrayBuffer*/): JSONType { - // Implement your deserialization logic here. - return new JSONType(); + get typeId(): Type.Utf8 { + return Type.Utf8; } } diff --git a/src/types/uuid.ts b/src/types/uuid.ts index ba58a19..62fa404 100644 --- a/src/types/uuid.ts +++ b/src/types/uuid.ts @@ -1,22 +1,13 @@ import { DataType, Type } from '@apache-arrow/esnext-esm'; -export class UUIDType extends DataType { +export class UUIDType extends DataType { readonly extensionName: string = 'uuid'; constructor() { super(); - // The underlying storage type is a binary of 16 bytes, representing a UUID. - // Assuming there's no direct way to set the storage type in the constructor, - // this is just a representation of the UUIDType. } - serialize(): ArrayBuffer { - // Implement your serialization logic here. - return new TextEncoder().encode('uuid-serialized').buffer; - } - - static deserialize(/*storageType: Binary, serialized: ArrayBuffer*/): UUIDType { - // Implement your deserialization logic here. - return new UUIDType(); + get typeId(): Type.Utf8 { + return Type.Utf8; } }