[SPARK-56233][SQL][SS] V2 streaming write for FileTable#55232

Draft
LuciferYang wants to merge 11 commits into apache:master from LuciferYang:SPARK-56233

Conversation

@LuciferYang
Contributor

What changes were proposed in this pull request?

Implements StreamingWrite support for V2 file tables, enabling structured streaming writes through the V2 data source path.

  • New FileStreamingWrite implementing StreamingWrite — uses ManifestFileCommitProtocol for file commit and FileStreamSinkLog for metadata tracking, with idempotent commit(epochId, messages) that skips already-committed batches
  • New FileStreamingWriterFactory bridging DataWriterFactory to StreamingDataWriterFactory
  • Override FileWrite.toStreaming() to create FileStreamingWrite with the same metadata log layout as V1 (_spark_metadata)
  • Add STREAMING_WRITE to FileTable.CAPABILITIES
  • Support retention option for metadata log cleanup (V1 parity)

The implementation follows the same WriteTaskResult → TaskCommitMessage → SinkFileStatus extraction pattern as FileBatchWrite.commit(). It uses useCommitCoordinator = true (unlike batch's false) because ManifestFileCommitProtocol writes files directly to the output path with no temp-to-final rename step, so the coordinator is needed to ensure only one attempt per task commits.
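The idempotent-commit behaviour described above can be sketched with a toy in-memory log standing in for FileStreamSinkLog. SinkFileStatus, ManifestLog, and commit here are simplified stand-ins for illustration, not Spark's actual classes:

```scala
// Sketch of the idempotent commit(epochId, messages) idea: a batch id that is
// already present in the metadata log is skipped, so replays after a query
// restart stay exactly-once. Names are simplified stand-ins, not Spark's API.
final case class SinkFileStatus(path: String, size: Long)

class ManifestLog {
  private val batches = scala.collection.mutable.Map.empty[Long, Seq[SinkFileStatus]]
  // Returns false when the batch was already committed, mirroring the
  // add-if-absent contract the PR description attributes to FileStreamSinkLog.
  def add(batchId: Long, files: Seq[SinkFileStatus]): Boolean =
    if (batches.contains(batchId)) false
    else { batches(batchId) = files; true }
}

def commit(log: ManifestLog, epochId: Long, files: Seq[SinkFileStatus]): String =
  if (log.add(epochId, files)) s"committed batch $epochId (${files.length} files)"
  else s"skipped already committed batch $epochId"
```

Calling commit twice with the same epoch id takes the skip branch the second time, which is the property that makes re-executed epochs safe.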

Why are the changes needed?

File streaming writes currently fall back to V1 FileStreamSink, preventing deprecation of V1 file source code. Together with SPARK-56232 (streaming read), this completes the streaming support needed for full V1 deprecation under SPARK-56170.

Does this PR introduce any user-facing change?

No. By default, USE_V1_SOURCE_LIST includes all file formats, so streaming writes still use V1. Users can opt into V2 by clearing the list. Existing checkpoints and _spark_metadata are compatible.
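The opt-in looks roughly like the following config fragment, assuming an active SparkSession named spark and illustrative paths; the key corresponds to SQLConf's USE_V1_SOURCE_LIST:

```scala
// Illustrative opt-in (config fragment): clearing the V1 fallback list routes
// file-format streaming writes through the V2 path. Paths are placeholders.
spark.conf.set("spark.sql.sources.useV1SourceList", "")

df.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/ckpt")
  .start("/tmp/out")
```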

How was this patch tested?

New FileStreamV2WriteSuite with 4 E2E tests: basic streaming write, multiple batches, checkpoint recovery, and JSON format. Existing FileStreamSinkV1Suite passes. In total, 108 streaming file tests pass across all suites.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

…Frame API writes and delete FallBackFileSourceV2

Key changes:
- FileWrite: added partitionSchema, customPartitionLocations,
  dynamicPartitionOverwrite, isTruncate; path creation and truncate
  logic; dynamic partition overwrite via FileCommitProtocol
- FileTable: createFileWriteBuilder with SupportsDynamicOverwrite
  and SupportsTruncate; capabilities now include TRUNCATE and
  OVERWRITE_DYNAMIC; fileIndex skips file existence checks when
  userSpecifiedSchema is provided (write path)
- All file format writes (Parquet, ORC, CSV, JSON, Text, Avro) use
  createFileWriteBuilder with partition/truncate/overwrite support
- DataFrameWriter.lookupV2Provider: enabled FileDataSourceV2 for
  non-partitioned Append and Overwrite via df.write.save(path)
- DataFrameWriter.insertInto: V1 fallback for file sources
  (TODO: SPARK-56175)
- DataFrameWriter.saveAsTable: V1 fallback for file sources
  (TODO: SPARK-56230, needs StagingTableCatalog)
- DataSourceV2Utils.getTableProvider: V1 fallback for file sources
  (TODO: SPARK-56175)
- Removed FallBackFileSourceV2 rule
- V2SessionCatalog.createTable: V1 FileFormat data type validation

…catalog table loading, and gate removal

Key changes:
- FileTable extends SupportsPartitionManagement with createPartition,
  dropPartition, listPartitionIdentifiers, partitionSchema
- Partition operations sync to catalog metastore (best-effort)
- V2SessionCatalog.loadTable returns FileTable instead of V1Table,
  sets catalogTable and useCatalogFileIndex on FileTable
- V2SessionCatalog.getDataSourceOptions includes storage.properties
  for proper option propagation (header, ORC bloom filter, etc.)
- V2SessionCatalog.createTable validates data types via FileTable
- FileTable.columns() restores NOT NULL constraints from catalogTable
- FileTable.partitioning() falls back to userSpecifiedPartitioning
  or catalog partition columns
- FileTable.fileIndex uses CatalogFileIndex when catalog has
  registered partitions (custom partition locations)
- FileTable.schema checks column name duplication for non-catalog
  tables only
- DataSourceV2Utils.getTableProvider: removed FileDataSourceV2 gate
- DataFrameWriter.insertInto: enabled V2 for file sources
- DataFrameWriter.saveAsTable: V1 fallback (TODO: SPARK-56230)
- ResolveSessionCatalog: V1 fallback for FileTable-backed commands
  (AnalyzeTable, AnalyzeColumn, TruncateTable, TruncatePartition,
  ShowPartitions, RecoverPartitions, AddPartitions, RenamePartitions,
  DropPartitions, SetTableLocation, CREATE TABLE validation,
  REPLACE TABLE blocking)
- FindDataSourceTable: streaming V1 fallback for FileTable
  (TODO: SPARK-56233)
- DataSource.planForWritingFileFormat: graceful V2 handling

Enable bucketed writes for V2 file tables via catalog BucketSpec.

Key changes:
- FileWrite: add bucketSpec field, use V1WritesUtils.getWriterBucketSpec()
  instead of hardcoded None
- FileTable: createFileWriteBuilder passes catalogTable.bucketSpec
  to the write pipeline
- FileDataSourceV2: getTable uses collect to skip BucketTransform
  (handled via catalogTable.bucketSpec instead)
- FileWriterFactory: use DynamicPartitionDataConcurrentWriter for
  bucketed writes since V2's RequiresDistributionAndOrdering cannot
  express hash-based ordering
- All 6 format Write/Table classes updated with BucketSpec parameter

Note: bucket pruning and bucket join (read-path optimization) are
not included in this patch (tracked under SPARK-56231).
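The "hash-based ordering" that V2's RequiresDistributionAndOrdering cannot express boils down to assigning each row a bucket id from a hash of the bucketing columns. A toy version, using Scala's built-in hash as a stand-in for Spark's Murmur3-based HashPartitioning:

```scala
// Illustrative bucket assignment: bucket = pmod(hash(bucketColumns), numBuckets).
// Scala's ## is only a stand-in for Spark's Murmur3 hash; the pmod step keeps
// the result non-negative even for negative hash values.
def bucketId(bucketColumnValues: Seq[Any], numBuckets: Int): Int = {
  val h = bucketColumnValues.##
  ((h % numBuckets) + numBuckets) % numBuckets // non-negative modulo (pmod)
}
```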

Add RepairTableExec to sync filesystem partition directories with
catalog metastore for V2 file tables.

Key changes:
- New RepairTableExec: scans filesystem partitions via
  FileTable.listPartitionIdentifiers(), compares with catalog,
  registers missing partitions and drops orphaned entries
- DataSourceV2Strategy: route RepairTable and RecoverPartitions
  for FileTable to new V2 exec node
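The core of the repair described above is a two-way set difference between filesystem and catalog partitions. A minimal sketch, with partitions modelled as path strings:

```scala
// Sketch of the RepairTableExec diff: partitions found on the filesystem but
// missing from the catalog are registered; catalog entries with no backing
// directory are dropped as orphans. Returns (toAdd, toDrop).
def repairDiff(
    fsPartitions: Set[String],
    catalogPartitions: Set[String]): (Set[String], Set[String]) =
  (fsPartitions -- catalogPartitions, catalogPartitions -- fsPartitions)
```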

Implement SupportsOverwriteV2 for V2 file tables to support static
partition overwrite (INSERT OVERWRITE TABLE t PARTITION(p=1) SELECT ...).

Key changes:
- FileTable: replace SupportsTruncate with SupportsOverwriteV2 on
  WriteBuilder, implement overwrite(predicates)
- FileWrite: extend toBatch() to delete only the matching partition
  directory, ordered by partitionSchema
- FileTable.CAPABILITIES: add OVERWRITE_BY_FILTER
- All 6 format Write/Table classes: plumb overwritePredicates parameter

This is a prerequisite for SPARK-56304 (ifPartitionNotExists).
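Resolving the overwrite predicates to a single partition directory, ordered by the partition schema, can be sketched as follows. EqualTo here is a simplified stand-in for the V2 filter of the same name:

```scala
// Toy sketch: map INSERT OVERWRITE ... PARTITION(p=1, q=2) predicates to the
// partition directory to delete, honouring the partitionSchema column order
// rather than the order the predicates were written in.
final case class EqualTo(column: String, value: String)

def partitionDirToDelete(partitionSchema: Seq[String], predicates: Seq[EqualTo]): String = {
  val byColumn = predicates.map(p => p.column -> p.value).toMap
  partitionSchema.flatMap(c => byColumn.get(c).map(v => s"$c=$v")).mkString("/")
}
```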

…EAD)

### What changes were proposed in this pull request?

Implements `MicroBatchStream` support for V2 file tables, enabling structured streaming reads through the V2 path instead of falling back to V1 `FileStreamSource`.

Key changes:
- New `FileMicroBatchStream` class implementing `MicroBatchStream`, `SupportsAdmissionControl`, and `SupportsTriggerAvailableNow` — handles file discovery, offset management, rate limiting, and partition planning
- Override `FileScan.toMicroBatchStream()` to return `FileMicroBatchStream`
- Add `withFileIndex` method to `FileScan` and all 6 concrete scans for creating batch-specific scans
- Add `MICRO_BATCH_READ` to `FileTable.CAPABILITIES`
- Update `ResolveDataSource` to allow `FileDataSourceV2` into the V2 streaming path (respects `USE_V1_SOURCE_LIST` for backward compatibility)
- Remove the `FileTable` streaming fallback in `FindDataSourceTable`
- Reuses V1 infrastructure (`FileStreamSourceLog`, `FileStreamSourceOffset`, `SeenFilesMap`) for checkpoint compatibility
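The rate-limiting piece of the admission control described above can be modelled in a few lines. FileEntry and nextBatch are illustrative names, not Spark's classes:

```scala
// Toy model of SupportsAdmissionControl-style rate limiting for a file source:
// from files not yet processed, admit at most maxFilesPerTrigger (oldest first)
// into the next micro-batch; None means no limit.
final case class FileEntry(path: String, modifiedAt: Long)

def nextBatch(unread: Seq[FileEntry], maxFilesPerTrigger: Option[Int]): Seq[FileEntry] = {
  val ordered = unread.sortBy(_.modifiedAt)
  maxFilesPerTrigger.fold(ordered)(ordered.take)
}
```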

### Why are the changes needed?

V2 file tables cannot be fully adopted until streaming reads are supported. Without this, the V1 `FileStreamSource` fallback prevents deprecation of V1 file source code.

### Does this PR introduce _any_ user-facing change?

No. By default, `USE_V1_SOURCE_LIST` includes all file formats, so streaming reads still use V1. Users can opt into V2 by clearing the list. Existing checkpoints are compatible.

### How was this patch tested?

New `FileStreamV2ReadSuite` with 6 E2E tests. Existing `FileStreamSourceSuite` (76 tests) passes with V1 forced via `USE_V1_SOURCE_LIST`.

…ITE)

### What changes were proposed in this pull request?

Implements `StreamingWrite` support for V2 file tables, enabling structured streaming writes through the V2 path instead of falling back to V1 `FileStreamSink`.

Key changes:
- New `FileStreamingWrite` class implementing `StreamingWrite` — uses `ManifestFileCommitProtocol` for file commit and `FileStreamSinkLog` for metadata tracking
- New `FileStreamingWriterFactory` bridging `DataWriterFactory` to `StreamingDataWriterFactory`
- Override `FileWrite.toStreaming()` to return `FileStreamingWrite`
- Add `STREAMING_WRITE` to `FileTable.CAPABILITIES`
- Idempotent `commit(epochId, messages)` — skips already-committed batches
- Supports `retention` option for metadata log cleanup (V1 parity)
- Checkpoint compatible with V1 `FileStreamSink` (same `_spark_metadata` format)

### Why are the changes needed?

V2 file tables cannot be fully adopted until streaming writes are supported. Without this, the V1 `FileStreamSink` fallback prevents deprecation of V1 file source code. Together with SPARK-56232 (streaming read), this completes the streaming support needed for V1 deprecation.

### Does this PR introduce _any_ user-facing change?

No. By default, `USE_V1_SOURCE_LIST` includes all file formats, so streaming writes still use V1. Users can opt into V2 by clearing the list. Existing checkpoints are compatible.

### How was this patch tested?

New `FileStreamV2WriteSuite` with 4 E2E tests. Existing `FileStreamSinkV1Suite` passes. All 108 streaming file tests pass.
@LuciferYang LuciferYang marked this pull request as draft April 7, 2026 07:03
@LuciferYang
Contributor Author

This is the 11th PR for SPARK-56170. The commit 396a03f contains the changes for this patch.

@LuciferYang LuciferYang changed the title [SPARK-56233][SQL][SS] V2 streaming write for FileTable (STREAMING_WRITE) [SPARK-56233][SQL][SS] V2 streaming write for FileTable Apr 8, 2026