Skip to content

Spark: Support long-valued streaming max rows per micro-batch#16571

Open
colinre wants to merge 2 commits into
apache:mainfrom
colinre:codex/long-streaming-max-rows
Open

Spark: Support long-valued streaming max rows per micro-batch#16571
colinre wants to merge 2 commits into
apache:mainfrom
colinre:codex/long-streaming-max-rows

Conversation

@colinre
Copy link
Copy Markdown

@colinre colinre commented May 26, 2026

Problem

Spark Structured Streaming row-based micro-batch planning was effectively capped at Integer.MAX_VALUE rows. This made very large initial streaming backfills impractical because streams over multi-trillion-row tables could require thousands of micro-batches before reaching ongoing incremental ingestion.

As a result, users may need to operate a separate batch backfill implementation and then hand off to streaming, even though a single streaming job should be able to handle both phases.

Root Cause

streaming-max-rows-per-micro-batch was parsed and propagated as an int, and planner defaults initialized the effective row limit to Integer.MAX_VALUE even when no row limit was configured.

Change

This updates Spark streaming row-limit handling to use long values internally and uses Long.MAX_VALUE as the unconfigured row-limit sentinel. ReadLimit.maxRows(...) now receives the long-valued limit without narrowing, and unconfigured streaming reads no longer get an implicit Integer.MAX_VALUE row cap.

The legacy SparkReadConf.maxRecordsPerMicroBatch() int accessor is retained for source and binary compatibility, deprecated, and supplemented with maxRecordsPerMicroBatchLong() for the long-valued behavior.

File-count rate limiting, offsets, checkpoint representation, and complete-file soft-limit semantics are unchanged.

This is a Codex change. I'm unfamiliar with this codebase.

Tests

Added or updated coverage for:

  • parsing row limits above Integer.MAX_VALUE,
  • unconfigured planning across synthetic multi-trillion-row backlogs,
  • explicit long-valued soft limits,
  • existing small row-limit behavior,
  • forward progress for files larger than the configured row soft limit,
  • planner default unpacking without an implicit Integer.MAX_VALUE row cap,
  • legacy int accessor compatibility and new long accessor behavior.

Validated with:

  • compileTestJava for Spark 3.5, 4.0, and 4.1
  • focused TestSparkReadConf.testMaxRecordsPerMicroBatch* tests for Spark 3.5, 4.0, and 4.1
  • representative Spark 4.1 structured-streaming long-row regression tests
  • spotlessJavaApply
  • git diff --check

Compatibility

Existing option names and configurations remain valid. Values at or below Integer.MAX_VALUE preserve existing behavior.

The existing int maxRecordsPerMicroBatch() method remains available to avoid breaking downstream callers, but is deprecated because it cannot represent newly supported values above Integer.MAX_VALUE. New callers should use maxRecordsPerMicroBatchLong().

@colinre colinre marked this pull request as draft May 26, 2026 19:54
@colinre colinre marked this pull request as ready for review May 26, 2026 21:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant