Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions hypaware-core/plugins-workspace/ai-gateway/src/dataset.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { AI_GATEWAY_MESSAGE_COLUMNS } from './message_projector.js'
/**
* @import { ColumnSpec, DatasetDataSourceContext, DatasetDiscoveryContext, DatasetRefreshResult, DatasetRegistration, QueryPartition, QueryStorageService } from '../../../../collectivus-plugin-kernel-types.d.ts'
* @import { ExtendedQueryStorageService } from '../../../../src/core/cache/types.d.ts'
* @import { AsyncDataSource } from 'squirreling'
*/

export const DATASET_NAME = 'ai_gateway_messages'
Expand Down Expand Up @@ -117,7 +118,7 @@ export async function createDataSource(partitions, ctx) {
tablePaths.add(p.path)
}

/** @type {import('squirreling').AsyncDataSource[]} */
/** @type {AsyncDataSource[]} */
const sources = []
for (const tablePath of tablePaths) {
const source = await storage.dataSourceForTable(tablePath)
Expand All @@ -132,8 +133,8 @@ export async function createDataSource(partitions, ctx) {
/**
* Merge multiple AsyncDataSources into a single union source.
*
* @param {import('squirreling').AsyncDataSource[]} sources
* @returns {import('squirreling').AsyncDataSource}
* @param {AsyncDataSource[]} sources
* @returns {AsyncDataSource}
*/
function unionSources(sources) {
/** @type {Set<string>} */
Expand Down
6 changes: 4 additions & 2 deletions hypaware-core/plugins-workspace/format-iceberg/src/commit.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ const DEFAULT_STREAM_ROW_LIMIT = 100_000
* }} input
* @param {{ exists: boolean, metadata: TableMetadata | null }} priorState
* @param {{ batchByteLimit?: number, batchRowLimit?: number }} [opts]
* @returns {Promise<{ snapshotId: string, bytesWritten: number, rowCount: number, batchCount: number, dataFiles: string[] }>}
* @returns {Promise<{ snapshotId: string, metadataVersion: string, bytesWritten: number, rowCount: number, batchCount: number, dataFiles: string[] }>}
*/
export async function commitRowStream(input, priorState, opts = {}) {
const batchByteLimit = opts.batchByteLimit ?? DEFAULT_STREAM_BYTE_LIMIT
Expand All @@ -156,6 +156,7 @@ export async function commitRowStream(input, priorState, opts = {}) {
let totalRowCount = 0
let batchCount = 0
let lastSnapshotId = ''
let lastMetadataVersion = ''
/** @type {string[]} */
const allDataFiles = []

Expand All @@ -174,6 +175,7 @@ export async function commitRowStream(input, priorState, opts = {}) {
totalRowCount += result.rowCount || batch.length
batchCount += 1
lastSnapshotId = result.snapshotId
lastMetadataVersion = result.metadataVersion
allDataFiles.push(...result.dataFiles)
batch = []
batchBytes = 0
Expand All @@ -188,7 +190,7 @@ export async function commitRowStream(input, priorState, opts = {}) {
}
await flushBatch()

return { snapshotId: lastSnapshotId, bytesWritten: totalBytesWritten, rowCount: totalRowCount, batchCount, dataFiles: allDataFiles }
return { snapshotId: lastSnapshotId, metadataVersion: lastMetadataVersion, bytesWritten: totalBytesWritten, rowCount: totalRowCount, batchCount, dataFiles: allDataFiles }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,7 @@ import { createBlobStoreIO, tableUrlForBlobPrefix } from './blob-io.js'
/**
* @import { BlobStore } from '../../../../collectivus-plugin-kernel-types.d.ts'
* @import { Resolver, Lister, TableMetadata } from 'icebird/src/types.js'
*/

/**
* @typedef {{
* min_snapshots_to_keep: number
* max_snapshot_age_hours: number
* }} ExportRetentionConfig
*
* @typedef {{
* dataset: string
* snapshotsExpired: number
* snapshotsBefore: number
* compactionSupported: false
* }} ExportMaintenanceDatasetReport
*
* @typedef {{
* datasets: ExportMaintenanceDatasetReport[]
* totalSnapshotsExpired: number
* compactionSupported: false
* dryRun: boolean
* elapsedMs: number
* }} ExportMaintenanceReport
* @import { ExportRetentionConfig, ExportMaintenanceDatasetReport, ExportMaintenanceReport } from './types.d.ts'
*/

/** @type {ExportRetentionConfig} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { loadMarker, markerKey, markerSubsumedBySnapshot, writeMarker } from './

/**
* @import { BlobStore, ColumnSpec, ExportBatch, ExportOptions, ExportResult, PluginLogger, QueryPartition, QueryRegistry, QueryStorageService, Sink, SinkEncoder, TableFormatCreateContext, TableFormatProvider } from '../../../../collectivus-plugin-kernel-types.d.ts'
* @import { ExportRetentionConfig } from './types.d.ts'
*/

const PLUGIN_NAME = '@hypaware/format-iceberg'
Expand Down Expand Up @@ -68,7 +69,7 @@ function buildSink(ctx) {
const config = ctx.sinkInstanceConfig ?? {}
const prefix = resolvePrefix(config)
const log = ctx.log
const maintenanceConfig = /** @type {Partial<import('./maintenance.js').ExportRetentionConfig> | undefined} */ (
const maintenanceConfig = /** @type {Partial<ExportRetentionConfig> | undefined} */ (
config.maintenance
)

Expand Down Expand Up @@ -176,7 +177,7 @@ function buildSink(ctx) {
* partitions: QueryPartition[],
* prefix: string,
* log: PluginLogger,
* maintenanceConfig?: Partial<import('./maintenance.js').ExportRetentionConfig>,
* maintenanceConfig?: Partial<ExportRetentionConfig>,
* }} input
* @returns {Promise<{ partitionsExported: number, bytesWritten: number, status: 'committed' | 'skipped' }>}
*/
Expand Down Expand Up @@ -336,7 +337,7 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log, mai
bytesWritten: 0,
dataFiles: [],
snapshotId: priorState.currentSnapshotId,
metadataVersion: '',
metadataVersion: `v${priorState.metadata?.['last-sequence-number'] ?? priorState.metadata?.['format-version'] ?? 1}`,
committedAt: new Date().toISOString(),
})
}
Expand All @@ -351,7 +352,7 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log, mai
bytesWritten: commit.bytesWritten,
dataFiles: commit.dataFiles,
snapshotId: commit.snapshotId,
metadataVersion: '',
metadataVersion: commit.metadataVersion,
committedAt: new Date().toISOString(),
})

