Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions hypaware-core/plugins-workspace/format-iceberg/src/commit.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,72 @@ export async function commitBatch(input, priorState) {
dataFiles,
bytesWritten: toNumber(summary['added-files-size']),
rowCount: toNumber(summary['added-records']),
metadata: postMetadata,
}
}

const DEFAULT_STREAM_BYTE_LIMIT = 128 * 1024 * 1024
const DEFAULT_STREAM_ROW_LIMIT = 100_000

/**
* Stream rows from an async iterable and commit them in target-sized
* batches. Each batch is a single Iceberg append; the table is
* created on the first non-empty batch.
*
* Returns cumulative stats across all committed batches.
*
* @param {{
* tableUrl: string,
* columns: readonly ColumnSpec[],
* rows: AsyncIterable<Record<string, unknown>>,
* resolver: Resolver,
* lister: Lister,
* }} input
* @param {{ exists: boolean, metadata: TableMetadata | null }} priorState
* @param {{ batchByteLimit?: number, batchRowLimit?: number }} [opts]
* @returns {Promise<{ snapshotId: string, bytesWritten: number, rowCount: number, batchCount: number }>}
*/
export async function commitRowStream(input, priorState, opts = {}) {
const batchByteLimit = opts.batchByteLimit ?? DEFAULT_STREAM_BYTE_LIMIT
const batchRowLimit = opts.batchRowLimit ?? DEFAULT_STREAM_ROW_LIMIT

let state = { exists: priorState.exists, metadata: priorState.metadata }
let totalBytesWritten = 0
let totalRowCount = 0
let batchCount = 0
let lastSnapshotId = ''

/** @type {Record<string, unknown>[]} */
let batch = []
let batchBytes = 0

async function flushBatch() {
if (batch.length === 0) return
const result = await commitBatch(
{ tableUrl: input.tableUrl, columns: input.columns, rows: batch, resolver: input.resolver, lister: input.lister },
state
)
state = { exists: true, metadata: result.metadata }
totalBytesWritten += result.bytesWritten
totalRowCount += result.rowCount || batch.length
batchCount += 1
lastSnapshotId = result.snapshotId
batch = []
batchBytes = 0
}

for await (const row of input.rows) {
batch.push(row)
batchBytes += Buffer.byteLength(JSON.stringify(row), 'utf8')
if (batch.length >= batchRowLimit || batchBytes >= batchByteLimit) {
await flushBatch()
}
}
await flushBatch()

return { snapshotId: lastSnapshotId, bytesWritten: totalBytesWritten, rowCount: totalRowCount, batchCount }
}

/**
* Pluck data-file paths out of the snapshot summary. Iceberg V2/V3
* snapshots don't surface the path list directly; we use the manifest
Expand Down
85 changes: 40 additions & 45 deletions hypaware-core/plugins-workspace/format-iceberg/src/table-format.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import { getTracer, SpanStatusCode } from '../../../../src/core/observability/index.js'

import { createBlobStoreIO, pathToKey, tableUrlForBlobPrefix } from './blob-io.js'
import { commitBatch, probeTable } from './commit.js'
import { commitBatch, commitRowStream, probeTable } from './commit.js'
import { loadMarker, markerKey, markerSubsumedBySnapshot, writeMarker } from './state.js'

/**
Expand Down Expand Up @@ -207,21 +207,11 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log }) {
const markerPath = markerKey(prefix, ctx.name, dataset, batch.batchId)
const tracer = getTracer('plugin.format-iceberg')

// Drain rows up-front so the commit span only fires once we've
// already paid the IO cost. `storage.flushTable` is called per
// partition so any pending cache buffer lands before we read.
/** @type {Record<string, unknown>[]} */
const rows = []
let rowsLoaded = 0
// Flush any pending spool buffers so the row iterables are current.
for (const partition of partitions) {
if (partition.tablePath) {
await flushIfSupported(ctx.storage, partition.tablePath, 'iceberg_export')
}
const iterable = openRows(ctx.storage, partition)
for await (const row of iterable) {
rows.push(row)
rowsLoaded += 1
}
}

// Probe the table and check the marker before staging anything.
Expand Down Expand Up @@ -267,29 +257,13 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log }) {
return { partitionsExported: partitions.length, bytesWritten: 0, status: 'skipped' }
}

