diff --git a/src/create.js b/src/create.js index a533d7c..a8f33b8 100644 --- a/src/create.js +++ b/src/create.js @@ -1,4 +1,4 @@ -import { validateSchemaForVersion } from './schema.js' +import { maxFieldId, validateSchemaForVersion } from './schema.js' import { uuid4 } from './utils.js' /** @@ -95,19 +95,7 @@ export async function icebergCreate({ } /** - * @import {Field, PartitionField, PartitionSpec, Schema, SortOrder, TableMetadata} from '../src/types.js' - * @param {Field[]} fields - * @returns {number} - */ -function maxFieldId (fields = []) { - let max = 0 - for (const f of fields) { - if (max < f.id) max = f.id - } - return max -} - -/** + * @import {PartitionField, PartitionSpec, Schema, SortOrder, TableMetadata} from '../src/types.js' * @param {PartitionField[]} partitionFields * @returns {number} */ diff --git a/src/schema.js b/src/schema.js index 7af7f38..797d9bb 100644 --- a/src/schema.js +++ b/src/schema.js @@ -1,5 +1,5 @@ /** - * @import {IcebergType, Schema} from '../src/types.js' + * @import {Field, IcebergType, Schema} from '../src/types.js' */ /** @@ -18,6 +18,39 @@ export function validateSchemaForVersion(schema, formatVersion) { } } +/** + * @param {Field[]} fields + * @returns {number} + */ +export function maxFieldId(fields = []) { + let max = 0 + for (const field of fields) { + if (max < field.id) max = field.id + const nested = maxNestedFieldId(field.type) + if (max < nested) max = nested + } + return max +} + +/** + * @param {IcebergType} type + * @returns {number} + */ +function maxNestedFieldId(type) { + if (typeof type === 'string') return 0 + if (type.type === 'list') { + const elementId = type['element-id'] ?? 0 + return Math.max(elementId, maxNestedFieldId(type.element)) + } + if (type.type === 'map') { + const keyId = type['key-id'] ?? 0 + const valueId = type['value-id'] ?? 
0 + return Math.max(keyId, valueId, maxNestedFieldId(type.key), maxNestedFieldId(type.value)) + } + if (type.type === 'struct') return maxFieldId(type.fields) + return 0 +} + /** * Spec v3 §"Reserved Field IDs": user schemas must not use field ids * greater than 2147483447 (`Integer.MAX_VALUE - 200`). The reserved range @@ -30,7 +63,6 @@ export function validateSchemaForVersion(schema, formatVersion) { const MAX_USER_FIELD_ID = 2147483447 /** - * @import {Field} from '../src/types.js' * @param {Field} field * @param {number} formatVersion * @param {string} path diff --git a/src/write/commit.js b/src/write/commit.js index 205e7a0..72381be 100644 --- a/src/write/commit.js +++ b/src/write/commit.js @@ -1,4 +1,4 @@ -import { validateSchemaForVersion } from '../schema.js' +import { maxFieldId, validateSchemaForVersion } from '../schema.js' import { parseDecimalType } from './conversions.js' /** @@ -240,17 +240,10 @@ export function applyUpdates(metadata, updates) { const newSchema = { ...up.schema, 'schema-id': schemaId } validateSchemaForVersion(newSchema, next['format-version']) const priorLastColumnId = next['last-column-id'] ?? 
0 + const priorAssignedIds = currentAssignedIdIndex(schemas, next['current-schema-id']) + validateAssignedFieldIds(newSchema, priorAssignedIds, priorLastColumnId) validateSchemaEvolution(schemas, newSchema, priorLastColumnId, next['format-version']) - for (const field of newSchema.fields) { - if (field.id > priorLastColumnId && field.required) { - if (field['initial-default'] == null) { - throw new Error(`add-schema: required field ${field.name} (id ${field.id}) needs a non-null initial-default`) - } - if (field['write-default'] == null) { - throw new Error(`add-schema: required field ${field.name} (id ${field.id}) needs a non-null write-default`) - } - } - } + validateNewRequiredFields(newSchema, priorLastColumnId) next = { ...next, schemas: [...schemas, newSchema], @@ -347,6 +340,134 @@ export function applyUpdates(metadata, updates) { return next } +/** + * @typedef {{ kind: string, path: string }} AssignedFieldId + */ + +/** + * @param {Schema[]} schemas + * @param {number} currentSchemaId + * @returns {Map} + */ +function currentAssignedIdIndex(schemas, currentSchemaId) { + const currentSchema = schemas.find(s => s['schema-id'] === currentSchemaId) ?? schemas[schemas.length - 1] + const assignedIds = new Map() + if (currentSchema) indexAssignedFieldIds(currentSchema.fields, '', assignedIds) + return assignedIds +} + +/** + * @param {Field[]} fields + * @param {string} prefix + * @param {Map} assignedIds + */ +function indexAssignedFieldIds(fields, prefix, assignedIds) { + for (const field of fields) { + const path = prefix ? 
`${prefix}.${field.name}` : field.name + assignedIds.set(field.id, { kind: 'field', path }) + indexAssignedTypeIds(field.type, path, assignedIds) + } +} + +/** + * @param {IcebergType} type + * @param {string} path + * @param {Map} assignedIds + */ +function indexAssignedTypeIds(type, path, assignedIds) { + if (typeof type === 'string') return + if (type.type === 'struct') { + indexAssignedFieldIds(type.fields, path, assignedIds) + } else if (type.type === 'list') { + assignedIds.set(type['element-id'], { kind: 'list element', path: `${path}.element` }) + indexAssignedTypeIds(type.element, `${path}.element`, assignedIds) + } else if (type.type === 'map') { + assignedIds.set(type['key-id'], { kind: 'map key', path: `${path}.key` }) + assignedIds.set(type['value-id'], { kind: 'map value', path: `${path}.value` }) + indexAssignedTypeIds(type.key, `${path}.key`, assignedIds) + indexAssignedTypeIds(type.value, `${path}.value`, assignedIds) + } +} + +/** + * @param {Schema} schema + * @param {Map} priorAssignedIds + * @param {number} priorLastColumnId + */ +function validateAssignedFieldIds(schema, priorAssignedIds, priorLastColumnId) { + validateAssignedFields(schema.fields, '', priorAssignedIds, priorLastColumnId) +} + +/** + * @param {Field[]} fields + * @param {string} prefix + * @param {Map} priorAssignedIds + * @param {number} priorLastColumnId + */ +function validateAssignedFields(fields, prefix, priorAssignedIds, priorLastColumnId) { + for (const field of fields) { + const path = prefix ? 
`${prefix}.${field.name}` : field.name + validateAssignedId(field.id, 'field', path, priorAssignedIds, priorLastColumnId) + validateAssignedTypeIds(field.type, path, priorAssignedIds, priorLastColumnId) + } +} + +/** + * @param {IcebergType} type + * @param {string} path + * @param {Map} priorAssignedIds + * @param {number} priorLastColumnId + */ +function validateAssignedTypeIds(type, path, priorAssignedIds, priorLastColumnId) { + if (typeof type === 'string') return + if (type.type === 'struct') { + validateAssignedFields(type.fields, path, priorAssignedIds, priorLastColumnId) + } else if (type.type === 'list') { + validateAssignedId(type['element-id'], 'list element', `${path}.element`, priorAssignedIds, priorLastColumnId) + validateAssignedTypeIds(type.element, `${path}.element`, priorAssignedIds, priorLastColumnId) + } else if (type.type === 'map') { + validateAssignedId(type['key-id'], 'map key', `${path}.key`, priorAssignedIds, priorLastColumnId) + validateAssignedId(type['value-id'], 'map value', `${path}.value`, priorAssignedIds, priorLastColumnId) + validateAssignedTypeIds(type.key, `${path}.key`, priorAssignedIds, priorLastColumnId) + validateAssignedTypeIds(type.value, `${path}.value`, priorAssignedIds, priorLastColumnId) + } +} + +/** + * @param {number} id + * @param {string} kind + * @param {string} path + * @param {Map} priorAssignedIds + * @param {number} priorLastColumnId + */ +function validateAssignedId(id, kind, path, priorAssignedIds, priorLastColumnId) { + if (id > priorLastColumnId) return + const prior = priorAssignedIds.get(id) + if (!prior) { + throw new Error(`add-schema: ${kind} ${path} uses unassigned id ${id} (last-column-id ${priorLastColumnId})`) + } + if (prior.kind !== kind) { + throw new Error(`add-schema: ${kind} ${path} uses id ${id} previously assigned to ${prior.kind} ${prior.path}`) + } +} + +/** + * @param {Schema} schema + * @param {number} priorLastColumnId + */ +function validateNewRequiredFields(schema, 
priorLastColumnId) { + for (const field of schema.fields) { + if (field.id > priorLastColumnId && field.required) { + if (field['initial-default'] == null) { + throw new Error(`add-schema: required field ${field.name} (id ${field.id}) needs a non-null initial-default`) + } + if (field['write-default'] == null) { + throw new Error(`add-schema: required field ${field.name} (id ${field.id}) needs a non-null write-default`) + } + } + } +} + /** * Validate schema evolution rules that require comparing the new schema with * prior schemas: existing field ids may be renamed/reordered, but their @@ -546,19 +667,3 @@ function idsListEquivalent(a, b) { } return true } - -/** - * Highest field id at the top level of a schema. Mirrors `create.js` and is - * intentionally non-recursive; nested-type ids are not yet supported on - * schema add. - * - * @param {Field[]} fields - * @returns {number} - */ -function maxFieldId(fields = []) { - let max = 0 - for (const f of fields) { - if (f.id > max) max = f.id - } - return max -} diff --git a/src/write/parquet.js b/src/write/parquet.js index c748d2b..067cfb6 100644 --- a/src/write/parquet.js +++ b/src/write/parquet.js @@ -30,17 +30,12 @@ export function writeParquet({ writer, schema, records, codec }) { for (const field of schema.fields) { const name = sanitize(field.name) - const fieldElements = icebergTypeToParquetFields(name, field) + const fieldElements = icebergTypeToParquetFields(name, field.type, field.required, field.id) if (!fieldElements.length) continue columnData.push({ name, data: extractColumn(records, field), }) - // Iceberg requires parquet columns to carry the iceberg `field-id` so - // readers (Spark, pyiceberg) can map by id instead of by name. The - // top-level element of each iceberg field gets the id; nested logical - // types (variant's metadata/value) inherit it via their parent. 
- fieldElements[0].field_id = field.id parquetFields.push(...fieldElements) rootChildren++ } @@ -61,31 +56,181 @@ export function writeParquet({ writer, schema, records, codec }) { */ function extractColumn(records, field) { const out = new Array(records.length) - const writeDefault = field['write-default'] for (let i = 0; i < records.length; i++) { - const v = records[i][field.name] - if (v !== undefined) { - out[i] = v - } else { - out[i] = writeDefault !== undefined ? writeDefault : null - } + out[i] = materializeFieldValue(records[i][field.name], field) } return out } /** - * @param {string} name + * @param {any} value * @param {Field} field + * @returns {any} + */ +function materializeFieldValue(value, field) { + const writeDefault = field['write-default'] + const v = value !== undefined ? value : writeDefault !== undefined ? writeDefault : null + return materializeNestedDefaults(v, field.type) +} + +/** + * @param {any} value + * @param {IcebergType} type + * @returns {any} + */ +function materializeNestedDefaults(value, type) { + if (value === null || value === undefined || typeof type !== 'object') return value + if (type.type === 'struct') { + if (typeof value !== 'object' || Array.isArray(value)) return value + const out = { ...value } + for (const child of type.fields) { + out[child.name] = materializeFieldValue(value[child.name], child) + } + return out + } + if (type.type === 'list') { + if (!Array.isArray(value)) return value + return value.map(v => materializeNestedDefaults(v, type.element)) + } + if (type.type === 'map') { + return materializeMapDefaults(value, type) + } + return value +} + +/** + * @param {any} value + * @param {Extract} type + * @returns {any} + */ +function materializeMapDefaults(value, type) { + if (typeof type.key !== 'object' && typeof type.value !== 'object') return value + if (value instanceof Map) { + return Array.from(value.entries(), ([key, entryValue]) => ({ + key: materializeNestedDefaults(key, type.key), + value: 
materializeNestedDefaults(entryValue, type.value), + })) + } + if (Array.isArray(value)) { + return value.map(entry => { + if (entry && typeof entry === 'object' && 'key' in entry && 'value' in entry) { + return { + key: materializeNestedDefaults(entry.key, type.key), + value: materializeNestedDefaults(entry.value, type.value), + } + } + if (Array.isArray(entry) && entry.length === 2) { + return { + key: materializeNestedDefaults(entry[0], type.key), + value: materializeNestedDefaults(entry[1], type.value), + } + } + return entry + }) + } + if (typeof value === 'object') { + return Object.fromEntries(Object.entries(value).map(([key, entryValue]) => [ + key, + materializeNestedDefaults(entryValue, type.value), + ])) + } + return value +} + +/** + * Iceberg requires parquet columns to carry the iceberg `field-id` so readers + * (Spark, pyiceberg) can map by id instead of by name. The top-level element + * of each iceberg field gets the id; nested logical types (variant's + * metadata/value, list's repeated wrapper) inherit it via their parent. + * + * @param {string} name + * @param {IcebergType} type + * @param {boolean} required + * @param {number} fieldId * @returns {SchemaElement[]} */ -function icebergTypeToParquetFields(name, field) { - const type = typeName(field.type) - const repetition_type = field.required ? 'REQUIRED' : 'OPTIONAL' +function icebergTypeToParquetFields(name, type, required, fieldId) { + const repetition_type = required ? 
'REQUIRED' : 'OPTIONAL' + if (typeof type === 'object') { + if (type.type === 'list') { + const elementFields = icebergTypeToParquetFields( + 'element', type.element, type['element-required'], type['element-id'] + ) + if (!elementFields.length) { + throw new Error(`unsupported iceberg list element type: ${typeName(type.element)}`) + } + return [ + { + name, + converted_type: 'LIST', + logical_type: { type: 'LIST' }, + repetition_type, + num_children: 1, + field_id: fieldId, + }, + { name: 'list', repetition_type: 'REPEATED', num_children: 1 }, + ...elementFields, + ] + } + if (type.type === 'struct') { + // Parquet group: group { children... }. + // Use child names verbatim (no `sanitize`) so hyparquet-writer's dremel + // can descend via `record[parent][child]` using the user's iceberg + // names. Nested fields with names that wouldn't survive Avro + // sanitization aren't supported here. + /** @type {SchemaElement[]} */ + const allChildren = [] + let directChildren = 0 + for (const child of type.fields) { + const sub = icebergTypeToParquetFields(child.name, child.type, child.required, child.id) + if (!sub.length) continue + allChildren.push(...sub) + directChildren++ + } + if (!directChildren) { + throw new Error(`struct ${name} has no writable children`) + } + return [ + { name, repetition_type, num_children: directChildren, field_id: fieldId }, + ...allChildren, + ] + } + if (type.type === 'map') { + if (type.key !== 'string' && type.key !== 'int') { + throw new Error(`unsupported iceberg map key type: ${typeName(type.key)}`) + } + // Iceberg map keys are always required (no `key-required` in the spec). 
+ const keyFields = icebergTypeToParquetFields('key', type.key, true, type['key-id']) + const valueFields = icebergTypeToParquetFields( + 'value', type.value, type['value-required'], type['value-id'] + ) + if (!keyFields.length) { + throw new Error(`unsupported iceberg map key type: ${typeName(type.key)}`) + } + if (!valueFields.length) { + throw new Error(`unsupported iceberg map value type: ${typeName(type.value)}`) + } + return [ + { + name, + converted_type: 'MAP', + logical_type: { type: 'MAP' }, + repetition_type, + num_children: 1, + field_id: fieldId, + }, + { name: 'key_value', repetition_type: 'REPEATED', num_children: 2 }, + ...keyFields, + ...valueFields, + ] + } + throw new Error(`unsupported iceberg nested type: ${JSON.stringify(type)}`) + } if (type.startsWith('geometry')) { - return [{ name, type: 'BYTE_ARRAY', logical_type: { type: 'GEOMETRY' }, repetition_type }] + return [{ name, type: 'BYTE_ARRAY', logical_type: { type: 'GEOMETRY' }, repetition_type, field_id: fieldId }] } if (type.startsWith('geography')) { - return [{ name, type: 'BYTE_ARRAY', logical_type: { type: 'GEOGRAPHY' }, repetition_type }] + return [{ name, type: 'BYTE_ARRAY', logical_type: { type: 'GEOGRAPHY' }, repetition_type, field_id: fieldId }] } const decimal = parseDecimalType(type) if (decimal) { @@ -99,33 +244,34 @@ function icebergTypeToParquetFields(name, field) { precision, scale, repetition_type, + field_id: fieldId, }] } const fixedLen = parseFixedType(type) if (fixedLen !== undefined) { - return [{ name, type: 'FIXED_LEN_BYTE_ARRAY', type_length: fixedLen, repetition_type }] + return [{ name, type: 'FIXED_LEN_BYTE_ARRAY', type_length: fixedLen, repetition_type, field_id: fieldId }] } switch (type) { case 'unknown': - if (field.required) throw new Error('unsupported required iceberg type: unknown') + if (required) throw new Error('unsupported required iceberg type: unknown') return [] case 'variant': return [ - { name, repetition_type, num_children: 2, logical_type: { 
type: 'VARIANT' } }, + { name, repetition_type, num_children: 2, logical_type: { type: 'VARIANT' }, field_id: fieldId }, { name: 'metadata', type: 'BYTE_ARRAY', repetition_type: 'REQUIRED' }, { name: 'value', type: 'BYTE_ARRAY', repetition_type: 'OPTIONAL' }, ] - case 'boolean': return [{ name, type: 'BOOLEAN', repetition_type }] - case 'int': return [{ name, type: 'INT32', repetition_type }] - case 'long': return [{ name, type: 'INT64', repetition_type }] - case 'float': return [{ name, type: 'FLOAT', repetition_type }] - case 'double': return [{ name, type: 'DOUBLE', repetition_type }] - case 'string': return [{ name, type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type }] - case 'binary': return [{ name, type: 'BYTE_ARRAY', repetition_type }] + case 'boolean': return [{ name, type: 'BOOLEAN', repetition_type, field_id: fieldId }] + case 'int': return [{ name, type: 'INT32', repetition_type, field_id: fieldId }] + case 'long': return [{ name, type: 'INT64', repetition_type, field_id: fieldId }] + case 'float': return [{ name, type: 'FLOAT', repetition_type, field_id: fieldId }] + case 'double': return [{ name, type: 'DOUBLE', repetition_type, field_id: fieldId }] + case 'string': return [{ name, type: 'BYTE_ARRAY', converted_type: 'UTF8', repetition_type, field_id: fieldId }] + case 'binary': return [{ name, type: 'BYTE_ARRAY', repetition_type, field_id: fieldId }] case 'uuid': - return [{ name, type: 'FIXED_LEN_BYTE_ARRAY', type_length: 16, logical_type: { type: 'UUID' }, repetition_type }] + return [{ name, type: 'FIXED_LEN_BYTE_ARRAY', type_length: 16, logical_type: { type: 'UUID' }, repetition_type, field_id: fieldId }] case 'date': - return [{ name, type: 'INT32', converted_type: 'DATE', logical_type: { type: 'DATE' }, repetition_type }] + return [{ name, type: 'INT32', converted_type: 'DATE', logical_type: { type: 'DATE' }, repetition_type, field_id: fieldId }] case 'time': return [{ name, @@ -133,15 +279,16 @@ function 
icebergTypeToParquetFields(name, field) { converted_type: 'TIME_MICROS', logical_type: { type: 'TIME', isAdjustedToUTC: false, unit: 'MICROS' }, repetition_type, + field_id: fieldId, }] case 'timestamp': - return [timestampField(name, repetition_type, false, 'MICROS')] + return [timestampField(name, repetition_type, false, 'MICROS', fieldId)] case 'timestamptz': - return [timestampField(name, repetition_type, true, 'MICROS')] + return [timestampField(name, repetition_type, true, 'MICROS', fieldId)] case 'timestamp_ns': - return [timestampField(name, repetition_type, false, 'NANOS')] + return [timestampField(name, repetition_type, false, 'NANOS', fieldId)] case 'timestamptz_ns': - return [timestampField(name, repetition_type, true, 'NANOS')] + return [timestampField(name, repetition_type, true, 'NANOS', fieldId)] default: throw new Error(`unsupported iceberg type: ${type}`) } @@ -164,13 +311,15 @@ function parseFixedType(type) { * @param {'REQUIRED'|'OPTIONAL'|'REPEATED'} repetition_type * @param {boolean} isAdjustedToUTC * @param {'MICROS'|'NANOS'} unit + * @param {number} field_id * @returns {SchemaElement} */ -function timestampField(name, repetition_type, isAdjustedToUTC, unit) { +function timestampField(name, repetition_type, isAdjustedToUTC, unit, field_id) { return { name, type: 'INT64', logical_type: { type: 'TIMESTAMP', isAdjustedToUTC, unit }, repetition_type, + field_id, } } diff --git a/src/write/stats.js b/src/write/stats.js index 5262842..cb1ed31 100644 --- a/src/write/stats.js +++ b/src/write/stats.js @@ -46,6 +46,10 @@ export function computeColumnStats(records, schema) { for (const field of schema.fields) { const type = typeName(field.type) if (type === 'unknown') continue + // Iceberg metrics are reported per-leaf field id. Skip nested top-level + // fields entirely rather than emit value_counts keyed by the parent id + // (which Spark/pyiceberg never write). 
+ if (type === 'list' || type === 'map' || type === 'struct') continue if (isGeoType(type)) { const { value_count, null_count, lower, upper } = computeGeoBounds(records, field) diff --git a/test/roundtrip.test.js b/test/roundtrip.test.js index 385dbc0..b008a0c 100644 --- a/test/roundtrip.test.js +++ b/test/roundtrip.test.js @@ -82,6 +82,133 @@ describe('icebergCreate + icebergStageAppend + icebergRead round-trip', () => { expect(read).toEqual(records) }) + it('round-trips list columns through icebergCreate + icebergRead', async () => { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'http://test/list' + const { resolver } = memResolver() + + /** @type {Schema} */ + const schema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: 'long' }, + { + id: 2, + name: 'tags', + required: false, + type: { type: 'list', 'element-id': 3, 'element-required': false, element: 'string' }, + }, + { + id: 4, + name: 'counts', + required: false, + type: { type: 'list', 'element-id': 5, 'element-required': true, element: 'int' }, + }, + ], + } + + const created = await icebergCreate({ tableUrl, resolver, schema }) + expect(created['last-column-id']).toBe(5) + + const records = [ + { id: 1n, tags: ['a', 'b'], counts: [1, 2, 3] }, + { id: 2n, tags: [], counts: [] }, + { id: 3n, tags: ['only'], counts: [42] }, + ] + + const staged = await icebergStageAppend({ tableUrl, metadata: created, records, resolver }) + const committed = await fileCatalogCommit({ tableUrl, metadata: created, staged, resolver }) + + const read = await icebergRead({ tableUrl, metadata: committed, resolver }) + expect(read).toEqual(records) + }) + + it('round-trips a map column through icebergCreate + icebergRead', async () => { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'http://test/map' + const { resolver } = memResolver() + + /** @type {Schema} */ + const schema = { + type: 'struct', + 'schema-id': 0, + fields: [ 
+ { id: 1, name: 'id', required: true, type: 'long' }, + { + id: 2, + name: 'attrs', + required: false, + type: { + type: 'map', + 'key-id': 3, + key: 'string', + 'value-id': 4, + 'value-required': false, + value: 'string', + }, + }, + ], + } + + const created = await icebergCreate({ tableUrl, resolver, schema }) + expect(created['last-column-id']).toBe(4) + + const records = [ + { id: 1n, attrs: { color: 'red', size: 'L' } }, + { id: 2n, attrs: {} }, + { id: 3n, attrs: { only: 'one' } }, + ] + + const staged = await icebergStageAppend({ tableUrl, metadata: created, records, resolver }) + const committed = await fileCatalogCommit({ tableUrl, metadata: created, staged, resolver }) + + const read = await icebergRead({ tableUrl, metadata: committed, resolver }) + expect(read).toEqual(records) + }) + + it('round-trips a struct column through icebergCreate + icebergRead', async () => { + vi.spyOn(Date, 'now').mockReturnValue(1700000000000) + const tableUrl = 'http://test/struct' + const { resolver } = memResolver() + + /** @type {Schema} */ + const schema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: 'long' }, + { + id: 2, + name: 'point', + required: false, + type: { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 3, name: 'x', required: true, type: 'double' }, + { id: 4, name: 'y', required: true, type: 'double' }, + ], + }, + }, + ], + } + + const created = await icebergCreate({ tableUrl, resolver, schema }) + expect(created['last-column-id']).toBe(4) + + const records = [ + { id: 1n, point: { x: 1.5, y: 2.5 } }, + { id: 2n, point: { x: -1, y: 0 } }, + ] + + const staged = await icebergStageAppend({ tableUrl, metadata: created, records, resolver }) + const committed = await fileCatalogCommit({ tableUrl, metadata: created, staged, resolver }) + + const read = await icebergRead({ tableUrl, metadata: committed, resolver }) + expect(read).toEqual(records) + }) + it('partitions a decimal column by bucket and 
round-trips', async () => { vi.spyOn(Date, 'now').mockReturnValue(1700000000000) const tableUrl = 'http://test/decimal-bucket' diff --git a/test/write/metadata-updates.test.js b/test/write/metadata-updates.test.js index 2365145..8975420 100644 --- a/test/write/metadata-updates.test.js +++ b/test/write/metadata-updates.test.js @@ -72,6 +72,63 @@ describe('metadata schema updates', () => { expect(next['last-column-id']).toBe(99) }) + it('updates last-column-id from nested field ids in an added schema', () => { + /** @type {Schema} */ + const nextSchema = { + type: 'struct', + 'schema-id': -1, + fields: [ + ...idSchema.fields, + { + id: 2, + name: 'tags', + required: false, + type: { type: 'list', 'element-id': 3, 'element-required': false, element: 'string' }, + }, + ], + } + + const next = applyUpdates(tableMetadata(), [ + { action: 'add-schema', schema: nextSchema }, + { action: 'set-current-schema', 'schema-id': -1 }, + ]) + + expect(next['current-schema-id']).toBe(1) + expect(next['last-column-id']).toBe(3) + }) + + it('rejects add-schema when a new field fills an id gap below nested last-column-id', () => { + /** @type {Schema} */ + const listSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { + id: 1, + name: 'tags', + required: false, + type: { type: 'list', 'element-id': 3, 'element-required': false, element: 'string' }, + }, + ], + } + /** @type {Schema} */ + const nextSchema = { + type: 'struct', + 'schema-id': -1, + fields: [ + ...listSchema.fields, + { id: 2, name: 'tag', required: true, type: 'string' }, + ], + } + + expect(() => applyUpdates(tableMetadata({ + 'last-column-id': 3, + schemas: [listSchema], + }), [ + { action: 'add-schema', schema: nextSchema }, + ])).toThrow(/field tag uses unassigned id 2 \(last-column-id 3\)/) + }) + it('rejects add-schema with write-default on a v2 table', () => { /** @type {Schema} */ const schemaWithDefault = { diff --git a/test/write/parquet.test.js b/test/write/parquet.test.js index 57344f5..af672fa 
100644 --- a/test/write/parquet.test.js +++ b/test/write/parquet.test.js @@ -267,6 +267,487 @@ describe('writeParquet', () => { ]) }) + it('writes a list column with 3-level LIST structure and stamped element-id', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const listSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: 'long' }, + { + id: 2, + name: 'scores', + required: false, + type: { type: 'list', 'element-id': 3, 'element-required': false, element: 'long' }, + }, + ], + } + const records = [ + { id: 1n, scores: [10n, 20n] }, + { id: 2n, scores: [] }, + { id: 3n, scores: null }, + { id: 4n, scores: [99n, null, 1n] }, + ] + writeParquet({ writer, schema: listSchema, records }) + const file = writer.getBuffer() + const meta = parquetMetadata(file) + + const rows = await parquetReadObjects({ file, compressors }) + expect(rows[0]).toEqual({ id: 1n, scores: [10n, 20n] }) + expect(rows[1]).toEqual({ id: 2n, scores: [] }) + expect(rows[2].scores ?? 
null).toBeNull() + expect(rows[3]).toEqual({ id: 4n, scores: [99n, null, 1n] }) + + const scores = meta.schema.find(s => s.name === 'scores') + expect(scores).toMatchObject({ + converted_type: 'LIST', + logical_type: { type: 'LIST' }, + repetition_type: 'OPTIONAL', + num_children: 1, + field_id: 2, + }) + const repeated = meta.schema.find(s => s.name === 'list') + expect(repeated).toMatchObject({ repetition_type: 'REPEATED', num_children: 1 }) + const element = meta.schema.find(s => s.name === 'element') + expect(element).toMatchObject({ type: 'INT64', repetition_type: 'OPTIONAL', field_id: 3 }) + }) + + it('writes a required list with required elements', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const listSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { + id: 1, + name: 'tags', + required: true, + type: { type: 'list', 'element-id': 2, 'element-required': true, element: 'string' }, + }, + ], + } + const records = [ + { tags: ['a', 'b'] }, + { tags: ['c'] }, + ] + writeParquet({ writer, schema: listSchema, records }) + const file = writer.getBuffer() + const meta = parquetMetadata(file) + + expect(meta.schema.find(s => s.name === 'tags')?.repetition_type).toBe('REQUIRED') + expect(meta.schema.find(s => s.name === 'element')).toMatchObject({ + type: 'BYTE_ARRAY', + converted_type: 'UTF8', + repetition_type: 'REQUIRED', + field_id: 2, + }) + + const rows = await parquetReadObjects({ file, compressors }) + expect(rows).toEqual(records) + }) + + it('writes nested list>', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const listSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { + id: 1, + name: 'matrix', + required: false, + type: { + type: 'list', + 'element-id': 2, + 'element-required': false, + element: { type: 'list', 'element-id': 3, 'element-required': false, element: 'int' }, + }, + }, + ], + } + const records = [ + { matrix: [[1, 2], [3]] }, + { matrix: [] }, + { matrix: null }, 
+ ] + writeParquet({ writer, schema: listSchema, records }) + const file = writer.getBuffer() + const rows = await parquetReadObjects({ file, compressors }) + expect(rows[0]).toEqual({ matrix: [[1, 2], [3]] }) + expect(rows[1]).toEqual({ matrix: [] }) + expect(rows[2].matrix ?? null).toBeNull() + }) + + it('writes a map column with 3-level MAP structure', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const mapSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: 'long' }, + { + id: 2, + name: 'props', + required: false, + type: { + type: 'map', + 'key-id': 3, + key: 'string', + 'value-id': 4, + 'value-required': false, + value: 'int', + }, + }, + ], + } + const records = [ + { id: 1n, props: { a: 1, b: 2 } }, + { id: 2n, props: {} }, + { id: 3n, props: null }, + { id: 4n, props: { only: 7 } }, + ] + writeParquet({ writer, schema: mapSchema, records }) + const file = writer.getBuffer() + const meta = parquetMetadata(file) + + expect(meta.schema.find(s => s.name === 'props')).toMatchObject({ + converted_type: 'MAP', + logical_type: { type: 'MAP' }, + repetition_type: 'OPTIONAL', + num_children: 1, + field_id: 2, + }) + expect(meta.schema.find(s => s.name === 'key_value')).toMatchObject({ + repetition_type: 'REPEATED', + num_children: 2, + }) + expect(meta.schema.find(s => s.name === 'key')).toMatchObject({ + type: 'BYTE_ARRAY', + converted_type: 'UTF8', + repetition_type: 'REQUIRED', + field_id: 3, + }) + expect(meta.schema.find(s => s.name === 'value')).toMatchObject({ + type: 'INT32', + repetition_type: 'OPTIONAL', + field_id: 4, + }) + + const rows = await parquetReadObjects({ file, compressors }) + expect(rows[0]).toEqual({ id: 1n, props: { a: 1, b: 2 } }) + expect(rows[1]).toEqual({ id: 2n, props: {} }) + expect(rows[2].props ?? 
null).toBeNull() + expect(rows[3]).toEqual({ id: 4n, props: { only: 7 } }) + }) + + it('rejects unsupported map key types', () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const mapSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { + id: 1, + name: 'm', + required: false, + type: { + type: 'map', + 'key-id': 2, + key: 'long', + 'value-id': 3, + 'value-required': false, + value: 'string', + }, + }, + ], + } + + expect(() => writeParquet({ writer, schema: mapSchema, records: [{ m: new Map([[1n, 'one']]) }] })) + .toThrow('unsupported iceberg map key type: long') + }) + + it('writes a map column', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const mapSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { + id: 1, + name: 'm', + required: false, + type: { + type: 'map', + 'key-id': 2, + key: 'int', + 'value-id': 3, + 'value-required': false, + value: 'string', + }, + }, + ], + } + + writeParquet({ writer, schema: mapSchema, records: [{ m: new Map([[1, 'one'], [2, 'two']]) }] }) + const rows = await parquetReadObjects({ file: writer.getBuffer(), compressors }) + expect(rows).toEqual([{ m: { 1: 'one', 2: 'two' } }]) + }) + + it('accepts ES Map and array-of-pairs inputs for map columns', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const mapSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { + id: 1, + name: 'm', + required: true, + type: { + type: 'map', + 'key-id': 2, + key: 'string', + 'value-id': 3, + 'value-required': true, + value: 'long', + }, + }, + ], + } + const records = [ + { m: new Map([['x', 1n], ['y', 2n]]) }, + { m: [['k1', 10n], ['k2', 20n]] }, + { m: [{ key: 'k', value: 99n }] }, + ] + writeParquet({ writer, schema: mapSchema, records }) + const rows = await parquetReadObjects({ file: writer.getBuffer(), compressors }) + expect(rows).toEqual([ + { m: { x: 1n, y: 2n } }, + { m: { k1: 10n, k2: 20n } }, + { m: { k: 99n } }, + ]) + }) + + it('writes 
nested list>', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const nestedSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { + id: 1, + name: 'sessions', + required: false, + type: { + type: 'list', + 'element-id': 2, + 'element-required': false, + element: { + type: 'map', + 'key-id': 3, + key: 'string', + 'value-id': 4, + 'value-required': false, + value: 'int', + }, + }, + }, + ], + } + const records = [ + { sessions: [{ a: 1 }, { b: 2, c: 3 }] }, + { sessions: [] }, + ] + writeParquet({ writer, schema: nestedSchema, records }) + const rows = await parquetReadObjects({ file: writer.getBuffer(), compressors }) + expect(rows).toEqual(records) + }) + + it('writes a struct column as a parquet group with stamped child field ids', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const structSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: 'long' }, + { + id: 2, + name: 'point', + required: false, + type: { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 3, name: 'x', required: true, type: 'double' }, + { id: 4, name: 'y', required: true, type: 'double' }, + ], + }, + }, + ], + } + const records = [ + { id: 1n, point: { x: 1.5, y: 2.5 } }, + { id: 2n, point: null }, + { id: 3n, point: { x: -1, y: 0 } }, + ] + writeParquet({ writer, schema: structSchema, records }) + const file = writer.getBuffer() + const meta = parquetMetadata(file) + + expect(meta.schema.find(s => s.name === 'point')).toMatchObject({ + repetition_type: 'OPTIONAL', + num_children: 2, + field_id: 2, + }) + expect(meta.schema.find(s => s.name === 'x')).toMatchObject({ + type: 'DOUBLE', + repetition_type: 'REQUIRED', + field_id: 3, + }) + expect(meta.schema.find(s => s.name === 'y')).toMatchObject({ + type: 'DOUBLE', + repetition_type: 'REQUIRED', + field_id: 4, + }) + + const rows = await parquetReadObjects({ file, compressors }) + expect(rows[0]).toEqual({ id: 1n, point: { x: 
1.5, y: 2.5 } }) + expect(rows[1].point ?? null).toBeNull() + expect(rows[2]).toEqual({ id: 3n, point: { x: -1, y: 0 } }) + }) + + it('materializes nested struct write-defaults', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const structSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 1, name: 'id', required: true, type: 'long' }, + { + id: 2, + name: 'point', + required: false, + type: { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 3, name: 'x', required: true, type: 'double', 'write-default': 0 }, + { id: 4, name: 'y', required: false, type: 'double', 'write-default': 0 }, + ], + }, + }, + ], + } + const records = [ + { id: 1n, point: { x: 1.5, y: 2.5 } }, + { id: 2n, point: {} }, + { id: 3n, point: { y: 4.5 } }, + { id: 4n, point: null }, + ] + writeParquet({ writer, schema: structSchema, records }) + + const rows = await parquetReadObjects({ file: writer.getBuffer(), compressors }) + expect(rows[0]).toEqual({ id: 1n, point: { x: 1.5, y: 2.5 } }) + expect(rows[1]).toEqual({ id: 2n, point: { x: 0, y: 0 } }) + expect(rows[2]).toEqual({ id: 3n, point: { x: 0, y: 4.5 } }) + expect(rows[3].point ?? 
null).toBeNull() + }) + + it('writes a struct containing list and map fields', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const nestedSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { + id: 1, + name: 'profile', + required: true, + type: { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 2, name: 'name', required: true, type: 'string' }, + { + id: 3, + name: 'roles', + required: false, + type: { type: 'list', 'element-id': 4, 'element-required': true, element: 'string' }, + }, + { + id: 5, + name: 'meta', + required: false, + type: { + type: 'map', + 'key-id': 6, + key: 'string', + 'value-id': 7, + 'value-required': true, + value: 'int', + }, + }, + ], + }, + }, + ], + } + const records = [ + { profile: { name: 'alice', roles: ['admin', 'editor'], meta: { age: 30 } } }, + { profile: { name: 'bob', roles: [], meta: {} } }, + ] + writeParquet({ writer, schema: nestedSchema, records }) + const rows = await parquetReadObjects({ file: writer.getBuffer(), compressors }) + expect(rows).toEqual(records) + }) + + it('writes a struct nested inside a list', async () => { + const writer = new ByteWriter() + /** @type {Schema} */ + const nestedSchema = { + type: 'struct', + 'schema-id': 0, + fields: [ + { + id: 1, + name: 'points', + required: false, + type: { + type: 'list', + 'element-id': 2, + 'element-required': true, + element: { + type: 'struct', + 'schema-id': 0, + fields: [ + { id: 3, name: 'x', required: true, type: 'int' }, + { id: 4, name: 'y', required: true, type: 'int' }, + ], + }, + }, + }, + ], + } + const records = [ + { points: [{ x: 1, y: 2 }, { x: 3, y: 4 }] }, + { points: [] }, + ] + writeParquet({ writer, schema: nestedSchema, records }) + const rows = await parquetReadObjects({ file: writer.getBuffer(), compressors }) + expect(rows).toEqual(records) + }) + it('rejects required unknown columns', () => { const writer = new ByteWriter() /** @type {Schema} */