Expand Down
20 changes: 20 additions & 0 deletions hypaware-core/plugins-workspace/format-iceberg/src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,23 @@ export interface BlobIOWriteEvent {

export type BlobIOWriteObserver = (event: BlobIOWriteEvent) => void

export interface ExportRetentionConfig {
min_snapshots_to_keep: number
max_snapshot_age_hours: number
}

export interface ExportMaintenanceDatasetReport {
dataset: string
snapshotsExpired: number
snapshotsBefore: number
compactionSupported: false
}

export interface ExportMaintenanceReport {
datasets: ExportMaintenanceDatasetReport[]
totalSnapshotsExpired: number
compactionSupported: false
dryRun: boolean
elapsedMs: number
}

12 changes: 8 additions & 4 deletions src/core/cache/maintenance.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ import { appendRowsToTable, scanRowsFromTable, tableExists } from './iceberg/sto
* MaintenanceOptions,
* MaintenancePartitionReport,
* MaintenanceReport,
* PartitionCursor,
* } from './types.d.ts'
* @import { ColumnSpec } from '../../../collectivus-plugin-kernel-types.d.ts'
* @import { TableMetadata } from 'icebird/src/types.js'
* @import { Dirent } from 'node:fs'
*/

export const SNAPSHOT_RETENTION_DEFAULTS = Object.freeze({
Expand Down Expand Up @@ -215,7 +219,7 @@ async function expireSnapshots(epochDir, cfg, opts) {
const url = tableUrlForDir(epochDir)
const { resolver, lister } = await createLocalIcebergIO()

/** @type {import('icebird/src/types.js').TableMetadata} */
/** @type {TableMetadata} */
let metadata
try {
const loaded = await loadLatestFileCatalogMetadata({ tableUrl: url, resolver, lister })
Expand Down Expand Up @@ -274,7 +278,7 @@ function needsCompaction(epochDir, cfg) {

/**
* @param {string} partitionDir
* @param {import('./types.d.ts').PartitionCursor} cursor
* @param {PartitionCursor} cursor
* @param {MaintenanceConfig} _cfg
* @returns {Promise<{ newEpoch: number, rowCount: number, dataFiles: number } | null>}
*/
Expand All @@ -288,7 +292,7 @@ async function compactPartition(partitionDir, cursor, _cfg) {
const newEpochDir = path.join(partitionDir, `epoch=${newEpoch}`)

const seen = new Set()
/** @type {import('../../../collectivus-plugin-kernel-types.d.ts').ColumnSpec[] | null} */
/** @type {ColumnSpec[] | null} */
let columns = null
/** @type {Record<string, unknown>[]} */
let batch = []
Expand Down Expand Up @@ -357,7 +361,7 @@ async function cleanRetiredEpochs(cacheRoot) {
* @param {string} dir
*/
async function walkForRetired(dir) {
/** @type {import('node:fs').Dirent[]} */
/** @type {Dirent[]} */
let entries
try {
entries = await fsPromises.readdir(dir, { withFileTypes: true })
Expand Down
6 changes: 1 addition & 5 deletions src/core/cache/spool.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,7 @@ export function createCacheSpool(args) {
await writeProgress(filePath, batch.resumeOffset)
}

if (fileMalformed > 0) {
malformedCount += fileMalformed
continue
}

malformedCount += fileMalformed
await removeProgress(filePath)
await fs.rm(filePath, { force: true })
}
Expand Down
39 changes: 35 additions & 4 deletions src/core/cache/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
} from './partition.js'
import { cacheTablePath, datasetForTablePath } from './paths.js'
import { createCacheSpool, DEFAULT_SPOOL_BYTES_THRESHOLD } from './spool.js'
import { INTERNAL_FIELDS } from './streaming-reader.js'

import path from 'node:path'

Expand Down Expand Up @@ -121,12 +122,42 @@ export function createQueryStorageService({ cacheRoot }) {
return icebergTableUrl(resolveIcebergDir(tablePath))
},

readRows(tablePath, columns) {
return scanRowsFromTable(resolveIcebergDir(tablePath), columns)
async *readRows(tablePath, columns) {
const projected = columns?.filter((c) => !INTERNAL_FIELDS.includes(c))
for await (const row of scanRowsFromTable(resolveIcebergDir(tablePath), projected)) {
for (const f of INTERNAL_FIELDS) delete row[f]
yield row
}
},

dataSourceForTable(tablePath) {
return dataSourceForTable(resolveIcebergDir(tablePath))
async dataSourceForTable(tablePath) {
const source = await dataSourceForTable(resolveIcebergDir(tablePath))
if (!source) return null
return {
numRows: source.numRows,
columns: source.columns.filter((c) => !INTERNAL_FIELDS.includes(c)),
scan(options) {
const inner = source.scan({
...options,
columns: options.columns?.filter((c) => !INTERNAL_FIELDS.includes(c)),
})
return {
appliedWhere: inner.appliedWhere,
appliedLimitOffset: inner.appliedLimitOffset,
async *rows() {
for await (const row of inner.rows()) {
yield {
columns: row.columns.filter((c) => !INTERNAL_FIELDS.includes(c)),
cells: row.cells,
resolved: row.resolved
? Object.fromEntries(Object.entries(row.resolved).filter(([k]) => !INTERNAL_FIELDS.includes(k)))
: undefined,
}
}
},
}
},
}
},

async flushTable(tablePath, opts = {}) {
Expand Down
15 changes: 1 addition & 14 deletions src/core/cache/streaming-reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,7 @@ import fs from 'node:fs/promises'

/**
* @import { ColumnSpec } from '../../../collectivus-plugin-kernel-types.d.ts'
*/

/**
* @typedef {{
* columns: readonly ColumnSpec[],
* rows: Record<string, unknown>[],
* }} FlushChunk
*/

/**
* @typedef {{
* byteOffset: number,
* updatedAt: string,
* }} ProgressState
* @import { FlushChunk, ProgressState } from './types.d.ts'
*/

export const BATCH_BYTE_LIMIT = 128 * 1024 * 1024
Expand Down
10 changes: 10 additions & 0 deletions src/core/cache/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ export interface RetentionConfig {
wait_for_sink_ack?: boolean
}

export interface FlushChunk {
columns: readonly ColumnSpec[]
rows: Record<string, unknown>[]
}

export interface ProgressState {
byteOffset: number
updatedAt: string
}

export interface SpoolAppendResult {
bytesWritten: number
pendingBytes: number
Expand Down
6 changes: 5 additions & 1 deletion test/core/cache-migrate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import { appendRowsToPartition, discoverCachePartitions } from '../../src/core/c
import { appendRowsToTable } from '../../src/core/cache/iceberg/store.js'
import { migrateLegacyPartitions } from '../../src/core/cache/migrate.js'

/** @type {import('../../collectivus-plugin-kernel-types.d.ts').ColumnSpec[]} */
/**
* @import { ColumnSpec } from '../../collectivus-plugin-kernel-types.d.ts'
*/

/** @type {ColumnSpec[]} */
const TEST_COLUMNS = [
{ name: 'id', type: 'INT32', nullable: false },
{ name: 'value', type: 'STRING', nullable: true },
Expand Down
6 changes: 5 additions & 1 deletion test/core/cache-partition.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import {
} from '../../src/core/cache/partition.js'
import { appendRowsToTable } from '../../src/core/cache/iceberg/store.js'

/** @type {import('../../collectivus-plugin-kernel-types.d.ts').ColumnSpec[]} */
/**
* @import { ColumnSpec } from '../../collectivus-plugin-kernel-types.d.ts'
*/

/** @type {ColumnSpec[]} */
const TEST_COLUMNS = [
{ name: 'id', type: 'INT32', nullable: false },
{ name: 'value', type: 'STRING', nullable: true },
Expand Down