// Empty batches still emit a no-op trace + marker so a partition
// with zero ready rows doesn't leave the batch in a half-state.
if (rows.length === 0) {
log.debug('iceberg.export_dataset.empty', {
hyp_plugin: PLUGIN_NAME,
hyp_sink_instance: ctx.name,
hyp_dataset: dataset,
hyp_batch_id: batch.batchId,
})
if (priorState.currentSnapshotId) {
await writeMarker(ctx.blobStore, markerPath, {
dataset,
batchId: batch.batchId,
partition: collectPartitionKeys(partitions),
rowCount: 0,
bytesWritten: 0,
dataFiles: [],
snapshotId: priorState.currentSnapshotId,
metadataVersion: '',
committedAt: new Date().toISOString(),
})
// Stream rows through target-sized batch commits instead of draining
// everything into memory first. Each batch is a single Iceberg append.
async function* rowStream() {
for (const partition of partitions) {
const iterable = openRows(ctx.storage, partition)
for await (const row of iterable) yield row
}
return { partitionsExported: partitions.length, bytesWritten: 0, status: 'skipped' }
}

const commitSpanName = priorState.exists ? 'iceberg.snapshot.commit' : 'iceberg.table.create'
Expand All @@ -302,21 +276,20 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log }) {
hyp_dataset: dataset,
hyp_batch_id: batch.batchId,
encoder_format: ctx.encoder.format,
row_count: rowsLoaded,
status: 'ok',
...destinationAttrs,
},
},
async (span) => {
try {
const result = await commitBatch(
{ tableUrl, columns, rows, resolver, lister },
const result = await commitRowStream(
{ tableUrl, columns, rows: rowStream(), resolver, lister },
{ exists: priorState.exists, metadata: priorState.metadata }
)
span.setAttribute('snapshot_id', result.snapshotId)
span.setAttribute('metadata_version', result.metadataVersion)
span.setAttribute('data_file_count', result.dataFiles.length)
span.setAttribute('bytes_written', result.bytesWritten)
span.setAttribute('row_count', result.rowCount)
span.setAttribute('batch_count', result.batchCount)
if (lastMetadataWrite?.etag) span.setAttribute('etag', lastMetadataWrite.etag)
return result
} catch (err) {
Expand All @@ -325,8 +298,6 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log }) {
span.setStatus({ code: SpanStatusCode.ERROR, message })
span.setAttribute('status', 'failed')
span.setAttribute('error_kind', errorKind)
// Sister log so consumers don't have to walk the trace tree to
// see the failure name.
log.warn('iceberg.snapshot.commit_failed', {
hyp_plugin: PLUGIN_NAME,
hyp_sink_instance: ctx.name,
Expand All @@ -343,15 +314,38 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log }) {
}
)

if (commit.rowCount === 0) {
log.debug('iceberg.export_dataset.empty', {
hyp_plugin: PLUGIN_NAME,
hyp_sink_instance: ctx.name,
hyp_dataset: dataset,
hyp_batch_id: batch.batchId,
})
if (priorState.currentSnapshotId) {
await writeMarker(ctx.blobStore, markerPath, {
dataset,
batchId: batch.batchId,
partition: collectPartitionKeys(partitions),
rowCount: 0,
bytesWritten: 0,
dataFiles: [],
snapshotId: priorState.currentSnapshotId,
metadataVersion: '',
committedAt: new Date().toISOString(),
})
}
return { partitionsExported: partitions.length, bytesWritten: 0, status: 'skipped' }
}

await writeMarker(ctx.blobStore, markerPath, {
dataset,
batchId: batch.batchId,
partition: collectPartitionKeys(partitions),
rowCount: commit.rowCount || rowsLoaded,
rowCount: commit.rowCount,
bytesWritten: commit.bytesWritten,
dataFiles: commit.dataFiles,
dataFiles: [],
snapshotId: commit.snapshotId,
metadataVersion: commit.metadataVersion,
metadataVersion: '',
committedAt: new Date().toISOString(),
})

Expand All @@ -361,8 +355,9 @@ async function exportDataset({ ctx, batch, dataset, partitions, prefix, log }) {
hyp_dataset: dataset,
hyp_batch_id: batch.batchId,
snapshot_id: commit.snapshotId,
row_count: commit.rowCount || rowsLoaded,
row_count: commit.rowCount,
bytes_written: commit.bytesWritten,
batch_count: commit.batchCount,
})

return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export interface CommitResult {
dataFiles: string[]
bytesWritten: number
rowCount: number
metadata: TableMetadata
}

export interface ExportMarker {
Expand Down
Loading